You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
182 lines
6.7 KiB
CoffeeScript
182 lines
6.7 KiB
CoffeeScript
stream = require "stream"
|
|
Promise = require "bluebird"
|
|
streamLength = require "stream-length"
|
|
debug = require("debug")("combined-stream2")
|
|
|
|
# FIXME: Should we attach an `error` handler to the source streams?
|
|
|
|
# Utility functions
|
|
# Reports whether `obj` is an instance of at least one of the constructors listed in `types`.
#
# obj   - the value to test.
# types - an array of constructors to test against.
#
# Returns `true` as soon as one constructor matches, `false` otherwise (including for an empty list).
ofTypes = (obj, types) ->
  types.some (candidate) -> obj instanceof candidate
|
|
|
|
# Reports whether `obj` is an instance of one of Node's core stream classes (Readable, Duplex, Transform, or the legacy Stream base class).
#
# Returns a boolean.
isStream = (obj) ->
  streamClasses = [stream.Readable, stream.Duplex, stream.Transform, stream.Stream]
  streamClasses.some (streamClass) -> obj instanceof streamClass
|
|
|
|
# Normalizes a source so that stream sources always expose the modern (streams2+) Readable API.
#
# sourceStream - the source to normalize.
#
# Returns the value untouched when it is falsy, a callback, a Buffer, or an object that already carries a `_readableState`. Anything else is treated as an old-style stream and is returned wrapped in a `stream.Readable`.
#
# Adapted from https://github.com/feross/multistream/blob/master/index.js
makeStreams2 = (sourceStream) ->
  # NOTE: The parameter must not be named `stream` - that would shadow the `stream` module that is needed to build the wrapper below.
  if not sourceStream or typeof sourceStream == "function" or sourceStream instanceof Buffer or sourceStream._readableState?
    return sourceStream

  wrapper = new stream.Readable().wrap(sourceStream)

  # Point `destroy` at the underlying stream, so that destroying the wrapper destroys the real source.
  if sourceStream.destroy?
    wrapper.destroy = sourceStream.destroy.bind(sourceStream)

  return wrapper
|
|
|
|
# The actual stream class definition
|
|
# A Readable stream that emits the data of every appended source, one source after another, in the order in which they were appended.
#
# A source is a readable stream, a Buffer, or a callback that hands either of those to the function it is called with.
class CombinedStream extends stream.Readable
  constructor: ->
    # A bare `super` forwards the constructor arguments (eg. the stream options) to stream.Readable.
    super
    # Becomes true on the first read; from then on stream lengths can no longer be requested.
    @_reading = false
    # Queue of [source, options] pairs that have not been read yet.
    @_sources = []
    # The source that is currently being read, or null when a new one must be picked.
    @_currentSource = null
    # Set when the current stream source has fired `readable` and has not been read since.
    @_sourceDataAvailable = false
    # Set when a read was requested but nothing could be pushed yet.
    @_wantData = false

  # Queues a source, to be emitted after all previously appended sources.
  #
  # source  - a readable stream, a Buffer, or a callback providing either of those.
  # options - optional object; its `knownLength` / `contentLength` are consulted by getStreamLengths.
  #
  # Throws an Error when `source` is of any other type (eg. a string).
  append: (source, options = {}) ->
    # Only readable binary data sources are allowed.
    if not ofTypes source, [stream.Readable, stream.Duplex, stream.Transform, stream.Stream, Buffer, Function]
      throw new Error "The provided source must be either a readable stream or a Buffer, or a callback providing either of those. If it is currently a string, you need to convert it to a Buffer yourself and ensure that the encoding is correct."

    debug "appending source: %s", source.toString().replace(/\n/g, "\\n").replace(/\r/g, "\\r")
    @_sources.push [makeStreams2(source), options]

  # Determines the length of every queued source, in order.
  #
  # Callback sources are resolved in the process, and the queue is replaced with the resolved sources. For each source, `options.knownLength` (or else `options.contentLength`) is used when given; otherwise the length is obtained through the `stream-length` module.
  #
  # Returns a promise for an array of lengths. The promise is rejected when reading has already started.
  getStreamLengths: ->
    debug "getting stream lengths"
    if @_reading
      Promise.reject new Error("You can't obtain the stream lengths anymore once you've started reading!")
    else
      Promise.try =>
        @_resolveAllSources()
      .then (actualSources) =>
        @_sources = actualSources
        Promise.resolve actualSources
      .map (source) ->
        if source[1]?.knownLength? or source[1]?.contentLength?
          Promise.resolve source[1]?.knownLength ? source[1]?.contentLength
        else
          streamLength source[0]

  # Determines the total length of all queued sources, by summing the results of getStreamLengths.
  #
  # callback - optional Node-style callback; it is handed to the promise's `nodeify`.
  #
  # Returns the promise produced by `nodeify`.
  getCombinedStreamLength: (callback) ->
    debug "getting combined stream length"
    Promise.try =>
      @getStreamLengths()
    .reduce ((total, current) -> total + current), 0
    .nodeify(callback)

  # Resolves every queued source (see _resolveSource).
  #
  # Returns a promise for an array of [source, options] pairs, in queue order.
  _resolveAllSources: ->
    debug "resolving all sources"
    Promise.all (@_resolveSource(source) for source in @_sources)

  # Resolves a single queue entry.
  #
  # source - a [source, options] pair.
  #
  # Returns a promise for a [source, options] pair. For a callback source, the promise only resolves once the callback has handed over the real source.
  _resolveSource: (source) ->
    # If the 'source' is a function, then it's actually a callback that will *return* the source. We call the callback, and supply it with a `next` function that will post-process the source, and eventually trigger the actual read.
    new Promise (resolve, reject) => # WARN?
      if source[0] instanceof Function
        debug "resolving %s", source[0].toString().replace(/\n/g, "\\n").replace(/\r/g, "\\r")
        source[0] (realSource) =>
          resolve [realSource, source[1]]
      else
        # It's a regular source, so we immediately continue.
        debug "source %s is already resolved", source[0].toString().replace(/\n/g, "\\n").replace(/\r/g, "\\r")
        resolve source

  # Marks the stream as being read, and resolves all queued sources before the first actual read.
  #
  # Returns a promise that resolves once the queue holds resolved sources only.
  _initiateRead: ->
    Promise.try =>
      @_reading = true
      @_resolveAllSources()
    .then (actualSources) =>
      @_sources = actualSources
      Promise.resolve()

  # stream.Readable hook. Performs the one-time initialization on the first call, and then carries out the read.
  #
  # size - the requested read size; it is passed along, but not otherwise used.
  _read: (size) ->
    Promise.try =>
      if @_reading == false
        @_initiateRead()
      else
        Promise.resolve()
    .then =>
      @_doRead size

  # Picks a new current source when there is none, and then reads from the current source.
  _doRead: (size) ->
    # FIXME: We should probably try to do something with `size` ourselves. Just passing it on for now, but it'd be nice to implement it properly in the future - this might help efficiency in some cases.
    Promise.try =>
      if @_currentSource == null
        # We're not currently actively reading from any sources. Set a new source to be the current source.
        @_nextSource size
      else
        # We haven't changed our source - immediately continue with the actual read.
        Promise.resolve()
    .then =>
      @_doActualRead size

  # Makes the next queued source the current source, or signals EOF when the queue is empty.
  #
  # For a stream source, this also registers the `end` and `readable` handlers that drive the reading.
  _nextSource: (readSize) ->
    if @_sources.length == 0
      # We've run out of sources - signal EOF and bail.
      @push null
      return

    @_currentSource = @_sources.shift()[0]
    @_currentIsStream = isStream @_currentSource

    if @_currentIsStream
      @_currentSource.once "end", =>
        # We've depleted the stream (ie. we've read 'null') The current source should be set to `null`, so that on the next read a new source will be picked. We'll also immediately trigger the next read - the stream will be expecting to receive *some* kind of data before calling the next read itself.
        @_currentSource = null
        @_doRead readSize # FIXME: This should probably use the last-requested read size, not the one that was requested when *setting up* the `end` event.

      @_currentSource.on "readable", =>
        debug "received readable event, setting sourceDataAvailable to true"
        @_sourceDataAvailable = true

        if @_wantData
          debug "wantData queued, reading"
          @_doStreamRead()

    Promise.resolve()

  # Reads from the current source: a stream is read when it has signalled that data is available (otherwise the request is recorded in @_wantData), and a Buffer is pushed in its entirety.
  #
  # We're wrapping the actual reading code in a separate function, so as to facilitate source-returning callbacks in the sources list.
  _doActualRead: (size) =>
    # FIXME: Apparently, it may be possible to push more than one chunk in a single _read call. The implementation specifics of this should probably be looked into - that could perhaps make our stream a bit more efficient. On the other hand, shouldn't we leave this for the Writable to decide?
    new Promise (resolve, reject) =>
      if @_currentIsStream
        # This is a readable stream of some sort - we'll do a read, and pass on the result. We'll pass on the `size` parameter, but there's no guarantee that anything will actually be done with it.
        if @_sourceDataAvailable
          @_doStreamRead()
          return resolve()
        else
          debug "want data, but no readable event fired yet, setting wantData to true"
          @_wantData = true
          return resolve() # We haven't actually read anything yet, but whatever.
      else
        # This is a Buffer - we'll push it as is, and immediately mark it as completed.
        chunk = @_currentSource

        # We need to unset it *before* pushing the chunk, because otherwise V8 will sometimes not give control back to this function, and a second read may occur before the source can be unset.
        @_currentSource = null

        if chunk != null # FIXME: ???
          debug "pushing buffer %s", chunk.toString().replace(/\n/g, "\\n").replace(/\r/g, "\\r")
          @push chunk
        else
          debug "WARN: current source was null, pushing empty buffer"
          # NOTE(review): `new Buffer` is deprecated in current Node.js versions; it is left as is here, so as not to change which runtimes are supported.
          @push new Buffer("")

        resolve()

  # Reads one chunk from the current stream source, and pushes it when there is one.
  #
  # Returns a promise that resolves once the read attempt is done.
  _doStreamRead: =>
    Promise.try =>
      @_sourceDataAvailable = false

      # The current source may have been unset in the meantime (by its `end` handler, or because we ran out of sources). There is nothing to read from in that case.
      if not @_currentSource?
        return Promise.resolve()

      chunk = @_currentSource.read()

      if chunk?
        @_wantData = false
        @push chunk
      else
        # The source's read() returns null when it has nothing buffered, or when it has ended. Pushing that null would signal EOF on the *combined* stream and cut off all remaining sources - so we skip it, and keep the request pending for the next `readable` event (or for the `end` handler to move on to the next source).
        @_wantData = true

      Promise.resolve()
|
|
|
|
# Public module API
|
|
module.exports =
  # Builds a new combined stream; `options` is handed to the CombinedStream constructor.
  # The `create` entry point mirrors the API of the original `combined-stream` module, so that this module can be used as a drop-in replacement.
  create: (options) -> new CombinedStream options
|