From 522192f025afe0cdf2f16cadf6f0c0932d60151d Mon Sep 17 00:00:00 2001
From: Sven Slootweg
Date: Fri, 4 Mar 2022 01:47:44 +0100
Subject: [PATCH] WIP, metrics

---
 README.md          |   3 +
 bin/run            |  14 +++++
 bin/server         | 139 +++++++++++++++++++++++++++++++++++++++++++++
 package.json       |   4 +-
 src/index.js       | 109 -----------------------------------
 src/initialize.js  |  39 ++++++++++++++++-
 src/kernel.js      |  14 ++++-
 src/queries.js     |  37 ++++++++----
 src/task-stream.js |   7 ++-
 yarn.lock          |  61 ++++++++++++++++++++
 10 files changed, 302 insertions(+), 125 deletions(-)
 create mode 100644 README.md
 create mode 100755 bin/server
 delete mode 100644 src/index.js

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..87afad9
--- /dev/null
+++ b/README.md
@@ -0,0 +1,3 @@
+# srap
+
+An unopinionated tag-based scraping server. Documentation coming soon™.
diff --git a/bin/run b/bin/run
index 037b346..663fec8 100755
--- a/bin/run
+++ b/bin/run
@@ -8,6 +8,7 @@
 const Promise = require("bluebird");
 const yargs = require("yargs");
 const path = require("path");
+const express = require("express");
 
 const createKernel = require("../src/kernel");
 
@@ -21,4 +22,17 @@ return Promise.try(() => {
 	return createKernel(configuration);
 }).then((kernel) => {
 	kernel.run();
+
+	let metricsApp = express();
+
+	metricsApp.get("/metrics", (req, res) => {
+		return Promise.try(() => {
+			return kernel.getMetrics();
+		}).then(({ contentType, metrics }) => {
+			res.set("Content-Type", contentType);
+			res.send(metrics);
+		});
+	});
+
+	metricsApp.listen(3131);
 });
diff --git a/bin/server b/bin/server
new file mode 100755
index 0000000..72688b7
--- /dev/null
+++ b/bin/server
@@ -0,0 +1,139 @@
+#!/usr/bin/env node
+
+"use strict";
+
+// let realConsoleLog = console.log.bind(console);
+// console.log = function(...args) {
+// 	realConsoleLog(`[console.log called from ${(new Error).stack.split("\n")[2].replace(/^\s* at /, "")}]`);
+// 	realConsoleLog(...args);
+// };
+
+// let realConsoleError = console.error.bind(console);
+// console.error = function(...args) {
+// 	realConsoleError(`[console.error called from ${(new Error).stack.split("\n")[2].replace(/^\s* at /, "")}]`);
+// 	realConsoleError(...args);
+// };
+
+// FIXME: All of this is a work-in-progress, zero stability guarantees!
+
+const Promise = require("bluebird");
+const express = require("express");
+const expressPromiseRouter = require("express-promise-router");
+const assert = require("assert");
+const path = require("path");
+
+const pipe = require("@promistream/pipe");
+const fromNodeStream = require("@promistream/from-node-stream");
+const map = require("@promistream/map");
+
+const { testValue } = require("@validatem/core");
+const matchesFormat = require("@validatem/matches-format");
+const isString = require("@validatem/is-string");
+
+const initialize = require("../src/initialize");
+const errors = require("../src/errors");
+
+assert(process.argv.length >= 4);
+let configurationPath = process.argv[2];
+let listenHost = process.argv[3];
+
+let absoluteConfigurationPath = path.join(process.cwd(), configurationPath);
+let configuration = require(absoluteConfigurationPath);
+
+return Promise.try(() => {
+	// FIXME: Deduplicate this with kernel! Also other common wiring across binaries...
+	return initialize({
+		knexfile: {
+			client: "pg",
+			connection: configuration.database,
+			pool: { min: 0, max: 32 },
+			migrations: { tableName: "srap_knex_migrations" }
+		}
+	});
+}).then((state) => {
+	let { db, knex } = state;
+
+	const queries = require("../src/queries")(state);
+
+	let app = express();
+	let router = expressPromiseRouter();
+
+	// router.get("/items/:id", (req, res) => {
+	// 	return Promise.try(() => {
+	// 		return db.Item.query(knex).findById(req.params.id);
+	// 	}).then((item) => {
+	// 		if (item != null) {
+	// 			res.json(item);
+	// 		} else {
+	// 			throw new errors.NotFound(`No such item exists`);
+	// 		}
+	// 	});
+	// });
+
+	// router.delete("/items/:id", (req, res) => {
+	// 	return Promise.try(() => {
+	// 		return db.Item.query(knex)
+	// 			.findById(req.params.id)
+	// 			.delete();
+	// 	}).then((affectedRows) => {
+	// 		if (affectedRows > 0) {
+	// 			res.status(204).end();
+	// 		} else {
+	// 			throw new errors.NotFound(`No such item exists`);
+	// 		}
+	// 	});
+	// });
+
+	// // MARKER: Stub out routes, replace error implementation, add response generation for HTTP errors
+
+	// router.get("/items/:id/metadata", (req, res) => {
+
+	// });
+
+	// router.get("/items/:id/operations", (req, res) => {
+
+	// });
+
+	// router.get("/items/:id/operations/:operation/expire", (req, res) => {
+
+	// });
+
+	// router.post("/items/add", (req, res) => {
+
+	// });
+
+	// router.put("/items/add/:id", (req, res) => {
+
+	// });
+
+	// router.get("/items/:id", (req, res) => {
+
+	// });
+
+	router.get("/updates", (req, res) => {
+		// FIXME: Proper Express integration for Validatem
+		let isValid = testValue(req.query, {
+			since: matchesFormat(/^[0-9]+$/),
+			prefix: isString
+		});
+
+		if (isValid) {
+			let timestamp = (req.query.since != null)
+				? new Date(parseInt(req.query.since))
+				: undefined;
+
+			return pipe([
+				queries.getUpdates(knex, { prefix: req.query.prefix, timestamp: timestamp }),
+				map((item) => JSON.stringify(item) + "\n"),
+				fromNodeStream(res)
+			]).read();
+		} else {
+			res.status(422).send("Invalid request");
+		}
+	});
+
+	app.use(router);
+	app.listen({ host: listenHost, port: 3000 }, () => {
+		console.log("Server listening on port 3000");
+	});
+});
diff --git a/package.json b/package.json
index ca1e5f3..4af9cf2 100644
--- a/package.json
+++ b/package.json
@@ -21,7 +21,7 @@
 		"@validatem/any-property": "^0.1.3",
 		"@validatem/anything": "^0.1.0",
 		"@validatem/array-of": "^0.1.2",
-		"@validatem/core": "^0.3.15",
+		"@validatem/core": "^0.3.16",
 		"@validatem/default-to": "^0.1.0",
 		"@validatem/error": "^1.1.0",
 		"@validatem/is-boolean": "^0.1.1",
@@ -29,6 +29,7 @@
 		"@validatem/is-function": "^0.1.0",
 		"@validatem/is-number": "^0.1.3",
 		"@validatem/is-string": "^1.0.0",
+		"@validatem/matches-format": "^0.1.0",
 		"@validatem/require-either": "^0.1.0",
 		"@validatem/required": "^0.1.1",
 		"@validatem/wrap-value-as-option": "^0.1.0",
@@ -47,6 +48,7 @@
 		"objection": "^2.2.14",
 		"pg": "^8.5.1",
 		"pg-query-stream": "^4.1.0",
+		"prom-client": "^14.0.1",
 		"syncpipe": "^1.0.0",
 		"yargs": "^16.2.0"
 	},
diff --git a/src/index.js b/src/index.js
deleted file mode 100644
index cdc10a2..0000000
--- a/src/index.js
+++ /dev/null
@@ -1,109 +0,0 @@
-"use strict";
-
-// let realConsoleLog = console.log.bind(console);
-// console.log = function(...args) {
-// 	realConsoleLog(`[console.log called from ${(new Error).stack.split("\n")[2].replace(/^\s* at /, "")}]`);
-// 	realConsoleLog(...args);
-// };
-
-// let realConsoleError = console.error.bind(console);
-// console.error = function(...args) {
-// 	realConsoleError(`[console.error called from ${(new Error).stack.split("\n")[2].replace(/^\s* at /, "")}]`);
-// 	realConsoleError(...args);
-// };
-
-const Promise = require("bluebird");
-const express = require("express");
-const expressPromiseRouter = require("express-promise-router");
-
-const pipe = require("@promistream/pipe");
-const fromNodeStream = require("@promistream/from-node-stream");
-const map = require("@promistream/map");
-
-const initialize = require("./initialize");
-const errors = require("./errors");
-
-return Promise.try(() => {
-	return initialize({
-		knexfile: require("../knexfile")
-	});
-}).then((state) => {
-	let { db, knex } = state;
-
-	const queries = require("./queries")(state);
-
-	let app = express();
-	let router = expressPromiseRouter();
-
-	router.get("/items/:id", (req, res) => {
-		return Promise.try(() => {
-			return db.Item.query(knex).findById(req.params.id);
-		}).then((item) => {
-			if (item != null) {
-				res.json(item);
-			} else {
-				throw new errors.NotFound(`No such item exists`);
-			}
-		});
-	});
-
-	router.delete("/items/:id", (req, res) => {
-		return Promise.try(() => {
-			return db.Item.query(knex)
-				.findById(req.params.id)
-				.delete();
-		}).then((affectedRows) => {
-			if (affectedRows > 0) {
-				res.status(204).end();
-			} else {
-				throw new errors.NotFound(`No such item exists`);
-			}
-		});
-	});
-
-	// MARKER: Stub out routes, replace error implementation, add response generation for HTTP errors
-
-	router.get("/items/:id/metadata", (req, res) => {
-
-	});
-
-	router.get("/items/:id/operations", (req, res) => {
-
-	});
-
-	router.get("/items/:id/operations/:operation/expire", (req, res) => {
-
-	});
-
-	router.post("/items/add", (req, res) => {
-
-	});
-
-	router.put("/items/add/:id", (req, res) => {
-
-	});
-
-	router.get("/items/:id", (req, res) => {
-
-	});
-
-	router.get("/updates", (req, res) => {
-		let timestamp = (req.query.since != null)
-			? new Date(parseInt(req.query.since))
-			: undefined;
-
-		console.log({ prefix: req.query.prefix, timestamp: timestamp });
-
-		// FIXME: Validation!
-		return pipe([
-			queries.getUpdates(knex, { prefix: req.query.prefix, timestamp: timestamp }),
-			map((item) => JSON.stringify(item) + "\n"),
-			fromNodeStream(res)
-		]).read();
-	});
-
-	app.use(router);
-	app.listen(3000, () => {
-		console.log("Server listening on port 3000");
-	});
-});
diff --git a/src/initialize.js b/src/initialize.js
index 686fce9..7b7c96b 100644
--- a/src/initialize.js
+++ b/src/initialize.js
@@ -4,19 +4,56 @@ const Promise = require("bluebird");
 const path = require("path");
 const knex = require("knex");
 const { knexSnakeCaseMappers } = require("objection");
+const prometheusClient = require("prom-client");
 
 const models = require("./models");
 
 let migrationsFolder = path.join(__dirname, "../migrations");
 
 module.exports = function initialize({ knexfile }) {
+	let prometheusRegistry = new prometheusClient.Registry();
+	prometheusClient.collectDefaultMetrics({ register: prometheusRegistry });
+
 	let knexInstance = knex({
 		... knexfile,
 		... knexSnakeCaseMappers()
 	});
 
 	let state = {
-		knex: knexInstance
+		knex: knexInstance,
+		prometheusRegistry: prometheusRegistry,
+		metrics: {
+			storedItems: new prometheusClient.Counter({
+				registers: [ prometheusRegistry ],
+				name: "srap_stored_items_total",
+				help: "Amount of items that have been created or updated",
+				labelNames: [ "tag" ]
+			}),
+			successfulItems: new prometheusClient.Counter({
+				registers: [ prometheusRegistry ],
+				name: "srap_successful_items_total",
+				help: "Amount of items that have been successfully processed",
+				labelNames: [ "task" ]
+			}),
+			failedItems: new prometheusClient.Counter({
+				registers: [ prometheusRegistry ],
+				name: "srap_failed_items_total",
+				help: "Amount of items that have failed during processing",
+				labelNames: [ "task" ]
+			}),
+			taskFetchTime: new prometheusClient.Gauge({
+				registers: [ prometheusRegistry ],
+				name: "srap_task_fetch_seconds",
+				help: "Time needed for the most recent attempt at fetching new scraping tasks",
+				labelNames: [ "task" ]
+			}),
+			taskFetchResults: new prometheusClient.Gauge({
+				registers: [ prometheusRegistry ],
+				name: "srap_task_fetch_results_count",
+				help: "Amount of new scraping tasks fetched during the most recent attempt",
+				labelNames: [ "task" ]
+			})
+		}
 	};
 
 	return Promise.try(() => {
diff --git a/src/kernel.js b/src/kernel.js
index 39c2541..357a21c 100644
--- a/src/kernel.js
+++ b/src/kernel.js
@@ -51,7 +51,7 @@ module.exports = function createKernel(configuration) {
 	const createTaskStream = require("./task-stream")(state);
 	const createDatabaseQueue = require("./queued-database-api")(state);
 
-	let { knex } = state;
+	let { knex, prometheusRegistry, metrics } = state;
 	let { dependencyMap, dependentMap } = createDependencyMap(configuration);
 
 	function insertSeeds() {
@@ -108,6 +108,8 @@ module.exports = function createKernel(configuration) {
 		return pipe([
 			taskStream,
 			simpleSink((completedItem) => {
+				metrics.successfulItems.inc(1);
+				metrics.successfulItems.labels({ task: task }).inc(1);
 				logStatus(task, chalk.bold.green, "completed", completedItem.id);
 			})
 		]).read();
@@ -209,6 +211,16 @@ module.exports = function createKernel(configuration) {
 		shutdown: function () {
 			// TODO: Properly lock all public methods after shutdown is called, and wait for any running tasks to have completed
 			knex.destroy();
+		},
+		getMetrics: function () {
+			return Promise.try(() => {
+				return prometheusRegistry.metrics();
+			}).then((metrics) => {
+				return {
+					contentType: prometheusRegistry.contentType,
+					metrics: metrics
+				};
+			});
 		}
 	};
 });
diff --git a/src/queries.js b/src/queries.js
index 36230ad..d1bbc1e 100644
--- a/src/queries.js
+++ b/src/queries.js
@@ -43,7 +43,7 @@ function taskResultsToObject(taskResults) {
 	]);
 }
 
-module.exports = function ({ db, knex }) {
+module.exports = function ({ db, knex, metrics }) {
 	return {
 		// FIXME: Make object API instead
 		getItem: function (_tx, _id, _optional) {
@@ -116,17 +116,30 @@ module.exports = function ({ db, knex, metrics }) {
 				updatedAt: new Date()
 			};
 
-			if (allowUpsert) {
-				// NOTE: We *always* do upserts here, even if the user specified `data` rather than `update`, because tags and aliases should always be added even if the item itself already exists. We trust the user not to accidentally reuse IDs between different kinds of objects (which would break in various other ways anyway).
-				return db.Item.query(tx).upsertGraph(newItem, {
-					insertMissing: true,
-					noDelete: true
-				});
-			} else {
-				return db.Item.query(tx).insertGraph(newItem, {
-					insertMissing: true
-				});
-			}
+			return Promise.try(() => {
+				if (allowUpsert) {
+					// NOTE: We *always* do upserts here, even if the user specified `data` rather than `update`, because tags and aliases should always be added even if the item itself already exists. We trust the user not to accidentally reuse IDs between different kinds of objects (which would break in various other ways anyway).
+					return db.Item.query(tx).upsertGraph(newItem, {
+						insertMissing: true,
+						noDelete: true
+					});
+				} else {
+					return db.Item.query(tx).insertGraph(newItem, {
+						insertMissing: true
+					});
+				}
+			}).tap(() => {
+				// FIXME: We should probably move the metrics stuff to the wrapper instead, so that it works for *any* backend
+				metrics.storedItems.inc(1);
+
+				// TODO: This currently produces somewhat misleading metrics; it only counts *explicitly specified* tags. That will *mostly* correlate to amount of genuinely-new items, but not perfectly. In the future, we should probably refactor the insertion code such that it is aware of the *current* tags of an item that it is about to merge into - but maybe that should be delayed until the zapdb migration.
+				if (newItem.tags != null) {
+					for (let tag of newItem.tags) {
+						metrics.storedItems.labels({ tag: tag.name }).inc(1);
+					}
+				}
+			});
+
 		}).catch({ name: "UniqueViolationError", table: "srap_items" }, (error) => {
 			if (failIfExists) {
 				throw error;
diff --git a/src/task-stream.js b/src/task-stream.js
index 0e4a48d..5a8e4b9 100644
--- a/src/task-stream.js
+++ b/src/task-stream.js
@@ -89,7 +89,7 @@ module.exports = function (state) {
 	const queries = require("./queries")(state);
 	const createDatabaseQueue = require("./queued-database-api")(state);
 
-	let { knex, db } = state;
+	let { knex, db, metrics } = state;
 
 	// FIXME: Transaction support!
 
@@ -120,6 +120,9 @@
 		}).then((result) => {
 			let timeElapsed = Date.now() - startTime;
 
+			metrics.taskFetchTime.labels({ task: task }).set(timeElapsed / 1000);
+			metrics.taskFetchResults.labels({ task: task }).set(result.rowCount);
+
 			debug(`Task retrieval query for '${task}' took ${timeElapsed}ms and produced ${result.rowCount} results`);
 
 			if (result.rowCount > 0) {
@@ -163,6 +166,8 @@
 					: null
 			});
 		}).catch((error) => {
+			metrics.failedItems.inc(1);
+			metrics.failedItems.labels({ task: task }).inc(1);
 			logStatus(task, chalk.bold.red, "failed", `${item.id}: ${error.stack}`);
 
 			let commonUpdate = {
diff --git a/yarn.lock b/yarn.lock
index 2cc38b5..48d7d12 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -343,6 +343,32 @@
     supports-color "^7.1.0"
     syncpipe "^1.0.0"
 
+"@validatem/core@^0.3.16":
+  version "0.3.16"
+  resolved "https://registry.yarnpkg.com/@validatem/core/-/core-0.3.16.tgz#0852d49c0f45d8938bb530696f735b3809621d34"
+  integrity sha512-s5KqnQhQMg6QTa3X6ceVCr6stAsKN8GqdEgkiHpI0fJSO2JFxpwIi9BjeB++zbAQ4xd4SLwcUz+Ujg7xE0WHVA==
+  dependencies:
+    "@validatem/annotate-errors" "^0.1.2"
+    "@validatem/any-property" "^0.1.0"
+    "@validatem/error" "^1.0.0"
+    "@validatem/match-validation-error" "^0.1.0"
+    "@validatem/match-versioned-special" "^0.1.0"
+    "@validatem/match-virtual-property" "^0.1.0"
+    "@validatem/normalize-rules" "^0.1.0"
+    "@validatem/required" "^0.1.0"
+    "@validatem/validation-result" "^0.1.1"
+    "@validatem/virtual-property" "^0.1.0"
+    as-expression "^1.0.0"
+    assure-array "^1.0.0"
+    create-error "^0.3.1"
+    default-value "^1.0.0"
+    execall "^2.0.0"
+    flatten "^1.0.3"
+    indent-string "^4.0.0"
+    is-arguments "^1.0.4"
+    supports-color "^7.1.0"
+    syncpipe "^1.0.0"
+
 "@validatem/default-to@^0.1.0":
   version "0.1.0"
   resolved "https://registry.yarnpkg.com/@validatem/default-to/-/default-to-0.1.0.tgz#62766a3ca24d2f61a96c713bcb629a5b3c6427c5"
@@ -480,6 +506,14 @@
   resolved "https://registry.yarnpkg.com/@validatem/match-virtual-property/-/match-virtual-property-0.1.0.tgz#4de2de1075987b5f3b356d3f2bcf6c0be5b5fb83"
   integrity sha512-ssd3coFgwbLuqvZftLZTy3eHN0TFST8oTS2XTViQdXJPXVoJmwEKBpFhXgwnb5Ly1CE037R/KWpjhd1TP/56kQ==
 
+"@validatem/matches-format@^0.1.0":
+  version "0.1.0"
+  resolved "https://registry.yarnpkg.com/@validatem/matches-format/-/matches-format-0.1.0.tgz#cb4ac6144c9769a6db3a0b36637b090b49f0142b"
+  integrity sha512-V3w6ajCNUx4qEsib5G+Bl1zGwXFm0COosg4dtz6lHr9m8mkP4CajzHZES6eSSojOlSrKvP/OAG3hzv77d1OTEQ==
+  dependencies:
+    "@validatem/error" "^1.0.0"
+    is-regex "^1.0.5"
+
 "@validatem/normalize-rules@^0.1.0":
   version "0.1.3"
   resolved "https://registry.yarnpkg.com/@validatem/normalize-rules/-/normalize-rules-0.1.3.tgz#59fd6193b1091ff97b5c723b32c9bb1fe2a9dc9c"
@@ -727,6 +761,11 @@ base@^0.11.1:
     mixin-deep "^1.2.0"
     pascalcase "^0.1.1"
 
+bintrees@1.0.1:
+  version "1.0.1"
+  resolved "https://registry.yarnpkg.com/bintrees/-/bintrees-1.0.1.tgz#0e655c9b9c2435eaab68bf4027226d2b55a34524"
+  integrity sha1-DmVcm5wkNeqraL9AJyJtK1WjRSQ=
+
 bluebird@^3.5.4, bluebird@^3.7.2:
   version "3.7.2"
   resolved "https://registry.yarnpkg.com/bluebird/-/bluebird-3.7.2.tgz#9f229c15be272454ffa973ace0dbee79a1b0c36f"
@@ -1852,6 +1891,14 @@ is-promise@^4.0.0:
   resolved "https://registry.yarnpkg.com/is-promise/-/is-promise-4.0.0.tgz#42ff9f84206c1991d26debf520dd5c01042dd2f3"
  integrity sha512-hvpoI6korhJMnej285dSg6nu1+e6uxs7zG3BYAm5byqDsgJNWwxzM6z6iZiAgQR4TJ30JmBTOwqZUw3WlyH3AQ==
 
+is-regex@^1.0.5:
+  version "1.1.4"
+  resolved "https://registry.yarnpkg.com/is-regex/-/is-regex-1.1.4.tgz#eef5663cd59fa4c0ae339505323df6854bb15958"
+  integrity sha512-kvRdxDsxZjhzUX07ZnLydzS1TU/TJlTUHHY4YLL87e37oUA49DfkLqgy+VjFocowy29cKvcSiu+kIv728jTTVg==
+  dependencies:
+    call-bind "^1.0.2"
+    has-tostringtag "^1.0.0"
+
 is-regexp@^2.0.0:
   version "2.1.0"
   resolved "https://registry.yarnpkg.com/is-regexp/-/is-regexp-2.1.0.tgz#cd734a56864e23b956bf4e7c66c396a4c0b22c2d"
@@ -2450,6 +2497,13 @@ progress@^2.0.0:
   resolved "https://registry.yarnpkg.com/progress/-/progress-2.0.3.tgz#7e8cf8d8f5b8f239c1bc68beb4eb78567d572ef8"
   integrity sha512-7PiHtLll5LdnKIMw100I+8xJXR5gW2QwWYkT6iJva0bXitZKa/XMrSbdmg3r2Xnaidz9Qumd0VPaMrZlF9V9sA==
 
+prom-client@^14.0.1:
+  version "14.0.1"
+  resolved "https://registry.yarnpkg.com/prom-client/-/prom-client-14.0.1.tgz#bdd9583e02ec95429677c0e013712d42ef1f86a8"
+  integrity sha512-HxTArb6fkOntQHoRGvv4qd/BkorjliiuO2uSWC2KC17MUTKYttWdDoXX/vxOhQdkoECEM9BBH0pj2l8G8kev6w==
+  dependencies:
+    tdigest "^0.1.1"
+
 proxy-addr@~2.0.5:
   version "2.0.7"
   resolved "https://registry.yarnpkg.com/proxy-addr/-/proxy-addr-2.0.7.tgz#f19fe69ceab311eeb94b42e70e8c2070f9ba1025"
@@ -2839,6 +2893,13 @@ tarn@^3.0.1:
   resolved "https://registry.yarnpkg.com/tarn/-/tarn-3.0.1.tgz#ebac2c6dbc6977d34d4526e0a7814200386a8aec"
   integrity sha512-6usSlV9KyHsspvwu2duKH+FMUhqJnAh6J5J/4MITl8s94iSUQTLkJggdiewKv4RyARQccnigV48Z+khiuVZDJw==
 
+tdigest@^0.1.1:
+  version "0.1.1"
+  resolved "https://registry.yarnpkg.com/tdigest/-/tdigest-0.1.1.tgz#2e3cb2c39ea449e55d1e6cd91117accca4588021"
+  integrity sha1-Ljyyw56kSeVdHmzZEReszKRYgCE=
+  dependencies:
+    bintrees "1.0.1"
+
 text-table@^0.2.0:
   version "0.2.0"
   resolved "https://registry.yarnpkg.com/text-table/-/text-table-0.2.0.tgz#7f5ee823ae805207c00af2df4a84ec3fcfa570b4"