[6.x] [@kbn/es] fix promise and CLI error handling (#17596) (#17598)

* [kbn-es/cluster] fix promise handling to properly route success/failure

* [kbn-es/cli] catch errors that bubble to the CLI and log with a bit of style

* [kbn-es] fix promise handling when building from source

* [kbn-es] check for inverse of .stop() condition

* [kbn-es/cluster] resolve promise is cluster stops cleanly

* [kbn-es/cluster/start] reject if ES exits before starting
This commit is contained in:
Spencer 2018-04-06 13:20:32 -07:00 committed by GitHub
parent e98173cf2c
commit b9350316dd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 170 additions and 102 deletions

View file

@ -2,6 +2,8 @@ const chalk = require('chalk');
const getopts = require('getopts');
const dedent = require('dedent');
const commands = require('./cli_commands');
const { isCliError } = require('./errors');
const { log } = require('./utils');
function help() {
const availableCommands = Object.keys(commands).map(
@ -24,41 +26,55 @@ function help() {
}
exports.run = async (defaults = {}) => {
const argv = process.argv.slice(2);
const options = getopts(argv, {
alias: {
h: 'help',
},
try {
const argv = process.argv.slice(2);
const options = getopts(argv, {
alias: {
h: 'help',
},
default: defaults,
});
const args = options._;
const commandName = args[0];
default: defaults,
});
const args = options._;
const commandName = args[0];
if (args.length === 0 || (!commandName && options.help)) {
help();
return;
if (args.length === 0 || (!commandName && options.help)) {
help();
return;
}
const command = commands[commandName];
if (command === undefined) {
log.error(
chalk.red(`[${commandName}] is not a valid command, see 'es --help'`)
);
process.exitCode = 1;
return;
}
if (commandName && options.help) {
log.write(dedent`
usage: ${command.usage || `es ${commandName} [<args>]`}
${command.description}
${command.help(defaults).replace(/\n/g, '\n ')}
`);
return;
}
await command.run(defaults);
} catch (error) {
if (isCliError(error)) {
// only log the message, the CLI explicitly threw this message
// and it doesn't need a stack trace
log.error(error.message);
} else {
log.error('Unhandled error');
log.error(error);
}
process.exitCode = 1;
}
const command = commands[commandName];
if (command === undefined) {
console.log(
chalk.red(`[${commandName}] is not a valid command, see 'es --help'`)
);
process.exit(1);
}
if (commandName && options.help) {
console.log(dedent`
usage: ${command.usage || `es ${commandName} [<args>]`}
${command.description}
${command.help(defaults).replace(/\n/g, '\n ')}
`);
return;
}
await command.run(defaults);
};

View file

@ -1,6 +1,7 @@
const dedent = require('dedent');
const getopts = require('getopts');
const { Cluster } = require('../cluster');
const { createCliError } = require('../errors');
exports.description = 'Install and run from an Elasticsearch tar';
@ -38,8 +39,7 @@ exports.run = async (defaults = {}) => {
const [, path] = options._;
if (!path || !path.endsWith('tar.gz')) {
console.warn('you must provide a path to an ES tar file');
return;
throw createCliError('you must provide a path to an ES tar file');
}
const { installPath } = await cluster.installArchive(path, options);

View file

@ -3,6 +3,7 @@ const chalk = require('chalk');
const { installSnapshot, installSource, installArchive } = require('./install');
const { ES_BIN } = require('./paths');
const { log: defaultLog, parseEsLog, extractConfigFiles } = require('./utils');
const { createCliError } = require('./errors');
exports.Cluster = class Cluster {
constructor(log = defaultLog) {
@ -15,17 +16,17 @@ exports.Cluster = class Cluster {
* @param {Object} options
* @property {Array} options.installPath
* @property {Array} options.sourcePath
* @returns {Promise}
* @returns {Promise<{installPath}>}
*/
async installSource(options = {}) {
this._log.info(chalk.bold('Installing from source'));
this._log.indent(4);
const install = await installSource({ log: this._log, ...options });
const { installPath } = await installSource({ log: this._log, ...options });
this._log.indent(-4);
return install;
return { installPath };
}
/**
@ -34,17 +35,20 @@ exports.Cluster = class Cluster {
* @param {Object} options
* @property {Array} options.installPath
* @property {Array} options.sourcePath
* @returns {Promise}
* @returns {Promise<{installPath}>}
*/
async installSnapshot(options = {}) {
this._log.info(chalk.bold('Installing from snapshot'));
this._log.indent(4);
const install = await installSnapshot({ log: this._log, ...options });
const { installPath } = await installSnapshot({
log: this._log,
...options,
});
this._log.indent(-4);
return install;
return { installPath };
}
/**
@ -53,17 +57,20 @@ exports.Cluster = class Cluster {
* @param {String} path
* @param {Object} options
* @property {Array} options.installPath
* @returns {Promise}
* @returns {Promise<{installPath}>}
*/
async installArchive(path, options = {}) {
this._log.info(chalk.bold('Installing from an archive'));
this._log.indent(4);
const install = await installArchive(path, { log: this._log, ...options });
const { installPath } = await installArchive(path, {
log: this._log,
...options,
});
this._log.indent(-4);
return install;
return { installPath };
}
/**
@ -75,26 +82,72 @@ exports.Cluster = class Cluster {
* @returns {Promise}
*/
async start(installPath, options = {}) {
await this.run(installPath, options);
this._exec(installPath, options);
return new Promise(resolve => {
this._process.stdout.on('data', data => {
if (/started/.test(data)) {
return resolve(process);
}
});
});
await Promise.race([
// await the "started" log message
new Promise(resolve => {
this._process.stdout.on('data', data => {
if (/started/.test(data)) {
resolve();
}
});
}),
// await the outcome of the process in case it exits before starting
this._outcome.then(() => {
throw createCliError('ES exited without starting');
}),
]);
}
/**
* Starts Elasticsearch and immediately returns with process
* Starts Elasticsearch and waits for Elasticsearch to exit
*
* @param {String} installPath
* @param {Object} options
* @property {Array} options.esArgs
* @returns {Process}
* @returns {Promise<undefined>}
*/
run(installPath, { esArgs = [] }) {
async run(installPath, options = {}) {
this._exec(installPath, options);
// await the final outcome of the process
await this._outcome;
}
/**
* Stops ES process, if it's running
*
* @returns {Promise}
*/
async stop() {
if (!this._process || !this._outcome) {
throw new Error('ES has not been started');
}
this._process.kill();
await this._outcome;
}
/**
* Common logic from this.start() and this.run()
*
* Start the elasticsearch process (stored at `this._process`)
* and "pipe" its stdio to `this._log`. Also create `this._outcome`
* which will be resolved/rejected when the process exits.
*
* @private
* @param {String} installPath
* @param {Object} options
* @property {Array} options.esArgs
* @return {undefined}
*/
_exec(installPath, { esArgs = [] }) {
if (this._process || this._outcome) {
throw new Error('ES has already been started');
}
this._log.info(chalk.bold('Starting'));
this._log.indent(4);
@ -120,30 +173,14 @@ exports.Cluster = class Cluster {
);
this._outcome = new Promise((resolve, reject) => {
this._process.on('exit', code => {
// JVM exits with 143 on SIGTERM and 130 on SIGINT, dont' treat then as errors
this._process.once('exit', code => {
// JVM exits with 143 on SIGTERM and 130 on SIGINT, dont' treat them as errors
if (code > 0 && !(code === 143 || code === 130)) {
return reject(`ES exitted with code ${code}`);
reject(createCliError(`ES exitted with code ${code}`));
} else {
resolve();
}
resolve();
});
});
return process;
}
/**
* Stops ES process, if it's running
*
* @returns {Promise}
*/
stop() {
if (!this._process || !this._outcome) {
return Promise.reject('ES has not been started');
}
this._process.kill();
return this._outcome;
}
};

View file

@ -0,0 +1,9 @@
exports.createCliError = function(message) {
const error = new Error(message);
error.isCliError = true;
return error;
};
exports.isCliError = function(error) {
return error && error.isCliError;
};

View file

@ -6,9 +6,13 @@ const chalk = require('chalk');
const crypto = require('crypto');
const simpleGit = require('simple-git/promise');
const { installArchive } = require('./archive');
const { createCliError } = require('../errors');
const { findMostRecentlyChanged, log: defaultLog, cache } = require('../utils');
const { GRADLE_BIN, ES_ARCHIVE_PATTERN, BASE_PATH } = require('../paths');
const onceEvent = (emitter, event) =>
new Promise(resolve => emitter.once(event, resolve));
/**
* Installs ES from source
*
@ -96,33 +100,35 @@ async function sourceInfo(cwd, log = defaultLog) {
* @property {ToolingLog} options.log
* @returns {Object} containing archive and optional plugins
*/
function createSnapshot({ sourcePath, log = defaultLog }) {
async function createSnapshot({ sourcePath, log = defaultLog }) {
const buildArgs = [':distribution:archives:tar:assemble'];
return new Promise((resolve, reject) => {
log.info('%s %s', GRADLE_BIN, buildArgs.join(' '));
log.info('%s %s', GRADLE_BIN, buildArgs.join(' '));
const build = execa(GRADLE_BIN, buildArgs, {
cwd: sourcePath,
stdio: ['ignore', 'pipe', 'pipe'],
});
const stdout = readline.createInterface({ input: build.stdout });
const stderr = readline.createInterface({ input: build.stderr });
stdout.on('line', line => log.debug(line));
stderr.on('line', line => log.error(line));
build.stdout.on('end', () => {
if (build.exitCode > 0) {
reject(new Error('unable to build ES'));
} else {
const esTarballPath = findMostRecentlyChanged(
path.resolve(sourcePath, ES_ARCHIVE_PATTERN)
);
resolve(esTarballPath);
}
});
const build = execa(GRADLE_BIN, buildArgs, {
cwd: sourcePath,
stdio: ['ignore', 'pipe', 'pipe'],
});
const stdout = readline.createInterface({ input: build.stdout });
const stderr = readline.createInterface({ input: build.stderr });
stdout.on('line', line => log.debug(line));
stderr.on('line', line => log.error(line));
const [exitCode] = await Promise.all([
onceEvent(build, 'exit'),
onceEvent(stdout, 'close'),
onceEvent(stderr, 'close'),
]);
if (exitCode > 0) {
throw createCliError('unable to build ES');
}
const esTarballPath = findMostRecentlyChanged(
path.resolve(sourcePath, ES_ARCHIVE_PATTERN)
);
return esTarballPath;
}