migrate version check to new client (#71215)

This commit is contained in:
Pierre Gayvallet 2020-07-15 19:14:33 +02:00 committed by GitHub
parent 7f31e3ece4
commit 7a325c9a32
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 77 additions and 45 deletions

View file

@ -17,11 +17,9 @@
* under the License.
*/
import { first } from 'rxjs/operators';
import { MockLegacyClusterClient, MockClusterClient } from './elasticsearch_service.test.mocks';
import { BehaviorSubject } from 'rxjs';
import { first } from 'rxjs/operators';
import { Env } from '../config';
import { getEnvOptions } from '../config/__mocks__/env';
import { CoreContext } from '../core_context';
@ -227,28 +225,34 @@ describe('#setup', () => {
});
it('esNodeVersionCompatibility$ only starts polling when subscribed to', async (done) => {
mockLegacyClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error());
const mockedClient = mockClusterClientInstance.asInternalUser;
mockedClient.nodes.info.mockImplementation(() =>
elasticsearchClientMock.createClientError(new Error())
);
const setupContract = await elasticsearchService.setup(setupDeps);
await delay(10);
expect(mockLegacyClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(0);
expect(mockedClient.nodes.info).toHaveBeenCalledTimes(0);
setupContract.esNodesCompatibility$.subscribe(() => {
expect(mockLegacyClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1);
expect(mockedClient.nodes.info).toHaveBeenCalledTimes(1);
done();
});
});
it('esNodeVersionCompatibility$ stops polling when unsubscribed from', async (done) => {
mockLegacyClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error());
const mockedClient = mockClusterClientInstance.asInternalUser;
mockedClient.nodes.info.mockImplementation(() =>
elasticsearchClientMock.createClientError(new Error())
);
const setupContract = await elasticsearchService.setup(setupDeps);
expect(mockLegacyClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(0);
expect(mockedClient.nodes.info).toHaveBeenCalledTimes(0);
const sub = setupContract.esNodesCompatibility$.subscribe(async () => {
sub.unsubscribe();
await delay(100);
expect(mockLegacyClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1);
expect(mockedClient.nodes.info).toHaveBeenCalledTimes(1);
done();
});
});
@ -353,16 +357,19 @@ describe('#stop', () => {
it('stops pollEsNodeVersions even if there are active subscriptions', async (done) => {
expect.assertions(2);
mockLegacyClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error());
const mockedClient = mockClusterClientInstance.asInternalUser;
mockedClient.nodes.info.mockImplementation(() =>
elasticsearchClientMock.createClientError(new Error())
);
const setupContract = await elasticsearchService.setup(setupDeps);
setupContract.esNodesCompatibility$.subscribe(async () => {
expect(mockLegacyClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1);
expect(mockedClient.nodes.info).toHaveBeenCalledTimes(1);
await elasticsearchService.stop();
await delay(100);
expect(mockLegacyClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1);
expect(mockedClient.nodes.info).toHaveBeenCalledTimes(1);
done();
});
});

View file

@ -78,9 +78,10 @@ export class ElasticsearchService
this.getAuthHeaders = deps.http.getAuthHeaders;
this.legacyClient = this.createLegacyClusterClient('data', config);
this.client = this.createClusterClient('data', config);
const esNodesCompatibility$ = pollEsNodesVersion({
callWithInternalUser: this.legacyClient.callAsInternalUser,
internalClient: this.client.asInternalUser,
log: this.log,
ignoreVersionMismatch: config.ignoreVersionMismatch,
esVersionCheckInterval: config.healthCheckDelay.asMilliseconds(),
@ -109,7 +110,6 @@ export class ElasticsearchService
}
const config = await this.config$.pipe(first()).toPromise();
this.client = this.createClusterClient('data', config);
const createClient = (
type: string,
@ -120,7 +120,7 @@ export class ElasticsearchService
};
return {
client: this.client,
client: this.client!,
createClient,
legacy: {
client: this.legacyClient,
@ -133,7 +133,7 @@ export class ElasticsearchService
this.log.debug('Stopping elasticsearch service');
this.stop$.next();
if (this.client) {
this.client.close();
await this.client.close();
}
if (this.legacyClient) {
this.legacyClient.close();

View file

@ -18,6 +18,7 @@
*/
import { mapNodesVersionCompatibility, pollEsNodesVersion, NodesInfo } from './ensure_es_version';
import { loggingSystemMock } from '../../logging/logging_system.mock';
import { elasticsearchClientMock } from '../client/mocks';
import { take, delay } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { of } from 'rxjs';
@ -27,6 +28,9 @@ const mockLogger = mockLoggerFactory.get('mock logger');
const KIBANA_VERSION = '5.1.0';
const createEsSuccess = elasticsearchClientMock.createClientResponse;
const createEsError = elasticsearchClientMock.createClientError;
function createNodes(...versions: string[]): NodesInfo {
const nodes = {} as any;
versions
@ -111,25 +115,34 @@ describe('mapNodesVersionCompatibility', () => {
});
describe('pollEsNodesVersion', () => {
const callWithInternalUser = jest.fn();
let internalClient: ReturnType<typeof elasticsearchClientMock.createInternalClient>;
const getTestScheduler = () =>
new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
beforeEach(() => {
callWithInternalUser.mockReset();
internalClient = elasticsearchClientMock.createInternalClient();
});
const nodeInfosSuccessOnce = (infos: NodesInfo) => {
internalClient.nodes.info.mockImplementationOnce(() => createEsSuccess(infos));
};
const nodeInfosErrorOnce = (error: any) => {
internalClient.nodes.info.mockImplementationOnce(() => createEsError(error));
};
it('returns iscCompatible=false and keeps polling when a poll request throws', (done) => {
expect.assertions(3);
const expectedCompatibilityResults = [false, false, true];
jest.clearAllMocks();
callWithInternalUser.mockResolvedValueOnce(createNodes('5.1.0', '5.2.0', '5.0.0'));
callWithInternalUser.mockRejectedValueOnce(new Error('mock request error'));
callWithInternalUser.mockResolvedValueOnce(createNodes('5.1.0', '5.2.0', '5.1.1-Beta1'));
nodeInfosSuccessOnce(createNodes('5.1.0', '5.2.0', '5.0.0'));
nodeInfosErrorOnce('mock request error');
nodeInfosSuccessOnce(createNodes('5.1.0', '5.2.0', '5.1.1-Beta1'));
pollEsNodesVersion({
callWithInternalUser,
internalClient,
esVersionCheckInterval: 1,
ignoreVersionMismatch: false,
kibanaVersion: KIBANA_VERSION,
@ -148,9 +161,11 @@ describe('pollEsNodesVersion', () => {
it('returns compatibility results', (done) => {
expect.assertions(1);
const nodes = createNodes('5.1.0', '5.2.0', '5.0.0');
callWithInternalUser.mockResolvedValueOnce(nodes);
nodeInfosSuccessOnce(nodes);
pollEsNodesVersion({
callWithInternalUser,
internalClient,
esVersionCheckInterval: 1,
ignoreVersionMismatch: false,
kibanaVersion: KIBANA_VERSION,
@ -168,15 +183,15 @@ describe('pollEsNodesVersion', () => {
it('only emits if the node versions changed since the previous poll', (done) => {
expect.assertions(4);
callWithInternalUser.mockResolvedValueOnce(createNodes('5.1.0', '5.2.0', '5.0.0')); // emit
callWithInternalUser.mockResolvedValueOnce(createNodes('5.0.0', '5.1.0', '5.2.0')); // ignore, same versions, different ordering
callWithInternalUser.mockResolvedValueOnce(createNodes('5.1.1', '5.2.0', '5.0.0')); // emit
callWithInternalUser.mockResolvedValueOnce(createNodes('5.1.1', '5.1.2', '5.1.3')); // emit
callWithInternalUser.mockResolvedValueOnce(createNodes('5.1.1', '5.1.2', '5.1.3')); // ignore
callWithInternalUser.mockResolvedValueOnce(createNodes('5.0.0', '5.1.0', '5.2.0')); // emit, different from previous version
nodeInfosSuccessOnce(createNodes('5.1.0', '5.2.0', '5.0.0')); // emit
nodeInfosSuccessOnce(createNodes('5.0.0', '5.1.0', '5.2.0')); // ignore, same versions, different ordering
nodeInfosSuccessOnce(createNodes('5.1.1', '5.2.0', '5.0.0')); // emit
nodeInfosSuccessOnce(createNodes('5.1.1', '5.1.2', '5.1.3')); // emit
nodeInfosSuccessOnce(createNodes('5.1.1', '5.1.2', '5.1.3')); // ignore
nodeInfosSuccessOnce(createNodes('5.0.0', '5.1.0', '5.2.0')); // emit, different from previous version
pollEsNodesVersion({
callWithInternalUser,
internalClient,
esVersionCheckInterval: 1,
ignoreVersionMismatch: false,
kibanaVersion: KIBANA_VERSION,
@ -192,14 +207,21 @@ describe('pollEsNodesVersion', () => {
it('starts polling immediately and then every esVersionCheckInterval', () => {
expect.assertions(1);
callWithInternalUser.mockReturnValueOnce([createNodes('5.1.0', '5.2.0', '5.0.0')]);
callWithInternalUser.mockReturnValueOnce([createNodes('5.1.1', '5.2.0', '5.0.0')]);
// @ts-expect-error we need to return an incompatible type to use the testScheduler here
internalClient.nodes.info.mockReturnValueOnce([
{ body: createNodes('5.1.0', '5.2.0', '5.0.0') },
]);
// @ts-expect-error we need to return an incompatible type to use the testScheduler here
internalClient.nodes.info.mockReturnValueOnce([
{ body: createNodes('5.1.1', '5.2.0', '5.0.0') },
]);
getTestScheduler().run(({ expectObservable }) => {
const expected = 'a 99ms (b|)';
const esNodesCompatibility$ = pollEsNodesVersion({
callWithInternalUser,
internalClient,
esVersionCheckInterval: 100,
ignoreVersionMismatch: false,
kibanaVersion: KIBANA_VERSION,
@ -227,15 +249,17 @@ describe('pollEsNodesVersion', () => {
getTestScheduler().run(({ expectObservable }) => {
const expected = '100ms a 99ms (b|)';
callWithInternalUser.mockReturnValueOnce(
of(createNodes('5.1.0', '5.2.0', '5.0.0')).pipe(delay(100))
internalClient.nodes.info.mockReturnValueOnce(
// @ts-expect-error we need to return an incompatible type to use the testScheduler here
of({ body: createNodes('5.1.0', '5.2.0', '5.0.0') }).pipe(delay(100))
);
callWithInternalUser.mockReturnValueOnce(
of(createNodes('5.1.1', '5.2.0', '5.0.0')).pipe(delay(100))
internalClient.nodes.info.mockReturnValueOnce(
// @ts-expect-error we need to return an incompatible type to use the testScheduler here
of({ body: createNodes('5.1.1', '5.2.0', '5.0.0') }).pipe(delay(100))
);
const esNodesCompatibility$ = pollEsNodesVersion({
callWithInternalUser,
internalClient,
esVersionCheckInterval: 10,
ignoreVersionMismatch: false,
kibanaVersion: KIBANA_VERSION,
@ -256,6 +280,6 @@ describe('pollEsNodesVersion', () => {
});
});
expect(callWithInternalUser).toHaveBeenCalledTimes(2);
expect(internalClient.nodes.info).toHaveBeenCalledTimes(2);
});
});

View file

@ -29,10 +29,10 @@ import {
esVersionEqualsKibana,
} from './es_kibana_version_compatability';
import { Logger } from '../../logging';
import { LegacyAPICaller } from '../legacy';
import type { ElasticsearchClient } from '../client';
export interface PollEsNodesVersionOptions {
callWithInternalUser: LegacyAPICaller;
internalClient: ElasticsearchClient;
log: Logger;
kibanaVersion: string;
ignoreVersionMismatch: boolean;
@ -137,7 +137,7 @@ function compareNodes(prev: NodesVersionCompatibility, curr: NodesVersionCompati
}
export const pollEsNodesVersion = ({
callWithInternalUser,
internalClient,
log,
kibanaVersion,
ignoreVersionMismatch,
@ -147,10 +147,11 @@ export const pollEsNodesVersion = ({
return timer(0, healthCheckInterval).pipe(
exhaustMap(() => {
return from(
callWithInternalUser('nodes.info', {
filterPath: ['nodes.*.version', 'nodes.*.http.publish_address', 'nodes.*.ip'],
internalClient.nodes.info<NodesInfo>({
filter_path: ['nodes.*.version', 'nodes.*.http.publish_address', 'nodes.*.ip'],
})
).pipe(
map(({ body }) => body),
catchError((_err) => {
return of({ nodes: {} });
})