diff --git a/package.json b/package.json index 8710e1e..179098d 100644 --- a/package.json +++ b/package.json @@ -1,14 +1,17 @@ { - "name": "scraping-server", + "name": "srap", "version": "1.0.0", "main": "index.js", - "repository": "git@git.cryto.net:joepie91/scraping-server.git", + "repository": "git@git.cryto.net:joepie91/srap.git", "author": "Sven Slootweg ", "license": "WTFPL OR CC0-1.0", "dependencies": { "@promistream/buffer": "^0.1.1", "@promistream/map": "^0.1.1", + "@promistream/map-filter": "^0.1.0", + "@promistream/parallelize": "^0.1.0", "@promistream/pipe": "^0.1.2", + "@promistream/rate-limit": "^1.0.1", "@promistream/simple-sink": "^0.1.1", "@promistream/simple-source": "^0.1.3", "@validatem/any-property": "^0.1.3", diff --git a/src/dependency-map.js b/src/dependency-map.js new file mode 100644 index 0000000..573515a --- /dev/null +++ b/src/dependency-map.js @@ -0,0 +1,48 @@ +"use strict"; + +const defaultValue = require("default-value"); +const mapObj = require("map-obj"); + +module.exports = function createDependencyMap(configuration) { + let dependencyMap = mapObj(configuration.tasks, (task, definition) => { + return [ + task, + defaultValue(definition.dependsOn, []).map((dependencyName) => { + let dependency = configuration.tasks[dependencyName]; + + if (dependency != null) { + return { + task: dependencyName, + // TODO: Do defaults processing in configuration loading/validation instead + taskVersion: defaultValue(dependency.version, "0") + }; + } else { + throw new Error(`Invalid dependency specified, task does not exist: ${dependencyName}`); + } + }) + ]; + }); + + // NOTE: When inverting the dependencyMap, we totally ignore the taskVersion of dependencies when keying this mapping. While the taskVersion of specific tasks *may* matter to the code that uses these mappings, we don't support more than one version of a task simultaneously existing, and so keying by the task name alone is sufficient. + let dependentMap = {}; + + for (let [ task, dependencies ] of Object.entries(dependencyMap)) { + let taskDefinition = configuration.tasks[task]; + + for (let dependency of dependencies) { + if (dependentMap[dependency.task] == null) { + dependentMap[dependency.task] = []; + } + + dependentMap[dependency.task].push({ + task: task, + taskVersion: defaultValue(taskDefinition.version, "0") + }); + } + } + + return { + dependencyMap, + dependentMap + }; +}; diff --git a/src/kernel.js b/src/kernel.js index 2ffaa13..7058b35 100644 --- a/src/kernel.js +++ b/src/kernel.js @@ -6,13 +6,16 @@ const chalk = require("chalk"); const util = require("util"); const syncpipe = require("syncpipe"); -const rateLimit = require("@ppstreams/rate-limit"); +const rateLimit = require("@promistream/rate-limit"); const simpleSink = require("@promistream/simple-sink"); const pipe = require("@promistream/pipe"); -const parallelize = require("@ppstreams/parallelize"); +const parallelize = require("@promistream/parallelize"); const initialize = require("./initialize"); const logStatus = require("./log-status"); +const createDependencyMap = require("./dependency-map"); + +// FIXME: *Require* a taskInterval to be set, even if explicitly null, to prevent accidentally forgetting it // TODO: Publish this as a separate package // Inverts an object of arrays, eg. {a: [x, y], b: [x, z]} becomes {x: [a, b], y: [a], z: [b]} @@ -33,11 +36,6 @@ function invertMapping(object) { return newObject; } -function log(value) { - console.log(value); - return value; -} - module.exports = function createKernel(configuration) { return Promise.try(() => { return initialize({ @@ -51,6 +49,7 @@ module.exports = function createKernel(configuration) { const createTaskStream = require("./task-stream")(state); let { knex } = state; + let { dependencyMap, dependentMap } = createDependencyMap(configuration); function insertSeeds() { return Promise.map(configuration.seed, (item) => { @@ -89,6 +88,8 @@ module.exports = function createKernel(configuration) { task: task, tags: tags, taskVersion: defaultValue(taskConfiguration.version, "0"), + taskInterval: taskConfiguration.taskInterval, + parallelTasks: taskConfiguration.parallelTasks, ttl: taskConfiguration.ttl, run: taskConfiguration.run, globalRateLimiter: (attachToGlobalRateLimit != null) @@ -97,12 +98,8 @@ module.exports = function createKernel(configuration) { globalParallelize: (configuration.parallelTasks != null) ? parallelize(configuration.parallelTasks) : null, - taskDependencies: defaultValue(taskConfiguration.dependsOn, []).map((task) => { - return { - task: task, - taskVersion: defaultValue(configuration.tasks[task].taskVersion, "0") - }; - }) + taskDependencies: dependencyMap[task], + taskDependents: dependentMap[task] }); return pipe([ diff --git a/src/mutation-api/database.js b/src/mutation-api/database.js index d7de7d5..440f5b5 100644 --- a/src/mutation-api/database.js +++ b/src/mutation-api/database.js @@ -21,6 +21,7 @@ module.exports = function (state) { return queries.renameItem(tx, options); }, mergeItem: function (options) { + // FIXME: Move default return queries.mergeItem(tx, { ...options, from: defaultValue(options.from, item.id) @@ -28,10 +29,11 @@ module.exports = function (state) { }, deleteItem: function (options) { return queries.deleteItem(tx, { - id: defaultValue(options.id, item.id) + id: options.id }); }, createAlias: function (options) { + // FIXME: Move default return queries.createAlias(tx, { ...options, to: defaultValue(options.to, item.id) diff --git a/src/mutation-api/wrapper.js b/src/mutation-api/wrapper.js index 84e5746..d3acb80 100644 --- a/src/mutation-api/wrapper.js +++ b/src/mutation-api/wrapper.js @@ -1,17 +1,21 @@ "use strict"; +const Promise = require("bluebird"); + const wrapValueAsOption = require("@validatem/wrap-value-as-option"); const required = require("@validatem/required"); const isString = require("@validatem/is-string"); const defaultTo = require("@validatem/default-to"); const validateOptions = require("@validatem/core/src/api/validate-options"); const isFunction = require("@validatem/is-function"); +const arrayOf = require("@validatem/array-of"); // FIXME: Remaining validators -module.exports = function wrapMutationAPI({ item, task }, api) { +module.exports = function wrapMutationAPI({ item, task, taskDependents }, api) { return { createItem: function (options) { + // FIXME: Require tags to be set, even if to an empty array, to avoid accidentally forgetting the tags return api.createItem(options); }, renameItem: function (_options) { @@ -27,7 +31,14 @@ module.exports = function wrapMutationAPI({ item, task }, api) { mergeItem: function (options) { return api.mergeItem(options); }, - deleteItem: function (options = {}) { + deleteItem: function (_options) { + let options = validateOptions(arguments, [ + defaultTo({}), + wrapValueAsOption("id"), { + id: [ defaultTo(item.id), isString ] + } + ]); + return api.deleteItem(options); }, createAlias: function (options) { @@ -68,6 +79,30 @@ module.exports = function wrapMutationAPI({ item, task }, api) { ]); return api.expire(options); + }, + expireDependents: function (_options) { + let options = validateOptions(arguments, [ + defaultTo({}), + wrapValueAsOption("dependents"), { + id: [ defaultTo(item.id), isString ], + dependents: [ arrayOf(isString) ] + } + ]); + + let selectedDependents = (options.dependents != null) + ? new Set(options.dependents) + : null; + + let affectedDependents = (selectedDependents != null) + ? taskDependents.filter((dependent) => selectedDependents.has(dependent.task)) + : taskDependents; + + return Promise.map(affectedDependents, (dependent) => { + return this.expire({ + id: options.id, + taskName: dependent.task + }); + }); } }; }; diff --git a/src/queries.js b/src/queries.js index 3e4c3e5..b513d28 100644 --- a/src/queries.js +++ b/src/queries.js @@ -355,10 +355,13 @@ module.exports = function ({ db }) { }] }); - return db.Alias - .relatedQuery("item.taskResults", tx).for(id) - .where({ taskName: taskName }) - .patch({ isInvalidated: true }); + return Promise.try(() => { + return db.Alias.query(tx).findById(id); + }).then((alias) => { + return db.TaskResult.query(tx) + .where({ task: taskName, itemId: alias.itemId }) + .patch({ isInvalidated: true }); + }); }, setTTL: function (options) { // options = ttl || { id, taskName, ttl } diff --git a/src/task-stream.js b/src/task-stream.js index 53f4961..a3508f3 100644 --- a/src/task-stream.js +++ b/src/task-stream.js @@ -9,10 +9,12 @@ const debug = require("debug")("scrapingserver"); const simpleSource = require("@promistream/simple-source"); const buffer = require("@promistream/buffer"); const pipe = require("@promistream/pipe"); +const rateLimit = require("@promistream/rate-limit"); const createMutationAPIWrapper = require("./mutation-api/wrapper"); const logStatus = require("./log-status"); const chalk = require("chalk"); +const parallelize = require("@promistream/parallelize"); // FIXME: Revert inlining of task_states once switched to PostgreSQL 12+, which can do this automatically using NOT MATERIALIZED let query = ` @@ -90,7 +92,7 @@ module.exports = function (state) { // FIXME: Transaction support! - return function createTaskStream({ task, taskVersion, taskDependencies, tags, run, ttl, globalRateLimiter, globalParallelize }) { + return function createTaskStream({ task, taskVersion, taskDependencies, taskDependents, taskInterval, tags, run, ttl, globalRateLimiter, globalParallelize, parallelTasks }) { // TODO: Make nicer let ttlInSeconds = (ttl != null) ? (typeof ttl === "number") @@ -123,23 +125,27 @@ module.exports = function (state) { // console.log("rows:", result.rows); return result.rows; } else { - return Promise.resolve([]).delay(1000); + // FIXME: Make this delay configurable, or maybe even use LISTEN/NOTIFY + return Promise.resolve([]).delay(30000); } }); }), buffer(), globalRateLimiter, + (taskInterval != null) + ? rateLimit(taskInterval) + : null, processTaskSafely(task, (item, tx) => { logStatus(task, chalk.bold.cyan, "started", item.id); - let context = { tx, item, task, taskVersion }; + let context = { tx, item, task, taskVersion, taskDependents, taskDependencies }; let databaseMutationAPI = createDatabaseMutationAPI(context); let mutationAPI = createMutationAPIWrapper(context, databaseMutationAPI); let queue = []; - let methods = [ "createItem", "renameItem", "mergeItem", "deleteItem", "createAlias", "deleteAlias", "updateData", "updateMetadata", "expire" ]; + let methods = [ "createItem", "renameItem", "mergeItem", "deleteItem", "createAlias", "deleteAlias", "updateData", "updateMetadata", "expire", "expireDependents" ]; let queueMethods = syncpipe(methods, [ (_) => _.map((method) => [ method, function() { queue.push([ method, arguments ]); } ]), (_) => Object.fromEntries(_) @@ -170,10 +176,12 @@ module.exports = function (state) { return db.TaskResult.query(tx).findById([ task, item.id ]).patch({ is_successful: true, updated_at: new Date(), - expires_at: dateFns.add(new Date(), { seconds: ttlInSeconds }) + expires_at: (ttlInSeconds != null) + ? dateFns.add(new Date(), { seconds: ttlInSeconds }) + : null }); }).catch((error) => { - logStatus(task, chalk.bold.red, "failed", item.id); + logStatus(task, chalk.bold.red, "failed", `${item.id}: ${error.stack}`); return Promise.try(() => { // Task failed -- note, cannot use tx here because it has failed @@ -185,7 +193,10 @@ module.exports = function (state) { }); }); }), - globalParallelize + // TODO: Sort out a cleaner way to organize local vs. global parallelization + (parallelTasks != null) + ? parallelize(parallelTasks) + : globalParallelize ]); }; }; diff --git a/yarn.lock b/yarn.lock index 4207b4a..2160ee7 100644 --- a/yarn.lock +++ b/yarn.lock @@ -43,6 +43,11 @@ resolved "https://registry.yarnpkg.com/@joepie91/eslint-config/-/eslint-config-1.1.0.tgz#9397e6ce0a010cb57dcf8aef8754d3a5ce0ae36a" integrity sha512-XliasRSUfOz1/bAvTBaUlCjWDbceCW4y1DnvFfW7Yw9p2FbNRR0w8WoPdTxTCjKuoZ7/OQMeBxIe2y9Qy6rbYw== +"@joepie91/promise-delay-every@^1.0.0": + version "1.0.1" + resolved "https://registry.yarnpkg.com/@joepie91/promise-delay-every/-/promise-delay-every-1.0.1.tgz#d30035ef3150a6f7247dfd8a1a03a7a136167b2e" + integrity sha512-sMPFngDo7xsfBZWQ9dqARaC+UZdRnvoU6j4LaIPqnf1JmeHL6Bhlm+Nc9hOvrmulAhV0HWoEydJaAxAFTCRapA== + "@joepie91/unreachable@^1.0.0": version "1.0.0" resolved "https://registry.yarnpkg.com/@joepie91/unreachable/-/unreachable-1.0.0.tgz#8032bb8a5813e81bbbe516cb3031d60818526687" @@ -83,6 +88,18 @@ resolved "https://registry.yarnpkg.com/@promistream/is-end-of-stream/-/is-end-of-stream-0.1.1.tgz#7f84e630c9e49a92739df6a8c574eff99dd4c09d" integrity sha512-GZn7W0wrUen7kkgWCcwFFgr0g/ftfuddnuK/Tp0MLWCCJA4hyAboglCZP0JzEJdi34gClEP8lCfDwGekw18LHg== +"@promistream/map-filter@^0.1.0": + version "0.1.0" + resolved "https://registry.yarnpkg.com/@promistream/map-filter/-/map-filter-0.1.0.tgz#2bb4988b386a2e520f2d1cc1b6f7d333a24266d0" + integrity sha512-kHO1NvwuxEB4tTZ9b4XR9iBBJftHxT/857KbkWRe0kSt6zvFIVYHSzmKz/cipUx1f2FrmpQC+R6R84MlHD+ugQ== + dependencies: + "@promistream/propagate-abort" "^0.1.6" + "@promistream/propagate-peek" "^0.1.1" + "@validatem/core" "^0.3.15" + "@validatem/is-function" "^0.1.0" + "@validatem/required" "^0.1.1" + bluebird "^3.7.2" + "@promistream/map@^0.1.1": version "0.1.1" resolved "https://registry.yarnpkg.com/@promistream/map/-/map-0.1.1.tgz#2f771372e5d1dd12f41b6efd57874014d406f123" @@ -95,11 +112,39 @@ "@validatem/required" "^0.1.1" bluebird "^3.5.4" +"@promistream/parallelize@^0.1.0": + version "0.1.0" + resolved "https://registry.yarnpkg.com/@promistream/parallelize/-/parallelize-0.1.0.tgz#0f261c15e58acc141605a88d01ecce6a7a27612c" + integrity sha512-kbu2aheDxOl+PxvZWMkiU5WJdpnR3A+4s3/B9ky1W74tSt40Cl1/Qa+5sQtdmMuuqwhdhDFwjhhSlNJx4LwLJw== + dependencies: + "@promistream/is-aborted" "^0.1.1" + "@promistream/is-end-of-stream" "^0.1.1" + "@promistream/pipe" "^0.1.4" + "@promistream/propagate-abort" "^0.1.2" + "@promistream/sequentialize" "^0.1.0" + bluebird "^3.5.4" + debug "^4.1.1" + default-value "^1.0.0" + "@promistream/pipe@^0.1.2": version "0.1.3" resolved "https://registry.yarnpkg.com/@promistream/pipe/-/pipe-0.1.3.tgz#8a937cb6f7c42de6afd7ff5657cfef82493e7aed" integrity sha512-P3X1xzTO41TfDoRF6kMcsu0yttPaJB14r18jKM4KNpeuafhi7KYP6Vp5Vsnsx5bd90wBhhxDw82zz4xZHuMjMQ== +"@promistream/pipe@^0.1.4": + version "0.1.4" + resolved "https://registry.yarnpkg.com/@promistream/pipe/-/pipe-0.1.4.tgz#ef05fe582a33768c7eb56ad20635e1b7b48ac95b" + integrity sha512-4js6lhu/aTNEMosIBFcCz8Rkxc1S2V4zzI2QvZp9HqglhL5UTuxnv5VbU2ZlPFAFVID1aJOurZ8KdiVagHfOCw== + dependencies: + "@validatem/allow-extra-properties" "^0.1.0" + "@validatem/anything" "^0.1.0" + "@validatem/array-of" "^0.1.2" + "@validatem/core" "^0.3.15" + "@validatem/error" "^1.1.0" + "@validatem/remove-nullish-items" "^0.1.0" + "@validatem/required" "^0.1.1" + "@validatem/wrap-error" "^0.3.0" + "@promistream/propagate-abort@^0.1.2", "@promistream/propagate-abort@^0.1.6": version "0.1.6" resolved "https://registry.yarnpkg.com/@promistream/propagate-abort/-/propagate-abort-0.1.6.tgz#dfc3c78c2e22662b9e5d548afce2180c40584ef5" @@ -110,6 +155,28 @@ resolved "https://registry.yarnpkg.com/@promistream/propagate-peek/-/propagate-peek-0.1.1.tgz#c7dd69efcd894c408d7a3e9713b6a9036f70a501" integrity sha512-4xfkSmtPQzlvL4+KCquPHX7sPXiAACGJac/y7fB3Sv6ZKXAT/cjTfms1nEjlDGn1nroN0MzReBza2HnpF59deg== +"@promistream/rate-limit@^1.0.1": + version "1.0.1" + resolved "https://registry.yarnpkg.com/@promistream/rate-limit/-/rate-limit-1.0.1.tgz#e7fc1baf231df34d014d89594acc6f95626c4628" + integrity sha512-miBc4WR1xhcliAMzO04tetxsDNgVTlP96amf3pomxhScQiazLtX4U+oHFRcaCKscC82kkQFSZOCcRTDE+yw+Sw== + dependencies: + "@joepie91/promise-delay-every" "^1.0.0" + "@promistream/propagate-abort" "^0.1.6" + "@promistream/propagate-peek" "^0.1.1" + bluebird "^3.5.4" + debug "^4.3.1" + ms "^2.1.1" + +"@promistream/sequentialize@^0.1.0": + version "0.1.0" + resolved "https://registry.yarnpkg.com/@promistream/sequentialize/-/sequentialize-0.1.0.tgz#8cab499c2518ee856fcb1e13943859ca5b77ba71" + integrity sha512-lm7wJmlOSmBvHq49zLfs3cghOt9kcRhLezCbuhXQUXhhiaKLCvYuyA1AGId0kiJDPX2SggrU3Ojb+TOcxPEAqw== + dependencies: + "@joepie91/unreachable" "^1.0.0" + "@promistream/propagate-abort" "^0.1.2" + bluebird "^3.5.4" + p-defer "^3.0.0" + "@promistream/simple-sink@^0.1.1": version "0.1.1" resolved "https://registry.yarnpkg.com/@promistream/simple-sink/-/simple-sink-0.1.1.tgz#e3808179102ffe4bc10d70d681f19c649e1f3811" @@ -363,6 +430,11 @@ dependencies: "@validatem/error" "^1.0.0" +"@validatem/remove-nullish-items@^0.1.0": + version "0.1.0" + resolved "https://registry.yarnpkg.com/@validatem/remove-nullish-items/-/remove-nullish-items-0.1.0.tgz#fe1a8b64d11276b506fae2bd2c41da4985a5b5ff" + integrity sha512-cs4YSF47TA/gHnV5muSUUqGi5PwybP5ztu5SYnPKxQVTyubvcbrFat51nOvJ2PmUasyrIccoYMmATiviXkTi6g== + "@validatem/require-either@^0.1.0": version "0.1.0" resolved "https://registry.yarnpkg.com/@validatem/require-either/-/require-either-0.1.0.tgz#250e35ab06f124ea90f3925d74b5f53a083923b0" @@ -412,6 +484,19 @@ default-value "^1.0.0" split-filter-n "^1.1.2" +"@validatem/wrap-error@^0.3.0": + version "0.3.0" + resolved "https://registry.yarnpkg.com/@validatem/wrap-error/-/wrap-error-0.3.0.tgz#f8d170e79b6fdd68321d82c60581ad345be7d6b9" + integrity sha512-km5v6F/Xm7j8W/tmCmht2BTzxMLSpBUJ5MdhJD7ABEut/fdO0tNca1u1imTnWCULCJcdDHbNtpSmDMvXFg3E7Q== + dependencies: + "@validatem/combinator" "^0.1.1" + "@validatem/error" "^1.0.0" + "@validatem/match-validation-error" "^0.1.0" + "@validatem/validation-result" "^0.1.2" + as-expression "^1.0.0" + default-value "^1.0.0" + split-filter-n "^1.1.2" + "@validatem/wrap-value-as-option@^0.1.0": version "0.1.0" resolved "https://registry.yarnpkg.com/@validatem/wrap-value-as-option/-/wrap-value-as-option-0.1.0.tgz#57fa8d535f6cdf40cf8c8846ad45f4dd68f44568" @@ -1934,7 +2019,7 @@ ms@2.1.2: resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.2.tgz#d09d1f357b443f493382a8eb3ccd183872ae6009" integrity sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w== -ms@^2.1.3: +ms@^2.1.1, ms@^2.1.3: version "2.1.3" resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.3.tgz#574c8138ce1d2b5861f0b44579dbadd60c6615b2" integrity sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA== @@ -2041,6 +2126,11 @@ optionator@^0.9.1: type-check "^0.4.0" word-wrap "^1.2.3" +p-defer@^3.0.0: + version "3.0.0" + resolved "https://registry.yarnpkg.com/p-defer/-/p-defer-3.0.0.tgz#d1dceb4ee9b2b604b1d94ffec83760175d4e6f83" + integrity sha512-ugZxsxmtTln604yeYd29EGrNhazN2lywetzpKhfmQjW/VJmhpDmWbiX+h0zL8V91R0UXkhb3KtPmyq9PZw3aYw== + packet-reader@1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/packet-reader/-/packet-reader-1.0.0.tgz#9238e5480dedabacfe1fe3f2771063f164157d74"