v1.1.0: Fixing stuff for `request`, take two.

master
Sven Slootweg 10 years ago
parent 40f8e6f6d4
commit b96ee82d9a

@ -36,6 +36,16 @@ Note that there are a few important differences between `combined-stream` and `c
Most usecases will not be affected by these differences, but your mileage may vary. Most usecases will not be affected by these differences, but your mileage may vary.
## Using `combined-stream2` with `request`
__There's an important caveat when trying to append a `request` stream to a `combined-stream2`.__
Because `request` does a bunch of strange non-standard hackery with streams, it doesn't play nicely with `combined-stream2` (and many other writable/transform streams). For convenience, `combined-stream2` contains a built-in workaround using a `PassThrough` stream specifically for dealing with `request` streams, but __this workaround will likely result in the entire response being buffered into memory.__
Passing in response objects (that is, the object provided in the `response` event handler for a `request` call) is *completely unsupported* - trying to do so will likely break `combined-stream2`. You *must* pass in the `request` object, rather than the `response` object.
I seriously suggest you consider using [`bhttp`](https://www.npmjs.com/package/bhttp) instead - it has much more predictable behaviour.
## Usage ## Usage
```javascript ```javascript

@ -15,16 +15,25 @@ ofTypes = (obj, types) ->
isStream = (obj) -> isStream = (obj) ->
return ofTypes obj, [stream.Readable, stream.Duplex, stream.Transform, stream.Stream] return ofTypes obj, [stream.Readable, stream.Duplex, stream.Transform, stream.Stream]
makeStreams2 = (stream) -> makeStreams2 = (sourceStream) ->
# Adapted from https://github.com/feross/multistream/blob/master/index.js # Adapted from https://github.com/feross/multistream/blob/master/index.js
if not stream or typeof stream == "function" or stream instanceof Buffer or stream._readableState? if not sourceStream or typeof sourceStream == "function" or sourceStream instanceof Buffer or sourceStream._readableState?
return stream debug "already streams2 or otherwise compatible"
return sourceStream
if sourceStream.httpModule?
# This is a special case for `request`, because it does weird stream hackery.
# NOTE: The caveat is that this will buffer up in memory.
debug "found `request` stream, using PassThrough stream..."
return sourceStream.pipe(new stream.PassThrough())
debug "wrapping stream..."
wrapper = new stream.Readable().wrap(stream) wrapper = new stream.Readable().wrap(stream)
if stream.destroy? if sourceStream.destroy?
wrapper.destroy = stream.destroy.bind(stream) wrapper.destroy = sourceStream.destroy.bind(sourceStream)
debug "returning streams2-wrapped stream"
return wrapper return wrapper
# The actual stream class definition # The actual stream class definition
@ -116,6 +125,7 @@ class CombinedStream extends stream.Readable
_nextSource: (readSize) -> _nextSource: (readSize) ->
if @_sources.length == 0 if @_sources.length == 0
# We've run out of sources - signal EOF and bail. # We've run out of sources - signal EOF and bail.
debug "ran out of streams; pushing EOF"
@push null @push null
return return

@ -25,15 +25,22 @@ isStream = function(obj) {
return ofTypes(obj, [stream.Readable, stream.Duplex, stream.Transform, stream.Stream]); return ofTypes(obj, [stream.Readable, stream.Duplex, stream.Transform, stream.Stream]);
}; };
makeStreams2 = function(stream) { makeStreams2 = function(sourceStream) {
var wrapper; var wrapper;
if (!stream || typeof stream === "function" || stream instanceof Buffer || (stream._readableState != null)) { if (!sourceStream || typeof sourceStream === "function" || sourceStream instanceof Buffer || (sourceStream._readableState != null)) {
return stream; debug("already streams2 or otherwise compatible");
return sourceStream;
} }
if (sourceStream.httpModule != null) {
debug("found `request` stream, using PassThrough stream...");
return sourceStream.pipe(new stream.PassThrough());
}
debug("wrapping stream...");
wrapper = new stream.Readable().wrap(stream); wrapper = new stream.Readable().wrap(stream);
if (stream.destroy != null) { if (sourceStream.destroy != null) {
wrapper.destroy = stream.destroy.bind(stream); wrapper.destroy = sourceStream.destroy.bind(sourceStream);
} }
debug("returning streams2-wrapped stream");
return wrapper; return wrapper;
}; };
@ -177,6 +184,7 @@ CombinedStream = (function(_super) {
CombinedStream.prototype._nextSource = function(readSize) { CombinedStream.prototype._nextSource = function(readSize) {
if (this._sources.length === 0) { if (this._sources.length === 0) {
debug("ran out of streams; pushing EOF");
this.push(null); this.push(null);
return; return;
} }

@ -1,6 +1,6 @@
{ {
"name": "combined-stream2", "name": "combined-stream2",
"version": "1.0.4", "version": "1.1.0",
"description": "A drop-in Streams2-compatible replacement for combined-stream.", "description": "A drop-in Streams2-compatible replacement for combined-stream.",
"main": "index.js", "main": "index.js",
"scripts": { "scripts": {
@ -28,7 +28,8 @@
"gulp-plumber": "~0.6.3", "gulp-plumber": "~0.6.3",
"gulp-remember": "~0.2.0", "gulp-remember": "~0.2.0",
"gulp-rename": "~1.2.0", "gulp-rename": "~1.2.0",
"gulp-util": "~2.2.17" "gulp-util": "~2.2.17",
"request": "^2.53.0"
}, },
"dependencies": { "dependencies": {
"debug": "^2.1.1", "debug": "^2.1.1",

@ -0,0 +1,8 @@
request = require "request"
CombinedStream = require "./lib/combined-stream2.coffee"
str = require "stream"
stream = CombinedStream.create()
stream.append request("http://google.com/")
stream.append new Buffer("\nDONE!\n")
stream.pipe(new str.PassThrough).resume()
Loading…
Cancel
Save