@ -11,40 +11,73 @@ const buffer = require("@promistream/buffer");
const pipe = require ( "@promistream/pipe" ) ;
const createMutationAPIWrapper = require ( "./mutation-api/wrapper" ) ;
const logStatus = require ( "./log-status" ) ;
const chalk = require ( "chalk" ) ;
// FIXME: Revert inlining of task_states once switched to PostgreSQL 12+, which can do this automatically using NOT MATERIALIZED
let query = `
WITH candidates AS (
WITH
dependency _tasks AS (
SELECT * FROM
json _to _recordset ( : dependencyTaskDefinitions ) AS x ( task text , task _version text )
) ,
matching _items AS (
SELECT
DISTINCT ON ( items . id )
items . * ,
results . updated _at AS result _date ,
results . task _version ,
(
results . is _successful = TRUE
AND (
results . expires _at < NOW ( )
OR results . is _invalidated = TRUE
)
) AS is _candidate
FROM items
INNER JOIN tags
ON tags . item _id = items . id
AND tags . name = ANY ( : tags )
LEFT JOIN task _results AS results
ON results . item _id = items . id
AND results . task = : task
WHERE
NOT EXISTS (
SELECT FROM tasks _in _progress AS pr WHERE pr . item _id = items . id
)
) ,
candidates AS (
SELECT * FROM matching _items
WHERE result _date IS NULL
UNION
SELECT * FROM matching _items
WHERE is _candidate = TRUE
OR NOT ( task _version = : taskVersion )
)
(
SELECT
DISTINCT ON ( items . id )
items . * ,
results . expires _at ,
results . is _invalidated ,
results . is _successful ,
results . updated _at AS result _date ,
results . task _version
FROM items
INNER JOIN tags
ON tags . item _id = items . id
AND tags . name = ANY ( : tags )
LEFT JOIN task _results AS results
ON results . item _id = items . id
AND results . task = : task
WHERE NOT EXISTS (
SELECT FROM tasks _in _progress AS pr WHERE pr . item _id = items . id
*
FROM
candidates
WHERE
NOT EXISTS (
SELECT
results . *
FROM dependency _tasks
LEFT JOIN task _results AS results
ON dependency _tasks . task = results . task
AND dependency _tasks . task _version = results . task _version
AND results . item _id = candidates . id
WHERE
results . is _successful IS NULL
OR (
results . is _successful = TRUE
AND (
results . expires _at < NOW ( )
OR results . is _invalidated = TRUE
)
)
)
)
(
SELECT * FROM candidates
WHERE result _date IS NULL
UNION
SELECT * FROM candidates
WHERE
is _successful = TRUE
AND (
expires _at < NOW ( )
OR is _invalidated = TRUE
OR NOT ( task _version = : taskVersion )
)
) LIMIT : resultLimit ;
` ;
@ -57,7 +90,7 @@ module.exports = function (state) {
// FIXME: Transaction support!
return function createTaskStream ( { task , taskVersion , ta gs, run , ttl , globalRateLimiter , globalParallelize } ) {
return function createTaskStream ( { task , taskVersion , ta skDependencies, ta gs, run , ttl , globalRateLimiter , globalParallelize } ) {
// TODO: Make nicer
let ttlInSeconds = ( ttl != null )
? ( typeof ttl === "number" )
@ -75,12 +108,16 @@ module.exports = function (state) {
tags : tags ,
task : task ,
taskVersion : taskVersion ,
resultLimit : 1000 // TODO: Make configurable
resultLimit : 1000 , // TODO: Make configurable
dependencyTaskDefinitions : JSON . stringify ( taskDependencies . map ( ( dependency ) => {
// Case-mapping for SQL compatibility
return { task _version : dependency . taskVersion , task : dependency . task } ;
} ) )
} ) ;
} ) . then ( ( result ) => {
let timeElapsed = Date . now ( ) - startTime ;
debug ( ` Task retrieval query took ${ timeElapsed } ms and produced ${ result . rowCount } results ` ) ;
debug ( ` Task retrieval query for '${ task } ' took ${ timeElapsed } ms and produced ${ result . rowCount } results ` ) ;
if ( result . rowCount > 0 ) {
// console.log("rows:", result.rows);
@ -93,7 +130,7 @@ module.exports = function (state) {
buffer ( ) ,
globalRateLimiter ,
processTaskSafely ( task , ( item , tx ) => {
console. log ( ` [started] ${ item . id } ` ) ;
logStatus( task , chalk . bold . cyan , "started" , item . id ) ;
let context = { tx , item , task , taskVersion } ;
@ -136,7 +173,7 @@ module.exports = function (state) {
expires _at : dateFns . add ( new Date ( ) , { seconds : ttlInSeconds } )
} ) ;
} ) . catch ( ( error ) => {
console. warn ( ` [failed] ${ item . id } ` , error ) ;
logStatus( task , chalk . bold . red , "failed" , item . id ) ;
return Promise . try ( ( ) => {
// Task failed -- note, cannot use tx here because it has failed