[Ingest Manager] Internal action for policy reassign (#78493)

This commit is contained in:
Nicolas Chaulet 2020-10-06 09:21:01 -04:00 committed by GitHub
parent dcb646debc
commit 71fef96c38
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 80 additions and 19 deletions

View file

@ -22,7 +22,13 @@ export type AgentStatus =
| 'updating'
| 'degraded';
export type AgentActionType = 'POLICY_CHANGE' | 'UNENROLL' | 'UPGRADE';
export type AgentActionType =
| 'POLICY_CHANGE'
| 'UNENROLL'
| 'UPGRADE'
// INTERNAL* actions are mean to interupt long polling calls these actions will not be distributed to the agent
| 'INTERNAL_POLICY_REASSIGN';
export interface NewAgentAction {
type: AgentActionType;
data?: any;

View file

@ -132,6 +132,14 @@ export async function getAgentActionsForCheckin(
nodeTypes.literal.buildNode(false),
])
),
nodeTypes.function.buildNode(
'not',
nodeTypes.function.buildNodeWithArgumentNodes('is', [
nodeTypes.literal.buildNode(`${AGENT_ACTION_SAVED_OBJECT_TYPE}.attributes.type`),
nodeTypes.literal.buildNode('INTERNAL_POLICY_REASSIGN'),
nodeTypes.literal.buildNode(false),
])
),
nodeTypes.function.buildNodeWithArgumentNodes('is', [
nodeTypes.literal.buildNode(`${AGENT_ACTION_SAVED_OBJECT_TYPE}.attributes.agent_id`),
nodeTypes.literal.buildNode(agentId),

View file

@ -3,13 +3,16 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import semver from 'semver';
import { timer, from, Observable, TimeoutError } from 'rxjs';
import { timer, from, Observable, TimeoutError, of, EMPTY } from 'rxjs';
import { omit } from 'lodash';
import {
shareReplay,
share,
distinctUntilKeyChanged,
switchMap,
concatMap,
merge,
filter,
timeout,
@ -38,6 +41,7 @@ import {
} from '../actions';
import { appContextService } from '../../app_context';
import { toPromiseAbortable, AbortError, createRateLimiter } from './rxjs_utils';
import { getAgent } from '../crud';
function getInternalUserSOClient() {
const fakeRequest = ({
@ -69,7 +73,8 @@ function createNewActionsSharedObservable(): Observable<AgentAction[]> {
lastTimestamp = new Date().toISOString();
return from(getNewActionsSince(internalSOClient, timestamp));
}),
shareReplay({ refCount: true, bufferSize: 1 })
filter((data) => data.length > 0),
share()
);
}
@ -201,6 +206,18 @@ export function agentCheckinStateNewActionsFactory() {
rateLimiterMaxDelay
);
function getOrCreateAgentPolicyObservable(agentPolicyId: string) {
if (!agentPolicies$.has(agentPolicyId)) {
agentPolicies$.set(agentPolicyId, createAgentPolicyActionSharedObservable(agentPolicyId));
}
const agentPolicy$ = agentPolicies$.get(agentPolicyId);
if (!agentPolicy$) {
throw new Error(`Invalid state, no observable for policy ${agentPolicyId}`);
}
return agentPolicy$;
}
async function subscribeToNewActions(
soClient: SavedObjectsClientContract,
agent: Agent,
@ -209,14 +226,7 @@ export function agentCheckinStateNewActionsFactory() {
if (!agent.policy_id) {
throw new Error('Agent does not have a policy');
}
const agentPolicyId = agent.policy_id;
if (!agentPolicies$.has(agentPolicyId)) {
agentPolicies$.set(agentPolicyId, createAgentPolicyActionSharedObservable(agentPolicyId));
}
const agentPolicy$ = agentPolicies$.get(agentPolicyId);
if (!agentPolicy$) {
throw new Error(`Invalid state, no observable for policy ${agentPolicyId}`);
}
const agentPolicy$ = getOrCreateAgentPolicyObservable(agent.policy_id);
const stream$ = agentPolicy$.pipe(
timeout(pollingTimeoutMs),
@ -229,25 +239,43 @@ export function agentCheckinStateNewActionsFactory() {
(!agent.policy_revision || action.policy_revision > agent.policy_revision)
),
rateLimiter(),
switchMap((policyAction) => createAgentActionFromPolicyAction(soClient, agent, policyAction)),
concatMap((policyAction) => createAgentActionFromPolicyAction(soClient, agent, policyAction)),
merge(newActions$),
switchMap(async (data) => {
if (!data) {
return;
concatMap((data: AgentAction[] | undefined) => {
if (data === undefined) {
return EMPTY;
}
const newActions = data.filter((action) => action.agent_id === agent.id);
if (newActions.length === 0) {
return;
return EMPTY;
}
return newActions;
const hasConfigReassign = newActions.some(
(action) => action.type === 'INTERNAL_POLICY_REASSIGN'
);
if (hasConfigReassign) {
return from(getAgent(soClient, agent.id)).pipe(
concatMap((refreshedAgent) => {
if (!refreshedAgent.policy_id) {
throw new Error('Agent does not have a policy assigned');
}
const newAgentPolicy$ = getOrCreateAgentPolicyObservable(refreshedAgent.policy_id);
return newAgentPolicy$;
}),
rateLimiter(),
concatMap((policyAction) =>
createAgentActionFromPolicyAction(soClient, agent, policyAction)
)
);
}
return of(newActions);
}),
filter((data) => data !== undefined),
take(1)
);
try {
const data = await toPromiseAbortable(stream$, options?.signal);
return data || [];
} catch (err) {
if (err instanceof TimeoutError || err instanceof AbortError) {

View file

@ -10,6 +10,7 @@ import { AGENT_SAVED_OBJECT_TYPE } from '../../constants';
import { AgentSOAttributes } from '../../types';
import { agentPolicyService } from '../agent_policy';
import { getAgents, listAllAgents } from './crud';
import { createAgentAction, bulkCreateAgentActions } from './actions';
export async function reassignAgent(
soClient: SavedObjectsClientContract,
@ -25,6 +26,12 @@ export async function reassignAgent(
policy_id: newAgentPolicyId,
policy_revision: null,
});
await createAgentAction(soClient, {
agent_id: agentId,
created_at: new Date().toISOString(),
type: 'INTERNAL_POLICY_REASSIGN',
});
}
export async function reassignAgents(
@ -56,7 +63,7 @@ export async function reassignAgents(
const agentsToUpdate = agents.filter((agent) => agent.policy_id !== newAgentPolicyId);
// Update the necessary agents
return await soClient.bulkUpdate<AgentSOAttributes>(
const res = await soClient.bulkUpdate<AgentSOAttributes>(
agentsToUpdate.map((agent) => ({
type: AGENT_SAVED_OBJECT_TYPE,
id: agent.id,
@ -66,4 +73,15 @@ export async function reassignAgents(
},
}))
);
const now = new Date().toISOString();
await bulkCreateAgentActions(
soClient,
agentsToUpdate.map((agent) => ({
agent_id: agent.id,
created_at: now,
type: 'INTERNAL_POLICY_REASSIGN',
}))
);
return res;
}

View file

@ -67,6 +67,7 @@ export const NewAgentActionSchema = schema.object({
schema.literal('POLICY_CHANGE'),
schema.literal('UNENROLL'),
schema.literal('UPGRADE'),
schema.literal('INTERNAL_POLICY_REASSIGN'),
]),
data: schema.maybe(schema.any()),
ack_data: schema.maybe(schema.any()),