You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
"use strict" ;
const Promise = require ( "bluebird" ) ;
const matchValue = require ( "match-value" ) ;
const pipe = require ( "@promistream/pipe" ) ;
const simpleSink = require ( "@promistream/simple-sink" ) ;
module . exports = function ( state ) {
const updateStream = require ( "./update-stream" ) ( state ) ;
let { knex } = state ;
return function createSynchronizer ( tableName , prefix , mapper , { getLastTimestamp } = { } ) {
return Promise . try ( ( ) => {
if ( getLastTimestamp != null ) {
// NOTE: Must be a Date object! FIXME: Change this?
return getLastTimestamp ( ) ;
} else {
return null ;
}
} ) . then ( ( lastTimestamp ) => {
return pipe ( [
updateStream ( { prefix , since : lastTimestamp } ) ,
simpleSink ( ( item ) => {
return Promise . try ( ( ) => {
// TODO: debug-log item processing?
// console.log("[sync] processing item", item);
return matchValue ( item . type , {
item : ( ) => {
let result = mapper ( item ) ;
if ( result != null ) {
return knex ( tableName )
. insert ( result )
. onConflict ( "id" ) . merge ( ) ;
}
} ,
alias : ( ) => {
return knex ( tableName )
. delete ( )
. where ( { id : item . alias } ) ;
} ,
taskResult : ( ) => {
// Ignore these for now
}
} ) ;
} ) . then ( ( ) => {
// FIXME: This placeholder `.then` is necessary to make this work *at all*. Investigate why this isn't working otherwise, and whether that's a bug in simple-sink
} ) ;
} )
] ) . read ( ) ;
} ) ;
} ;
} ;