"use strict"; const Promise = require("bluebird"); const ms = require("ms"); const dateFns = require("date-fns"); const syncpipe = require("syncpipe"); const debug = require("debug")("scrapingserver"); const simpleSource = require("@promistream/simple-source"); const buffer = require("@promistream/buffer"); const pipe = require("@promistream/pipe"); const createMutationAPIWrapper = require("./mutation-api/wrapper"); let query = ` WITH candidates AS ( SELECT DISTINCT ON (items.id) items.*, results.expires_at, results.is_invalidated, results.is_successful, results.updated_at AS result_date, results.task_version FROM items INNER JOIN tags ON tags.item_id = items.id AND tags.name = ANY(:tags) LEFT JOIN task_results AS results ON results.item_id = items.id AND results.task = :task WHERE NOT EXISTS ( SELECT FROM tasks_in_progress AS pr WHERE pr.item_id = items.id ) ) ( SELECT * FROM candidates WHERE result_date IS NULL UNION SELECT * FROM candidates WHERE is_successful = TRUE AND ( expires_at < NOW() OR is_invalidated = TRUE OR NOT (task_version = :taskVersion) ) ) LIMIT :resultLimit; `; module.exports = function (state) { const processTaskSafely = require("./streams/process-task-safely")(state); const queries = require("./queries")(state); const createDatabaseMutationAPI = require("./mutation-api/database")(state); let { knex, db } = state; // FIXME: Transaction support! return function createTaskStream({ task, taskVersion, tags, run, ttl, globalRateLimiter, globalParallelize }) { // TODO: Make nicer let ttlInSeconds = (ttl != null) ? (typeof ttl === "number") ? ttl / 1000 : ms(ttl) / 1000 : undefined; return pipe([ simpleSource(() => { let startTime = Date.now(); return Promise.try(() => { // console.log("Fetching new batch"); return knex.raw(query, { tags: tags, task: task, taskVersion: taskVersion, resultLimit: 1000 // TODO: Make configurable }); }).then((result) => { let timeElapsed = Date.now() - startTime; debug(`Task retrieval query took ${timeElapsed}ms and produced ${result.rowCount} results`); if (result.rowCount > 0) { // console.log("rows:", result.rows); return result.rows; } else { return Promise.resolve([]).delay(1000); } }); }), buffer(), globalRateLimiter, processTaskSafely(task, (item, tx) => { console.log(`[started] ${item.id}`); let context = { tx, item, task, taskVersion }; let databaseMutationAPI = createDatabaseMutationAPI(context); let mutationAPI = createMutationAPIWrapper(context, databaseMutationAPI); let queue = []; let methods = [ "createItem", "renameItem", "mergeItem", "deleteItem", "createAlias", "deleteAlias", "updateData", "updateMetadata", "expire" ]; let queueMethods = syncpipe(methods, [ (_) => _.map((method) => [ method, function() { queue.push([ method, arguments ]); } ]), (_) => Object.fromEntries(_) ]); return Promise.try(() => { // TODO: Proper Validatem schemas for each API method return run({ id: item.id, data: item.data, getItem: function (id) { return queries.getItem(tx, id); }, ... queueMethods }); }).then(() => { if (!queue.some((method) => method[0] === "updateMetadata")) { // Doing an updateMetadata call is necessary to mark a task 'completed', so we inject a dummy call that doesn't actually change the metadata itself // FIXME: Split apart 'markTaskCompleted' and 'updateMetadata' queries so that this hack is no longer necessary queue.push([ "updateMetadata", [ (data) => data ]]); } return Promise.each(queue, ([ method, args ]) => { return mutationAPI[method](... args); }); }).then(() => { // Update succeeded return db.TaskResult.query(tx).findById([ task, item.id ]).patch({ is_successful: true, updated_at: new Date(), expires_at: dateFns.add(new Date(), { seconds: ttlInSeconds }) }); }).catch((error) => { console.warn(`[failed] ${item.id}`, error); return Promise.try(() => { // Task failed -- note, cannot use tx here because it has failed return db.TaskResult.query(knex).findById([ task, item.id ]).patch({ is_successful: false }); }).then(() => { // throw error; }); }); }), globalParallelize ]); }; };