"use strict"; const Promise = require("bluebird"); const debug = require("debug")("srap:backend:postgresql:query:get-task-stream"); const buffer = require("@promistream/buffer"); const pipe = require("@promistream/pipe"); const simpleSource = require("@promistream/simple-source"); const fetchQuery = ` SELECT srap_items.* FROM srap_queue LEFT JOIN srap_items ON srap_items.id = srap_queue.item_id WHERE srap_queue.task = :task AND srap_queue.started = FALSE LIMIT :resultLimit `; function makeFillQuery(withDependencies) { return ` WITH ${withDependencies ? ` dependencies AS ( SELECT * FROM json_to_recordset(:dependencyTaskDefinitions) AS x(task text, task_version text) ), satisfied AS ( SELECT results.* FROM dependencies LEFT JOIN srap_task_results AS results ON dependencies.task = results.task AND dependencies.task_version = results.task_version WHERE results.is_successful = TRUE AND results.is_invalidated = FALSE AND results.expires_at > NOW() ), counts AS ( SELECT item_id, COUNT(task) AS count FROM satisfied GROUP BY item_id ), dependency_candidates AS ( SELECT item_id FROM counts WHERE count = :dependencyCount ), ` : "" } tag_candidates AS ( SELECT item_id FROM srap_tags WHERE name = ANY(:tags) ), full_candidates AS MATERIALIZED ( ${withDependencies ? ` SELECT tag_candidates.item_id FROM dependency_candidates INNER JOIN tag_candidates ON dependency_candidates.item_id = tag_candidates.item_id ` : ` SELECT item_id FROM tag_candidates ` } ) SELECT :task AS task, item_id FROM full_candidates WHERE NOT EXISTS ( SELECT item_id FROM srap_task_results AS results WHERE item_id = full_candidates.item_id AND results.task = :task AND results.task_version = :taskVersion AND results.is_successful = TRUE AND results.is_invalidated = FALSE AND results.expires_at > NOW() ) `; } const fillQueryWithDependencies = makeFillQuery(true); const fillQueryWithoutDependencies = makeFillQuery(false); module.exports = function ({ metrics, backendSettings, knex }) { return function (tx, { task }) { let hasDependencies = (task.dependencies.length > 0); let refillParameters = { tags: task.tags, task: task.name, taskVersion: task.version, ... hasDependencies ? { dependencyCount: task.dependencies.length, dependencyTaskDefinitions: JSON.stringify(task.dependencies.map((dependency) => { // Case-mapping for SQL compatibility return { task_version: dependency.version, task: dependency.name }; })) } : {} }; let fetchParameters = { task: task.name, resultLimit: backendSettings.taskBatchSize // resultLimit: 1 // For tracking down race conditions }; function refillQueue() { let startTime = Date.now(); return Promise.try(() => { let fillQuery = (hasDependencies) ? fillQueryWithDependencies : fillQueryWithoutDependencies; // NOTE: We are deliberately bypassing the transaction here! Also deliberately not using VALUES, since we're inserting from the results of another query instead return knex.raw(` INSERT INTO srap_queue (task, item_id) (${fillQuery}) ON CONFLICT (task, item_id) DO NOTHING; `, refillParameters); }).then((response) => { let timeElapsed = Date.now() - startTime; metrics.taskRefillTime.labels({ task: task.name }).set(timeElapsed / 1000); metrics.taskRefillResults.labels({ task: task.name }).set(response.rowCount); debug(`Queue for '${task.name}' was refilled with ${response.rowCount} items in ${timeElapsed}ms`); return response.rowCount; }); } return pipe([ simpleSource(() => { let startTime = Date.now(); return Promise.try(() => { return tx.raw(fetchQuery, fetchParameters); }).then((result) => { let timeElapsed = Date.now() - startTime; metrics.taskFetchTime.labels({ task: task.name }).set(timeElapsed / 1000); metrics.taskFetchResults.labels({ task: task.name }).set(result.rowCount); debug(`Task retrieval query for '${task.name}' took ${timeElapsed}ms and produced ${result.rowCount} results`); if (result.rowCount > 0) { return result.rows; } else { return Promise.try(() => { return refillQueue(); }).then((newItems) => { if (newItems === 0) { // TODO: Consider using LISTEN/NOTIFY instead? Worth the added complexity? let randomization = Math.random() * backendSettings.delayRandomization * backendSettings.taskBatchDelay; // To prevent stampeding by low-throughput tasks return Promise.resolve([]).delay(backendSettings.taskBatchDelay + randomization); } else { // Have another go right away return []; } }); } }); }), buffer() ]); }; };