You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

381 lines
12 KiB
JavaScript

"use strict";
const Promise = require("bluebird");
const path = require("path");
const defaultValue = require("default-value");
const knexLibrary = require("knex");
const { knexSnakeCaseMappers } = require("objection");
const { addMilliseconds } = require("date-fns");
const ValidationError = require("@validatem/error");
const debug = require("debug")("srap:backend:postgresql:queries");
const models = require("./models");
const mergeItems = require("../../semantics/merge-items");
const syncpipe = require("syncpipe");
// Absolute path to the bundled knex migrations for this backend
const migrationsFolder = path.join(__dirname, "../../../migrations");

// Shared no-op callback, used to deliberately swallow expected errors
function noop() {}
module.exports = function(state) {
let { metrics } = state;
return {
defaultSettings: {
taskBatchSize: 1000,
taskBatchDelay: 5 * 60 * 1000, // FIXME: Rename to reflect changed behaviour?
delayRandomization: 0.2
},
create: function createPostgreSQLBackend(options) {
let knex = knexLibrary({
client: "pg",
pool: { min: 0, max: 32, ... defaultValue(options.pool, {}) },
migrations: { tableName: "srap_knex_migrations" },
connection: options,
... knexSnakeCaseMappers()
});
return Promise.try(() => {
return knex.migrate.latest({
directory: migrationsFolder
});
}).then(() => {
// TODO: Does it really make sense to be merging in the backendSettings here? Shouldn't that happen automatically in some way for *every* backend, rather than just the PostgreSQL one specifically? As backend settings are a generic backend feature
state = { ... state, knex: knex, backendSettings: options};
let db = models(state);
state = { ... state, db: db };
// TODO: Should this be inlined instead?
// Point every alias that currently references `from` at `to` instead, bumping their timestamps
function repointAliases (tx, { to, from }) {
	let changes = { itemId: to, updatedAt: new Date() };

	return db.Alias.query(tx)
		.where({ itemId: from })
		.patch(changes);
}
// Look up the stored result for (task, id), resolving `id` through the alias table
// first so that any known alias of an item finds its canonical result row.
function getTaskResult(tx, task, id) {
	return Promise.try(() => db.Alias.query(tx).findById(id)).then((alias) => {
		if (alias == null) {
			// Unknown ID; resolve to undefined rather than failing
			return undefined;
		} else {
			return db.TaskResult.query(tx).findById([ task.name, alias.itemId ]);
		}
	});
}
return {
shutdown: function () {
knex.destroy();
},
getDefaultTransaction: function () {
return knex;
},
isTransaction: function (value) {
if (value.where == null || value.raw == null) {
throw new ValidationError(`Must be a valid Knex or Knex transaction instance`);
}
},
runInTransaction: function (tx, callback) {
return tx.transaction((tx) => {
return callback(tx);
}, { doNotRejectOnRollback: false });
},
lock: function (tx, { id, task }) {
	// Attempt to claim the queue entry for (task, id). Resolves to `true` when the
	// lock was acquired, `false` when the entry is already started or no longer
	// exists in the queue. Runs in its own (sub)transaction so that the row lock
	// is held across both the read and the subsequent flag update.
	return this.runInTransaction(tx, (tx) => {
		return Promise.try(() => {
			// SELECT ... FOR UPDATE: concurrent lock attempts serialize on this row
			return db.TaskInProgress.query(tx).forUpdate().findById([ task.name, id ]);
		}).then((item) => {
			if (item != null) {
				debug(`Task '${task.name}:${id}' currently locked: ${item.started}`);
				if (item.started === false) {
					return Promise.try(() => {
						// Mark the entry as started; safe because we still hold the row lock
						return db.TaskInProgress.query(tx)
							.findById([ task.name, id ])
							.update({
								started: true,
								started_at: knex.fn.now()
							});
					}).then(() => {
						debug(`Task '${task.name}:${id}' locked successfully`);
						return true;
					});
				} else {
					// Someone else got here first
					debug(`Task '${task.name}:${id}' failed lock because already locked on read`);
					return false;
				}
			} else {
				// The queue entry was removed between scheduling and locking
				debug(`Task '${task.name}:${id}' failed lock because it no longer exists in queue`);
				return false;
			}
		});
	});
},
unlock: function (tx, { id, task }) {
return db.TaskInProgress.query(tx)
.delete()
.where({
task: task.name,
item_id: id
});
// TODO: return true/false depending on whether unlocking succeeded?
},
getItem: function (tx, { id, optional }) {
return Promise.try(() => {
return db.Alias.relatedQuery("item", tx)
.for(id)
.withGraphFetched("taskResults");
}).then((results) => {
if (optional === true || results.length > 0) {
return results[0];
} else {
throw new Error(`No item exists with ID '${id}'`);
}
});
},
storeItem: function (tx, { id, tags, aliases, update, failIfExists, allowUpsert, parentID }) {
	// Create or update an item, along with its tags and aliases.
	// - `update` receives the current data ({} for a new item) and returns the new data
	// - `allowUpsert` selects between a graph upsert (merge into existing) and a plain insert
	// - `parentID`, when set, is stored as `createdBy` on the item
	// FIXME: Make failIfExists actually work, currently it does nothing as the UNIQUE constraint violation cannot occur for an upsert
	// TODO: Ensure that we run the transaction in full isolation mode, and retry in case of a conflict
	return Promise.try(() => {
		// NOTE: We look up by alias, since this is an upsert - and so if the specified ID already exists as an alias, we should update the existing item instead of creating a new one with the specified (aliased) ID
		return db.Alias
			.relatedQuery("item", tx)
			.for(id);
	}).then((existingItems) => {
		let existingItem = existingItems[0];

		// Write to the canonical (alias target) ID when the item already exists
		let actualID = (existingItem != null)
			? existingItem.id
			: id;

		let existingData = (existingItem != null)
			? existingItem.data
			: {};

		// Make sure to add a self:self alias
		let allAliases = aliases.concat([ actualID ]);

		let newItem = {
			id: actualID,
			data: update(existingData),
			createdBy: parentID,
			tags: tags.map((tag) => ({ name: tag, itemId: actualID })),
			aliases: allAliases.map((alias) => ({ alias: alias })),
			updatedAt: new Date()
		};

		return Promise.try(() => {
			if (allowUpsert) {
				// NOTE: We *always* do upserts here, even if the user specified `data` rather than `update`, because tags and aliases should always be added even if the item itself already exists. We trust the user not to accidentally reuse IDs between different kinds of objects (which would break in various other ways anyway).
				return db.Item.query(tx).upsertGraph(newItem, {
					insertMissing: true,
					noDelete: true
				});
			} else {
				return db.Item.query(tx).insertGraph(newItem, {
					insertMissing: true
				});
			}
		}).tap(() => {
			// FIXME: We should probably move the metrics stuff to the wrapper instead, so that it works for *any* backend
			metrics.storedItems.inc(1);

			// TODO: This currently produces somewhat misleading metrics; it only counts *explicitly specified* tags. That will *mostly* correlate to amount of genuinely-new items, but not perfectly. In the future, we should probably refactor the insertion code such that it is aware of the *current* tags of an item that it is about to merge into - but maybe that should be delayed until the zapdb migration.
			if (newItem.tags != null) {
				for (let tag of newItem.tags) {
					metrics.storedItems.labels({ tag: tag.name }).inc(1);
				}
			}
		});
	}).catch({ name: "UniqueViolationError", table: "srap_items" }, (error) => {
		// Only triggered for plain inserts (see FIXME above); an existing item is
		// an error only when the caller asked for failIfExists semantics
		if (failIfExists) {
			throw error;
		} else {
			// Do nothing, just ignore the failure
		}
	});
},
moveItem: function (tx, { from, into, merge, mergeMetadata }) {
	// Rename an item from `from` to `into`, optionally merging it into an
	// already-existing destination item. `merge`/`mergeMetadata` are merge
	// strategy callbacks; when `merge` is absent this is a plain rename.
	let allowMerge = (merge != null);

	// TODO: Make stuff like `this.getItem` roundtrip through the backend abstraction instead of directly calling internal database methods, eg. by providing the backend itself as an argument to the method
	return Promise.all([
		this.getItem(tx, { id: from, optional: true }),
		this.getItem(tx, { id: into, optional: true }),
	]).then(([ fromObj, maybeIntoObj ]) => {
		// NOTE: If the source item does not exist, we silently ignore that. This is to ensure that opportunistic renaming of items (eg. after a naming convention change which only affects older items) in scraper configurations is possible.
		if (fromObj != null) {
			if (allowMerge) {
				// Destination may not exist yet; merge into an empty stand-in item then
				let intoObj = defaultValue(maybeIntoObj, {
					id: into,
					data: {},
					taskResults: []
				});

				let newObject = mergeItems({ fromObj, intoObj, merge, mergeMetadata });

				let upsertOptions = {
					insertMissing: true,
					noDelete: true
				};

				return Promise.all([
					// NOTE: We don't use this.deleteItem, to sidestep any alias lookups
					db.Item.query(tx).findById(from).delete(),
					// NOTE: Cannot use into.$query here because that adds an implicit query builder operation, which upsertGraph does not allow
					db.Item.query(tx).upsertGraph(newObject, upsertOptions),
				]);
			} else {
				return Promise.all([
					// Plain rename: repoint the primary key, then ensure a self:self alias for the new ID
					db.Item.query(tx).findById(from).patch({ id: into }),
					this.createAlias(tx, { from: into, to: into })
				]);
			}
		}
	}).then(() => {
		// NOTE: Repointing aliases has the side-effect of leaving a redirect from the source to the destination item, as each item has a self:self alias
		return repointAliases(tx, { from: from, to: into });
	});
},
deleteItem: function (tx, { id }) {
return db.Alias.relatedQuery("item", tx)
.for(id)
.delete();
},
createAlias: function (tx, { from, to, failIfExists }) {
// Isolate this operation into a savepoint so that it can fail without breaking the entire transaction
let promise = this.runInTransaction(tx, (tx) => {
return db.Alias.query(tx).insert({
alias: from,
itemId: to,
updatedAt: new Date()
});
});
if (failIfExists) {
return promise;
} else {
return Promise.resolve(promise)
.catch({ name: "UniqueViolationError" }, noop);
}
},
deleteAlias: function (tx, { from }) {
// TODO: This cannot yet be propagated to the update feed, because we don't keep a record of deletions
return db.Alias.query(tx).findById(from).delete();
},
updateMetadata: function (tx, { id, update, task }) {
// TODO: failIfExists
return Promise.try(() => {
return getTaskResult(tx, task, id);
}).then((taskResult) => {
if (taskResult != null) {
return taskResult.$query(tx).patch({
metadata: update(taskResult.metadata),
});
} else {
return db.TaskResult.query(tx).insert({
task: task.name,
itemId: id,
metadata: update({})
});
}
});
},
expire: function (tx, { id, task }) {
return Promise.try(() => {
return db.Alias.query(tx).findById(id);
}).then((alias) => {
return db.TaskResult.query(tx)
.where({ task: task.name, itemId: alias.itemId })
.patch({ isInvalidated: true });
});
},
setTTL: function (options) {
	// Unimplemented stub: intended to (re)set the time-to-live of a task result.
	// options = ttl || { id, taskName, ttl }
	// FIXME
},
allowFailure: function (allowed) {
	// No-op in this backend. NOTE(review): presumably failure tolerance is
	// tracked at a higher layer — confirm before relying on this.
},
log: function (category, message) {
	// No-op in this backend; log messages are not persisted here.
},
markTaskStatus: function (tx, { id, task, isSuccessful }) {
return Promise.try(() => {
return getTaskResult(tx, task, id);
}).then((taskResult) => {
let sharedFields = {
isSuccessful: isSuccessful,
isInvalidated: false,
taskVersion: task.version,
updatedAt: new Date(),
expiresAt: (task.ttl != null)
? addMilliseconds(new Date(), task.ttl)
: undefined
};
if (taskResult != null) {
return taskResult.$query(tx).patch({
... sharedFields
});
} else {
return db.TaskResult.query(tx).insert({
... sharedFields,
task: task.name,
itemId: id
});
}
});
},
logTaskError: function (tx, { id, task, error }) {
	// Error persistence is not implemented yet; any call crashes loudly rather
	// than silently dropping the task error.
	throw new Error(`UNIMPLEMENTED`); // FIXME
},
countLockedTasks: function (tx) {
return Promise.try(() => {
return db.TaskInProgress.query(tx).count({ count: "*" }).where({ started: true });
}).then((result) => {
return result[0].count;
});
},
getQueueSize: function (tx) {
return Promise.try(() => {
return tx.raw(`SELECT task, COUNT(*) FROM srap_queue WHERE started = false GROUP BY task;`);
}).then((result) => {
return syncpipe(result.rows, [
_ => _.map(({ task, count }) => [ task, parseInt(count) ]),
_ => Object.fromEntries(_)
]);
});
},
getUpdateStream: require("./queries/get-update-stream")(state),
getTaskStream: require("./queries/get-task-stream")(state)
};
});
}
};
};