You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
52 lines
1.4 KiB
JavaScript
52 lines
1.4 KiB
JavaScript
"use strict";
|
|
|
|
const Promise = require("bluebird");
|
|
const matchValue = require("match-value");
|
|
const pipe = require("@promistream/pipe");
|
|
const simpleSink = require("@promistream/simple-sink");
|
|
const updateStream = require("./update-stream");
|
|
|
|
module.exports = function ({ knex }) {
|
|
return function createSynchronizer(tableName, prefix, mapper, { getLastTimestamp } = {}) {
|
|
return Promise.try(() => {
|
|
if (getLastTimestamp != null) {
|
|
// NOTE: Must be a Date object! FIXME: Change this?
|
|
return getLastTimestamp();
|
|
} else {
|
|
return null;
|
|
}
|
|
}).then((lastTimestamp) => {
|
|
return pipe([
|
|
updateStream({ prefix, since: lastTimestamp }),
|
|
simpleSink((item) => {
|
|
return Promise.try(() => {
|
|
// TODO: debug-log item processing?
|
|
// console.log("[sync] processing item", item);
|
|
return matchValue(item.type, {
|
|
item: () => {
|
|
let result = mapper(item);
|
|
|
|
if (result != null) {
|
|
return knex(tableName)
|
|
.insert(result)
|
|
.onConflict("id").merge();
|
|
}
|
|
},
|
|
alias: () => {
|
|
return knex(tableName)
|
|
.delete()
|
|
.where({ id: item.alias });
|
|
},
|
|
taskResult: () => {
|
|
// Ignore these for now
|
|
}
|
|
});
|
|
}).then(() => {
|
|
// FIXME: This placeholder `.then` is necessary to make this work *at all*. Investigate why this isn't working otherwise, and whether that's a bug in simple-sink
|
|
});
|
|
})
|
|
]).read();
|
|
});
|
|
};
|
|
};
|