diff --git a/index.js b/index.js index 57a453d..dd93adb 100644 --- a/index.js +++ b/index.js @@ -2,25 +2,46 @@ const Promise = require("bluebird"); const ms = require("ms"); -const promiseDelaySince = require("@joepie91/promise-delay-every"); -const propagateAbort = require("@ppstreams/propagate-abort"); +const promiseDelayEvery = require("@joepie91/promise-delay-every"); +const propagateAbort = require("@promistream/propagate-abort"); +const propagatePeek = require("@promistream/propagate-peek"); +const debug = require("debug")("promistream:rate-limit"); -module.exports = function rateLimitedStream(interval) { +function makeClonableInstance(interval) { let intervalInMilliseconds = (typeof interval === "number") ? interval : ms(interval); - let delayer = promiseDelaySince(intervalInMilliseconds); - - return { - description: `rate-limited stream (1 per ${interval})`, - read: (source) => { - return Promise.try(() => { - return delayer(); - }).then(() => { - return source.read(); - }); - }, - abort: propagateAbort - } -}; \ No newline at end of file + debug(`Creating clonable with interval of ${intervalInMilliseconds}ms`); + + let delayer = promiseDelayEvery(intervalInMilliseconds); + + return function createClone () { + debug("Creating clone"); + + return { + _promistreamVersion: 0, + description: `rate-limited stream (1 per ${interval})`, + read: (source) => { + return Promise.try(() => { + debug("Attempted read"); + return delayer(); + }).then(() => { + debug("Delay passed, completing read"); + return source.read(); + }); + }, + peek: propagatePeek, + abort: propagateAbort + }; + }; +} + +module.exports = function rateLimitedStream(... args) { + // Internally, we treat a non-clonable stream as a clonable stream that will only ever have one clone - this makes implementation much simpler + let createClone = makeClonableInstance(... args); + + return createClone(); +}; + +module.exports.clonable = makeClonableInstance; diff --git a/package.json b/package.json index 7b38273..ad6c4c4 100644 --- a/package.json +++ b/package.json @@ -7,8 +7,10 @@ "license": "WTFPL OR CC0-1.0", "dependencies": { "@joepie91/promise-delay-every": "^1.0.0", - "@ppstreams/propagate-abort": "^0.1.2", + "@promistream/propagate-abort": "^0.1.6", + "@promistream/propagate-peek": "^0.1.1", "bluebird": "^3.5.4", + "debug": "^4.3.1", "ms": "^2.1.1" } }