[7.11][Telemetry] Diagnostic Alert Telemetry (#84422)

* Port @tsg's work on task manager.

Remove 2nd var to track telemetry opt in.

Add ES client to start querying index.

Use query to get docs from a dummy index.

Change how index is queried.

Get diagnostic alerts to send to staging cluster.

Record last timestamp.

PoC on telemetry opt in via 2 processes.

Revert to original solution

* Update on agreed method. Fixes race condition.

* Expand wildcards.

* stage.

* Add rule.ruleset collection.

* Update telemetry sender with correct query for loading diag alerts.

* Add similar task tests to endpont artifact work.

* Fix broken import statement.

* Create sender mocks.

* Update test to check for func call.

* Update unused reference.

* record last run.

* Update index.

* fix import

* Fix test.

* test fix.

* Pass unit to time diff calc.

* Tests should pass now hopefully.

* Add additional process fields to allowlist.

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Pete Hampton 2020-12-10 16:13:05 +00:00 committed by GitHub
parent 692247fc12
commit 6e7fb4a515
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 383 additions and 8 deletions

View file

@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { TelemetryEventsSender } from './sender';
import { TelemetryDiagTask } from './task';
/**
* Creates a mocked Telemetry Events Sender
*/
export const createMockTelemetryEventsSender = (
enableTelemtry: boolean
): jest.Mocked<TelemetryEventsSender> => {
return ({
setup: jest.fn(),
start: jest.fn(),
stop: jest.fn(),
fetchDiagnosticAlerts: jest.fn(),
queueTelemetryEvents: jest.fn(),
processEvents: jest.fn(),
isTelemetryOptedIn: jest.fn().mockReturnValue(enableTelemtry ?? jest.fn()),
sendIfDue: jest.fn(),
fetchClusterInfo: jest.fn(),
fetchTelemetryUrl: jest.fn(),
fetchLicenseInfo: jest.fn(),
copyLicenseFields: jest.fn(),
sendEvents: jest.fn(),
} as unknown) as jest.Mocked<TelemetryEventsSender>;
};
/**
* Creates a mocked Telemetry Diagnostic Task
*/
export class MockTelemetryDiagnosticTask extends TelemetryDiagTask {
public runTask = jest.fn();
}

View file

@ -14,6 +14,11 @@ import {
TelemetryPluginStart,
TelemetryPluginSetup,
} from '../../../../../../src/plugins/telemetry/server';
import {
TaskManagerSetupContract,
TaskManagerStartContract,
} from '../../../../task_manager/server';
import { TelemetryDiagTask } from './task';
export type SearchTypes =
| string
@ -56,20 +61,34 @@ export class TelemetryEventsSender {
private isSending = false;
private queue: TelemetryEvent[] = [];
private isOptedIn?: boolean = true; // Assume true until the first check
private diagTask?: TelemetryDiagTask;
constructor(logger: Logger) {
this.logger = logger.get('telemetry_events');
}
public setup(telemetrySetup?: TelemetryPluginSetup) {
public setup(telemetrySetup?: TelemetryPluginSetup, taskManager?: TaskManagerSetupContract) {
this.telemetrySetup = telemetrySetup;
if (taskManager) {
this.diagTask = new TelemetryDiagTask(this.logger, taskManager, this);
}
}
public start(core?: CoreStart, telemetryStart?: TelemetryPluginStart) {
public start(
core?: CoreStart,
telemetryStart?: TelemetryPluginStart,
taskManager?: TaskManagerStartContract
) {
this.telemetryStart = telemetryStart;
this.core = core;
this.logger.debug(`Starting task`);
if (taskManager && this.diagTask) {
this.logger.debug(`Starting diag task`);
this.diagTask.start(taskManager);
}
this.logger.debug(`Starting local task`);
setTimeout(() => {
this.sendIfDue();
this.intervalId = setInterval(() => this.sendIfDue(), this.checkIntervalMs);
@ -82,6 +101,38 @@ export class TelemetryEventsSender {
}
}
public async fetchDiagnosticAlerts(executeFrom: string, executeTo: string) {
const query = {
expand_wildcards: 'open,hidden',
index: 'logs-endpoint.diagnostic.collection-*',
ignore_unavailable: true,
size: this.maxQueueSize,
body: {
query: {
range: {
'event.ingested': {
gte: executeFrom,
lt: executeTo,
},
},
},
sort: [
{
'event.ingested': {
order: 'desc',
},
},
],
},
};
if (!this.core) {
throw Error('could not fetch diagnostic alerts. core is not available');
}
const callCluster = this.core.elasticsearch.legacy.client.callAsInternalUser;
return callCluster('search', query);
}
public queueTelemetryEvents(events: TelemetryEvent[]) {
const qlength = this.queue.length;
@ -109,6 +160,11 @@ export class TelemetryEventsSender {
});
}
public async isTelemetryOptedIn() {
this.isOptedIn = await this.telemetryStart?.getIsOptedIn();
return this.isOptedIn === true;
}
private async sendIfDue() {
if (this.isSending) {
return;
@ -121,9 +177,7 @@ export class TelemetryEventsSender {
try {
this.isSending = true;
// Checking opt-in status is relatively expensive (calls a saved-object), so
// we only check it when we have things to send.
this.isOptedIn = await this.telemetryStart?.getIsOptedIn();
this.isOptedIn = await this.isTelemetryOptedIn();
if (!this.isOptedIn) {
this.logger.debug(`Telemetry is not opted-in.`);
this.queue = [];
@ -245,9 +299,14 @@ const allowlistEventFields: AllowlistFields = {
'@timestamp': true,
agent: true,
Endpoint: true,
Ransomware: true,
data_stream: true,
ecs: true,
elastic: true,
event: true,
rule: {
ruleset: true,
},
file: {
name: true,
path: true,
@ -270,6 +329,8 @@ const allowlistEventFields: AllowlistFields = {
executable: true,
command_line: true,
hash: true,
pid: true,
uptime: true,
Ext: {
code_signature: true,
},
@ -281,6 +342,12 @@ const allowlistEventFields: AllowlistFields = {
Ext: {
code_signature: true,
},
uptime: true,
pid: true,
ppid: true,
},
token: {
integrity_level_name: true,
},
},
};

View file

@ -0,0 +1,149 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import moment from 'moment';
import { loggingSystemMock } from 'src/core/server/mocks';
import { taskManagerMock } from '../../../../task_manager/server/mocks';
import { TaskStatus } from '../../../../task_manager/server';
import { TelemetryDiagTask, TelemetryDiagTaskConstants } from './task';
import { createMockTelemetryEventsSender, MockTelemetryDiagnosticTask } from './mocks';
describe('test', () => {
let logger: ReturnType<typeof loggingSystemMock.createLogger>;
beforeEach(() => {
logger = loggingSystemMock.createLogger();
});
describe('basic diagnostic alert telemetry sanity checks', () => {
test('task can register', () => {
const telemetryDiagTask = new TelemetryDiagTask(
logger,
taskManagerMock.createSetup(),
createMockTelemetryEventsSender(true)
);
expect(telemetryDiagTask).toBeInstanceOf(TelemetryDiagTask);
});
});
test('diagnostic task should be registered', () => {
const mockTaskManager = taskManagerMock.createSetup();
new TelemetryDiagTask(logger, mockTaskManager, createMockTelemetryEventsSender(true));
expect(mockTaskManager.registerTaskDefinitions).toHaveBeenCalled();
});
test('task should be scheduled', async () => {
const mockTaskManagerSetup = taskManagerMock.createSetup();
const telemetryDiagTask = new TelemetryDiagTask(
logger,
mockTaskManagerSetup,
createMockTelemetryEventsSender(true)
);
const mockTaskManagerStart = taskManagerMock.createStart();
await telemetryDiagTask.start(mockTaskManagerStart);
expect(mockTaskManagerStart.ensureScheduled).toHaveBeenCalled();
});
test('task should run', async () => {
const mockContext = createMockTelemetryEventsSender(true);
const mockTaskManager = taskManagerMock.createSetup();
const telemetryDiagTask = new MockTelemetryDiagnosticTask(logger, mockTaskManager, mockContext);
const mockTaskInstance = {
id: TelemetryDiagTaskConstants.TYPE,
runAt: new Date(),
attempts: 0,
ownerId: '',
status: TaskStatus.Running,
startedAt: new Date(),
scheduledAt: new Date(),
retryAt: new Date(),
params: {},
state: {},
taskType: TelemetryDiagTaskConstants.TYPE,
};
const createTaskRunner =
mockTaskManager.registerTaskDefinitions.mock.calls[0][0][TelemetryDiagTaskConstants.TYPE]
.createTaskRunner;
const taskRunner = createTaskRunner({ taskInstance: mockTaskInstance });
await taskRunner.run();
expect(telemetryDiagTask.runTask).toHaveBeenCalled();
});
test('task should not query elastic if telemetry is not opted in', async () => {
const mockSender = createMockTelemetryEventsSender(false);
const mockTaskManager = taskManagerMock.createSetup();
new MockTelemetryDiagnosticTask(logger, mockTaskManager, mockSender);
const mockTaskInstance = {
id: TelemetryDiagTaskConstants.TYPE,
runAt: new Date(),
attempts: 0,
ownerId: '',
status: TaskStatus.Running,
startedAt: new Date(),
scheduledAt: new Date(),
retryAt: new Date(),
params: {},
state: {},
taskType: TelemetryDiagTaskConstants.TYPE,
};
const createTaskRunner =
mockTaskManager.registerTaskDefinitions.mock.calls[0][0][TelemetryDiagTaskConstants.TYPE]
.createTaskRunner;
const taskRunner = createTaskRunner({ taskInstance: mockTaskInstance });
await taskRunner.run();
expect(mockSender.fetchDiagnosticAlerts).not.toHaveBeenCalled();
});
test('test -5 mins is returned when there is no previous task run', async () => {
const telemetryDiagTask = new TelemetryDiagTask(
logger,
taskManagerMock.createSetup(),
createMockTelemetryEventsSender(true)
);
const executeTo = moment().utc().toISOString();
const executeFrom = undefined;
const newExecuteFrom = telemetryDiagTask.getLastExecutionTimestamp(executeTo, executeFrom);
expect(newExecuteFrom).toEqual(moment(executeTo).subtract(5, 'minutes').toISOString());
});
test('test -6 mins is returned when there was a previous task run', async () => {
const telemetryDiagTask = new TelemetryDiagTask(
logger,
taskManagerMock.createSetup(),
createMockTelemetryEventsSender(true)
);
const executeTo = moment().utc().toISOString();
const executeFrom = moment(executeTo).subtract(6, 'minutes').toISOString();
const newExecuteFrom = telemetryDiagTask.getLastExecutionTimestamp(executeTo, executeFrom);
expect(newExecuteFrom).toEqual(executeFrom);
});
// it's possible if Kibana is down for a prolonged period the stored lastRun would have drifted
// if that is the case we will just roll it back to a 10 min search window
test('test 10 mins is returned when previous task run took longer than 10 minutes', async () => {
const telemetryDiagTask = new TelemetryDiagTask(
logger,
taskManagerMock.createSetup(),
createMockTelemetryEventsSender(true)
);
const executeTo = moment().utc().toISOString();
const executeFrom = moment(executeTo).subtract(142, 'minutes').toISOString();
const newExecuteFrom = telemetryDiagTask.getLastExecutionTimestamp(executeTo, executeFrom);
expect(newExecuteFrom).toEqual(moment(executeTo).subtract(10, 'minutes').toISOString());
});
});

View file

@ -0,0 +1,121 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import moment from 'moment';
import { Logger } from 'src/core/server';
import {
ConcreteTaskInstance,
TaskManagerSetupContract,
TaskManagerStartContract,
} from '../../../../task_manager/server';
import { TelemetryEventsSender, TelemetryEvent } from './sender';
export const TelemetryDiagTaskConstants = {
TIMEOUT: '1m',
TYPE: 'security:endpoint-diagnostics',
INTERVAL: '5m',
VERSION: '1.0.0',
};
export class TelemetryDiagTask {
private readonly logger: Logger;
private readonly sender: TelemetryEventsSender;
constructor(
logger: Logger,
taskManager: TaskManagerSetupContract,
sender: TelemetryEventsSender
) {
this.logger = logger;
this.sender = sender;
taskManager.registerTaskDefinitions({
[TelemetryDiagTaskConstants.TYPE]: {
title: 'Security Solution Telemetry Diagnostics task',
timeout: TelemetryDiagTaskConstants.TIMEOUT,
createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => {
return {
run: async () => {
const executeTo = moment().utc().toISOString();
const executeFrom = this.getLastExecutionTimestamp(
executeTo,
taskInstance.state?.lastExecutionTimestamp
);
await this.runTask(taskInstance.id, executeFrom, executeTo);
return {
state: {
lastExecutionTimestamp: executeTo,
},
};
},
cancel: async () => {},
};
},
},
});
}
public getLastExecutionTimestamp(executeTo: string, lastExecutionTimestamp?: string) {
if (lastExecutionTimestamp === undefined) {
this.logger.debug(`No last execution timestamp defined`);
return moment(executeTo).subtract(5, 'minutes').toISOString();
}
if (moment(executeTo).diff(lastExecutionTimestamp, 'minutes') >= 10) {
this.logger.debug(`last execution timestamp was greater than 10 minutes`);
return moment(executeTo).subtract(10, 'minutes').toISOString();
}
return lastExecutionTimestamp;
}
public start = async (taskManager: TaskManagerStartContract) => {
try {
await taskManager.ensureScheduled({
id: this.getTaskId(),
taskType: TelemetryDiagTaskConstants.TYPE,
scope: ['securitySolution'],
schedule: {
interval: TelemetryDiagTaskConstants.INTERVAL,
},
state: {},
params: { version: TelemetryDiagTaskConstants.VERSION },
});
} catch (e) {
this.logger.error(`Error scheduling task, received ${e.message}`);
}
};
private getTaskId = (): string => {
return `${TelemetryDiagTaskConstants.TYPE}:${TelemetryDiagTaskConstants.VERSION}`;
};
public runTask = async (taskId: string, searchFrom: string, searchTo: string) => {
this.logger.debug(`Running task ${taskId}`);
if (taskId !== this.getTaskId()) {
this.logger.debug(`Outdated task running: ${taskId}`);
return;
}
const isOptedIn = await this.sender.isTelemetryOptedIn();
if (!isOptedIn) {
this.logger.debug(`Telemetry is not opted-in.`);
return;
}
const response = await this.sender.fetchDiagnosticAlerts(searchFrom, searchTo);
const hits = response.hits?.hits || [];
if (!Array.isArray(hits) || !hits.length) {
this.logger.debug('no diagnostic alerts retrieved');
return;
}
const diagAlerts: TelemetryEvent[] = hits.map((h) => h._source);
this.logger.debug(`Received ${diagAlerts.length} diagnostic alerts`);
this.sender.queueTelemetryEvents(diagAlerts);
};
}

View file

@ -316,7 +316,7 @@ export class Plugin implements IPlugin<PluginSetup, PluginStart, SetupPlugins, S
);
});
this.telemetryEventsSender.setup(plugins.telemetry);
this.telemetryEventsSender.setup(plugins.telemetry, plugins.taskManager);
return {};
}
@ -369,7 +369,7 @@ export class Plugin implements IPlugin<PluginSetup, PluginStart, SetupPlugins, S
this.logger.debug('User artifacts task not available.');
}
this.telemetryEventsSender.start(core, plugins.telemetry);
this.telemetryEventsSender.start(core, plugins.telemetry, plugins.taskManager);
this.licensing$ = plugins.licensing.license$;
licenseService.start(this.licensing$);
this.policyWatcher = new PolicyWatcher(