"use strict"; const Promise = require("bluebird"); const ms = require("ms"); const dateFns = require("date-fns"); const syncpipe = require("syncpipe"); const debug = require("debug")("scrapingserver"); const simpleSource = require("@promistream/simple-source"); const buffer = require("@promistream/buffer"); const pipe = require("@promistream/pipe"); const rateLimit = require("@promistream/rate-limit"); const createMutationAPIWrapper = require("./mutation-api/wrapper"); const logStatus = require("./log-status"); const chalk = require("chalk"); const parallelize = require("@promistream/parallelize"); // FIXME: Revert inlining of task_states once switched to PostgreSQL 12+, which can do this automatically using NOT MATERIALIZED let query = ` WITH dependency_tasks AS ( SELECT * FROM json_to_recordset(:dependencyTaskDefinitions) AS x(task text, task_version text) ), matching_items AS ( SELECT DISTINCT ON (items.id) items.*, results.updated_at AS result_date, results.task_version, ( results.is_successful = TRUE AND ( results.expires_at < NOW() OR results.is_invalidated = TRUE ) ) AS is_candidate FROM items INNER JOIN tags ON tags.item_id = items.id AND tags.name = ANY(:tags) LEFT JOIN task_results AS results ON results.item_id = items.id AND results.task = :task WHERE NOT EXISTS ( SELECT FROM tasks_in_progress AS pr WHERE pr.item_id = items.id ) ), candidates AS ( SELECT * FROM matching_items WHERE result_date IS NULL UNION SELECT * FROM matching_items WHERE is_candidate = TRUE OR NOT (task_version = :taskVersion) ) ( SELECT * FROM candidates WHERE NOT EXISTS ( SELECT results.* FROM dependency_tasks LEFT JOIN task_results AS results ON dependency_tasks.task = results.task AND dependency_tasks.task_version = results.task_version AND results.item_id = candidates.id WHERE results.is_successful IS NULL OR ( results.is_successful = TRUE AND ( results.expires_at < NOW() OR results.is_invalidated = TRUE ) ) ) ) LIMIT :resultLimit; `; module.exports = function (state) { const processTaskSafely = require("./streams/process-task-safely")(state); const queries = require("./queries")(state); const createDatabaseMutationAPI = require("./mutation-api/database")(state); let { knex, db } = state; // FIXME: Transaction support! return function createTaskStream({ task, taskVersion, taskDependencies, taskDependents, taskInterval, tags, run, ttl, globalRateLimiter, globalParallelize, parallelTasks }) { // TODO: Make nicer let ttlInSeconds = (ttl != null) ? (typeof ttl === "number") ? ttl / 1000 : ms(ttl) / 1000 : undefined; return pipe([ simpleSource(() => { let startTime = Date.now(); return Promise.try(() => { // console.log("Fetching new batch"); return knex.raw(query, { tags: tags, task: task, taskVersion: taskVersion, resultLimit: 1000, // TODO: Make configurable dependencyTaskDefinitions: JSON.stringify(taskDependencies.map((dependency) => { // Case-mapping for SQL compatibility return { task_version: dependency.taskVersion, task: dependency.task }; })) }); }).then((result) => { let timeElapsed = Date.now() - startTime; debug(`Task retrieval query for '${task}' took ${timeElapsed}ms and produced ${result.rowCount} results`); if (result.rowCount > 0) { // console.log("rows:", result.rows); return result.rows; } else { // FIXME: Make this delay configurable, or maybe even use LISTEN/NOTIFY return Promise.resolve([]).delay(30000); } }); }), buffer(), globalRateLimiter, (taskInterval != null) ? rateLimit(taskInterval) : null, processTaskSafely(task, (item, tx) => { logStatus(task, chalk.bold.cyan, "started", item.id); let context = { tx, item, task, taskVersion, taskDependents, taskDependencies }; let databaseMutationAPI = createDatabaseMutationAPI(context); let mutationAPI = createMutationAPIWrapper(context, databaseMutationAPI); let queue = []; let methods = [ "createItem", "renameItem", "mergeItem", "deleteItem", "createAlias", "deleteAlias", "updateData", "updateMetadata", "expire", "expireDependents" ]; let queueMethods = syncpipe(methods, [ (_) => _.map((method) => [ method, function() { queue.push([ method, arguments ]); } ]), (_) => Object.fromEntries(_) ]); return Promise.try(() => { // TODO: Proper Validatem schemas for each API method return run({ id: item.id, data: item.data, getItem: function (id) { return queries.getItem(tx, id); }, ... queueMethods }); }).then(() => { if (!queue.some((method) => method[0] === "updateMetadata")) { // Doing an updateMetadata call is necessary to mark a task 'completed', so we inject a dummy call that doesn't actually change the metadata itself // FIXME: Split apart 'markTaskCompleted' and 'updateMetadata' queries so that this hack is no longer necessary queue.push([ "updateMetadata", [ (data) => data ]]); } return Promise.each(queue, ([ method, args ]) => { return mutationAPI[method](... args); }); }).then(() => { // Update succeeded return db.TaskResult.query(tx).findById([ task, item.id ]).patch({ is_successful: true, updated_at: new Date(), expires_at: (ttlInSeconds != null) ? dateFns.add(new Date(), { seconds: ttlInSeconds }) : null }); }).catch((error) => { logStatus(task, chalk.bold.red, "failed", `${item.id}: ${error.stack}`); return Promise.try(() => { // Task failed -- note, cannot use tx here because it has failed return db.TaskResult.query(knex).findById([ task, item.id ]).patch({ is_successful: false }); }).then(() => { // throw error; }); }); }), // TODO: Sort out a cleaner way to organize local vs. global parallelization (parallelTasks != null) ? parallelize(parallelTasks) : globalParallelize ]); }; };