[data.search.session] Server telemetry on search sessions (#91256)

* [data.search.session] Server telemetry on search sessions

* Update telemetry mappings

* Added tests and logger

Co-authored-by: Liza K <liza.katz@elastic.co>
This commit is contained in:
Lukas Olson 2021-02-15 07:48:16 -07:00 committed by GitHub
parent 2db193b4ed
commit 42e11e6763
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 217 additions and 0 deletions

View file

@ -0,0 +1,94 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
SharedGlobalConfig,
ElasticsearchClient,
SavedObjectsErrorHelpers,
Logger,
} from '../../../../../src/core/server';
import { BehaviorSubject } from 'rxjs';
import { fetchProvider } from './fetch';
import { elasticsearchServiceMock } from '../../../../../src/core/server/mocks';
describe('fetchProvider', () => {
let fetchFn: any;
let esClient: jest.Mocked<ElasticsearchClient>;
let mockLogger: Logger;
beforeEach(async () => {
const config$ = new BehaviorSubject<SharedGlobalConfig>({
kibana: {
index: '123',
},
} as any);
mockLogger = {
warn: jest.fn(),
debug: jest.fn(),
} as any;
esClient = elasticsearchServiceMock.createElasticsearchClient();
fetchFn = fetchProvider(config$, mockLogger);
});
test('returns when ES returns no results', async () => {
esClient.search.mockResolvedValue({
statusCode: 200,
body: {
aggregations: {
persisted: {
buckets: [],
},
},
},
} as any);
const collRes = await fetchFn({ esClient });
expect(collRes.transientCount).toBe(0);
expect(collRes.persistedCount).toBe(0);
expect(collRes.totalCount).toBe(0);
expect(mockLogger.warn).not.toBeCalled();
});
test('returns when ES throws an error', async () => {
esClient.search.mockRejectedValue(
SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')
);
const collRes = await fetchFn({ esClient });
expect(collRes.transientCount).toBe(0);
expect(collRes.persistedCount).toBe(0);
expect(collRes.totalCount).toBe(0);
expect(mockLogger.warn).toBeCalledTimes(1);
});
test('returns when ES returns full buckets', async () => {
esClient.search.mockResolvedValue({
statusCode: 200,
body: {
aggregations: {
persisted: {
buckets: [
{
key_as_string: 'true',
doc_count: 10,
},
{
key_as_string: 'false',
doc_count: 7,
},
],
},
},
},
} as any);
const collRes = await fetchFn({ esClient });
expect(collRes.transientCount).toBe(7);
expect(collRes.persistedCount).toBe(10);
expect(collRes.totalCount).toBe(17);
});
});

View file

@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Observable } from 'rxjs';
import { first } from 'rxjs/operators';
import { SearchResponse } from 'elasticsearch';
import { SharedGlobalConfig, Logger } from 'kibana/server';
import { CollectorFetchContext } from '../../../../../src/plugins/usage_collection/server';
import { SEARCH_SESSION_TYPE } from '../../common';
import { ReportedUsage } from './register';
interface SessionPersistedTermsBucket {
key_as_string: 'false' | 'true';
doc_count: number;
}
export function fetchProvider(config$: Observable<SharedGlobalConfig>, logger: Logger) {
return async ({ esClient }: CollectorFetchContext): Promise<ReportedUsage> => {
try {
const config = await config$.pipe(first()).toPromise();
const { body: esResponse } = await esClient.search<SearchResponse<unknown>>({
index: config.kibana.index,
body: {
size: 0,
aggs: {
persisted: {
terms: {
field: `${SEARCH_SESSION_TYPE}.persisted`,
},
},
},
},
});
const { buckets } = esResponse.aggregations.persisted;
if (!buckets.length) {
return { transientCount: 0, persistedCount: 0, totalCount: 0 };
}
const { transientCount = 0, persistedCount = 0 } = buckets.reduce(
(usage: Partial<ReportedUsage>, bucket: SessionPersistedTermsBucket) => {
const key = bucket.key_as_string === 'false' ? 'transientCount' : 'persistedCount';
return { ...usage, [key]: bucket.doc_count };
},
{}
);
const totalCount = transientCount + persistedCount;
logger.debug(`fetchProvider | ${persistedCount} persisted | ${transientCount} transient`);
return { transientCount, persistedCount, totalCount };
} catch (e) {
logger.warn(`fetchProvider | error | ${e.message}`);
return { transientCount: 0, persistedCount: 0, totalCount: 0 };
}
};
}

View file

@ -0,0 +1,8 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export { registerUsageCollector } from './register';

View file

@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { PluginInitializerContext, Logger } from 'kibana/server';
import { UsageCollectionSetup } from 'src/plugins/usage_collection/server';
import { fetchProvider } from './fetch';
export interface ReportedUsage {
transientCount: number;
persistedCount: number;
totalCount: number;
}
export async function registerUsageCollector(
usageCollection: UsageCollectionSetup,
context: PluginInitializerContext,
logger: Logger
) {
try {
const collector = usageCollection.makeUsageCollector<ReportedUsage>({
type: 'search-session',
isReady: () => true,
fetch: fetchProvider(context.config.legacy.globalConfig$, logger),
schema: {
transientCount: { type: 'long' },
persistedCount: { type: 'long' },
totalCount: { type: 'long' },
},
});
usageCollection.registerCollector(collector);
} catch (err) {
return; // kibana plugin is not enabled (test environment)
}
}

View file

@ -24,6 +24,7 @@ import {
import { getUiSettings } from './ui_settings';
import type { DataEnhancedRequestHandlerContext } from './type';
import { ConfigSchema } from '../config';
import { registerUsageCollector } from './collectors';
import { SecurityPluginSetup } from '../../security/server';
interface SetupDependencies {
@ -85,6 +86,10 @@ export class EnhancedDataServerPlugin
this.sessionService.setup(core, {
taskManager: deps.taskManager,
});
if (deps.usageCollection) {
registerUsageCollector(deps.usageCollection, this.initializerContext, this.logger);
}
}
public start(core: CoreStart, { taskManager }: StartDependencies) {

View file

@ -3183,6 +3183,19 @@
}
}
},
"search-session": {
"properties": {
"transientCount": {
"type": "long"
},
"persistedCount": {
"type": "long"
},
"totalCount": {
"type": "long"
}
}
},
"security_solution": {
"properties": {
"detections": {