"use strict"; const Promise = require("bluebird"); const debug = require("debug")("srap:backend:postgresql:query:get-task-stream"); const buffer = require("@promistream/buffer"); const pipe = require("@promistream/pipe"); const simpleSource = require("@promistream/simple-source"); const fetchQuery = ` SELECT srap_items.* FROM srap_queue LEFT JOIN srap_items ON srap_items.id = srap_queue.item_id WHERE srap_queue.task = :task AND srap_queue.started = FALSE LIMIT :resultLimit `; const fillQuery = ` WITH dependency_tasks AS ( SELECT * FROM json_to_recordset(:dependencyTaskDefinitions) AS x(task text, task_version text) ), matching_items AS ( SELECT srap_items.*, results.updated_at AS result_date, results.task_version, ( results.is_successful = TRUE AND ( results.expires_at < NOW() OR results.is_invalidated = TRUE ) ) AS is_candidate FROM srap_items INNER JOIN srap_tags ON srap_tags.item_id = srap_items.id AND srap_tags.name = ANY(:tags) LEFT JOIN srap_task_results AS results ON results.item_id = srap_items.id AND results.task = :task ), candidates AS ( SELECT * FROM matching_items WHERE result_date IS NULL UNION ALL SELECT * FROM matching_items WHERE is_candidate = TRUE OR NOT (task_version = :taskVersion) ) ( SELECT :task AS task, id AS item_id FROM candidates WHERE NOT EXISTS ( SELECT results.* FROM dependency_tasks LEFT JOIN srap_task_results AS results ON dependency_tasks.task = results.task AND dependency_tasks.task_version = results.task_version AND results.item_id = candidates.id WHERE results.is_successful IS NULL OR results.is_successful = FALSE OR ( results.is_successful = TRUE AND ( results.expires_at < NOW() OR results.is_invalidated = TRUE ) ) ) ) `; module.exports = function ({ metrics, backendSettings, knex }) { return function (tx, { task }) { let refillParameters = { tags: task.tags, task: task.name, taskVersion: task.version, dependencyTaskDefinitions: JSON.stringify(task.dependencies.map((dependency) => { // Case-mapping for SQL compatibility return { task_version: dependency.version, task: dependency.name }; })) }; let fetchParameters = { task: task.name, resultLimit: backendSettings.taskBatchSize }; function refillQueue() { let startTime = Date.now(); return Promise.try(() => { // NOTE: We are deliberately bypassing the transaction here! Also deliberately not using VALUES, since we're inserting from the results of another query instead return knex.raw(` INSERT INTO srap_queue (task, item_id) (${fillQuery}) ON CONFLICT (task, item_id) DO NOTHING; `, refillParameters); }).then((response) => { let timeElapsed = Date.now() - startTime; metrics.taskRefillTime.labels({ task: task.name }).set(timeElapsed / 1000); metrics.taskRefillResults.labels({ task: task.name }).set(response.rowCount); debug(`Queue for '${task.name}' was refilled with ${response.rowCount} items in ${timeElapsed}ms`); return response.rowCount; }); } return pipe([ simpleSource(() => { let startTime = Date.now(); return Promise.try(() => { return tx.raw(fetchQuery, fetchParameters); }).then((result) => { let timeElapsed = Date.now() - startTime; metrics.taskFetchTime.labels({ task: task.name }).set(timeElapsed / 1000); metrics.taskFetchResults.labels({ task: task.name }).set(result.rowCount); debug(`Task retrieval query for '${task.name}' took ${timeElapsed}ms and produced ${result.rowCount} results`); if (result.rowCount > 0) { return result.rows; } else { return Promise.try(() => { return refillQueue(); }).then((newItems) => { if (newItems === 0) { // TODO: Consider using LISTEN/NOTIFY instead? Worth the added complexity? return Promise.resolve([]).delay(backendSettings.taskBatchDelay); } else { // Have another go right away return []; } }); } }); }), buffer() ]); }; };