Fix aborted$ event and add completed$ event to KibanaRequest (#73898)

This commit is contained in:
Josh Dover 2020-07-31 09:19:43 -06:00 committed by GitHub
parent 1ccf55161e
commit 10cc600d5c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 128 additions and 2 deletions

View file

@ -0,0 +1,18 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-core-server](./kibana-plugin-core-server.md) &gt; [KibanaRequestEvents](./kibana-plugin-core-server.kibanarequestevents.md) &gt; [completed$](./kibana-plugin-core-server.kibanarequestevents.completed_.md)
## KibanaRequestEvents.completed$ property
Observable that emits once if and when the request has been completely handled.
<b>Signature:</b>
```typescript
completed$: Observable<void>;
```
## Remarks
The request may be considered completed if: - A response has been sent to the client; or - The request was aborted.

View file

@ -17,4 +17,5 @@ export interface KibanaRequestEvents
| Property | Type | Description |
| --- | --- | --- |
| [aborted$](./kibana-plugin-core-server.kibanarequestevents.aborted_.md) | <code>Observable&lt;void&gt;</code> | Observable that emits once if and when the request has been aborted. |
| [completed$](./kibana-plugin-core-server.kibanarequestevents.completed_.md) | <code>Observable&lt;void&gt;</code> | Observable that emits once if and when the request has been completely handled. |

View file

@ -23,6 +23,7 @@ import { HttpService } from '../http_service';
import { contextServiceMock } from '../../context/context_service.mock';
import { loggingSystemMock } from '../../logging/logging_system.mock';
import { createHttpServer } from '../test_utils';
import { schema } from '@kbn/config-schema';
let server: HttpService;
@ -195,6 +196,96 @@ describe('KibanaRequest', () => {
expect(nextSpy).toHaveBeenCalledTimes(0);
expect(completeSpy).toHaveBeenCalledTimes(1);
});
it('does not complete before response has been sent', async () => {
const { server: innerServer, createRouter, registerOnPreAuth } = await server.setup(
setupDeps
);
const router = createRouter('/');
const nextSpy = jest.fn();
const completeSpy = jest.fn();
registerOnPreAuth((req, res, toolkit) => {
req.events.aborted$.subscribe({
next: nextSpy,
complete: completeSpy,
});
return toolkit.next();
});
router.post(
{ path: '/', validate: { body: schema.any() } },
async (context, request, res) => {
expect(completeSpy).not.toHaveBeenCalled();
return res.ok({ body: 'ok' });
}
);
await server.start();
await supertest(innerServer.listener).post('/').send({ data: 'test' }).expect(200);
expect(nextSpy).toHaveBeenCalledTimes(0);
expect(completeSpy).toHaveBeenCalledTimes(1);
});
});
describe('completed$', () => {
it('emits once and completes when response is sent', async () => {
const { server: innerServer, createRouter } = await server.setup(setupDeps);
const router = createRouter('/');
const nextSpy = jest.fn();
const completeSpy = jest.fn();
router.get({ path: '/', validate: false }, async (context, req, res) => {
req.events.completed$.subscribe({
next: nextSpy,
complete: completeSpy,
});
expect(nextSpy).not.toHaveBeenCalled();
expect(completeSpy).not.toHaveBeenCalled();
return res.ok({ body: 'ok' });
});
await server.start();
await supertest(innerServer.listener).get('/').expect(200);
expect(nextSpy).toHaveBeenCalledTimes(1);
expect(completeSpy).toHaveBeenCalledTimes(1);
});
it('emits once and completes when response is aborted', async (done) => {
expect.assertions(2);
const { server: innerServer, createRouter } = await server.setup(setupDeps);
const router = createRouter('/');
const nextSpy = jest.fn();
router.get({ path: '/', validate: false }, async (context, req, res) => {
req.events.completed$.subscribe({
next: nextSpy,
complete: () => {
expect(nextSpy).toHaveBeenCalledTimes(1);
done();
},
});
expect(nextSpy).not.toHaveBeenCalled();
await delay(30000);
return res.ok({ body: 'ok' });
});
await server.start();
const incomingRequest = supertest(innerServer.listener)
.get('/')
// end required to send request
.end();
setTimeout(() => incomingRequest.abort(), 50);
});
});
});
});

View file

@ -64,6 +64,16 @@ export interface KibanaRequestEvents {
* Observable that emits once if and when the request has been aborted.
*/
aborted$: Observable<void>;
/**
* Observable that emits once if and when the request has been completely handled.
*
* @remarks
* The request may be considered completed if:
* - A response has been sent to the client; or
* - The request was aborted.
*/
completed$: Observable<void>;
}
/**
@ -186,11 +196,16 @@ export class KibanaRequest<
private getEvents(request: Request): KibanaRequestEvents {
const finish$ = merge(
fromEvent(request.raw.req, 'end'), // all data consumed
fromEvent(request.raw.res, 'finish'), // Response has been sent
fromEvent(request.raw.req, 'close') // connection was closed
).pipe(shareReplay(1), first());
const aborted$ = fromEvent<void>(request.raw.req, 'aborted').pipe(first(), takeUntil(finish$));
const completed$ = merge<void, void>(finish$, aborted$).pipe(shareReplay(1), first());
return {
aborted$: fromEvent<void>(request.raw.req, 'aborted').pipe(first(), takeUntil(finish$)),
aborted$,
completed$,
} as const;
}

View file

@ -1071,6 +1071,7 @@ export class KibanaRequest<Params = unknown, Query = unknown, Body = unknown, Me
// @public
export interface KibanaRequestEvents {
aborted$: Observable<void>;
completed$: Observable<void>;
}
// @public