You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

181 lines
6.6 KiB
CoffeeScript

stream = require "stream"
Promise = require "bluebird"
streamLength = require "stream-length"
debug = require("debug")("combined-stream2")
# FIXME: Should `error` events emitted by the source streams be handled (e.g. forwarded to the combined stream)?
# Utility functions
# Reports whether `obj` is an instance of at least one of the constructors
# listed in `types`. Returns `false` when `types` is empty.
ofTypes = (obj, types) ->
  for candidate in types
    # Stop at the first constructor that matches; later entries are not checked.
    return true if obj instanceof candidate
  false
# Reports whether `obj` is an instance of one of the readable stream classes of
# the core `stream` module (Readable, Duplex or Transform).
isStream = (obj) ->
  obj instanceof stream.Readable or
    obj instanceof stream.Duplex or
    obj instanceof stream.Transform
# Makes sure a source can be consumed through the streams2 (`readable` event /
# `read()`) interface.
#
# source - the value to normalise.
#
# Returns `source` itself when it is falsy, a function, a Buffer, or already
# carries a `_readableState`. Any other value is treated as a legacy stream and
# wrapped in a new `stream.Readable` by means of `Readable#wrap`.
#
# NOTE: the parameter must not be called `stream`. Doing so shadows the required
# `stream` module, and `new stream.Readable()` would then be looked up on the
# argument and throw a TypeError.
makeStreams2 = (source) ->
  # Adapted from https://github.com/feross/multistream/blob/master/index.js
  if not source or typeof source == "function" or source instanceof Buffer or source._readableState?
    return source
  wrapper = new stream.Readable().wrap(source)
  if source.destroy?
    # Let callers destroy the underlying legacy stream through the wrapper.
    wrapper.destroy = source.destroy.bind(source)
  return wrapper
# The actual stream class definition
# A Readable stream that emits the data of every appended source, one source
# after the other, in the order in which the sources were appended.
#
# A source is a Buffer, a readable stream, or a callback. A callback is invoked
# with a single `next` function and has to call `next` with the real source.
class CombinedStream extends stream.Readable
  # options - passed on unchanged to the `stream.Readable` constructor.
  constructor: (options) ->
    # The argument is forwarded explicitly; a bare `super` does not forward
    # arguments under every CoffeeScript version.
    super options
    # Becomes true once the first read has been initiated.
    @_reading = false
    # Queue of `[source, options]` pairs that still have to be read.
    @_sources = []
    # The source that is being read right now, or null when none is active.
    @_currentSource = null
    @_currentIsStream = false
    # True when the current source fired `readable` and nothing was read since.
    @_sourceDataAvailable = false
    # True when a read was requested that could not be satisfied yet.
    @_wantData = false
    # True once `null` (EOF) has been pushed.
    @_eofPushed = false

  # Queues a source.
  #
  # source  - a readable stream, a Buffer, or a callback providing either.
  # options - stored next to the source; `knownLength` / `contentLength` are
  #           consulted by `getStreamLengths`.
  #
  # Throws an Error when `source` is of none of the accepted types.
  append: (source, options = {}) ->
    # Only readable binary data sources are allowed.
    if not ofTypes source, [stream.Readable, stream.Duplex, stream.Transform, Buffer, Function]
      throw new Error "The provided source must be either a readable stream or a Buffer, or a callback providing either of those. If it is currently a string, you need to convert it to a Buffer yourself and ensure that the encoding is correct."
    debug "appending source: %s", source.toString().replace(/\n/g, "\\n").replace(/\r/g, "\\r")
    @_sources.push [makeStreams2(source), options]

  # Resolves all callback sources and determines the length of every source.
  #
  # Returns a promise for an array holding one length per source. A
  # `knownLength` or `contentLength` option takes precedence; otherwise the
  # `stream-length` module is asked. The promise is rejected when reading has
  # already started.
  getStreamLengths: ->
    debug "getting stream lengths"
    if @_reading
      Promise.reject new Error("You can't obtain the stream lengths anymore once you've started reading!")
    else
      Promise.try =>
        @_resolveAllSources()
      .then (actualSources) =>
        # Keep the resolved sources, so that callbacks are not invoked a second time when reading starts.
        @_sources = actualSources
        Promise.resolve actualSources
      .map (source) ->
        if source[1]?.knownLength? or source[1]?.contentLength?
          Promise.resolve source[1]?.knownLength ? source[1]?.contentLength
        else
          streamLength source[0]

  # Returns a promise for the sum of all lengths reported by `getStreamLengths`.
  getCombinedStreamLength: ->
    debug "getting combined stream length"
    Promise.try =>
      @getStreamLengths()
    .reduce ((total, current) -> total + current), 0

  # Returns a promise for the list of queued `[source, options]` pairs, with
  # every callback source replaced by the source it provided.
  _resolveAllSources: ->
    debug "resolving all sources"
    Promise.all (@_resolveSource(source) for source in @_sources)

  # Resolves a single `[source, options]` pair.
  #
  # If the 'source' is a function, then it's actually a callback that will
  # *return* the source. We call the callback and supply it with a `next`
  # function; the pair is resolved as soon as `next` is called with the real
  # source. The promise stays pending for as long as the callback does not call
  # `next`.
  _resolveSource: (source) ->
    new Promise (resolve, reject) => # WARN?
      if source[0] instanceof Function
        debug "resolving %s", source[0].toString().replace(/\n/g, "\\n").replace(/\r/g, "\\r")
        source[0] (realSource) =>
          resolve [realSource, source[1]]
      else
        # It's a regular source, so we immediately continue.
        debug "source %s is already resolved", source[0].toString().replace(/\n/g, "\\n").replace(/\r/g, "\\r")
        resolve source

  # Marks the stream as being read and resolves all callback sources.
  _initiateRead: ->
    Promise.try =>
      @_reading = true
      @_resolveAllSources()
    .then (actualSources) =>
      @_sources = actualSources
      Promise.resolve()

  # Emits `err` as an `error` event on this stream. Used as the rejection
  # handler of internal promise chains, so that failures are not lost.
  _reportError: (err) =>
    @emit "error", err

  # `stream.Readable` implementation hook. Initiates reading on the first call
  # and then performs a read.
  _read: (size) ->
    Promise.try =>
      if @_reading == false
        @_initiateRead()
      else
        Promise.resolve()
    .then =>
      @_doRead size
    .catch @_reportError

  # Picks the next source if none is active, then reads from the active source.
  _doRead: (size) ->
    # FIXME: We should probably try to do something with `size` ourselves. Just passing it on for now, but it'd be nice to implement it properly in the future - this might help efficiency in some cases.
    Promise.try =>
      if @_currentSource == null
        # We're not currently actively reading from any sources. Set a new source to be the current source.
        @_nextSource size
      else
        # We haven't changed our source - immediately continue with the actual read.
        Promise.resolve()
    .then =>
      # EOF has been signalled; there is no source left to read from and
      # nothing may be pushed anymore.
      return if @_eofPushed
      @_doActualRead size

  # Makes the next queued source the current one, or pushes EOF when the queue
  # is empty. For stream sources the `end` and `readable` listeners are set up.
  _nextSource: (readSize) ->
    if @_sources.length == 0
      # We've run out of sources - signal EOF and bail.
      @_eofPushed = true
      @push null
      return
    @_currentSource = @_sources.shift()[0]
    @_currentIsStream = isStream @_currentSource
    # The flag described the previous source; the new one has to announce its data itself.
    @_sourceDataAvailable = false
    if @_currentIsStream
      @_currentSource.once "end", =>
        # We've depleted the stream. The current source should be set to `null`, so that on the next read a new source will be picked. We'll also immediately trigger the next read - the stream will be expecting to receive *some* kind of data before calling the next read itself.
        @_currentSource = null
        # FIXME: This should probably use the last-requested read size, not the one that was requested when *setting up* the `end` event.
        @_doRead(readSize).catch @_reportError
      @_currentSource.on "readable", =>
        debug "received readable event, setting sourceDataAvailable to true"
        @_sourceDataAvailable = true
        if @_wantData
          debug "wantData queued, reading"
          @_doStreamRead().catch @_reportError
    Promise.resolve()

  # We're wrapping the actual reading code in a separate function, so as to facilitate source-returning callbacks in the sources list.
  #
  # Reads from the current source: a stream is read when it announced data
  # (otherwise the request is remembered in `_wantData`); anything else is
  # pushed as a single chunk.
  _doActualRead: (size) =>
    # FIXME: Apparently, it may be possible to push more than one chunk in a single _read call. The implementation specifics of this should probably be looked into - that could perhaps make our stream a bit more efficient. On the other hand, shouldn't we leave this for the Writable to decide?
    new Promise (resolve, reject) =>
      if @_currentIsStream
        # This is a readable stream of some sort - we'll do a read, and pass on the result.
        if @_sourceDataAvailable
          # Resolving with the promise makes a failing read reject this promise as well.
          return resolve @_doStreamRead()
        else
          debug "want data, but no readable event fired yet, setting wantData to true"
          @_wantData = true
          return resolve() # We haven't actually read anything yet, but whatever.
      else
        # This is a Buffer - we'll push it as is, and immediately mark it as completed.
        chunk = @_currentSource
        # We need to unset it *before* pushing the chunk, because otherwise V8 will sometimes not give control back to this function, and a second read may occur before the source can be unset.
        @_currentSource = null
        if chunk?
          debug "pushing buffer %s", chunk.toString().replace(/\n/g, "\\n").replace(/\r/g, "\\r")
          @push chunk
        else
          # Happens when a callback provided no source at all. An empty chunk is pushed so that the stream keeps asking for data.
          debug "WARN: current source was null, pushing empty buffer"
          # `new Buffer` is deprecated; it is only used where `Buffer.alloc` does not exist.
          @push(if Buffer.alloc? then Buffer.alloc(0) else new Buffer(""))
        resolve()

  # Reads one chunk from the current stream source and pushes it.
  #
  # `read()` returns `null` when the source has nothing to offer right now or
  # has finished. That `null` must not be pushed - it would signal EOF for the
  # whole combined stream while further sources may still be queued. Instead the
  # request is kept in `_wantData`, so that the next `readable` event of a
  # source triggers another attempt.
  _doStreamRead: =>
    Promise.try =>
      @_sourceDataAvailable = false
      chunk = @_currentSource.read()
      if chunk?
        @_wantData = false
        @push chunk
      else
        @_wantData = true
      Promise.resolve()
# Public module API
module.exports =
  # Factory for a new, empty combined stream; `options` is handed to the
  # `CombinedStream` constructor. Mirrors the API of the original
  # `combined-stream` module, so that it can serve as a drop-in replacement.
  create: (options) -> new CombinedStream options