WIP, sync
parent
3f67beaaa6
commit
8537fb91e0
@ -0,0 +1,22 @@
|
||||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
|
||||
module.exports.up = function(knex) {
|
||||
return Promise.try(() => {
|
||||
return knex.schema.createTable("datasheets_products", (table) => {
|
||||
table.text("id").primary();
|
||||
table.text("manufacturer");
|
||||
table.text("name").notNull();
|
||||
table.text("description");
|
||||
table.text("source").notNull();
|
||||
table.text("url").notNull();
|
||||
});
|
||||
}).then(() => {
|
||||
return knex.raw("CREATE INDEX search_index ON datasheets_products ((lower(name)) text_pattern_ops);");
|
||||
});
|
||||
};
|
||||
|
||||
module.exports.down = function(knex) {
|
||||
return knex.schema.dropTable("datasheets_products");
|
||||
};
|
@ -0,0 +1,41 @@
|
||||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
const matchValue = require("match-value");
|
||||
const pipe = require("@promistream/pipe");
|
||||
const simpleSink = require("@promistream/simple-sink");
|
||||
const updateStream = require("./update-stream");
|
||||
|
||||
module.exports = function ({ knex }) {
|
||||
return function createSynchronizer(tableName, prefix, mapper) {
|
||||
return pipe([
|
||||
updateStream({ prefix }),
|
||||
simpleSink((item) => {
|
||||
return Promise.try(() => {
|
||||
console.log("[sync] processing item", item);
|
||||
return matchValue(item.type, {
|
||||
item: () => {
|
||||
let result = mapper(item);
|
||||
|
||||
if (result != null) {
|
||||
return knex(tableName)
|
||||
.insert(result)
|
||||
.onConflict("id").merge();
|
||||
}
|
||||
},
|
||||
alias: () => {
|
||||
return knex(tableName)
|
||||
.delete()
|
||||
.where({ id: item.alias });
|
||||
},
|
||||
taskResult: () => {
|
||||
// Ignore these for now
|
||||
}
|
||||
});
|
||||
}).then(() => {
|
||||
// FIXME: This placeholder `.then` is necessary to make this work *at all*. Investigate why this isn't working otherwise, and whether that's a bug in simple-sink
|
||||
});
|
||||
})
|
||||
]).read();
|
||||
};
|
||||
};
|
@ -0,0 +1,14 @@
|
||||
"use strict";
|
||||
|
||||
const pipe = require("@promistream/pipe");
|
||||
const splitLines = require("@promistream/split-lines");
|
||||
const map = require("@promistream/map");
|
||||
const decodeString = require("@promistream/decode-string");
|
||||
|
||||
module.exports = function createNDJSONParseStream() {
|
||||
return pipe([
|
||||
decodeString("utf8"),
|
||||
splitLines(),
|
||||
map((line) => JSON.parse(line))
|
||||
]);
|
||||
};
|
@ -0,0 +1,18 @@
|
||||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
|
||||
const pipe = require("@promistream/pipe");
|
||||
const simpleSink = require("@promistream/simple-sink");
|
||||
const updateStream = require("./update-stream");
|
||||
|
||||
return Promise.try(() => {
|
||||
return pipe([
|
||||
updateStream({ prefix: "datasheet:" }),
|
||||
simpleSink((item) => {
|
||||
console.log(item);
|
||||
})
|
||||
]).read();
|
||||
}).then(() => {
|
||||
console.log("Done!");
|
||||
});
|
@ -0,0 +1,63 @@
|
||||
"use strict";
|
||||
|
||||
const Promise = require("bluebird");
|
||||
const bhttp = require("bhttp");
|
||||
|
||||
const pipe = require("@promistream/pipe");
|
||||
const simpleSource = require("@promistream/simple-source");
|
||||
const createCombineSequentialStream = require("@promistream/combine-sequential-streaming");
|
||||
const createSpyStream = require("@promistream/spy");
|
||||
const fromNodeStream = require("@promistream/from-node-stream");
|
||||
|
||||
const createNDJSONParseStream = require("./ndjson-parse-stream");
|
||||
|
||||
module.exports = function createUpdateStream({ prefix } = {}) {
|
||||
let lastTimestamp = new Date(0);
|
||||
// let lastTimestamp = new Date();
|
||||
|
||||
return pipe([
|
||||
simpleSource(() => {
|
||||
function attempt() {
|
||||
return Promise.try(() => {
|
||||
// To ensure that we don't hammer the srap instance
|
||||
return Promise.delay(5 * 1000);
|
||||
}).then(() => {
|
||||
// console.log({ lastTimestamp });
|
||||
// console.log(`http://localhost:3000/updates?prefix=${encodeURIComponent(prefix)}&since=${Math.floor(lastTimestamp.getTime())}`);
|
||||
return bhttp.get(`http://localhost:3000/updates?prefix=${encodeURIComponent(prefix)}&since=${Math.floor(lastTimestamp.getTime())}`, {
|
||||
stream: true
|
||||
});
|
||||
}).then((response) => {
|
||||
if (response.statusCode === 200) {
|
||||
return fromNodeStream.fromReadable(response);
|
||||
} else {
|
||||
throw new Error(`Got unexpected status code ${response.statusCode}`);
|
||||
}
|
||||
}).catch({ code: "ECONNREFUSED" }, (_error) => {
|
||||
// Scraping server is down, try again in a minute or so
|
||||
console.warn("WARNING: Scraping server is not reachable! Retrying in a minute...");
|
||||
|
||||
return Promise.try(() => {
|
||||
return Promise.delay(60 * 1000);
|
||||
}).then(() => {
|
||||
return attempt();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
return attempt();
|
||||
}),
|
||||
createCombineSequentialStream(),
|
||||
createNDJSONParseStream(),
|
||||
createSpyStream((item) => {
|
||||
if (item.updatedAt != null) {
|
||||
// TODO: Can this be made significantly more performant by string-sorting the timestamps in ISO format directly, instead of going through a parsing cycle?
|
||||
let itemDate = new Date(item.updatedAt);
|
||||
|
||||
if (itemDate > lastTimestamp) {
|
||||
lastTimestamp = itemDate;
|
||||
}
|
||||
}
|
||||
})
|
||||
]);
|
||||
};
|
Loading…
Reference in New Issue