[Ingest Manager] Set the default timeout to 5 minute with a 20 seconds margin and improve the rate limitter (#79307)

This commit is contained in:
Nicolas Chaulet 2020-10-02 14:25:41 -04:00 committed by GitHub
parent 9973667f4c
commit 54fa55d691
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 30 additions and 15 deletions

View file

@ -13,10 +13,12 @@ export const AGENT_TYPE_EPHEMERAL = 'EPHEMERAL';
export const AGENT_TYPE_TEMPORARY = 'TEMPORARY';
export const AGENT_POLLING_REQUEST_TIMEOUT_MS = 300000; // 5 minutes
export const AGENT_POLLING_REQUEST_TIMEOUT_MARGIN_MS = 20000; // 20s
export const AGENT_POLLING_THRESHOLD_MS = 30000;
export const AGENT_POLLING_INTERVAL = 1000;
export const AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS = 30000;
export const AGENT_UPDATE_ACTIONS_INTERVAL_MS = 5000;
export const AGENT_POLICY_ROLLOUT_RATE_LIMIT_INTERVAL_MS = 5000;
export const AGENT_POLICY_ROLLOUT_RATE_LIMIT_REQUEST_PER_INTERVAL = 25;
export const AGENT_POLICY_ROLLOUT_RATE_LIMIT_INTERVAL_MS = 1000;
export const AGENT_POLICY_ROLLOUT_RATE_LIMIT_REQUEST_PER_INTERVAL = 5;

View file

@ -8,6 +8,7 @@ export {
AGENT_TYPE_EPHEMERAL,
AGENT_TYPE_TEMPORARY,
AGENT_POLLING_THRESHOLD_MS,
AGENT_POLLING_REQUEST_TIMEOUT_MARGIN_MS,
AGENT_POLLING_INTERVAL,
AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS,
AGENT_POLICY_ROLLOUT_RATE_LIMIT_REQUEST_PER_INTERVAL,

View file

@ -15,9 +15,13 @@ describe('createRateLimiter', () => {
scheduler.run(({ expectObservable, cold }) => {
const source = cold('a-b-c-d-e-f|');
const rateLimiter = createRateLimiter(10, 1, 2, scheduler);
const intervalMs = 10;
const perInterval = 1;
const maxDelayMs = 50;
const rateLimiter = createRateLimiter(intervalMs, perInterval, maxDelayMs, scheduler);
const obs = source.pipe(rateLimiter());
const results = 'a 9ms b 9ms c 9ms d 9ms e 9ms (f|)';
// f should be dropped because of maxDelay
const results = 'a 9ms b 9ms c 9ms d 9ms (e|)';
expectObservable(obs).toBe(results);
});
});

View file

@ -54,6 +54,8 @@ export function createRateLimiter(
let countInCurrentInterval = 0;
function createRateLimitOperator<T>(): Rx.OperatorFunction<T, T> {
const maxIntervalEnd = scheduler.now() + maxDelay;
return Rx.pipe(
concatMap(function rateLimit(value: T) {
const now = scheduler.now();
@ -61,9 +63,9 @@ export function createRateLimiter(
countInCurrentInterval = 1;
intervalEnd = now + ratelimitIntervalMs;
return Rx.of(value);
} else if (intervalEnd >= now + maxDelay) {
// re-rate limit in the future to avoid to schedule too far in the future as some observer can unsubscribe
return Rx.of(value).pipe(delay(maxDelay, scheduler), createRateLimitOperator<T>());
} else if (intervalEnd >= maxIntervalEnd) {
// drop the value as it's never going to success as long polling timeout is going to happen before we can send the policy
return Rx.EMPTY;
} else {
if (++countInCurrentInterval > ratelimitRequestPerInterval) {
countInCurrentInterval = 1;

View file

@ -27,6 +27,7 @@ import * as APIKeysService from '../../api_keys';
import {
AGENT_SAVED_OBJECT_TYPE,
AGENT_UPDATE_ACTIONS_INTERVAL_MS,
AGENT_POLLING_REQUEST_TIMEOUT_MARGIN_MS,
AGENT_POLICY_ROLLOUT_RATE_LIMIT_INTERVAL_MS,
AGENT_POLICY_ROLLOUT_RATE_LIMIT_REQUEST_PER_INTERVAL,
} from '../../../constants';
@ -38,8 +39,6 @@ import {
import { appContextService } from '../../app_context';
import { toPromiseAbortable, AbortError, createRateLimiter } from './rxjs_utils';
const RATE_LIMIT_MAX_DELAY_MS = 5 * 60 * 1000; // 5 minutes
function getInternalUserSOClient() {
const fakeRequest = ({
headers: {},
@ -166,19 +165,29 @@ export async function createAgentActionFromPolicyAction(
return [newAgentAction];
}
function getPollingTimeoutMs() {
const pollingTimeoutMs = appContextService.getConfig()?.fleet.pollingRequestTimeout ?? 0;
// Set a timeout 20s before the real timeout to have a chance to respond an empty response before socket timeout
return Math.max(
pollingTimeoutMs - AGENT_POLLING_REQUEST_TIMEOUT_MARGIN_MS,
AGENT_POLLING_REQUEST_TIMEOUT_MARGIN_MS
);
}
export function agentCheckinStateNewActionsFactory() {
// Shared Observables
const agentPolicies$ = new Map<string, Observable<AgentPolicyAction>>();
const newActions$ = createNewActionsSharedObservable();
// Rx operators
const pollingTimeoutMs = appContextService.getConfig()?.fleet.pollingRequestTimeout ?? 0;
const pollingTimeoutMs = getPollingTimeoutMs();
const rateLimiterIntervalMs =
appContextService.getConfig()?.fleet.agentPolicyRolloutRateLimitIntervalMs ??
AGENT_POLICY_ROLLOUT_RATE_LIMIT_INTERVAL_MS;
const rateLimiterRequestPerInterval =
appContextService.getConfig()?.fleet.agentPolicyRolloutRateLimitRequestPerInterval ??
AGENT_POLICY_ROLLOUT_RATE_LIMIT_REQUEST_PER_INTERVAL;
const rateLimiterMaxDelay = Math.min(RATE_LIMIT_MAX_DELAY_MS, pollingTimeoutMs);
const rateLimiterMaxDelay = pollingTimeoutMs;
const rateLimiter = createRateLimiter(
rateLimiterIntervalMs,
@ -204,10 +213,7 @@ export function agentCheckinStateNewActionsFactory() {
}
const stream$ = agentPolicy$.pipe(
timeout(
// Set a timeout 3s before the real timeout to have a chance to respond an empty response before socket timeout
Math.max(pollingTimeoutMs - 3000, 3000)
),
timeout(pollingTimeoutMs),
filter(
(action) =>
agent.policy_id !== undefined &&