Retry network errors on registry requests (#74507)
This commit is contained in:
parent
6d8d802b3f
commit
0f494f420f
|
@ -0,0 +1,108 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
import { fetchUrl } from './requests';
|
||||
import { RegistryError } from '../../../errors';
|
||||
jest.mock('node-fetch');
|
||||
|
||||
const { Response, FetchError } = jest.requireActual('node-fetch');
|
||||
// eslint-disable-next-line @typescript-eslint/no-var-requires
|
||||
const fetchMock = require('node-fetch') as jest.Mock;
|
||||
|
||||
jest.setTimeout(120 * 1000);
|
||||
describe('setupIngestManager', () => {
|
||||
beforeEach(async () => {});
|
||||
|
||||
afterEach(async () => {
|
||||
jest.clearAllMocks();
|
||||
});
|
||||
|
||||
describe('fetchUrl / getResponse errors', () => {
|
||||
it('regular Errors do not retry. Becomes RegistryError', async () => {
|
||||
fetchMock.mockImplementationOnce(() => {
|
||||
throw new Error('mocked');
|
||||
});
|
||||
const promise = fetchUrl('');
|
||||
await expect(promise).rejects.toThrow(RegistryError);
|
||||
expect(fetchMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('TypeErrors do not retry. Becomes RegistryError', async () => {
|
||||
fetchMock.mockImplementationOnce(() => {
|
||||
// @ts-expect-error
|
||||
null.f();
|
||||
});
|
||||
const promise = fetchUrl('');
|
||||
await expect(promise).rejects.toThrow(RegistryError);
|
||||
expect(fetchMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
describe('only system errors retry (like ECONNRESET)', () => {
|
||||
it('they eventually succeed', async () => {
|
||||
const successValue = JSON.stringify({ name: 'attempt 4 works', version: '1.2.3' });
|
||||
fetchMock
|
||||
.mockImplementationOnce(() => {
|
||||
throw new FetchError('message 1', 'system', { code: 'ESOMETHING' });
|
||||
})
|
||||
.mockImplementationOnce(() => {
|
||||
throw new FetchError('message 2', 'system', { code: 'ESOMETHING' });
|
||||
})
|
||||
.mockImplementationOnce(() => {
|
||||
throw new FetchError('message 3', 'system', { code: 'ESOMETHING' });
|
||||
})
|
||||
// this one succeeds
|
||||
.mockImplementationOnce(() => Promise.resolve(new Response(successValue)))
|
||||
.mockImplementationOnce(() => {
|
||||
throw new FetchError('message 5', 'system', { code: 'ESOMETHING' });
|
||||
})
|
||||
.mockImplementationOnce(() => {
|
||||
throw new FetchError('message 6', 'system', { code: 'ESOMETHING' });
|
||||
});
|
||||
|
||||
const promise = fetchUrl('');
|
||||
await expect(promise).resolves.toEqual(successValue);
|
||||
// doesn't retry after success
|
||||
expect(fetchMock).toHaveBeenCalledTimes(4);
|
||||
const actualResultsOrder = fetchMock.mock.results.map(({ type }: { type: string }) => type);
|
||||
expect(actualResultsOrder).toEqual(['throw', 'throw', 'throw', 'return']);
|
||||
});
|
||||
|
||||
it('or error after 1 failure & 5 retries with RegistryError', async () => {
|
||||
fetchMock
|
||||
.mockImplementationOnce(() => {
|
||||
throw new FetchError('message 1', 'system', { code: 'ESOMETHING' });
|
||||
})
|
||||
.mockImplementationOnce(() => {
|
||||
throw new FetchError('message 2', 'system', { code: 'ESOMETHING' });
|
||||
})
|
||||
.mockImplementationOnce(() => {
|
||||
throw new FetchError('message 3', 'system', { code: 'ESOMETHING' });
|
||||
})
|
||||
.mockImplementationOnce(() => {
|
||||
throw new FetchError('message 4', 'system', { code: 'ESOMETHING' });
|
||||
})
|
||||
.mockImplementationOnce(() => {
|
||||
throw new FetchError('message 5', 'system', { code: 'ESOMETHING' });
|
||||
})
|
||||
.mockImplementationOnce(() => {
|
||||
throw new FetchError('message 6', 'system', { code: 'ESOMETHING' });
|
||||
})
|
||||
.mockImplementationOnce(() => {
|
||||
throw new FetchError('message 7', 'system', { code: 'ESOMETHING' });
|
||||
})
|
||||
.mockImplementationOnce(() => {
|
||||
throw new FetchError('message 8', 'system', { code: 'ESOMETHING' });
|
||||
});
|
||||
|
||||
const promise = fetchUrl('');
|
||||
await expect(promise).rejects.toThrow(RegistryError);
|
||||
// doesn't retry after 1 failure & 5 failed retries
|
||||
expect(fetchMock).toHaveBeenCalledTimes(6);
|
||||
const actualResultsOrder = fetchMock.mock.results.map(({ type }: { type: string }) => type);
|
||||
expect(actualResultsOrder).toEqual(['throw', 'throw', 'throw', 'throw', 'throw', 'throw']);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
|
@ -4,20 +4,49 @@
|
|||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import fetch, { Response } from 'node-fetch';
|
||||
import fetch, { FetchError, Response } from 'node-fetch';
|
||||
import pRetry from 'p-retry';
|
||||
import { streamToString } from './streams';
|
||||
import { RegistryError } from '../../../errors';
|
||||
|
||||
type FailedAttemptErrors = pRetry.FailedAttemptError | FetchError | Error;
|
||||
|
||||
// not sure what to call this function, but we're not exporting it
|
||||
async function registryFetch(url: string) {
|
||||
const response = await fetch(url);
|
||||
|
||||
if (response.ok) {
|
||||
return response;
|
||||
} else {
|
||||
// 4xx & 5xx responses
|
||||
// exit without retry & throw RegistryError
|
||||
throw new pRetry.AbortError(
|
||||
new RegistryError(`Error connecting to package registry at ${url}: ${response.statusText}`)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
export async function getResponse(url: string): Promise<Response> {
|
||||
try {
|
||||
const response = await fetch(url);
|
||||
if (response.ok) {
|
||||
return response;
|
||||
} else {
|
||||
throw new RegistryError(
|
||||
`Error connecting to package registry at ${url}: ${response.statusText}`
|
||||
);
|
||||
}
|
||||
// we only want to retry certain failures like network issues
|
||||
// the rest should only try the one time then fail as they do now
|
||||
const response = await pRetry(() => registryFetch(url), {
|
||||
factor: 2,
|
||||
retries: 5,
|
||||
onFailedAttempt: (error) => {
|
||||
// we only want to retry certain types of errors, like `ECONNREFUSED` and other operational errors
|
||||
// and let the others through without retrying
|
||||
//
|
||||
// throwing in onFailedAttempt will abandon all retries & fail the request
|
||||
// we only want to retry system errors, so throw a RegistryError for everything else
|
||||
if (!isSystemError(error)) {
|
||||
throw new RegistryError(
|
||||
`Error connecting to package registry at ${url}: ${error.message}`
|
||||
);
|
||||
}
|
||||
},
|
||||
});
|
||||
return response;
|
||||
} catch (e) {
|
||||
throw new RegistryError(`Error connecting to package registry at ${url}: ${e.message}`);
|
||||
}
|
||||
|
@ -31,3 +60,14 @@ export async function getResponseStream(url: string): Promise<NodeJS.ReadableStr
|
|||
export async function fetchUrl(url: string): Promise<string> {
|
||||
return getResponseStream(url).then(streamToString);
|
||||
}
|
||||
|
||||
// node-fetch throws a FetchError for those types of errors and
|
||||
// "All errors originating from Node.js core are marked with error.type = 'system'"
|
||||
// https://github.com/node-fetch/node-fetch/blob/master/docs/ERROR-HANDLING.md#error-handling-with-node-fetch
|
||||
function isFetchError(error: FailedAttemptErrors): error is FetchError {
|
||||
return error instanceof FetchError || error.name === 'FetchError';
|
||||
}
|
||||
|
||||
function isSystemError(error: FailedAttemptErrors): boolean {
|
||||
return isFetchError(error) && error.type === 'system';
|
||||
}
|
||||
|
|
|
@ -11,7 +11,8 @@ export function bufferToStream(buffer: Buffer): PassThrough {
|
|||
return stream;
|
||||
}
|
||||
|
||||
export function streamToString(stream: NodeJS.ReadableStream): Promise<string> {
|
||||
export function streamToString(stream: NodeJS.ReadableStream | Buffer): Promise<string> {
|
||||
if (stream instanceof Buffer) return Promise.resolve(stream.toString());
|
||||
return new Promise((resolve, reject) => {
|
||||
const body: string[] = [];
|
||||
stream.on('data', (chunk: string) => body.push(chunk));
|
||||
|
|
Loading…
Reference in a new issue