From bc1a9349c30b30084e7c2a6628799111e4ee33b3 Mon Sep 17 00:00:00 2001 From: Sven Slootweg Date: Fri, 27 May 2022 17:48:01 +0200 Subject: [PATCH] Backend refactor WIP --- .gitignore | 1 + package.json | 7 + src/database-backends/api.md | 222 ++++++++ src/database-backends/index.js | 418 +++++++++++++++ src/database-backends/postgresql/index.js | 352 +++++++++++++ .../postgresql}/models/alias.js | 0 .../postgresql}/models/failure.js | 0 .../postgresql}/models/index.js | 0 .../postgresql}/models/item.js | 0 .../postgresql}/models/tag.js | 0 .../postgresql}/models/task-in-progress.js | 0 .../postgresql}/models/task-result.js | 0 .../postgresql/queries/get-task-stream.js | 106 ++++ .../postgresql/queries/get-update-stream.js | 65 +++ src/database-backends/simulated/index.js | 179 +++++++ src/dependency-map.js | 48 -- src/generate-task-graph.js | 44 ++ src/kernel.js | 427 ++++++++------- src/mutation-api/database.js | 61 --- src/mutation-api/simulation.js | 79 --- src/mutation-api/wrapper.js | 111 ---- src/{initialize.js => prometheus/index.js} | 31 +- src/queries.js | 487 ------------------ src/semantics/merge-items.js | 61 +++ src/semantics/pick-task-result.js | 13 + src/streams/process-task-safely.js | 29 +- src/task-kernel.js | 53 ++ src/task-stream.js | 201 -------- src/util/array-unique.js | 5 + src/util/invert-mapping.js | 20 + src/validators/is-ms.js | 16 + src/validators/is-positive-integer.js | 6 + src/validators/is-task-object.js | 30 ++ src/validators/is-valid-configuration.js | 46 ++ yarn.lock | 45 +- 35 files changed, 1943 insertions(+), 1220 deletions(-) create mode 100644 src/database-backends/api.md create mode 100644 src/database-backends/index.js create mode 100644 src/database-backends/postgresql/index.js rename src/{ => database-backends/postgresql}/models/alias.js (100%) rename src/{ => database-backends/postgresql}/models/failure.js (100%) rename src/{ => database-backends/postgresql}/models/index.js (100%) rename src/{ => database-backends/postgresql}/models/item.js (100%) rename src/{ => database-backends/postgresql}/models/tag.js (100%) rename src/{ => database-backends/postgresql}/models/task-in-progress.js (100%) rename src/{ => database-backends/postgresql}/models/task-result.js (100%) create mode 100644 src/database-backends/postgresql/queries/get-task-stream.js create mode 100644 src/database-backends/postgresql/queries/get-update-stream.js create mode 100644 src/database-backends/simulated/index.js delete mode 100644 src/dependency-map.js create mode 100644 src/generate-task-graph.js delete mode 100644 src/mutation-api/database.js delete mode 100644 src/mutation-api/simulation.js delete mode 100644 src/mutation-api/wrapper.js rename src/{initialize.js => prometheus/index.js} (70%) delete mode 100644 src/queries.js create mode 100644 src/semantics/merge-items.js create mode 100644 src/semantics/pick-task-result.js create mode 100644 src/task-kernel.js delete mode 100644 src/task-stream.js create mode 100644 src/util/array-unique.js create mode 100644 src/util/invert-mapping.js create mode 100644 src/validators/is-ms.js create mode 100644 src/validators/is-positive-integer.js create mode 100644 src/validators/is-task-object.js create mode 100644 src/validators/is-valid-configuration.js diff --git a/.gitignore b/.gitignore index 3c3629e..2bc12db 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ node_modules +junk diff --git a/package.json b/package.json index 79143d8..78d5934 100644 --- a/package.json +++ b/package.json @@ -7,8 +7,10 @@ "license": "WTFPL OR CC0-1.0", "dependencies": { "@joepie91/consumable": "^1.0.1", + "@joepie91/unreachable": "^1.0.0", "@promistream/buffer": "^0.1.1", "@promistream/combine-sequential-streaming": "^0.1.0", + "@promistream/filter": "^0.1.1", "@promistream/from-iterable": "^0.1.0", "@promistream/from-node-stream": "^0.1.1", "@promistream/map": "^0.1.1", @@ -23,12 +25,17 @@ "@validatem/array-of": "^0.1.2", "@validatem/core": "^0.3.16", "@validatem/default-to": "^0.1.0", + "@validatem/either": "^0.1.9", "@validatem/error": "^1.1.0", + "@validatem/is-array": "^0.1.1", "@validatem/is-boolean": "^0.1.1", "@validatem/is-date": "^0.1.0", "@validatem/is-function": "^0.1.0", + "@validatem/is-integer": "^0.1.0", "@validatem/is-number": "^0.1.3", + "@validatem/is-positive": "^1.0.0", "@validatem/is-string": "^1.0.0", + "@validatem/is-value": "^0.1.0", "@validatem/matches-format": "^0.1.0", "@validatem/require-either": "^0.1.0", "@validatem/required": "^0.1.1", diff --git a/src/database-backends/api.md b/src/database-backends/api.md new file mode 100644 index 0000000..f2347f5 --- /dev/null +++ b/src/database-backends/api.md @@ -0,0 +1,222 @@ +# runInTransaction + +```js +runInTransaction(parentTX, (tx) => { + // ... database operations go here ... +}) +``` + +# lock + +```js +lock(tx, { + id, + task +}) +``` + +Returns: `true` if locking succeeded, `false` if the task was already locked + +# unlock + +```js +lock(tx, { + id, + task +}) +``` + +# getItem + +Fetch an item from the database by its unique named ID. + +Wrapper API: + +```js +getItem(tx, id) + +getItem(tx, { + id, + optional = false +}) +``` + +Expected backend API: + +```js +getItem(tx, { + id, + optional +}) +``` + +# createItem + +Creates a new item or, if the item already exists, updates the existing item (if allowed). Tags/aliases are only ever added by this operation, never removed. + +Wrapper API: + +```js +createItem(tx, { + id, + tags = [], + aliases = [], + // Either `data` or `update` is required + data, + update, // callback(oldData) + failIfExists = false, + allowUpsert = true, // whether the *item object* is allowed to be updated, or should just be skipped if already exists + parentID +}) +``` + +Expected backend API: + +```js +createItem(tx, { + id, + tags, + aliases, + update, // callback(oldData ?? {}) + failIfExists, + allowUpsert, + parentID +}) +``` + +# renameItem + +```js +let [ tx, { to, from }] = validateArguments(arguments, { + tx: [ required, isTX ], + options: [{ + to: [ required, isString ], + from: [ required, isString ] + }] +}); +``` + +# repointAliases + +```js +let [ tx, { to, from }] = validateArguments(arguments, { + tx: [ required, isTX ], + options: [{ + to: [ required, isString ], + from: [ required, isString ] + }] +}); +``` + +# mergeItem + +```js +let [ tx, { from, into, merge, mergeMetadata }] = validateArguments(arguments, { + tx: [ required, isTX ], + options: [{ + from: [ required, isString ], + into: [ required, isString ], + merge: [ required, isFunction ], + mergeMetadata: [ defaultTo({}), anyProperty({ + key: [ required ], + value: [ required, isFunction ] + })], + }] +}); +``` + +# deleteItem + +```js +let [ tx, { id }] = validateArguments(arguments, { + tx: [ required, isTX ], + options: [{ + id: [ required, isString ] + }] +}); +``` + +# createAlias + +```js +let [ tx, { from, to, failIfExists }] = validateArguments(arguments, { + tx: [ required, isTX ], + options: [{ + from: [ required, isString ], + to: [ required, isString ], + failIfExists: [ defaultTo(false), isBoolean ] // TODO: Shouldn't this default to true, for any occurrence outside of a merge/rename? + }] +}); +``` + +# deleteAlias + +```js +let [ tx, { from }] = validateArguments(arguments, { + tx: [ required, isTX ], + options: [{ + from: [ required, isString ] + }] +}); +``` + +# updateData + +```js +let [ tx, { id, update }] = validateArguments(arguments, { + tx: [ required, isTX ], + options: [{ + id: [ required, isString ], + update: [ required, isFunction ] + }] +}); +``` + +# updateMetadata + +```js +let [ tx, { id, update, taskName, taskVersion, ttl }] = validateArguments(arguments, { + tx: [ required, isTX ], + options: [{ + id: [ required, isString ], + update: [ required, isFunction ], + taskName: [ required, isString ], + taskVersion: [ required, isString ], + ttl: [ isNumber ] + }] +}); +``` + +# expire + +```js +let [ tx, { id, taskName }] = validateArguments(arguments, { + tx: [ required, isTX ], + options: [{ + id: [ required, isString ], + taskName: [ required, isString ] + }] +}); +``` + +# setTTL + +# allowFailure? + +# log? + +# countLockedTasks + +Only a `tx` argument + +# getUpdates + +```js +let [ tx, { timestamp, prefix }] = validateArguments(arguments, { + tx: [ required, isTX ], + options: [ defaultTo({}), { + timestamp: [ isDate ], + prefix: [ isString ] + }] +}); +``` diff --git a/src/database-backends/index.js b/src/database-backends/index.js new file mode 100644 index 0000000..7898a9f --- /dev/null +++ b/src/database-backends/index.js @@ -0,0 +1,418 @@ +"use strict"; + +const unreachable = require("@joepie91/unreachable"); + +const { validateArguments } = require("@validatem/core"); +const required = require("@validatem/required"); +const requireEither = require("@validatem/require-either"); +const isString = require("@validatem/is-string"); +const isBoolean = require("@validatem/is-boolean"); +const isFunction = require("@validatem/is-function"); +const isDate = require("@validatem/is-date"); +const arrayOf = require("@validatem/array-of"); +const defaultTo = require("@validatem/default-to"); +const anyProperty = require("@validatem/any-property"); +const anything = require("@validatem/anything"); +const wrapValueAsOption = require("@validatem/wrap-value-as-option"); +const ValidationError = require("@validatem/error"); +const either = require("@validatem/either"); + +const isTaskObject = require("../validators/is-task-object"); + +// NOTE: The purpose of this module is to implement all the database API logic that's common across database backends; that mainly means initialization, settings merging, and input validation/normalization. Each individual database backend can then assume that it will always be called with valid input. + +/* TODO: API semantics to document: +- All mutating operations are queued, and executed in a transaction *after* the task has been completed; this is to avoid long-lived transactions interfering with task concurrency +- All read-only operations (as specified in the task logic) are executed immediately, bypassing any queue, and outside of any transaction. +- This means that read-only operations see the state of the database *before* any mutations take place, but mutating methods see the state *after* any preceding mutations (within that task) have taken place. +- Simulated execution works exactly like regular execution, except the mutation transaction is cancelled at the last moment, preventing any true changes to the database. This essentially means you are operating on a read-only view of the database. + - This also means that a transactional database is *required* to build a srap backend. If you want to implement a backend for a non-transactional database and are able to provide semantically equivalent simulation functionality otherwise, please open an issue! +*/ + +// FIXME: Verify that all internal method calls in the PostgreSQL backend are still valid after moving argument validation/normalization into this module + +module.exports = function (state) { + let { tasks } = state; + + const backendModules = { + "postgresql": require("./postgresql")(state), + "simulated": require("./simulated")(state), + }; + + function wrapBackend(backend) { + // NOTE: The backend.getDefaultTransaction method MUST return synchronously. + // TODO: Only accept an explicit `null` when defaulting, not an `undefined` which may be implicit? To ensure that the caller didn't just forget to provide one. Though most/all queries have arguments coming after the TX, so this might not be necessary. + let maybeTX = [ defaultTo(backend.getDefaultTransaction), backend.isTransaction ]; + + function maybeGetTaskObject(taskName) { + if (tasks.has(taskName)) { + return tasks.get(taskName); + } else { + throw new ValidationError(`No task named '${taskName}' exists`); + } + } + + let isTask = either([ + [ isTaskObject ], + [ isString, maybeGetTaskObject, isTaskObject ] + ]); + + return { + topLevel: { + simulate: function () { + // NOTE: Simulated backend is initialized synchronously; all other backends are not! + return wrapBackend(backendModules.simulated.create({ backend: backend })); + }, + + shutdown: function () { + return backend.shutdown(); + }, + + runInTransaction: function (_tx, _callback) { + let [ tx, callback ] = validateArguments(arguments, { + tx: maybeTX, + callback: [ required, isFunction ] + }); + + return backend.runInTransaction(tx, callback); + }, + countLockedTasks: function (_tx) { + let [ tx ] = validateArguments(arguments, { + tx: maybeTX + }); + + return backend.countLockedTasks(tx); + }, + + getUpdateStream: function (_tx, _options) { + let [ tx, options ] = validateArguments(arguments, { + tx: maybeTX, + options: [ defaultTo({}), { + timestamp: [ isDate ], + prefix: [ isString ] + }] + }); + + return backend.getUpdateStream(tx, options); + }, + + getTaskStream: function (_tx, _options) { + let [ tx, options ] = validateArguments(arguments, { + tx: maybeTX, + options: [ required, { + task: [ required, isTask ] + }] + }); + + return backend.getTaskStream(tx, options); + }, + + insertSeeds: function (_tx, _options) { + let [ tx, options ] = validateArguments(arguments, { + tx: maybeTX, + options: [ required, { + // FIXME: Stricter validation + seeds: [ required, arrayOf(anything) ] + }] + }); + + return Promise.map(options.seeds, (seed) => { + let { data, ... props } = seed; + + return backend.storeItem(tx, { + ... props, + update: () => data, + allowUpsert: false, + failIfExists: false + }); + }); + } + }, + forItem: function ({ item, task, mutationQueue }) { + // We create a new instance of the actual API for every item being processed. This is necessary because some of the input arguments will default to item-specific values, and some of the logic is dependent on task-specific metadata. This is a more efficient (and understandable) approach than pretending the API is stateless and then separately wrapping the API *again* for every individual item with a whole separate layer of input validation rules. + + // FIXME: Is this still correct, with the new task (graph) format? + let dependentTaskNames = new Set(task.dependents.map((dependent) => dependent.task)); + + function mutableOperation(func) { + if (simulate) { + return func(backend); + } else if (mutationQueue != null) { + mutationQueue.push(func); + } else { + unreachable("No mutation queue provided in live mode"); + } + } + + return { + // NOTE: 'internal' API methods are accessible to srap, but not to user-defined tasks. + internal: { + lock: function (_tx, _options) { + let [ tx, options ] = validateArguments(arguments, { + tx: maybeTX, + options: [ defaultTo({}), { + id: [ defaultTo(item.id), isString ], + task: [ defaultTo(task), isTask ] + }] + }); + + return backend.lock(tx, options); + }, + + unlock: function (_tx, _options) { + let [ tx, options ] = validateArguments(arguments, { + tx: maybeTX, + options: [ defaultTo({}), { + id: [ defaultTo(item.id), isString ], + task: [ defaultTo(task), isTask ] + }] + }); + + return backend.unlock(tx, options); + }, + + markTaskCompleted: function (_tx) { + // TODO: Allow specifying a different task or item ID? + let [ tx ] = validateArguments(arguments, { + tx: maybeTX + }); + + return backend.markTaskStatus(tx, { + id: item.id, + task: task, + isSuccessful: true + }); + }, + + markTaskFailed: function (_tx, _options) { + // TODO: Allow specifying a different task or item ID? + let [ tx, { error }] = validateArguments(arguments, { + tx: maybeTX, + options: [ required, { + error: [ required, anything ] // TODO: Restrict to Error types? + }] + }); + + // FIXME: Persist error + console.error("FIXME(persist error):", error.stack); + + return backend.markTaskStatus(tx, { + id: item.id, + task: task, + isSuccessful: false + }); + }, + }, + exposed: { + // NOTE: 'exposed' API methods are the ones that are passed into a user-defined task, and which the task uses to eg. update or create new items + getItem: function (_tx, _id, _optional) { + let [ tx, options ] = validateArguments(arguments, { + tx: maybeTX, + options: [ required, wrapValueAsOption("id"), { + id: [ required, isString ], + optional: [ defaultTo(false), isBoolean ] // FIXME: Can this handling be moved to the wrapper? + }] + }); + + return backend.getItem(tx, options); + }, + + storeItem: function (_tx, _options) { + // NOTE: Using `update` instead of `data` makes it an upsert! + let [ tx, options ] = validateArguments(arguments, { + tx: maybeTX, + options: [ required, { + id: [ required, isString ], + // Tags are required to be specified (even if an empty array) because it's easily forgotten + tags: [ required, arrayOf(isString) ], + aliases: [ defaultTo([]), arrayOf(isString) ], + data: [ anything ], // FIXME: Check for object + update: [ isFunction ], + failIfExists: [ defaultTo(false), isBoolean ], + allowUpsert: [ defaultTo(true), isBoolean ], + parentID: [ defaultTo(item.id), isString ] + }, requireEither([ "data", "update" ]) ] + }); + + let { data, ... rest } = options; + + return mutableOperation((backend) => { + return backend.storeItem(tx, { + ... rest, + // We normalize `data` and `update` (which are mutually-exclusive) into a single option here, so that the backend only needs to deal with the `update` case + // TODO: Can this be folded into the validation rules in a reasonable and readable way? + update: (data != null) + ? (existingData) => ({ ... existingData, ... data }) + : rest.update + }); + }); + }, + + moveItem: function (_tx, _options) { + let [ tx, options ] = validateArguments(arguments, { + tx: maybeTX, + options: [ required, wrapValueAsOption("into"), { + from: [ defaultTo(item.id), isString ], + into: [ required, isString ], + // NOTE: If no `merge` function is specified, that indicates that merging is not allowed (ie. this is strictly a rename), and mergeMetadata is ignored too + merge: [ isFunction ], + mergeMetadata: [ defaultTo({}), anyProperty({ + key: [ required ], + value: [ required, isFunction ] + })], + }] + }); + + return mutableOperation((backend) => { + return backend.moveItem(tx, { options, allowMerge: (options.merge != null) }); + }); + }, + + deleteItem: function (_tx, _options) { + let [ tx, options ] = validateArguments(arguments, { + tx: maybeTX, + options: [ + defaultTo({}), + wrapValueAsOption("id"), { + id: [ defaultTo(item.id), isString ] + } + ] + }); + + return mutableOperation((backend) => { + return backend.deleteItem(tx, options); + }); + }, + + createAlias: function (_tx, _options) { + let [ tx, options ] = validateArguments(arguments, { + tx: maybeTX, + options: [ required, wrapValueAsOption("from"), { + from: [ required, isString ], + to: [ defaultTo(item.id), isString ], + failIfExists: [ defaultTo(false), isBoolean ] // TODO: Shouldn't this default to true, for any occurrence outside of a merge/rename? + }] + }); + + return mutableOperation((backend) => { + return backend.createAlias(tx, options); + }); + }, + + deleteAlias: function (_tx, _options) { + let [ tx, options ] = validateArguments(arguments, { + tx: maybeTX, + options: [ required, wrapValueAsOption("from"), { + from: [ required, isString ] + }] + }); + + return mutableOperation((backend) => { + return backend.deleteAlias(tx, options); + }); + }, + + updateData: function (_tx, _options) { + // NOTE: This is a semantically self-describing convenience wrapper for `createItem` that updates the currently-being-processed item + let [ tx, options ] = validateArguments(arguments, { + tx: maybeTX, + options: [ required, wrapValueAsOption("update"), { + id: [ defaultTo(item.id), isString ], + update: [ required, isFunction ] + }] + }); + + return mutableOperation((backend) => { + return backend.createItem(tx, { + ... options, + tags: [] + }); + }); + }, + + updateMetadata: function (_tx, _options) { + let [ tx, options ] = validateArguments(arguments, { + tx: maybeTX, + options: [ required, wrapValueAsOption("update"), { + id: [ defaultTo(item.id), isString ], + update: [ required, isFunction ], + task: [ required, isTask ] + }] + }); + + return mutableOperation((backend) => { + return backend.updateMetadata(tx, options); + }); + }, + + expire: function (_tx, _options) { + // TODO: It probably doesn't make any semantic sense to leave *both* arguments unspecified. Maybe that should be prohibited via eg. a non-exclusive requireEither? Otherwise the user might expect to immediately expire the *current* task, but since the task is only updated *after* the task logic runs, that is not currently possible to express. + let [ tx, options ] = validateArguments(arguments, { + tx: maybeTX, + options: [ required, { + id: [ defaultTo(item.id), isString ], + isTask: [ defaultTo(task), isTask ] + }] + }); + + return mutableOperation((backend) => { + return backend.expire(tx, options); + }); + }, + + expireDependents: function (_tx, _options) { + // NOTE: This method does not have a counterpart in the database backend; it's a convenience abstraction over regular `backend.expire` calls + let [ tx, { id, dependents }] = validateArguments(arguments, { + tx: maybeTX, + options: [ defaultTo({}), wrapValueAsOption("dependents"), { + id: [ defaultTo(item.id), isString ], + dependents: [ defaultTo([]), arrayOf(isString), (dependents) => { + // Only consider dependents that actually exist for this task + return dependents.filter((dependent) => dependentTaskNames.has(dependent)); + }] + }] + }); + + // FIXME: This doesn't work with the synchronous queueing model + return Promise.map(dependents, (dependent) => { + return this.expire(tx, { + id: id, + taskName: dependent + }); + }); + }, + + // Temporary compatibility aliases + createItem: (... args) => this.storeItem(... args), + mergeItem: (... args) => this.moveItem(... args), + renameItem: (tx, options) => { + if (typeof options === "string") { + return this.moveItem(tx, options); + } else { + return this.moveItem(tx, { into: options.to, from: options.from }); + } + }, + } + }; + } + }; + } + + return function createDatabaseAPI({ backend: backendName, options, simulate }) { + // TODO: Validate inputs here, or maybe that belongs in config validation instead? + return Promise.try(() => { + let backendModule = backendModules[backendName]; + + if (backendModule != null) { + // FIXME: Smarter merge, maybe expose merge-by-template merge rules from the database module? + let settings = { ... backendModule.defaultSettings, ... options }; + + return backendModule.create(settings); + } else { + throw new Error(`No backend named '${backendName}' exists`); + } + }).then((backend) => { + return wrapBackend(backend); + }); + }; +}; diff --git a/src/database-backends/postgresql/index.js b/src/database-backends/postgresql/index.js new file mode 100644 index 0000000..443739d --- /dev/null +++ b/src/database-backends/postgresql/index.js @@ -0,0 +1,352 @@ +"use strict"; + +const Promise = require("bluebird"); + +const path = require("path"); +const defaultValue = require("default-value"); +const knexLibrary = require("knex"); +const { knexSnakeCaseMappers } = require("objection"); +const { addMilliseconds } = require("date-fns"); +const ValidationError = require("@validatem/error"); + +const models = require("./models"); +const mergeItems = require("../../semantics/merge-items"); + +let migrationsFolder = path.join(__dirname, "../../../migrations"); + +function noop() {} + +module.exports = function(state) { + let { metrics } = state; + + return { + defaultSettings: { + taskBatchSize: 1000, + taskBatchDelay: 30 * 1000 + }, + create: function createPostgreSQLBackend(options) { + let knex = knexLibrary({ + client: "pg", + pool: { min: 0, max: 32 }, + migrations: { tableName: "srap_knex_migrations" }, + connection: options, + ... knexSnakeCaseMappers() + }); + + return Promise.try(() => { + return knex.migrate.latest({ + directory: migrationsFolder + }); + }).then(() => { + let db = models({ ... state, knex: knex }); + + let queryState = { ... state, knex, db }; + + // TODO: Should this be inlined instead? + function repointAliases (tx, { to, from }) { + return db.Alias.query(tx) + .patch({ itemId: to, updatedAt: new Date() }) + .where({ itemId: from }); + } + + function getTaskResult(tx, task, id) { + return Promise.try(() => { + return db.Alias.query(tx).findById(id); + }).then((alias) => { + if (alias != null) { + return Promise.try(() => { + return db.TaskResult.query(tx).findById([ task.name, alias.itemId ]); + }); + } + }); + } + + return { + shutdown: function () { + knex.destroy(); + }, + + getDefaultTransaction: function () { + return knex; + }, + + isTransaction: function (value) { + if (value.where == null || value.raw == null) { + throw new ValidationError(`Must be a valid Knex or Knex transaction instance`); + } + }, + + runInTransaction: function (tx, callback) { + return tx.transaction((tx) => { + return callback(tx); + }, { doNotRejectOnRollback: false }); + }, + + lock: function (tx, { id, task }) { + return Promise.try(() => { + return db.TaskInProgress.query(tx).insert({ + task: task.name, + item_id: id + }); + }).then(() => { + return true; + }).catch({ name: "UniqueViolationError" }, () => { + return false; + }); + }, + + unlock: function (tx, { id, task }) { + return db.TaskInProgress.query(tx) + .delete() + .where({ + task: task.name, + item_id: id + }); + // TODO: return true/false depending on whether unlocking succeeded? + }, + + getItem: function (tx, { id, optional }) { + return Promise.try(() => { + return db.Alias.relatedQuery("item", tx) + .for(id) + .withGraphFetched("taskResults"); + }).then((results) => { + if (optional === true || results.length > 0) { + return results[0]; + } else { + throw new Error(`No item exists with ID '${id}'`); + } + }); + }, + + storeItem: function (tx, { id, tags, aliases, update, failIfExists, allowUpsert, parentID }) { + // FIXME: Make failIfExists actually work, currently it does nothing as the UNIQUE constraint violation cannot occur for an upsert + // TODO: Ensure that we run the transaction in full isolation mode, and retry in case of a conflict + + return Promise.try(() => { + // NOTE: We look up by alias, since this is an upsert - and so if the specified ID already exists as an alias, we should update the existing item instead of creating a new one with the specified (aliased) ID + return db.Alias + .relatedQuery("item", tx) + .for(id); + }).then((existingItems) => { + let existingItem = existingItems[0]; + + let actualID = (existingItem != null) + ? existingItem.id + : id; + + let existingData = (existingItem != null) + ? existingItem.data + : {}; + + // Make sure to add a self:self alias + let allAliases = aliases.concat([ actualID ]); + + let newItem = { + id: actualID, + data: update(existingData), + createdBy: parentID, + tags: tags.map((tag) => ({ name: tag })), + aliases: allAliases.map((alias) => ({ alias: alias })), + updatedAt: new Date() + }; + + return Promise.try(() => { + if (allowUpsert) { + // NOTE: We *always* do upserts here, even if the user specified `data` rather than `update`, because tags and aliases should always be added even if the item itself already exists. We trust the user not to accidentally reuse IDs between different kinds of objects (which would break in various other ways anyway). + return db.Item.query(tx).upsertGraph(newItem, { + insertMissing: true, + noDelete: true + }); + } else { + return db.Item.query(tx).insertGraph(newItem, { + insertMissing: true + }); + } + }).tap(() => { + // FIXME: We should probably move the metrics stuff to the wrapper instead, so that it works for *any* backend + metrics.storedItems.inc(1); + + // TODO: This currently produces somewhat misleading metrics; it only counts *explicitly specified* tags. That will *mostly* correlate to amount of genuinely-new items, but not perfectly. In the future, we should probably refactor the insertion code such that it is aware of the *current* tags of an item that it is about to merge into - but maybe that should be delayed until the zapdb migration. + if (newItem.tags != null) { + for (let tag of newItem.tags) { + metrics.storedItems.labels({ tag: tag.name }).inc(1); + } + } + }); + + }).catch({ name: "UniqueViolationError", table: "srap_items" }, (error) => { + if (failIfExists) { + throw error; + } else { + // Do nothing, just ignore the failure + } + }); + }, + + moveItem: function (tx, { from, into, merge, mergeMetadata }) { + let allowMerge = (merge != null); + + // TODO: Make stuff like `this.getItem` roundtrip through the backend abstraction instead of directly calling internal database methods, eg. by providing the backend itself as an argument to the method + return Promise.all([ + this.getItem(tx, { id: from, optional: true }), + this.getItem(tx, { id: into, optional: true }), + ]).then(([ fromObj, maybeIntoObj ]) => { + // NOTE: If the source item does not exist, we silently ignore that. This is to ensure that opportunistic renaming of items (eg. after a naming convention change which only affects older items) in scraper configurations is possible. + if (fromObj != null) { + if (allowMerge) { + let intoObj = defaultValue(maybeIntoObj, { + id: into, + data: {}, + taskResults: [] + }); + + let newObject = mergeItems({ fromObj, intoObj, merge, mergeMetadata }); + + let upsertOptions = { + insertMissing: true, + noDelete: true + }; + + return Promise.all([ + // NOTE: We don't use this.deleteItem, to sidestep any alias lookups + db.Item.query(tx).findById(from).delete(), + // NOTE: Cannot use into.$query here because that adds an implicit query builder operation, which upsertGraph does not allow + db.Item.query(tx).upsertGraph(newObject, upsertOptions), + ]); + } else { + return Promise.all([ + db.Item.query(tx).findById(from).patch({ id: into }), + this.createAlias(tx, { from: into, to: into }) + ]); + } + } + }).then(() => { + // NOTE: Repointing aliases has the side-effect of leaving a redirect from the source to the destination item, as each item has a self:self alias + return repointAliases(tx, { from: from, to: into }); + }); + }, + + deleteItem: function (tx, { id }) { + return db.Alias.relatedQuery("item", tx) + .for(id) + .delete(); + }, + + createAlias: function (tx, { from, to, failIfExists }) { + // Isolate this operation into a savepoint so that it can fail without breaking the entire transaction + let promise = this.runInTransaction(tx, (tx) => { + return db.Alias.query(tx).insert({ + alias: from, + itemId: to, + updatedAt: new Date() + }); + }); + + if (failIfExists) { + return promise; + } else { + return Promise.resolve(promise) + .catch({ name: "UniqueViolationError" }, noop); + } + }, + + deleteAlias: function (tx, { from }) { + // TODO: This cannot yet be propagated to the update feed, because we don't keep a record of deletions + return db.Alias.query(tx).findById(from).delete(); + }, + + updateMetadata: function (tx, { id, update, task }) { + // TODO: failIfExists + return Promise.try(() => { + return getTaskResult(tx, task, id); + }).then((taskResult) => { + let sharedFields = { + isInvalidated: false, + updatedAt: new Date() + }; + + if (taskResult != null) { + return taskResult.$query(tx).patch({ + ... sharedFields, + metadata: update(taskResult.metadata), + }); + } else { + return db.TaskResult.query(tx).insert({ + ... sharedFields, + task: task.name, + itemId: id, + metadata: update({}) + }); + } + }); + }, + + expire: function (tx, { id, task }) { + return Promise.try(() => { + return db.Alias.query(tx).findById(id); + }).then((alias) => { + return db.TaskResult.query(tx) + .where({ task: task.name, itemId: alias.itemId }) + .patch({ isInvalidated: true }); + }); + }, + + setTTL: function (options) { + // options = ttl || { id, taskName, ttl } + // FIXME + }, + + allowFailure: function (allowed) { + + }, + + log: function (category, message) { + + }, + + markTaskStatus: function (tx, { id, task, isSuccessful }) { + return Promise.try(() => { + return getTaskResult(tx, task, id); + }).then((taskResult) => { + let sharedFields = { + isSuccessful: isSuccessful, + taskVersion: task.version, + expiresAt: (task.ttl != null) + ? addMilliseconds(new Date(), task.ttl) + : undefined + }; + + if (taskResult != null) { + return taskResult.$query(tx).patch({ + ... sharedFields + }); + } else { + return db.TaskResult.query(tx).insert({ + ... sharedFields, + task: task.name, + itemId: id + }); + } + }); + }, + + logTaskError: function (tx, { id, task, error }) { + throw new Error(`UNIMPLEMENTED`); // FIXME + }, + + countLockedTasks: function (tx) { + return Promise.try(() => { + return db.TaskInProgress.query(tx).count({ count: "*" }); + }).then((result) => { + return result[0].count; + }); + }, + + getUpdateStream: require("./queries/get-update-stream")(queryState), + getTaskStream: require("./queries/get-task-stream")(queryState) + }; + }); + } + }; +}; diff --git a/src/models/alias.js b/src/database-backends/postgresql/models/alias.js similarity index 100% rename from src/models/alias.js rename to src/database-backends/postgresql/models/alias.js diff --git a/src/models/failure.js b/src/database-backends/postgresql/models/failure.js similarity index 100% rename from src/models/failure.js rename to src/database-backends/postgresql/models/failure.js diff --git a/src/models/index.js b/src/database-backends/postgresql/models/index.js similarity index 100% rename from src/models/index.js rename to src/database-backends/postgresql/models/index.js diff --git a/src/models/item.js b/src/database-backends/postgresql/models/item.js similarity index 100% rename from src/models/item.js rename to src/database-backends/postgresql/models/item.js diff --git a/src/models/tag.js b/src/database-backends/postgresql/models/tag.js similarity index 100% rename from src/models/tag.js rename to src/database-backends/postgresql/models/tag.js diff --git a/src/models/task-in-progress.js b/src/database-backends/postgresql/models/task-in-progress.js similarity index 100% rename from src/models/task-in-progress.js rename to src/database-backends/postgresql/models/task-in-progress.js diff --git a/src/models/task-result.js b/src/database-backends/postgresql/models/task-result.js similarity index 100% rename from src/models/task-result.js rename to src/database-backends/postgresql/models/task-result.js diff --git a/src/database-backends/postgresql/queries/get-task-stream.js b/src/database-backends/postgresql/queries/get-task-stream.js new file mode 100644 index 0000000..063a75c --- /dev/null +++ b/src/database-backends/postgresql/queries/get-task-stream.js @@ -0,0 +1,106 @@ +"use strict"; + +const debug = require("debug")("srap:backend:postgresql:query:get-task-stream"); +const simpleSource = require("@promistream/simple-source"); + +const query = ` + WITH + dependency_tasks AS ( + SELECT * FROM + json_to_recordset(:dependencyTaskDefinitions) AS x(task text, task_version text) + ), + matching_items AS ( + SELECT + DISTINCT ON (srap_items.id) + srap_items.*, + results.updated_at AS result_date, + results.task_version, + ( + results.is_successful = TRUE + AND ( + results.expires_at < NOW() + OR results.is_invalidated = TRUE + ) + ) AS is_candidate + FROM srap_items + INNER JOIN srap_tags + ON srap_tags.item_id = srap_items.id + AND srap_tags.name = ANY(:tags) + LEFT JOIN srap_task_results AS results + ON results.item_id = srap_items.id + AND results.task = :task + WHERE + NOT EXISTS ( + SELECT FROM srap_tasks_in_progress AS pr WHERE pr.item_id = srap_items.id + ) + ), + candidates AS ( + SELECT * FROM matching_items + WHERE result_date IS NULL + UNION + SELECT * FROM matching_items + WHERE is_candidate = TRUE + OR NOT (task_version = :taskVersion) + ) + ( + SELECT + * + FROM + candidates + WHERE + NOT EXISTS ( + SELECT + results.* + FROM dependency_tasks + LEFT JOIN srap_task_results AS results + ON dependency_tasks.task = results.task + AND dependency_tasks.task_version = results.task_version + AND results.item_id = candidates.id + WHERE + results.is_successful IS NULL + OR results.is_successful = FALSE + OR ( + results.is_successful = TRUE + AND ( + results.expires_at < NOW() + OR results.is_invalidated = TRUE + ) + ) + ) + ) LIMIT :resultLimit; +`; + +module.exports = function ({ metrics, backendSettings }) { + return function (tx, { task }) { + return simpleSource(() => { + let startTime = Date.now(); + + return Promise.try(() => { + return tx.raw(query, { + tags: task.tags, + task: task.name, + taskVersion: task.version, + resultLimit: backendSettings.taskBatchSize, + dependencyTaskDefinitions: JSON.stringify(task.dependencies.map((dependency) => { + // Case-mapping for SQL compatibility + return { task_version: dependency.version, task: dependency.name }; + })) + }); + }).then((result) => { + let timeElapsed = Date.now() - startTime; + + metrics.taskFetchTime.labels({ task: task.name }).set(timeElapsed / 1000); + metrics.taskFetchResults.labels({ task: task.name }).set(result.rowCount); + + debug(`Task retrieval query for '${task.name}' took ${timeElapsed}ms and produced ${result.rowCount} results`); + + if (result.rowCount > 0) { + return result.rows; + } else { + // TODO: Consider using LISTEN/NOTIFY instead? + return Promise.resolve([]).delay(backendSettings.taskBatchDelay); + } + }); + }); + }; +} diff --git a/src/database-backends/postgresql/queries/get-update-stream.js b/src/database-backends/postgresql/queries/get-update-stream.js new file mode 100644 index 0000000..84be6ba --- /dev/null +++ b/src/database-backends/postgresql/queries/get-update-stream.js @@ -0,0 +1,65 @@ +"use strict"; + +const dateFns = require("date-fns"); + +const pipe = require("@promistream/pipe"); +const combineSequentialStreaming = require("@promistream/combine-sequential-streaming"); +const fromIterable = require("@promistream/from-iterable"); +const fromNodeStream = require("@promistream/from-node-stream"); + +const createTypeTaggingStream = require("./streams/tag-type"); + +module.exports = function ({ db, knex }) { + return function (tx, { timestamp, prefix }) { + // NOTE: This returns snake_cased keys! As we're bypassing the Objection internals, no casemapping occurs. + // FIXME/REFACTOR: That needs to be changed in the refactor, for consistency across database backends + + // NOTE: This is a hacky workaround - if we don't do this, then for some reason also entries *at* the exact timestamp are included, which is not what we want. + // FIXME: Verify that this doesn't break anything, eg. when an entry is created inbetween the original timestamp and +1ms. + let actualTimestamp = (timestamp != null) + ? dateFns.addMilliseconds(timestamp, 1) + : undefined; + + function applyWhereClauses(query, idField) { + if (timestamp != null) { + // FIXME: An error in the query here throws an error, resulting in an abort handling bug in a promistream + query = query.whereRaw(`updated_at > ?`, [ actualTimestamp ]); + } + + if (prefix != null) { + query = query.whereRaw(`${idField} LIKE ?`, [ `${prefix.replace(/%/g, "\\%")}%` ]); + } + + return query; + } + + function* streamGenerator() { + yield pipe([ + fromNodeStream.fromReadable( + applyWhereClauses(db.Item.query(tx), "id").toKnexQuery().stream() + ), + createTypeTaggingStream("item") + ]); + + yield pipe([ + fromNodeStream.fromReadable( + // NOTE: We are only interested in aliases which don't point at themselves + applyWhereClauses(db.Alias.query(tx).where("alias", "!=", knex.ref("item_id")), "alias").toKnexQuery().stream() + ), + createTypeTaggingStream("alias") + ]); + + yield pipe([ + fromNodeStream.fromReadable( + applyWhereClauses(db.TaskResult.query(tx), "item_id").toKnexQuery().stream() + ), + createTypeTaggingStream("taskResult") + ]); + } + + return pipe([ + fromIterable(streamGenerator()), + combineSequentialStreaming() + ]); + } +}; diff --git a/src/database-backends/simulated/index.js b/src/database-backends/simulated/index.js new file mode 100644 index 0000000..b1a22d3 --- /dev/null +++ b/src/database-backends/simulated/index.js @@ -0,0 +1,179 @@ +"use strict"; + +const mergeItems = require("../../semantics/merge-items"); + +function printTX(tx) { + // TODO: Print entire chain? + return `[tx ${tx.__txID}]`; +} + +function printItem(id, task) { + if (task != null) { + return `[${id}:${task.name}]`; + } else { + return `[${id}]`; + } +} + +function logSimulated(... args) { + console.log(... args); +} + +module.exports = function (state) { + + // NOTE: The simulated backend needs access to the 'real' backend; a task may eg. mutate an item based on its current data, and we'd need to read that from the real data source. The only constraint is that the simulated backend cannot *mutate* anything in the real backend, but reading is fine! + return function attachSimulatedBackend({ backend }) { + return { + defaultSettings: {}, + create: function createSimulatedBackend(_options) { + let txCounter = 0; + let locks = new Map(); // Map> + + return { + shutdown: function () { + return backend.shutdown(); + }, + + getDefaultTransaction: function () { + return { __txID: null }; + }, + + isTransaction: function (value) { + return ("__txID" in value); + }, + + runInTransaction: function (tx, callback) { + let newTransaction = { __txID: txCounter++, __parentTX: tx }; + + return callback(newTransaction); + }, + + lock: function (tx, { id, task }) { + if (!locks.has(task)) { + locks.set(task, new Set()); + } + + let taskLocks = locks.get(task); + + if (taskLocks.has(id)) { + logSimulated(`${printTX(tx)} Already locked: ${printItem(id, task)}`); + return false; + } else { + logSimulated(`${printTX(tx)} Locking ${printItem(id, task)}`); + taskLocks.add(id); + return true; + } + }, + + unlock: function (tx, { id, task }) { + logSimulated(`${printTX(tx)} Unlocking ${printItem(id, task)}`); + locks.get(task).delete(id); + }, + + getItem: function (tx, options) { + return backend.getItem(backend.getDefaultTransaction(), options); + }, + + storeItem: function (tx, { id, parentID, update, tags, aliases, allowUpsert }) { + return Promise.try(() => { + return this.getItem(tx, { id: id, optional: true }); + }).then((currentItem) => { + let actualID = currentItem.id ?? id; + + let newItem = { + id: actualID, + data: (currentItem != null) + ? update(currentItem.data) + : update({}), + createdBy: parentID, + tags: tags, + aliases: aliases.concat([ actualID ]), + updatedAt: new Date() + }; + + logSimulated(`${printTX(tx)} ${printItem(id)} Storing item (upsert=${allowUpsert})`, newItem); + }); + }, + + moveItem: function (tx, { from, into, merge, mergeMetadata, allowMerge }) { + return Promise.all([ + this.getItem(tx, { id: from, optional: true }), + this.getItem(tx, { id: into, optional: true }), + ]).then((fromObj, maybeIntoObj) => { + if (fromObj != null) { + let intoObj = maybeIntoObj ?? { + id: into, + data: {}, + taskResults: [] + }; + + if (allowMerge) { + let newItem = mergeItems({ fromObj, intoObj, merge, mergeMetadata }); + logSimulated(`${printTX(tx)} ${printItem(from)} Moving item to ${printItem[into]} (merge=true)`, newItem); + } else { + logSimulated(`${printTX(tx)} ${printItem(from)} Moving item to ${printItem[into]} (merge=false)`); + } + } + }); + }, + + deleteItem: function (tx, { id }) { + logSimulated(`${printTX(tx)} ${printItem(id)} Deleting item`); + }, + + createAlias: function (tx, { from, to }) { + logSimulated(`${printTX(tx)} ${printItem(to)} Creating alias ${from}`); + }, + + deleteAlias: function (tx, { from }) { + logSimulated(`${printTX(tx)} ${printItem(from)} Deleting alias`); + }, + + updateMetadata: function (tx, { id, task, update }) { + return Promise.try(() => { + return this.getItem(tx, { id, optional: true }); + }).then((item) => { + let taskResult = (item != null) + ? item.taskResults.find((result) => result.task === task) + : undefined; + + let existingData = (taskResult != null) + ? taskResult.metadata + : {}; + + let newData = update(existingData); + + logSimulated(`${printTX(tx)} ${printItem(id, task)} Storing metadata`, newData); + }); + }, + + expire: function (tx, { id, task }) { + logSimulated(`${printTX(tx)} ${printItem(id, task)} Expiring task result`); + }, + + markTaskStatus: function (tx, { id, task, isSuccessful }) { + if (isSuccessful) { + logSimulated(`${printTX(tx)} ${printItem(id, task)} Marking task as completed`); + } else { + logSimulated(`${printTX(tx)} ${printItem(id, task)} Marking task as failed`); + } + }, + + countLockedTasks: function (tx) { + return Array.from(locks).reduce((total, [ task, taskLocks ]) => { + return total + taskLocks.size; + }, 0); + }, + + getUpdateStream: function (... args) { + return backend.getUpdateStream(... args); + }, + + getTaskStream: function (... args) { + return backend.getTaskStream(... args); + } + }; + } + }; + }; +}; diff --git a/src/dependency-map.js b/src/dependency-map.js deleted file mode 100644 index 573515a..0000000 --- a/src/dependency-map.js +++ /dev/null @@ -1,48 +0,0 @@ -"use strict"; - -const defaultValue = require("default-value"); -const mapObj = require("map-obj"); - -module.exports = function createDependencyMap(configuration) { - let dependencyMap = mapObj(configuration.tasks, (task, definition) => { - return [ - task, - defaultValue(definition.dependsOn, []).map((dependencyName) => { - let dependency = configuration.tasks[dependencyName]; - - if (dependency != null) { - return { - task: dependencyName, - // TODO: Do defaults processing in configuration loading/validation instead - taskVersion: defaultValue(dependency.version, "0") - }; - } else { - throw new Error(`Invalid dependency specified, task does not exist: ${dependencyName}`); - } - }) - ]; - }); - - // NOTE: When inverting the dependencyMap, we totally ignore the taskVersion of dependencies when keying this mapping. While the taskVersion of specific tasks *may* matter to the code that uses these mappings, we don't support more than one version of a task simultaneously existing, and so keying by the task name alone is sufficient. - let dependentMap = {}; - - for (let [ task, dependencies ] of Object.entries(dependencyMap)) { - let taskDefinition = configuration.tasks[task]; - - for (let dependency of dependencies) { - if (dependentMap[dependency.task] == null) { - dependentMap[dependency.task] = []; - } - - dependentMap[dependency.task].push({ - task: task, - taskVersion: defaultValue(taskDefinition.version, "0") - }); - } - } - - return { - dependencyMap, - dependentMap - }; -}; diff --git a/src/generate-task-graph.js b/src/generate-task-graph.js new file mode 100644 index 0000000..8c0ea65 --- /dev/null +++ b/src/generate-task-graph.js @@ -0,0 +1,44 @@ +"use strict"; + +const syncpipe = require("syncpipe"); +const defaultValue = require("default-value"); // FIXME: Move to config validation + +const invertMapping = require("./util/invert-mapping"); + +module.exports = function generateTaskGraph({ tags, tasks }) { + let tagsMapping = invertMapping(tags); + + let tasksMap = syncpipe(tasks, [ + _ => Object.entries(_), + _ => _.map(([ name, taskDefinition ]) => { + return [ name, { + ... taskDefinition, + name: name, + tags: tagsMapping[name], + dependencies: [], + dependents: [] + }]; + }), + _ => new Map(_) + ]); + + function getTask(name) { + if (tasksMap.has(name)) { + return tasksMap.get(name); + } else { + throw new Error(`Encountered dependency reference to non-existent task '${name}'`); + } + } + + for (let task of tasksMap.values()) { + for (let dependencyName of defaultValue(task.dependsOn, [])) { + task.dependencies.push(getTask(dependencyName)); + getTask(dependencyName).dependents.push(task); + } + + // NOTE: We are mutating a local copy of the task definition here, that we created further up + delete task.dependsOn; + } + + return tasksMap; +}; diff --git a/src/kernel.js b/src/kernel.js index d69a35b..ea18059 100644 --- a/src/kernel.js +++ b/src/kernel.js @@ -11,217 +11,272 @@ const simpleSink = require("@promistream/simple-sink"); const pipe = require("@promistream/pipe"); const parallelize = require("@promistream/parallelize"); -const initialize = require("./initialize"); const logStatus = require("./log-status"); -const createDependencyMap = require("./dependency-map"); +const { validateOptions } = require("@validatem/core"); +const isValidConfiguration = require("./validators/is-valid-configuration"); +const createPrometheus = require("./prometheus"); +const generateTaskGraph = require("./generate-task-graph"); +const unreachable = require("@joepie91/unreachable")("srap"); // FIXME: *Require* a taskInterval to be set, even if explicitly null, to prevent accidentally forgetting it -// TODO: Publish this as a separate package -// Inverts an object of arrays, eg. {a: [x, y], b: [x, z]} becomes {x: [a, b], y: [a], z: [b]} -// Useful for eg. tag mappings -function invertMapping(object) { - let newObject = {}; - - for (let [ key, valueList ] of Object.entries(object)) { - for (let value of valueList) { - if (newObject[value] == null) { - newObject[value] = []; - } - - newObject[value].push(key); - } - } - - return newObject; -} - -module.exports = function createKernel(configuration) { - return Promise.try(() => { - return initialize({ - knexfile: { - client: "pg", - connection: configuration.database, - pool: { min: 0, max: 32 }, - migrations: { tableName: "srap_knex_migrations" } +module.exports = async function createKernel(_configuration) { + let configuration = validateOptions(arguments, isValidConfiguration); + + let state = { + ... createPrometheus(), + tasks: generateTaskGraph({ + tags: configuration.tags, + tasks: configuration.tasks + }) + }; + + let { metrics, tasks } = state; + + const createBackend = require("./database-backends")(state); + + let attachToGlobalRateLimit = (configuration.taskInterval != null) + ? rateLimit.clonable(configuration.taskInterval) + : undefined; + + let backend = await createBackend({ + backend: configuration.backend, + options: configuration.database + }); + + Object.assign(state, { backend: backend }); + + const createTaskKernel = require("./task-kernel")(state); + + function checkLockedTasks() { + return Promise.try(() => { + return backend.topLevel.countLockedTasks(); + }).then((lockedCount) => { + if (lockedCount > 0) { + console.log(`${chalk.bold.red("WARNING:")} There are ${lockedCount} tasks currently locked, and they will not be run! This may be caused by a process crash in the past. See the documentation for more details on how to solve this issue.`); } }); - }).then((state) => { - const queries = require("./queries")(state); - const createTaskStream = require("./task-stream")(state); - const createDatabaseQueue = require("./queued-database-api")(state); + } - let { knex, prometheusRegistry, metrics } = state; - let { dependencyMap, dependentMap } = createDependencyMap(configuration); - - function insertSeeds() { - return Promise.map(configuration.seed, (item) => { - return queries.createItem(knex, { - ... item, - allowUpsert: false, - failIfExists: false - }); - }); + let databasePreparePromise; + async function prepareDatabase() { + if (databasePreparePromise == null) { + databasePreparePromise = Promise.all([ + checkLockedTasks(), + backend.topLevel.insertSeeds(configuration.seed) + ]); } - function checkLockedTasks() { - return Promise.try(() => { - return queries.countLockedTasks(knex); - }).then((lockedCount) => { - if (lockedCount > 0) { - console.log(`${chalk.bold.red("WARNING:")} There are ${lockedCount} tasks currently locked, and they will not be run! This may be caused by a process crash in the past. See the documentation for more details on how to solve this issue.`); - } - }); - } + return databasePreparePromise; + } - function runTaskStreams() { - let tasks = invertMapping(configuration.tags); - - let attachToGlobalRateLimit = (configuration.taskInterval != null) - ? rateLimit.clonable(configuration.taskInterval) - : undefined; - - console.log(`Starting ${Object.keys(tasks).length} tasks...`); - - return Promise.map(Object.entries(tasks), ([ task, tags ]) => { - let taskConfiguration = configuration.tasks[task]; - - if (taskConfiguration != null) { - let taskStream = createTaskStream({ - task: task, - tags: tags, - taskVersion: defaultValue(taskConfiguration.version, "0"), - taskInterval: taskConfiguration.taskInterval, - parallelTasks: taskConfiguration.parallelTasks, - ttl: taskConfiguration.ttl, - run: taskConfiguration.run, - globalRateLimiter: (attachToGlobalRateLimit != null) - ? attachToGlobalRateLimit() - : null, - globalParallelize: (configuration.parallelTasks != null) - ? parallelize(configuration.parallelTasks) - : null, - taskDependencies: dependencyMap[task], - taskDependents: dependentMap[task] - }); + return { + run: async function runKernel() { + console.log(`Starting ${tasks.size} tasks...`); - return pipe([ - taskStream, - simpleSink((completedItem) => { + await prepareDatabase(); + + return Promise.map(tasks.values(), (task) => { + return pipe([ + createTaskKernel(task), + simpleSink(({ status, item, error }) => { + if (status === "completed") { metrics.successfulItems.inc(1); metrics.successfulItems.labels({ task: task }).inc(1); - logStatus(task, chalk.bold.green, "completed", completedItem.id); - }) - ]).read(); - } else { - throw new Error(`Task '${task}' is defined to run for tags [${tags}], but no such task is defined`); - } - }).catch((error) => { - console.dir(error, { depth: null, colors: true }); - throw error; + } else if (status === "failed") { + metrics.failedItems.inc(1); + metrics.failedItems.labels({ task: task }).inc(1); + } else { + unreachable(`Unrecognized status '${status}'`); + } + }) + ]).read(); + }); + }, + simulate: async function simulate({ itemID, task }) { + await prepareDatabase(); + let simulatedBackend = backend.simulate(); + + return simulateTask(itemID, task); + }, + execute: async function simulate({ itemID, task }) { + await prepareDatabase(); + return executeTask(itemID, task); + }, + shutdown: function () { + // TODO: Properly lock all public methods after shutdown is called, and wait for any running tasks to have completed + return backend.shutdown(); + }, + getMetrics: function () { + return Promise.try(() => { + return state.prometheusRegistry.metrics(); + }).then((metrics) => { + return { + contentType: state.prometheusRegistry.contentType, + metrics: metrics + }; }); } - - function executeTask(id, task) { - let taskConfiguration = configuration.tasks[task]; + }; - return knex.transaction((tx) => { - return Promise.try(() => { - return queries.getItem(knex, id); - }).then((item) => { - let queue = createDatabaseQueue({ - tx, - item, - task, - taskVersion: defaultValue(taskConfiguration.version, "0"), - taskDependents: dependentMap[task], - taskDependencies: dependencyMap[task] - }); - return Promise.try(() => { - return taskConfiguration.run({ - id: item.id, - data: item.data, - getItem: function (id) { - return queries.getItem(knex, id); - }, - ... queue.api - }); - }).then(() => { - return queue.execute(); - }); - }); - }, { doNotRejectOnRollback: false }); - } + + + + + + + + + + + + + + function runTaskStreams() { + - function simulateTask(id, task) { + + + + return Promise.map(Object.entries(tasks), ([ task, tags ]) => { let taskConfiguration = configuration.tasks[task]; - let methods = [ "createItem", "renameItem", "mergeItem", "deleteItem", "createAlias", "deleteAlias", "updateData", "updateMetadata", "expire", "expireDependents" ]; + if (taskConfiguration != null) { + let taskStream = createTaskStream({ + task: task, + tags: tags, + taskVersion: defaultValue(taskConfiguration.version, "0"), + taskInterval: taskConfiguration.taskInterval, + parallelTasks: taskConfiguration.parallelTasks, + ttl: taskConfiguration.ttl, + run: taskConfiguration.run, + globalRateLimiter: (attachToGlobalRateLimit != null) + ? attachToGlobalRateLimit() + : null, + globalParallelize: (configuration.parallelTasks != null) + ? parallelize(configuration.parallelTasks) + : null, + taskDependencies: dependencyMap[task], + taskDependents: dependentMap[task] + }); - let simulatedMethods = syncpipe(methods, [ - (_) => _.map((method) => [ method, function() { - console.log(`${chalk.bold.yellow.bgBlack(`${method} (simulated):`)} ${util.inspect(arguments, { colors: true, depth: null })}`); - }]), - (_) => Object.fromEntries(_) - ]); - + return pipe([ + taskStream, + simpleSink((completedItem) => { + metrics.successfulItems.inc(1); + metrics.successfulItems.labels({ task: task }).inc(1); + logStatus(task, chalk.bold.green, "completed", completedItem.id); + }) + ]).read(); + } else { + throw new Error(`Task '${task}' is defined to run for tags [${tags}], but no such task is defined`); + } + }).catch((error) => { + console.dir(error, { depth: null, colors: true }); + throw error; + }); + } + + function executeTask(id, task) { + let taskConfiguration = configuration.tasks[task]; + + return knex.transaction((tx) => { return Promise.try(() => { return queries.getItem(knex, id); }).then((item) => { - return taskConfiguration.run({ - id: item.id, - data: item.data, - getItem: function (id) { - return queries.getItem(knex, id); - }, - ... simulatedMethods + let queue = createDatabaseQueue({ + tx, + item, + task, + taskVersion: defaultValue(taskConfiguration.version, "0"), + taskDependents: dependentMap[task], + taskDependencies: dependencyMap[task] }); - }); - } - - return { - run: function runKernel() { - return Promise.try(() => { - return insertSeeds(); - }).then(() => { - return checkLockedTasks(); - }).then(() => { - return runTaskStreams(); - }); - }, - simulate: function simulate({ itemID, task }) { - return Promise.try(() => { - return insertSeeds(); - }).then(() => { - return checkLockedTasks(); - }).then(() => { - return simulateTask(itemID, task); - }); - }, - execute: function simulate({ itemID, task }) { + return Promise.try(() => { - return insertSeeds(); - }).then(() => { - return checkLockedTasks(); + return taskConfiguration.run({ + id: item.id, + data: item.data, + getItem: function (id) { + return queries.getItem(knex, id); + }, + ... queue.api + }); }).then(() => { - return executeTask(itemID, task); - }); - }, - shutdown: function () { - // TODO: Properly lock all public methods after shutdown is called, and wait for any running tasks to have completed - knex.destroy(); - }, - getMetrics: function () { - return Promise.try(() => { - return prometheusRegistry.metrics(); - }).then((metrics) => { - return { - contentType: prometheusRegistry.contentType, - metrics: metrics - }; - }); - } - }; - }); + return queue.execute(); + }); + }); + }, { doNotRejectOnRollback: false }); + } + + function simulateTask(id, task) { + let taskConfiguration = configuration.tasks[task]; + + let methods = [ "createItem", "renameItem", "mergeItem", "deleteItem", "createAlias", "deleteAlias", "updateData", "updateMetadata", "expire", "expireDependents" ]; + + let simulatedMethods = syncpipe(methods, [ + (_) => _.map((method) => [ method, function() { + console.log(`${chalk.bold.yellow.bgBlack(`${method} (simulated):`)} ${util.inspect(arguments, { colors: true, depth: null })}`); + }]), + (_) => Object.fromEntries(_) + ]); + + return Promise.try(() => { + return queries.getItem(knex, id); + }).then((item) => { + return taskConfiguration.run({ + id: item.id, + data: item.data, + getItem: function (id) { + return queries.getItem(knex, id); + }, + ... simulatedMethods + }); + }); + } + + return { + run: function runKernel() { + return Promise.try(() => { + return insertSeeds(); + }).then(() => { + return checkLockedTasks(); + }).then(() => { + return runTaskStreams(); + }); + }, + simulate: function simulate({ itemID, task }) { + return Promise.try(() => { + return insertSeeds(); + }).then(() => { + return checkLockedTasks(); + }).then(() => { + return simulateTask(itemID, task); + }); + }, + execute: function simulate({ itemID, task }) { + return Promise.try(() => { + return insertSeeds(); + }).then(() => { + return checkLockedTasks(); + }).then(() => { + return executeTask(itemID, task); + }); + }, + shutdown: function () { + // TODO: Properly lock all public methods after shutdown is called, and wait for any running tasks to have completed + knex.destroy(); + }, + getMetrics: function () { + return Promise.try(() => { + return prometheusRegistry.metrics(); + }).then((metrics) => { + return { + contentType: prometheusRegistry.contentType, + metrics: metrics + }; + }); + } + }; }; diff --git a/src/mutation-api/database.js b/src/mutation-api/database.js deleted file mode 100644 index 440f5b5..0000000 --- a/src/mutation-api/database.js +++ /dev/null @@ -1,61 +0,0 @@ -"use strict"; - -const defaultValue = require("default-value"); -const chalk = require("chalk"); -const logStatus = require("../log-status"); - -module.exports = function (state) { - const queries = require("../queries")(state); - - return function createDatabaseMutationAPI({ tx, item, taskVersion, task }) { - return { - createItem: function (options) { - logStatus(task, chalk.gray, "new", options.id); - - return queries.createItem(tx, { - ...options, - parentID: item.id - }); - }, - renameItem: function (options) { - return queries.renameItem(tx, options); - }, - mergeItem: function (options) { - // FIXME: Move default - return queries.mergeItem(tx, { - ...options, - from: defaultValue(options.from, item.id) - }); - }, - deleteItem: function (options) { - return queries.deleteItem(tx, { - id: options.id - }); - }, - createAlias: function (options) { - // FIXME: Move default - return queries.createAlias(tx, { - ...options, - to: defaultValue(options.to, item.id) - }); - }, - deleteAlias: function (from) { - return queries.deleteAlias(tx, { - from: from - }); - }, - updateData: function (options) { - return queries.updateData(tx, options); - }, - updateMetadata: function (options) { - return queries.updateMetadata(tx, { - ... options, - taskVersion: taskVersion - }); - }, - expire: function (options) { - return queries.expire(tx, options); - } - }; - }; -}; diff --git a/src/mutation-api/simulation.js b/src/mutation-api/simulation.js deleted file mode 100644 index e66f754..0000000 --- a/src/mutation-api/simulation.js +++ /dev/null @@ -1,79 +0,0 @@ -"use strict"; - -const Promise = require("bluebird"); -const chalk = require("chalk"); -const util = require("util"); - -function logCall(methodName, args) { - console.log(`${chalk.bold.yellow.bgBlack(`${methodName} (simulated):`)} ${util.inspect(args, { colors: true, depth: null })}`); -} - -// TODO: Make this do an actual database query and then rollback; that way the behaviour is the same as when really modifying the DB, in that earlier operations can affect what later operations see (eg. a createItem followed by a mergeItem involving that new item). - -module.exports = function (state) { - const queries = require("../queries")(state); - - return function createSimulationMutationAPI({ tx, item, taskVersion }) { - return { - createItem: function (options) { - logCall("createItem", { - ... options, - update: (options.update != null) - ? options.update(item.data) - : undefined - }); - }, - renameItem: function (options) { - logCall("renameItem", { - ... options - }); - }, - mergeItem: function (options) { - // FIXME: Visualize merges - logCall("createItem", { - ... options - }); - }, - deleteItem: function (options) { - logCall("deleteItem", { - ... options - }); - }, - createAlias: function (options) { - logCall("createAlias", { - ... options - }); - }, - deleteAlias: function (from) { - logCall("deleteAlias", { - from: from - }); - }, - updateData: function (options) { - logCall("updateData", { - ... options, - update: (options.update != null) - ? options.update(item.data) - : undefined - }); - }, - updateMetadata: function (options) { - return Promise.try(() => { - return queries.getItem(tx, id); - }).then((item) => { - // FIXME: Visualize metadata update, actually merge with the correct thing - // MARKER: Expose taskResults as an object mapping instead of an array, somehow - logCall("updateMetadata", { - ... options, - update: (options.update != null) - ? options.update(item.data) - : undefined - }); - }); - }, - expire: function (options) { - return queries.expire(tx, options); - } - }; - }; -}; diff --git a/src/mutation-api/wrapper.js b/src/mutation-api/wrapper.js deleted file mode 100644 index 15a5280..0000000 --- a/src/mutation-api/wrapper.js +++ /dev/null @@ -1,111 +0,0 @@ -"use strict"; - -const Promise = require("bluebird"); - -const wrapValueAsOption = require("@validatem/wrap-value-as-option"); -const required = require("@validatem/required"); -const isString = require("@validatem/is-string"); -const defaultTo = require("@validatem/default-to"); -const validateOptions = require("@validatem/core/src/api/validate-options"); -const isFunction = require("@validatem/is-function"); -const arrayOf = require("@validatem/array-of"); -const defaultValue = require("default-value"); - -// FIXME: Remaining validators - -module.exports = function wrapMutationAPI({ item, task, taskDependents }, api) { - return { - createItem: function (options) { - // FIXME: Require tags to be set, even if to an empty array, to avoid accidentally forgetting the tags - return api.createItem(options); - }, - renameItem: function (_options) { - let options = validateOptions(arguments, [ - wrapValueAsOption("to"), { - to: [ required, isString ], - from: [ defaultTo(item.id), isString ] - } - ]); - - return api.renameItem(options); - }, - mergeItem: function (options) { - return api.mergeItem(options); - }, - deleteItem: function (_options) { - let options = validateOptions(arguments, [ - defaultTo({}), - wrapValueAsOption("id"), { - id: [ defaultTo(item.id), isString ] - } - ]); - - return api.deleteItem(options); - }, - createAlias: function (options) { - // TODO: Single-parameter variant? - return api.createAlias(options); - }, - deleteAlias: function (from) { - // FIXME: options wrapper - return api.deleteAlias(from); - }, - updateData: function (_options) { - let options = validateOptions(arguments, [ - wrapValueAsOption("update"), { - id: [ defaultTo(item.id), isString ], - update: [ required, isFunction ] - } - ]); - - return api.updateData(options); - }, - updateMetadata: function (_options) { - let options = validateOptions(arguments, [ - wrapValueAsOption("update"), { - id: [ defaultTo(item.id), isString ], - taskName: [ defaultTo(task), isString ], - update: [ required, isFunction ] - } - ]); - - return api.updateMetadata(options); - }, - expire: function (_options) { - let options = validateOptions(arguments, [ - defaultTo({}), { - id: [ defaultTo(item.id), isString ], - taskName: [ defaultTo(task), isString ] - } - ]); - - return api.expire(options); - }, - expireDependents: function (_options) { - let options = validateOptions(arguments, [ - defaultTo({}), - wrapValueAsOption("dependents"), { - id: [ defaultTo(item.id), isString ], - dependents: [ arrayOf(isString) ] - } - ]); - - let selectedDependents = (options.dependents != null) - ? new Set(options.dependents) - : null; - - let allDependents = defaultValue(taskDependents, []); - - let affectedDependents = (selectedDependents != null) - ? allDependents.filter((dependent) => selectedDependents.has(dependent.task)) - : allDependents; - - return Promise.map(affectedDependents, (dependent) => { - return this.expire({ - id: options.id, - taskName: dependent.task - }); - }); - } - }; -}; diff --git a/src/initialize.js b/src/prometheus/index.js similarity index 70% rename from src/initialize.js rename to src/prometheus/index.js index 7b7c96b..c9420f6 100644 --- a/src/initialize.js +++ b/src/prometheus/index.js @@ -1,26 +1,12 @@ "use strict"; -const Promise = require("bluebird"); -const path = require("path"); -const knex = require("knex"); -const { knexSnakeCaseMappers } = require("objection"); const prometheusClient = require("prom-client"); -const models = require("./models"); - -let migrationsFolder = path.join(__dirname, "../migrations"); - -module.exports = function initialize({ knexfile }) { +module.exports = function createPrometheus() { let prometheusRegistry = new prometheusClient.Registry(); prometheusClient.collectDefaultMetrics({ register: prometheusRegistry }); - let knexInstance = knex({ - ... knexfile, - ... knexSnakeCaseMappers() - }); - - let state = { - knex: knexInstance, + return { prometheusRegistry: prometheusRegistry, metrics: { storedItems: new prometheusClient.Counter({ @@ -53,18 +39,7 @@ module.exports = function initialize({ knexfile }) { help: "Amount of new scraping tasks fetched during the most recent attempt", labelNames: [ "task" ] }) + // FIXME: Measure queue-refill task } }; - - return Promise.try(() => { - return knexInstance.migrate.latest({ - directory: migrationsFolder - }); - }).then(() => { - return { - ... state, - db: models(state) - }; - }); }; - diff --git a/src/queries.js b/src/queries.js deleted file mode 100644 index d1bbc1e..0000000 --- a/src/queries.js +++ /dev/null @@ -1,487 +0,0 @@ -"use strict"; - -const Promise = require("bluebird"); -// const { UniqueViolationError } = require("objection"); -const dateFns = require("date-fns"); - -const { validateArguments } = require("@validatem/core"); -const required = require("@validatem/required"); -const requireEither = require("@validatem/require-either"); -const isString = require("@validatem/is-string"); -const isBoolean = require("@validatem/is-boolean"); -const isFunction = require("@validatem/is-function"); -const isNumber = require("@validatem/is-number"); -const isDate = require("@validatem/is-date"); -const arrayOf = require("@validatem/array-of"); -const defaultTo = require("@validatem/default-to"); -const anyProperty = require("@validatem/any-property"); -const anything = require("@validatem/anything"); -const ValidationError = require("@validatem/error"); - -const pipe = require("@promistream/pipe"); -const combineSequentialStreaming = require("@promistream/combine-sequential-streaming"); -const fromIterable = require("@promistream/from-iterable"); -const fromNodeStream = require("@promistream/from-node-stream"); -const createTypeTaggingStream = require("./streams/tag-type"); - -const { addSeconds } = require("date-fns"); -const syncpipe = require("syncpipe"); -const defaultValue = require("default-value"); - -function isTX(value) { - if (value.where == null || value.raw == null) { - throw new ValidationError(`Must be a valid Knex or Knex transaction instance`); - } -} - -function noop() {} - -function taskResultsToObject(taskResults) { - return syncpipe(taskResults, [ - (_) => _.map((result) => [ result.taskName, result.metadata ]), - (_) => Object.fromEntries(_) - ]); -} - -module.exports = function ({ db, knex, metrics }) { - return { - // FIXME: Make object API instead - getItem: function (_tx, _id, _optional) { - let [ tx, id, optional ] = validateArguments(arguments, { - tx: [ required, isTX ], - id: [ required, isString ], - optional: [ defaultTo(false), isBoolean ] - }); - - return Promise.try(() => { - return db.Alias.relatedQuery("item", tx) - .for(id) - .withGraphFetched("taskResults"); - }).then((results) => { - if (optional === true || results.length > 0) { - return results[0]; - } else { - throw new Error(`No item exists with ID '${id}'`); - } - }); - }, - createItem: function (_tx, _options) { - // NOTE: Using `update` instead of `data` makes it an upsert! - // FIXME: Make failIfExists actually work, currently it does nothing as the UNIQUE constraint violation cannot occur for an upsert - let [ tx, { id, tags, aliases, data, update, failIfExists, allowUpsert, parentID }] = validateArguments(arguments, { - tx: [ required, isTX ], - options: [{ - id: [ required, isString ], - tags: [ defaultTo([]), arrayOf(isString) ], - aliases: [ defaultTo([]), arrayOf(isString) ], - data: [ anything ], // FIXME: Check for object - update: [ isFunction ], - failIfExists: [ defaultTo(false), isBoolean ], - allowUpsert: [ defaultTo(true), isBoolean ], - parentID: [ isString ] - }, requireEither([ "data", "update" ]) ] - }); - - // FIXME: Ensure that we run the transaction in full isolation mode, and retry in case of a conflict - - return Promise.try(() => { - // NOTE: We look up by alias, since this is an upsert - and so if the specified ID already exists as an alias, we should update the existing item instead of creating a new one with the specified (aliased) ID - return db.Alias - .relatedQuery("item", tx) - .for(id); - }).then((existingItems) => { - let existingItem = existingItems[0]; - - let actualID = (existingItem != null) - ? existingItem.id - : id; - - let existingData = (existingItem != null) - ? existingItem.data - : {}; - - let newData = (update != null) - ? update(existingData) - : { ... existingData, ... data }; - - // Make sure to add a self:self alias - let allAliases = aliases.concat([ actualID ]); - - let newItem = { - id: actualID, - data: newData, - createdBy: parentID, - tags: tags.map((tag) => ({ name: tag })), - aliases: allAliases.map((alias) => ({ alias: alias })), - updatedAt: new Date() - }; - - return Promise.try(() => { - if (allowUpsert) { - // NOTE: We *always* do upserts here, even if the user specified `data` rather than `update`, because tags and aliases should always be added even if the item itself already exists. We trust the user not to accidentally reuse IDs between different kinds of objects (which would break in various other ways anyway). - return db.Item.query(tx).upsertGraph(newItem, { - insertMissing: true, - noDelete: true - }); - } else { - return db.Item.query(tx).insertGraph(newItem, { - insertMissing: true - }); - } - }).tap(() => { - // FIXME: We should probably move the metrics stuff to the wrapper instead, so that it works for *any* backend - metrics.storedItems.inc(1); - - // TODO: This currently produces somewhat misleading metrics; it only counts *explicitly specified* tags. That will *mostly* correlate to amount of genuinely-new items, but not perfectly. In the future, we should probably refactor the insertion code such that it is aware of the *current* tags of an item that it is about to merge into - but maybe that should be delayed until the zapdb migration. - if (newItem.tags != null) { - for (let tag of newItem.tags) { - metrics.storedItems.labels({ tag: tag.name }).inc(1); - } - } - }); - - }).catch({ name: "UniqueViolationError", table: "srap_items" }, (error) => { - if (failIfExists) { - throw error; - } else { - // Do nothing, just ignore the failure - } - }); - }, - renameItem: function (_tx, _options) { - // options == to || { from, to } - let [ tx, { to, from }] = validateArguments(arguments, { - tx: [ required, isTX ], - options: [{ - to: [ required, isString ], - from: [ required, isString ] - }] - }); - - return Promise.all([ - db.Item.query(tx).findById(from).patch({ id: to }), - this.createAlias(tx, { from: to, to: to }) - ]); - }, - repointAliases: function (_tx, _options) { - // { from, to } - let [ tx, { to, from }] = validateArguments(arguments, { - tx: [ required, isTX ], - options: [{ - to: [ required, isString ], - from: [ required, isString ] - }] - }); - - return db.Alias.query(tx) - .patch({ itemId: to, updatedAt: new Date() }) - .where({ itemId: from }); - }, - mergeItem: function (_tx, _options) { - // options = { from, into, merge, mergeMetadata{} } - let [ tx, { from, into, merge, mergeMetadata }] = validateArguments(arguments, { - tx: [ required, isTX ], - options: [{ - from: [ required, isString ], - into: [ required, isString ], - merge: [ required, isFunction ], - mergeMetadata: [ defaultTo({}), anyProperty({ - key: [ required ], - value: [ required, isFunction ] - })], - }] - }); - - return Promise.all([ - this.getItem(tx, from, true), - this.getItem(tx, into, true), - ]).then(([ fromObj, intoObj ]) => { - if (fromObj != null) { - let defaultedIntoObj = defaultValue(intoObj, { - id: into, - data: {}, - taskResults: [] - }); - - let newData = merge(defaultedIntoObj.data, fromObj.data); - - let fromTaskResults = taskResultsToObject(fromObj.taskResults); - let intoTaskResults = taskResultsToObject(defaultedIntoObj.taskResults); - - // FIXME: Deduplicate function - let allTaskKeys = Array.from(new Set([ - ... Object.keys(fromTaskResults), - ... Object.keys(intoTaskResults) - ])); - - function selectNewestResult(taskA, taskB) { - if (taskA == null) { - return taskB; - } else if (taskB == null) { - return taskA; - } else if (taskA.updatedAt > taskB.updatedAt) { - return taskA; - } else { - return taskB; - } - } - - // TODO: Use merge-by-template here instead? - - let newTaskResults = allTaskKeys.map((key) => { - let merger = mergeMetadata[key]; - let fromTask = fromTaskResults[key]; - let intoTask = intoTaskResults[key]; - - if (merger != null) { - // Generate a new TaskResult that includes data combined from both - let newMetadata = merger( - defaultValue(intoTask.metadata, {}), - defaultValue(fromTask.metadata, {}) - ); - - return { - ... intoTask, - metadata: newMetadata, - updatedAt: Date.now() - }; - } else { - // Take the newest known TaskResult and just make sure that it is pointing at the correct ID - return { - ... selectNewestResult(intoTask, fromTask), - itemId: defaultedIntoObj.id - }; - } - }); - - let upsertOptions = { - insertMissing: true, - noDelete: true - }; - - return Promise.try(() => { - // NOTE: Cannot use into.$query here because that adds an implicit query builder operation, which upsertGraph does not allow - return db.Item.query(tx).upsertGraph({ - id: defaultedIntoObj.id, - data: newData, - taskResults: newTaskResults - }, upsertOptions); - }).then(() => { - // NOTE: Repointing aliases has the side-effect of leaving a redirect from the source to the destination item, as each item has a self:self alias - return this.repointAliases(tx, { from: fromObj.id, to: intoObj.id }); - }).then(() => { - // NOTE: We don't use this.deleteItem, to sidestep any alias lookups - return db.Item.query(tx).findById(fromObj.id).delete(); - }); - } - }); - }, - deleteItem: function (_tx, _options) { - // options = none || { id } - let [ tx, { id }] = validateArguments(arguments, { - tx: [ required, isTX ], - options: [{ - id: [ required, isString ] - }] - }); - - return db.Alias.relatedQuery("item", tx) - .for(id) - .delete(); - // return db.Item.query(tx).findById(id).delete(); - }, - createAlias: function (_tx, _options) { - // options = { from, to, failIfExists } - let [ tx, { from, to, failIfExists }] = validateArguments(arguments, { - tx: [ required, isTX ], - options: [{ - from: [ required, isString ], - to: [ required, isString ], - failIfExists: [ defaultTo(false), isBoolean ] // TODO: Shouldn't this default to true, for any occurrence outside of a merge/rename? - }] - }); - - // Isolate this operation into a savepoint so that it can fail without breaking the entire transaction - let promise = tx.transaction((tx) => { - return db.Alias.query(tx).insert({ - alias: from, - itemId: to, - updatedAt: new Date() - }); - }); - - if (failIfExists) { - return promise; - } else { - return Promise.resolve(promise) - .catch({ name: "UniqueViolationError" }, noop); - } - }, - deleteAlias: function (_tx, _options) { - let [ tx, { from }] = validateArguments(arguments, { - tx: [ required, isTX ], - options: [{ - from: [ required, isString ] - }] - }); - - // TODO: This cannot yet be propagated to the update feed, because we don't keep a record of deletions - return db.Alias.query(tx).findById(from).delete(); - }, - updateData: function (_tx, _options) { - // options = update || { id, update } - let [ tx, { id, update }] = validateArguments(arguments, { - tx: [ required, isTX ], - options: [{ - id: [ required, isString ], - update: [ required, isFunction ] - }] - }); - - // TODO: Figure out the proper delineation between 'creating' and 'updating' an item - return this.createItem(tx, { id, update }); - }, - updateMetadata: function (_tx, _options) { - // options = update || { id, update, taskName } - let [ tx, { id, update, taskName, taskVersion, ttl }] = validateArguments(arguments, { - tx: [ required, isTX ], - options: [{ - id: [ required, isString ], - update: [ required, isFunction ], - taskName: [ required, isString ], - taskVersion: [ required, isString ], - ttl: [ isNumber ] - }] - }); - // TODO: failIfExists - - // FIXME: metadata_updated_at - return Promise.try(() => { - return db.Alias.query(tx).findById(id); - }).then((alias) => { - let sharedFields = { - isSuccessful: true, - isInvalidated: false, - taskVersion: taskVersion, - updatedAt: new Date(), - expiresAt: (ttl != null) - ? addSeconds(new Date(), ttl) - : undefined - }; - - if (alias != null) { - return Promise.try(() => { - return db.TaskResult.query(tx).findById([ taskName, alias.itemId ]); - }).then((taskResult) => { - if (taskResult != null) { - return taskResult.$query(tx).patch({ - ... sharedFields, - metadata: update(taskResult.metadata), - }); - } else { - return db.TaskResult.query(tx).insert({ - ... sharedFields, - task: taskName, - itemId: id, - metadata: update({}) - }); - } - }); - } - }); - }, - expire: function (_tx, _options) { - // options = none || { id, taskName } - let [ tx, { id, taskName }] = validateArguments(arguments, { - tx: [ required, isTX ], - options: [{ - id: [ required, isString ], - taskName: [ required, isString ] - }] - }); - - return Promise.try(() => { - return db.Alias.query(tx).findById(id); - }).then((alias) => { - return db.TaskResult.query(tx) - .where({ task: taskName, itemId: alias.itemId }) - .patch({ isInvalidated: true }); - }); - }, - setTTL: function (options) { - // options = ttl || { id, taskName, ttl } - // FIXME - }, - allowFailure: function (allowed) { - - }, - log: function (category, message) { - - }, - countLockedTasks: function (tx) { - return Promise.try(() => { - return db.TaskInProgress.query(tx).count({ count: "*" }); - }).then((result) => { - return result[0].count; - }); - }, - getUpdates: function (_tx, _options) { - // NOTE: This returns snake_cased keys! As we're bypassing the Objection internals, no casemapping occurs. - let [ tx, { timestamp, prefix }] = validateArguments(arguments, { - tx: [ required, isTX ], - options: [ defaultTo({}), { - timestamp: [ isDate ], - prefix: [ isString ] - }] - }); - - // NOTE: This is a hacky workaround - if we don't do this, then for some reason also entries *at* the exact timestamp are included, which is not what we want. - // FIXME: Verify that this doesn't break anything, eg. when an entry is created inbetween the original timestamp and +1ms. - let actualTimestamp = (timestamp != null) - ? dateFns.addMilliseconds(timestamp, 1) - : undefined; - - function applyWhereClauses(query, idField) { - if (timestamp != null) { - // FIXME: An error in the query here throws an error, resulting in an abort handling bug in a promistream - query = query.whereRaw(`updated_at > ?`, [ actualTimestamp ]); - } - - if (prefix != null) { - query = query.whereRaw(`${idField} LIKE ?`, [ `${prefix.replace(/%/g, "\\%")}%` ]); - } - - return query; - } - - function* streamGenerator() { - yield pipe([ - fromNodeStream.fromReadable( - applyWhereClauses(db.Item.query(tx), "id").toKnexQuery().stream() - ), - createTypeTaggingStream("item") - ]); - - yield pipe([ - fromNodeStream.fromReadable( - // NOTE: We are only interested in aliases which don't point at themselves - applyWhereClauses(db.Alias.query(tx).where("alias", "!=", knex.ref("item_id")), "alias").toKnexQuery().stream() - ), - createTypeTaggingStream("alias") - ]); - - yield pipe([ - fromNodeStream.fromReadable( - applyWhereClauses(db.TaskResult.query(tx), "item_id").toKnexQuery().stream() - ), - createTypeTaggingStream("taskResult") - ]); - } - - return pipe([ - fromIterable(streamGenerator()), - combineSequentialStreaming() - ]); - } - }; -}; diff --git a/src/semantics/merge-items.js b/src/semantics/merge-items.js new file mode 100644 index 0000000..bb1d966 --- /dev/null +++ b/src/semantics/merge-items.js @@ -0,0 +1,61 @@ +"use strict"; + +const defaultValue = require("default-value"); +const syncpipe = require("syncpipe"); + +const arrayUnique = require("../util/array-unique"); +const pickTaskResult = require("./pick-task-result"); + +function taskResultsToObject(taskResults) { + return syncpipe(taskResults, [ + (_) => _.map((result) => [ result.taskName, result.metadata ]), + (_) => Object.fromEntries(_) + ]); +} + +module.exports = function mergeItems({ fromObj, intoObj, merge, mergeMetadata }) { + let mergedData = merge(intoObj.data, fromObj.data); + + let fromTaskResults = taskResultsToObject(fromObj.taskResults); + let intoTaskResults = taskResultsToObject(intoObj.taskResults); + + let allTaskKeys = arrayUnique([ + ... Object.keys(fromTaskResults), + ... Object.keys(intoTaskResults) + ]); + + // TODO: Use merge-by-template here instead? + + let mergedTaskResults = allTaskKeys.map((key) => { + let merger = mergeMetadata[key]; + let fromTask = fromTaskResults[key]; + let intoTask = intoTaskResults[key]; + + if (merger != null) { + // Generate a new TaskResult that includes data combined from both + let newMetadata = merger( + defaultValue(intoTask.metadata, {}), + defaultValue(fromTask.metadata, {}) + ); + + return { + ... intoTask, + metadata: newMetadata, + updatedAt: Date.now() + }; + } else { + // Take the newest known TaskResult and just make sure that it is pointing at the correct ID + // TODO: Does this really belong in the semantics module? Or is this database-specific? + return { + ... pickTaskResult(intoTask, fromTask), + itemId: intoObj.id + }; + } + }); + + return { + id: intoObj.id, + data: mergedData, + taskResults: mergedTaskResults + }; +}; diff --git a/src/semantics/pick-task-result.js b/src/semantics/pick-task-result.js new file mode 100644 index 0000000..77ae1e3 --- /dev/null +++ b/src/semantics/pick-task-result.js @@ -0,0 +1,13 @@ +"use strict"; + +module.exports = function pickTaskResult(taskA, taskB) { + if (taskA == null) { + return taskB; + } else if (taskB == null) { + return taskA; + } else if (taskA.updatedAt > taskB.updatedAt) { + return taskA; + } else { + return taskB; + } +}; diff --git a/src/streams/process-task-safely.js b/src/streams/process-task-safely.js index e877451..c96aa18 100644 --- a/src/streams/process-task-safely.js +++ b/src/streams/process-task-safely.js @@ -7,33 +7,28 @@ const pipe = require("@promistream/pipe"); const map = require("@promistream/map"); const mapFilter = require("@promistream/map-filter"); -module.exports = function ({ db, knex }) { +module.exports = function ({ backend }) { return function processTaskSafely(task, processHandler) { let lockStream = mapFilter((item) => { return Promise.try(() => { - return db.TaskInProgress.query(knex).insert({ - task: task, - item_id: item.id - }); - }).then(() => { - return item; - }).catch({ name: "UniqueViolationError" }, () => { - return mapFilter.NoValue; + return backend.lock(null, { id: item.id, task: task }); + }).then((success) => { + if (success) { + return item; + } else { + return mapFilter.NoValue; + } }); }); let processUnlockStream = map((item) => { return Promise.try(() => { - return knex.transaction((tx) => { + return backend.runInTransaction((tx) => { return processHandler(item, tx); - }, { doNotRejectOnRollback: false }); + }); }).finally(() => { - return db.TaskInProgress.query(knex) - .delete() - .where({ - task: task, - item_id: item.id - }); + // NOTE: The unlock deliberately happens outside of a transaction, so that it can always succeed, even if a task and its associated database changes failed + return backend.unlock(null, { id: item.id, task: task }); }).then(() => { return item; }); diff --git a/src/task-kernel.js b/src/task-kernel.js new file mode 100644 index 0000000..82fbdd8 --- /dev/null +++ b/src/task-kernel.js @@ -0,0 +1,53 @@ +"use strict"; + +const chalk = require("chalk"); +const pipe = require("@promistream/pipe"); +const filter = require("@promistream/filter"); +const map = require("@promistream/map"); + +const logStatus = require("./log-status"); + +module.exports = function ({ backend }) { + function runTask(task, item) { + let queue = []; + let api = backend.forItem({ task: task, id: item.id, mutationQueue: queue }); + + return Promise.try(() => { + logStatus(task, chalk.bold.cyan, "started", item.id); + + // NOTE: We only pass in the item data itself, *not* any associated metadata like tags. If the scraping task wants access to that sort of information, it should do a `getItem` call from within its task logic where needed. + // FIXME: Is that actually still true post-refactor? + task.run({ + data: item.data, + ... api.exposed + }); + }).then(() => { + return backend.topLevel.runInTransaction((tx) => { + + // FIXME: use queue + }); + }).then(async () => { + await api.internal.markTaskCompleted(); + logStatus(task, chalk.bold.green, "completed", item.id); + return { status: "completed", item: item }; + }).catch(async (error) => { + await api.internal.markTaskFailed(null, { error }); + logStatus(task, chalk.bold.red, "failed", `${item.id}: ${error.stack}`); + return { status: "failed", item: item, error: error }; + }); + } + + return function createTaskKernelStream(task) { + return pipe([ + backend.topLevel.getTaskStream(null, { task: task }), + filter((item) => backend.forItem({ task: task, id: item.id }).internal.lock()), + map((item) => { + return Promise.try(() => { + return runTask(task, item); + }).tap(() => { + return backend.forItem({ task: task, id: item.id }).internal.unlock(); + }); + }) + ]); + }; +}; diff --git a/src/task-stream.js b/src/task-stream.js deleted file mode 100644 index 5a8e4b9..0000000 --- a/src/task-stream.js +++ /dev/null @@ -1,201 +0,0 @@ -"use strict"; - -const Promise = require("bluebird"); -const ms = require("ms"); -const dateFns = require("date-fns"); -const debug = require("debug")("scrapingserver"); -const chalk = require("chalk"); - -const simpleSource = require("@promistream/simple-source"); -const buffer = require("@promistream/buffer"); -const pipe = require("@promistream/pipe"); -const rateLimit = require("@promistream/rate-limit"); -const parallelize = require("@promistream/parallelize"); - -const logStatus = require("./log-status"); -// const { UniqueViolationError } = require("objection"); - -// FIXME: Revert inlining of task_states once switched to PostgreSQL 12+, which can do this automatically using NOT MATERIALIZED -// FIXME: Check whether the dependency task_versions are actually being correctly passed in, and aren't accidentally nulls -let query = ` - WITH - dependency_tasks AS ( - SELECT * FROM - json_to_recordset(:dependencyTaskDefinitions) AS x(task text, task_version text) - ), - matching_items AS ( - SELECT - DISTINCT ON (srap_items.id) - srap_items.*, - results.updated_at AS result_date, - results.task_version, - ( - results.is_successful = TRUE - AND ( - results.expires_at < NOW() - OR results.is_invalidated = TRUE - ) - ) AS is_candidate - FROM srap_items - INNER JOIN srap_tags - ON srap_tags.item_id = srap_items.id - AND srap_tags.name = ANY(:tags) - LEFT JOIN srap_task_results AS results - ON results.item_id = srap_items.id - AND results.task = :task - WHERE - NOT EXISTS ( - SELECT FROM srap_tasks_in_progress AS pr WHERE pr.item_id = srap_items.id - ) - ), - candidates AS ( - SELECT * FROM matching_items - WHERE result_date IS NULL - UNION - SELECT * FROM matching_items - WHERE is_candidate = TRUE - OR NOT (task_version = :taskVersion) - ) - ( - SELECT - * - FROM - candidates - WHERE - NOT EXISTS ( - SELECT - results.* - FROM dependency_tasks - LEFT JOIN srap_task_results AS results - ON dependency_tasks.task = results.task - AND dependency_tasks.task_version = results.task_version - AND results.item_id = candidates.id - WHERE - results.is_successful IS NULL - OR results.is_successful = FALSE - OR ( - results.is_successful = TRUE - AND ( - results.expires_at < NOW() - OR results.is_invalidated = TRUE - ) - ) - ) - ) LIMIT :resultLimit; -`; - -module.exports = function (state) { - const processTaskSafely = require("./streams/process-task-safely")(state); - const queries = require("./queries")(state); - const createDatabaseQueue = require("./queued-database-api")(state); - - let { knex, db, metrics } = state; - - // FIXME: Transaction support! - - return function createTaskStream({ task, taskVersion, taskDependencies, taskDependents, taskInterval, tags, run, ttl, globalRateLimiter, globalParallelize, parallelTasks }) { - // TODO: Make nicer - let ttlInSeconds = (ttl != null) - ? (typeof ttl === "number") - ? ttl / 1000 - : ms(ttl) / 1000 - : undefined; - - return pipe([ - simpleSource(() => { - let startTime = Date.now(); - - return Promise.try(() => { - // console.log("Fetching new batch"); - return knex.raw(query, { - tags: tags, - task: task, - taskVersion: taskVersion, - resultLimit: 1000, // TODO: Make configurable - dependencyTaskDefinitions: JSON.stringify(taskDependencies.map((dependency) => { - // Case-mapping for SQL compatibility - return { task_version: dependency.taskVersion, task: dependency.task }; - })) - }); - }).then((result) => { - let timeElapsed = Date.now() - startTime; - - metrics.taskFetchTime.labels({ task: task }).set(timeElapsed / 1000); - metrics.taskFetchResults.labels({ task: task }).set(result.rowCount); - - debug(`Task retrieval query for '${task}' took ${timeElapsed}ms and produced ${result.rowCount} results`); - - if (result.rowCount > 0) { - // console.log("rows:", result.rows); - return result.rows; - } else { - // FIXME: Make this delay configurable, or maybe even use LISTEN/NOTIFY - return Promise.resolve([]).delay(30000); - } - }); - }), - buffer(), - globalRateLimiter, - (taskInterval != null) - ? rateLimit(taskInterval) - : null, - processTaskSafely(task, (item, tx) => { - logStatus(task, chalk.bold.cyan, "started", item.id); - - let queue = createDatabaseQueue({ tx, item, task, taskVersion, taskDependents, taskDependencies }); - - return Promise.try(() => { - // TODO: Proper Validatem schemas for each API method - return run({ - id: item.id, - data: item.data, - getItem: function (id) { - return queries.getItem(tx, id); - }, - ... queue.api - }); - }).then(() => { - return queue.execute(); - }).then(() => { - // Update succeeded - return db.TaskResult.query(tx).findById([ task, item.id ]).patch({ - is_successful: true, - updated_at: new Date(), - expires_at: (ttlInSeconds != null) - ? dateFns.add(new Date(), { seconds: ttlInSeconds }) - : null - }); - }).catch((error) => { - metrics.failedItems.inc(1); - metrics.failedItems.labels({ task: task }).inc(1); - logStatus(task, chalk.bold.red, "failed", `${item.id}: ${error.stack}`); - - let commonUpdate = { - is_successful: false, - task_version: taskVersion - }; - - return Promise.try(() => { - // Task failed -- note, cannot use tx here because it has failed - return db.TaskResult.query(knex).insert({ - item_id: item.id, - task: task, - metadata: {}, - ... commonUpdate - }); - }).catch({ name: "UniqueViolationError" }, () => { - return db.TaskResult.query(knex).findById([ task, item.id ]).patch({ - ... commonUpdate - }); - }).then(() => { - // throw error; - }); - }); - }), - // TODO: Sort out a cleaner way to organize local vs. global parallelization - (parallelTasks != null) - ? parallelize(parallelTasks) - : globalParallelize - ]); - }; -}; diff --git a/src/util/array-unique.js b/src/util/array-unique.js new file mode 100644 index 0000000..1bfa0ef --- /dev/null +++ b/src/util/array-unique.js @@ -0,0 +1,5 @@ +"use strict"; + +module.exports = function arrayUnique(array) { + return Array.from(new Set(array)); +}; diff --git a/src/util/invert-mapping.js b/src/util/invert-mapping.js new file mode 100644 index 0000000..d239f08 --- /dev/null +++ b/src/util/invert-mapping.js @@ -0,0 +1,20 @@ +"use strict"; + +// Inverts an object of arrays, eg. {a: [x, y], b: [x, z]} becomes {x: [a, b], y: [a], z: [b]} +// TODO: See if this can be replaced with an off-the-shelf module with equivalent semantics, something transpose-y maybe? + +module.exports = function invertMapping(mapping) { + let newObject = {}; + + for (let [ key, values ] of Object.entries(mapping)) { + for (let value of values) { + if (newObject[value] == null) { + newObject[value] = []; + } + + newObject[value].push(key); + } + } + + return newObject; +}; diff --git a/src/validators/is-ms.js b/src/validators/is-ms.js new file mode 100644 index 0000000..6ad6ac9 --- /dev/null +++ b/src/validators/is-ms.js @@ -0,0 +1,16 @@ +"use strict"; + +// FIXME: Move to own @validatem/is-ms module + +const ms = require("ms"); + +const either = require("@validatem/either"); +const isString = require("@validatem/is-string"); +const required = require("@validatem/required"); + +const isPositiveInteger = require("./is-positive-integer"); + +module.exports = either([ + isPositiveInteger, + [ isString, (value) => ms(value), required, isPositiveInteger ] +]); diff --git a/src/validators/is-positive-integer.js b/src/validators/is-positive-integer.js new file mode 100644 index 0000000..c7dac37 --- /dev/null +++ b/src/validators/is-positive-integer.js @@ -0,0 +1,6 @@ +"use strict"; + +const isInteger = require("@validatem/is-integer"); +const isPositive = require("@validatem/is-positive"); + +module.exports = [ isInteger, isPositive ]; diff --git a/src/validators/is-task-object.js b/src/validators/is-task-object.js new file mode 100644 index 0000000..96bfc51 --- /dev/null +++ b/src/validators/is-task-object.js @@ -0,0 +1,30 @@ +"use strict"; + +const required = require("@validatem/required"); +const arrayOf = require("@validatem/array-of"); +const isString = require("@validatem/is-string"); +const isFunction = require("@validatem/is-function"); +const isArray = require("@validatem/is-array"); + +const isPositiveInteger = require("./is-positive-integer"); + +function makeRules(recurse) { + // We want to recurse exactly one level + let isTaskArray = (recurse === true) + ? arrayOf(makeRules(false)) + : isArray; + + return { + name: [ required, isString ], + version: [ required, isString ], + ttl: [ isPositiveInteger ], + parallelTasks: [ isPositiveInteger ], + taskInterval: [ isPositiveInteger ], + dependents: [ required, isTaskArray ], + dependencies: [ required, isTaskArray ], + tags: [ required, arrayOf(isString) ], + run: [ required, isFunction ] + }; +} + +module.exports = makeRules(true); diff --git a/src/validators/is-valid-configuration.js b/src/validators/is-valid-configuration.js new file mode 100644 index 0000000..49cb833 --- /dev/null +++ b/src/validators/is-valid-configuration.js @@ -0,0 +1,46 @@ +"use strict"; + +const required = require("@validatem/required"); +const defaultTo = require("@validatem/default-to"); +const either = require("@validatem/either"); +const anything = require("@validatem/anything"); +const anyProperty = require("@validatem/any-property"); +const arrayOf = require("@validatem/array-of"); +const isFunction = require("@validatem/is-function"); +const isString = require("@validatem/is-string"); +const isInteger = require("@validatem/is-integer"); +const isPositive = require("@validatem/is-positive"); +const isValue = require("@validatem/is-value"); +const isBoolean = require("@validatem/is-boolean"); + +const isMS = require("./is-ms"); + +module.exports = { + backend: [ required, isValue("postgresql") ], // No other backends exist yet + simulate: [ isBoolean, defaultTo(false) ], + taskInterval: [ isMS ], // global rate limit + database: anything, // FIXME: Validate various database options + seed: arrayOf({ + id: [ required, isString ], + tags: [ required, arrayOf(isString) ], + data: [ required, anything ] // FIXME: Validate object + }), + tags: anyProperty({ + key: [ required, isString ], + value: [ required, arrayOf(isString) ] + }), + tasks: anyProperty({ + key: [ required, isString ], + value: [ required, { + ttl: [ isMS ], + taskInterval: [ isMS ], + parallelTasks: [ defaultTo, either([ + [ isValue(Infinity) ], + [ isInteger, isPositive ] + ])], + version: [ defaultTo("0"), isString ], + dependsOn: [ defaultTo([]), arrayOf(isString) ], + run: [ required, isFunction ] + }] + }) +}; diff --git a/yarn.lock b/yarn.lock index bc2b7c0..1a0fe13 100644 --- a/yarn.lock +++ b/yarn.lock @@ -112,6 +112,18 @@ default-value "^1.0.0" error-chain "^0.1.0" +"@promistream/filter@^0.1.1": + version "0.1.1" + resolved "https://registry.yarnpkg.com/@promistream/filter/-/filter-0.1.1.tgz#126f2d581c801d0e8e30e1392700384f00da94be" + integrity sha512-rE4mTC8I7FiwH5HZTcqzfwHTRXAAShBuNoLMhhWcKPvqPIjAbEoqGg8klEs4fNDl8LXrd0Txm7PeBQ8cgZYwmg== + dependencies: + "@promistream/propagate-abort" "^0.1.3" + "@promistream/propagate-peek" "^0.1.0" + "@validatem/core" "^0.3.12" + "@validatem/is-function" "^0.1.0" + "@validatem/required" "^0.1.1" + bluebird "^3.7.2" + "@promistream/from-iterable@^0.1.0": version "0.1.0" resolved "https://registry.yarnpkg.com/@promistream/from-iterable/-/from-iterable-0.1.0.tgz#f0f28e5f53449f4dc0cb90fc3d6753b9181b8a8b" @@ -421,7 +433,7 @@ default-value "^1.0.0" flatten "^1.0.3" -"@validatem/is-array@^0.1.0": +"@validatem/is-array@^0.1.0", "@validatem/is-array@^0.1.1": version "0.1.1" resolved "https://registry.yarnpkg.com/@validatem/is-array/-/is-array-0.1.1.tgz#fbe15ca8c97c30b622a5bbeb536d341e99cfc2c5" integrity sha512-XD3C+Nqfpnbb4oO//Ufodzvui7SsCIW/stxZ39dP/fyRsBHrdERinkFATH5HepegtDlWMQswm5m1XFRbQiP2oQ== @@ -452,7 +464,15 @@ "@validatem/error" "^1.0.0" is-callable "^1.1.5" -"@validatem/is-number@^0.1.3": +"@validatem/is-integer@^0.1.0": + version "0.1.0" + resolved "https://registry.yarnpkg.com/@validatem/is-integer/-/is-integer-0.1.0.tgz#52c544063aaeabb630854e1822298f5c043196a0" + integrity sha512-sSp66uxfirIFMqro64DAdfM+UKo+IICmHdy/x3ZJXUM9F4byz/GyFmhR4wfcQswywwF1fqKw9458GE38fozjOQ== + dependencies: + "@validatem/error" "^1.0.0" + "@validatem/is-number" "^0.1.2" + +"@validatem/is-number@^0.1.2", "@validatem/is-number@^0.1.3": version "0.1.3" resolved "https://registry.yarnpkg.com/@validatem/is-number/-/is-number-0.1.3.tgz#0f8ce8c72970dbedbbd04d12942e5ab48a44cda6" integrity sha512-GjnbKYfYa0cTCJmsr5OUbylxTKHHZ6FDtJixWl+lEuXzeELDoYRp2UAjzfjTXJ9g2BumESqI/t0hap5rw5tEyQ== @@ -468,6 +488,15 @@ "@validatem/error" "^1.0.0" is-plain-obj "^2.1.0" +"@validatem/is-positive@^1.0.0": + version "1.0.0" + resolved "https://registry.yarnpkg.com/@validatem/is-positive/-/is-positive-1.0.0.tgz#8820e39dc34e94ffc216217830ad4d2a3b26d415" + integrity sha512-ei8YL+IxwdZd7QGaR9coAo/MJRJKMqN3RunoM6lLbtFGPJyMa6hOlcLWiZ9UmRI9qE96YUHr+AI80AJ91SeOYg== + dependencies: + "@validatem/error" "^1.0.0" + "@validatem/is-number" "^0.1.3" + is-negative-zero "^2.0.0" + "@validatem/is-string@^0.1.1": version "0.1.1" resolved "https://registry.yarnpkg.com/@validatem/is-string/-/is-string-0.1.1.tgz#0710d8cebedd4d6861b4a8c63d7803ed6d2f9d6c" @@ -484,6 +513,13 @@ "@validatem/error" "^1.0.0" is-string "^1.0.5" +"@validatem/is-value@^0.1.0": + version "0.1.0" + resolved "https://registry.yarnpkg.com/@validatem/is-value/-/is-value-0.1.0.tgz#b4c7481818a88d7c24400b9b587f83b388b37d56" + integrity sha512-FrwC6AP4W8dN6GCJEX02qNphoRUaN2JtdbIU2ztwPpLFUSgaJ+zvUrIPYDr8f+8YfrI/QIHm+uiY2TNEjQq/iQ== + dependencies: + "@validatem/error" "^1.0.0" + "@validatem/match-special@^0.1.0": version "0.1.0" resolved "https://registry.yarnpkg.com/@validatem/match-special/-/match-special-0.1.0.tgz#4e0c28f1aee5bf53c1ef30bbf8c755d4946ae0ff" @@ -1860,6 +1896,11 @@ is-glob@^4.0.0, is-glob@^4.0.1: dependencies: is-extglob "^2.1.1" +is-negative-zero@^2.0.0: + version "2.0.2" + resolved "https://registry.yarnpkg.com/is-negative-zero/-/is-negative-zero-2.0.2.tgz#7bf6f03a28003b8b3965de3ac26f664d765f3150" + integrity sha512-dqJvarLawXsFbNDeJW7zAz8ItJ9cd28YufuuFzh0G8pNHjJMnY08Dv7sYX2uF5UpQOwieAeOExEYAWWfu7ZZUA== + is-number-object@^1.0.4: version "1.0.6" resolved "https://registry.yarnpkg.com/is-number-object/-/is-number-object-1.0.6.tgz#6a7aaf838c7f0686a50b4553f7e54a96494e89f0"