"use strict" ;
const Promise = require ( "bluebird" ) ;
const debug = require ( "debug" ) ( "srap:backend:postgresql:query:get-task-stream" ) ;
const buffer = require ( "@promistream/buffer" ) ;
const pipe = require ( "@promistream/pipe" ) ;
const simpleSource = require ( "@promistream/simple-source" ) ;
// Selects the item rows belonging to queue entries of a single task that have
// not been started yet, returning at most :resultLimit of them.
// Named bindings (knex `:name` syntax): :task, :resultLimit.
// Because this is a LEFT JOIN starting from the queue, a queue entry without a
// matching item still produces a row (with all item columns NULL).
const fetchQuery = `
	SELECT
		srap_items.*
	FROM
		srap_queue
	LEFT JOIN srap_items
		ON srap_items.id = srap_queue.item_id
	WHERE
		srap_queue.task = :task
		AND srap_queue.started = FALSE
	LIMIT :resultLimit
`;
/**
 * Builds the SELECT statement that produces the `(task, item_id)` rows with
 * which a task's queue gets refilled.
 *
 * Structure of the generated query:
 * - `dependencies` (only with dependencies): expands the JSON in
 *   `:dependencyTaskDefinitions` into `(task, task_version)` rows.
 * - `satisfied` (only with dependencies): task results matching those rows
 *   that are successful, not invalidated, and not expired.
 * - `counts` / `dependency_candidates` (only with dependencies): items whose
 *   number of satisfied results equals `:dependencyCount`.
 * - `tag_candidates`: items carrying any of the tags in `:tags`.
 * - `full_candidates`: the tag candidates, restricted to the dependency
 *   candidates when dependencies are in play.
 * - Final SELECT: drops every candidate that already has a result for
 *   `:task` at `:taskVersion` which is either failed, or successful and still
 *   valid (not invalidated, not expired).
 *
 * Named bindings used: `:tags`, `:task`, `:taskVersion`, and additionally
 * `:dependencyTaskDefinitions` and `:dependencyCount` when
 * `withDependencies` is true.
 *
 * @param {boolean} withDependencies - Whether to include the dependency-checking CTEs.
 * @returns {string} The SQL text, without a trailing semicolon so that it can be embedded in an INSERT.
 */
function makeFillQuery(withDependencies) {
	// These CTEs precede tag_candidates in the WITH list, hence the trailing comma.
	const dependencyCTEs = withDependencies
		? `
		dependencies AS (
			SELECT * FROM json_to_recordset(:dependencyTaskDefinitions) AS x(task text, task_version text)
		),
		satisfied AS (
			SELECT results.* FROM dependencies
			LEFT JOIN srap_task_results AS results
				ON dependencies.task = results.task
				AND dependencies.task_version = results.task_version
			WHERE
				results.is_successful = TRUE
				AND results.is_invalidated = FALSE
				AND (
					results.expires_at > NOW()
					OR results.expires_at IS NULL
				)
		),
		counts AS (
			SELECT item_id, COUNT(task) AS count FROM satisfied GROUP BY item_id
		),
		dependency_candidates AS (
			SELECT item_id FROM counts WHERE count = :dependencyCount
		),
		`
		: "";

	const candidateSelection = withDependencies
		? `
			SELECT tag_candidates.item_id FROM dependency_candidates
			INNER JOIN tag_candidates
				ON dependency_candidates.item_id = tag_candidates.item_id
		`
		: `
			SELECT item_id FROM tag_candidates
		`;

	return `
		WITH
		${dependencyCTEs}
		tag_candidates AS (
			SELECT item_id FROM srap_tags WHERE name = ANY(:tags)
		),
		full_candidates AS MATERIALIZED (
			${candidateSelection}
		)
		SELECT
			:task AS task,
			item_id
		FROM full_candidates
		WHERE NOT EXISTS (
			SELECT item_id FROM srap_task_results AS results
			WHERE
				item_id = full_candidates.item_id
				AND results.task = :task
				AND results.task_version = :taskVersion
				AND (
					results.is_successful = FALSE
					OR (
						results.is_successful = TRUE
						AND results.is_invalidated = FALSE
						AND (
							results.expires_at > NOW()
							OR results.expires_at IS NULL
						)
					)
				)
		)
	`;
}

// Both variants are static text, so they are built once at module load.
const fillQueryWithDependencies = makeFillQuery(true);
const fillQueryWithoutDependencies = makeFillQuery(false);
module . exports = function ( { metrics , backendSettings , knex } ) {
return function ( tx , { task } ) {
let hasDependencies = ( task . dependencies . length > 0 ) ;
let refillParameters = {
tags : task . tags ,
task : task . name ,
taskVersion : task . version ,
... hasDependencies
? {
dependencyCount : task . dependencies . length ,
dependencyTaskDefinitions : JSON . stringify ( task . dependencies . map ( ( dependency ) => {
// Case-mapping for SQL compatibility
return { task _version : dependency . version , task : dependency . name } ;
} ) )
}
: { }
} ;
let fetchParameters = {
task : task . name ,
resultLimit : backendSettings . taskBatchSize
// resultLimit: 1 // For tracking down race conditions
} ;
function refillQueue ( ) {
let startTime = Date . now ( ) ;
return Promise . try ( ( ) => {
let fillQuery = ( hasDependencies )
? fillQueryWithDependencies
: fillQueryWithoutDependencies ;
// NOTE: We are deliberately bypassing the transaction here! Also deliberately not using VALUES, since we're inserting from the results of another query instead
return knex . raw ( `
INSERT INTO srap _queue ( task , item _id )
( $ { fillQuery } )
ON CONFLICT ( task , item _id ) DO NOTHING ;
` , refillParameters);
} ) . then ( ( response ) => {
let timeElapsed = Date . now ( ) - startTime ;
metrics . taskRefillTime . labels ( { task : task . name } ) . set ( timeElapsed / 1000 ) ;
metrics . taskRefillResults . labels ( { task : task . name } ) . set ( response . rowCount ) ;
debug ( ` Queue for ' ${ task . name } ' was refilled with ${ response . rowCount } items in ${ timeElapsed } ms ` ) ;
return response . rowCount ;
} ) ;
}
return pipe ( [
simpleSource ( ( ) => {
let startTime = Date . now ( ) ;
return Promise . try ( ( ) => {
return tx . raw ( fetchQuery , fetchParameters ) ;
} ) . then ( ( result ) => {
let timeElapsed = Date . now ( ) - startTime ;
metrics . taskFetchTime . labels ( { task : task . name } ) . set ( timeElapsed / 1000 ) ;
metrics . taskFetchResults . labels ( { task : task . name } ) . set ( result . rowCount ) ;
debug ( ` Task retrieval query for ' ${ task . name } ' took ${ timeElapsed } ms and produced ${ result . rowCount } results ` ) ;
if ( result . rowCount > 0 ) {
return result . rows ;
} else {
return Promise . try ( ( ) => {
return refillQueue ( ) ;
} ) . then ( ( newItems ) => {
if ( newItems === 0 ) {
// TODO: Consider using LISTEN/NOTIFY instead? Worth the added complexity?
let randomization = Math . random ( ) * backendSettings . delayRandomization * backendSettings . taskBatchDelay ; // To prevent stampeding by low-throughput tasks
return Promise . resolve ( [ ] ) . delay ( backendSettings . taskBatchDelay + randomization ) ;
} else {
// Have another go right away
return [ ] ;
}
} ) ;
}
} ) ;
} ) ,
buffer ( )
] ) ;
} ;
} ;