You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

84 lines
2.9 KiB
JavaScript

"use strict";
const Promise = require("bluebird");
const fs = require("fs").promises;
const pipe = require("@promistream/pipe");
const simpleSource = require("@promistream/simple-source");
const sequentialize = require("@promistream/sequentialize");
const EndOfStream = require("@promistream/end-of-stream");
const { validateArguments } = require("@validatem/core");
const required = require("@validatem/required");
const isString = require("@validatem/is-string");
const isNumber = require("@validatem/is-number");
const isInteger = require("@validatem/is-integer");
const defaultTo = require("@validatem/default-to");
const either = require("@validatem/either");
const oneOf = require("@validatem/one-of");
// NOTE: The default read size of 64 KiB is derived from the high water mark in Node core's lib/internal/fs/streams.js
const defaultChunkSize = 1024 * 64;
/**
 * Reads a single chunk of at most `length` bytes from an open file handle.
 *
 * @param {object} handle - Object whose `read(buffer, offset, length)` resolves to an object with a `bytesRead` property (in this file: the handle produced by `fs.open`).
 * @param {number} length - Maximum number of bytes to read; also the size of the Buffer allocated for this read.
 * @returns {Promise<Buffer>} Resolves to a Buffer containing exactly the bytes that were read.
 *   Rejects with `EndOfStream` when zero bytes were read, and with an `Error` when the reported
 *   byte count exceeds `length` or when the allocation or the read itself fails.
 */
function doRead(handle, length) {
    return Promise.try(() => {
        // Allocate inside the Promise chain, so that an invalid `length` surfaces as a rejection instead of a synchronous throw.
        // TODO: Can we safely switch to `allocUnsafe` here?
        let buffer = Buffer.alloc(length);

        return Promise.resolve(handle.read(buffer, 0, length)).then((result) => {
            let bytesRead = result.bytesRead;

            if (bytesRead === 0) {
                // A zero-byte read signals end-of-file. Not documented; https://github.com/nodejs/node/issues/35363
                throw new EndOfStream();
            }

            if (bytesRead > length) {
                throw new Error(`Read more bytes (${bytesRead}) than the specified 'length' (${length}); this should never happen!`);
            }

            if (bytesRead === length) {
                return buffer;
            }

            // Short read: return only the filled part. `subarray` is the non-deprecated equivalent of `Buffer#slice`; it creates a view and does not copy.
            // TODO: For possible future performance optimization, consider reusing the remaining Buffer allocation for a next read, if possible. Need more data on how often this case occurs first, though, to justify the added complexity.
            return buffer.subarray(0, bytesRead);
        });
    });
}
// FIXME: This should probably *only* allow reading mode flags
module.exports = function createReadFileStream(_path, _options) {
let [ path, options ] = validateArguments(arguments, [
[ "path", required, isString ],
[ "options", defaultTo({}), {
chunkSize: [ isInteger, defaultTo(defaultChunkSize) ],
flag: [ defaultTo("r"), oneOf([ "a", "ax", "a+", "ax+", "as", "as+", "r", "r+", "rs+", "w", "wx", "w+", "wx+" ]) ],
mode: [ either([ isString, isNumber ]) ] // FIXME: Stricter validation
// TODO: start/end
}]
]);
let handlePromise = fs.open(path, options.flag, options.mode);
// Silence unhandled rejection warnings until later
handlePromise.catch(() => {});
// TODO: Metadata, including stream label and file size/type/path
return pipe([
simpleSource({
onRequest: () => {
return Promise.try(() => {
// TODO: Can we optimize this by separately tracking when the open has completed, and replacing the Promise with the actual handle in that case?
return handlePromise;
}).then((handle) => {
return doRead(handle, options.chunkSize);
});
},
onAbort: (_reason) => {
return Promise.try(() => {
return handlePromise;
}).then((handle) => {
return handle.close();
});
}
}),
sequentialize()
]);
};