diff --git a/src/app.js b/src/app.js index 7aae7f8..ac928c6 100644 --- a/src/app.js +++ b/src/app.js @@ -6,9 +6,15 @@ const path = require("path"); const defaultValue = require("default-value"); const moize = require("moize"); +const config = require("../config.json"); const knex = require("knex")(require("../knexfile")); -const createSynchronizer = require("./sync")({ knex: knex }); +let state = { + knex: knex, + scrapingHost: config.scrapingHost +}; + +const createSynchronizer = require("./sync")(state); createSynchronizer("datasheets_products", "datasheet:", (item) => { if (item.data.url != null) { diff --git a/src/sync/index.js b/src/sync/index.js index 494e140..4bb8bc1 100644 --- a/src/sync/index.js +++ b/src/sync/index.js @@ -4,9 +4,12 @@ const Promise = require("bluebird"); const matchValue = require("match-value"); const pipe = require("@promistream/pipe"); const simpleSink = require("@promistream/simple-sink"); -const updateStream = require("./update-stream"); -module.exports = function ({ knex }) { +module.exports = function (state) { + const updateStream = require("./update-stream")(state); + + let { knex } = state; + return function createSynchronizer(tableName, prefix, mapper, { getLastTimestamp } = {}) { return Promise.try(() => { if (getLastTimestamp != null) { diff --git a/src/sync/update-stream.js b/src/sync/update-stream.js index 2a52d15..ca4dc52 100644 --- a/src/sync/update-stream.js +++ b/src/sync/update-stream.js @@ -12,50 +12,52 @@ const fromNodeStream = require("@promistream/from-node-stream"); const createNDJSONParseStream = require("./ndjson-parse-stream"); -module.exports = function createUpdateStream({ since, prefix } = {}) { - let lastTimestamp = since ?? new Date(0); - - return pipe([ - simpleSource(() => { - function attempt() { - return Promise.try(() => { - // To ensure that we don't hammer the srap instance - return Promise.delay(5 * 1000); - }).then(() => { - return bhttp.get(`http://localhost:3000/updates?prefix=${encodeURIComponent(prefix)}&since=${Math.floor(lastTimestamp.getTime())}`, { - stream: true - }); - }).then((response) => { - if (response.statusCode === 200) { - return fromNodeStream.fromReadable(response); - } else { - throw new Error(`Got unexpected status code ${response.statusCode}`); - } - }).catch({ code: "ECONNREFUSED" }, (_error) => { - // Scraping server is down, try again in a minute or so - console.warn("WARNING: Scraping server is not reachable! Retrying in a minute..."); - +module.exports = function({ scrapingHost }) { + return function createUpdateStream({ since, prefix } = {}) { + let lastTimestamp = since ?? new Date(0); + + return pipe([ + simpleSource(() => { + function attempt() { return Promise.try(() => { - return Promise.delay(60 * 1000); + // To ensure that we don't hammer the srap instance + return Promise.delay(5 * 1000); }).then(() => { - return attempt(); + return bhttp.get(`http://${scrapingHost}/updates?prefix=${encodeURIComponent(prefix)}&since=${Math.floor(lastTimestamp.getTime())}`, { + stream: true + }); + }).then((response) => { + if (response.statusCode === 200) { + return fromNodeStream.fromReadable(response); + } else { + throw new Error(`Got unexpected status code ${response.statusCode}`); + } + }).catch({ code: "ECONNREFUSED" }, (_error) => { + // Scraping server is down, try again in a minute or so + console.warn("WARNING: Scraping server is not reachable! Retrying in a minute..."); + + return Promise.try(() => { + return Promise.delay(60 * 1000); + }).then(() => { + return attempt(); + }); }); - }); - } + } - return attempt(); - }), - createCombineSequentialStream(), - createNDJSONParseStream(), - createSpyStream((item) => { - if (item.updatedAt != null) { - // TODO: Can this be made significantly more performant by string-sorting the timestamps in ISO format directly, instead of going through a parsing cycle? - let itemDate = new Date(item.updatedAt); + return attempt(); + }), + createCombineSequentialStream(), + createNDJSONParseStream(), + createSpyStream((item) => { + if (item.updatedAt != null) { + // TODO: Can this be made significantly more performant by string-sorting the timestamps in ISO format directly, instead of going through a parsing cycle? + let itemDate = new Date(item.updatedAt); - if (itemDate > lastTimestamp) { - lastTimestamp = itemDate; + if (itemDate > lastTimestamp) { + lastTimestamp = itemDate; + } } - } - }) - ]); + }) + ]); + }; };