[Reporting] Remove legacy elasticsearch client usage from the reporting plugin (#97184)

This commit is contained in:
Michael Dokolin 2021-04-16 10:09:34 +02:00 committed by GitHub
parent 9987e3d73b
commit 15e8ca1161
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 634 additions and 641 deletions

View file

@ -40,8 +40,8 @@ export interface LayoutParams {
export interface ReportDocumentHead {
_id: string;
_index: string;
_seq_no: unknown;
_primary_term: unknown;
_seq_no: number;
_primary_term: number;
}
export interface TaskRunResult {

View file

@ -10,7 +10,6 @@ import * as Rx from 'rxjs';
import { first, map, take } from 'rxjs/operators';
import {
BasePath,
ElasticsearchServiceSetup,
IClusterClient,
KibanaRequest,
PluginInitializerContext,
@ -38,7 +37,6 @@ export interface ReportingInternalSetup {
basePath: Pick<BasePath, 'set'>;
router: ReportingPluginRouter;
features: FeaturesPluginSetup;
elasticsearch: ElasticsearchServiceSetup;
licensing: LicensingPluginSetup;
security?: SecurityPluginSetup;
spaces?: SpacesPluginSetup;
@ -212,11 +210,6 @@ export class ReportingCore {
return this.pluginSetupDeps;
}
// NOTE: Uses the Legacy API
public getElasticsearchService() {
return this.getPluginSetupDeps().elasticsearch;
}
private async getSavedObjectsClient(request: KibanaRequest) {
const { savedObjects } = await this.getPluginStartDeps();
return savedObjects.getScopedClient(request) as SavedObjectsClientContract;

View file

@ -5,8 +5,9 @@
* 2.0.
*/
import type { DeeplyMockedKeys } from '@kbn/utility-types/jest';
import nodeCrypto from '@elastic/node-crypto';
import { ElasticsearchServiceSetup, IUiSettingsClient } from 'kibana/server';
import { ElasticsearchClient, IUiSettingsClient } from 'kibana/server';
import moment from 'moment';
// @ts-ignore
import Puid from 'puid';
@ -50,20 +51,12 @@ describe('CSV Execute Job', function () {
let defaultElasticsearchResponse: any;
let encryptedHeaders: any;
let clusterStub: any;
let configGetStub: any;
let mockEsClient: DeeplyMockedKeys<ElasticsearchClient>;
let mockReportingConfig: ReportingConfig;
let mockReportingCore: ReportingCore;
let callAsCurrentUserStub: any;
let cancellationToken: any;
const mockElasticsearch = {
legacy: {
client: {
asScoped: () => clusterStub,
},
},
};
const mockUiSettingsClient = {
get: sinon.stub(),
};
@ -85,10 +78,10 @@ describe('CSV Execute Job', function () {
mockReportingCore = await createMockReportingCore(mockReportingConfig);
mockReportingCore.getUiSettingsServiceFactory = () =>
Promise.resolve((mockUiSettingsClient as unknown) as IUiSettingsClient);
mockReportingCore.getElasticsearchService = () =>
mockElasticsearch as ElasticsearchServiceSetup;
mockReportingCore.setConfig(mockReportingConfig);
mockEsClient = (await mockReportingCore.getEsClient()).asScoped({} as any)
.asCurrentUser as typeof mockEsClient;
cancellationToken = new CancellationToken();
defaultElasticsearchResponse = {
@ -97,14 +90,9 @@ describe('CSV Execute Job', function () {
},
_scroll_id: 'defaultScrollId',
};
clusterStub = {
callAsCurrentUser() {},
};
callAsCurrentUserStub = sinon
.stub(clusterStub, 'callAsCurrentUser')
.resolves(defaultElasticsearchResponse);
mockEsClient.search.mockResolvedValue({ body: defaultElasticsearchResponse } as any);
mockEsClient.scroll.mockResolvedValue({ body: defaultElasticsearchResponse } as any);
mockUiSettingsClient.get.withArgs(CSV_SEPARATOR_SETTING).returns(',');
mockUiSettingsClient.get.withArgs(CSV_QUOTE_VALUES_SETTING).returns(true);
@ -127,7 +115,7 @@ describe('CSV Execute Job', function () {
});
describe('basic Elasticsearch call behavior', function () {
it('should decrypt encrypted headers and pass to callAsCurrentUser', async function () {
it('should decrypt encrypted headers and pass to the elasticsearch client', async function () {
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
await runTask(
'job456',
@ -138,8 +126,7 @@ describe('CSV Execute Job', function () {
}),
cancellationToken
);
expect(callAsCurrentUserStub.called).toBe(true);
expect(callAsCurrentUserStub.firstCall.args[0]).toEqual('search');
expect(mockEsClient.search).toHaveBeenCalled();
});
it('should pass the index and body to execute the initial search', async function () {
@ -160,21 +147,22 @@ describe('CSV Execute Job', function () {
await runTask('job777', job, cancellationToken);
const searchCall = callAsCurrentUserStub.firstCall;
expect(searchCall.args[0]).toBe('search');
expect(searchCall.args[1].index).toBe(index);
expect(searchCall.args[1].body).toBe(body);
expect(mockEsClient.search).toHaveBeenCalledWith(expect.objectContaining({ body, index }));
});
it('should pass the scrollId from the initial search to the subsequent scroll', async function () {
const scrollId = getRandomScrollId();
callAsCurrentUserStub.onFirstCall().resolves({
hits: {
hits: [{}],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [{}],
},
_scroll_id: scrollId,
},
_scroll_id: scrollId,
});
callAsCurrentUserStub.onSecondCall().resolves(defaultElasticsearchResponse);
} as any);
mockEsClient.scroll.mockResolvedValue({ body: defaultElasticsearchResponse } as any);
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
await runTask(
'job456',
@ -186,10 +174,9 @@ describe('CSV Execute Job', function () {
cancellationToken
);
const scrollCall = callAsCurrentUserStub.secondCall;
expect(scrollCall.args[0]).toBe('scroll');
expect(scrollCall.args[1].scrollId).toBe(scrollId);
expect(mockEsClient.scroll).toHaveBeenCalledWith(
expect.objectContaining({ scroll_id: scrollId })
);
});
it('should not execute scroll if there are no hits from the search', async function () {
@ -204,28 +191,27 @@ describe('CSV Execute Job', function () {
cancellationToken
);
expect(callAsCurrentUserStub.callCount).toBe(2);
const searchCall = callAsCurrentUserStub.firstCall;
expect(searchCall.args[0]).toBe('search');
const clearScrollCall = callAsCurrentUserStub.secondCall;
expect(clearScrollCall.args[0]).toBe('clearScroll');
expect(mockEsClient.search).toHaveBeenCalled();
expect(mockEsClient.clearScroll).toHaveBeenCalled();
});
it('should stop executing scroll if there are no hits', async function () {
callAsCurrentUserStub.onFirstCall().resolves({
hits: {
hits: [{}],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [{}],
},
_scroll_id: 'scrollId',
},
_scroll_id: 'scrollId',
});
callAsCurrentUserStub.onSecondCall().resolves({
hits: {
hits: [],
} as any);
mockEsClient.scroll.mockResolvedValueOnce({
body: {
hits: {
hits: [],
},
_scroll_id: 'scrollId',
},
_scroll_id: 'scrollId',
});
} as any);
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
await runTask(
@ -238,33 +224,30 @@ describe('CSV Execute Job', function () {
cancellationToken
);
expect(callAsCurrentUserStub.callCount).toBe(3);
const searchCall = callAsCurrentUserStub.firstCall;
expect(searchCall.args[0]).toBe('search');
const scrollCall = callAsCurrentUserStub.secondCall;
expect(scrollCall.args[0]).toBe('scroll');
const clearScroll = callAsCurrentUserStub.thirdCall;
expect(clearScroll.args[0]).toBe('clearScroll');
expect(mockEsClient.search).toHaveBeenCalled();
expect(mockEsClient.scroll).toHaveBeenCalled();
expect(mockEsClient.clearScroll).toHaveBeenCalled();
});
it('should call clearScroll with scrollId when there are no more hits', async function () {
const lastScrollId = getRandomScrollId();
callAsCurrentUserStub.onFirstCall().resolves({
hits: {
hits: [{}],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [{}],
},
_scroll_id: 'scrollId',
},
_scroll_id: 'scrollId',
});
} as any);
callAsCurrentUserStub.onSecondCall().resolves({
hits: {
hits: [],
mockEsClient.scroll.mockResolvedValueOnce({
body: {
hits: {
hits: [],
},
_scroll_id: lastScrollId,
},
_scroll_id: lastScrollId,
});
} as any);
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
await runTask(
@ -277,26 +260,28 @@ describe('CSV Execute Job', function () {
cancellationToken
);
const lastCall = callAsCurrentUserStub.getCall(callAsCurrentUserStub.callCount - 1);
expect(lastCall.args[0]).toBe('clearScroll');
expect(lastCall.args[1].scrollId).toEqual([lastScrollId]);
expect(mockEsClient.clearScroll).toHaveBeenCalledWith(
expect.objectContaining({ scroll_id: lastScrollId })
);
});
it('calls clearScroll when there is an error iterating the hits', async function () {
const lastScrollId = getRandomScrollId();
callAsCurrentUserStub.onFirstCall().resolves({
hits: {
hits: [
{
_source: {
one: 'foo',
two: 'bar',
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [
{
_source: {
one: 'foo',
two: 'bar',
},
},
},
],
],
},
_scroll_id: lastScrollId,
},
_scroll_id: lastScrollId,
});
} as any);
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
const jobParams = getBasePayload({
@ -309,21 +294,23 @@ describe('CSV Execute Job', function () {
`[TypeError: Cannot read property 'indexOf' of undefined]`
);
const lastCall = callAsCurrentUserStub.getCall(callAsCurrentUserStub.callCount - 1);
expect(lastCall.args[0]).toBe('clearScroll');
expect(lastCall.args[1].scrollId).toEqual([lastScrollId]);
expect(mockEsClient.clearScroll).toHaveBeenCalledWith(
expect.objectContaining({ scroll_id: lastScrollId })
);
});
});
describe('Warning when cells have formulas', () => {
it('returns `csv_contains_formulas` when cells contain formulas', async function () {
configGetStub.withArgs('csv', 'checkForFormulas').returns(true);
callAsCurrentUserStub.onFirstCall().returns({
hits: {
hits: [{ _source: { one: '=SUM(A1:A2)', two: 'bar' } }],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [{ _source: { one: '=SUM(A1:A2)', two: 'bar' } }],
},
_scroll_id: 'scrollId',
},
_scroll_id: 'scrollId',
});
} as any);
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
const jobParams = getBasePayload({
@ -343,12 +330,14 @@ describe('CSV Execute Job', function () {
it('returns warnings when headings contain formulas', async function () {
configGetStub.withArgs('csv', 'checkForFormulas').returns(true);
callAsCurrentUserStub.onFirstCall().returns({
hits: {
hits: [{ _source: { '=SUM(A1:A2)': 'foo', two: 'bar' } }],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [{ _source: { '=SUM(A1:A2)': 'foo', two: 'bar' } }],
},
_scroll_id: 'scrollId',
},
_scroll_id: 'scrollId',
});
} as any);
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
const jobParams = getBasePayload({
@ -369,12 +358,14 @@ describe('CSV Execute Job', function () {
it('returns no warnings when cells have no formulas', async function () {
configGetStub.withArgs('csv', 'checkForFormulas').returns(true);
configGetStub.withArgs('csv', 'escapeFormulaValues').returns(false);
callAsCurrentUserStub.onFirstCall().returns({
hits: {
hits: [{ _source: { one: 'foo', two: 'bar' } }],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [{ _source: { one: 'foo', two: 'bar' } }],
},
_scroll_id: 'scrollId',
},
_scroll_id: 'scrollId',
});
} as any);
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
const jobParams = getBasePayload({
@ -395,12 +386,14 @@ describe('CSV Execute Job', function () {
it('returns no warnings when cells have formulas but are escaped', async function () {
configGetStub.withArgs('csv', 'checkForFormulas').returns(true);
configGetStub.withArgs('csv', 'escapeFormulaValues').returns(true);
callAsCurrentUserStub.onFirstCall().returns({
hits: {
hits: [{ _source: { '=SUM(A1:A2)': 'foo', two: 'bar' } }],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [{ _source: { '=SUM(A1:A2)': 'foo', two: 'bar' } }],
},
_scroll_id: 'scrollId',
},
_scroll_id: 'scrollId',
});
} as any);
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
const jobParams = getBasePayload({
@ -421,12 +414,14 @@ describe('CSV Execute Job', function () {
it('returns no warnings when configured not to', async () => {
configGetStub.withArgs('csv', 'checkForFormulas').returns(false);
callAsCurrentUserStub.onFirstCall().returns({
hits: {
hits: [{ _source: { one: '=SUM(A1:A2)', two: 'bar' } }],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [{ _source: { one: '=SUM(A1:A2)', two: 'bar' } }],
},
_scroll_id: 'scrollId',
},
_scroll_id: 'scrollId',
});
} as any);
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
const jobParams = getBasePayload({
@ -448,12 +443,14 @@ describe('CSV Execute Job', function () {
describe('Byte order mark encoding', () => {
it('encodes CSVs with BOM', async () => {
configGetStub.withArgs('csv', 'useByteOrderMarkEncoding').returns(true);
callAsCurrentUserStub.onFirstCall().returns({
hits: {
hits: [{ _source: { one: 'one', two: 'bar' } }],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [{ _source: { one: 'one', two: 'bar' } }],
},
_scroll_id: 'scrollId',
},
_scroll_id: 'scrollId',
});
} as any);
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
const jobParams = getBasePayload({
@ -469,12 +466,14 @@ describe('CSV Execute Job', function () {
it('encodes CSVs without BOM', async () => {
configGetStub.withArgs('csv', 'useByteOrderMarkEncoding').returns(false);
callAsCurrentUserStub.onFirstCall().returns({
hits: {
hits: [{ _source: { one: 'one', two: 'bar' } }],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [{ _source: { one: 'one', two: 'bar' } }],
},
_scroll_id: 'scrollId',
},
_scroll_id: 'scrollId',
});
} as any);
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
const jobParams = getBasePayload({
@ -492,12 +491,14 @@ describe('CSV Execute Job', function () {
describe('Escaping cells with formulas', () => {
it('escapes values with formulas', async () => {
configGetStub.withArgs('csv', 'escapeFormulaValues').returns(true);
callAsCurrentUserStub.onFirstCall().returns({
hits: {
hits: [{ _source: { one: `=cmd|' /C calc'!A0`, two: 'bar' } }],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [{ _source: { one: `=cmd|' /C calc'!A0`, two: 'bar' } }],
},
_scroll_id: 'scrollId',
},
_scroll_id: 'scrollId',
});
} as any);
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
const jobParams = getBasePayload({
@ -513,12 +514,14 @@ describe('CSV Execute Job', function () {
it('does not escapes values with formulas', async () => {
configGetStub.withArgs('csv', 'escapeFormulaValues').returns(false);
callAsCurrentUserStub.onFirstCall().returns({
hits: {
hits: [{ _source: { one: `=cmd|' /C calc'!A0`, two: 'bar' } }],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [{ _source: { one: `=cmd|' /C calc'!A0`, two: 'bar' } }],
},
_scroll_id: 'scrollId',
},
_scroll_id: 'scrollId',
});
} as any);
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
const jobParams = getBasePayload({
@ -535,7 +538,7 @@ describe('CSV Execute Job', function () {
describe('Elasticsearch call errors', function () {
it('should reject Promise if search call errors out', async function () {
callAsCurrentUserStub.rejects(new Error());
mockEsClient.search.mockRejectedValueOnce(new Error());
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
const jobParams = getBasePayload({
headers: encryptedHeaders,
@ -548,13 +551,15 @@ describe('CSV Execute Job', function () {
});
it('should reject Promise if scroll call errors out', async function () {
callAsCurrentUserStub.onFirstCall().resolves({
hits: {
hits: [{}],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [{}],
},
_scroll_id: 'scrollId',
},
_scroll_id: 'scrollId',
});
callAsCurrentUserStub.onSecondCall().rejects(new Error());
} as any);
mockEsClient.scroll.mockRejectedValueOnce(new Error());
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
const jobParams = getBasePayload({
headers: encryptedHeaders,
@ -569,12 +574,14 @@ describe('CSV Execute Job', function () {
describe('invalid responses', function () {
it('should reject Promise if search returns hits but no _scroll_id', async function () {
callAsCurrentUserStub.resolves({
hits: {
hits: [{}],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [{}],
},
_scroll_id: undefined,
},
_scroll_id: undefined,
});
} as any);
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
const jobParams = getBasePayload({
@ -588,12 +595,14 @@ describe('CSV Execute Job', function () {
});
it('should reject Promise if search returns no hits and no _scroll_id', async function () {
callAsCurrentUserStub.resolves({
hits: {
hits: [],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [],
},
_scroll_id: undefined,
},
_scroll_id: undefined,
});
} as any);
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
const jobParams = getBasePayload({
@ -607,19 +616,23 @@ describe('CSV Execute Job', function () {
});
it('should reject Promise if scroll returns hits but no _scroll_id', async function () {
callAsCurrentUserStub.onFirstCall().resolves({
hits: {
hits: [{}],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [{}],
},
_scroll_id: 'scrollId',
},
_scroll_id: 'scrollId',
});
} as any);
callAsCurrentUserStub.onSecondCall().resolves({
hits: {
hits: [{}],
mockEsClient.scroll.mockResolvedValueOnce({
body: {
hits: {
hits: [{}],
},
_scroll_id: undefined,
},
_scroll_id: undefined,
});
} as any);
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
const jobParams = getBasePayload({
@ -633,19 +646,23 @@ describe('CSV Execute Job', function () {
});
it('should reject Promise if scroll returns no hits and no _scroll_id', async function () {
callAsCurrentUserStub.onFirstCall().resolves({
hits: {
hits: [{}],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [{}],
},
_scroll_id: 'scrollId',
},
_scroll_id: 'scrollId',
});
} as any);
callAsCurrentUserStub.onSecondCall().resolves({
hits: {
hits: [],
mockEsClient.scroll.mockResolvedValueOnce({
body: {
hits: {
hits: [],
},
_scroll_id: undefined,
},
_scroll_id: undefined,
});
} as any);
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
const jobParams = getBasePayload({
@ -663,21 +680,20 @@ describe('CSV Execute Job', function () {
const scrollId = getRandomScrollId();
beforeEach(function () {
// We have to "re-stub" the callAsCurrentUser stub here so that we can use the fakeFunction
// that delays the Promise resolution so we have a chance to call cancellationToken.cancel().
// Otherwise, we get into an endless loop, and don't have a chance to call cancel
callAsCurrentUserStub.restore();
callAsCurrentUserStub = sinon
.stub(clusterStub, 'callAsCurrentUser')
.callsFake(async function () {
await delay(1);
return {
const searchStub = async () => {
await delay(1);
return {
body: {
hits: {
hits: [{}],
},
_scroll_id: scrollId,
};
});
},
};
};
mockEsClient.search.mockImplementation(searchStub as typeof mockEsClient.search);
mockEsClient.scroll.mockImplementation(searchStub as typeof mockEsClient.scroll);
});
it('should stop calling Elasticsearch when cancellationToken.cancel is called', async function () {
@ -693,10 +709,15 @@ describe('CSV Execute Job', function () {
);
await delay(250);
const callCount = callAsCurrentUserStub.callCount;
expect(mockEsClient.search).toHaveBeenCalled();
expect(mockEsClient.scroll).toHaveBeenCalled();
expect(mockEsClient.clearScroll).not.toHaveBeenCalled();
cancellationToken.cancel();
await delay(250);
expect(callAsCurrentUserStub.callCount).toBe(callCount + 1); // last call is to clear the scroll
expect(mockEsClient.clearScroll).toHaveBeenCalled();
});
it(`shouldn't call clearScroll if it never got a scrollId`, async function () {
@ -712,9 +733,7 @@ describe('CSV Execute Job', function () {
);
cancellationToken.cancel();
for (let i = 0; i < callAsCurrentUserStub.callCount; ++i) {
expect(callAsCurrentUserStub.getCall(i).args[1]).not.toBe('clearScroll'); // dead code?
}
expect(mockEsClient.clearScroll).not.toHaveBeenCalled();
});
it('should call clearScroll if it got a scrollId', async function () {
@ -732,9 +751,11 @@ describe('CSV Execute Job', function () {
cancellationToken.cancel();
await delay(100);
const lastCall = callAsCurrentUserStub.getCall(callAsCurrentUserStub.callCount - 1);
expect(lastCall.args[0]).toBe('clearScroll');
expect(lastCall.args[1].scrollId).toEqual([scrollId]);
expect(mockEsClient.clearScroll).toHaveBeenCalledWith(
expect.objectContaining({
scroll_id: scrollId,
})
);
});
});
@ -788,12 +809,14 @@ describe('CSV Execute Job', function () {
it('should write column headers to output, when there are results', async function () {
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
callAsCurrentUserStub.onFirstCall().resolves({
hits: {
hits: [{ one: '1', two: '2' }],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [{ one: '1', two: '2' }],
},
_scroll_id: 'scrollId',
},
_scroll_id: 'scrollId',
});
} as any);
const jobParams = getBasePayload({
headers: encryptedHeaders,
@ -809,12 +832,14 @@ describe('CSV Execute Job', function () {
it('should use comma separated values of non-nested fields from _source', async function () {
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
callAsCurrentUserStub.onFirstCall().resolves({
hits: {
hits: [{ _source: { one: 'foo', two: 'bar' } }],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [{ _source: { one: 'foo', two: 'bar' } }],
},
_scroll_id: 'scrollId',
},
_scroll_id: 'scrollId',
});
} as any);
const jobParams = getBasePayload({
headers: encryptedHeaders,
@ -831,18 +856,22 @@ describe('CSV Execute Job', function () {
it('should concatenate the hits from multiple responses', async function () {
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
callAsCurrentUserStub.onFirstCall().resolves({
hits: {
hits: [{ _source: { one: 'foo', two: 'bar' } }],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [{ _source: { one: 'foo', two: 'bar' } }],
},
_scroll_id: 'scrollId',
},
_scroll_id: 'scrollId',
});
callAsCurrentUserStub.onSecondCall().resolves({
hits: {
hits: [{ _source: { one: 'baz', two: 'qux' } }],
} as any);
mockEsClient.scroll.mockResolvedValueOnce({
body: {
hits: {
hits: [{ _source: { one: 'baz', two: 'qux' } }],
},
_scroll_id: 'scrollId',
},
_scroll_id: 'scrollId',
});
} as any);
const jobParams = getBasePayload({
headers: encryptedHeaders,
@ -860,12 +889,14 @@ describe('CSV Execute Job', function () {
it('should use field formatters to format fields', async function () {
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
callAsCurrentUserStub.onFirstCall().resolves({
hits: {
hits: [{ _source: { one: 'foo', two: 'bar' } }],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [{ _source: { one: 'foo', two: 'bar' } }],
},
_scroll_id: 'scrollId',
},
_scroll_id: 'scrollId',
});
} as any);
const jobParams = getBasePayload({
headers: encryptedHeaders,
@ -962,12 +993,14 @@ describe('CSV Execute Job', function () {
beforeEach(async function () {
configGetStub.withArgs('csv', 'maxSizeBytes').returns(9);
callAsCurrentUserStub.onFirstCall().returns({
hits: {
hits: [{ _source: { one: 'foo', two: 'bar' } }],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [{ _source: { one: 'foo', two: 'bar' } }],
},
_scroll_id: 'scrollId',
},
_scroll_id: 'scrollId',
});
} as any);
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
const jobParams = getBasePayload({
@ -1002,12 +1035,14 @@ describe('CSV Execute Job', function () {
Promise.resolve((mockUiSettingsClient as unknown) as IUiSettingsClient);
configGetStub.withArgs('csv', 'maxSizeBytes').returns(18);
callAsCurrentUserStub.onFirstCall().returns({
hits: {
hits: [{ _source: { one: 'foo', two: 'bar' } }],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [{ _source: { one: 'foo', two: 'bar' } }],
},
_scroll_id: 'scrollId',
},
_scroll_id: 'scrollId',
});
} as any);
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
const jobParams = getBasePayload({
@ -1039,12 +1074,14 @@ describe('CSV Execute Job', function () {
const scrollDuration = 'test';
configGetStub.withArgs('csv', 'scroll').returns({ duration: scrollDuration });
callAsCurrentUserStub.onFirstCall().returns({
hits: {
hits: [{}],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [{}],
},
_scroll_id: 'scrollId',
},
_scroll_id: 'scrollId',
});
} as any);
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
const jobParams = getBasePayload({
@ -1056,21 +1093,23 @@ describe('CSV Execute Job', function () {
await runTask('job123', jobParams, cancellationToken);
const searchCall = callAsCurrentUserStub.firstCall;
expect(searchCall.args[0]).toBe('search');
expect(searchCall.args[1].scroll).toBe(scrollDuration);
expect(mockEsClient.search).toHaveBeenCalledWith(
expect.objectContaining({ scroll: scrollDuration })
);
});
it('passes scroll size to initial search call', async function () {
const scrollSize = 100;
configGetStub.withArgs('csv', 'scroll').returns({ size: scrollSize });
callAsCurrentUserStub.onFirstCall().resolves({
hits: {
hits: [{}],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [{}],
},
_scroll_id: 'scrollId',
},
_scroll_id: 'scrollId',
});
} as any);
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
const jobParams = getBasePayload({
@ -1082,21 +1121,23 @@ describe('CSV Execute Job', function () {
await runTask('job123', jobParams, cancellationToken);
const searchCall = callAsCurrentUserStub.firstCall;
expect(searchCall.args[0]).toBe('search');
expect(searchCall.args[1].size).toBe(scrollSize);
expect(mockEsClient.search).toHaveBeenCalledWith(
expect.objectContaining({ size: scrollSize })
);
});
it('passes scroll duration to subsequent scroll call', async function () {
const scrollDuration = 'test';
configGetStub.withArgs('csv', 'scroll').returns({ duration: scrollDuration });
callAsCurrentUserStub.onFirstCall().resolves({
hits: {
hits: [{}],
mockEsClient.search.mockResolvedValueOnce({
body: {
hits: {
hits: [{}],
},
_scroll_id: 'scrollId',
},
_scroll_id: 'scrollId',
});
} as any);
const runTask = runTaskFnFactory(mockReportingCore, mockLogger);
const jobParams = getBasePayload({
@ -1108,9 +1149,9 @@ describe('CSV Execute Job', function () {
await runTask('job123', jobParams, cancellationToken);
const scrollCall = callAsCurrentUserStub.secondCall;
expect(scrollCall.args[0]).toBe('scroll');
expect(scrollCall.args[1].scroll).toBe(scrollDuration);
expect(mockEsClient.scroll).toHaveBeenCalledWith(
expect.objectContaining({ scroll: scrollDuration })
);
});
});
});

View file

@ -17,7 +17,7 @@ export const runTaskFnFactory: RunTaskFnFactory<
const config = reporting.getConfig();
return async function runTask(jobId, job, cancellationToken) {
const elasticsearch = reporting.getElasticsearchService();
const elasticsearch = await reporting.getEsClient();
const logger = parentLogger.clone([jobId]);
const generateCsv = createGenerateCsv(logger);
@ -25,16 +25,13 @@ export const runTaskFnFactory: RunTaskFnFactory<
const headers = await decryptJobHeaders(encryptionKey, job.headers, logger);
const fakeRequest = reporting.getFakeRequest({ headers }, job.spaceId, logger);
const uiSettingsClient = await reporting.getUiSettingsClient(fakeRequest, logger);
const { callAsCurrentUser } = elasticsearch.legacy.client.asScoped(fakeRequest);
const callEndpoint = (endpoint: string, clientParams = {}, options = {}) =>
callAsCurrentUser(endpoint, clientParams, options);
const { asCurrentUser: elasticsearchClient } = elasticsearch.asScoped(fakeRequest);
const { content, maxSizeReached, size, csvContainsFormulas, warnings } = await generateCsv(
job,
config,
uiSettingsClient,
callEndpoint,
elasticsearchClient,
cancellationToken
);

View file

@ -7,16 +7,18 @@
import expect from '@kbn/expect';
import sinon from 'sinon';
import { elasticsearchServiceMock } from 'src/core/server/mocks';
import { CancellationToken } from '../../../../common';
import { createMockLevelLogger } from '../../../test_helpers/create_mock_levellogger';
import { ScrollConfig } from '../../../types';
import { createHitIterator } from './hit_iterator';
const { asInternalUser: mockEsClient } = elasticsearchServiceMock.createClusterClient();
const mockLogger = createMockLevelLogger();
const debugLogStub = sinon.stub(mockLogger, 'debug');
const warnLogStub = sinon.stub(mockLogger, 'warn');
const errorLogStub = sinon.stub(mockLogger, 'error');
const mockCallEndpoint = sinon.stub();
const mockSearchRequest = {};
const mockConfig: ScrollConfig = { duration: '2s', size: 123 };
let realCancellationToken = new CancellationToken();
@ -27,10 +29,30 @@ describe('hitIterator', function () {
debugLogStub.resetHistory();
warnLogStub.resetHistory();
errorLogStub.resetHistory();
mockCallEndpoint.resetHistory();
mockCallEndpoint.resetBehavior();
mockCallEndpoint.resolves({ _scroll_id: '123blah', hits: { hits: ['you found me'] } });
mockCallEndpoint.onCall(11).resolves({ _scroll_id: '123blah', hits: {} });
mockEsClient.search.mockClear();
mockEsClient.search.mockResolvedValue({
body: {
_scroll_id: '123blah',
hits: { hits: ['you found me'] },
},
} as any);
mockEsClient.scroll.mockClear();
for (let i = 0; i < 10; i++) {
mockEsClient.scroll.mockResolvedValueOnce({
body: {
_scroll_id: '123blah',
hits: { hits: ['you found me'] },
},
} as any);
}
mockEsClient.scroll.mockResolvedValueOnce({
body: {
_scroll_id: '123blah',
hits: {},
},
} as any);
isCancelledStub = sinon.stub(realCancellationToken, 'isCancelled');
isCancelledStub.returns(false);
@ -45,7 +67,7 @@ describe('hitIterator', function () {
const hitIterator = createHitIterator(mockLogger);
const iterator = hitIterator(
mockConfig,
mockCallEndpoint,
mockEsClient,
mockSearchRequest,
realCancellationToken
);
@ -58,7 +80,7 @@ describe('hitIterator', function () {
expect(hit).to.be('you found me');
}
expect(mockCallEndpoint.callCount).to.be(13);
expect(mockEsClient.scroll.mock.calls.length).to.be(11);
expect(debugLogStub.callCount).to.be(13);
expect(warnLogStub.callCount).to.be(0);
expect(errorLogStub.callCount).to.be(0);
@ -73,7 +95,7 @@ describe('hitIterator', function () {
const hitIterator = createHitIterator(mockLogger);
const iterator = hitIterator(
mockConfig,
mockCallEndpoint,
mockEsClient,
mockSearchRequest,
realCancellationToken
);
@ -86,7 +108,7 @@ describe('hitIterator', function () {
expect(hit).to.be('you found me');
}
expect(mockCallEndpoint.callCount).to.be(3);
expect(mockEsClient.scroll.mock.calls.length).to.be(1);
expect(debugLogStub.callCount).to.be(3);
expect(warnLogStub.callCount).to.be(1);
expect(errorLogStub.callCount).to.be(0);
@ -98,13 +120,20 @@ describe('hitIterator', function () {
it('handles time out', async () => {
// Setup
mockCallEndpoint.onCall(2).resolves({ status: 404 });
mockEsClient.scroll.mockReset();
mockEsClient.scroll.mockResolvedValueOnce({
body: {
_scroll_id: '123blah',
hits: { hits: ['you found me'] },
},
} as any);
mockEsClient.scroll.mockResolvedValueOnce({ body: { status: 404 } } as any);
// Begin
const hitIterator = createHitIterator(mockLogger);
const iterator = hitIterator(
mockConfig,
mockCallEndpoint,
mockEsClient,
mockSearchRequest,
realCancellationToken
);
@ -125,7 +154,7 @@ describe('hitIterator', function () {
errorThrown = true;
}
expect(mockCallEndpoint.callCount).to.be(4);
expect(mockEsClient.scroll.mock.calls.length).to.be(2);
expect(debugLogStub.callCount).to.be(4);
expect(warnLogStub.callCount).to.be(0);
expect(errorLogStub.callCount).to.be(1);
@ -134,13 +163,13 @@ describe('hitIterator', function () {
it('handles scroll id could not be cleared', async () => {
// Setup
mockCallEndpoint.withArgs('clearScroll').rejects({ status: 404 });
mockEsClient.clearScroll.mockRejectedValueOnce({ status: 404 });
// Begin
const hitIterator = createHitIterator(mockLogger);
const iterator = hitIterator(
mockConfig,
mockCallEndpoint,
mockEsClient,
mockSearchRequest,
realCancellationToken
);
@ -153,7 +182,7 @@ describe('hitIterator', function () {
expect(hit).to.be('you found me');
}
expect(mockCallEndpoint.callCount).to.be(13);
expect(mockEsClient.scroll.mock.calls.length).to.be(11);
expect(warnLogStub.callCount).to.be(1);
expect(errorLogStub.callCount).to.be(1);
});

View file

@ -5,54 +5,55 @@
* 2.0.
*/
import { UnwrapPromise } from '@kbn/utility-types';
import { i18n } from '@kbn/i18n';
import { SearchParams, SearchResponse } from 'elasticsearch';
import { ElasticsearchClient } from 'src/core/server';
import { CancellationToken } from '../../../../common';
import { LevelLogger } from '../../../lib';
import { ScrollConfig } from '../../../types';
export type EndpointCaller = (method: string, params: object) => Promise<SearchResponse<any>>;
type SearchResponse = UnwrapPromise<ReturnType<ElasticsearchClient['search']>>;
type SearchRequest = Required<Parameters<ElasticsearchClient['search']>>[0];
function parseResponse(request: SearchResponse<any>) {
const response = request;
if (!response || !response._scroll_id) {
function parseResponse(response: SearchResponse) {
if (!response?.body._scroll_id) {
throw new Error(
i18n.translate('xpack.reporting.exportTypes.csv.hitIterator.expectedScrollIdErrorMessage', {
defaultMessage: 'Expected {scrollId} in the following Elasticsearch response: {response}',
values: { response: JSON.stringify(response), scrollId: '_scroll_id' },
values: { response: JSON.stringify(response?.body), scrollId: '_scroll_id' },
})
);
}
if (!response.hits) {
if (!response?.body.hits) {
throw new Error(
i18n.translate('xpack.reporting.exportTypes.csv.hitIterator.expectedHitsErrorMessage', {
defaultMessage: 'Expected {hits} in the following Elasticsearch response: {response}',
values: { response: JSON.stringify(response), hits: 'hits' },
values: { response: JSON.stringify(response?.body), hits: 'hits' },
})
);
}
return {
scrollId: response._scroll_id,
hits: response.hits.hits,
scrollId: response.body._scroll_id,
hits: response.body.hits.hits,
};
}
export function createHitIterator(logger: LevelLogger) {
return async function* hitIterator(
scrollSettings: ScrollConfig,
callEndpoint: EndpointCaller,
searchRequest: SearchParams,
elasticsearchClient: ElasticsearchClient,
searchRequest: SearchRequest,
cancellationToken: CancellationToken
) {
logger.debug('executing search request');
async function search(index: string | boolean | string[] | undefined, body: object) {
async function search(index: SearchRequest['index'], body: SearchRequest['body']) {
return parseResponse(
await callEndpoint('search', {
ignore_unavailable: true, // ignores if the index pattern contains any aliases that point to closed indices
await elasticsearchClient.search({
index,
body,
ignore_unavailable: true, // ignores if the index pattern contains any aliases that point to closed indices
scroll: scrollSettings.duration,
size: scrollSettings.size,
})
@ -62,8 +63,8 @@ export function createHitIterator(logger: LevelLogger) {
async function scroll(scrollId: string | undefined) {
logger.debug('executing scroll request');
return parseResponse(
await callEndpoint('scroll', {
scrollId,
await elasticsearchClient.scroll({
scroll_id: scrollId,
scroll: scrollSettings.duration,
})
);
@ -72,8 +73,8 @@ export function createHitIterator(logger: LevelLogger) {
async function clearScroll(scrollId: string | undefined) {
logger.debug('executing clearScroll request');
try {
await callEndpoint('clearScroll', {
scrollId: [scrollId],
await elasticsearchClient.clearScroll({
scroll_id: scrollId,
});
} catch (err) {
// Do not throw the error, as the job can still be completed successfully

View file

@ -6,7 +6,7 @@
*/
import { i18n } from '@kbn/i18n';
import { IUiSettingsClient } from 'src/core/server';
import { ElasticsearchClient, IUiSettingsClient } from 'src/core/server';
import { ReportingConfig } from '../../../';
import { CancellationToken } from '../../../../../../plugins/reporting/common';
import { CSV_BOM_CHARS } from '../../../../common/constants';
@ -24,7 +24,7 @@ import { fieldFormatMapFactory } from './field_format_map';
import { createFlattenHit } from './flatten_hit';
import { createFormatCsvValues } from './format_csv_values';
import { getUiSettings } from './get_ui_settings';
import { createHitIterator, EndpointCaller } from './hit_iterator';
import { createHitIterator } from './hit_iterator';
interface SearchRequest {
index: string;
@ -56,7 +56,7 @@ export function createGenerateCsv(logger: LevelLogger) {
job: GenerateCsvParams,
config: ReportingConfig,
uiSettingsClient: IUiSettingsClient,
callEndpoint: EndpointCaller,
elasticsearchClient: ElasticsearchClient,
cancellationToken: CancellationToken
): Promise<SavedSearchGeneratorResultDeprecatedCSV> {
const settings = await getUiSettings(job.browserTimezone, uiSettingsClient, config, logger);
@ -79,7 +79,7 @@ export function createGenerateCsv(logger: LevelLogger) {
const iterator = hitIterator(
settings.scroll,
callEndpoint,
elasticsearchClient,
job.searchRequest,
cancellationToken
);

View file

@ -59,16 +59,6 @@ beforeEach(async () => {
mockReporting = await createMockReportingCore(mockReportingConfig);
const mockElasticsearch = {
legacy: {
client: {
asScoped: () => ({ callAsCurrentUser: jest.fn() }),
},
},
};
const mockGetElasticsearch = jest.fn();
mockGetElasticsearch.mockImplementation(() => Promise.resolve(mockElasticsearch));
mockReporting.getElasticsearchService = mockGetElasticsearch;
// @ts-ignore over-riding config method
mockReporting.config = mockReportingConfig;

View file

@ -57,16 +57,6 @@ beforeEach(async () => {
mockReporting = await createMockReportingCore(mockReportingConfig);
const mockElasticsearch = {
legacy: {
client: {
asScoped: () => ({ callAsCurrentUser: jest.fn() }),
},
},
};
const mockGetElasticsearch = jest.fn();
mockGetElasticsearch.mockImplementation(() => Promise.resolve(mockElasticsearch));
mockReporting.getElasticsearchService = mockGetElasticsearch;
// @ts-ignore over-riding config
mockReporting.config = mockReportingConfig;

View file

@ -25,8 +25,8 @@ const puid = new Puid();
export class Report implements Partial<ReportSource> {
public _index?: string;
public _id: string;
public _primary_term?: unknown; // set by ES
public _seq_no: unknown; // set by ES
public _primary_term?: number; // set by ES
public _seq_no?: number; // set by ES
public readonly kibana_name: ReportSource['kibana_name'];
public readonly kibana_id: ReportSource['kibana_id'];

View file

@ -5,8 +5,8 @@
* 2.0.
*/
import sinon from 'sinon';
import { ElasticsearchServiceSetup } from 'src/core/server';
import type { DeeplyMockedKeys } from '@kbn/utility-types/jest';
import { ElasticsearchClient } from 'src/core/server';
import { ReportingConfig, ReportingCore } from '../../';
import {
createMockConfig,
@ -21,9 +21,7 @@ describe('ReportingStore', () => {
const mockLogger = createMockLevelLogger();
let mockConfig: ReportingConfig;
let mockCore: ReportingCore;
const callClusterStub = sinon.stub();
const mockElasticsearch = { legacy: { client: { callAsInternalUser: callClusterStub } } };
let mockEsClient: DeeplyMockedKeys<ElasticsearchClient>;
beforeEach(async () => {
const reportingConfig = {
@ -33,17 +31,14 @@ describe('ReportingStore', () => {
const mockSchema = createMockConfigSchema(reportingConfig);
mockConfig = createMockConfig(mockSchema);
mockCore = await createMockReportingCore(mockConfig);
mockEsClient = (await mockCore.getEsClient()).asInternalUser as typeof mockEsClient;
callClusterStub.reset();
callClusterStub.withArgs('indices.exists').resolves({});
callClusterStub.withArgs('indices.create').resolves({});
callClusterStub.withArgs('index').resolves({ _id: 'stub-id', _index: 'stub-index' });
callClusterStub.withArgs('indices.refresh').resolves({});
callClusterStub.withArgs('update').resolves({});
callClusterStub.withArgs('get').resolves({});
mockCore.getElasticsearchService = () =>
(mockElasticsearch as unknown) as ElasticsearchServiceSetup;
mockEsClient.indices.create.mockResolvedValue({} as any);
mockEsClient.indices.exists.mockResolvedValue({} as any);
mockEsClient.indices.refresh.mockResolvedValue({} as any);
mockEsClient.get.mockResolvedValue({} as any);
mockEsClient.index.mockResolvedValue({ body: { _id: 'stub-id', _index: 'stub-index' } } as any);
mockEsClient.update.mockResolvedValue({} as any);
});
describe('addReport', () => {
@ -88,14 +83,14 @@ describe('ReportingStore', () => {
meta: {},
} as any);
expect(store.addReport(mockReport)).rejects.toMatchInlineSnapshot(
`[TypeError: this.client.callAsInternalUser is not a function]`
`[Error: Report object from ES has missing fields!]`
);
});
it('handles error creating the index', async () => {
// setup
callClusterStub.withArgs('indices.exists').resolves(false);
callClusterStub.withArgs('indices.create').rejects(new Error('horrible error'));
mockEsClient.indices.exists.mockResolvedValue({ body: false } as any);
mockEsClient.indices.create.mockRejectedValue(new Error('horrible error'));
const store = new ReportingStore(mockCore, mockLogger);
const mockReport = new Report({
@ -117,8 +112,8 @@ describe('ReportingStore', () => {
*/
it('ignores index creation error if the index already exists and continues adding the report', async () => {
// setup
callClusterStub.withArgs('indices.exists').resolves(false);
callClusterStub.withArgs('indices.create').rejects(new Error('devastating error'));
mockEsClient.indices.exists.mockResolvedValue({ body: false } as any);
mockEsClient.indices.create.mockRejectedValue(new Error('devastating error'));
const store = new ReportingStore(mockCore, mockLogger);
const mockReport = new Report({
@ -134,10 +129,9 @@ describe('ReportingStore', () => {
it('skips creating the index if already exists', async () => {
// setup
callClusterStub.withArgs('indices.exists').resolves(false);
callClusterStub
.withArgs('indices.create')
.rejects(new Error('resource_already_exists_exception')); // will be triggered but ignored
mockEsClient.indices.exists.mockResolvedValue({ body: false } as any);
// will be triggered but ignored
mockEsClient.indices.create.mockRejectedValue(new Error('resource_already_exists_exception'));
const store = new ReportingStore(mockCore, mockLogger);
const mockReport = new Report({
@ -159,10 +153,9 @@ describe('ReportingStore', () => {
it('allows username string to be `false`', async () => {
// setup
callClusterStub.withArgs('indices.exists').resolves(false);
callClusterStub
.withArgs('indices.create')
.rejects(new Error('resource_already_exists_exception')); // will be triggered but ignored
mockEsClient.indices.exists.mockResolvedValue({ body: false } as any);
// will be triggered but ignored
mockEsClient.indices.create.mockRejectedValue(new Error('resource_already_exists_exception'));
const store = new ReportingStore(mockCore, mockLogger);
const mockReport = new Report({
@ -192,8 +185,8 @@ describe('ReportingStore', () => {
const mockReport: ReportDocument = {
_id: '1234-foo-78',
_index: '.reporting-test-17409',
_primary_term: 'primary_term string',
_seq_no: 'seq_no string',
_primary_term: 1234,
_seq_no: 5678,
_source: {
kibana_name: 'test',
kibana_id: 'test123',
@ -210,7 +203,7 @@ describe('ReportingStore', () => {
output: null,
},
};
callClusterStub.withArgs('get').resolves(mockReport);
mockEsClient.get.mockResolvedValue({ body: mockReport } as any);
const store = new ReportingStore(mockCore, mockLogger);
const report = new Report({
...mockReport,
@ -221,8 +214,8 @@ describe('ReportingStore', () => {
Report {
"_id": "1234-foo-78",
"_index": ".reporting-test-17409",
"_primary_term": "primary_term string",
"_seq_no": "seq_no string",
"_primary_term": 1234,
"_seq_no": 5678,
"attempts": 0,
"browser_type": "browser type string",
"completed_at": undefined,
@ -267,10 +260,9 @@ describe('ReportingStore', () => {
await store.setReportClaimed(report, { testDoc: 'test' } as any);
const updateCall = callClusterStub.getCalls().find((call) => call.args[0] === 'update');
expect(updateCall && updateCall.args).toMatchInlineSnapshot(`
const [updateCall] = mockEsClient.update.mock.calls;
expect(updateCall).toMatchInlineSnapshot(`
Array [
"update",
Object {
"body": Object {
"doc": Object {
@ -308,10 +300,9 @@ describe('ReportingStore', () => {
await store.setReportFailed(report, { errors: 'yes' } as any);
const updateCall = callClusterStub.getCalls().find((call) => call.args[0] === 'update');
expect(updateCall && updateCall.args).toMatchInlineSnapshot(`
const [updateCall] = mockEsClient.update.mock.calls;
expect(updateCall).toMatchInlineSnapshot(`
Array [
"update",
Object {
"body": Object {
"doc": Object {
@ -349,10 +340,9 @@ describe('ReportingStore', () => {
await store.setReportCompleted(report, { certainly_completed: 'yes' } as any);
const updateCall = callClusterStub.getCalls().find((call) => call.args[0] === 'update');
expect(updateCall && updateCall.args).toMatchInlineSnapshot(`
const [updateCall] = mockEsClient.update.mock.calls;
expect(updateCall).toMatchInlineSnapshot(`
Array [
"update",
Object {
"body": Object {
"doc": Object {
@ -395,10 +385,9 @@ describe('ReportingStore', () => {
},
} as any);
const updateCall = callClusterStub.getCalls().find((call) => call.args[0] === 'update');
expect(updateCall && updateCall.args).toMatchInlineSnapshot(`
const [updateCall] = mockEsClient.update.mock.calls;
expect(updateCall).toMatchInlineSnapshot(`
Array [
"update",
Object {
"body": Object {
"doc": Object {

View file

@ -5,8 +5,7 @@
* 2.0.
*/
import { SearchParams } from 'elasticsearch';
import { ElasticsearchServiceSetup } from 'src/core/server';
import { ElasticsearchClient } from 'src/core/server';
import { LevelLogger, statuses } from '../';
import { ReportingCore } from '../../';
import { numberToDuration } from '../../../common/schema_utils';
@ -14,7 +13,7 @@ import { JobStatus } from '../../../common/types';
import { ReportTaskParams } from '../tasks';
import { indexTimestamp } from './index_timestamp';
import { mapping } from './mapping';
import { Report, ReportDocument } from './report';
import { Report, ReportDocument, ReportSource } from './report';
/*
* When searching for long-pending reports, we get a subset of fields
@ -45,59 +44,60 @@ export class ReportingStore {
private readonly indexPrefix: string; // config setting of index prefix in system index name
private readonly indexInterval: string; // config setting of index prefix: how often to poll for pending work
private readonly queueTimeoutMins: number; // config setting of queue timeout, rounded up to nearest minute
private client: ElasticsearchServiceSetup['legacy']['client'];
private logger: LevelLogger;
private client?: ElasticsearchClient;
constructor(reporting: ReportingCore, logger: LevelLogger) {
const config = reporting.getConfig();
const elasticsearch = reporting.getElasticsearchService();
constructor(private reportingCore: ReportingCore, private logger: LevelLogger) {
const config = reportingCore.getConfig();
this.client = elasticsearch.legacy.client;
this.indexPrefix = config.get('index');
this.indexInterval = config.get('queue', 'indexInterval');
this.logger = logger.clone(['store']);
this.queueTimeoutMins = Math.ceil(numberToDuration(config.get('queue', 'timeout')).asMinutes());
}
private async getClient() {
if (!this.client) {
({ asInternalUser: this.client } = await this.reportingCore.getEsClient());
}
return this.client;
}
private async createIndex(indexName: string) {
return await this.client
.callAsInternalUser('indices.exists', {
index: indexName,
})
.then((exists) => {
if (exists) {
return exists;
}
const client = await this.getClient();
const { body: exists } = await client.indices.exists({ index: indexName });
const indexSettings = {
number_of_shards: 1,
auto_expand_replicas: '0-1',
};
const body = {
settings: indexSettings,
mappings: {
properties: mapping,
},
};
if (exists) {
return exists;
}
return this.client
.callAsInternalUser('indices.create', {
index: indexName,
body,
})
.then(() => true)
.catch((err: Error) => {
const isIndexExistsError = err.message.match(/resource_already_exists_exception/);
if (isIndexExistsError) {
// Do not fail a job if the job runner hits the race condition.
this.logger.warn(`Automatic index creation failed: index already exists: ${err}`);
return;
}
const indexSettings = {
number_of_shards: 1,
auto_expand_replicas: '0-1',
};
const body = {
settings: indexSettings,
mappings: {
properties: mapping,
},
};
this.logger.error(err);
throw err;
});
});
try {
await client.indices.create({ index: indexName, body });
return true;
} catch (error) {
const isIndexExistsError = error.message.match(/resource_already_exists_exception/);
if (isIndexExistsError) {
// Do not fail a job if the job runner hits the race condition.
this.logger.warn(`Automatic index creation failed: index already exists: ${error}`);
return;
}
this.logger.error(error);
throw error;
}
}
/*
@ -105,7 +105,7 @@ export class ReportingStore {
*/
private async indexReport(report: Report) {
const doc = {
index: report._index,
index: report._index!,
id: report._id,
body: {
...report.toEsDocsJSON()._source,
@ -114,14 +114,20 @@ export class ReportingStore {
status: statuses.JOB_STATUS_PENDING,
},
};
return await this.client.callAsInternalUser('index', doc);
const client = await this.getClient();
const { body } = await client.index(doc);
return body;
}
/*
* Called from addReport, which handles any errors
*/
private async refreshIndex(index: string) {
return await this.client.callAsInternalUser('indices.refresh', { index });
const client = await this.getClient();
return client.indices.refresh({ index });
}
public async addReport(report: Report): Promise<Report> {
@ -156,7 +162,8 @@ export class ReportingStore {
}
try {
const document = await this.client.callAsInternalUser<ReportDocument>('get', {
const client = await this.getClient();
const { body: document } = await client.get<ReportSource>({
index: taskJson.index,
id: taskJson.id,
});
@ -166,17 +173,17 @@ export class ReportingStore {
_index: document._index,
_seq_no: document._seq_no,
_primary_term: document._primary_term,
jobtype: document._source.jobtype,
attempts: document._source.attempts,
browser_type: document._source.browser_type,
created_at: document._source.created_at,
created_by: document._source.created_by,
max_attempts: document._source.max_attempts,
meta: document._source.meta,
payload: document._source.payload,
process_expiration: document._source.process_expiration,
status: document._source.status,
timeout: document._source.timeout,
jobtype: document._source?.jobtype,
attempts: document._source?.attempts,
browser_type: document._source?.browser_type,
created_at: document._source?.created_at,
created_by: document._source?.created_by,
max_attempts: document._source?.max_attempts,
meta: document._source?.meta,
payload: document._source?.payload,
process_expiration: document._source?.process_expiration,
status: document._source?.status,
timeout: document._source?.timeout,
});
} catch (err) {
this.logger.error('Error in finding a report! ' + JSON.stringify({ report: taskJson }));
@ -191,14 +198,17 @@ export class ReportingStore {
try {
checkReportIsEditable(report);
return await this.client.callAsInternalUser('update', {
const client = await this.getClient();
const { body } = await client.update<ReportDocument>({
id: report._id,
index: report._index,
index: report._index!,
if_seq_no: report._seq_no,
if_primary_term: report._primary_term,
refresh: true,
body: { doc },
});
return (body as unknown) as ReportDocument;
} catch (err) {
this.logger.error('Error in setting report pending status!');
this.logger.error(err);
@ -215,14 +225,17 @@ export class ReportingStore {
try {
checkReportIsEditable(report);
return await this.client.callAsInternalUser('update', {
const client = await this.getClient();
const { body } = await client.update<ReportDocument>({
id: report._id,
index: report._index,
index: report._index!,
if_seq_no: report._seq_no,
if_primary_term: report._primary_term,
refresh: true,
body: { doc },
});
return (body as unknown) as ReportDocument;
} catch (err) {
this.logger.error('Error in setting report processing status!');
this.logger.error(err);
@ -239,14 +252,17 @@ export class ReportingStore {
try {
checkReportIsEditable(report);
return await this.client.callAsInternalUser('update', {
const client = await this.getClient();
const { body } = await client.update<ReportDocument>({
id: report._id,
index: report._index,
index: report._index!,
if_seq_no: report._seq_no,
if_primary_term: report._primary_term,
refresh: true,
body: { doc },
});
return (body as unknown) as ReportDocument;
} catch (err) {
this.logger.error('Error in setting report failed status!');
this.logger.error(err);
@ -267,14 +283,17 @@ export class ReportingStore {
};
checkReportIsEditable(report);
return await this.client.callAsInternalUser('update', {
const client = await this.getClient();
const { body } = await client.update<ReportDocument>({
id: report._id,
index: report._index,
index: report._index!,
if_seq_no: report._seq_no,
if_primary_term: report._primary_term,
refresh: true,
body: { doc },
});
return (body as unknown) as ReportDocument;
} catch (err) {
this.logger.error('Error in setting report complete status!');
this.logger.error(err);
@ -286,16 +305,17 @@ export class ReportingStore {
try {
checkReportIsEditable(report);
const updateParams = {
const client = await this.getClient();
const { body } = await client.update<ReportDocument>({
id: report._id,
index: report._index,
index: report._index!,
if_seq_no: report._seq_no,
if_primary_term: report._primary_term,
refresh: true,
body: { doc: { process_expiration: null } },
};
});
return await this.client.callAsInternalUser('update', updateParams);
return (body as unknown) as ReportDocument;
} catch (err) {
this.logger.error('Error in clearing expiration!');
this.logger.error(err);
@ -312,12 +332,11 @@ export class ReportingStore {
* Pending reports are not included in this search: they may be scheduled in TM just not run yet.
* TODO Should we get a list of the reports that are pending and scheduled in TM so we can exclude them from this query?
*/
public async findZombieReportDocuments(
logger = this.logger
): Promise<ReportRecordTimeout[] | null> {
const searchParams: SearchParams = {
public async findZombieReportDocuments(): Promise<ReportRecordTimeout[] | null> {
const client = await this.getClient();
const { body } = await client.search<ReportRecordTimeout['_source']>({
index: this.indexPrefix + '-*',
filterPath: 'hits.hits',
filter_path: 'hits.hits',
body: {
sort: { created_at: { order: 'desc' } },
query: {
@ -335,13 +354,8 @@ export class ReportingStore {
},
},
},
};
});
const result = await this.client.callAsInternalUser<ReportRecordTimeout['_source']>(
'search',
searchParams
);
return result.hits?.hits;
return body.hits?.hits as ReportRecordTimeout[];
}
}

View file

@ -47,7 +47,7 @@ export class ReportingPlugin
registerUiSettings(core);
const { elasticsearch, http } = core;
const { http } = core;
const { features, licensing, security, spaces, taskManager } = plugins;
const { initializerContext: initContext, reportingCore } = this;
@ -56,7 +56,6 @@ export class ReportingPlugin
reportingCore.pluginSetup({
features,
elasticsearch,
licensing,
basePath,
router,

View file

@ -6,6 +6,8 @@
*/
import { UnwrapPromise } from '@kbn/utility-types';
import type { DeeplyMockedKeys } from '@kbn/utility-types/jest';
import { ElasticsearchClient } from 'kibana/server';
import { setupServer } from 'src/core/server/test_utils';
import supertest from 'supertest';
import { ReportingCore } from '../..';
@ -26,6 +28,7 @@ describe('POST /diagnose/config', () => {
let core: ReportingCore;
let mockSetupDeps: any;
let config: any;
let mockEsClient: DeeplyMockedKeys<ElasticsearchClient>;
const mockLogger = createMockLevelLogger();
@ -38,9 +41,6 @@ describe('POST /diagnose/config', () => {
);
mockSetupDeps = createMockPluginSetup({
elasticsearch: {
legacy: { client: { callAsInternalUser: jest.fn() } },
},
router: httpSetup.createRouter(''),
} as unknown) as any;
@ -58,6 +58,7 @@ describe('POST /diagnose/config', () => {
};
core = await createMockReportingCore(config, mockSetupDeps);
mockEsClient = (await core.getEsClient()).asInternalUser as typeof mockEsClient;
});
afterEach(async () => {
@ -65,15 +66,15 @@ describe('POST /diagnose/config', () => {
});
it('returns a 200 by default when configured properly', async () => {
mockSetupDeps.elasticsearch.legacy.client.callAsInternalUser.mockImplementation(() =>
Promise.resolve({
mockEsClient.cluster.getSettings.mockResolvedValueOnce({
body: {
defaults: {
http: {
max_content_length: '100mb',
},
},
})
);
},
} as any);
registerDiagnoseConfig(core, mockLogger);
await server.start();
@ -94,15 +95,15 @@ describe('POST /diagnose/config', () => {
it('returns a 200 with help text when not configured properly', async () => {
config.get.mockImplementation(() => 10485760);
mockSetupDeps.elasticsearch.legacy.client.callAsInternalUser.mockImplementation(() =>
Promise.resolve({
mockEsClient.cluster.getSettings.mockResolvedValueOnce({
body: {
defaults: {
http: {
max_content_length: '5mb',
},
},
})
);
},
} as any);
registerDiagnoseConfig(core, mockLogger);
await server.start();

View file

@ -28,7 +28,7 @@ const numberToByteSizeValue = (value: number | ByteSizeValue) => {
export const registerDiagnoseConfig = (reporting: ReportingCore, logger: Logger) => {
const setupDeps = reporting.getPluginSetupDeps();
const userHandler = authorizedUserPreRoutingFactory(reporting);
const { router, elasticsearch } = setupDeps;
const { router } = setupDeps;
router.post(
{
@ -37,13 +37,13 @@ export const registerDiagnoseConfig = (reporting: ReportingCore, logger: Logger)
},
userHandler(async (user, context, req, res) => {
const warnings = [];
const { callAsInternalUser } = elasticsearch.legacy.client;
const { asInternalUser: elasticsearchClient } = await reporting.getEsClient();
const config = reporting.getConfig();
const elasticClusterSettingsResponse = await callAsInternalUser('cluster.getSettings', {
includeDefaults: true,
const { body: clusterSettings } = await elasticsearchClient.cluster.getSettings({
include_defaults: true,
});
const { persistent, transient, defaults: defaultSettings } = elasticClusterSettingsResponse;
const { persistent, transient, defaults: defaultSettings } = clusterSettings;
const elasticClusterSettings = defaults({}, persistent, transient, defaultSettings);
const elasticSearchMaxContent = get(

View file

@ -6,8 +6,9 @@
*/
import { UnwrapPromise } from '@kbn/utility-types';
import type { DeeplyMockedKeys } from '@kbn/utility-types/jest';
import { of } from 'rxjs';
import sinon from 'sinon';
import { ElasticsearchClient } from 'kibana/server';
import { setupServer } from 'src/core/server/test_utils';
import supertest from 'supertest';
import { ReportingCore } from '..';
@ -24,8 +25,8 @@ describe('POST /api/reporting/generate', () => {
let server: SetupServerReturn['server'];
let httpSetup: SetupServerReturn['httpSetup'];
let mockExportTypesRegistry: ExportTypesRegistry;
let callClusterStub: any;
let core: ReportingCore;
let mockEsClient: DeeplyMockedKeys<ElasticsearchClient>;
const config = {
get: jest.fn().mockImplementation((...args) => {
@ -55,12 +56,7 @@ describe('POST /api/reporting/generate', () => {
() => ({})
);
callClusterStub = sinon.stub().resolves({});
const mockSetupDeps = createMockPluginSetup({
elasticsearch: {
legacy: { client: { callAsInternalUser: callClusterStub } },
},
security: {
license: { isEnabled: () => true },
authc: {
@ -85,6 +81,9 @@ describe('POST /api/reporting/generate', () => {
runTaskFnFactory: () => async () => ({ runParamsTest: { test2: 'yes' } } as any),
});
core.getExportTypesRegistry = () => mockExportTypesRegistry;
mockEsClient = (await core.getEsClient()).asInternalUser as typeof mockEsClient;
mockEsClient.index.mockResolvedValue({ body: {} } as any);
});
afterEach(async () => {
@ -144,7 +143,7 @@ describe('POST /api/reporting/generate', () => {
});
it('returns 500 if job handler throws an error', async () => {
callClusterStub.withArgs('index').rejects('silly');
mockEsClient.index.mockRejectedValueOnce('silly');
registerJobGenerationRoutes(core, mockLogger);
@ -157,7 +156,7 @@ describe('POST /api/reporting/generate', () => {
});
it(`returns 200 if job handler doesn't error`, async () => {
callClusterStub.withArgs('index').resolves({ _id: 'foo', _index: 'foo-index' });
mockEsClient.index.mockResolvedValueOnce({ body: { _id: 'foo', _index: 'foo-index' } } as any);
registerJobGenerationRoutes(core, mockLogger);
await server.start();

View file

@ -6,7 +6,9 @@
*/
import { UnwrapPromise } from '@kbn/utility-types';
import type { DeeplyMockedKeys } from '@kbn/utility-types/jest';
import { of } from 'rxjs';
import { ElasticsearchClient } from 'kibana/server';
import { setupServer } from 'src/core/server/test_utils';
import supertest from 'supertest';
import { ReportingCore } from '..';
@ -29,6 +31,7 @@ describe('GET /api/reporting/jobs/download', () => {
let httpSetup: SetupServerReturn['httpSetup'];
let exportTypesRegistry: ExportTypesRegistry;
let core: ReportingCore;
let mockEsClient: DeeplyMockedKeys<ElasticsearchClient>;
const config = createMockConfig(createMockConfigSchema());
const getHits = (...sources: any) => {
@ -47,9 +50,6 @@ describe('GET /api/reporting/jobs/download', () => {
() => ({})
);
const mockSetupDeps = createMockPluginSetup({
elasticsearch: {
legacy: { client: { callAsInternalUser: jest.fn() } },
},
security: {
license: {
isEnabled: () => true,
@ -89,6 +89,8 @@ describe('GET /api/reporting/jobs/download', () => {
validLicenses: ['basic', 'gold'],
} as ExportTypeDefinition);
core.getExportTypesRegistry = () => exportTypesRegistry;
mockEsClient = (await core.getEsClient()).asInternalUser as typeof mockEsClient;
});
afterEach(async () => {
@ -96,10 +98,7 @@ describe('GET /api/reporting/jobs/download', () => {
});
it('fails on malformed download IDs', async () => {
// @ts-ignore
core.pluginSetupDeps.elasticsearch.legacy.client = {
callAsInternalUser: jest.fn().mockReturnValue(Promise.resolve(getHits())),
};
mockEsClient.search.mockResolvedValueOnce({ body: getHits() } as any);
registerJobInfoRoutes(core);
await server.start();
@ -171,11 +170,7 @@ describe('GET /api/reporting/jobs/download', () => {
});
it('returns 404 if job not found', async () => {
// @ts-ignore
core.pluginSetupDeps.elasticsearch.legacy.client = {
callAsInternalUser: jest.fn().mockReturnValue(Promise.resolve(getHits())),
};
mockEsClient.search.mockResolvedValueOnce({ body: getHits() } as any);
registerJobInfoRoutes(core);
await server.start();
@ -184,12 +179,9 @@ describe('GET /api/reporting/jobs/download', () => {
});
it('returns a 401 if not a valid job type', async () => {
// @ts-ignore
core.pluginSetupDeps.elasticsearch.legacy.client = {
callAsInternalUser: jest
.fn()
.mockReturnValue(Promise.resolve(getHits({ jobtype: 'invalidJobType' }))),
};
mockEsClient.search.mockResolvedValueOnce({
body: getHits({ jobtype: 'invalidJobType' }),
} as any);
registerJobInfoRoutes(core);
await server.start();
@ -198,14 +190,9 @@ describe('GET /api/reporting/jobs/download', () => {
});
it('when a job is incomplete', async () => {
// @ts-ignore
core.pluginSetupDeps.elasticsearch.legacy.client = {
callAsInternalUser: jest
.fn()
.mockReturnValue(
Promise.resolve(getHits({ jobtype: 'unencodedJobType', status: 'pending' }))
),
};
mockEsClient.search.mockResolvedValueOnce({
body: getHits({ jobtype: 'unencodedJobType', status: 'pending' }),
} as any);
registerJobInfoRoutes(core);
await server.start();
@ -218,18 +205,13 @@ describe('GET /api/reporting/jobs/download', () => {
});
it('when a job fails', async () => {
// @ts-ignore
core.pluginSetupDeps.elasticsearch.legacy.client = {
callAsInternalUser: jest.fn().mockReturnValue(
Promise.resolve(
getHits({
jobtype: 'unencodedJobType',
status: 'failed',
output: { content: 'job failure message' },
})
)
),
};
mockEsClient.search.mockResolvedValueOnce({
body: getHits({
jobtype: 'unencodedJobType',
status: 'failed',
output: { content: 'job failure message' },
}),
} as any);
registerJobInfoRoutes(core);
await server.start();
@ -243,7 +225,7 @@ describe('GET /api/reporting/jobs/download', () => {
});
describe('successful downloads', () => {
const getCompleteHits = async ({
const getCompleteHits = ({
jobType = 'unencodedJobType',
outputContent = 'job output content',
outputContentType = 'text/plain',
@ -260,11 +242,7 @@ describe('GET /api/reporting/jobs/download', () => {
};
it('when a known job-type is complete', async () => {
const hits = getCompleteHits();
// @ts-ignore
core.pluginSetupDeps.elasticsearch.legacy.client = {
callAsInternalUser: jest.fn().mockReturnValue(Promise.resolve(hits)),
};
mockEsClient.search.mockResolvedValueOnce({ body: getCompleteHits() } as any);
registerJobInfoRoutes(core);
await server.start();
@ -276,11 +254,7 @@ describe('GET /api/reporting/jobs/download', () => {
});
it('succeeds when security is not there or disabled', async () => {
const hits = getCompleteHits();
// @ts-ignore
core.pluginSetupDeps.elasticsearch.legacy.client = {
callAsInternalUser: jest.fn().mockReturnValue(Promise.resolve(hits)),
};
mockEsClient.search.mockResolvedValueOnce({ body: getCompleteHits() } as any);
// @ts-ignore
core.pluginSetupDeps.security = null;
@ -297,14 +271,12 @@ describe('GET /api/reporting/jobs/download', () => {
});
it(`doesn't encode output-content for non-specified job-types`, async () => {
const hits = getCompleteHits({
jobType: 'unencodedJobType',
outputContent: 'test',
});
// @ts-ignore
core.pluginSetupDeps.elasticsearch.legacy.client = {
callAsInternalUser: jest.fn().mockReturnValue(Promise.resolve(hits)),
};
mockEsClient.search.mockResolvedValueOnce({
body: getCompleteHits({
jobType: 'unencodedJobType',
outputContent: 'test',
}),
} as any);
registerJobInfoRoutes(core);
await server.start();
@ -316,15 +288,13 @@ describe('GET /api/reporting/jobs/download', () => {
});
it(`base64 encodes output content for configured jobTypes`, async () => {
const hits = getCompleteHits({
jobType: 'base64EncodedJobType',
outputContent: 'test',
outputContentType: 'application/pdf',
});
// @ts-ignore
core.pluginSetupDeps.elasticsearch.legacy.client = {
callAsInternalUser: jest.fn().mockReturnValue(Promise.resolve(hits)),
};
mockEsClient.search.mockResolvedValueOnce({
body: getCompleteHits({
jobType: 'base64EncodedJobType',
outputContent: 'test',
outputContentType: 'application/pdf',
}),
} as any);
registerJobInfoRoutes(core);
await server.start();
@ -337,15 +307,13 @@ describe('GET /api/reporting/jobs/download', () => {
});
it('refuses to return unknown content-types', async () => {
const hits = getCompleteHits({
jobType: 'unencodedJobType',
outputContent: 'alert("all your base mine now");',
outputContentType: 'application/html',
});
// @ts-ignore
core.pluginSetupDeps.elasticsearch.legacy.client = {
callAsInternalUser: jest.fn().mockReturnValue(Promise.resolve(hits)),
};
mockEsClient.search.mockResolvedValueOnce({
body: getCompleteHits({
jobType: 'unencodedJobType',
outputContent: 'alert("all your base mine now");',
outputContentType: 'application/html',
}),
} as any);
registerJobInfoRoutes(core);
await server.start();

View file

@ -5,83 +5,59 @@
* 2.0.
*/
import { UnwrapPromise } from '@kbn/utility-types';
import { i18n } from '@kbn/i18n';
import { errors as elasticsearchErrors } from 'elasticsearch';
import { get } from 'lodash';
import { ResponseError } from '@elastic/elasticsearch/lib/errors';
import { ElasticsearchClient } from 'src/core/server';
import { ReportingCore } from '../../';
import { ReportDocument } from '../../lib/store';
import { ReportingUser } from '../../types';
const esErrors = elasticsearchErrors as Record<string, any>;
const defaultSize = 10;
// TODO: use SearchRequest from elasticsearch-client
interface QueryBody {
size?: number;
from?: number;
_source?: {
excludes: string[];
};
query: {
constant_score: {
filter: {
bool: {
must: Array<Record<string, any>>;
};
};
};
};
}
type SearchRequest = Required<Parameters<ElasticsearchClient['search']>>[0];
interface GetOpts {
includeContent?: boolean;
}
// TODO: use SearchResult from elasticsearch-client
interface CountAggResult {
count: number;
}
const defaultSize = 10;
const getUsername = (user: ReportingUser) => (user ? user.username : false);
function getSearchBody(body: SearchRequest['body']): SearchRequest['body'] {
return {
_source: {
excludes: ['output.content'],
},
sort: [{ created_at: { order: 'desc' } }],
size: defaultSize,
...body,
};
}
export function jobsQueryFactory(reportingCore: ReportingCore) {
const { elasticsearch } = reportingCore.getPluginSetupDeps();
const { callAsInternalUser } = elasticsearch.legacy.client;
function execQuery(queryType: string, body: QueryBody) {
const defaultBody: Record<string, object> = {
search: {
_source: {
excludes: ['output.content'],
},
sort: [{ created_at: { order: 'desc' } }],
size: defaultSize,
},
};
function getIndex() {
const config = reportingCore.getConfig();
const index = config.get('index');
const query = {
index: `${index}-*`,
body: Object.assign(defaultBody[queryType] || {}, body),
};
return callAsInternalUser(queryType, query).catch((err) => {
if (err instanceof esErrors['401']) return;
if (err instanceof esErrors['403']) return;
if (err instanceof esErrors['404']) return;
throw err;
});
return `${config.get('index')}-*`;
}
type Result = number;
async function execQuery<T extends (client: ElasticsearchClient) => any>(
callback: T
): Promise<UnwrapPromise<ReturnType<T>> | undefined> {
try {
const { asInternalUser: client } = await reportingCore.getEsClient();
function getHits(query: Promise<Result>) {
return query.then((res) => get(res, 'hits.hits', []));
return await callback(client);
} catch (error) {
if (error instanceof ResponseError && [401, 403, 404].includes(error.statusCode)) {
return;
}
throw error;
}
}
return {
list(
async list(
jobTypes: string[],
user: ReportingUser,
page = 0,
@ -89,32 +65,34 @@ export function jobsQueryFactory(reportingCore: ReportingCore) {
jobIds: string[] | null
) {
const username = getUsername(user);
const body: QueryBody = {
const body = getSearchBody({
size,
from: size * page,
query: {
constant_score: {
filter: {
bool: {
must: [{ terms: { jobtype: jobTypes } }, { term: { created_by: username } }],
must: [
{ terms: { jobtype: jobTypes } },
{ term: { created_by: username } },
...(jobIds ? [{ ids: { values: jobIds } }] : []),
],
},
},
},
},
};
});
if (jobIds) {
body.query.constant_score.filter.bool.must.push({
ids: { values: jobIds },
});
}
const response = await execQuery((elasticsearchClient) =>
elasticsearchClient.search({ body, index: getIndex() })
);
return getHits(execQuery('search', body));
return response?.body.hits?.hits ?? [];
},
count(jobTypes: string[], user: ReportingUser) {
async count(jobTypes: string[], user: ReportingUser) {
const username = getUsername(user);
const body: QueryBody = {
const body = {
query: {
constant_score: {
filter: {
@ -126,17 +104,21 @@ export function jobsQueryFactory(reportingCore: ReportingCore) {
},
};
return execQuery('count', body).then((doc: CountAggResult) => {
if (!doc) return 0;
return doc.count;
});
const response = await execQuery((elasticsearchClient) =>
elasticsearchClient.count({ body, index: getIndex() })
);
return response?.body.count ?? 0;
},
get(user: ReportingUser, id: string, opts: GetOpts = {}): Promise<ReportDocument | void> {
if (!id) return Promise.resolve();
const username = getUsername(user);
async get(user: ReportingUser, id: string, opts: GetOpts = {}): Promise<ReportDocument | void> {
if (!id) {
return;
}
const body: QueryBody = {
const username = getUsername(user);
const body: SearchRequest['body'] = {
...(opts.includeContent ? { _source: { excludes: [] } } : {}),
query: {
constant_score: {
filter: {
@ -149,22 +131,23 @@ export function jobsQueryFactory(reportingCore: ReportingCore) {
size: 1,
};
if (opts.includeContent) {
body._source = {
excludes: [],
};
const response = await execQuery((elasticsearchClient) =>
elasticsearchClient.search({ body, index: getIndex() })
);
if (response?.body.hits?.hits?.length !== 1) {
return;
}
return getHits(execQuery('search', body)).then((hits) => {
if (hits.length !== 1) return;
return hits[0];
});
return response.body.hits.hits[0] as ReportDocument;
},
async delete(deleteIndex: string, id: string) {
try {
const { asInternalUser: elasticsearchClient } = await reportingCore.getEsClient();
const query = { id, index: deleteIndex, refresh: true };
return callAsInternalUser('delete', query);
return await elasticsearchClient.delete(query);
} catch (error) {
throw new Error(
i18n.translate('xpack.reporting.jobsQuery.deleteError', {

View file

@ -37,7 +37,6 @@ import { createMockLevelLogger } from './create_mock_levellogger';
export const createMockPluginSetup = (setupMock?: any): ReportingInternalSetup => {
return {
features: featuresPluginMock.createSetup(),
elasticsearch: setupMock.elasticsearch || { legacy: { client: {} } },
basePath: { set: jest.fn() },
router: setupMock.router,
security: setupMock.security,
@ -137,7 +136,7 @@ export const createMockReportingCore = async (
) => {
const mockReportingCore = ({
getConfig: () => config,
getElasticsearchService: () => setupDepsMock?.elasticsearch,
getEsClient: () => startDepsMock?.esClient,
getDataService: () => startDepsMock?.data,
} as unknown) as ReportingCore;