You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

172 lines
6.5 KiB
JavaScript

"use strict";
const Promise = require("bluebird");
const defaultValue = require("default-value");
const syncpipe = require("syncpipe");
const pg = require("pg");
const { validateOptions } = require("@validatem/core");
const isString = require("@validatem/is-string");
const isInteger = require("@validatem/is-integer");
const required = require("@validatem/required");
const requireEither = require("@validatem/require-either");
const astToQuery = require("../ast-to-query");
const optimizeAST = require("../ast/optimize");
const optimizers = require("../optimizers");
const typeOf = require("../type-of");
function numberPlaceholders(query) {
// FIXME: This is a hack. Need to find a better way to assemble queries that doesn't rely on fragile regex afterwards to produce correctly-numbered placeholders.
let i = 0;
return query.replace(/\?/g, () => {
i += 1;
return `$${i}`;
});
}
module.exports = function createClient(_options) {
let options = validateOptions(arguments, {
hostname: [ isString ],
port: [ isInteger ], // FIXME: isPortNumber
socket: [ isString ],
username: [ isString ],
password: [ isString ],
database: [ required, isString ]
}, requireEither([ "hostname", "socket" ]));
let pool = new pg.Pool({
// This is a `pg` weird-ism, it expects you to provide a socket path like it is a hostname
host: defaultValue(options.host, options.socket),
port: options.port,
user: options.username,
password: options.password,
database: options.database
});
function claimConnection() {
return Promise.try(() => {
return pool.connect();
}).disposer((connection) => {
connection.release();
});
}
function createClientInstance(options) {
return {
// FIXME: Rename to something that is clearly a verb, eg. `execute` or `do`
query: function (ast, parameters = {}) {
// FIXME/NOTE: `parameters` is an object, not an array! it fills in the placeholders in the built query
let pgClient = defaultValue(options.pgClient, pool);
// FIXME: Allow passing in a precomputed query
let queryObject = syncpipe(ast, [
(_) => optimizeAST(_, optimizers),
(_) => astToQuery(_.ast)
]);
// FIXME: Switch this over to proper typed Validatem validation
for (let key of queryObject.placeholders) {
if (!(key in parameters)) {
throw new Error(`Query requires a parameter '${key}', but it was not specified`);
}
}
let numberedQuery = numberPlaceholders(queryObject.query);
let processedParams = queryObject.params.map((param) => {
if (typeof param === "object" && typeOf(param) === "placeholder") {
return parameters[param.name];
} else {
return param;
}
});
console.log({queryObject, numberedQuery, processedParams});
// FIXME: Process placeholders!
return Promise.try(() => {
return pgClient.query(numberedQuery, processedParams);
}).then((result) => {
return result.rows;
});
},
tryQuery: function (query, parameters) {
// This automagically wraps a query in a conceptual transaction, so that if it fails, we can roll back to the point before the query. This is useful for eg. cases where you want to run some custom logic (that attempts a fallback query) on a UNIQUE violation *within a transaction*. Normally, in this situation the transaction would be 'locked' until a rollback is carried out, but if we don't have a savepoint right before the expected-to-fail query, we can't automatically roll back to the correct point.
// FIXME: Check whether an outer transaction is necessary for this to work correctly!
// TODO: Document this as "opportunistically try out a query, so that if it fails, it will be like the query never happened (but it will still throw the original error)". Make sure to explain why the regular query behaviour is different from this; automatically creating a savepoint for every single query could have undesirable performance implications.
return this.transaction((tx) => {
return tx.query(query, parameters);
});
},
transaction: function (callback) {
if (options.pgClient == null) {
// Not inside a transaction yet
return Promise.try(() => {
return claimConnection();
}).then((disposableClient) => {
return Promise.using(disposableClient, (pgClient) => {
let instance = createClientInstance({ pgClient: pgClient });
return Promise.try(() => {
return callback(instance);
}).then(() => {
return pgClient.query("COMMIT");
}).catch((error) => {
return Promise.try(() => {
return pgClient.query("ROLLBACK");
}).then(() => {
throw error;
});
});
});
});
} else {
// Inside a transaction already
let currentSavepointID = defaultValue(options.savepointID, 0);
let newSavepointID = currentSavepointID + 1;
let newSavepointName = `ZAP_${newSavepointID}`;
let instance = createClientInstance({
pgClient: options.pgClient,
savepointID: newSavepointID
});
return Promise.try(() => {
// FIXME: Can we parameterize this?
return options.pgClient.query(`SAVEPOINT ${newSavepointName}`);
}).then(() => {
// Nested to ensure that we only try to rollback if it's actually the *user query itself* that fails, not the SAVEPOINT command
return Promise.try(() => {
return callback(instance);
}).catch((error) => {
return Promise.try(() => {
// FIXME: Can we parameterize this?
return options.pgClient.query(`ROLLBACK TO SAVEPOINT ${newSavepointName}`);
}).then(() => {
throw error;
});
});
});
}
},
destroy: function () {
if (options.pgClient == null) {
return pool.end();
} else {
// FIXME: Add an explicit rollback function to the API, for cases where there's no thrown error but the transaction needs to be rolled back anyway due to some external condition? Or just expect the user to throw an error? Maybe a special error type that gets absorbed after applying the rollback? Maybe that should be named 'unsafe rollback' as it could wrongly represent a failure as a success from a Promises perspective?
throw new Error(`You can only destroy the ZapDB client outside of a transaction`);
}
}
};
}
return createClientInstance({});
};
// SELECT color, size, country_id, store_id, COUNT(*) AS total_sold, SUM(price) AS total_revenue FROM sales GROUP BY color, size, ROLLUP (country_id, store_id);
// SELECT * FROM sales GROUP BY color, size, ROLLUP (country_id, store_id);