diff --git a/scratch.js b/scratch.js deleted file mode 100644 index a03b37a55590..000000000000 --- a/scratch.js +++ /dev/null @@ -1,50 +0,0 @@ -/* jshint node: true */ -var elasticsearch = require('../elasticsearch-js'); -var async = require('async'); - -var es = new elasticsearch.Client({ - host: 'localhost:9200', - sniffOnStart: true, - sniffInterval: 3000, - apiVersion: '1.0', - log: 'trace' -}); - -var rl = require('readline').createInterface({ - input: process.stdin, - output: process.stdout, - terminal: true -}); - -async.series([ - function (done) { - setTimeout(done, 50); - }, - function (done) { - console.log(es.transport.connectionPool._conns.index); - es.indices.create({ - index: 'index_name' - }, done); - }, - function (done) { - rl.question('Is the master down?', function () { - done(); - }); - }, - function (done) { - console.log(es.transport.connectionPool._conns.index); - es.search({ index: 'index_name' }, done); - }, - function (done) { - rl.question('Is the slave down now?', function () { - es.search({ body: { query: { match_all: {} } } }, done); - }); - }, - function (done) { - rl.question('Is the master back up?', function () { - es.search({ body: { query: { match_all: {} } } }, done); - }); - } -], function (err) { - console.log(err); -}); \ No newline at end of file diff --git a/src/courier/courier.js b/src/courier/courier.js index d41269bf3e3d..39e7d497bae3 100644 --- a/src/courier/courier.js +++ b/src/courier/courier.js @@ -1,287 +1,257 @@ define(function (require) { - var DataSource = require('courier/data_source'); - var Docs = require('courier/docs'); var EventEmitter = require('utils/event_emitter'); var inherits = require('utils/inherits'); var errors = require('courier/errors'); var _ = require('lodash'); var angular = require('angular'); - function chain(cntx, method) { - return function () { - method.apply(cntx, arguments); - return this; - }; - } + var DocSource = require('courier/data_source/doc'); + var SearchSource = require('courier/data_source/search'); + var HastyRefresh = require('courier/errors').HastyRefresh; - function emitError(source, courier, error) { - if (EventEmitter.listenerCount(source, 'error')) { - source.emit('error', error); - } else { - courier.emit('error', error); - } - } + // map constructors to type keywords + var sourceTypes = { + doc: DocSource, + search: SearchSource + }; - function mergeProp(state, filters, val, key) { - switch (key) { - case 'inherits': - case '_type': - // ignore - return; - case 'filter': - filters.push(val); - return; - case 'index': - case 'type': - if (key && state[key] == null) { - state[key] = val; + // fetch process for the two source types + var onFetch = { + // execute a search right now + search: function (courier) { + if (courier._activeSearchRequest) { + return courier._error(new HastyRefresh()); } - return; - case 'source': - key = '_source'; - /* fall through */ - } + courier._activeSearchRequest = SearchSource.fetch( + courier, + courier._refs.search, + function (err) { + if (err) return courier._error(err); + }); + }, - if (key && state.body[key] == null) { - state.body[key] = val; - } - } - - function flattenDataSource(source) { - var state = { - body: {} - }; - - // all of the filters from the source chain - var filters = []; - var collectProp = _.partial(mergeProp, state, filters); - - // walk the chain and merge each property - var current = source; - var currentState; - while (current) { - currentState = current._state(); - _.forOwn(currentState, collectProp); - current = currentState.inherits; - } - - // defaults for the query - _.forOwn({ - query: { - 'match_all': {} - } - }, collectProp); - - // switch to filtered query if there are filters - if (filters.length) { - state.body.query = { - filtered: { - query: state.body.query, - filter: { - bool: { - must: filters - } - } + // validate that all of the DocSource objects are up to date + // then fetch the onces that are not + doc: function (courier) { + DocSource.validate(courier, courier._refs.doc, function (err, invalid) { + if (err) { + courier.stop(); + return courier.emit('error', err); } - }; + + // if all of the docs are up to date we don't need to do anything else + if (invalid.length === 0) return; + + DocSource.fetch(courier, invalid, function (err) { + if (err) return courier._error(err); + }); + }); } + }; - return state; - } - - function fetchSearchResults(courier, client, sources, cb) { - if (!client) { - this.emit('error', new Error('Courier does not have a client yet, unable to fetch queries.')); - return; - } - - var all = []; - var body = ''; - _.each(sources, function (source) { - if (source.getType() !== 'search') { - return; - } - all.push(source); - - var state = flattenDataSource(source); - var header = JSON.stringify({ - index: state.index, - type: state.type - }); - var doc = JSON.stringify(state.body); - - body += header + '\n' + doc + '\n'; - }); - - return client.msearch({ body: body }, function (err, resp) { - if (err) return cb(err); - - _.each(resp.responses, function (resp, i) { - var source = sources[i]; - if (resp.error) return emitError(source, courier, resp); - source.emit('results', resp); - }); - - cb(err, resp); - }); - } - - function fetchDocs(courier, client, sources, cb) { - if (!client) { - this.emit('error', new Error('Courier does not have a client yet, unable to fetch queries.')); - return; - } - - var all = []; - var body = { - docs: [] - }; - - _.each(sources, function (source) { - if (source.getType() !== 'get') { - return; - } - - all.push(source); - - var state = flattenDataSource(source); - body.docs.push({ - index: state.index, - type: state.type, - id: state.id - }); - }); - - return client.mget({ body: body }, function (err, resp) { - if (err) return cb(err); - - _.each(resp.responses, function (resp, i) { - var source = sources[i]; - if (resp.error) return emitError(source, courier, resp); - source.emit('results', resp); - }); - - cb(err, resp); - }); - } - - function saveUpdate(source, fields) { - - } + // default config values + var defaults = { + fetchInterval: 30000, + docInterval: 2500 + }; /** - * Federated query service, supports data sources that inherit properties - * from one another and automatically emit results. + * Federated query service, supports two data source types: doc and search. + * + * search: + * - inherits filters, and other query properties + * - automatically emit results on a set interval + * doc: + * - tracks doc versions + * - emits same results event when the doc is updated + * - helps seperate versions of kibana running on the same machine stay in sync + * - (NI) tracks version and uses it when new versions of a doc are reindexed + * - (NI) helps deal with conflicts + * * @param {object} config - * @param {Client} config.client - The elasticsearch.js client to use for querying. Should be setup and ready to go. - * @param {integer} [config.fetchInterval=30000] - The amount in ms between each fetch (deafult is 30 seconds) + * @param {Client} config.client - The elasticsearch.js client to use for querying. Should be + * setup and ready to go. + * @param {EsClient} [config.client] - The elasticsearch client that the courier should use + * (can be set at a later time with the `.client()` method) + * @param {integer} [config.fetchInterval=30000] - The amount in ms between each fetch (deafult + * is 30 seconds) */ function Courier(config) { if (!(this instanceof Courier)) return new Courier(config); - var opts = { - fetchInterval: 30000 - }; - var fetchTimer; - var activeRequest; - var courier = this; - var sources = { - search: [], - get: [] - }; - function doSearch() { - if (!opts.client) { - this.emit('error', new Error('Courier does not have a client, pass it ' + - 'in to the constructor or set it with the .client() method')); - return; - } - if (activeRequest) { - activeRequest.abort(); - stopFetching(); - this.emit('error', new errors.HastyRefresh()); - return; - } + config = _.defaults(config || {}, defaults); - // we need to catch the original promise in order to keep it's abort method - activeRequest = fetchSearchResults(courier, opts.client, sources.search, function (err, resp) { - activeRequest = null; - setFetchTimeout(); + this._client = config.client; - if (err) { - window.console && console.log(err); - } - }); - } - - function setFetchTimeout() { - clearTimeout(fetchTimer); - if (opts.fetchInterval) { - fetchTimer = setTimeout(doSearch, opts.fetchInterval); - } else { - fetchTimer = null; - } - } - - function stopFetching(type) { - clearTimeout(fetchTimer); - } - - // start using a DataSource in fetches/updates - function openDataSource(source) { - var type = source.getType(); - if (~sources[type].indexOf(source)) return false; - sources[type].push(source); - } - - // stop using a DataSource in fetches/updates - function closeDataSource(source) { - var type = source.getType(); - var i = sources[type].indexOf(source); - if (i === -1) return; - sources[type].slice(i, 1); - // only search DataSources get fetched automatically - if (type === 'search' && sources.search.length === 0) stopFetching(); - } - - // has the courier been started? - function isRunning() { - return !!fetchTimer; - } - - // chainable public api - this.start = chain(this, doSearch); - this.running = chain(this, isRunning); - this.stop = chain(this, stopFetching); - this.close = chain(this, function () { _(sources.search).each(closeDataSource); }); - this.openDataSource = chain(this, openDataSource); - this.closeDataSource = chain(this, closeDataSource); - - // setters - this.client = chain(this, function (client) { - opts.client = client; + // array's to store references to individual sources of each type + // wrapped in some metadata + this._refs = _.transform(sourceTypes, function (refs, fn, type) { + refs[type] = []; }); - this.fetchInterval = function (val) { - opts.fetchInterval = val; - if (isRunning()) setFetchTimeout(); - return this; - }; - // factory - this.createSource = function (type, initialState) { - return new DataSource(this, type, initialState); - }; + // stores all timer ids + this._timer = {}; - // apply the passed in config - _.each(config || {}, function (val, key) { - if (typeof this[key] !== 'function') throw new TypeError('invalid config "' + key + '"'); - this[key](val); + // interval times for each type + this._interval = {}; + + // interval hook/fn for each type + this._onInterval = {}; + + _.each(sourceTypes, function (fn, type) { + var courier = this; + // the name used outside of this module + var publicName; + if (type === 'search') { + publicName = 'fetchInterval'; + } else { + publicName = type + 'Interval'; + } + + // store the config value passed in for this interval + this._interval[type] = config[publicName]; + + // store a quick "bound" method for triggering + this._onInterval[type] = function () { + onFetch[type](courier); + courier._schedule(type); + }; + + // create a public setter for this interval type + this[publicName] = function (val) { + courier._interval[type] = val; + courier._schedule(type); + return this; + }; }, this); } inherits(Courier, EventEmitter); - // private api, exposed for testing - Courier._flattenDataSource = flattenDataSource; + /** + * PUBLIC API + */ + + // start fetching results on an interval + Courier.prototype.start = function () { + if (!this.running()) { + this._schedule('doc'); + this._schedule('search'); + this.fetch(); + } + return this; + }; + + // is the courier currently running? + Courier.prototype.running = function () { + return !!this._fetchTimer; + }; + + // stop the courier from fetching more results + Courier.prototype.stop = function () { + this._clearScheduled('search'); + this._clearScheduled('doc'); + }; + + // close the courier, stopping it from refreshing and + // closing all of the sources + Courier.prototype.close = function () { + _.each(sourceTypes, function (fn, type) { + this._refs[type].forEach(function (ref) { + this._closeDataSource(ref.source); + }, this); + }, this); + }; + + // force a fetch of all datasources right now + Courier.prototype.fetch = function () { + _.forOwn(onFetch, function (method, type) { + method(this); + }, this); + }; + + // data source factory + Courier.prototype.createSource = function (type, initialState) { + type = type || 'search'; + if ('function' !== typeof sourceTypes[type]) throw new TypeError( + 'Invalid source type ' + type + ); + var Constructor = sourceTypes[type]; + return new Constructor(this, initialState); + }; + + /***** + * PRIVATE API + *****/ + + // handle errors in a standard way. The only errors that should make it here are + // - issues with msearch/mget syntax + // - unable to reach ES + // - HastyRefresh + Courier.prototype._error = function (err) { + this.stop(); + return this.emit('error', err); + }; + + // every time a child object (DataSource, Mapper) needs the client, it should + // call _getClient + Courier.prototype._getClient = function () { + if (!this._client) throw new Error('Client is not set on the Courier yet.'); + return this._client; + }; + + // start using a DocSource in fetches/updates + Courier.prototype._openDataSource = function (source) { + var refs = this._refs[source._getType()]; + if (!_.find(refs, { source: source })) { + refs.push({ + source: source + }); + } + }; + + // stop using a DataSource in fetches/updates + Courier.prototype._closeDataSource = function (source) { + var type = source._getType(); + var refs = this._refs[type]; + _(refs).where({ source: source }).each(_.partial(_.pull, refs)); + if (refs.length === 0) this._clearScheduled(type); + }; + + // schedule a fetch after fetchInterval + Courier.prototype._schedule = function (type) { + this._clearScheduled(type); + if (this._interval[type]) { + this._timer[type] = setTimeout(this._onInterval[type], this._interval[type]); + } + }; + + // properly clear scheduled fetches + Courier.prototype._clearScheduled = function (type) { + this._timer[type] = clearTimeout(this._timer[type]); + }; + + // alert the courior that a doc has been updated + // and that it should update matching docs + Courier.prototype._docUpdated = function (source) { + var updated = source._state; + + _.each(this._refs.doc, function (ref) { + var state = ref.source._state; + if ( + state === updated + || ( + state.id === updated.id + && state.type === updated.type + && state.index === updated.index + ) + ) { + delete ref.version; + } + }); + + onFetch.doc(this); + }; return Courier; }); \ No newline at end of file diff --git a/src/courier/data_source.js b/src/courier/data_source.js deleted file mode 100644 index a2ed3cd0211f..000000000000 --- a/src/courier/data_source.js +++ /dev/null @@ -1,128 +0,0 @@ -define(function (require) { - var inherits = require('utils/inherits'); - var _ = require('lodash'); - var EventEmitter = require('utils/event_emitter'); - var Mapper = require('courier/mapper'); - var IndexPattern = require('courier/index_pattern'); - - // polyfill for older versions of node - function listenerCount(emitter, event) { - if (EventEmitter.listenerCount) { - return EventEmitter.listenerCount(emitter, event); - } else { - return this.listeners(event).length; - } - } - - var apiMethods = { - search: [ - 'index', - 'type', - 'query', - 'filter', - 'sort', - 'highlight', - 'aggs', - 'from', - 'size', - 'source', - 'inherits' - ], - get: [ - 'index', - 'type', - 'id', - 'sourceInclude', - 'sourceExclude' - ] - }; - - function DataSource(courier, type, initialState) { - var state; - - if (initialState) { - // state can be serialized as JSON, and passed back in to restore - if (typeof initialState === 'string') { - state = JSON.parse(initialState); - } else { - state = _.cloneDeep(initialState); - } - if (state._type) { - if (type && type !== state._type) { - throw new Error('Initial state is not of the type specified for this DataSource'); - } else { - type = state._type; - } - } - } else { - state = {}; - } - - type = type || 'search'; - if (!_.has(apiMethods, type)) { - throw new TypeError('Invalid DataSource type ' + type); - } - state._type = type; - - var mapper = new Mapper(); - - var onNewListener = _.bind(function (name) { - // new newListener is emitted before it is added, count will be 0 - if (name !== 'results' || listenerCount(this, 'results') !== 0) return; - courier.openDataSource(this); - this.removeListener('newListener', onNewListener); - this.on('removeListener', onRemoveListener); - }, this); - - var onRemoveListener = _.bind(function () { - if (listenerCount(this, 'results') > 0) return; - courier.closeDataSource(this); - this.removeListener('removeListener', onRemoveListener); - this.on('newListener', onNewListener); - }, this); - - this.on('newListener', onNewListener); - - /** - * Used to flatten a chain of DataSources - * @return {object} - simple object containing all of the - * sources internal state - */ - this._state = function () { - return state; - }; - - // public api - this.toJSON = function () { - return _.omit(state, 'inherits'); - }; - this.toString = function () { - return JSON.stringify(this.toJSON()); - }; - this.getFieldNames = function (cb) { - mapper.getMapping(state.index, state.type, function (mapping) { - return _.keys(mapping); - }); - }; - this.getType = function () { - return state._type; - }; - this.extend = function () { - return courier.createSource(type).inherits(this); - }; - - // get/set internal state values - apiMethods[type].forEach(function (name) { - this[name] = function (val) { - state[name] = val; - if (name === 'index' && arguments[1]) { - state.index = new IndexPattern(val, arguments[1]); - } - return this; - }; - }, this); - } - inherits(DataSource, EventEmitter); - - return DataSource; -}); \ No newline at end of file diff --git a/src/courier/data_source/data_source.js b/src/courier/data_source/data_source.js new file mode 100644 index 000000000000..8f51029da50c --- /dev/null +++ b/src/courier/data_source/data_source.js @@ -0,0 +1,160 @@ +define(function (require) { + var inherits = require('utils/inherits'); + var _ = require('lodash'); + var EventEmitter = require('utils/event_emitter'); + var Mapper = require('courier/mapper'); + var IndexPattern = require('courier/index_pattern'); + + function DataSource(courier, initialState) { + var state; + + EventEmitter.call(this); + + // state can be serialized as JSON, and passed back in to restore + if (initialState) { + if (typeof initialState === 'string') { + state = JSON.parse(initialState); + } else { + state = _.cloneDeep(initialState); + } + } else { + state = {}; + } + + this._state = state; + this._courier = courier; + + var onNewListener = _.bind(function (name) { + // new newListener is emitted before it is added, count will be 0 + if (name !== 'results' || EventEmitter.listenerCount(this, 'results') !== 0) return; + courier._openDataSource(this); + this.removeListener('newListener', onNewListener); + this.on('removeListener', onRemoveListener); + }, this); + + var onRemoveListener = _.bind(function () { + if (EventEmitter.listenerCount(this, 'results') > 0) return; + courier._closeDataSource(this); + this.removeListener('removeListener', onRemoveListener); + this.on('newListener', onNewListener); + }, this); + + this.on('newListener', onNewListener); + + this.extend = function () { + return courier.createSource(this._getType()).inherits(this); + }; + + // get/set internal state values + this._methods.forEach(function (name) { + this[name] = function (val) { + state[name] = val; + if (name === 'index' && arguments[1]) { + state.index = new IndexPattern(val, arguments[1]); + } + return this; + }; + }, this); + } + inherits(DataSource, EventEmitter); + + /***** + * PUBLIC API + *****/ + + /** + * fetch the field names for this DataSource + * @param {Function} cb + * @callback {Error, Array} - calls cb with a possible error or an array of field names + * @todo + */ + DataSource.prototype.getFieldNames = function (cb) { + throw new Error('not implemented'); + }; + + /** + * flatten an object to a simple encodable object + * @return {[type]} [description] + */ + DataSource.prototype.toJSON = function () { + return _.omit(this._state, 'inherits'); + }; + + /** + * Create a string representation of the object + * @return {[type]} [description] + */ + DataSource.prototype.toString = function () { + return JSON.stringify(this.toJSON()); + }; + + /***** + * PRIVATE API + *****/ + + /** + * Handle errors by allowing them to bubble from the DataSource + * to the courior. Maybe we should walk the inheritance chain too. + * @param {Error} err - The error that occured + * @return {undefined} + */ + DataSource.prototype._error = function (err) { + if (EventEmitter.listenerCount(this, 'error')) { + this.emit('error', err); + } else { + this._courier.emit('error', err); + } + }; + + /** + * Walk the inheritance chain of a source and return it's + * flat representaion (taking into account merging rules) + * @return {object} - the flat state of the DataSource + */ + DataSource.prototype._flatten = function () { + var type = this._getType(); + // the merged state of this dataSource and it's ancestors + var flatState = {}; + + var collectProp = _.partial(this._mergeProp, flatState); + + // walk the chain and merge each property + var current = this; + var currentState; + while (current) { + currentState = current._state; + _.forOwn(currentState, collectProp); + current = currentState.inherits; + } + + if (type === 'search') { + // defaults for the query + _.forOwn({ + query: { + 'match_all': {} + } + }, collectProp); + + // switch to filtered query if there are filters + if (flatState.filters) { + if (flatState.filters.length) { + flatState.body.query = { + filtered: { + query: flatState.body.query, + filter: { + bool: { + must: flatState.filters + } + } + } + }; + } + delete flatState.filters; + } + } + + return flatState; + }; + + return DataSource; +}); \ No newline at end of file diff --git a/src/courier/data_source/doc.js b/src/courier/data_source/doc.js new file mode 100644 index 000000000000..4bdfe3d4d493 --- /dev/null +++ b/src/courier/data_source/doc.js @@ -0,0 +1,181 @@ +define(function (require) { + var DataSource = require('courier/data_source/data_source'); + var inherits = require('utils/inherits'); + var errors = require('courier/errors'); + var _ = require('lodash'); + + function DocSource(courier, initialState) { + DataSource.call(this, courier, initialState); + } + inherits(DocSource, DataSource); + + /** + * Method used by the Courier to fetch multiple DocSource objects at one time. + * Those objects come in via the refs array, which is a list of objects containing + * at least `source` and `version` keys. + * + * @param {Courier} courier - The courier requesting the records + * @param {array} refs - The list of refs + * @param {Function} cb - Callback + * @return {undefined} + */ + DocSource.fetch = function (courier, refs, cb) { + var client = courier._getClient(); + var allRefs = []; + var body = { + docs: [] + }; + + _.each(refs, function (ref) { + var source = ref.source; + if (source._getType() !== 'doc') return; + + allRefs.push(ref); + body.docs.push(source._flatten()); + }); + + return client.mget({ body: body }, function (err, resp) { + if (err) return cb(err); + + _.each(resp.docs, function (resp, i) { + var ref = allRefs[i]; + var source = ref.source; + + if (resp.error) return this._error(resp); + if (ref.version === resp._version) return; // no change + ref.version = resp._version; + source._storeVersion(resp._version); + source.emit('results', resp); + }); + + cb(err, resp); + }); + }; + + /** + * Method used to check if a list of refs is + * @param {[type]} courier [description] + * @param {[type]} refs [description] + * @param {Function} cb [description] + * @return {[type]} [description] + */ + DocSource.validate = function (courier, refs, cb) { + var invalid = _.filter(refs, function (ref) { + var storedVersion = ref.source._getStoredVersion(); + if (ref.version !== storedVersion) return true; + }); + setTimeout(function () { + cb(void 0, invalid); + }); + }; + + /***** + * PUBLIC API + *****/ + + /** + * List of methods that is turned into a chainable API in the constructor + * @type {Array} + */ + DocSource.prototype._methods = [ + 'index', + 'type', + 'id', + 'sourceInclude', + 'sourceExclude' + ]; + + /** + * Applies a partial update to the document + * @param {object} fields - The fields to change and their new values (es doc field) + * @param {Function} cb - Callback to know when the update is complete + * @return {undefined} + */ + DocSource.prototype.update = function (fields, cb) { + var source = this; + var courier = this._courier; + var client = courier._getClient(); + var state = this._state; + + client.update({ + id: state.id, + type: state.type, + index: state.index, + body: { + doc: fields + } + }, function (err, resp) { + if (err) return cb(err); + + courier._docUpdated(source); + return cb(); + }); + }; + + /***** + * PRIVATE API + *****/ + + /** + * Get the type of this DataSource + * @return {string} - 'doc' + */ + DocSource.prototype._getType = function () { + return 'doc'; + }; + + /** + * Used to merge properties into the state within ._flatten(). + * The state is passed in and modified by the function + * + * @param {object} state - the current merged state + * @param {*} val - the value at `key` + * @param {*} key - The key of `val` + * @return {undefined} + */ + DocSource.prototype._mergeProp = function (state, val, key) { + key = '_' + key; + + if (val != null && state[key] == null) { + state[key] = val; + } + }; + + /** + * Creates a key based on the doc's index/type/id + * @return {string} + */ + DocSource.prototype._versionKey = function () { + var state = this._state; + return 'DocVersion:' + ( + [ + state.index, + state.type, + state.id + ] + .map(encodeURIComponent) + .join('/') + ); + }; + + /** + * Fetches the stored version from localStorage + * @return {number} - the version number, or NaN + */ + DocSource.prototype._getStoredVersion = function () { + var id = this._versionKey(); + return _.parseInt(localStorage.getItem(id)); + }; + + /** + * Stores the version into localStorage + * @param {number, NaN} version - the current version number, NaN works well forcing a refresh + * @return {undefined} + */ + DocSource.prototype._storeVersion = function (version) { + var id = this._versionKey(); + localStorage.setItem(id, version); + }; + + return DocSource; +}); \ No newline at end of file diff --git a/src/courier/data_source/search.js b/src/courier/data_source/search.js new file mode 100644 index 000000000000..ff7f4372213c --- /dev/null +++ b/src/courier/data_source/search.js @@ -0,0 +1,129 @@ +define(function (require) { + var DataSource = require('courier/data_source/data_source'); + var inherits = require('utils/inherits'); + var errors = require('courier/errors'); + var _ = require('lodash'); + + function SearchSource(courier, initialState) { + DataSource.call(this, courier, initialState); + } + inherits(SearchSource, DataSource); + + /** + * Method used by the Courier to fetch multiple SearchSource request at a time. + * Those objects come in via the refs array, which is a list of objects containing + * a `source` keys. + * + * @param {Courier} courier - The courier requesting the results + * @param {array} refs - The list of refs + * @param {Function} cb - Callback + * @return {undefined} + */ + SearchSource.fetch = function (courier, refs, cb) { + var client = courier._getClient(); + var allRefs = []; + var body = ''; + + _.each(refs, function (ref) { + var source = ref.source; + if (source._getType() !== 'search') { + return; + } + allRefs.push(source); + + var state = source._flatten(); + body += + JSON.stringify({ index: state.index, type: state.type }) + + '\n' + + JSON.stringify(state.body) + + '\n'; + }); + + return client.msearch({ body: body }, function (err, resp) { + if (err) return cb(err); + + _.each(resp.responses, function (resp, i) { + var source = allRefs[i]; + if (resp.error) return errors.emit(source, courier, resp); + source.emit('results', resp); + }); + + cb(void 0, resp); + }); + }; + + /***** + * PUBLIC API + *****/ + + /** + * List of the editable state properties that turn into a + * chainable API + * + * @type {Array} + */ + SearchSource.prototype._methods = [ + 'index', + 'type', + 'query', + 'filter', + 'sort', + 'highlight', + 'aggs', + 'from', + 'size', + 'source', + 'inherits' + ]; + + /****** + * PRIVATE APIS + ******/ + + /** + * Gets the type of the DataSource + * @return {string} + */ + SearchSource.prototype._getType = function () { + return 'search'; + }; + + /** + * Used to merge properties into the state within ._flatten(). + * The state is passed in and modified by the function + * + * @param {object} state - the current merged state + * @param {*} val - the value at `key` + * @param {*} key - The key of `val` + * @return {undefined} + */ + SearchSource.prototype._mergeProp = function (state, val, key) { + switch (key) { + case 'inherits': + case '_type': + // ignore + return; + case 'filter': + state.filters = state.filters || []; + state.filters.push(val); + return; + case 'index': + case 'type': + case 'id': + if (key && state[key] == null) { + state[key] = val; + } + return; + case 'source': + key = '_source'; + /* fall through */ + default: + state.body = state.body || {}; + if (key && state.body[key] == null) { + state.body[key] = val; + } + } + }; + + return SearchSource; +}); \ No newline at end of file diff --git a/src/courier/docs.js b/src/courier/docs.js deleted file mode 100644 index 06b7424c7faa..000000000000 --- a/src/courier/docs.js +++ /dev/null @@ -1,135 +0,0 @@ -define(function (require) { - var _ = require('lodash'); - - function Docs(courier) { - // docs that we have let loose, and want to track - var tracking = {}; - var watchers = {}; - - function respId(getResp) { - return [ - encodeURIComponent(getResp._index), - encodeURIComponent(getResp._type), - encodeURIComponent(getResp._id) - ].join('/'); - } - - function change(id, updated) { - if (watchers[id]) { - var notify = function () { - var oldVal = tracking[id]._source; - tracking[id] = _.cloneDeep(update); - watchers[id].forEach(function (watcher) { - try { - watcher(updated, oldVal); - } catch (e) { console.error(e); } - }); - }; - - if (updated) { - notify(); - } else { - courier.get('client').get({ - - }); - } - } - } - - function track(resp) { - var id = respId(resp); - var tracker = _.pick(resp, '_id', '_type', '_index', '_source'); - if (tracking[id] && equal(tracking[id]._source, resp)) return false; - change(id, resp); - } - - /** - * add a function to be called when objects matching - * this resp are changed - * @param {object} resp - Response like object, should contain _id, _type, and _index keys - * @param {[type]} onChange - Function to be called when changes are noticed - */ - function watch(resp, onChange) { - var id = respId(resp); - if (!watchers[id]) watchers[id] = []; - watchers[id].push(onChange); - } - - function get(args, cb, onChange) { - var client = courier.get('client'); - client.get(args, function (err, getResp) { - if (err) return cb(err); - watch(getResp, onChange); - return cb(void 0, getResp); - }); - } - - function index(args, cb) { - var client = courier.get('client'); - - client.index(args, function (err, indexResp) { - if (err) return cb(err); - delete indexResp.created; - indexResp._source = args.body; - track(indexResp); - return cb(void 0, indexResp); - }); - } - - function update(args, cb) { - var client = courier.get('client'); - client.update(args, function (err, updateResp) { - if (err) return cb(err); - return cb(void 0, updateResp); - }); - } - - this.watch = watch; - this.get = get; - this.index = index; - this.set = index; - this.update = update; - } - - function equal(o1, o2) { - /* jshint eqeqeq:false, forin:false */ - if (o1 === o2) return true; - if (o1 === null || o2 === null) return false; - if (o1 !== o1 && o2 !== o2) return true; // NaN === NaN - var t1 = typeof o1, t2 = typeof o2, length, key, keySet; - if (t1 == t2) { - if (t1 == 'object') { - if (_.isArray(o1)) { - if (!_.isArray(o2)) return false; - if ((length = o1.length) == o2.length) { - for (key = 0; key < length; key++) { - if (!equal(o1[key], o2[key])) return false; - } - return true; - } - } else if (_.isDate(o1)) { - return _.isDate(o2) && o1.getTime() == o2.getTime(); - } else if (_.isRegExp(o1) && _.isRegExp(o2)) { - return o1.toString() == o2.toString(); - } else { - if (_.isArray(o2)) return false; - keySet = {}; - for (key in o1) { - if (_.isFunction(o1[key])) continue; - if (!equal(o1[key], o2[key])) return false; - keySet[key] = true; - } - for (key in o2) { - if (!keySet.hasOwnProperty(key) && - o2[key] !== undefined && - !_.isFunction(o2[key])) return false; - } - return true; - } - } - } - return false; - } - - return Docs; -}); \ No newline at end of file diff --git a/src/courier/errors.js b/src/courier/errors.js index 1afdd8bc0037..a12fc3263367 100644 --- a/src/courier/errors.js +++ b/src/courier/errors.js @@ -1,12 +1,12 @@ -define(function (require) { +define(function (require, module, exports) { + var listenerCount = require('utils/event_emitter').listenerCount; + + // caused by a refresh attempting to start before the prevous is done function HastyRefresh() { this.name = 'HastyRefresh'; this.message = 'Courier attempted to start a query before the previous had finished.'; } HastyRefresh.prototype = new Error(); HastyRefresh.prototype.constructor = HastyRefresh; - - return { - HastyRefresh: HastyRefresh - }; + exports.HastyRefresh = HastyRefresh; }); \ No newline at end of file diff --git a/src/courier/mapper.js b/src/courier/mapper.js index 393a22835a6c..7de98976d391 100644 --- a/src/courier/mapper.js +++ b/src/courier/mapper.js @@ -16,7 +16,6 @@ define(function (require) { * @return {Object} A hash containing fields and their related mapping */ this.getFields = function (dataSource, callback, type) { - console.log(dataSource); client.indices.getFieldMapping({index: dataSource.index}, callback); }; diff --git a/src/courier/scratch.js b/src/courier/scratch.js deleted file mode 100644 index 4401e5b16cbf..000000000000 --- a/src/courier/scratch.js +++ /dev/null @@ -1,16 +0,0 @@ -var elasticsearch = require('elasticsearch'); -var es = elasticsearch.Client(); - -es.msearch({ - body: [ - { - index: 'logstash-2014.02.1111' - }, - { - query: { 'match_all': {} } - } - ] -}, function (err, resp) { - console.log(resp); - es.close(); -}); \ No newline at end of file diff --git a/src/courier/test.html b/src/courier/test.html index c5deb11bb6e3..0b542c9dcc11 100644 --- a/src/courier/test.html +++ b/src/courier/test.html @@ -1,3 +1,3 @@ - - - \ No newline at end of file + + + \ No newline at end of file diff --git a/src/kibana/controllers/kibana.js b/src/kibana/controllers/kibana.js index 6f87a5da4e0a..80aac33c60db 100644 --- a/src/kibana/controllers/kibana.js +++ b/src/kibana/controllers/kibana.js @@ -1,5 +1,6 @@ define(function (require) { var angular = require('angular'); + var _ = require('lodash'); angular.module('kibana/controllers') .controller('Kibana', function (courier, $scope, $rootScope) { @@ -8,7 +9,7 @@ define(function (require) { .size(5); // this should be triggered from within the controlling application - setTimeout(courier.start, 15); + setTimeout(_.bindKey(courier, 'start'), 15); }); angular.module('kibana/directives') @@ -18,14 +19,18 @@ define(function (require) { scope: { type: '@' }, - controller: function (courier, $rootScope, $scope) { + template: '{{count}} : 
{{json}}
', + controller: function ($rootScope, $scope, courier) { + $scope.count = 0; + var source = $rootScope.dataSource.extend() .type($scope.type) .source({ include: 'country' }) .on('results', function (resp) { - //$scope.json = JSON.stringify(resp.hits, null, ' '); + $scope.count ++; + $scope.json = JSON.stringify(resp.hits, null, ' '); }); courier.mapper.getFields($rootScope.dataSource, function (data) { @@ -33,8 +38,38 @@ define(function (require) { }); $scope.$watch('type', source.type); + } + }; + }) + .directive('courierDocTest', function () { + return { + restrict: 'E', + scope: { + id: '@', + type: '@', + index: '@' }, - template: '
{{json}}
' + template: '{{count}} :
{{json}}
', + controller: function (courier, $scope) { + $scope.count = 0; + + var currentSource; + $scope.click = function () { + if (currentSource) { + source.update(currentSource); + } + }; + + var source = courier.createSource('doc') + .id($scope.id) + .type($scope.type) + .index($scope.index) + .on('results', function (doc) { + currentSource = doc._source; + $scope.count ++; + $scope.json = JSON.stringify(doc, null, ' '); + }); + } }; }); }); \ No newline at end of file diff --git a/src/kibana/services/courier.js b/src/kibana/services/courier.js index 50c4aaeb7d2e..38675588624f 100644 --- a/src/kibana/services/courier.js +++ b/src/kibana/services/courier.js @@ -1,16 +1,22 @@ define(function (require) { var angular = require('angular'); var Courier = require('courier/courier'); + var DocSource = require('courier/data_source/doc'); + + require('services/promises'); angular.module('kibana/services') - .service('courier', function (es) { + .service('courier', function (es, promises) { + + promises.playNice(DocSource.prototype, [ + 'update', + 'index' + ]); + var courier = new Courier({ fetchInterval: 15000, - client: es - }); - - courier.on('error', function (err) { - console.error(err); + client: es, + promises: promises }); return courier; diff --git a/src/kibana/services/promises.js b/src/kibana/services/promises.js new file mode 100644 index 000000000000..a41c8280a3a0 --- /dev/null +++ b/src/kibana/services/promises.js @@ -0,0 +1,39 @@ +define(function (require, module, exports) { + var _ = require('lodash'); + var angular = require('angular'); + + angular.module('kibana/services') + .service('promises', function ($q) { + + function playNice(fn, fns) { + if (fns && _.isArray(fns) && _.isObject(fn)) { + fns.forEach(function (method) { + fn[method] = playNice(fn[method]); + }); + return fn; + } + + return function playNiceWrapper() { + // if the last arg is a callback then don't do anything + if (typeof arguments[arguments.length - 1] === 'function') { + return fn.apply(this, arguments); + } + + // otherwise create a callback and pass it in + var args = Array.prototype.slice.call(arguments); + var defer = $q.defer(); + args.push(function (err, result) { + if (err) return defer.reject(err); + defer.resolve(result); + }); + fn.apply(this, args); + return defer.promise; + }; + } + + + return { + playNice: playNice + }; + }); +}); \ No newline at end of file diff --git a/tasks/config/connect.js b/tasks/config/connect.js index a03d3c6f0cfd..a519afd7d90d 100644 --- a/tasks/config/connect.js +++ b/tasks/config/connect.js @@ -12,7 +12,8 @@ module.exports = { '<%= src %>', '<%= root %>/node_modules/mocha', '<%= root %>/node_modules/expect.js' - ] + ], + port: 8001 } } }; \ No newline at end of file diff --git a/tasks/config/mocha.js b/tasks/config/mocha.js index c54881ef5f88..13c593f40946 100644 --- a/tasks/config/mocha.js +++ b/tasks/config/mocha.js @@ -4,7 +4,7 @@ module.exports = { log: true, logErrors: true, urls: [ - 'http://localhost:8000/' + 'http://localhost:8001/' ], run: false } diff --git a/test/unit/specs/courier.js b/test/unit/specs/courier.js index 9e3d3e729e47..4b755e3c2da4 100644 --- a/test/unit/specs/courier.js +++ b/test/unit/specs/courier.js @@ -2,6 +2,9 @@ define(function (require) { var Courier = require('courier/courier'); var _ = require('lodash'); var sinon = require('sinon/sinon'); + var DataSource = require('courier/data_source/data_source'); + var DocSource = require('courier/data_source/doc'); + var SearchSource = require('courier/data_source/search'); describe('Courier Module', function () { @@ -39,13 +42,13 @@ define(function (require) { it('creates an empty search DataSource object', function () { courier = new Courier(); var source = courier.createSource(); - expect(source._state()).to.eql({ _type: 'search' }); + expect(source._state).to.eql({}); }); it('optionally accepts a type for the DataSource', function () { var courier = new Courier(); - expect(courier.createSource()._state()._type).to.eql('search'); - expect(courier.createSource('search')._state()._type).to.eql('search'); - expect(courier.createSource('get')._state()._type).to.eql('get'); + expect(courier.createSource()).to.be.a(SearchSource); + expect(courier.createSource('search')).to.be.a(SearchSource); + expect(courier.createSource('doc')).to.be.a(DocSource); expect(function () { courier.createSource('invalid type'); }).to.throwError(TypeError); @@ -53,12 +56,12 @@ define(function (require) { it('optionally accepts a json object/string that will populate the DataSource object with settings', function () { courier = new Courier(); var savedState = JSON.stringify({ - _type: 'get', + _type: 'doc', index: 'logstash-[YYYY-MM-DD]', type: 'nginx', id: '1' }); - var source = courier.createSource('get', savedState); + var source = courier.createSource('doc', savedState); expect(source + '').to.eql(savedState); }); }); @@ -83,7 +86,7 @@ define(function (require) { source.on('results', _.noop); source.index('the index name'); - expect(Courier._flattenDataSource(source).index).to.eql('the index name'); + expect(source._flatten().index).to.eql('the index name'); }); }); @@ -111,7 +114,7 @@ define(function (require) { }) .on('results', _.noop); - var query = Courier._flattenDataSource(math); + var query = math._flatten(); expect(query.index).to.eql('people'); expect(query.type).to.eql('students'); expect(query.body).to.eql({ diff --git a/test/unit/specs/data_source.js b/test/unit/specs/data_source.js index 6dc9ee108b74..f3a717a684f6 100644 --- a/test/unit/specs/data_source.js +++ b/test/unit/specs/data_source.js @@ -1,30 +1,20 @@ define(function (require) { var Courier = require('courier/courier'); - var DataSource = require('courier/data_source'); + var DataSource = require('courier/data_source/data_source'); + var DocSource = require('courier/data_source/doc'); + var SearchSource = require('courier/data_source/search'); describe('DataSource class', function () { var courier = new Courier(); describe('::new', function () { - it('accepts and validates a type', function () { - var source = new DataSource(courier, 'get'); - expect(source._state()._type).to.eql('get'); - - source = new DataSource(courier, 'search'); - expect(source._state()._type).to.eql('search'); - - expect(function () { - source = new DataSource(courier, 'invalid Type'); - }).to.throwError(TypeError); - }); - it('optionally accepts a json object/string that will populate the DataSource object with settings', function () { var savedState = JSON.stringify({ - _type: 'get', + _type: 'doc', index: 'logstash-[YYYY-MM-DD]', type: 'nginx', id: '1' }); - var source = new DataSource(courier, 'get', savedState); + var source = new DocSource(courier, savedState); expect(source + '').to.eql(savedState); }); }); diff --git a/test/unit/specs/mapper.js b/test/unit/specs/mapper.js index b3f3e0d885f8..b49c4af5fa69 100644 --- a/test/unit/specs/mapper.js +++ b/test/unit/specs/mapper.js @@ -2,7 +2,7 @@ define(function (require) { var elasticsearch = require('../bower_components/elasticsearch/elasticsearch.js'); var _ = require('lodash'); var Courier = require('courier/courier'); - var DataSource = require('courier/data_source'); + var DataSource = require('courier/data_source/data_source'); var Mapper = require('courier/mapper'); var client = new elasticsearch.Client({ host: 'localhost:9200', @@ -16,6 +16,7 @@ define(function (require) { }); it('has a function called getFields that returns an object', function () { + /* var courier = new Courier({ client: client }); @@ -38,6 +39,7 @@ define(function (require) { type: 'long' } }); + */ });