Compare commits

...

6 Commits

@ -0,0 +1,15 @@
"use strict";
module.exports.up = function(knex, Promise) {
return knex.schema
.alterTable("task_results", (table) => {
table.index("item_id");
});
};
module.exports.down = function(knex, Promise) {
return knex.schema
.alterTable("task_results", (table) => {
table.dropIndex("item_id");
});
};

@ -293,3 +293,22 @@ SELECT failures.* FROM failures
ON failures.task_result_id = task_results.id
WHERE
task_results.operation = :operation
==================
task_states
all task_results, with is_candidate precalculated
dependency_task_states
all task_states for tasks+versions listed in :dependencyTasks
candidates (items which are *permitted* to be run)
all tag-relevant items which are not listed as "in progress", and for which all dependency tasks have been satisfied
all dependency tasks have been satisfied = NOT EXISTS ( join(dependencyTasks, task_states on task/version and where item_id) where is_candidate is null or true )
results (items which *should* be run, in order of preference)
all candidates (item + task_result) for which there is either:
1. no entry in task_states
2. an is_candidate=TRUE entry in task_states
3. an entry in task_states with mismatching task_version

@ -12,6 +12,7 @@ const pipe = require("@promistream/pipe");
const parallelize = require("@ppstreams/parallelize");
const initialize = require("./initialize");
const logStatus = require("./log-status");
// TODO: Publish this as a separate package
// Inverts an object of arrays, eg. {a: [x, y], b: [x, z]} becomes {x: [a, b], y: [a], z: [b]}
@ -95,18 +96,27 @@ module.exports = function createKernel(configuration) {
: null,
globalParallelize: (configuration.parallelTasks != null)
? parallelize(configuration.parallelTasks)
: null
: null,
taskDependencies: defaultValue(taskConfiguration.dependsOn, []).map((task) => {
return {
task: task,
taskVersion: defaultValue(configuration.tasks[task].taskVersion, "0")
};
})
});
return pipe([
taskStream,
simpleSink((completedItem) => {
console.log(`[completed] ${completedItem.id}`);
logStatus(task, chalk.bold.green, "completed", completedItem.id);
})
]).read();
} else {
throw new Error(`Task '${task}' is defined to run for tags [${tags}], but no such task is defined`);
}
}).catch((error) => {
console.dir(error, { depth: null, colors: true });
throw error;
});
}

@ -0,0 +1,7 @@
"use strict";
const chalk = require("chalk");
module.exports = function logStatus(task, color, type, message) {
console.log(`${chalk.bold(`[${task}]`)} ${color(`[${type}]`)} ${message}`);
};

@ -1,14 +1,16 @@
"use strict";
const defaultValue = require("default-value");
const chalk = require("chalk");
const logStatus = require("../log-status");
module.exports = function (state) {
const queries = require("../queries")(state);
return function createDatabaseMutationAPI({ tx, item, taskVersion }) {
return function createDatabaseMutationAPI({ tx, item, taskVersion, task }) {
return {
createItem: function (options) {
console.log(`[new] ${options.id}`);
logStatus(task, chalk.gray, "new", options.id);
return queries.createItem(tx, {
...options,

@ -11,40 +11,73 @@ const buffer = require("@promistream/buffer");
const pipe = require("@promistream/pipe");
const createMutationAPIWrapper = require("./mutation-api/wrapper");
const logStatus = require("./log-status");
const chalk = require("chalk");
// FIXME: Revert inlining of task_states once switched to PostgreSQL 12+, which can do this automatically using NOT MATERIALIZED
let query = `
WITH candidates AS (
WITH
dependency_tasks AS (
SELECT * FROM
json_to_recordset(:dependencyTaskDefinitions) AS x(task text, task_version text)
),
matching_items AS (
SELECT
DISTINCT ON (items.id)
items.*,
results.updated_at AS result_date,
results.task_version,
(
results.is_successful = TRUE
AND (
results.expires_at < NOW()
OR results.is_invalidated = TRUE
)
) AS is_candidate
FROM items
INNER JOIN tags
ON tags.item_id = items.id
AND tags.name = ANY(:tags)
LEFT JOIN task_results AS results
ON results.item_id = items.id
AND results.task = :task
WHERE
NOT EXISTS (
SELECT FROM tasks_in_progress AS pr WHERE pr.item_id = items.id
)
),
candidates AS (
SELECT * FROM matching_items
WHERE result_date IS NULL
UNION
SELECT * FROM matching_items
WHERE is_candidate = TRUE
OR NOT (task_version = :taskVersion)
)
(
SELECT
DISTINCT ON (items.id)
items.*,
results.expires_at,
results.is_invalidated,
results.is_successful,
results.updated_at AS result_date,
results.task_version
FROM items
INNER JOIN tags
ON tags.item_id = items.id
AND tags.name = ANY(:tags)
LEFT JOIN task_results AS results
ON results.item_id = items.id
AND results.task = :task
WHERE NOT EXISTS (
SELECT FROM tasks_in_progress AS pr WHERE pr.item_id = items.id
*
FROM
candidates
WHERE
NOT EXISTS (
SELECT
results.*
FROM dependency_tasks
LEFT JOIN task_results AS results
ON dependency_tasks.task = results.task
AND dependency_tasks.task_version = results.task_version
AND results.item_id = candidates.id
WHERE
results.is_successful IS NULL
OR (
results.is_successful = TRUE
AND (
results.expires_at < NOW()
OR results.is_invalidated = TRUE
)
)
)
)
(
SELECT * FROM candidates
WHERE result_date IS NULL
UNION
SELECT * FROM candidates
WHERE
is_successful = TRUE
AND (
expires_at < NOW()
OR is_invalidated = TRUE
OR NOT (task_version = :taskVersion)
)
) LIMIT :resultLimit;
`;
@ -57,7 +90,7 @@ module.exports = function (state) {
// FIXME: Transaction support!
return function createTaskStream({ task, taskVersion, tags, run, ttl, globalRateLimiter, globalParallelize }) {
return function createTaskStream({ task, taskVersion, taskDependencies, tags, run, ttl, globalRateLimiter, globalParallelize }) {
// TODO: Make nicer
let ttlInSeconds = (ttl != null)
? (typeof ttl === "number")
@ -75,12 +108,16 @@ module.exports = function (state) {
tags: tags,
task: task,
taskVersion: taskVersion,
resultLimit: 1000 // TODO: Make configurable
resultLimit: 1000, // TODO: Make configurable
dependencyTaskDefinitions: JSON.stringify(taskDependencies.map((dependency) => {
// Case-mapping for SQL compatibility
return { task_version: dependency.taskVersion, task: dependency.task };
}))
});
}).then((result) => {
let timeElapsed = Date.now() - startTime;
debug(`Task retrieval query took ${timeElapsed}ms and produced ${result.rowCount} results`);
debug(`Task retrieval query for '${task}' took ${timeElapsed}ms and produced ${result.rowCount} results`);
if (result.rowCount > 0) {
// console.log("rows:", result.rows);
@ -93,7 +130,7 @@ module.exports = function (state) {
buffer(),
globalRateLimiter,
processTaskSafely(task, (item, tx) => {
console.log(`[started] ${item.id}`);
logStatus(task, chalk.bold.cyan, "started", item.id);
let context = { tx, item, task, taskVersion };
@ -136,7 +173,7 @@ module.exports = function (state) {
expires_at: dateFns.add(new Date(), { seconds: ttlInSeconds })
});
}).catch((error) => {
console.warn(`[failed] ${item.id}`, error);
logStatus(task, chalk.bold.red, "failed", item.id);
return Promise.try(() => {
// Task failed -- note, cannot use tx here because it has failed

Loading…
Cancel
Save