Compare commits
19 Commits
backend-re
...
master
Author | SHA1 | Date |
---|---|---|
Sven Slootweg | 4163b7deb9 | 1 year ago |
Sven Slootweg | 00685bb7e9 | 1 year ago |
Sven Slootweg | 8997a762ed | 1 year ago |
Sven Slootweg | 96e368b432 | 1 year ago |
Sven Slootweg | 7c60c4fa6b | 1 year ago |
Sven Slootweg | 48e9f8372b | 1 year ago |
Sven Slootweg | e40113a701 | 1 year ago |
Sven Slootweg | 5b8e71c083 | 1 year ago |
Sven Slootweg | 59b89a3459 | 2 years ago |
Sven Slootweg | dbd15aa1d7 | 2 years ago |
Sven Slootweg | 6e172dd04d | 2 years ago |
Sven Slootweg | 919985bbd2 | 2 years ago |
Sven Slootweg | 31742f8638 | 2 years ago |
Sven Slootweg | f7cd69d7d0 | 2 years ago |
Sven Slootweg | e2f2fb6cb1 | 2 years ago |
Sven Slootweg | ea7cd67158 | 2 years ago |
Sven Slootweg | b9b0e63454 | 2 years ago |
Sven Slootweg | fb93e902a8 | 2 years ago |
Sven Slootweg | 1e1a367cb2 | 2 years ago |
@ -1,2 +1,3 @@
|
||||
node_modules
|
||||
junk
|
||||
.clinic
|
||||
|
@ -0,0 +1,21 @@
|
||||
{
|
||||
// Use IntelliSense to learn about possible attributes.
|
||||
// Hover to view descriptions of existing attributes.
|
||||
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
|
||||
"version": "0.2.0",
|
||||
"configurations": [
|
||||
{
|
||||
"type": "node",
|
||||
"request": "launch",
|
||||
"name": "Launch Program",
|
||||
"skipFiles": [
|
||||
"<node_internals>/**"
|
||||
],
|
||||
"program": "./bin/simulate",
|
||||
"args": ["../seekseek/scraper-config/", "lcsc:normalizeProduct", "lcsc:product:C494972"],
|
||||
"env": {
|
||||
"DEBUG": "srap:backend:postgresql:*"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
@ -0,0 +1,17 @@
|
||||
"use strict";
|
||||
|
||||
module.exports.up = function(knex, Promise) {
|
||||
return knex.schema
|
||||
.alterTable("srap_task_results", (table) => {
|
||||
table.jsonb("metadata").notNullable().default({}).alter();
|
||||
table.boolean("is_successful").nullable().alter();
|
||||
});
|
||||
};
|
||||
|
||||
module.exports.down = function(knex, Promise) {
|
||||
return knex.schema
|
||||
.alterTable("srap_task_results", (table) => {
|
||||
table.jsonb("metadata").notNullable().alter();
|
||||
table.boolean("is_successful").notNullable().alter();
|
||||
});
|
||||
};
|
@ -0,0 +1,19 @@
|
||||
"use strict";
|
||||
|
||||
module.exports.up = function(knex, Promise) {
|
||||
return knex.schema
|
||||
.alterTable("srap_tasks_in_progress", (table) => {
|
||||
table.timestamp("started_at").alter().nullable().defaultTo(null);
|
||||
table.boolean("started").notNullable().defaultTo(false);
|
||||
})
|
||||
.renameTable("srap_tasks_in_progress", "srap_queue");
|
||||
};
|
||||
|
||||
module.exports.down = function(knex, Promise) {
|
||||
return knex.schema
|
||||
.renameTable("srap_queue", "srap_tasks_in_progress")
|
||||
.alterTable("srap_tasks_in_progress", (table) => {
|
||||
table.timestamp("started_at").alter().notNullable().defaultTo(knex.fn.now());
|
||||
table.dropColumn("started");
|
||||
});
|
||||
};
|
@ -0,0 +1,39 @@
|
||||
"use strict";
|
||||
|
||||
module.exports.up = function(knex, Promise) {
|
||||
// Get rid of existing duplicate entries
|
||||
return knex.raw(`
|
||||
DELETE FROM srap_tags
|
||||
WHERE id IN (
|
||||
SELECT
|
||||
id
|
||||
FROM (
|
||||
SELECT
|
||||
id,
|
||||
row_number() OVER w as rnum
|
||||
FROM srap_tags
|
||||
WINDOW w AS (
|
||||
PARTITION BY name, item_id
|
||||
ORDER BY id
|
||||
)
|
||||
|
||||
) t
|
||||
WHERE t.rnum > 1);
|
||||
`).then(() => {
|
||||
return knex.schema
|
||||
.alterTable("srap_tags", (table) => {
|
||||
table.dropPrimary();
|
||||
table.dropIndex("name");
|
||||
table.dropColumn("id");
|
||||
table.primary([ "name", "item_id" ]);
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
module.exports.down = function(knex, Promise) {
|
||||
return knex.schema
|
||||
.alterTable("srap_tags", (table) => {
|
||||
table.dropPrimary();
|
||||
table.bigIncrements("id").primary();
|
||||
});
|
||||
};
|
@ -0,0 +1,15 @@
|
||||
"use strict";
|
||||
|
||||
module.exports.up = function(knex, Promise) {
|
||||
return knex.schema
|
||||
.alterTable("srap_tags", (table) => {
|
||||
table.index("item_id");
|
||||
});
|
||||
};
|
||||
|
||||
module.exports.down = function(knex, Promise) {
|
||||
return knex.schema
|
||||
.alterTable("srap_tags", (table) => {
|
||||
table.dropIndex("item_id");
|
||||
});
|
||||
};
|
@ -0,0 +1,15 @@
|
||||
"use strict";
|
||||
|
||||
module.exports.up = function(knex, Promise) {
|
||||
return knex.schema
|
||||
.alterTable("srap_aliases", (table) => {
|
||||
table.index("item_id");
|
||||
});
|
||||
};
|
||||
|
||||
module.exports.down = function(knex, Promise) {
|
||||
return knex.schema
|
||||
.alterTable("srap_aliases", (table) => {
|
||||
table.dropIndex("item_id");
|
||||
});
|
||||
};
|
@ -1,106 +1,182 @@
|
||||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
const debug = require("debug")("srap:backend:postgresql:query:get-task-stream");
|
||||
const buffer = require("@promistream/buffer");
|
||||
const pipe = require("@promistream/pipe");
|
||||
const simpleSource = require("@promistream/simple-source");
|
||||
|
||||
const query = `
|
||||
WITH
|
||||
dependency_tasks AS (
|
||||
SELECT * FROM
|
||||
json_to_recordset(:dependencyTaskDefinitions) AS x(task text, task_version text)
|
||||
),
|
||||
matching_items AS (
|
||||
SELECT
|
||||
DISTINCT ON (srap_items.id)
|
||||
srap_items.*,
|
||||
results.updated_at AS result_date,
|
||||
results.task_version,
|
||||
(
|
||||
results.is_successful = TRUE
|
||||
AND (
|
||||
results.expires_at < NOW()
|
||||
OR results.is_invalidated = TRUE
|
||||
)
|
||||
) AS is_candidate
|
||||
FROM srap_items
|
||||
INNER JOIN srap_tags
|
||||
ON srap_tags.item_id = srap_items.id
|
||||
AND srap_tags.name = ANY(:tags)
|
||||
LEFT JOIN srap_task_results AS results
|
||||
ON results.item_id = srap_items.id
|
||||
AND results.task = :task
|
||||
WHERE
|
||||
NOT EXISTS (
|
||||
SELECT FROM srap_tasks_in_progress AS pr WHERE pr.item_id = srap_items.id
|
||||
)
|
||||
),
|
||||
candidates AS (
|
||||
SELECT * FROM matching_items
|
||||
WHERE result_date IS NULL
|
||||
UNION
|
||||
SELECT * FROM matching_items
|
||||
WHERE is_candidate = TRUE
|
||||
OR NOT (task_version = :taskVersion)
|
||||
)
|
||||
(
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
candidates
|
||||
WHERE
|
||||
NOT EXISTS (
|
||||
SELECT
|
||||
results.*
|
||||
FROM dependency_tasks
|
||||
const fetchQuery = `
|
||||
SELECT
|
||||
srap_items.*
|
||||
FROM
|
||||
srap_queue
|
||||
LEFT JOIN srap_items
|
||||
ON srap_items.id = srap_queue.item_id
|
||||
WHERE
|
||||
srap_queue.task = :task
|
||||
AND srap_queue.started = FALSE
|
||||
LIMIT :resultLimit
|
||||
`;
|
||||
|
||||
function makeFillQuery(withDependencies) {
|
||||
return `
|
||||
WITH
|
||||
${withDependencies ? `
|
||||
dependencies AS (
|
||||
SELECT * FROM json_to_recordset(:dependencyTaskDefinitions) AS x(task text, task_version text)
|
||||
),
|
||||
satisfied AS (
|
||||
SELECT results.* FROM dependencies
|
||||
LEFT JOIN srap_task_results AS results
|
||||
ON dependency_tasks.task = results.task
|
||||
AND dependency_tasks.task_version = results.task_version
|
||||
AND results.item_id = candidates.id
|
||||
WHERE
|
||||
results.is_successful IS NULL
|
||||
OR results.is_successful = FALSE
|
||||
OR (
|
||||
ON dependencies.task = results.task
|
||||
AND dependencies.task_version = results.task_version
|
||||
WHERE
|
||||
results.is_successful = TRUE
|
||||
AND results.is_invalidated = FALSE
|
||||
AND (
|
||||
results.expires_at < NOW()
|
||||
OR results.is_invalidated = TRUE
|
||||
results.expires_at > NOW()
|
||||
OR results.expires_at IS NULL
|
||||
)
|
||||
)
|
||||
),
|
||||
counts AS (
|
||||
SELECT item_id, COUNT(task) AS count FROM satisfied GROUP BY item_id
|
||||
),
|
||||
dependency_candidates AS (
|
||||
SELECT item_id FROM counts WHERE count = :dependencyCount
|
||||
),
|
||||
` : "" }
|
||||
tag_candidates AS (
|
||||
SELECT item_id FROM srap_tags WHERE name = ANY(:tags)
|
||||
),
|
||||
full_candidates AS MATERIALIZED (
|
||||
${withDependencies
|
||||
? `
|
||||
SELECT tag_candidates.item_id FROM dependency_candidates
|
||||
INNER JOIN tag_candidates
|
||||
ON dependency_candidates.item_id = tag_candidates.item_id
|
||||
`
|
||||
: `
|
||||
SELECT item_id FROM tag_candidates
|
||||
`
|
||||
}
|
||||
),
|
||||
tasks AS NOT MATERIALIZED (
|
||||
SELECT
|
||||
item_id,
|
||||
is_successful,
|
||||
(
|
||||
results.is_successful = FALSE
|
||||
OR (
|
||||
results.is_successful = TRUE
|
||||
AND results.is_invalidated = FALSE
|
||||
AND (
|
||||
results.expires_at > NOW()
|
||||
OR results.expires_at IS NULL
|
||||
)
|
||||
)
|
||||
) AS is_completed
|
||||
FROM srap_task_results AS results
|
||||
WHERE
|
||||
results.task = :task
|
||||
AND results.task_version = :taskVersion
|
||||
)
|
||||
) LIMIT :resultLimit;
|
||||
`;
|
||||
SELECT
|
||||
:task AS task,
|
||||
full_candidates.item_id
|
||||
FROM full_candidates
|
||||
LEFT JOIN tasks ON full_candidates.item_id = tasks.item_id
|
||||
WHERE tasks.is_completed IS NOT TRUE
|
||||
ORDER BY tasks.is_successful NULLS FIRST
|
||||
`;
|
||||
}
|
||||
|
||||
const fillQueryWithDependencies = makeFillQuery(true);
|
||||
const fillQueryWithoutDependencies = makeFillQuery(false);
|
||||
|
||||
module.exports = function ({ metrics, backendSettings }) {
|
||||
module.exports = function ({ metrics, backendSettings, knex }) {
|
||||
return function (tx, { task }) {
|
||||
return simpleSource(() => {
|
||||
let startTime = Date.now();
|
||||
|
||||
return Promise.try(() => {
|
||||
return tx.raw(query, {
|
||||
tags: task.tags,
|
||||
task: task.name,
|
||||
taskVersion: task.version,
|
||||
resultLimit: backendSettings.taskBatchSize,
|
||||
let hasDependencies = (task.dependencies.length > 0);
|
||||
|
||||
let refillParameters = {
|
||||
tags: task.tags,
|
||||
task: task.name,
|
||||
taskVersion: task.version,
|
||||
... hasDependencies
|
||||
? {
|
||||
dependencyCount: task.dependencies.length,
|
||||
dependencyTaskDefinitions: JSON.stringify(task.dependencies.map((dependency) => {
|
||||
// Case-mapping for SQL compatibility
|
||||
return { task_version: dependency.version, task: dependency.name };
|
||||
}))
|
||||
});
|
||||
}).then((result) => {
|
||||
let timeElapsed = Date.now() - startTime;
|
||||
|
||||
metrics.taskFetchTime.labels({ task: task.name }).set(timeElapsed / 1000);
|
||||
metrics.taskFetchResults.labels({ task: task.name }).set(result.rowCount);
|
||||
|
||||
debug(`Task retrieval query for '${task.name}' took ${timeElapsed}ms and produced ${result.rowCount} results`);
|
||||
|
||||
if (result.rowCount > 0) {
|
||||
return result.rows;
|
||||
} else {
|
||||
// TODO: Consider using LISTEN/NOTIFY instead?
|
||||
return Promise.resolve([]).delay(backendSettings.taskBatchDelay);
|
||||
}
|
||||
: {}
|
||||
};
|
||||
|
||||
let fetchParameters = {
|
||||
task: task.name,
|
||||
resultLimit: backendSettings.taskBatchSize
|
||||
// resultLimit: 1 // For tracking down race conditions
|
||||
};
|
||||
|
||||
function refillQueue() {
|
||||
let startTime = Date.now();
|
||||
|
||||
return Promise.try(() => {
|
||||
let fillQuery = (hasDependencies)
|
||||
? fillQueryWithDependencies
|
||||
: fillQueryWithoutDependencies;
|
||||
|
||||
// NOTE: We are deliberately bypassing the transaction here! Also deliberately not using VALUES, since we're inserting from the results of another query instead
|
||||
return knex.raw(`
|
||||
INSERT INTO srap_queue (task, item_id)
|
||||
(${fillQuery})
|
||||
ON CONFLICT (task, item_id) DO NOTHING;
|
||||
`, refillParameters);
|
||||
}).then((response) => {
|
||||
let timeElapsed = Date.now() - startTime;
|
||||
|
||||
metrics.taskRefillTime.labels({ task: task.name }).set(timeElapsed / 1000);
|
||||
metrics.taskRefillResults.labels({ task: task.name }).set(response.rowCount);
|
||||
|
||||
debug(`Queue for '${task.name}' was refilled with ${response.rowCount} items in ${timeElapsed}ms`);
|
||||
return response.rowCount;
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
return pipe([
|
||||
simpleSource(() => {
|
||||
let startTime = Date.now();
|
||||
|
||||
return Promise.try(() => {
|
||||
return tx.raw(fetchQuery, fetchParameters);
|
||||
}).then((result) => {
|
||||
let timeElapsed = Date.now() - startTime;
|
||||
|
||||
metrics.taskFetchTime.labels({ task: task.name }).set(timeElapsed / 1000);
|
||||
metrics.taskFetchResults.labels({ task: task.name }).set(result.rowCount);
|
||||
|
||||
debug(`Task retrieval query for '${task.name}' took ${timeElapsed}ms and produced ${result.rowCount} results`);
|
||||
|
||||
if (result.rowCount > 0) {
|
||||
return result.rows;
|
||||
} else {
|
||||
return Promise.try(() => {
|
||||
return refillQueue();
|
||||
}).then((newItems) => {
|
||||
if (newItems === 0) {
|
||||
// TODO: Consider using LISTEN/NOTIFY instead? Worth the added complexity?
|
||||
let randomization = Math.random() * backendSettings.delayRandomization * backendSettings.taskBatchDelay; // To prevent stampeding by low-throughput tasks
|
||||
return Promise.resolve([]).delay(backendSettings.taskBatchDelay + randomization);
|
||||
} else {
|
||||
// Have another go right away
|
||||
return [];
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}),
|
||||
buffer()
|
||||
]);
|
||||
};
|
||||
}
|
||||
};
|
||||
|
@ -1,179 +1,183 @@
|
||||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
const chalk = require("chalk");
|
||||
|
||||
const mergeItems = require("../../semantics/merge-items");
|
||||
|
||||
function printTX(tx) {
|
||||
// TODO: Print entire chain?
|
||||
return `[tx ${tx.__txID}]`;
|
||||
return chalk.bold.yellow(`[tx ${tx.__txID ?? "?"}]`);
|
||||
}
|
||||
|
||||
function printItem(id, task) {
|
||||
if (task != null) {
|
||||
return `[${id}:${task.name}]`;
|
||||
return chalk.bold.white(`[${id}][${task.name}]`);
|
||||
} else {
|
||||
return `[${id}]`;
|
||||
return chalk.bold.white(`[${id}]`);
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME: Move logs to logging hook
|
||||
function logSimulated(... args) {
|
||||
console.log(... args);
|
||||
console.log(chalk.gray(args[0]), ... args.slice(1));
|
||||
}
|
||||
|
||||
module.exports = function (state) {
|
||||
|
||||
// NOTE: The simulated backend needs access to the 'real' backend; a task may eg. mutate an item based on its current data, and we'd need to read that from the real data source. The only constraint is that the simulated backend cannot *mutate* anything in the real backend, but reading is fine!
|
||||
return function attachSimulatedBackend({ backend }) {
|
||||
return {
|
||||
defaultSettings: {},
|
||||
create: function createSimulatedBackend(_options) {
|
||||
let txCounter = 0;
|
||||
let locks = new Map(); // Map<task, Set<id>>
|
||||
|
||||
return {
|
||||
shutdown: function () {
|
||||
return backend.shutdown();
|
||||
},
|
||||
|
||||
getDefaultTransaction: function () {
|
||||
return { __txID: null };
|
||||
},
|
||||
|
||||
isTransaction: function (value) {
|
||||
return ("__txID" in value);
|
||||
},
|
||||
|
||||
runInTransaction: function (tx, callback) {
|
||||
let newTransaction = { __txID: txCounter++, __parentTX: tx };
|
||||
|
||||
return callback(newTransaction);
|
||||
},
|
||||
|
||||
lock: function (tx, { id, task }) {
|
||||
if (!locks.has(task)) {
|
||||
locks.set(task, new Set());
|
||||
}
|
||||
return {
|
||||
defaultSettings: {},
|
||||
create: function createSimulatedBackend({ backend }) {
|
||||
let txCounter = 0;
|
||||
let locks = new Map(); // Map<task, Set<id>>
|
||||
|
||||
return {
|
||||
shutdown: function () {
|
||||
return backend.shutdown();
|
||||
},
|
||||
|
||||
getDefaultTransaction: function () {
|
||||
return { __txID: null };
|
||||
},
|
||||
|
||||
isTransaction: function (value) {
|
||||
return ("__txID" in value);
|
||||
},
|
||||
|
||||
runInTransaction: function (tx, callback) {
|
||||
let newTransaction = { __txID: txCounter++, __parentTX: tx };
|
||||
|
||||
return callback(newTransaction);
|
||||
},
|
||||
|
||||
lock: function (tx, { id, task }) {
|
||||
if (!locks.has(task)) {
|
||||
locks.set(task, new Set());
|
||||
}
|
||||
|
||||
let taskLocks = locks.get(task);
|
||||
let taskLocks = locks.get(task);
|
||||
|
||||
if (taskLocks.has(id)) {
|
||||
logSimulated(`${printTX(tx)} Already locked: ${printItem(id, task)}`);
|
||||
return false;
|
||||
} else {
|
||||
logSimulated(`${printTX(tx)} Locking ${printItem(id, task)}`);
|
||||
taskLocks.add(id);
|
||||
return true;
|
||||
}
|
||||
},
|
||||
|
||||
unlock: function (tx, { id, task }) {
|
||||
logSimulated(`${printTX(tx)} Unlocking ${printItem(id, task)}`);
|
||||
locks.get(task).delete(id);
|
||||
},
|
||||
|
||||
getItem: function (tx, options) {
|
||||
return backend.getItem(backend.getDefaultTransaction(), options);
|
||||
},
|
||||
|
||||
storeItem: function (tx, { id, parentID, update, tags, aliases, allowUpsert }) {
|
||||
return Promise.try(() => {
|
||||
return this.getItem(tx, { id: id, optional: true });
|
||||
}).then((currentItem) => {
|
||||
let actualID = currentItem.id ?? id;
|
||||
|
||||
let newItem = {
|
||||
id: actualID,
|
||||
data: (currentItem != null)
|
||||
? update(currentItem.data)
|
||||
: update({}),
|
||||
createdBy: parentID,
|
||||
tags: tags,
|
||||
aliases: aliases.concat([ actualID ]),
|
||||
updatedAt: new Date()
|
||||
if (taskLocks.has(id)) {
|
||||
logSimulated(`${printTX(tx)} Already locked: ${printItem(id, task)}`);
|
||||
return false;
|
||||
} else {
|
||||
logSimulated(`${printTX(tx)} Locking ${printItem(id, task)}`);
|
||||
taskLocks.add(id);
|
||||
return true;
|
||||
}
|
||||
},
|
||||
|
||||
unlock: function (tx, { id, task }) {
|
||||
logSimulated(`${printTX(tx)} Unlocking ${printItem(id, task)}`);
|
||||
locks.get(task).delete(id);
|
||||
},
|
||||
|
||||
getItem: function (tx, options) {
|
||||
return backend.getItem(backend.getDefaultTransaction(), options);
|
||||
},
|
||||
|
||||
storeItem: function (tx, { id, parentID, update, tags, aliases, allowUpsert }) {
|
||||
return Promise.try(() => {
|
||||
return this.getItem(tx, { id: id, optional: true });
|
||||
}).then((currentItem) => {
|
||||
let actualID = (currentItem != null)
|
||||
? currentItem.id
|
||||
: id;
|
||||
|
||||
let newItem = {
|
||||
id: actualID,
|
||||
data: (currentItem != null)
|
||||
? update(currentItem.data)
|
||||
: update({}),
|
||||
createdBy: parentID,
|
||||
tags: tags,
|
||||
aliases: aliases.concat([ actualID ]),
|
||||
updatedAt: new Date()
|
||||
};
|
||||
|
||||
logSimulated(`${printTX(tx)} ${printItem(id)} Storing item (upsert=${allowUpsert})`, newItem);
|
||||
});
|
||||
},
|
||||
|
||||
moveItem: function (tx, { from, into, merge, mergeMetadata, allowMerge }) {
|
||||
return Promise.all([
|
||||
this.getItem(tx, { id: from, optional: true }),
|
||||
this.getItem(tx, { id: into, optional: true }),
|
||||
]).then(([ fromObj, maybeIntoObj ]) => {
|
||||
if (fromObj != null) {
|
||||
let intoObj = maybeIntoObj ?? {
|
||||
id: into,
|
||||
data: {},
|
||||
taskResults: []
|
||||
};
|
||||
|
||||
logSimulated(`${printTX(tx)} ${printItem(id)} Storing item (upsert=${allowUpsert})`, newItem);
|
||||
});
|
||||
},
|
||||
|
||||
moveItem: function (tx, { from, into, merge, mergeMetadata, allowMerge }) {
|
||||
return Promise.all([
|
||||
this.getItem(tx, { id: from, optional: true }),
|
||||
this.getItem(tx, { id: into, optional: true }),
|
||||
]).then((fromObj, maybeIntoObj) => {
|
||||
if (fromObj != null) {
|
||||
let intoObj = maybeIntoObj ?? {
|
||||
id: into,
|
||||
data: {},
|
||||
taskResults: []
|
||||
};
|
||||
|
||||
if (allowMerge) {
|
||||
let newItem = mergeItems({ fromObj, intoObj, merge, mergeMetadata });
|
||||
logSimulated(`${printTX(tx)} ${printItem(from)} Moving item to ${printItem[into]} (merge=true)`, newItem);
|
||||
} else {
|
||||
logSimulated(`${printTX(tx)} ${printItem(from)} Moving item to ${printItem[into]} (merge=false)`);
|
||||
}
|
||||
|
||||
if (allowMerge) {
|
||||
let newItem = mergeItems({ fromObj, intoObj, merge, mergeMetadata });
|
||||
logSimulated(`${printTX(tx)} ${printItem(from)} Moving item to ${printItem[into]} (merge=true)`, newItem);
|
||||
} else {
|
||||
logSimulated(`${printTX(tx)} ${printItem(from)} Moving item to ${printItem[into]} (merge=false)`);
|
||||
}
|
||||
});
|
||||
},
|
||||
|
||||
deleteItem: function (tx, { id }) {
|
||||
logSimulated(`${printTX(tx)} ${printItem(id)} Deleting item`);
|
||||
},
|
||||
|
||||
createAlias: function (tx, { from, to }) {
|
||||
logSimulated(`${printTX(tx)} ${printItem(to)} Creating alias ${from}`);
|
||||
},
|
||||
|
||||
deleteAlias: function (tx, { from }) {
|
||||
logSimulated(`${printTX(tx)} ${printItem(from)} Deleting alias`);
|
||||
},
|
||||
|
||||
updateMetadata: function (tx, { id, task, update }) {
|
||||
return Promise.try(() => {
|
||||
return this.getItem(tx, { id, optional: true });
|
||||
}).then((item) => {
|
||||
let taskResult = (item != null)
|
||||
? item.taskResults.find((result) => result.task === task)
|
||||
: undefined;
|
||||
|
||||
let existingData = (taskResult != null)
|
||||
? taskResult.metadata
|
||||
: {};
|
||||
|
||||
let newData = update(existingData);
|
||||
|
||||
logSimulated(`${printTX(tx)} ${printItem(id, task)} Storing metadata`, newData);
|
||||
});
|
||||
},
|
||||
|
||||
expire: function (tx, { id, task }) {
|
||||
logSimulated(`${printTX(tx)} ${printItem(id, task)} Expiring task result`);
|
||||
},
|
||||
|
||||
markTaskStatus: function (tx, { id, task, isSuccessful }) {
|
||||
if (isSuccessful) {
|
||||
logSimulated(`${printTX(tx)} ${printItem(id, task)} Marking task as completed`);
|
||||
} else {
|
||||
logSimulated(`${printTX(tx)} ${printItem(id, task)} Marking task as failed`);
|
||||
}
|
||||
},
|
||||
|
||||
countLockedTasks: function (tx) {
|
||||
return Array.from(locks).reduce((total, [ task, taskLocks ]) => {
|
||||
return total + taskLocks.size;
|
||||
}, 0);
|
||||
},
|
||||
|
||||
getUpdateStream: function (... args) {
|
||||
return backend.getUpdateStream(... args);
|
||||
},
|
||||
|
||||
getTaskStream: function (... args) {
|
||||
return backend.getTaskStream(... args);
|
||||
});
|
||||
},
|
||||
|
||||
deleteItem: function (tx, { id }) {
|
||||
logSimulated(`${printTX(tx)} ${printItem(id)} Deleting item`);
|
||||
},
|
||||
|
||||
createAlias: function (tx, { from, to }) {
|
||||
logSimulated(`${printTX(tx)} ${printItem(to)} Creating alias ${from}`);
|
||||
},
|
||||
|
||||
deleteAlias: function (tx, { from }) {
|
||||
logSimulated(`${printTX(tx)} ${printItem(from)} Deleting alias`);
|
||||
},
|
||||
|
||||
updateMetadata: function (tx, { id, task, update }) {
|
||||
return Promise.try(() => {
|
||||
return this.getItem(tx, { id, optional: true });
|
||||
}).then((item) => {
|
||||
let taskResult = (item != null)
|
||||
? item.taskResults.find((result) => result.task === task)
|
||||
: undefined;
|
||||
|
||||
let existingData = (taskResult != null)
|
||||
? taskResult.metadata
|
||||
: {};
|
||||
|
||||
let newData = update(existingData);
|
||||
|
||||
logSimulated(`${printTX(tx)} ${printItem(id, task)} Storing metadata`, newData);
|
||||
});
|
||||
},
|
||||
|
||||
expire: function (tx, { id, task }) {
|
||||
logSimulated(`${printTX(tx)} ${printItem(id, task)} Expiring task result`);
|
||||
},
|
||||
|
||||
markTaskStatus: function (tx, { id, task, isSuccessful }) {
|
||||
if (isSuccessful) {
|
||||
logSimulated(`${printTX(tx)} ${printItem(id, task)} Marking task as completed`);
|
||||
} else {
|
||||
logSimulated(`${printTX(tx)} ${printItem(id, task)} Marking task as failed`);
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
},
|
||||
|
||||
countLockedTasks: function (tx) {
|
||||
return Array.from(locks).reduce((total, [ task, taskLocks ]) => {
|
||||
return total + taskLocks.size;
|
||||
}, 0);
|
||||
},
|
||||
|
||||
getUpdateStream: function (... args) {
|
||||
return backend.getUpdateStream(... args);
|
||||
},
|
||||
|
||||
getTaskStream: function (... args) {
|
||||
return backend.getTaskStream(... args);
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
};
|
||||
|
@ -1,36 +0,0 @@
|
||||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
const consumable = require("@joepie91/consumable");
|
||||
const syncpipe = require("syncpipe");
|
||||
|
||||
const createMutationAPIWrapper = require("./mutation-api/wrapper");
|
||||
|
||||
module.exports = function (state) {
|
||||
const createDatabaseMutationAPI = require("./mutation-api/database")(state);
|
||||
|
||||
return function createDatabaseQueue(context) {
|
||||
let databaseMutationAPI = createDatabaseMutationAPI(context);
|
||||
let mutationAPI = createMutationAPIWrapper(context, databaseMutationAPI);
|
||||
|
||||
let queue = consumable([]);
|
||||
|
||||
return {
|
||||
api: syncpipe(Object.keys(mutationAPI), [
|
||||
(_) => _.map((method) => [ method, function() { queue.peek().push([ method, arguments ]); } ]),
|
||||
(_) => Object.fromEntries(_)
|
||||
]),
|
||||
execute: function () {
|
||||
if (!queue.peek().some((method) => method[0] === "updateMetadata")) {
|
||||
// Doing an updateMetadata call is necessary to mark a task 'completed', so we inject a dummy call that doesn't actually change the metadata itself
|
||||
// FIXME: Split apart 'markTaskCompleted' and 'updateMetadata' queries so that this hack is no longer necessary
|
||||
queue.peek().push([ "updateMetadata", [ (data) => data ]]);
|
||||
}
|
||||
|
||||
return Promise.each(queue.consume(), ([ method, args ]) => {
|
||||
return mutationAPI[method](... args);
|
||||
});
|
||||
}
|
||||
};
|
||||
};
|
||||
};
|
@ -0,0 +1,55 @@
|
||||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
const chalk = require("chalk");
|
||||
|
||||
const logStatus = require("./util/log-status");
|
||||
|
||||
module.exports = function ({ backend }) {
|
||||
return function runTask(task, item) {
|
||||
let queue = [];
|
||||
let itemIsDeleted = false;
|
||||
|
||||
let api = backend.forItem({
|
||||
task: task,
|
||||
item: item,
|
||||
mutationQueue: queue,
|
||||
onDeleteSelf: () => { itemIsDeleted = true; }
|
||||
});
|
||||
|
||||
return Promise.try(() => {
|
||||
// TODO: Standardize logging control/levels interface, also for library use
|
||||
if (!process.env.SRAP_QUIET) {
|
||||
logStatus(task, chalk.bold.cyan, "started", item.id);
|
||||
}
|
||||
|
||||
// NOTE: We only pass in the item data itself, *not* any associated metadata like tags. If the scraping task wants access to that sort of information, it should do a `getItem` call from within its task logic where needed.
|
||||
// FIXME: Is that actually still true post-refactor?
|
||||
return task.run({
|
||||
data: item.data,
|
||||
... api.exposed
|
||||
});
|
||||
}).then(() => {
|
||||
// NOTE: We only apply changes at the very end (outside of simulation mode), so that when a task implementation contains multiple operations, each of those operation always 'sees' the state at the start of the task, not the state after the previous mutation. This makes the model as a whole easier to reason about. In simulation mode, all calls are immediate and the queue is empty - after all, no mutation can happen in that case anyway. This is also another reason to ensure that operations in live mode always see the starting state; that makes its behaviour consistent with simulation mode.
|
||||
return backend.topLevel.runInTransaction(null, (tx) => {
|
||||
return Promise.each(queue, (operation) => {
|
||||
return operation(tx);
|
||||
});
|
||||
});
|
||||
}).then(async () => {
|
||||
if (!itemIsDeleted) {
|
||||
await api.internal.markTaskCompleted();
|
||||
}
|
||||
|
||||
if (!process.env.SRAP_QUIET) {
|
||||
logStatus(task, chalk.bold.green, "completed", item.id);
|
||||
}
|
||||
|
||||
return { status: "completed", item: item };
|
||||
}).catch(async (error) => {
|
||||
await api.internal.markTaskFailed(null, { error });
|
||||
logStatus(task, chalk.bold.red, "failed", `${item.id}: ${error.stack}`);
|
||||
return { status: "failed", item: item, error: error };
|
||||
});
|
||||
};
|
||||
};
|
@ -1,42 +0,0 @@
|
||||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
// const { UniqueViolationError } = require("objection");
|
||||
|
||||
const pipe = require("@promistream/pipe");
|
||||
const map = require("@promistream/map");
|
||||
const mapFilter = require("@promistream/map-filter");
|
||||
|
||||
module.exports = function ({ backend }) {
|
||||
return function processTaskSafely(task, processHandler) {
|
||||
let lockStream = mapFilter((item) => {
|
||||
return Promise.try(() => {
|
||||
return backend.lock(null, { id: item.id, task: task });
|
||||
}).then((success) => {
|
||||
if (success) {
|
||||
return item;
|
||||
} else {
|
||||
return mapFilter.NoValue;
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
let processUnlockStream = map((item) => {
|
||||
return Promise.try(() => {
|
||||
return backend.runInTransaction((tx) => {
|
||||
return processHandler(item, tx);
|
||||
});
|
||||
}).finally(() => {
|
||||
// NOTE: The unlock deliberately happens outside of a transaction, so that it can always succeed, even if a task and its associated database changes failed
|
||||
return backend.unlock(null, { id: item.id, task: task });
|
||||
}).then(() => {
|
||||
return item;
|
||||
});
|
||||
});
|
||||
|
||||
return pipe([
|
||||
lockStream,
|
||||
processUnlockStream
|
||||
]);
|
||||
};
|
||||
};
|
@ -0,0 +1,37 @@
|
||||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
const pipe = require("@promistream/pipe");
|
||||
const filter = require("@promistream/filter");
|
||||
const map = require("@promistream/map");
|
||||
const rateLimit = require("@promistream/rate-limit");
|
||||
|
||||
const parallelize = require("@promistream/parallelize");
|
||||
|
||||
// FIXME: Move logs to logging hook
|
||||
|
||||
module.exports = function (state) {
|
||||
let { backend } = state;
|
||||
const runTask = require("../run-task")(state);
|
||||
|
||||
return function createTaskKernelStream(task, { globalRateLimiter }) {
|
||||
return pipe([
|
||||
backend.topLevel.getTaskStream(null, { task: task }),
|
||||
filter((item) => backend.forItem({ task: task, item: item }).internal.lock()),
|
||||
globalRateLimiter,
|
||||
(task.taskInterval != null)
|
||||
? rateLimit(task.taskInterval)
|
||||
: null,
|
||||
map((item) => {
|
||||
return Promise.try(() => {
|
||||
return runTask(task, item);
|
||||
}).tap(() => {
|
||||
return backend.forItem({ task: task, item: item }).internal.unlock();
|
||||
});
|
||||
}),
|
||||
(task.parallelTasks != null)
|
||||
? parallelize(task.parallelTasks)
|
||||
: null
|
||||
]);
|
||||
};
|
||||
};
|
@ -1,53 +0,0 @@
|
||||
"use strict";
|
||||
|
||||
const chalk = require("chalk");
|
||||
const pipe = require("@promistream/pipe");
|
||||
const filter = require("@promistream/filter");
|
||||
const map = require("@promistream/map");
|
||||
|
||||
const logStatus = require("./log-status");
|
||||
|
||||
module.exports = function ({ backend }) {
|
||||
function runTask(task, item) {
|
||||
let queue = [];
|
||||
let api = backend.forItem({ task: task, id: item.id, mutationQueue: queue });
|
||||
|
||||
return Promise.try(() => {
|
||||
logStatus(task, chalk.bold.cyan, "started", item.id);
|
||||
|
||||
// NOTE: We only pass in the item data itself, *not* any associated metadata like tags. If the scraping task wants access to that sort of information, it should do a `getItem` call from within its task logic where needed.
|
||||
// FIXME: Is that actually still true post-refactor?
|
||||
task.run({
|
||||
data: item.data,
|
||||
... api.exposed
|
||||
});
|
||||
}).then(() => {
|
||||
return backend.topLevel.runInTransaction((tx) => {
|
||||
|
||||
// FIXME: use queue
|
||||
});
|
||||
}).then(async () => {
|
||||
await api.internal.markTaskCompleted();
|
||||
logStatus(task, chalk.bold.green, "completed", item.id);
|
||||
return { status: "completed", item: item };
|
||||
}).catch(async (error) => {
|
||||
await api.internal.markTaskFailed(null, { error });
|
||||
logStatus(task, chalk.bold.red, "failed", `${item.id}: ${error.stack}`);
|
||||
return { status: "failed", item: item, error: error };
|
||||
});
|
||||
}
|
||||
|
||||
return function createTaskKernelStream(task) {
|
||||
return pipe([
|
||||
backend.topLevel.getTaskStream(null, { task: task }),
|
||||
filter((item) => backend.forItem({ task: task, id: item.id }).internal.lock()),
|
||||
map((item) => {
|
||||
return Promise.try(() => {
|
||||
return runTask(task, item);
|
||||
}).tap(() => {
|
||||
return backend.forItem({ task: task, id: item.id }).internal.unlock();
|
||||
});
|
||||
})
|
||||
]);
|
||||
};
|
||||
};
|
@ -0,0 +1,24 @@
|
||||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
|
||||
module.exports = function asyncInterval(interval, callback) {
|
||||
function doCycle() {
|
||||
let startTime = Date.now();
|
||||
|
||||
return Promise.try(() => {
|
||||
return callback();
|
||||
}).then(() => {
|
||||
// let elapsed = Date.now() - startTime;
|
||||
let elapsed = 0; // HACK: Temporary way to force that the full interval is always waited *between* operations
|
||||
|
||||
if (elapsed > interval) {
|
||||
return doCycle();
|
||||
} else {
|
||||
return Promise.delay(interval - elapsed).then(doCycle);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return doCycle();
|
||||
};
|
@ -0,0 +1,3 @@
|
||||
- locks table: make locked status a field instead of based on existence
|
||||
- rename locks table to queue table
|
||||
- insert tasks into queue table whenever drained for a task
|
Loading…
Reference in New Issue