|
|
@ -12,7 +12,8 @@ const fromNodeStream = require("@promistream/from-node-stream");
|
|
|
|
|
|
|
|
|
|
|
|
const createNDJSONParseStream = require("./ndjson-parse-stream");
|
|
|
|
const createNDJSONParseStream = require("./ndjson-parse-stream");
|
|
|
|
|
|
|
|
|
|
|
|
module.exports = function createUpdateStream({ since, prefix } = {}) {
|
|
|
|
module.exports = function({ scrapingHost }) {
|
|
|
|
|
|
|
|
return function createUpdateStream({ since, prefix } = {}) {
|
|
|
|
let lastTimestamp = since ?? new Date(0);
|
|
|
|
let lastTimestamp = since ?? new Date(0);
|
|
|
|
|
|
|
|
|
|
|
|
return pipe([
|
|
|
|
return pipe([
|
|
|
@ -22,7 +23,7 @@ module.exports = function createUpdateStream({ since, prefix } = {}) {
|
|
|
|
// To ensure that we don't hammer the srap instance
|
|
|
|
// To ensure that we don't hammer the srap instance
|
|
|
|
return Promise.delay(5 * 1000);
|
|
|
|
return Promise.delay(5 * 1000);
|
|
|
|
}).then(() => {
|
|
|
|
}).then(() => {
|
|
|
|
return bhttp.get(`http://localhost:3000/updates?prefix=${encodeURIComponent(prefix)}&since=${Math.floor(lastTimestamp.getTime())}`, {
|
|
|
|
return bhttp.get(`http://${scrapingHost}/updates?prefix=${encodeURIComponent(prefix)}&since=${Math.floor(lastTimestamp.getTime())}`, {
|
|
|
|
stream: true
|
|
|
|
stream: true
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}).then((response) => {
|
|
|
|
}).then((response) => {
|
|
|
@ -58,4 +59,5 @@ module.exports = function createUpdateStream({ since, prefix } = {}) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
})
|
|
|
|
})
|
|
|
|
]);
|
|
|
|
]);
|
|
|
|
|
|
|
|
};
|
|
|
|
};
|
|
|
|
};
|
|
|
|