You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
558 lines
18 KiB
JavaScript
558 lines
18 KiB
JavaScript
'use strict';
|
|
|
|
var Deque = require('double-ended-queue'),
|
|
HashMap = require('hashmap');
|
|
|
|
var ResourceRequest = require('./resource-request');
|
|
|
|
var inherits = require('util').inherits,
|
|
EventEmitter = require('events').EventEmitter,
|
|
debug = require('debug')('pool2');
|
|
|
|
var assert = require('assert');
|
|
|
|
var Backoff = require('simple-backoff').FibonacciBackoff;
|
|
|
|
/**
 * Print a one-line notice that an option name has been superseded.
 * The notice is suppressed when NODE_ENV is 'testing'.
 *
 * @param {string} old - the deprecated option name
 * @param {string} current - the option name that replaces it
 */
function deprecate(old, current) {
    var silenced = process.env.NODE_ENV === 'testing';

    if (!silenced) {
        var notice = 'Pool2: ' + old + ' is deprecated, please use ' + current;
        console.log(notice);
    }
}
|
|
|
|
/* Object tagging for debugging. Resources are only tagged when debugging is enabled (setId), so
 * objects are never modified in normal operation. The debug() calls still need to read the tag,
 * and reading a property that was never defined may deoptimize the object, so all reads go
 * through a helper (getId) that skips the property access entirely unless debugging is on.
 */
var SEQ = 0;

/**
 * Read the debug tag of a resource.
 *
 * @param {*} res - a resource (or whatever a caller handed in as one)
 * @returns {number|undefined} -1 when debugging is disabled; otherwise the tag, or undefined
 *   when the value carries no tag (including null/undefined, which must not throw here
 *   because getId is used while building debug and error messages)
 */
function getId(res) {
    if (!debug.enabled) { return -1; }
    if (res === null || res === undefined) { return undefined; }
    return res.__pool2__id;
}

/**
 * Tag a resource with a sequential, hidden, read-only id. No-op unless debugging is enabled.
 *
 * @param {*} res - the freshly acquired resource; only objects are tagged
 */
function setId(res) {
    if (!debug.enabled) { return; }
    if (!res || typeof res !== 'object') { return; }

    // defineProperty throws on non-extensible objects and when redefining the existing
    // non-configurable tag; a debugging aid must never take the pool down, so skip those
    if (!Object.isExtensible(res) || Object.prototype.hasOwnProperty.call(res, '__pool2__id')) {
        return;
    }

    Object.defineProperty(res, '__pool2__id', {
        configurable: false,
        enumerable: false,
        value: SEQ++
    });
}
|
|
|
|
/**
 * Read a numeric option, falling back to a default when it was not supplied.
 *
 * @param {Object} [opts] - the options object; may be missing
 * @param {string} val - name of the option to read
 * @param {number} standard - value returned when opts has no own property named val
 * @param {boolean} [allowZero] - accept 0 as a valid value
 * @param {boolean} [allowInfinity] - accept Infinity as a valid value
 * @returns {number} the default, Infinity, or the parsed non-negative integer
 * @throws {RangeError} when the supplied value is not a non-negative integer, or is 0
 *   while allowZero is not set
 */
function validNum(opts, val, standard, allowZero, allowInfinity) { // jshint ignore: line
    // go through Object.prototype so that null-prototype option objects, and objects that
    // happen to carry their own 'hasOwnProperty' key, are handled like any other
    if (!opts || !Object.prototype.hasOwnProperty.call(opts, val)) {
        return standard;
    }

    var given = opts[val];

    if (allowInfinity && given === Infinity) {
        return Infinity;
    }

    // parseInt truncates ("1.5" -> 1), so comparing against the plain numeric conversion
    // rejects fractions and strings with trailing garbage
    var num = parseInt(given, 10);
    if (isNaN(num) || num !== +given || !isFinite(num) || num < 0) {
        throw new RangeError('Pool2: ' + val + ' must be a positive integer, ' + given + ' given.');
    }
    if (!allowZero && num === 0) {
        throw new RangeError('Pool2: ' + val + ' cannot be 0.');
    }
    return num;
}
|
|
/**
 * Own-property check that tolerates a missing object.
 *
 * @param {*} a - object to inspect; a falsy value is returned unchanged
 * @param {string} b - property name
 * @returns {*} a itself when it is falsy, otherwise true/false
 */
function HOP(a, b) {
    // reference Object.prototype explicitly rather than relying on a bare `hasOwnProperty`
    // identifier, which only resolves through the global object's prototype chain
    return a && Object.prototype.hasOwnProperty.call(a, b);
}
|
|
|
|
/**
 * Resource pool constructor.
 *
 * Validates the options, records the configured limits and timeouts on the instance,
 * starts the periodic sync timer (unless syncInterval is 0) and schedules an initial
 * attempt to reach the configured minimum.
 *
 * @param {Object} opts
 * @param {Function} opts.acquire - required; handed to the pool as this._acquire
 * @param {Function} opts.dispose - required (opts.release is accepted as a deprecated alias)
 * @param {Function} [opts.destroy] - defaults to Pool.defaults.destroy
 * @param {Function} [opts.ping] - defaults to Pool.defaults.ping
 * @throws {AssertionError} when a callback option is missing or not a function, when
 *   min exceeds max, or when idleTimeout is given while syncInterval is 0
 * @throws {RangeError} when a numeric option is invalid (see validNum)
 */
function Pool(opts) { // jshint maxcomplexity: 12, maxstatements: 45
    var self = this;

    EventEmitter.call(this);

    opts = opts || { };

    // deprecated option names are copied onto their replacements, with a notice
    [
        [ 'release', 'dispose' ],
        [ 'releaseTimeout', 'disposeTimeout' ]
    ].forEach(function (pair) {
        var legacy = pair[0],
            modern = pair[1];

        if (!HOP(opts, legacy)) { return; }

        deprecate('opts.' + legacy, 'opts.' + modern);
        opts[modern] = opts[legacy];
    });

    // callback options: presence first, then type, then the optional ones
    [ 'acquire', 'dispose' ].forEach(function (name) {
        assert(HOP(opts, name), 'new Pool(): opts.' + name + ' is required');
    });
    [ 'acquire', 'dispose' ].forEach(function (name) {
        assert(typeof opts[name] === 'function', 'new Pool(): opts.' + name + ' must be a function');
    });
    [ 'destroy', 'ping' ].forEach(function (name) {
        assert(!HOP(opts, name) || typeof opts[name] === 'function', 'new Pool(): opts.' + name + ' must be a function');
    });

    this._acquire = opts.acquire;
    this._dispose = opts.dispose;
    this._destroy = opts.destroy || Pool.defaults.destroy;
    this._ping = opts.ping || Pool.defaults.ping;

    this.max = validNum(opts, 'max', Pool.defaults.max);
    this.min = validNum(opts, 'min', Pool.defaults.min, true);

    assert(this.min <= this.max, 'new Pool(): opts.min cannot be greater than opts.max');

    // remaining numeric options: [ name, allowZero, allowInfinity ]
    [
        [ 'maxRequests',    false, true  ],
        [ 'acquireTimeout', true,  false ],
        [ 'disposeTimeout', true,  false ],
        [ 'requestTimeout', false, true  ],
        [ 'pingTimeout',    false, false ],
        [ 'idleTimeout',    false, false ],
        [ 'syncInterval',   true,  false ],
        [ 'bailAfter',      true,  true  ]
    ].forEach(function (spec) {
        var name = spec[0];
        self[name] = validNum(opts, name, Pool.defaults[name], spec[1], spec[2]);
    });

    // idle resources are only reaped from the sync timer, so an explicit idleTimeout
    // makes no sense when that timer is switched off
    assert(this.syncInterval > 0 || !HOP(opts, 'idleTimeout'), 'new Pool(): Cannot specify opts.idleTimeout when opts.syncInterval is 0');

    if (Array.isArray(opts.capabilities)) {
        this.capabilities = opts.capabilities.slice();
    } else {
        this.capabilities = [ ];
    }

    if (this.syncInterval !== 0) {
        this.syncTimer = setInterval(function () {
            self._ensureMinimum();
            self._reap();
            self._maybeAllocateResource();
        }, this.syncInterval);
    }

    // lifecycle flags
    this.live = false;
    this.ending = false;
    this.destroyed = false;

    // number of acquire attempts currently in flight
    this.acquiring = 0;

    // pool maps every member resource to a timestamp; available lists the members
    // that are currently not checked out; requests queues pending acquire() calls
    this.pool = new HashMap();
    this.available = [ ];
    this.requests = new Deque();

    this.started = new Date();
    this.backoff = new Backoff(opts.backoff);

    if (debug.enabled) {
        this._seq = 0;
    }

    setImmediate(this._ensureMinimum.bind(this));
}
inherits(Pool, EventEmitter);
|
|
|
|
// Fallbacks used by the constructor for every option the caller leaves out.
// All durations are in milliseconds.
Pool.defaults = {
    // optional callbacks: destroying is a no-op, pinging succeeds on the next tick
    destroy: function () { },
    ping: function (res, cb) { setImmediate(cb); },

    // pool size bounds
    min: 0,
    max: 10,

    // timeouts
    acquireTimeout: 30000,
    disposeTimeout: 30000,
    requestTimeout: Infinity,
    pingTimeout: 10000,
    idleTimeout: 60000,

    // period of the housekeeping timer
    syncInterval: 10000,

    // how long failed acquisitions are tolerated before the first success
    bailAfter: 0,

    // cap on queued requests
    maxRequests: Infinity
};
|
|
|
|
/**
 * Snapshot of the pool's current occupancy.
 *
 * @returns {{min: number, max: number, allocated: number, available: number,
 *            queued: number, maxRequests: number}}
 *   allocated is the number of member resources; available is max minus the
 *   members that are currently checked out; queued is the request queue length
 */
Pool.prototype.stats = function () {
    var allocated = this.pool.count(),
        checkedOut = allocated - this.available.length,
        snapshot = { };

    snapshot.min = this.min;
    snapshot.max = this.max;
    snapshot.allocated = allocated;
    snapshot.available = this.max - checkedOut;
    snapshot.queued = this.requests.length;
    snapshot.maxRequests = this.maxRequests;

    return snapshot;
};
|
|
|
|
/**
 * Queue a request for a resource.
 *
 * The callback receives an error immediately when the pool is ending or destroyed,
 * or when the request queue has reached maxRequests. Otherwise a ResourceRequest is
 * queued, a 'request' event is emitted and allocation is attempted on the next tick.
 *
 * @param {Function} cb - handed to the ResourceRequest; called directly with an Error
 *   when the request is refused
 * @returns {ResourceRequest|undefined} the queued request, or undefined when refused
 */
Pool.prototype.acquire = function (cb) {
    var refused = this.destroyed || this.ending,
        request;

    if (refused) {
        cb(new Error('Pool is ' + (this.ending ? 'ending' : 'destroyed')));
        return;
    }

    if (this.requests.length >= this.maxRequests) {
        cb(new Error('Pool is full'));
        return;
    }

    request = new ResourceRequest(this.requestTimeout, cb);
    // request-level errors surface as pool warnings
    request.on('error', this.emit.bind(this, 'warn'));

    this.requests.push(request);
    this.emit('request', request);

    setImmediate(this._maybeAllocateResource.bind(this));

    return request;
};
|
|
|
|
/**
 * Hand a checked-out resource back to the pool.
 *
 * Emits 'error' (with the resource attached as err.res) when the resource is not a
 * member of the pool or has already been released. On success the resource's
 * timestamp is refreshed, it is put at the front of the available list, 'drain' is
 * emitted when nothing is queued and nothing is checked out, and allocation is
 * attempted for any waiting request.
 *
 * @param {*} res - the resource being returned
 */
Pool.prototype.release = function (res) { // jshint maxstatements: 17
    var self = this;

    var refuse = function (message) {
        var err = new Error(message);
        err.res = res;
        self.emit('error', err);
    };

    if (!this.pool.has(res)) {
        refuse('Pool.release(): Resource not member of pool');
        return;
    }

    if (this.available.indexOf(res) > -1) {
        refuse('Pool.release(): Resource already released (id=' + getId(res) + ')');
        return;
    }

    this.pool.set(res, new Date());
    this.available.unshift(res);

    var nothingQueued = this.requests.length === 0;
    if (nothingQueued && this.pool.count() === this.available.length) {
        this.emit('drain');
    }

    this._maybeAllocateResource();
};
|
|
|
|
/**
 * Forcefully discard a resource; intended for error conditions.
 *
 * The resource is dropped from the available list and from the pool, the configured
 * destroy callback is invoked without waiting for any result (a synchronous throw
 * becomes a 'warn' event), and the pool then tries to get back up to its minimum.
 *
 * @param {*} res - the resource to discard
 */
Pool.prototype.destroy = function (res) {
    debug('Ungracefully destroying resource (id=%s)', getId(res));

    // forget the resource everywhere we track it
    var position = this.available.indexOf(res);
    if (position !== -1) {
        this.available.splice(position, 1);
    }
    if (this.pool.has(res)) {
        this.pool.remove(res);
    }

    // fire-and-forget
    try {
        this._destroy(res);
    } catch (e) {
        this.emit('warn', e);
    }

    this._ensureMinimum();
};
|
|
|
|
/**
 * Gracefully take a resource out of the pool via the dispose callback; meant for
 * resources that are still valid, so dispose is expected to behave.
 *
 * If dispose has not called back within disposeTimeout (unless that is 0), the
 * resource is handed to destroy() instead.
 *
 * @param {*} res - the resource to remove
 * @param {Function|boolean} [cb] - callback invoked with the dispose error (if any);
 *   internal callers pass `true` instead to suppress the 'error' event that is
 *   otherwise emitted when the resource is not a member of the pool
 */
Pool.prototype.remove = function (res, cb) { // jshint maxcomplexity: 8, maxstatements: 20
    var self = this,
        quiet = false,
        fallbackTimer;

    if (typeof cb === 'boolean') {
        quiet = cb;
        cb = null;
    }

    var notify = function (e) {
        if (typeof cb === 'function') { cb(e); }
    };

    // the resource must no longer be handed out
    var position = this.available.indexOf(res);
    if (position !== -1) {
        this.available.splice(position, 1);
    }

    if (this.pool.has(res)) {
        this.pool.remove(res);
    } else if (!quiet) {
        // not one of ours
        this.emit('error', new Error('Pool.remove() called on non-member'));
    }

    // fall back to destroy() when dispose doesn't answer in time
    if (this.disposeTimeout !== 0) {
        fallbackTimer = setTimeout(this.destroy.bind(this, res), this.disposeTimeout);
    }

    try {
        debug('Attempting to gracefully remove resource (id=%s)', getId(res));

        this._dispose(res, function (e) {
            clearTimeout(fallbackTimer);

            if (e) {
                self.emit('warn', e);
            } else {
                self._ensureMinimum();
            }

            notify(e);
        });
    } catch (e) {
        clearTimeout(fallbackTimer);
        this.emit('warn', e);
        notify(e);
    }
};
|
|
|
|
/**
 * Gracefully shut the pool down.
 *
 * Marks the pool as ending, then removes every member resource — immediately when
 * nothing is checked out, queued or being acquired, otherwise once 'drain' fires.
 *
 * @param {Function} [cb] - called with no arguments on success, or with an array of
 *   the errors reported while removing resources
 */
Pool.prototype.end = function (cb) {
    var self = this;

    cb = cb || function () { };

    this.ending = true;

    var closeResources = function () {
        debug('Closing resources');
        clearInterval(self.syncTimer);

        var outstanding = self.pool.count(),
            failures = [ ];

        if (outstanding === 0) {
            cb();
            return;
        }

        self.pool.forEach(function (value, resource) {
            self.remove(resource, function (err) {
                if (err) { failures.push(err); }

                outstanding -= 1;
                if (outstanding !== 0) { return; }

                debug('Resources closed');
                if (failures.length) {
                    cb(failures);
                } else {
                    cb();
                }
            });
        });
    };

    // start right away when idle, otherwise wait for the pending work to finish
    var idle = this.available.length === this.pool.count() &&
               this.requests.length === 0 &&
               this.acquiring === 0;

    if (idle) {
        closeResources();
        return;
    }

    debug('Waiting for active requests to conclude before closing resources');
    this.once('drain', closeResources);
};
|
|
|
|
/**
 * Close resources that have been idle for longer than idleTimeout, without letting
 * the pool shrink below its minimum.
 *
 * Walks the available list from the back, where the longest-idle entries sit
 * (release() and _allocateResource() add to the front).
 */
Pool.prototype._reap = function () {
    var remaining = this.pool.count(),
        reaped = 0,
        idleThreshold = (new Date()) - this.idleTimeout,
        i, res;

    debug('reap (cur=%d, av=%d)', remaining, this.available.length);

    // start at the last valid index (length - 1); starting at `length` probed the pool
    // map with `undefined` on the first pass. Iterating backwards keeps the indices
    // still to be visited stable while remove() splices entries out of this.available.
    for (i = this.available.length - 1; remaining > this.min && i >= 0; i--) {
        res = this.available[i];

        if (this.pool.get(res) < idleThreshold) {
            remaining--;
            reaped++;
            this.remove(res);
        }
    }

    if (reaped) { debug('Shrinking pool: destroying %d idle connections', reaped); }
};
|
|
|
|
/**
 * Start as many acquisitions as are needed to bring the pool (members plus
 * acquisitions already in flight) up to the configured minimum. Does nothing while
 * the pool is ending or after it has been destroyed.
 */
Pool.prototype._ensureMinimum = function () {
    var shortfall, started;

    if (this.ending || this.destroyed) { return; }

    shortfall = this.min - (this.pool.count() + this.acquiring);
    if (shortfall <= 0) { return; }

    debug('Attempting to acquire minimum resources (cur=%d, min=%d)', this.pool.count(), this.min);

    // the shortfall is computed once up front; each call below bumps this.acquiring
    for (started = 0; started < shortfall; started++) {
        this._allocateResource();
    }
};
|
|
|
|
/**
 * Serve the oldest waiting request if possible.
 *
 * With an idle resource at hand, the resource is pinged first (bounded by
 * pingTimeout) and then handed to the request. If the ping fails, throws or times
 * out, the request goes back to the front of the queue, the resource is removed and
 * another attempt is made. Without an idle resource, a new one is acquired when the
 * pool still has room.
 */
Pool.prototype._maybeAllocateResource = function () { // jshint maxstatements: 25, maxcomplexity: 8
    var self = this;

    // nobody is waiting
    if (this.requests.length === 0) { return; }

    // nothing idle: grow the pool if there's demand and room for it
    if (!this.available.length) {
        var pending = this.requests.length,
            toBeAvailable = this.available.length + this.acquiring,
            toBeTotal = this.pool.count() + this.acquiring;

        if (pending > toBeAvailable && toBeTotal < this.max) {
            debug('Growing pool: no resource to serve request (p=%d, tba=%d, tbt=%d, max=%d)', pending, toBeAvailable, toBeTotal, this.max);
            this._allocateResource();
        } else {
            debug('Not growing pool: pending=%d, to be available=%d', pending, toBeAvailable);
        }
        return;
    }

    // aborted requests can't be spliced out of the deque; they are flagged as
    // fulfilled instead, and skipped over here
    var request = this.requests.shift();
    while (this.requests.length && request.fulfilled) {
        request = this.requests.shift();
    }
    if (request.fulfilled) { return; }

    // a live request exists, so it is now worth checking a resource out
    var resource = this.available.shift();

    debug('Reserving request for resource (id=%s, req=%s)', getId(resource), request.id);

    var gaveUp = false,
        pingTimer;

    // put the request back at the head of the queue, drop the resource, try again
    var requeue = function () {
        debug('Releasing request to request list (req=%s)', request.id);

        self.emit('requeue', request);

        gaveUp = true;
        clearTimeout(pingTimer);

        self.requests.unshift(request);
        self.remove(resource);
        self._maybeAllocateResource();
    };

    pingTimer = setTimeout(function () {
        debug('Ping timeout, removing resource (id=%s)', getId(resource));
        requeue();
    }, this.pingTimeout);

    try {
        debug('Pinging resource (id=%s)', getId(resource));

        this._ping(resource, function (err) {
            if (gaveUp) {
                debug('Ping succeeded after timeout, doing nothing (id=%s, req=%s)', getId(resource), request.id);
                return;
            }
            clearTimeout(pingTimer);

            if (err) {
                debug('Ping errored, releasing resource (id=%s)', getId(resource));
                self.emit('warn', err);
                requeue();
                return;
            }

            if (request.fulfilled) {
                debug('Request became fulfilled while pinging resource; discarding (id=%s, req=%s)', getId(resource), request.id);
                // the request went away while we were pinging; give the resource back
                self.release(resource);
                return;
            }

            debug('Allocating resource to request (id=%s, req=%s); waited %ds', getId(resource), request.id, ((new Date()) - request.ts) / 1000);
            request.resolve(resource);
        });
    } catch (err) {
        debug('Synchronous throw attempting to ping resource (id=%s): %s', getId(resource), err.message);
        this.emit('error', err);
        requeue();
    }
};
|
|
|
|
/**
 * Start acquiring one new resource through the configured acquire callback.
 *
 * this.acquiring counts the attempts in flight so that concurrent callers don't
 * over-allocate. Every attempt ends exactly once -- by the acquire callback, by a
 * synchronous throw from acquire, or by the acquire timeout -- and gives its slot
 * back at that moment. A resource delivered after its attempt has already ended
 * (timed out, or a second callback invocation) is disposed of quietly.
 *
 * Failures go through onError: while the pool has never produced a resource and
 * bailAfter milliseconds have passed since construction, the pool is destroyed and
 * the error is emitted; otherwise another attempt at the minimum is scheduled after
 * a backoff delay.
 */
Pool.prototype._allocateResource = function () {
    if (this.destroyed) {
        debug('Not allocating resource: destroyed');
        return;
    }

    debug('Attempting to acquire resource (cur=%d, ac=%d)', this.pool.count(), this.acquiring);

    // acquiring is asynchronous, don't over-allocate due to in-progress resource allocation
    this.acquiring++;

    var timer = null,
        settled = false,
        settle, onError;

    // End this attempt: stop the timeout and give the in-flight slot back. Returns false
    // when the attempt had already ended. The outcome is tracked with an explicit flag
    // because there is no timer at all when acquireTimeout is 0, so "is the timer still
    // set?" cannot tell an on-time result from a late one.
    settle = function () {
        if (settled) { return false; }
        settled = true;

        if (timer !== null) {
            clearTimeout(timer);
            timer = null;
        }
        this.acquiring--;
        return true;
    }.bind(this);

    onError = function (err) {
        debug('Couldn\'t allocate new resource: %s', err.message);

        // throw an error if we haven't successfully allocated a resource within
        // the allotted time
        var now = new Date();
        if (this.live === false && now - this.started >= this.bailAfter) {
            debug('Destroying pool: unable to aquire a resource within %ds', this.bailAfter/1000);
            this._destroyPool();
            this.emit('error', err);
            return;
        }

        // failed allocations are dropped; this could leave us below the
        // minimum threshold; try to bring us up to the minimum, but don't spam
        setTimeout(this._ensureMinimum.bind(this), this.backoff.next());
    }.bind(this);

    if (this.acquireTimeout !== 0) {
        timer = setTimeout(function () {
            debug('Timed out acquiring resource');
            timer = null;
            settle();

            onError(new Error('Timed out acquiring resource'));
        }, this.acquireTimeout);
    }

    try {
        this._acquire(function (err, res) { // jshint maxstatements: 25
            var onTime = settle();

            if (!err && !res) {
                onError(new Error('Acquire callback gave no error and no resource -- check your Pool instance\'s acquire function'));
                return;
            }

            setId(res);

            if (!onTime && !err) {
                debug('Attempting to gracefully clean up late-arrived resource (id=%s)', getId(res));
                this.remove(res, true);
                return;
            }

            if (err) {
                onError(err);
                return;
            }

            this.live = true;

            debug('Successfully allocated new resource (cur=%d, ac=%d, id=%s)', this.pool.count(), this.acquiring, getId(res));

            this.pool.set(res, new Date());
            this.available.unshift(res);

            // normally 'drain' is emitted when the pending requests queue is empty; pending requests
            // are the primary source of acquiring new resources. the pool minimum can cause resources
            // to be acquired with no pending requests, however. if pool.end() is called while resources
            // are being acquired to fill the minimum, the 'drain' event will never get triggered because
            // there were no requests pending. in this case, we want to trigger the cleanup routine that
            // normally binds to 'drain'
            if (this.ending && this.requests.length === 0 && this.acquiring === 0) {
                this.emit('drain');
                return;
            }

            // we've successfully acquired a resource, and we only get
            // here if something wants it, so... do that
            this._maybeAllocateResource();
        }.bind(this));
    } catch (e) {
        settle();
        onError(e);
    }
};
|
|
|
|
/**
 * Tear the whole pool down: mark it destroyed, stop the sync timer, destroy every
 * member resource, reject every queued request and reset the bookkeeping.
 */
Pool.prototype._destroyPool = function () {
    var self = this,
        pending;

    this.destroyed = true;
    clearInterval(this.syncTimer);

    this.pool.forEach(function (value, resource) {
        self.destroy(resource);
    });
    this.pool.clear();

    // the request queue is a deque without forEach, so drain it from the front
    for (pending = this.requests.shift(); pending; pending = this.requests.shift()) {
        pending.reject(new Error('Pool was destroyed'));
    }

    this.acquiring = 0;
    this.available.length = 0;
};
|
|
|
|
// the constructor is the module's export
module.exports = Pool;

// the numeric option validator is also reachable from outside, as a property of the constructor
module.exports._validNum = validNum;
|