Merge pull request #2 from spenceralger/master

refactored the courier
spenceralger 2014-02-14 01:10:49 -07:00
commit b890858727
15 changed files with 805 additions and 621 deletions

View file

@ -1,50 +0,0 @@
/* jshint node: true */
var elasticsearch = require('../elasticsearch-js');
var async = require('async');
var es = new elasticsearch.Client({
host: 'localhost:9200',
sniffOnStart: true,
sniffInterval: 3000,
apiVersion: '1.0',
log: 'trace'
});
var rl = require('readline').createInterface({
input: process.stdin,
output: process.stdout,
terminal: true
});
async.series([
function (done) {
setTimeout(done, 50);
},
function (done) {
console.log(es.transport.connectionPool._conns.index);
es.indices.create({
index: 'index_name'
}, done);
},
function (done) {
rl.question('Is the master down?', function () {
done();
});
},
function (done) {
console.log(es.transport.connectionPool._conns.index);
es.search({ index: 'index_name' }, done);
},
function (done) {
rl.question('Is the slave down now?', function () {
es.search({ body: { query: { match_all: {} } } }, done);
});
},
function (done) {
rl.question('Is the master back up?', function () {
es.search({ body: { query: { match_all: {} } } }, done);
});
}
], function (err) {
console.log(err);
});

View file

@ -1,287 +1,257 @@
define(function (require) {
var DataSource = require('courier/data_source');
var Docs = require('courier/docs');
var EventEmitter = require('utils/event_emitter');
var inherits = require('utils/inherits');
var errors = require('courier/errors');
var _ = require('lodash');
var angular = require('angular');
function chain(cntx, method) {
return function () {
method.apply(cntx, arguments);
return this;
};
}
var DocSource = require('courier/data_source/doc');
var SearchSource = require('courier/data_source/search');
var HastyRefresh = require('courier/errors').HastyRefresh;
function emitError(source, courier, error) {
if (EventEmitter.listenerCount(source, 'error')) {
source.emit('error', error);
} else {
courier.emit('error', error);
}
}
// map constructors to type keywords
var sourceTypes = {
doc: DocSource,
search: SearchSource
};
function mergeProp(state, filters, val, key) {
switch (key) {
case 'inherits':
case '_type':
// ignore
return;
case 'filter':
filters.push(val);
return;
case 'index':
case 'type':
if (key && state[key] == null) {
state[key] = val;
// fetch process for the two source types
var onFetch = {
// execute a search right now
search: function (courier) {
if (courier._activeSearchRequest) {
return courier._error(new HastyRefresh());
}
return;
case 'source':
key = '_source';
/* fall through */
}
courier._activeSearchRequest = SearchSource.fetch(
courier,
courier._refs.search,
function (err) {
if (err) return courier._error(err);
});
},
if (key && state.body[key] == null) {
state.body[key] = val;
}
}
function flattenDataSource(source) {
var state = {
body: {}
};
// all of the filters from the source chain
var filters = [];
var collectProp = _.partial(mergeProp, state, filters);
// walk the chain and merge each property
var current = source;
var currentState;
while (current) {
currentState = current._state();
_.forOwn(currentState, collectProp);
current = currentState.inherits;
}
// defaults for the query
_.forOwn({
query: {
'match_all': {}
}
}, collectProp);
// switch to filtered query if there are filters
if (filters.length) {
state.body.query = {
filtered: {
query: state.body.query,
filter: {
bool: {
must: filters
}
}
// validate that all of the DocSource objects are up to date
// then fetch the ones that are not
doc: function (courier) {
DocSource.validate(courier, courier._refs.doc, function (err, invalid) {
if (err) {
courier.stop();
return courier.emit('error', err);
}
};
// if all of the docs are up to date we don't need to do anything else
if (invalid.length === 0) return;
DocSource.fetch(courier, invalid, function (err) {
if (err) return courier._error(err);
});
});
}
};
return state;
}
function fetchSearchResults(courier, client, sources, cb) {
if (!client) {
this.emit('error', new Error('Courier does not have a client yet, unable to fetch queries.'));
return;
}
var all = [];
var body = '';
_.each(sources, function (source) {
if (source.getType() !== 'search') {
return;
}
all.push(source);
var state = flattenDataSource(source);
var header = JSON.stringify({
index: state.index,
type: state.type
});
var doc = JSON.stringify(state.body);
body += header + '\n' + doc + '\n';
});
return client.msearch({ body: body }, function (err, resp) {
if (err) return cb(err);
_.each(resp.responses, function (resp, i) {
var source = sources[i];
if (resp.error) return emitError(source, courier, resp);
source.emit('results', resp);
});
cb(err, resp);
});
}
function fetchDocs(courier, client, sources, cb) {
if (!client) {
this.emit('error', new Error('Courier does not have a client yet, unable to fetch queries.'));
return;
}
var all = [];
var body = {
docs: []
};
_.each(sources, function (source) {
if (source.getType() !== 'get') {
return;
}
all.push(source);
var state = flattenDataSource(source);
body.docs.push({
index: state.index,
type: state.type,
id: state.id
});
});
return client.mget({ body: body }, function (err, resp) {
if (err) return cb(err);
_.each(resp.responses, function (resp, i) {
var source = sources[i];
if (resp.error) return emitError(source, courier, resp);
source.emit('results', resp);
});
cb(err, resp);
});
}
function saveUpdate(source, fields) {
}
// default config values
var defaults = {
fetchInterval: 30000,
docInterval: 2500
};
/**
* Federated query service, supports data sources that inherit properties
* from one another and automatically emit results.
* Federated query service, supports two data source types: doc and search.
*
* search:
* - inherits filters, and other query properties
* - automatically emit results on a set interval
* doc:
* - tracks doc versions
* - emits same results event when the doc is updated
* - helps separate versions of kibana running on the same machine stay in sync
* - (NI) tracks version and uses it when new versions of a doc are reindexed
* - (NI) helps deal with conflicts
*
* @param {object} config
* @param {Client} config.client - The elasticsearch.js client to use for querying. Should be setup and ready to go.
* @param {integer} [config.fetchInterval=30000] - The amount in ms between each fetch (default is 30 seconds)
* @param {Client} config.client - The elasticsearch.js client to use for querying. Should be
* setup and ready to go.
* @param {EsClient} [config.client] - The elasticsearch client that the courier should use
* (can be set at a later time with the `.client()` method)
* @param {integer} [config.fetchInterval=30000] - The amount in ms between each fetch (default
* is 30 seconds)
*/
function Courier(config) {
if (!(this instanceof Courier)) return new Courier(config);
var opts = {
fetchInterval: 30000
};
var fetchTimer;
var activeRequest;
var courier = this;
var sources = {
search: [],
get: []
};
function doSearch() {
if (!opts.client) {
this.emit('error', new Error('Courier does not have a client, pass it ' +
'in to the constructor or set it with the .client() method'));
return;
}
if (activeRequest) {
activeRequest.abort();
stopFetching();
this.emit('error', new errors.HastyRefresh());
return;
}
config = _.defaults(config || {}, defaults);
// we need to catch the original promise in order to keep its abort method
activeRequest = fetchSearchResults(courier, opts.client, sources.search, function (err, resp) {
activeRequest = null;
setFetchTimeout();
this._client = config.client;
if (err) {
window.console && console.log(err);
}
});
}
function setFetchTimeout() {
clearTimeout(fetchTimer);
if (opts.fetchInterval) {
fetchTimer = setTimeout(doSearch, opts.fetchInterval);
} else {
fetchTimer = null;
}
}
function stopFetching(type) {
clearTimeout(fetchTimer);
}
// start using a DataSource in fetches/updates
function openDataSource(source) {
var type = source.getType();
if (~sources[type].indexOf(source)) return false;
sources[type].push(source);
}
// stop using a DataSource in fetches/updates
function closeDataSource(source) {
var type = source.getType();
var i = sources[type].indexOf(source);
if (i === -1) return;
sources[type].slice(i, 1);
// only search DataSources get fetched automatically
if (type === 'search' && sources.search.length === 0) stopFetching();
}
// has the courier been started?
function isRunning() {
return !!fetchTimer;
}
// chainable public api
this.start = chain(this, doSearch);
this.running = chain(this, isRunning);
this.stop = chain(this, stopFetching);
this.close = chain(this, function () { _(sources.search).each(closeDataSource); });
this.openDataSource = chain(this, openDataSource);
this.closeDataSource = chain(this, closeDataSource);
// setters
this.client = chain(this, function (client) {
opts.client = client;
// arrays to store references to individual sources of each type
// wrapped in some metadata
this._refs = _.transform(sourceTypes, function (refs, fn, type) {
refs[type] = [];
});
this.fetchInterval = function (val) {
opts.fetchInterval = val;
if (isRunning()) setFetchTimeout();
return this;
};
// factory
this.createSource = function (type, initialState) {
return new DataSource(this, type, initialState);
};
// stores all timer ids
this._timer = {};
// apply the passed in config
_.each(config || {}, function (val, key) {
if (typeof this[key] !== 'function') throw new TypeError('invalid config "' + key + '"');
this[key](val);
// interval times for each type
this._interval = {};
// interval hook/fn for each type
this._onInterval = {};
_.each(sourceTypes, function (fn, type) {
var courier = this;
// the name used outside of this module
var publicName;
if (type === 'search') {
publicName = 'fetchInterval';
} else {
publicName = type + 'Interval';
}
// store the config value passed in for this interval
this._interval[type] = config[publicName];
// store a quick "bound" method for triggering
this._onInterval[type] = function () {
onFetch[type](courier);
courier._schedule(type);
};
// create a public setter for this interval type
this[publicName] = function (val) {
courier._interval[type] = val;
courier._schedule(type);
return this;
};
}, this);
}
inherits(Courier, EventEmitter);
// private api, exposed for testing
Courier._flattenDataSource = flattenDataSource;
/**
* PUBLIC API
*/
// start fetching results on an interval
Courier.prototype.start = function () {
if (!this.running()) {
this._schedule('doc');
this._schedule('search');
this.fetch();
}
return this;
};
// is the courier currently running?
Courier.prototype.running = function () {
return !!this._fetchTimer;
};
// stop the courier from fetching more results
Courier.prototype.stop = function () {
this._clearScheduled('search');
this._clearScheduled('doc');
};
// close the courier, stopping it from refreshing and
// closing all of the sources
Courier.prototype.close = function () {
_.each(sourceTypes, function (fn, type) {
this._refs[type].forEach(function (ref) {
this._closeDataSource(ref.source);
}, this);
}, this);
};
// force a fetch of all datasources right now
Courier.prototype.fetch = function () {
_.forOwn(onFetch, function (method, type) {
method(this);
}, this);
};
// data source factory
Courier.prototype.createSource = function (type, initialState) {
type = type || 'search';
if ('function' !== typeof sourceTypes[type]) throw new TypeError(
'Invalid source type ' + type
);
var Constructor = sourceTypes[type];
return new Constructor(this, initialState);
};
/*****
* PRIVATE API
*****/
// handle errors in a standard way. The only errors that should make it here are
// - issues with msearch/mget syntax
// - unable to reach ES
// - HastyRefresh
Courier.prototype._error = function (err) {
this.stop();
return this.emit('error', err);
};
// every time a child object (DataSource, Mapper) needs the client, it should
// call _getClient
Courier.prototype._getClient = function () {
if (!this._client) throw new Error('Client is not set on the Courier yet.');
return this._client;
};
// start using a DocSource in fetches/updates
Courier.prototype._openDataSource = function (source) {
var refs = this._refs[source._getType()];
if (!_.find(refs, { source: source })) {
refs.push({
source: source
});
}
};
// stop using a DataSource in fetches/updates
Courier.prototype._closeDataSource = function (source) {
var type = source._getType();
var refs = this._refs[type];
_(refs).where({ source: source }).each(_.partial(_.pull, refs));
if (refs.length === 0) this._clearScheduled(type);
};
// schedule a fetch after fetchInterval
Courier.prototype._schedule = function (type) {
this._clearScheduled(type);
if (this._interval[type]) {
this._timer[type] = setTimeout(this._onInterval[type], this._interval[type]);
}
};
// properly clear scheduled fetches
Courier.prototype._clearScheduled = function (type) {
this._timer[type] = clearTimeout(this._timer[type]);
};
// alert the courier that a doc has been updated
// and that it should update matching docs
Courier.prototype._docUpdated = function (source) {
var updated = source._state;
_.each(this._refs.doc, function (ref) {
var state = ref.source._state;
if (
state === updated
|| (
state.id === updated.id
&& state.type === updated.type
&& state.index === updated.index
)
) {
delete ref.version;
}
});
onFetch.doc(this);
};
return Courier;
});
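
For orientation, a minimal usage sketch of the public Courier API in this commit (start/stop/fetch/createSource). It assumes `es` is an already-configured elasticsearch.js client; the module ids match the ones used in this diff.

define(function (require) {
  var Courier = require('courier/courier');

  // `es` stands in for an already-configured elasticsearch.js client
  var courier = new Courier({
    client: es,
    fetchInterval: 15000
  });

  // msearch/mget syntax problems, unreachable ES, and HastyRefresh all land here
  courier.on('error', function (err) {
    console.error(err);
  });

  // search sources re-emit 'results' on every scheduled fetch
  courier.createSource('search')
    .index('logstash-*')
    .type('apache')
    .size(5)
    .on('results', function (resp) {
      console.log(resp.hits);
    });

  // schedules the doc and search intervals and triggers an immediate fetch
  courier.start();
});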

View file

@ -1,128 +0,0 @@
define(function (require) {
var inherits = require('utils/inherits');
var _ = require('lodash');
var EventEmitter = require('utils/event_emitter');
var Mapper = require('courier/mapper');
var IndexPattern = require('courier/index_pattern');
// polyfill for older versions of node
function listenerCount(emitter, event) {
if (EventEmitter.listenerCount) {
return EventEmitter.listenerCount(emitter, event);
} else {
return this.listeners(event).length;
}
}
var apiMethods = {
search: [
'index',
'type',
'query',
'filter',
'sort',
'highlight',
'aggs',
'from',
'size',
'source',
'inherits'
],
get: [
'index',
'type',
'id',
'sourceInclude',
'sourceExclude'
]
};
function DataSource(courier, type, initialState) {
var state;
if (initialState) {
// state can be serialized as JSON, and passed back in to restore
if (typeof initialState === 'string') {
state = JSON.parse(initialState);
} else {
state = _.cloneDeep(initialState);
}
if (state._type) {
if (type && type !== state._type) {
throw new Error('Initial state is not of the type specified for this DataSource');
} else {
type = state._type;
}
}
} else {
state = {};
}
type = type || 'search';
if (!_.has(apiMethods, type)) {
throw new TypeError('Invalid DataSource type ' + type);
}
state._type = type;
var mapper = new Mapper();
var onNewListener = _.bind(function (name) {
// the 'newListener' event is emitted before the listener is added, so the count will be 0
if (name !== 'results' || listenerCount(this, 'results') !== 0) return;
courier.openDataSource(this);
this.removeListener('newListener', onNewListener);
this.on('removeListener', onRemoveListener);
}, this);
var onRemoveListener = _.bind(function () {
if (listenerCount(this, 'results') > 0) return;
courier.closeDataSource(this);
this.removeListener('removeListener', onRemoveListener);
this.on('newListener', onNewListener);
}, this);
this.on('newListener', onNewListener);
/**
* Used to flatten a chain of DataSources
* @return {object} - simple object containing all of the
* sources internal state
*/
this._state = function () {
return state;
};
// public api
this.toJSON = function () {
return _.omit(state, 'inherits');
};
this.toString = function () {
return JSON.stringify(this.toJSON());
};
this.getFieldNames = function (cb) {
mapper.getMapping(state.index, state.type, function (mapping) {
return _.keys(mapping);
});
};
this.getType = function () {
return state._type;
};
this.extend = function () {
return courier.createSource(type).inherits(this);
};
// get/set internal state values
apiMethods[type].forEach(function (name) {
this[name] = function (val) {
state[name] = val;
if (name === 'index' && arguments[1]) {
state.index = new IndexPattern(val, arguments[1]);
}
return this;
};
}, this);
}
inherits(DataSource, EventEmitter);
return DataSource;
});

View file

@ -0,0 +1,160 @@
define(function (require) {
var inherits = require('utils/inherits');
var _ = require('lodash');
var EventEmitter = require('utils/event_emitter');
var Mapper = require('courier/mapper');
var IndexPattern = require('courier/index_pattern');
function DataSource(courier, initialState) {
var state;
EventEmitter.call(this);
// state can be serialized as JSON, and passed back in to restore
if (initialState) {
if (typeof initialState === 'string') {
state = JSON.parse(initialState);
} else {
state = _.cloneDeep(initialState);
}
} else {
state = {};
}
this._state = state;
this._courier = courier;
var onNewListener = _.bind(function (name) {
// the 'newListener' event is emitted before the listener is added, so the count will be 0
if (name !== 'results' || EventEmitter.listenerCount(this, 'results') !== 0) return;
courier._openDataSource(this);
this.removeListener('newListener', onNewListener);
this.on('removeListener', onRemoveListener);
}, this);
var onRemoveListener = _.bind(function () {
if (EventEmitter.listenerCount(this, 'results') > 0) return;
courier._closeDataSource(this);
this.removeListener('removeListener', onRemoveListener);
this.on('newListener', onNewListener);
}, this);
this.on('newListener', onNewListener);
this.extend = function () {
return courier.createSource(this._getType()).inherits(this);
};
// get/set internal state values
this._methods.forEach(function (name) {
this[name] = function (val) {
state[name] = val;
if (name === 'index' && arguments[1]) {
state.index = new IndexPattern(val, arguments[1]);
}
return this;
};
}, this);
}
inherits(DataSource, EventEmitter);
/*****
* PUBLIC API
*****/
/**
* fetch the field names for this DataSource
* @param {Function} cb
* @callback {Error, Array} - calls cb with a possible error or an array of field names
* @todo
*/
DataSource.prototype.getFieldNames = function (cb) {
throw new Error('not implemented');
};
/**
* flatten this source into a simple, JSON-encodable object
* @return {object} - the source's state, minus the `inherits` link
*/
DataSource.prototype.toJSON = function () {
return _.omit(this._state, 'inherits');
};
/**
* Create a string representation of the object
* @return {string} - the JSON-encoded state
*/
DataSource.prototype.toString = function () {
return JSON.stringify(this.toJSON());
};
/*****
* PRIVATE API
*****/
/**
* Handle errors by allowing them to bubble from the DataSource
* to the courier. Maybe we should walk the inheritance chain too.
* @param {Error} err - The error that occurred
* @return {undefined}
*/
DataSource.prototype._error = function (err) {
if (EventEmitter.listenerCount(this, 'error')) {
this.emit('error', err);
} else {
this._courier.emit('error', err);
}
};
/**
* Walk the inheritance chain of a source and return its
* flat representation (taking the merge rules into account)
* @return {object} - the flat state of the DataSource
*/
DataSource.prototype._flatten = function () {
var type = this._getType();
// the merged state of this dataSource and its ancestors
var flatState = {};
var collectProp = _.partial(this._mergeProp, flatState);
// walk the chain and merge each property
var current = this;
var currentState;
while (current) {
currentState = current._state;
_.forOwn(currentState, collectProp);
current = currentState.inherits;
}
if (type === 'search') {
// defaults for the query
_.forOwn({
query: {
'match_all': {}
}
}, collectProp);
// switch to filtered query if there are filters
if (flatState.filters) {
if (flatState.filters.length) {
flatState.body.query = {
filtered: {
query: flatState.body.query,
filter: {
bool: {
must: flatState.filters
}
}
}
};
}
delete flatState.filters;
}
}
return flatState;
};
return DataSource;
});
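
A rough sketch of what the merge rules above produce, assuming a configured `courier` instance: index and type stay top-level, query and filters fold into `body`, and inherited filters trigger the filtered-query wrapper.

var parent = courier.createSource('search')
  .index('logstash-*')
  .filter({ term: { extension: 'css' } });

var child = parent.extend()
  .type('apache')
  .query({ match: { response: 404 } });

// child._flatten() yields roughly:
// {
//   index: 'logstash-*',
//   type: 'apache',
//   body: {
//     query: {
//       filtered: {
//         query: { match: { response: 404 } },
//         filter: { bool: { must: [{ term: { extension: 'css' } }] } }
//       }
//     }
//   }
// }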

View file

@ -0,0 +1,181 @@
define(function (require) {
var DataSource = require('courier/data_source/data_source');
var inherits = require('utils/inherits');
var errors = require('courier/errors');
var _ = require('lodash');
function DocSource(courier, initialState) {
DataSource.call(this, courier, initialState);
}
inherits(DocSource, DataSource);
/**
* Method used by the Courier to fetch multiple DocSource objects at one time.
* Those objects come in via the refs array, which is a list of objects containing
* at least `source` and `version` keys.
*
* @param {Courier} courier - The courier requesting the records
* @param {array} refs - The list of refs
* @param {Function} cb - Callback
* @return {undefined}
*/
DocSource.fetch = function (courier, refs, cb) {
var client = courier._getClient();
var allRefs = [];
var body = {
docs: []
};
_.each(refs, function (ref) {
var source = ref.source;
if (source._getType() !== 'doc') return;
allRefs.push(ref);
body.docs.push(source._flatten());
});
return client.mget({ body: body }, function (err, resp) {
if (err) return cb(err);
_.each(resp.docs, function (resp, i) {
var ref = allRefs[i];
var source = ref.source;
if (resp.error) return this._error(resp);
if (ref.version === resp._version) return; // no change
ref.version = resp._version;
source._storeVersion(resp._version);
source.emit('results', resp);
});
cb(err, resp);
});
};
/**
* Method used to check which refs in a list are out of date, i.e. the
* cached version no longer matches the version stored for that doc.
* @param {Courier} courier - The courier requesting the check
* @param {array} refs - The list of refs to validate
* @param {Function} cb - Callback, receives (err, invalidRefs)
* @return {undefined}
*/
DocSource.validate = function (courier, refs, cb) {
var invalid = _.filter(refs, function (ref) {
var storedVersion = ref.source._getStoredVersion();
if (ref.version !== storedVersion) return true;
});
setTimeout(function () {
cb(void 0, invalid);
});
};
/*****
* PUBLIC API
*****/
/**
* List of methods that is turned into a chainable API in the constructor
* @type {Array}
*/
DocSource.prototype._methods = [
'index',
'type',
'id',
'sourceInclude',
'sourceExclude'
];
/**
* Applies a partial update to the document
* @param {object} fields - The fields to change and their new values (es doc field)
* @param {Function} cb - Callback to know when the update is complete
* @return {undefined}
*/
DocSource.prototype.update = function (fields, cb) {
var source = this;
var courier = this._courier;
var client = courier._getClient();
var state = this._state;
client.update({
id: state.id,
type: state.type,
index: state.index,
body: {
doc: fields
}
}, function (err, resp) {
if (err) return cb(err);
courier._docUpdated(source);
return cb();
});
};
/*****
* PRIVATE API
*****/
/**
* Get the type of this DataSource
* @return {string} - 'doc'
*/
DocSource.prototype._getType = function () {
return 'doc';
};
/**
* Used to merge properties into the state within ._flatten().
* The state is passed in and modified by the function
*
* @param {object} state - the current merged state
* @param {*} val - the value at `key`
* @param {*} key - The key of `val`
* @return {undefined}
*/
DocSource.prototype._mergeProp = function (state, val, key) {
key = '_' + key;
if (val != null && state[key] == null) {
state[key] = val;
}
};
/**
* Creates a key based on the doc's index/type/id
* @return {string}
*/
DocSource.prototype._versionKey = function () {
var state = this._state;
return 'DocVersion:' + (
[
state.index,
state.type,
state.id
]
.map(encodeURIComponent)
.join('/')
);
};
/**
* Fetches the stored version from localStorage
* @return {number} - the version number, or NaN
*/
DocSource.prototype._getStoredVersion = function () {
var id = this._versionKey();
return _.parseInt(localStorage.getItem(id));
};
/**
* Stores the version into localStorage
* @param {number} version - the current version number; NaN works well for forcing a refresh
* @return {undefined}
*/
DocSource.prototype._storeVersion = function (version) {
var id = this._versionKey();
localStorage.setItem(id, version);
};
return DocSource;
});
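
A short sketch of the DocSource lifecycle described above, assuming a configured `courier` (the index/type/id values are illustrative): 'results' fires on the first mget and again whenever the stored version changes, and update() invalidates the courier's cached versions so matching sources refresh.

var doc = courier.createSource('doc')
  .index('logstash-2014.02.11')
  .type('apache')
  .id('6434')
  .on('results', function (resp) {
    // fired on the first fetch and whenever _version moves past the
    // value cached in localStorage
    console.log(resp._source);
  });

// partial update; on success the courier drops its cached version for
// matching doc sources so they are re-fetched and 'results' fires again
doc.update({ response: 200 }, function (err) {
  if (err) console.error(err);
});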

View file

@ -0,0 +1,129 @@
define(function (require) {
var DataSource = require('courier/data_source/data_source');
var inherits = require('utils/inherits');
var errors = require('courier/errors');
var _ = require('lodash');
function SearchSource(courier, initialState) {
DataSource.call(this, courier, initialState);
}
inherits(SearchSource, DataSource);
/**
* Method used by the Courier to fetch multiple SearchSource requests at a time.
* Those objects come in via the refs array, which is a list of objects containing
* a `source` key.
*
* @param {Courier} courier - The courier requesting the results
* @param {array} refs - The list of refs
* @param {Function} cb - Callback
* @return {undefined}
*/
SearchSource.fetch = function (courier, refs, cb) {
var client = courier._getClient();
var allRefs = [];
var body = '';
_.each(refs, function (ref) {
var source = ref.source;
if (source._getType() !== 'search') {
return;
}
allRefs.push(source);
var state = source._flatten();
body +=
JSON.stringify({ index: state.index, type: state.type })
+ '\n'
+ JSON.stringify(state.body)
+ '\n';
});
return client.msearch({ body: body }, function (err, resp) {
if (err) return cb(err);
_.each(resp.responses, function (resp, i) {
var source = allRefs[i];
if (resp.error) return errors.emit(source, courier, resp);
source.emit('results', resp);
});
cb(void 0, resp);
});
};
/*****
* PUBLIC API
*****/
/**
* List of the editable state properties that turn into a
* chainable API
*
* @type {Array}
*/
SearchSource.prototype._methods = [
'index',
'type',
'query',
'filter',
'sort',
'highlight',
'aggs',
'from',
'size',
'source',
'inherits'
];
/******
* PRIVATE APIS
******/
/**
* Gets the type of the DataSource
* @return {string}
*/
SearchSource.prototype._getType = function () {
return 'search';
};
/**
* Used to merge properties into the state within ._flatten().
* The state is passed in and modified by the function
*
* @param {object} state - the current merged state
* @param {*} val - the value at `key`
* @param {*} key - The key of `val`
* @return {undefined}
*/
SearchSource.prototype._mergeProp = function (state, val, key) {
switch (key) {
case 'inherits':
case '_type':
// ignore
return;
case 'filter':
state.filters = state.filters || [];
state.filters.push(val);
return;
case 'index':
case 'type':
case 'id':
if (key && state[key] == null) {
state[key] = val;
}
return;
case 'source':
key = '_source';
/* fall through */
default:
state.body = state.body || {};
if (key && state.body[key] == null) {
state.body[key] = val;
}
}
};
return SearchSource;
});
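
To make the fetch format above concrete, a sketch of the ndjson pair a single SearchSource contributes to client.msearch, assuming a configured `courier` (values are illustrative):

var source = courier.createSource('search')
  .index('logstash-*')
  .type('nginx')
  .query({ match_all: {} });

// SearchSource.fetch serializes each source as a header line plus a body line:
// {"index":"logstash-*","type":"nginx"}
// {"query":{"match_all":{}}}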

View file

@ -1,135 +0,0 @@
define(function (require) {
var _ = require('lodash');
function Docs(courier) {
// docs that we have let loose, and want to track
var tracking = {};
var watchers = {};
function respId(getResp) {
return [
encodeURIComponent(getResp._index),
encodeURIComponent(getResp._type),
encodeURIComponent(getResp._id)
].join('/');
}
function change(id, updated) {
if (watchers[id]) {
var notify = function () {
var oldVal = tracking[id]._source;
tracking[id] = _.cloneDeep(update);
watchers[id].forEach(function (watcher) {
try {
watcher(updated, oldVal);
} catch (e) { console.error(e); }
});
};
if (updated) {
notify();
} else {
courier.get('client').get({
});
}
}
}
function track(resp) {
var id = respId(resp);
var tracker = _.pick(resp, '_id', '_type', '_index', '_source');
if (tracking[id] && equal(tracking[id]._source, resp)) return false;
change(id, resp);
}
/**
* add a function to be called when objects matching
* this resp are changed
* @param {object} resp - Response-like object; should contain _id, _type, and _index keys
* @param {Function} onChange - Function to be called when changes are noticed
*/
function watch(resp, onChange) {
var id = respId(resp);
if (!watchers[id]) watchers[id] = [];
watchers[id].push(onChange);
}
function get(args, cb, onChange) {
var client = courier.get('client');
client.get(args, function (err, getResp) {
if (err) return cb(err);
watch(getResp, onChange);
return cb(void 0, getResp);
});
}
function index(args, cb) {
var client = courier.get('client');
client.index(args, function (err, indexResp) {
if (err) return cb(err);
delete indexResp.created;
indexResp._source = args.body;
track(indexResp);
return cb(void 0, indexResp);
});
}
function update(args, cb) {
var client = courier.get('client');
client.update(args, function (err, updateResp) {
if (err) return cb(err);
return cb(void 0, updateResp);
});
}
this.watch = watch;
this.get = get;
this.index = index;
this.set = index;
this.update = update;
}
function equal(o1, o2) {
/* jshint eqeqeq:false, forin:false */
if (o1 === o2) return true;
if (o1 === null || o2 === null) return false;
if (o1 !== o1 && o2 !== o2) return true; // NaN === NaN
var t1 = typeof o1, t2 = typeof o2, length, key, keySet;
if (t1 == t2) {
if (t1 == 'object') {
if (_.isArray(o1)) {
if (!_.isArray(o2)) return false;
if ((length = o1.length) == o2.length) {
for (key = 0; key < length; key++) {
if (!equal(o1[key], o2[key])) return false;
}
return true;
}
} else if (_.isDate(o1)) {
return _.isDate(o2) && o1.getTime() == o2.getTime();
} else if (_.isRegExp(o1) && _.isRegExp(o2)) {
return o1.toString() == o2.toString();
} else {
if (_.isArray(o2)) return false;
keySet = {};
for (key in o1) {
if (_.isFunction(o1[key])) continue;
if (!equal(o1[key], o2[key])) return false;
keySet[key] = true;
}
for (key in o2) {
if (!keySet.hasOwnProperty(key) &&
o2[key] !== undefined &&
!_.isFunction(o2[key])) return false;
}
return true;
}
}
}
return false;
}
return Docs;
});

View file

@ -1,12 +1,12 @@
define(function (require) {
define(function (require, module, exports) {
var listenerCount = require('utils/event_emitter').listenerCount;
// caused by a refresh attempting to start before the previous one is done
function HastyRefresh() {
this.name = 'HastyRefresh';
this.message = 'Courier attempted to start a query before the previous had finished.';
}
HastyRefresh.prototype = new Error();
HastyRefresh.prototype.constructor = HastyRefresh;
return {
HastyRefresh: HastyRefresh
};
exports.HastyRefresh = HastyRefresh;
});
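
A brief sketch of how a HastyRefresh surfaces to consumers through the courier's 'error' event, assuming a configured `courier`:

var errors = require('courier/errors');

courier.on('error', function (err) {
  if (err instanceof errors.HastyRefresh) {
    // a fetch was triggered before the previous msearch finished
  }
});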

View file

@ -1,16 +0,0 @@
var elasticsearch = require('elasticsearch');
var es = elasticsearch.Client();
es.msearch({
body: [
{
index: 'logstash-2014.02.1111'
},
{
query: { 'match_all': {} }
}
]
}, function (err, resp) {
console.log(resp);
es.close();
});

View file

@ -1,3 +1,3 @@
<courier-test type="apache" fields="extension,response,request"></courier-test>
<courier-test type="nginx" fields=""></courier-test>
<courier-doc-test index="" ></courier-doc-test>
<courier-doc-test index="logstash-2014.02.11" type="apache" id="6434"></courier-doc-test>
<courier-doc-test index="logstash-2014.02.11" type="apache" id="6434"></courier-doc-test>
<courier-test type="apache" fields="extension,response,request"></courier-test>

View file

@ -1,5 +1,6 @@
define(function (require) {
var angular = require('angular');
var _ = require('lodash');
angular.module('kibana/controllers')
.controller('Kibana', function (courier, $scope, $rootScope) {
@ -8,7 +9,7 @@ define(function (require) {
.size(5);
// this should be triggered from within the controlling application
setTimeout(courier.start, 15);
setTimeout(_.bindKey(courier, 'start'), 15);
});
angular.module('kibana/directives')
@ -18,19 +19,53 @@ define(function (require) {
scope: {
type: '@'
},
template: '<strong style="float:left">{{count}} :&nbsp;</strong><pre>{{json}}</pre>',
controller: function ($rootScope, $scope) {
$scope.count = 0;
var source = $rootScope.dataSource.extend()
.type($scope.type)
.source({
include: 'country'
})
.on('results', function (resp) {
$scope.count ++;
$scope.json = JSON.stringify(resp.hits, null, ' ');
});
$scope.$watch('type', source.type);
}
};
})
.directive('courierDocTest', function () {
return {
restrict: 'E',
scope: {
id: '@',
type: '@',
index: '@'
},
template: '<pre>{{json}}</pre>'
template: '<strong style="float:left">{{count}} : <button ng-click="click()">reindex</button> :&nbsp;</strong><pre>{{json}}</pre>',
controller: function (courier, $scope) {
$scope.count = 0;
var currentSource;
$scope.click = function () {
if (currentSource) {
source.update(currentSource);
}
};
var source = courier.createSource('doc')
.id($scope.id)
.type($scope.type)
.index($scope.index)
.on('results', function (doc) {
currentSource = doc._source;
$scope.count ++;
$scope.json = JSON.stringify(doc, null, ' ');
});
}
};
});
});

View file

@ -1,16 +1,22 @@
define(function (require) {
var angular = require('angular');
var Courier = require('courier/courier');
var DocSource = require('courier/data_source/doc');
require('services/promises');
angular.module('kibana/services')
.service('courier', function (es) {
.service('courier', function (es, promises) {
promises.playNice(DocSource.prototype, [
'update',
'index'
]);
var courier = new Courier({
fetchInterval: 15000,
client: es
});
courier.on('error', function (err) {
console.error(err);
client: es,
promises: promises
});
return courier;

View file

@ -0,0 +1,39 @@
define(function (require, module, exports) {
var _ = require('lodash');
var angular = require('angular');
angular.module('kibana/services')
.service('promises', function ($q) {
function playNice(fn, fns) {
if (fns && _.isArray(fns) && _.isObject(fn)) {
fns.forEach(function (method) {
fn[method] = playNice(fn[method]);
});
return fn;
}
return function playNiceWrapper() {
// if the last arg is a callback then don't do anything
if (typeof arguments[arguments.length - 1] === 'function') {
return fn.apply(this, arguments);
}
// otherwise create a callback and pass it in
var args = Array.prototype.slice.call(arguments);
var defer = $q.defer();
args.push(function (err, result) {
if (err) return defer.reject(err);
defer.resolve(result);
});
fn.apply(this, args);
return defer.promise;
};
}
return {
playNice: playNice
};
});
});
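
A small sketch of what playNice enables once the courier service (earlier in this diff) wraps DocSource.prototype: the wrapped methods keep their node-style callback form but return a $q promise when no callback is passed. `doc` is assumed to be an existing DocSource.

// with a trailing callback the original behaviour is untouched
doc.update({ response: 200 }, function (err) { /* ... */ });

// without one, the wrapper appends its own callback and returns a promise
doc.update({ response: 200 }).then(
  function (resp) { /* resolved with the update response */ },
  function (err) { /* rejected with the error */ }
);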

View file

@ -2,6 +2,9 @@ define(function (require) {
var Courier = require('courier/courier');
var _ = require('lodash');
var sinon = require('sinon/sinon');
var DataSource = require('courier/data_source/data_source');
var DocSource = require('courier/data_source/doc');
var SearchSource = require('courier/data_source/search');
describe('Courier Module', function () {
@ -39,13 +42,13 @@ define(function (require) {
it('creates an empty search DataSource object', function () {
courier = new Courier();
var source = courier.createSource();
expect(source._state()).to.eql({ _type: 'search' });
expect(source._state).to.eql({});
});
it('optionally accepts a type for the DataSource', function () {
var courier = new Courier();
expect(courier.createSource()._state()._type).to.eql('search');
expect(courier.createSource('search')._state()._type).to.eql('search');
expect(courier.createSource('get')._state()._type).to.eql('get');
expect(courier.createSource()).to.be.a(SearchSource);
expect(courier.createSource('search')).to.be.a(SearchSource);
expect(courier.createSource('doc')).to.be.a(DocSource);
expect(function () {
courier.createSource('invalid type');
}).to.throwError(TypeError);
@ -53,12 +56,12 @@ define(function (require) {
it('optionally accepts a json object/string that will populate the DataSource object with settings', function () {
courier = new Courier();
var savedState = JSON.stringify({
_type: 'get',
_type: 'doc',
index: 'logstash-[YYYY-MM-DD]',
type: 'nginx',
id: '1'
});
var source = courier.createSource('get', savedState);
var source = courier.createSource('doc', savedState);
expect(source + '').to.eql(savedState);
});
});
@ -83,7 +86,7 @@ define(function (require) {
source.on('results', _.noop);
source.index('the index name');
expect(Courier._flattenDataSource(source).index).to.eql('the index name');
expect(source._flatten().index).to.eql('the index name');
});
});
@ -111,7 +114,7 @@ define(function (require) {
})
.on('results', _.noop);
var query = Courier._flattenDataSource(math);
var query = math._flatten();
expect(query.index).to.eql('people');
expect(query.type).to.eql('students');
expect(query.body).to.eql({

View file

@ -1,30 +1,20 @@
define(function (require) {
var Courier = require('courier/courier');
var DataSource = require('courier/data_source');
var DataSource = require('courier/data_source/data_source');
var DocSource = require('courier/data_source/doc');
var SearchSource = require('courier/data_source/search');
describe('DataSource class', function () {
var courier = new Courier();
describe('::new', function () {
it('accepts and validates a type', function () {
var source = new DataSource(courier, 'get');
expect(source._state()._type).to.eql('get');
source = new DataSource(courier, 'search');
expect(source._state()._type).to.eql('search');
expect(function () {
source = new DataSource(courier, 'invalid Type');
}).to.throwError(TypeError);
});
it('optionally accepts a json object/string that will populate the DataSource object with settings', function () {
var savedState = JSON.stringify({
_type: 'get',
_type: 'doc',
index: 'logstash-[YYYY-MM-DD]',
type: 'nginx',
id: '1'
});
var source = new DataSource(courier, 'get', savedState);
var source = new DocSource(courier, savedState);
expect(source + '').to.eql(savedState);
});
});