You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

178 lines
5.0 KiB
JavaScript

'use strict';
var HashMap = require('hashmap'),
Pool = require('./pool');
var inherits = require('util').inherits,
EventEmitter = require('events').EventEmitter;
function Cluster(pools) {
EventEmitter.call(this);
if (!pools) { pools = [ ]; }
else if (!Array.isArray(pools)) { pools = [ pools ]; }
this.pools = [ ];
this.caps = { };
this.removeListeners = new HashMap();
this.sources = new HashMap();
this.ended = false;
pools.forEach(this.addPool, this);
}
inherits(Cluster, EventEmitter);
Cluster.prototype.addPool = function (pool) {
if (this.ended) {
throw new Error('Cluster.addPool(): Cluster is ended');
}
if (!(pool instanceof Pool)) {
throw new Error('Cluster.addPool(): Not a valid pool');
}
if (this.pools.indexOf(pool) > -1) {
throw new Error('Cluster.addPool(): Pool already in cluster');
}
this.pools.push(pool);
this._bindListeners(pool);
this._addCapabilities(pool);
};
Cluster.prototype.removePool = function (pool) {
if (!(pool instanceof Pool)) {
throw new Error('Cluster.removePool(): Not a valid pool');
}
var idx = this.pools.indexOf(pool);
if (idx === -1) {
throw new Error('Cluster.removePool(): Pool not in cluster');
}
this.pools.splice(idx, 1);
this._unbindListeners(pool);
this._removeCapabilities(pool);
};
Cluster.prototype.acquire = function (cap, cb) { // jshint maxstatements: 20, maxcomplexity: 8
if (typeof cap === 'function') {
cb = cap;
cap = void 0;
}
if (typeof cb !== 'function') {
this.emit('error', new Error('Cluster.acquire(): Callback is required'));
return;
}
if (this.ended) {
cb(new Error('Cluster.acquire(): Cluster is ended'));
return;
}
var sources = this.pools;
if (cap) {
if (!this.caps[cap] || !this.caps[cap].length) {
cb(new Error('Cluster.acquire(): No pools can fulfil capability: ' + cap));
return;
}
sources = this.caps[cap];
}
var pool = sources.filter(function (pool) {
var stats = pool.stats();
return stats.queued < stats.maxRequests;
}).sort(function (a, b) {
var statsA = a.stats(),
statsB = b.stats();
return (statsB.available - statsB.queued) - (statsA.available - statsA.queued);
})[0];
if (!pool) {
cb(new Error('Cluster.acquire(): No pools available'));
return;
}
pool.acquire(function (err, res) {
if (err) { cb(err); return; }
this.sources.set(res, pool);
process.nextTick(cb.bind(null, null, res));
}.bind(this));
};
Cluster.prototype.release = function (res) {
if (!this.sources.has(res)) {
var err = new Error('Cluster.release(): Unknown resource');
err.res = res;
this.emit('error', err);
return;
}
var pool = this.sources.get(res);
this.sources.remove(res);
pool.release(res);
};
Cluster.prototype.end = function (cb) {
if (this.ended) {
if (typeof cb === 'function') {
cb(new Error('Cluster.end(): Cluster is already ended'));
}
return;
}
this.ended = true;
var count = this.pools.length,
errs = [ ];
this.pools.forEach(function (pool) {
pool.end(function (err, res) {
this.removePool(pool);
if (err) { errs.concat(err); }
count--;
if (count === 0 && typeof cb === 'function') {
cb(errs.length ? errs : null);
}
}.bind(this));
}, this);
};
Cluster.prototype._addCapabilities = function (pool) {
if (!pool.capabilities || !Array.isArray(pool.capabilities)) { return; }
pool.capabilities.forEach(function (cap) {
if (typeof cap !== 'string') { return; }
this.caps[cap] = this.caps[cap] || [ ];
this.caps[cap].push(pool);
}, this);
};
Cluster.prototype._removeCapabilities = function (pool) {
if (!pool.capabilities || !Array.isArray(pool.capabilities)) { return; }
pool.capabilities.forEach(function (cap) {
if (typeof cap !== 'string' || !Array.isArray(this.caps[cap])) { return; }
var idx = this.caps[cap].indexOf(pool);
if (idx > -1) { this.caps[cap].splice(idx, 1); }
}, this);
};
Cluster.prototype._bindListeners = function (pool) {
var onError, onWarn;
onError = function (err) {
err.source = pool;
this.emit('error', err);
}.bind(this);
onWarn = function (err) {
err.source = pool;
this.emit('warn', err);
}.bind(this);
pool.on('error', onError);
pool.on('warn', onWarn);
this.removeListeners.set(pool, function () {
pool.removeListener('error', onError);
pool.removeListener('warn', onWarn);
});
};
Cluster.prototype._unbindListeners = function (pool) {
this.removeListeners.get(pool)();
this.removeListeners.remove(pool);
};
module.exports = Cluster;