You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
84 lines
2.9 KiB
JavaScript
84 lines
2.9 KiB
JavaScript
"use strict";
|
|
|
|
const Promise = require("bluebird");
|
|
const fs = require("fs").promises;
|
|
|
|
const pipe = require("@promistream/pipe");
|
|
const simpleSource = require("@promistream/simple-source");
|
|
const sequentialize = require("@promistream/sequentialize");
|
|
const EndOfStream = require("@promistream/end-of-stream");
|
|
|
|
const { validateArguments } = require("@validatem/core");
|
|
const required = require("@validatem/required");
|
|
const isString = require("@validatem/is-string");
|
|
const isNumber = require("@validatem/is-number");
|
|
const isInteger = require("@validatem/is-integer");
|
|
const defaultTo = require("@validatem/default-to");
|
|
const either = require("@validatem/either");
|
|
const oneOf = require("@validatem/one-of");
|
|
|
|
// NOTE: Below read size is derived from the high water mark in core lib/internal/fs/streams.js
|
|
// Number of bytes requested per read when the caller does not pass a `chunkSize` option (64 KiB).
let defaultChunkSize = 64 * 1024;
|
|
|
|
/**
 * Performs a single read of up to `length` bytes from a file handle.
 *
 * A fresh zero-filled buffer of `length` bytes is allocated for every call and
 * passed to `handle.read(buffer, 0, length)`.
 *
 * @param {object} handle - Object exposing `read(buffer, offset, length)` that
 *   resolves to an object with a `bytesRead` property (in this file: the handle
 *   that `fs.promises.open` resolves to).
 * @param {number} length - Maximum number of bytes to read; also the size of
 *   the buffer allocated for this read.
 * @returns {Promise<Buffer>} Resolves to the full buffer when exactly `length`
 *   bytes were read, or to a view of its first `bytesRead` bytes on a short read.
 *   Rejects with `EndOfStream` when zero bytes were read, and with an `Error`
 *   when the reported `bytesRead` is not a value between 0 and `length`.
 */
function doRead(handle, length) {
	// TODO: Can we safely switch to `allocUnsafe` here?
	let buffer = Buffer.alloc(length);

	return Promise.try(() => {
		return handle.read(buffer, 0, length);
	}).then(({ bytesRead }) => {
		if (bytesRead === 0) {
			// Not documented; https://github.com/nodejs/node/issues/35363
			throw new EndOfStream();
		} else if (bytesRead === length) {
			return buffer;
		} else if (bytesRead < length) {
			// TODO: For possible future performance optimization, consider reusing the remaining Buffer allocation for a next read, if possible. Need more data on how often this case occurs first, though, to justify the added complexity.
			// `subarray` returns a view over the same memory, exactly like the deprecated `Buffer#slice` did.
			return buffer.subarray(0, bytesRead);
		} else {
			throw new Error(`Read more bytes (${bytesRead}) than the specified 'length' (${length}); this should never happen!`);
		}
	});
}
|
|
|
|
// FIXME: This should probably *only* allow reading mode flags
|
|
|
|
module.exports = function createReadFileStream(_path, _options) {
|
|
let [ path, options ] = validateArguments(arguments, [
|
|
[ "path", required, isString ],
|
|
[ "options", defaultTo({}), {
|
|
chunkSize: [ isInteger, defaultTo(defaultChunkSize) ],
|
|
flag: [ defaultTo("r"), oneOf([ "a", "ax", "a+", "ax+", "as", "as+", "r", "r+", "rs+", "w", "wx", "w+", "wx+" ]) ],
|
|
mode: [ either([ isString, isNumber ]) ] // FIXME: Stricter validation
|
|
// TODO: start/end
|
|
}]
|
|
]);
|
|
|
|
let handlePromise = fs.open(path, options.flag, options.mode);
|
|
|
|
// Silence unhandled rejection warnings until later
|
|
handlePromise.catch(() => {});
|
|
|
|
// TODO: Metadata, including stream label and file size/type/path
|
|
return pipe([
|
|
simpleSource({
|
|
onRequest: () => {
|
|
return Promise.try(() => {
|
|
// TODO: Can we optimize this by separately tracking when the open has completed, and replacing the Promise with the actual handle in that case?
|
|
return handlePromise;
|
|
}).then((handle) => {
|
|
return doRead(handle, options.chunkSize);
|
|
});
|
|
},
|
|
onAbort: (_reason) => {
|
|
return Promise.try(() => {
|
|
return handlePromise;
|
|
}).then((handle) => {
|
|
return handle.close();
|
|
});
|
|
}
|
|
}),
|
|
sequentialize()
|
|
]);
|
|
};
|