"use strict" ;
const Promise = require ( "bluebird" ) ;
const bhttp = require ( "bhttp" ) ;
const pipe = require ( "@promistream/pipe" ) ;
const simpleSource = require ( "@promistream/simple-source" ) ;
const createCombineSequentialStream = require ( "@promistream/combine-sequential-streaming" ) ;
const createSpyStream = require ( "@promistream/spy" ) ;
const fromNodeStream = require ( "@promistream/from-node-stream" ) ;
const createNDJSONParseStream = require ( "./ndjson-parse-stream" ) ;
module . exports = function ( { scrapingHost } ) {
return function createUpdateStream ( { since , prefix } = { } ) {
let lastTimestamp = since ? ? new Date ( 0 ) ;
return pipe ( [
simpleSource ( ( ) => {
function attempt ( ) {
return Promise . try ( ( ) => {
// To ensure that we don't hammer the srap instance
return Promise . delay ( 5 * 1000 ) ;
} ) . then ( ( ) => {
return bhttp . get ( ` http:// ${ scrapingHost } /updates?prefix= ${ encodeURIComponent ( prefix ) } &since= ${ Math . floor ( lastTimestamp . getTime ( ) ) } ` , {
stream : true
} ) ;
} ) . then ( ( response ) => {
if ( response . statusCode === 200 ) {
return fromNodeStream . fromReadable ( response ) ;
} else {
throw new Error ( ` Got unexpected status code ${ response . statusCode } ` ) ;
}
} ) . catch ( { code : "ECONNREFUSED" } , ( _error ) => {
// Scraping server is down, try again in a minute or so
console . warn ( "WARNING: Scraping server is not reachable! Retrying in a minute..." ) ;
return Promise . try ( ( ) => {
return Promise . delay ( 60 * 1000 ) ;
} ) . then ( ( ) => {
return attempt ( ) ;
} ) ;
} ) ;
}
return attempt ( ) ;
} ) ,
createCombineSequentialStream ( ) ,
createNDJSONParseStream ( ) ,
createSpyStream ( ( item ) => {
if ( item . updatedAt != null ) {
// TODO: Can this be made significantly more performant by string-sorting the timestamps in ISO format directly, instead of going through a parsing cycle?
let itemDate = new Date ( item . updatedAt ) ;
if ( itemDate > lastTimestamp ) {
lastTimestamp = itemDate ;
}
}
} )
] ) ;
} ;
} ;