diff --git a/notes.txt b/notes.txt index 4c996ec..0df6b0f 100644 --- a/notes.txt +++ b/notes.txt @@ -293,3 +293,22 @@ SELECT failures.* FROM failures ON failures.task_result_id = task_results.id WHERE task_results.operation = :operation + + +================== + +task_states + all task_results, with is_candidate precalculated + +dependency_task_states + all task_states for tasks+versions listed in :dependencyTasks + +candidates (items which are *permitted* to be run) + all tag-relevant items which are not listed as "in progress", and for which all dependency tasks have been satisfied + all dependency tasks have been satisfied = NOT EXISTS ( join(dependencyTasks, task_states on task/version and where item_id) where is_candidate is null or true ) + +results (items which *should* be run, in order of preference) + all candidates (item + task_result) for which there is either: + 1. no entry in task_states + 2. an is_candidate=TRUE entry in task_states + 3. an entry in task_states with mismatching task_version diff --git a/src/kernel.js b/src/kernel.js index 6d0dbbb..302ab54 100644 --- a/src/kernel.js +++ b/src/kernel.js @@ -95,7 +95,13 @@ module.exports = function createKernel(configuration) { : null, globalParallelize: (configuration.parallelTasks != null) ? parallelize(configuration.parallelTasks) - : null + : null, + taskDependencies: defaultValue(taskConfiguration.dependsOn, []).map((task) => { + return { + task: task, + taskVersion: defaultValue(configuration.tasks[task].taskVersion, "0") + }; + }) }); return pipe([ @@ -107,6 +113,9 @@ module.exports = function createKernel(configuration) { } else { throw new Error(`Task '${task}' is defined to run for tags [${tags}], but no such task is defined`); } + }).catch((error) => { + console.dir(error, { depth: null, colors: true }); + throw error; }); } diff --git a/src/task-stream.js b/src/task-stream.js index 6ac4a9b..2402418 100644 --- a/src/task-stream.js +++ b/src/task-stream.js @@ -13,38 +13,68 @@ const pipe = require("@promistream/pipe"); const createMutationAPIWrapper = require("./mutation-api/wrapper"); let query = ` - WITH candidates AS ( + WITH + dependency_tasks AS ( + SELECT * FROM + json_to_recordset(:dependencyTaskDefinitions) AS x(task text, task_version text) + ), + task_states AS ( + SELECT + *, + ( + is_successful = TRUE + AND ( + expires_at < NOW() + OR is_invalidated = TRUE + ) + ) AS is_candidate + FROM task_results + ), + matching_items AS ( + SELECT + DISTINCT ON (items.id) + items.*, + states.updated_at AS result_date, + states.is_candidate, + states.task_version + FROM items + INNER JOIN tags + ON tags.item_id = items.id + AND tags.name = ANY(:tags) + LEFT JOIN task_states AS states + ON states.item_id = items.id + AND states.task = :task + WHERE + NOT EXISTS ( + SELECT FROM tasks_in_progress AS pr WHERE pr.item_id = items.id + ) + ), + candidates AS ( + SELECT * FROM matching_items + WHERE result_date IS NULL + UNION + SELECT * FROM matching_items + WHERE is_candidate = TRUE + OR NOT (task_version = :taskVersion) + ) + ( SELECT - DISTINCT ON (items.id) - items.*, - results.expires_at, - results.is_invalidated, - results.is_successful, - results.updated_at AS result_date, - results.task_version - FROM items - INNER JOIN tags - ON tags.item_id = items.id - AND tags.name = ANY(:tags) - LEFT JOIN task_results AS results - ON results.item_id = items.id - AND results.task = :task - WHERE NOT EXISTS ( - SELECT FROM tasks_in_progress AS pr WHERE pr.item_id = items.id + * + FROM + candidates + WHERE + NOT EXISTS ( + SELECT + task_states.* + FROM dependency_tasks + LEFT JOIN task_states + ON dependency_tasks.task = task_states.task + AND dependency_tasks.task_version = task_states.task_version + AND task_states.item_id = candidates.id + WHERE + task_states.is_candidate IS NULL + OR task_states.is_candidate = TRUE ) - ) - ( - SELECT * FROM candidates - WHERE result_date IS NULL - UNION - SELECT * FROM candidates - WHERE - is_successful = TRUE - AND ( - expires_at < NOW() - OR is_invalidated = TRUE - OR NOT (task_version = :taskVersion) - ) ) LIMIT :resultLimit; `; @@ -57,7 +87,7 @@ module.exports = function (state) { // FIXME: Transaction support! - return function createTaskStream({ task, taskVersion, tags, run, ttl, globalRateLimiter, globalParallelize }) { + return function createTaskStream({ task, taskVersion, taskDependencies, tags, run, ttl, globalRateLimiter, globalParallelize }) { // TODO: Make nicer let ttlInSeconds = (ttl != null) ? (typeof ttl === "number") @@ -75,12 +105,13 @@ module.exports = function (state) { tags: tags, task: task, taskVersion: taskVersion, - resultLimit: 1000 // TODO: Make configurable + resultLimit: 1000, // TODO: Make configurable + dependencyTaskDefinitions: JSON.stringify(taskDependencies) }); }).then((result) => { let timeElapsed = Date.now() - startTime; - debug(`Task retrieval query took ${timeElapsed}ms and produced ${result.rowCount} results`); + debug(`Task retrieval query for '${task}' took ${timeElapsed}ms and produced ${result.rowCount} results`); if (result.rowCount > 0) { // console.log("rows:", result.rows);