WIP, metrics

backend-refactor
Sven Slootweg 2 years ago
parent 90ef557a30
commit 522192f025

@ -0,0 +1,3 @@
# srap
An unopinionated tag-based scraping server. Documentation coming soon™.

@ -8,6 +8,7 @@
const Promise = require("bluebird");
const yargs = require("yargs");
const path = require("path");
const express = require("express");
const createKernel = require("../src/kernel");
@ -21,4 +22,17 @@ return Promise.try(() => {
return createKernel(configuration);
}).then((kernel) => {
kernel.run();
let metricsApp = express();
metricsApp.get("/metrics", (req, res) => {
return Promise.try(() => {
return kernel.getMetrics();
}).then(({ contentType, metrics }) => {
res.set("Content-Type", contentType);
res.send(metrics);
});
});
metricsApp.listen(3131);
});

@ -0,0 +1,139 @@
#!/usr/bin/env node
"use strict";
// let realConsoleLog = console.log.bind(console);
// console.log = function(...args) {
// realConsoleLog(`[console.log called from ${(new Error).stack.split("\n")[2].replace(/^\s* at /, "")}]`);
// realConsoleLog(...args);
// };
// let realConsoleError = console.error.bind(console);
// console.error = function(...args) {
// realConsoleError(`[console.error called from ${(new Error).stack.split("\n")[2].replace(/^\s* at /, "")}]`);
// realConsoleError(...args);
// };
// FIXME: All of this is a work-in-progress, zero stability guarantees!
const Promise = require("bluebird");
const express = require("express");
const expressPromiseRouter = require("express-promise-router");
const assert = require("assert");
const path = require("path");
const pipe = require("@promistream/pipe");
const fromNodeStream = require("@promistream/from-node-stream");
const map = require("@promistream/map");
const { testValue } = require("@validatem/core");
const matchesFormat = require("@validatem/matches-format");
const isString = require("@validatem/is-string");
const initialize = require("../src/initialize");
const errors = require("../src/errors");
assert(process.argv.length >= 4);
let configurationPath = process.argv[2];
let listenHost = process.argv[3];
let absoluteConfigurationPath = path.join(process.cwd(), configurationPath);
let configuration = require(absoluteConfigurationPath);
return Promise.try(() => {
// FIXME: Deduplicate this with kernel! Also other common wiring across binaries...
return initialize({
knexfile: {
client: "pg",
connection: configuration.database,
pool: { min: 0, max: 32 },
migrations: { tableName: "srap_knex_migrations" }
}
});
}).then((state) => {
let { db, knex } = state;
const queries = require("../src/queries")(state);
let app = express();
let router = expressPromiseRouter();
// router.get("/items/:id", (req, res) => {
// return Promise.try(() => {
// return db.Item.query(knex).findById(req.params.id);
// }).then((item) => {
// if (item != null) {
// res.json(item);
// } else {
// throw new errors.NotFound(`No such item exists`);
// }
// });
// });
// router.delete("/items/:id", (req, res) => {
// return Promise.try(() => {
// return db.Item.query(knex)
// .findById(req.params.id)
// .delete();
// }).then((affectedRows) => {
// if (affectedRows > 0) {
// res.status(204).end();
// } else {
// throw new errors.NotFound(`No such item exists`);
// }
// });
// });
// // MARKER: Stub out routes, replace error implementation, add response generation for HTTP errors
// router.get("/items/:id/metadata", (req, res) => {
// });
// router.get("/items/:id/operations", (req, res) => {
// });
// router.get("/items/:id/operations/:operation/expire", (req, res) => {
// });
// router.post("/items/add", (req, res) => {
// });
// router.put("/items/add/:id", (req, res) => {
// });
// router.get("/items/:id", (req, res) => {
// });
router.get("/updates", (req, res) => {
// FIXME: Proper Express integration for Validatem
let isValid = testValue(req.query, {
since: matchesFormat(/^[0-9]+$/),
prefix: isString
});
if (isValid) {
let timestamp = (req.query.since != null)
? new Date(parseInt(req.query.since))
: undefined;
return pipe([
queries.getUpdates(knex, { prefix: req.query.prefix, timestamp: timestamp }),
map((item) => JSON.stringify(item) + "\n"),
fromNodeStream(res)
]).read();
} else {
res.status(422).send("Invalid request");
}
});
app.use(router);
app.listen({ host: listenHost, port: 3000 }, () => {
console.log("Server listening on port 3000");
});
});

@ -21,7 +21,7 @@
"@validatem/any-property": "^0.1.3",
"@validatem/anything": "^0.1.0",
"@validatem/array-of": "^0.1.2",
"@validatem/core": "^0.3.15",
"@validatem/core": "^0.3.16",
"@validatem/default-to": "^0.1.0",
"@validatem/error": "^1.1.0",
"@validatem/is-boolean": "^0.1.1",
@ -29,6 +29,7 @@
"@validatem/is-function": "^0.1.0",
"@validatem/is-number": "^0.1.3",
"@validatem/is-string": "^1.0.0",
"@validatem/matches-format": "^0.1.0",
"@validatem/require-either": "^0.1.0",
"@validatem/required": "^0.1.1",
"@validatem/wrap-value-as-option": "^0.1.0",
@ -47,6 +48,7 @@
"objection": "^2.2.14",
"pg": "^8.5.1",
"pg-query-stream": "^4.1.0",
"prom-client": "^14.0.1",
"syncpipe": "^1.0.0",
"yargs": "^16.2.0"
},

@ -1,109 +0,0 @@
"use strict";
// let realConsoleLog = console.log.bind(console);
// console.log = function(...args) {
// realConsoleLog(`[console.log called from ${(new Error).stack.split("\n")[2].replace(/^\s* at /, "")}]`);
// realConsoleLog(...args);
// };
// let realConsoleError = console.error.bind(console);
// console.error = function(...args) {
// realConsoleError(`[console.error called from ${(new Error).stack.split("\n")[2].replace(/^\s* at /, "")}]`);
// realConsoleError(...args);
// };
const Promise = require("bluebird");
const express = require("express");
const expressPromiseRouter = require("express-promise-router");
const pipe = require("@promistream/pipe");
const fromNodeStream = require("@promistream/from-node-stream");
const map = require("@promistream/map");
const initialize = require("./initialize");
const errors = require("./errors");
return Promise.try(() => {
return initialize({
knexfile: require("../knexfile")
});
}).then((state) => {
let { db, knex } = state;
const queries = require("./queries")(state);
let app = express();
let router = expressPromiseRouter();
router.get("/items/:id", (req, res) => {
return Promise.try(() => {
return db.Item.query(knex).findById(req.params.id);
}).then((item) => {
if (item != null) {
res.json(item);
} else {
throw new errors.NotFound(`No such item exists`);
}
});
});
router.delete("/items/:id", (req, res) => {
return Promise.try(() => {
return db.Item.query(knex)
.findById(req.params.id)
.delete();
}).then((affectedRows) => {
if (affectedRows > 0) {
res.status(204).end();
} else {
throw new errors.NotFound(`No such item exists`);
}
});
});
// MARKER: Stub out routes, replace error implementation, add response generation for HTTP errors
router.get("/items/:id/metadata", (req, res) => {
});
router.get("/items/:id/operations", (req, res) => {
});
router.get("/items/:id/operations/:operation/expire", (req, res) => {
});
router.post("/items/add", (req, res) => {
});
router.put("/items/add/:id", (req, res) => {
});
router.get("/items/:id", (req, res) => {
});
router.get("/updates", (req, res) => {
let timestamp = (req.query.since != null)
? new Date(parseInt(req.query.since))
: undefined;
console.log({ prefix: req.query.prefix, timestamp: timestamp });
// FIXME: Validation!
return pipe([
queries.getUpdates(knex, { prefix: req.query.prefix, timestamp: timestamp }),
map((item) => JSON.stringify(item) + "\n"),
fromNodeStream(res)
]).read();
});
app.use(router);
app.listen(3000, () => {
console.log("Server listening on port 3000");
});
});

@ -4,19 +4,56 @@ const Promise = require("bluebird");
const path = require("path");
const knex = require("knex");
const { knexSnakeCaseMappers } = require("objection");
const prometheusClient = require("prom-client");
const models = require("./models");
let migrationsFolder = path.join(__dirname, "../migrations");
module.exports = function initialize({ knexfile }) {
let prometheusRegistry = new prometheusClient.Registry();
prometheusClient.collectDefaultMetrics({ register: prometheusRegistry });
let knexInstance = knex({
... knexfile,
... knexSnakeCaseMappers()
});
let state = {
knex: knexInstance
knex: knexInstance,
prometheusRegistry: prometheusRegistry,
metrics: {
storedItems: new prometheusClient.Counter({
registers: [ prometheusRegistry ],
name: "srap_stored_items_total",
help: "Amount of items that have been created or updated",
labelNames: [ "tag" ]
}),
successfulItems: new prometheusClient.Counter({
registers: [ prometheusRegistry ],
name: "srap_successful_items_total",
help: "Amount of items that have been successfully processed",
labelNames: [ "task" ]
}),
failedItems: new prometheusClient.Counter({
registers: [ prometheusRegistry ],
name: "srap_failed_items_total",
help: "Amount of items that have failed during processing",
labelNames: [ "task" ]
}),
taskFetchTime: new prometheusClient.Gauge({
registers: [ prometheusRegistry ],
name: "srap_task_fetch_seconds",
help: "Time needed for the most recent attempt at fetching new scraping tasks",
labelNames: [ "task" ]
}),
taskFetchResults: new prometheusClient.Gauge({
registers: [ prometheusRegistry ],
name: "srap_task_fetch_results_count",
help: "Amount of new scraping tasks fetched during the most recent attempt",
labelNames: [ "task" ]
})
}
};
return Promise.try(() => {

@ -51,7 +51,7 @@ module.exports = function createKernel(configuration) {
const createTaskStream = require("./task-stream")(state);
const createDatabaseQueue = require("./queued-database-api")(state);
let { knex } = state;
let { knex, prometheusRegistry, metrics } = state;
let { dependencyMap, dependentMap } = createDependencyMap(configuration);
function insertSeeds() {
@ -108,6 +108,8 @@ module.exports = function createKernel(configuration) {
return pipe([
taskStream,
simpleSink((completedItem) => {
metrics.successfulItems.inc(1);
metrics.successfulItems.labels({ task: task }).inc(1);
logStatus(task, chalk.bold.green, "completed", completedItem.id);
})
]).read();
@ -209,6 +211,16 @@ module.exports = function createKernel(configuration) {
shutdown: function () {
// TODO: Properly lock all public methods after shutdown is called, and wait for any running tasks to have completed
knex.destroy();
},
getMetrics: function () {
return Promise.try(() => {
return prometheusRegistry.metrics();
}).then((metrics) => {
return {
contentType: prometheusRegistry.contentType,
metrics: metrics
};
});
}
};
});

@ -43,7 +43,7 @@ function taskResultsToObject(taskResults) {
]);
}
module.exports = function ({ db, knex }) {
module.exports = function ({ db, knex, metrics }) {
return {
// FIXME: Make object API instead
getItem: function (_tx, _id, _optional) {
@ -116,17 +116,30 @@ module.exports = function ({ db, knex }) {
updatedAt: new Date()
};
if (allowUpsert) {
// NOTE: We *always* do upserts here, even if the user specified `data` rather than `update`, because tags and aliases should always be added even if the item itself already exists. We trust the user not to accidentally reuse IDs between different kinds of objects (which would break in various other ways anyway).
return db.Item.query(tx).upsertGraph(newItem, {
insertMissing: true,
noDelete: true
});
} else {
return db.Item.query(tx).insertGraph(newItem, {
insertMissing: true
});
}
return Promise.try(() => {
if (allowUpsert) {
// NOTE: We *always* do upserts here, even if the user specified `data` rather than `update`, because tags and aliases should always be added even if the item itself already exists. We trust the user not to accidentally reuse IDs between different kinds of objects (which would break in various other ways anyway).
return db.Item.query(tx).upsertGraph(newItem, {
insertMissing: true,
noDelete: true
});
} else {
return db.Item.query(tx).insertGraph(newItem, {
insertMissing: true
});
}
}).tap(() => {
// FIXME: We should probably move the metrics stuff to the wrapper instead, so that it works for *any* backend
metrics.storedItems.inc(1);
// TODO: This currently produces somewhat misleading metrics; it only counts *explicitly specified* tags. That will *mostly* correlate to amount of genuinely-new items, but not perfectly. In the future, we should probably refactor the insertion code such that it is aware of the *current* tags of an item that it is about to merge into - but maybe that should be delayed until the zapdb migration.
if (newItem.tags != null) {
for (let tag of newItem.tags) {
metrics.storedItems.labels({ tag: tag.name }).inc(1);
}
}
});
}).catch({ name: "UniqueViolationError", table: "srap_items" }, (error) => {
if (failIfExists) {
throw error;

@ -89,7 +89,7 @@ module.exports = function (state) {
const queries = require("./queries")(state);
const createDatabaseQueue = require("./queued-database-api")(state);
let { knex, db } = state;
let { knex, db, metrics } = state;
// FIXME: Transaction support!
@ -120,6 +120,9 @@ module.exports = function (state) {
}).then((result) => {
let timeElapsed = Date.now() - startTime;
metrics.taskFetchTime.labels({ task: task }).set(timeElapsed / 1000);
metrics.taskFetchResults.labels({ task: task }).set(result.rowCount);
debug(`Task retrieval query for '${task}' took ${timeElapsed}ms and produced ${result.rowCount} results`);
if (result.rowCount > 0) {
@ -163,6 +166,8 @@ module.exports = function (state) {
: null
});
}).catch((error) => {
metrics.failedItems.inc(1);
metrics.failedItems.labels({ task: task }).inc(1);
logStatus(task, chalk.bold.red, "failed", `${item.id}: ${error.stack}`);
let commonUpdate = {

@ -343,6 +343,32 @@
supports-color "^7.1.0"
syncpipe "^1.0.0"
"@validatem/core@^0.3.16":
version "0.3.16"
resolved "https://registry.yarnpkg.com/@validatem/core/-/core-0.3.16.tgz#0852d49c0f45d8938bb530696f735b3809621d34"
integrity sha512-s5KqnQhQMg6QTa3X6ceVCr6stAsKN8GqdEgkiHpI0fJSO2JFxpwIi9BjeB++zbAQ4xd4SLwcUz+Ujg7xE0WHVA==
dependencies:
"@validatem/annotate-errors" "^0.1.2"
"@validatem/any-property" "^0.1.0"
"@validatem/error" "^1.0.0"
"@validatem/match-validation-error" "^0.1.0"
"@validatem/match-versioned-special" "^0.1.0"
"@validatem/match-virtual-property" "^0.1.0"
"@validatem/normalize-rules" "^0.1.0"
"@validatem/required" "^0.1.0"
"@validatem/validation-result" "^0.1.1"
"@validatem/virtual-property" "^0.1.0"
as-expression "^1.0.0"
assure-array "^1.0.0"
create-error "^0.3.1"
default-value "^1.0.0"
execall "^2.0.0"
flatten "^1.0.3"
indent-string "^4.0.0"
is-arguments "^1.0.4"
supports-color "^7.1.0"
syncpipe "^1.0.0"
"@validatem/default-to@^0.1.0":
version "0.1.0"
resolved "https://registry.yarnpkg.com/@validatem/default-to/-/default-to-0.1.0.tgz#62766a3ca24d2f61a96c713bcb629a5b3c6427c5"
@ -480,6 +506,14 @@
resolved "https://registry.yarnpkg.com/@validatem/match-virtual-property/-/match-virtual-property-0.1.0.tgz#4de2de1075987b5f3b356d3f2bcf6c0be5b5fb83"
integrity sha512-ssd3coFgwbLuqvZftLZTy3eHN0TFST8oTS2XTViQdXJPXVoJmwEKBpFhXgwnb5Ly1CE037R/KWpjhd1TP/56kQ==
"@validatem/matches-format@^0.1.0":
version "0.1.0"
resolved "https://registry.yarnpkg.com/@validatem/matches-format/-/matches-format-0.1.0.tgz#cb4ac6144c9769a6db3a0b36637b090b49f0142b"
integrity sha512-V3w6ajCNUx4qEsib5G+Bl1zGwXFm0COosg4dtz6lHr9m8mkP4CajzHZES6eSSojOlSrKvP/OAG3hzv77d1OTEQ==
dependencies:
"@validatem/error" "^1.0.0"
is-regex "^1.0.5"
"@validatem/normalize-rules@^0.1.0":
version "0.1.3"
resolved "https://registry.yarnpkg.com/@validatem/normalize-rules/-/normalize-rules-0.1.3.tgz#59fd6193b1091ff97b5c723b32c9bb1fe2a9dc9c"
@ -727,6 +761,11 @@ base@^0.11.1:
mixin-deep "^1.2.0"
pascalcase "^0.1.1"
bintrees@1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/bintrees/-/bintrees-1.0.1.tgz#0e655c9b9c2435eaab68bf4027226d2b55a34524"
integrity sha1-DmVcm5wkNeqraL9AJyJtK1WjRSQ=
bluebird@^3.5.4, bluebird@^3.7.2:
version "3.7.2"
resolved "https://registry.yarnpkg.com/bluebird/-/bluebird-3.7.2.tgz#9f229c15be272454ffa973ace0dbee79a1b0c36f"
@ -1852,6 +1891,14 @@ is-promise@^4.0.0:
resolved "https://registry.yarnpkg.com/is-promise/-/is-promise-4.0.0.tgz#42ff9f84206c1991d26debf520dd5c01042dd2f3"
integrity sha512-hvpoI6korhJMnej285dSg6nu1+e6uxs7zG3BYAm5byqDsgJNWwxzM6z6iZiAgQR4TJ30JmBTOwqZUw3WlyH3AQ==
is-regex@^1.0.5:
version "1.1.4"
resolved "https://registry.yarnpkg.com/is-regex/-/is-regex-1.1.4.tgz#eef5663cd59fa4c0ae339505323df6854bb15958"
integrity sha512-kvRdxDsxZjhzUX07ZnLydzS1TU/TJlTUHHY4YLL87e37oUA49DfkLqgy+VjFocowy29cKvcSiu+kIv728jTTVg==
dependencies:
call-bind "^1.0.2"
has-tostringtag "^1.0.0"
is-regexp@^2.0.0:
version "2.1.0"
resolved "https://registry.yarnpkg.com/is-regexp/-/is-regexp-2.1.0.tgz#cd734a56864e23b956bf4e7c66c396a4c0b22c2d"
@ -2450,6 +2497,13 @@ progress@^2.0.0:
resolved "https://registry.yarnpkg.com/progress/-/progress-2.0.3.tgz#7e8cf8d8f5b8f239c1bc68beb4eb78567d572ef8"
integrity sha512-7PiHtLll5LdnKIMw100I+8xJXR5gW2QwWYkT6iJva0bXitZKa/XMrSbdmg3r2Xnaidz9Qumd0VPaMrZlF9V9sA==
prom-client@^14.0.1:
version "14.0.1"
resolved "https://registry.yarnpkg.com/prom-client/-/prom-client-14.0.1.tgz#bdd9583e02ec95429677c0e013712d42ef1f86a8"
integrity sha512-HxTArb6fkOntQHoRGvv4qd/BkorjliiuO2uSWC2KC17MUTKYttWdDoXX/vxOhQdkoECEM9BBH0pj2l8G8kev6w==
dependencies:
tdigest "^0.1.1"
proxy-addr@~2.0.5:
version "2.0.7"
resolved "https://registry.yarnpkg.com/proxy-addr/-/proxy-addr-2.0.7.tgz#f19fe69ceab311eeb94b42e70e8c2070f9ba1025"
@ -2839,6 +2893,13 @@ tarn@^3.0.1:
resolved "https://registry.yarnpkg.com/tarn/-/tarn-3.0.1.tgz#ebac2c6dbc6977d34d4526e0a7814200386a8aec"
integrity sha512-6usSlV9KyHsspvwu2duKH+FMUhqJnAh6J5J/4MITl8s94iSUQTLkJggdiewKv4RyARQccnigV48Z+khiuVZDJw==
tdigest@^0.1.1:
version "0.1.1"
resolved "https://registry.yarnpkg.com/tdigest/-/tdigest-0.1.1.tgz#2e3cb2c39ea449e55d1e6cd91117accca4588021"
integrity sha1-Ljyyw56kSeVdHmzZEReszKRYgCE=
dependencies:
bintrees "1.0.1"
text-table@^0.2.0:
version "0.2.0"
resolved "https://registry.yarnpkg.com/text-table/-/text-table-0.2.0.tgz#7f5ee823ae805207c00af2df4a84ec3fcfa570b4"

Loading…
Cancel
Save