mirror of https://github.com/jo/couchmagick.git
Rewrite using couch-daemon
parent
c6be8db904
commit
d30350fae3
@ -1,14 +0,0 @@
|
||||
{
|
||||
"curly": true,
|
||||
"eqeqeq": true,
|
||||
"immed": true,
|
||||
"latedef": true,
|
||||
"newcap": true,
|
||||
"noarg": true,
|
||||
"sub": true,
|
||||
"undef": true,
|
||||
"unused": true,
|
||||
"boss": true,
|
||||
"eqnull": true,
|
||||
"node": true
|
||||
}
|
@ -1,103 +1,25 @@
|
||||
#!/usr/bin/env node
|
||||
/* couchmagick
|
||||
* (c) 2013 Johannes J. Schmidt, null2 GmbH, Berlin
|
||||
/* couchmagick: Run ImageMagicks `convert` on CouchDB documents.
|
||||
*
|
||||
* (c) 2014 Johannes J. Schmidt, null2 GmbH, Berlin
|
||||
*/
|
||||
|
||||
var pkg = require('./package.json');
|
||||
var couchmagickstream = require('./lib/couchmagick-stream');
|
||||
|
||||
var url = require('url');
|
||||
var async = require('async');
|
||||
var nano = require('nano');
|
||||
var magick = require('couchmagick-listen');
|
||||
var daemon = require('couch-daemon');
|
||||
var _ = require('highland');
|
||||
|
||||
var couchmagick = daemon(process.stdin, process.stdout, function() {
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
couchmagick.get({
|
||||
// Connection
|
||||
address: 'httpd.bind_address',
|
||||
port: 'httpd.port',
|
||||
auth: {
|
||||
username: pkg.name + '.username',
|
||||
password: pkg.name + '.password'
|
||||
},
|
||||
|
||||
// Batching
|
||||
concurrency: pkg.name + '.concurrency',
|
||||
streams: pkg.name + '.streams',
|
||||
limit: pkg.name + '.limit',
|
||||
|
||||
changes_feed_timeout: pkg.name + '.changes_feed_timeout',
|
||||
convert_process_timeout: pkg.name + '.convert_process_timeout'
|
||||
}, function(err, config) {
|
||||
if (err) {
|
||||
return process.exit(0);
|
||||
}
|
||||
|
||||
// defaults
|
||||
config.concurrency = config.concurrency && parseInt(config.concurrency, 10) || 1;
|
||||
config.streams = config.streams && parseInt(config.streams, 10) || 1;
|
||||
config.limit = config.limit && parseInt(config.limit, 100) || 0;
|
||||
config.changes_feed_timeout = config.changes_feed_timeout && parseInt(config.changes_feed_timeout, 10) || 10000;
|
||||
config.convert_process_timeout = config.convert_process_timeout && parseInt(config.convert_process_timeout, 10) || 60000;
|
||||
|
||||
couchmagick.info('using config ' + JSON.stringify(config).replace(/"password":".*?"/, '"password":"***"'));
|
||||
daemon({
|
||||
name: pkg.name,
|
||||
version: pkg.version,
|
||||
include_docs: true
|
||||
}, function(url, options) {
|
||||
var magick = _(couchmagickstream(url, options));
|
||||
|
||||
|
||||
// TODO: validate config
|
||||
|
||||
|
||||
var couch = url.format({
|
||||
protocol: 'http',
|
||||
hostname: config.address,
|
||||
port: config.port,
|
||||
auth: config.auth && config.auth.username && config.auth.password ? [ config.auth.username, config.auth.password ].join(':') : null
|
||||
});
|
||||
|
||||
var options = {
|
||||
limit: config.limit,
|
||||
feed: 'continuous',
|
||||
changes_feed_timeout: config.changes_feed_timeout,
|
||||
convert_process_timeout: config.convert_process_timeout,
|
||||
concurrency: config.concurrency
|
||||
return function(source) {
|
||||
return source.pipe(magick);
|
||||
};
|
||||
|
||||
|
||||
function listen(db, next) {
|
||||
couchmagick.info('Listening on ' + db);
|
||||
|
||||
var stream = magick(url.resolve(couch, db), options);
|
||||
|
||||
stream.on('error', couchmagick.error);
|
||||
stream.on('data', function(data) {
|
||||
if (data.response) {
|
||||
couchmagick.info(data.response);
|
||||
}
|
||||
});
|
||||
stream.on('end', next);
|
||||
}
|
||||
|
||||
|
||||
// main loop ;)
|
||||
function run(err) {
|
||||
if (err) {
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
|
||||
// get list of all databases
|
||||
// TODO: listen to db changes
|
||||
nano(couch).db.list(function(err, dbs) {
|
||||
if (err) {
|
||||
couchmagick.error('Can not get _all_dbs: ' + err.description);
|
||||
|
||||
return process.exit(0);
|
||||
}
|
||||
|
||||
async.eachLimit(dbs, config.streams, listen, run);
|
||||
});
|
||||
}
|
||||
run();
|
||||
});
|
||||
|
||||
|
@ -0,0 +1,340 @@
|
||||
/* couchmagick: Run ImageMagicks `convert` on CouchDB documents.
|
||||
*
|
||||
* (c) 2014 Johannes J. Schmidt, null2 GmbH, Berlin
|
||||
*/
|
||||
|
||||
var path = require('path');
|
||||
var spawn = require('child_process').spawn;
|
||||
var _ = require('highland');
|
||||
var async = require('async');
|
||||
var nano = require('nano');
|
||||
var strformat = require('strformat');
|
||||
|
||||
|
||||
// TODO: emit all documents, also filtered one, to enable checkpointing
|
||||
module.exports = function couchmagick(url, options) {
|
||||
options = options || {};
|
||||
options.concurrency = options.concurrency || 1;
|
||||
options.timeout = options.timeout || 60 * 1000; // 1 minute
|
||||
|
||||
|
||||
var couch = nano(url);
|
||||
var configs = {};
|
||||
|
||||
|
||||
// convert attachment
|
||||
// TODO: process
|
||||
// * configs
|
||||
// * attachments
|
||||
// * versions
|
||||
// in one go. Don't do the through split pipeline stuff.
|
||||
function convertAttachment(data, callback) {
|
||||
var db = couch.use(data.db_name);
|
||||
|
||||
// get target doc
|
||||
db.get(data.target.id, function(err, doc) {
|
||||
data.target.doc = doc || { _id: data.target.id };
|
||||
data.target.doc.couchmagick = data.target.doc.couchmagick || {};
|
||||
data.target.doc.couchmagick[data.target.id] = data.target.doc.couchmagick[data.target.id] || {};
|
||||
|
||||
|
||||
// do not process attachments twice, respect revpos
|
||||
if (data.target.doc.couchmagick[data.target.id][data.target.name] && data.target.doc.couchmagick[data.target.id][data.target.name].revpos === data.source.revpos) {
|
||||
return callback(null, data);
|
||||
}
|
||||
|
||||
|
||||
// insert couchmagick stamp
|
||||
data.target.doc.couchmagick[data.target.id][data.target.name] = {
|
||||
id: data.source.id,
|
||||
name: data.source.name,
|
||||
revpos: data.source.revpos
|
||||
};
|
||||
|
||||
|
||||
// query params, doc_name is used by nano as id
|
||||
var params = {
|
||||
doc_name: data.target.id
|
||||
};
|
||||
if (data.target.doc._rev) {
|
||||
params.rev = data.target.doc._rev;
|
||||
}
|
||||
|
||||
// attachment multipart part
|
||||
var attachment = {
|
||||
name: data.target.name,
|
||||
content_type: data.target.content_type,
|
||||
data: []
|
||||
};
|
||||
|
||||
// convert process
|
||||
var c = spawn('convert', data.args);
|
||||
|
||||
// collect errors
|
||||
var cerror = [];
|
||||
c.stderr.on('data', function(err) {
|
||||
cerror.push(err);
|
||||
});
|
||||
|
||||
// convert timeout
|
||||
var kill = setTimeout(function() {
|
||||
cerror.push(new Buffer('timeout'));
|
||||
// send SIGTERM
|
||||
c.kill();
|
||||
}, options.timeout);
|
||||
|
||||
// collect output
|
||||
c.stdout.on('data', function(data) {
|
||||
attachment.data.push(data);
|
||||
});
|
||||
|
||||
// concat output
|
||||
c.stdout.on('end', function() {
|
||||
clearTimeout(kill);
|
||||
attachment.data = Buffer.concat(attachment.data);
|
||||
});
|
||||
|
||||
// convert finish
|
||||
c.on('close', function(code) {
|
||||
// store exit code
|
||||
data.code = code;
|
||||
data.target.doc.couchmagick[data.target.id][data.target.name].code = data.code;
|
||||
|
||||
if (code === 0) {
|
||||
// no error: make multipart request
|
||||
return db.multipart.insert(data.target.doc, [attachment], params, function(err, response) {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
data.response = response;
|
||||
if (response.rev) {
|
||||
data.target.rev = response.rev;
|
||||
}
|
||||
|
||||
callback(null, data);
|
||||
});
|
||||
}
|
||||
|
||||
// store error
|
||||
data.error = Buffer.concat(cerror).toString();
|
||||
data.target.doc.couchmagick[data.target.id][data.target.name].error = data.error;
|
||||
|
||||
// store document stup, discard attachment
|
||||
db.insert(data.target.doc, data.target.id, function(err, response) {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
data.response = response;
|
||||
if (response.rev) {
|
||||
data.target.rev = response.rev;
|
||||
}
|
||||
|
||||
callback(null, data);
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
// request attachment and pipe it into convert process
|
||||
db.attachment.get(data.source.id, data.source.name).pipe(c.stdin);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
// processing queue
|
||||
var convert = async.queue(convertAttachment, options.concurrency);
|
||||
|
||||
|
||||
return _.pipeline(
|
||||
// gather configs
|
||||
_.map(function(data) {
|
||||
if (data.stream === 'compile') {
|
||||
var cfg = {};
|
||||
cfg[data.id] = data.doc;
|
||||
|
||||
_.extend(cfg, configs);
|
||||
}
|
||||
return data;
|
||||
}),
|
||||
|
||||
// Decide whether a whole doc needs processing at all
|
||||
_.filter(function(data) {
|
||||
if (!data.doc) {
|
||||
return false;
|
||||
}
|
||||
if (!data.doc._attachments) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!Object.keys(data.doc._attachments).length) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}),
|
||||
|
||||
// split stream into each config
|
||||
// TODO: this prevents us from supporting multiple attachments per document
|
||||
// and therefore needs serialisation
|
||||
_.map(function(data) {
|
||||
return Object.keys(configs).map(function(config) {
|
||||
return {
|
||||
db_name: data.db_name,
|
||||
seq: data.seq,
|
||||
doc: data.doc,
|
||||
config: configs[config]
|
||||
};
|
||||
});
|
||||
}),
|
||||
_.flatten(),
|
||||
|
||||
// Decide if couchmagick should be run on a specific attachment
|
||||
_.filter(function(data) {
|
||||
if (typeof data.config.filter === 'function' && !data.config.filter(data.doc)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}),
|
||||
|
||||
// split stream into attachments
|
||||
// TODO: this prevents us from supporting multiple attachments per document
|
||||
// and therefore needs serialisation
|
||||
_.map(function(data) {
|
||||
return Object.keys(data.doc._attachments).map(function(name) {
|
||||
return {
|
||||
db_name: data.db_name,
|
||||
seq: data.seq,
|
||||
doc: data.doc,
|
||||
config: data.config,
|
||||
name: name
|
||||
};
|
||||
});
|
||||
}),
|
||||
_.flatten(),
|
||||
|
||||
// filter attachments with builtin
|
||||
_.filter(function(data) {
|
||||
if (!data.doc) {
|
||||
return false;
|
||||
}
|
||||
if (!data.name) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return data.doc._attachments[data.name].content_type.match(/^image\//);
|
||||
}),
|
||||
|
||||
// split stream into versions
|
||||
// TODO: this prevents us from supporting multiple attachments per document
|
||||
// and therefore needs serialisation
|
||||
_.map(function(data) {
|
||||
return Object.keys(data.config.versions).map(function(key) {
|
||||
var version = data.config.versions[key];
|
||||
|
||||
// version defaults
|
||||
version.id = version.id || '{id}/{version}';
|
||||
version.name = version.name || '{basename}-{version}{extname}';
|
||||
version.content_type = version.content_type || 'image/jpeg';
|
||||
version.args = version.args || [];
|
||||
|
||||
// first arg is input pipe
|
||||
if (!version.args[0] || version.args[0] !== '-') {
|
||||
version.args.unshift('-');
|
||||
}
|
||||
// last arg is output pipe
|
||||
if (version.args.length < 2 || !version.args[version.args.length - 1].match(/^[a-z]{0,3}:-$/)) {
|
||||
version.args.push('jpg:-');
|
||||
}
|
||||
|
||||
// run version filter
|
||||
if (typeof version.filter === 'function' && !version.filter(data.doc, data.name)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// construct target doc
|
||||
var id = strformat(version.id, {
|
||||
id: data.doc._id,
|
||||
parts: data.doc._id.split('/'),
|
||||
version: key
|
||||
});
|
||||
var name = strformat(version.name, {
|
||||
id: data.doc._id,
|
||||
parts: data.doc._id.split('/'),
|
||||
version: key,
|
||||
|
||||
name: data.name,
|
||||
extname: path.extname(data.name),
|
||||
basename: path.basename(data.name, path.extname(data.name)),
|
||||
dirname: path.dirname(data.name)
|
||||
});
|
||||
|
||||
|
||||
return {
|
||||
db_name: data.db_name,
|
||||
seq: data.seq,
|
||||
source: {
|
||||
id: data.doc._id,
|
||||
name: data.name,
|
||||
revpos: data.doc._attachments[data.name].revpos,
|
||||
couchmagick: data.doc.couchmagick
|
||||
},
|
||||
args: version.args,
|
||||
target: {
|
||||
id: id,
|
||||
name: name,
|
||||
content_type: version.content_type
|
||||
}
|
||||
};
|
||||
});
|
||||
}),
|
||||
_.flatten(),
|
||||
|
||||
|
||||
// filter derived versions to prevent cascades
|
||||
// eg:
|
||||
// single-attachment/thumbnail
|
||||
// single-attachment/thumbnail/thumbnail
|
||||
// single-attachment/thumbnail/thumbnail/thumbnail
|
||||
_.filter(function(data) {
|
||||
var derivative = data.source.couchmagick &&
|
||||
data.source.couchmagick[data.source.id] &&
|
||||
data.source.couchmagick[data.source.id][data.source.name] &&
|
||||
data.source.couchmagick[data.source.id][data.source.name].id;
|
||||
|
||||
return !derivative;
|
||||
}),
|
||||
|
||||
|
||||
// process attachments
|
||||
_.through(function(source) {
|
||||
return _(function(push, done) {
|
||||
source
|
||||
.on('data', function(data) {
|
||||
convert.push(data, function(err, res) {
|
||||
push(err, res);
|
||||
});
|
||||
})
|
||||
.on('error', push)
|
||||
.on('end', done);
|
||||
});
|
||||
}),
|
||||
|
||||
_.map(function(data) {
|
||||
if (!data.response) {
|
||||
return data;
|
||||
}
|
||||
|
||||
delete data.seq;
|
||||
|
||||
return {
|
||||
db_name: data.db_name,
|
||||
seq: data.seq,
|
||||
type: 'log',
|
||||
message: 'Complete: ' + JSON.stringify(data.response)
|
||||
};
|
||||
})
|
||||
);
|
||||
};
|
||||
|
Loading…
Reference in New Issue