"use strict"; const Promise = require("bluebird"); const ms = require("ms"); const dateFns = require("date-fns"); const debug = require("debug")("scrapingserver"); const chalk = require("chalk"); const simpleSource = require("@promistream/simple-source"); const buffer = require("@promistream/buffer"); const pipe = require("@promistream/pipe"); const rateLimit = require("@promistream/rate-limit"); const parallelize = require("@promistream/parallelize"); const logStatus = require("./log-status"); // const { UniqueViolationError } = require("objection"); // FIXME: Revert inlining of task_states once switched to PostgreSQL 12+, which can do this automatically using NOT MATERIALIZED // FIXME: Check whether the dependency task_versions are actually being correctly passed in, and aren't accidentally nulls let query = ` WITH dependency_tasks AS ( SELECT * FROM json_to_recordset(:dependencyTaskDefinitions) AS x(task text, task_version text) ), matching_items AS ( SELECT DISTINCT ON (srap_items.id) srap_items.*, results.updated_at AS result_date, results.task_version, ( results.is_successful = TRUE AND ( results.expires_at < NOW() OR results.is_invalidated = TRUE ) ) AS is_candidate FROM srap_items INNER JOIN srap_tags ON srap_tags.item_id = srap_items.id AND srap_tags.name = ANY(:tags) LEFT JOIN srap_task_results AS results ON results.item_id = srap_items.id AND results.task = :task WHERE NOT EXISTS ( SELECT FROM srap_tasks_in_progress AS pr WHERE pr.item_id = srap_items.id ) ), candidates AS ( SELECT * FROM matching_items WHERE result_date IS NULL UNION SELECT * FROM matching_items WHERE is_candidate = TRUE OR NOT (task_version = :taskVersion) ) ( SELECT * FROM candidates WHERE NOT EXISTS ( SELECT results.* FROM dependency_tasks LEFT JOIN srap_task_results AS results ON dependency_tasks.task = results.task AND dependency_tasks.task_version = results.task_version AND results.item_id = candidates.id WHERE results.is_successful IS NULL OR results.is_successful = FALSE OR ( results.is_successful = TRUE AND ( results.expires_at < NOW() OR results.is_invalidated = TRUE ) ) ) ) LIMIT :resultLimit; `; module.exports = function (state) { const processTaskSafely = require("./streams/process-task-safely")(state); const queries = require("./queries")(state); const createDatabaseQueue = require("./queued-database-api")(state); let { knex, db } = state; // FIXME: Transaction support! return function createTaskStream({ task, taskVersion, taskDependencies, taskDependents, taskInterval, tags, run, ttl, globalRateLimiter, globalParallelize, parallelTasks }) { // TODO: Make nicer let ttlInSeconds = (ttl != null) ? (typeof ttl === "number") ? ttl / 1000 : ms(ttl) / 1000 : undefined; return pipe([ simpleSource(() => { let startTime = Date.now(); return Promise.try(() => { // console.log("Fetching new batch"); return knex.raw(query, { tags: tags, task: task, taskVersion: taskVersion, resultLimit: 1000, // TODO: Make configurable dependencyTaskDefinitions: JSON.stringify(taskDependencies.map((dependency) => { // Case-mapping for SQL compatibility return { task_version: dependency.taskVersion, task: dependency.task }; })) }); }).then((result) => { let timeElapsed = Date.now() - startTime; debug(`Task retrieval query for '${task}' took ${timeElapsed}ms and produced ${result.rowCount} results`); if (result.rowCount > 0) { // console.log("rows:", result.rows); return result.rows; } else { // FIXME: Make this delay configurable, or maybe even use LISTEN/NOTIFY return Promise.resolve([]).delay(30000); } }); }), buffer(), globalRateLimiter, (taskInterval != null) ? rateLimit(taskInterval) : null, processTaskSafely(task, (item, tx) => { logStatus(task, chalk.bold.cyan, "started", item.id); let queue = createDatabaseQueue({ tx, item, task, taskVersion, taskDependents, taskDependencies }); return Promise.try(() => { // TODO: Proper Validatem schemas for each API method return run({ id: item.id, data: item.data, getItem: function (id) { return queries.getItem(tx, id); }, ... queue.api }); }).then(() => { return queue.execute(); }).then(() => { // Update succeeded return db.TaskResult.query(tx).findById([ task, item.id ]).patch({ is_successful: true, updated_at: new Date(), expires_at: (ttlInSeconds != null) ? dateFns.add(new Date(), { seconds: ttlInSeconds }) : null }); }).catch((error) => { logStatus(task, chalk.bold.red, "failed", `${item.id}: ${error.stack}`); let commonUpdate = { is_successful: false, task_version: taskVersion }; return Promise.try(() => { // Task failed -- note, cannot use tx here because it has failed return db.TaskResult.query(knex).insert({ item_id: item.id, task: task, metadata: {}, ... commonUpdate }); }).catch({ name: "UniqueViolationError" }, () => { return db.TaskResult.query(knex).findById([ task, item.id ]).patch({ ... commonUpdate }); }).then(() => { // throw error; }); }); }), // TODO: Sort out a cleaner way to organize local vs. global parallelization (parallelTasks != null) ? parallelize(parallelTasks) : globalParallelize ]); }; };