You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
cvm/lib/vm/kvm/qmp.js

95 lines
2.8 KiB
JavaScript

'use strict';
const net = require("net");
const createEventEmitter = require("create-event-emitter");
const jsonStream = require("JSONStream");
const createError = require("create-error");
const defaultValue = require("default-value");
const QMPError = createError("QMPError");
const CommandNotFoundError = createError(QMPError, "CommandNotFoundError");
module.exports = function connect(socketPath) {
return Promise.try(() => {
let socket = net.createConnection({path: socketPath});
let commandQueue = [];
let currentCommand;
function trySendCommand() {
/* This enforces single-concurrency by only executing a command when there's not already a command being processed. Every time a command is either queued or completed, this function is called again, so that eventually the command queue will drain, executing each command in order. */
if (currentCommand == null) {
currentCommand = commandQueue.shift();
socket.write(JSON.stringify(currentCommand.payload));
}
}
function commandResult(result) {
if (currentCommand != null) {
currentCommand.resolve(result.return);
} else {
// FIXME: Log a warning!
}
currentCommand = null;
trySendCommand();
}
function commandFailed(result) {
if (currentCommand != null) {
let err;
if (result.error.class === "CommandNotFound") {
err = new CommandNotFoundError(result.error.desc, result.error);
} else {
err = new QMPError(defaultValue(result.error.desc, "Unknown error occurred"), result.error);
}
currentCommand.reject(err);
} else {
// FIXME: Log a warning!
}
currentCommand = null;
trySendCommand();
}
let emitter = createEventEmitter({
execute: function executeCommand(command, args) {
return new Promise((resolve, reject) => {
/* We need to implement a defer here, because the QMP API doesn't tie responses to requests in any way. We can't really do this with .once event listeners either, because 1) that gets messy quickly and is hard to debug when it breaks, and 2) we need a command queue so that we are only ever executing a single command at a time. */
commandQueue.push({
resolve: resolve,
reject: reject,
payload: {
execute: command,
arguments: args
}
});
trySendCommand();
});
}
});
socket.pipe(jsonStream.parse()).on("data", (obj) => {
if (obj.event != null) {
emitter.emit(obj.event, obj);
} else if (obj.error != null) {
commandFailed(obj);
} else if (obj.return != null) {
commandResult(obj);
} else {
throw new Error("Encountered unexpected message type", obj);
}
});
return Promise.try(() => {
/* This initializes the QMP API. If it fails, our `connect` Promise will fail as well (as it should). */
emitter.execute("qmp_capabilities");
}).then((result) => {
return emitter;
});
});
}