You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
66 lines
2.2 KiB
JavaScript
66 lines
2.2 KiB
JavaScript
"use strict";
|
|
|
|
const dateFns = require("date-fns");
|
|
|
|
const pipe = require("@promistream/pipe");
|
|
const combineSequentialStreaming = require("@promistream/combine-sequential-streaming");
|
|
const fromIterable = require("@promistream/from-iterable");
|
|
const fromNodeStream = require("@promistream/from-node-stream");
|
|
|
|
const createTypeTaggingStream = require("../../../streams/tag-type");
|
|
|
|
module.exports = function ({ db, knex }) {
|
|
return function (tx, { timestamp, prefix }) {
|
|
// NOTE: This returns snake_cased keys! As we're bypassing the Objection internals, no casemapping occurs.
|
|
// FIXME/REFACTOR: That needs to be changed in the refactor, for consistency across database backends
|
|
|
|
// NOTE: This is a hacky workaround - if we don't do this, then for some reason also entries *at* the exact timestamp are included, which is not what we want.
|
|
// FIXME: Verify that this doesn't break anything, e.g. when an entry is created in between the original timestamp and +1ms.
|
|
let actualTimestamp = (timestamp != null)
|
|
? dateFns.addMilliseconds(timestamp, 1)
|
|
: undefined;
|
|
|
|
/**
 * Adds the optional "updated since" and "ID prefix" filters to a query.
 *
 * Reads `timestamp`, `actualTimestamp` and `prefix` from the enclosing scope;
 * a filter is only added when its corresponding value is not null/undefined.
 *
 * @param {object} query - Query builder exposing `whereRaw(sql, bindings)`.
 * @param {string} idField - Name of the column that the prefix is matched against.
 *   It is interpolated directly into the SQL, so it must be a trusted identifier
 *   and never user input.
 * @returns {object} The query builder with the applicable WHERE clauses applied.
 */
function applyWhereClauses(query, idField) {
	let filteredQuery = query;

	if (timestamp != null) {
		// FIXME: An error in the query here throws an error, resulting in an abort handling bug in a promistream
		filteredQuery = filteredQuery.whereRaw(`updated_at > ?`, [ actualTimestamp ]);
	}

	if (prefix != null) {
		// Escape every character that has a special meaning in a LIKE pattern, so
		// that the prefix is only ever matched literally: `%` (any sequence), `_`
		// (any single character), and the escape character `\` itself (otherwise a
		// trailing backslash in the prefix would escape the wildcard we append).
		// NOTE(review): this assumes that the database backend uses `\` as its LIKE escape character - confirm for each backend.
		const escapedPrefix = prefix.replace(/[\\%_]/g, "\\$&");

		filteredQuery = filteredQuery.whereRaw(`${idField} LIKE ?`, [ `${escapedPrefix}%` ]);
	}

	return filteredQuery;
}
|
|
|
|
/**
 * Lazily produces one pipeline per record type, in the order: items, aliases,
 * task results. Each query is only built (and its stream only opened) once the
 * generator is advanced to the corresponding `yield`.
 *
 * Every pipeline reads the rows of a filtered query as a Node.js stream and
 * passes them through the tagging stream created for the given type name.
 */
function* streamGenerator() {
	// Wraps the rows of an already-filtered query into a pipeline tagged with `type`.
	const createTaggedRowStream = (filteredQuery, type) => {
		const rowStream = fromNodeStream.fromReadable(filteredQuery.toKnexQuery().stream());

		return pipe([
			rowStream,
			createTypeTaggingStream(type)
		]);
	};

	yield createTaggedRowStream(
		applyWhereClauses(db.Item.query(tx), "id"),
		"item"
	);

	// NOTE: We are only interested in aliases which don't point at themselves
	const nonSelfAliases = () => db.Alias.query(tx).where("alias", "!=", knex.ref("item_id"));

	yield createTaggedRowStream(
		applyWhereClauses(nonSelfAliases(), "alias"),
		"alias"
	);

	yield createTaggedRowStream(
		applyWhereClauses(db.TaskResult.query(tx), "item_id"),
		"taskResult"
	);
}
|
|
|
|
return pipe([
|
|
fromIterable(streamGenerator()),
|
|
combineSequentialStreaming()
|
|
]);
|
|
}
|
|
};
|