From 10cc600d5cdc9239ce174683b9fee6c3b22c8f4c Mon Sep 17 00:00:00 2001 From: Josh Dover Date: Fri, 31 Jul 2020 09:19:43 -0600 Subject: [PATCH] Fix aborted$ event and add completed$ event to KibanaRequest (#73898) --- ...e-server.kibanarequestevents.completed_.md | 18 ++++ ...-plugin-core-server.kibanarequestevents.md | 1 + .../http/integration_tests/request.test.ts | 91 +++++++++++++++++++ src/core/server/http/router/request.ts | 19 +++- src/core/server/server.api.md | 1 + 5 files changed, 128 insertions(+), 2 deletions(-) create mode 100644 docs/development/core/server/kibana-plugin-core-server.kibanarequestevents.completed_.md diff --git a/docs/development/core/server/kibana-plugin-core-server.kibanarequestevents.completed_.md b/docs/development/core/server/kibana-plugin-core-server.kibanarequestevents.completed_.md new file mode 100644 index 000000000000..c9f8ab11f6b1 --- /dev/null +++ b/docs/development/core/server/kibana-plugin-core-server.kibanarequestevents.completed_.md @@ -0,0 +1,18 @@ + + +[Home](./index.md) > [kibana-plugin-core-server](./kibana-plugin-core-server.md) > [KibanaRequestEvents](./kibana-plugin-core-server.kibanarequestevents.md) > [completed$](./kibana-plugin-core-server.kibanarequestevents.completed_.md) + +## KibanaRequestEvents.completed$ property + +Observable that emits once if and when the request has been completely handled. + +Signature: + +```typescript +completed$: Observable; +``` + +## Remarks + +The request may be considered completed if: - A response has been sent to the client; or - The request was aborted. + diff --git a/docs/development/core/server/kibana-plugin-core-server.kibanarequestevents.md b/docs/development/core/server/kibana-plugin-core-server.kibanarequestevents.md index 21826c8b2938..dfd7efd27cb5 100644 --- a/docs/development/core/server/kibana-plugin-core-server.kibanarequestevents.md +++ b/docs/development/core/server/kibana-plugin-core-server.kibanarequestevents.md @@ -17,4 +17,5 @@ export interface KibanaRequestEvents | Property | Type | Description | | --- | --- | --- | | [aborted$](./kibana-plugin-core-server.kibanarequestevents.aborted_.md) | Observable<void> | Observable that emits once if and when the request has been aborted. | +| [completed$](./kibana-plugin-core-server.kibanarequestevents.completed_.md) | Observable<void> | Observable that emits once if and when the request has been completely handled. | diff --git a/src/core/server/http/integration_tests/request.test.ts b/src/core/server/http/integration_tests/request.test.ts index 2d018f7f464b..3a7335583296 100644 --- a/src/core/server/http/integration_tests/request.test.ts +++ b/src/core/server/http/integration_tests/request.test.ts @@ -23,6 +23,7 @@ import { HttpService } from '../http_service'; import { contextServiceMock } from '../../context/context_service.mock'; import { loggingSystemMock } from '../../logging/logging_system.mock'; import { createHttpServer } from '../test_utils'; +import { schema } from '@kbn/config-schema'; let server: HttpService; @@ -195,6 +196,96 @@ describe('KibanaRequest', () => { expect(nextSpy).toHaveBeenCalledTimes(0); expect(completeSpy).toHaveBeenCalledTimes(1); }); + + it('does not complete before response has been sent', async () => { + const { server: innerServer, createRouter, registerOnPreAuth } = await server.setup( + setupDeps + ); + const router = createRouter('/'); + + const nextSpy = jest.fn(); + const completeSpy = jest.fn(); + + registerOnPreAuth((req, res, toolkit) => { + req.events.aborted$.subscribe({ + next: nextSpy, + complete: completeSpy, + }); + return toolkit.next(); + }); + + router.post( + { path: '/', validate: { body: schema.any() } }, + async (context, request, res) => { + expect(completeSpy).not.toHaveBeenCalled(); + return res.ok({ body: 'ok' }); + } + ); + + await server.start(); + + await supertest(innerServer.listener).post('/').send({ data: 'test' }).expect(200); + + expect(nextSpy).toHaveBeenCalledTimes(0); + expect(completeSpy).toHaveBeenCalledTimes(1); + }); + }); + + describe('completed$', () => { + it('emits once and completes when response is sent', async () => { + const { server: innerServer, createRouter } = await server.setup(setupDeps); + const router = createRouter('/'); + + const nextSpy = jest.fn(); + const completeSpy = jest.fn(); + + router.get({ path: '/', validate: false }, async (context, req, res) => { + req.events.completed$.subscribe({ + next: nextSpy, + complete: completeSpy, + }); + + expect(nextSpy).not.toHaveBeenCalled(); + expect(completeSpy).not.toHaveBeenCalled(); + return res.ok({ body: 'ok' }); + }); + + await server.start(); + + await supertest(innerServer.listener).get('/').expect(200); + expect(nextSpy).toHaveBeenCalledTimes(1); + expect(completeSpy).toHaveBeenCalledTimes(1); + }); + + it('emits once and completes when response is aborted', async (done) => { + expect.assertions(2); + const { server: innerServer, createRouter } = await server.setup(setupDeps); + const router = createRouter('/'); + + const nextSpy = jest.fn(); + + router.get({ path: '/', validate: false }, async (context, req, res) => { + req.events.completed$.subscribe({ + next: nextSpy, + complete: () => { + expect(nextSpy).toHaveBeenCalledTimes(1); + done(); + }, + }); + + expect(nextSpy).not.toHaveBeenCalled(); + await delay(30000); + return res.ok({ body: 'ok' }); + }); + + await server.start(); + + const incomingRequest = supertest(innerServer.listener) + .get('/') + // end required to send request + .end(); + setTimeout(() => incomingRequest.abort(), 50); + }); }); }); }); diff --git a/src/core/server/http/router/request.ts b/src/core/server/http/router/request.ts index 0e73431fe7c6..93ffb5aa4825 100644 --- a/src/core/server/http/router/request.ts +++ b/src/core/server/http/router/request.ts @@ -64,6 +64,16 @@ export interface KibanaRequestEvents { * Observable that emits once if and when the request has been aborted. */ aborted$: Observable; + + /** + * Observable that emits once if and when the request has been completely handled. + * + * @remarks + * The request may be considered completed if: + * - A response has been sent to the client; or + * - The request was aborted. + */ + completed$: Observable; } /** @@ -186,11 +196,16 @@ export class KibanaRequest< private getEvents(request: Request): KibanaRequestEvents { const finish$ = merge( - fromEvent(request.raw.req, 'end'), // all data consumed + fromEvent(request.raw.res, 'finish'), // Response has been sent fromEvent(request.raw.req, 'close') // connection was closed ).pipe(shareReplay(1), first()); + + const aborted$ = fromEvent(request.raw.req, 'aborted').pipe(first(), takeUntil(finish$)); + const completed$ = merge(finish$, aborted$).pipe(shareReplay(1), first()); + return { - aborted$: fromEvent(request.raw.req, 'aborted').pipe(first(), takeUntil(finish$)), + aborted$, + completed$, } as const; } diff --git a/src/core/server/server.api.md b/src/core/server/server.api.md index c1054c27d084..21ef66230f69 100644 --- a/src/core/server/server.api.md +++ b/src/core/server/server.api.md @@ -1071,6 +1071,7 @@ export class KibanaRequest; + completed$: Observable; } // @public