From 6e7fb4a5155d2975abf005dd61746f1861d6c7cb Mon Sep 17 00:00:00 2001 From: Pete Hampton Date: Thu, 10 Dec 2020 16:13:05 +0000 Subject: [PATCH] [7.11][Telemetry] Diagnostic Alert Telemetry (#84422) * Port @tsg's work on task manager. Remove 2nd var to track telemetry opt in. Add ES client to start querying index. Use query to get docs from a dummy index. Change how index is queried. Get diagnostic alerts to send to staging cluster. Record last timestamp. PoC on telemetry opt in via 2 processes. Revert to original solution * Update on agreed method. Fixes race condition. * Expand wildcards. * stage. * Add rule.ruleset collection. * Update telemetry sender with correct query for loading diag alerts. * Add similar task tests to endpont artifact work. * Fix broken import statement. * Create sender mocks. * Update test to check for func call. * Update unused reference. * record last run. * Update index. * fix import * Fix test. * test fix. * Pass unit to time diff calc. * Tests should pass now hopefully. * Add additional process fields to allowlist. Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> --- .../server/lib/telemetry/mocks.ts | 38 +++++ .../server/lib/telemetry/sender.ts | 79 +++++++++- .../server/lib/telemetry/task.test.ts | 149 ++++++++++++++++++ .../server/lib/telemetry/task.ts | 121 ++++++++++++++ .../security_solution/server/plugin.ts | 4 +- 5 files changed, 383 insertions(+), 8 deletions(-) create mode 100644 x-pack/plugins/security_solution/server/lib/telemetry/mocks.ts create mode 100644 x-pack/plugins/security_solution/server/lib/telemetry/task.test.ts create mode 100644 x-pack/plugins/security_solution/server/lib/telemetry/task.ts diff --git a/x-pack/plugins/security_solution/server/lib/telemetry/mocks.ts b/x-pack/plugins/security_solution/server/lib/telemetry/mocks.ts new file mode 100644 index 000000000000..b8908b1ad30c --- /dev/null +++ b/x-pack/plugins/security_solution/server/lib/telemetry/mocks.ts @@ -0,0 +1,38 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { TelemetryEventsSender } from './sender'; +import { TelemetryDiagTask } from './task'; + +/** + * Creates a mocked Telemetry Events Sender + */ +export const createMockTelemetryEventsSender = ( + enableTelemtry: boolean +): jest.Mocked => { + return ({ + setup: jest.fn(), + start: jest.fn(), + stop: jest.fn(), + fetchDiagnosticAlerts: jest.fn(), + queueTelemetryEvents: jest.fn(), + processEvents: jest.fn(), + isTelemetryOptedIn: jest.fn().mockReturnValue(enableTelemtry ?? jest.fn()), + sendIfDue: jest.fn(), + fetchClusterInfo: jest.fn(), + fetchTelemetryUrl: jest.fn(), + fetchLicenseInfo: jest.fn(), + copyLicenseFields: jest.fn(), + sendEvents: jest.fn(), + } as unknown) as jest.Mocked; +}; + +/** + * Creates a mocked Telemetry Diagnostic Task + */ +export class MockTelemetryDiagnosticTask extends TelemetryDiagTask { + public runTask = jest.fn(); +} diff --git a/x-pack/plugins/security_solution/server/lib/telemetry/sender.ts b/x-pack/plugins/security_solution/server/lib/telemetry/sender.ts index 88ce963757f6..65c69966f7ca 100644 --- a/x-pack/plugins/security_solution/server/lib/telemetry/sender.ts +++ b/x-pack/plugins/security_solution/server/lib/telemetry/sender.ts @@ -14,6 +14,11 @@ import { TelemetryPluginStart, TelemetryPluginSetup, } from '../../../../../../src/plugins/telemetry/server'; +import { + TaskManagerSetupContract, + TaskManagerStartContract, +} from '../../../../task_manager/server'; +import { TelemetryDiagTask } from './task'; export type SearchTypes = | string @@ -56,20 +61,34 @@ export class TelemetryEventsSender { private isSending = false; private queue: TelemetryEvent[] = []; private isOptedIn?: boolean = true; // Assume true until the first check + private diagTask?: TelemetryDiagTask; constructor(logger: Logger) { this.logger = logger.get('telemetry_events'); } - public setup(telemetrySetup?: TelemetryPluginSetup) { + public setup(telemetrySetup?: TelemetryPluginSetup, taskManager?: TaskManagerSetupContract) { this.telemetrySetup = telemetrySetup; + + if (taskManager) { + this.diagTask = new TelemetryDiagTask(this.logger, taskManager, this); + } } - public start(core?: CoreStart, telemetryStart?: TelemetryPluginStart) { + public start( + core?: CoreStart, + telemetryStart?: TelemetryPluginStart, + taskManager?: TaskManagerStartContract + ) { this.telemetryStart = telemetryStart; this.core = core; - this.logger.debug(`Starting task`); + if (taskManager && this.diagTask) { + this.logger.debug(`Starting diag task`); + this.diagTask.start(taskManager); + } + + this.logger.debug(`Starting local task`); setTimeout(() => { this.sendIfDue(); this.intervalId = setInterval(() => this.sendIfDue(), this.checkIntervalMs); @@ -82,6 +101,38 @@ export class TelemetryEventsSender { } } + public async fetchDiagnosticAlerts(executeFrom: string, executeTo: string) { + const query = { + expand_wildcards: 'open,hidden', + index: 'logs-endpoint.diagnostic.collection-*', + ignore_unavailable: true, + size: this.maxQueueSize, + body: { + query: { + range: { + 'event.ingested': { + gte: executeFrom, + lt: executeTo, + }, + }, + }, + sort: [ + { + 'event.ingested': { + order: 'desc', + }, + }, + ], + }, + }; + + if (!this.core) { + throw Error('could not fetch diagnostic alerts. core is not available'); + } + const callCluster = this.core.elasticsearch.legacy.client.callAsInternalUser; + return callCluster('search', query); + } + public queueTelemetryEvents(events: TelemetryEvent[]) { const qlength = this.queue.length; @@ -109,6 +160,11 @@ export class TelemetryEventsSender { }); } + public async isTelemetryOptedIn() { + this.isOptedIn = await this.telemetryStart?.getIsOptedIn(); + return this.isOptedIn === true; + } + private async sendIfDue() { if (this.isSending) { return; @@ -121,9 +177,7 @@ export class TelemetryEventsSender { try { this.isSending = true; - // Checking opt-in status is relatively expensive (calls a saved-object), so - // we only check it when we have things to send. - this.isOptedIn = await this.telemetryStart?.getIsOptedIn(); + this.isOptedIn = await this.isTelemetryOptedIn(); if (!this.isOptedIn) { this.logger.debug(`Telemetry is not opted-in.`); this.queue = []; @@ -245,9 +299,14 @@ const allowlistEventFields: AllowlistFields = { '@timestamp': true, agent: true, Endpoint: true, + Ransomware: true, + data_stream: true, ecs: true, elastic: true, event: true, + rule: { + ruleset: true, + }, file: { name: true, path: true, @@ -270,6 +329,8 @@ const allowlistEventFields: AllowlistFields = { executable: true, command_line: true, hash: true, + pid: true, + uptime: true, Ext: { code_signature: true, }, @@ -281,6 +342,12 @@ const allowlistEventFields: AllowlistFields = { Ext: { code_signature: true, }, + uptime: true, + pid: true, + ppid: true, + }, + token: { + integrity_level_name: true, }, }, }; diff --git a/x-pack/plugins/security_solution/server/lib/telemetry/task.test.ts b/x-pack/plugins/security_solution/server/lib/telemetry/task.test.ts new file mode 100644 index 000000000000..d5856cef8c02 --- /dev/null +++ b/x-pack/plugins/security_solution/server/lib/telemetry/task.test.ts @@ -0,0 +1,149 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +import moment from 'moment'; +import { loggingSystemMock } from 'src/core/server/mocks'; + +import { taskManagerMock } from '../../../../task_manager/server/mocks'; +import { TaskStatus } from '../../../../task_manager/server'; + +import { TelemetryDiagTask, TelemetryDiagTaskConstants } from './task'; +import { createMockTelemetryEventsSender, MockTelemetryDiagnosticTask } from './mocks'; + +describe('test', () => { + let logger: ReturnType; + + beforeEach(() => { + logger = loggingSystemMock.createLogger(); + }); + + describe('basic diagnostic alert telemetry sanity checks', () => { + test('task can register', () => { + const telemetryDiagTask = new TelemetryDiagTask( + logger, + taskManagerMock.createSetup(), + createMockTelemetryEventsSender(true) + ); + + expect(telemetryDiagTask).toBeInstanceOf(TelemetryDiagTask); + }); + }); + + test('diagnostic task should be registered', () => { + const mockTaskManager = taskManagerMock.createSetup(); + new TelemetryDiagTask(logger, mockTaskManager, createMockTelemetryEventsSender(true)); + + expect(mockTaskManager.registerTaskDefinitions).toHaveBeenCalled(); + }); + + test('task should be scheduled', async () => { + const mockTaskManagerSetup = taskManagerMock.createSetup(); + const telemetryDiagTask = new TelemetryDiagTask( + logger, + mockTaskManagerSetup, + createMockTelemetryEventsSender(true) + ); + + const mockTaskManagerStart = taskManagerMock.createStart(); + await telemetryDiagTask.start(mockTaskManagerStart); + expect(mockTaskManagerStart.ensureScheduled).toHaveBeenCalled(); + }); + + test('task should run', async () => { + const mockContext = createMockTelemetryEventsSender(true); + const mockTaskManager = taskManagerMock.createSetup(); + const telemetryDiagTask = new MockTelemetryDiagnosticTask(logger, mockTaskManager, mockContext); + + const mockTaskInstance = { + id: TelemetryDiagTaskConstants.TYPE, + runAt: new Date(), + attempts: 0, + ownerId: '', + status: TaskStatus.Running, + startedAt: new Date(), + scheduledAt: new Date(), + retryAt: new Date(), + params: {}, + state: {}, + taskType: TelemetryDiagTaskConstants.TYPE, + }; + const createTaskRunner = + mockTaskManager.registerTaskDefinitions.mock.calls[0][0][TelemetryDiagTaskConstants.TYPE] + .createTaskRunner; + const taskRunner = createTaskRunner({ taskInstance: mockTaskInstance }); + await taskRunner.run(); + expect(telemetryDiagTask.runTask).toHaveBeenCalled(); + }); + + test('task should not query elastic if telemetry is not opted in', async () => { + const mockSender = createMockTelemetryEventsSender(false); + const mockTaskManager = taskManagerMock.createSetup(); + new MockTelemetryDiagnosticTask(logger, mockTaskManager, mockSender); + + const mockTaskInstance = { + id: TelemetryDiagTaskConstants.TYPE, + runAt: new Date(), + attempts: 0, + ownerId: '', + status: TaskStatus.Running, + startedAt: new Date(), + scheduledAt: new Date(), + retryAt: new Date(), + params: {}, + state: {}, + taskType: TelemetryDiagTaskConstants.TYPE, + }; + const createTaskRunner = + mockTaskManager.registerTaskDefinitions.mock.calls[0][0][TelemetryDiagTaskConstants.TYPE] + .createTaskRunner; + const taskRunner = createTaskRunner({ taskInstance: mockTaskInstance }); + await taskRunner.run(); + expect(mockSender.fetchDiagnosticAlerts).not.toHaveBeenCalled(); + }); + + test('test -5 mins is returned when there is no previous task run', async () => { + const telemetryDiagTask = new TelemetryDiagTask( + logger, + taskManagerMock.createSetup(), + createMockTelemetryEventsSender(true) + ); + + const executeTo = moment().utc().toISOString(); + const executeFrom = undefined; + const newExecuteFrom = telemetryDiagTask.getLastExecutionTimestamp(executeTo, executeFrom); + + expect(newExecuteFrom).toEqual(moment(executeTo).subtract(5, 'minutes').toISOString()); + }); + + test('test -6 mins is returned when there was a previous task run', async () => { + const telemetryDiagTask = new TelemetryDiagTask( + logger, + taskManagerMock.createSetup(), + createMockTelemetryEventsSender(true) + ); + + const executeTo = moment().utc().toISOString(); + const executeFrom = moment(executeTo).subtract(6, 'minutes').toISOString(); + const newExecuteFrom = telemetryDiagTask.getLastExecutionTimestamp(executeTo, executeFrom); + + expect(newExecuteFrom).toEqual(executeFrom); + }); + + // it's possible if Kibana is down for a prolonged period the stored lastRun would have drifted + // if that is the case we will just roll it back to a 10 min search window + test('test 10 mins is returned when previous task run took longer than 10 minutes', async () => { + const telemetryDiagTask = new TelemetryDiagTask( + logger, + taskManagerMock.createSetup(), + createMockTelemetryEventsSender(true) + ); + + const executeTo = moment().utc().toISOString(); + const executeFrom = moment(executeTo).subtract(142, 'minutes').toISOString(); + const newExecuteFrom = telemetryDiagTask.getLastExecutionTimestamp(executeTo, executeFrom); + + expect(newExecuteFrom).toEqual(moment(executeTo).subtract(10, 'minutes').toISOString()); + }); +}); diff --git a/x-pack/plugins/security_solution/server/lib/telemetry/task.ts b/x-pack/plugins/security_solution/server/lib/telemetry/task.ts new file mode 100644 index 000000000000..28b8524f6451 --- /dev/null +++ b/x-pack/plugins/security_solution/server/lib/telemetry/task.ts @@ -0,0 +1,121 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +import moment from 'moment'; +import { Logger } from 'src/core/server'; +import { + ConcreteTaskInstance, + TaskManagerSetupContract, + TaskManagerStartContract, +} from '../../../../task_manager/server'; +import { TelemetryEventsSender, TelemetryEvent } from './sender'; + +export const TelemetryDiagTaskConstants = { + TIMEOUT: '1m', + TYPE: 'security:endpoint-diagnostics', + INTERVAL: '5m', + VERSION: '1.0.0', +}; + +export class TelemetryDiagTask { + private readonly logger: Logger; + private readonly sender: TelemetryEventsSender; + + constructor( + logger: Logger, + taskManager: TaskManagerSetupContract, + sender: TelemetryEventsSender + ) { + this.logger = logger; + this.sender = sender; + + taskManager.registerTaskDefinitions({ + [TelemetryDiagTaskConstants.TYPE]: { + title: 'Security Solution Telemetry Diagnostics task', + timeout: TelemetryDiagTaskConstants.TIMEOUT, + createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => { + return { + run: async () => { + const executeTo = moment().utc().toISOString(); + const executeFrom = this.getLastExecutionTimestamp( + executeTo, + taskInstance.state?.lastExecutionTimestamp + ); + await this.runTask(taskInstance.id, executeFrom, executeTo); + + return { + state: { + lastExecutionTimestamp: executeTo, + }, + }; + }, + cancel: async () => {}, + }; + }, + }, + }); + } + + public getLastExecutionTimestamp(executeTo: string, lastExecutionTimestamp?: string) { + if (lastExecutionTimestamp === undefined) { + this.logger.debug(`No last execution timestamp defined`); + return moment(executeTo).subtract(5, 'minutes').toISOString(); + } + + if (moment(executeTo).diff(lastExecutionTimestamp, 'minutes') >= 10) { + this.logger.debug(`last execution timestamp was greater than 10 minutes`); + return moment(executeTo).subtract(10, 'minutes').toISOString(); + } + + return lastExecutionTimestamp; + } + + public start = async (taskManager: TaskManagerStartContract) => { + try { + await taskManager.ensureScheduled({ + id: this.getTaskId(), + taskType: TelemetryDiagTaskConstants.TYPE, + scope: ['securitySolution'], + schedule: { + interval: TelemetryDiagTaskConstants.INTERVAL, + }, + state: {}, + params: { version: TelemetryDiagTaskConstants.VERSION }, + }); + } catch (e) { + this.logger.error(`Error scheduling task, received ${e.message}`); + } + }; + + private getTaskId = (): string => { + return `${TelemetryDiagTaskConstants.TYPE}:${TelemetryDiagTaskConstants.VERSION}`; + }; + + public runTask = async (taskId: string, searchFrom: string, searchTo: string) => { + this.logger.debug(`Running task ${taskId}`); + if (taskId !== this.getTaskId()) { + this.logger.debug(`Outdated task running: ${taskId}`); + return; + } + + const isOptedIn = await this.sender.isTelemetryOptedIn(); + if (!isOptedIn) { + this.logger.debug(`Telemetry is not opted-in.`); + return; + } + + const response = await this.sender.fetchDiagnosticAlerts(searchFrom, searchTo); + + const hits = response.hits?.hits || []; + if (!Array.isArray(hits) || !hits.length) { + this.logger.debug('no diagnostic alerts retrieved'); + return; + } + + const diagAlerts: TelemetryEvent[] = hits.map((h) => h._source); + this.logger.debug(`Received ${diagAlerts.length} diagnostic alerts`); + this.sender.queueTelemetryEvents(diagAlerts); + }; +} diff --git a/x-pack/plugins/security_solution/server/plugin.ts b/x-pack/plugins/security_solution/server/plugin.ts index b8676893d8ba..c4ee231ee1d6 100644 --- a/x-pack/plugins/security_solution/server/plugin.ts +++ b/x-pack/plugins/security_solution/server/plugin.ts @@ -316,7 +316,7 @@ export class Plugin implements IPlugin