"use strict" ;
const Promise = require ( "bluebird" ) ;
const fs = require ( "fs" ) . promises ;
const pipe = require ( "@promistream/pipe" ) ;
const simpleSource = require ( "@promistream/simple-source" ) ;
const sequentialize = require ( "@promistream/sequentialize" ) ;
const EndOfStream = require ( "@promistream/end-of-stream" ) ;
const { validateArguments } = require ( "@validatem/core" ) ;
const required = require ( "@validatem/required" ) ;
const isString = require ( "@validatem/is-string" ) ;
const isNumber = require ( "@validatem/is-number" ) ;
const isInteger = require ( "@validatem/is-integer" ) ;
const defaultTo = require ( "@validatem/default-to" ) ;
const either = require ( "@validatem/either" ) ;
const oneOf = require ( "@validatem/one-of" ) ;
// NOTE: Below read size is derived from the high water mark in core lib/internal/fs/streams.js
let defaultChunkSize = 64 * 1024 ;
function doRead ( handle , length ) {
// TODO: Can we safely switch to `allocUnsafe` here?
let buffer = Buffer . alloc ( length ) ;
return Promise . try ( ( ) => {
return handle . read ( buffer , 0 , length ) ;
} ) . then ( ( result ) => {
if ( result . bytesRead === 0 ) {
// Not documented; https://github.com/nodejs/node/issues/35363
return Promise . try ( ( ) => {
return handle . close ( ) ;
} ) . then ( ( ) => {
throw new EndOfStream ( ) ;
} ) ;
} else if ( result . bytesRead === length ) {
return buffer ;
} else if ( result . bytesRead < length ) {
// TODO: For possible future performance optimization, consider reusing the remaining Buffer allocation for a next read, if possible. Need more data on how often this case occurs first, though, to justify the added complexity.
return buffer . slice ( 0 , result . bytesRead ) ;
} else {
throw new Error ( ` Read more bytes ( ${ result . bytesRead } ) than the specified 'length' ( ${ length } ); this should never happen! ` ) ;
}
} ) ;
}
// FIXME: This should probably *only* allow reading mode flags
module . exports = function createReadFileStream ( _path , _options ) {
let [ path , options ] = validateArguments ( arguments , [
[ "path" , required , isString ] ,
[ "options" , defaultTo ( { } ) , {
chunkSize : [ isInteger , defaultTo ( defaultChunkSize ) ] ,
flag : [ defaultTo ( "r" ) , oneOf ( [ "a" , "ax" , "a+" , "ax+" , "as" , "as+" , "r" , "r+" , "rs+" , "w" , "wx" , "w+" , "wx+" ] ) ] ,
mode : [ either ( [ isString , isNumber ] ) ] // FIXME: Stricter validation
// TODO: start/end
} ]
] ) ;
let handlePromise = fs . open ( path , options . flag , options . mode ) ;
// Silence unhandled rejection warnings until later
handlePromise . catch ( ( ) => { } ) ;
// TODO: Metadata, including stream label and file size/type/path
return pipe ( [
simpleSource ( {
onRequest : ( ) => {
return Promise . try ( ( ) => {
// TODO: Can we optimize this by separately tracking when the open has completed, and replacing the Promise with the actual handle in that case?
return handlePromise ;
} ) . then ( ( handle ) => {
return doRead ( handle , options . chunkSize ) ;
} ) ;
} ,
onAbort : ( _reason ) => {
return Promise . try ( ( ) => {
return handlePromise ;
} ) . then ( ( handle ) => {
return handle . close ( ) ;
} ) ;
}
} ) ,
sequentialize ( )
] ) ;
} ;