diff --git a/src/legacy/core_plugins/interpreter/public/canvas/ajax_stream/ajax_stream.test.ts b/src/legacy/core_plugins/interpreter/public/canvas/ajax_stream/ajax_stream.test.ts index 243c766206d2..4463758e30bd 100644 --- a/src/legacy/core_plugins/interpreter/public/canvas/ajax_stream/ajax_stream.test.ts +++ b/src/legacy/core_plugins/interpreter/public/canvas/ajax_stream/ajax_stream.test.ts @@ -26,7 +26,7 @@ describe('ajaxStream', () => { it('pulls items from the stream and calls the handler', async () => { const handler = jest.fn(() => ({})); const { req, sendText, done } = mockRequest(); - const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'].map(m => `${m.length}:${m}`); + const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n']; const promise = ajaxStream('', {}, req, { url: '/test/endpoint', @@ -43,12 +43,34 @@ describe('ajaxStream', () => { expect(handler).toHaveBeenCalledWith({ tis: 'fate' }); }); + it('handles newlines in values', async () => { + const handler = jest.fn(() => ({})); + const { req, sendText, done } = mockRequest(); + const messages = [ + JSON.stringify({ hello: 'wo\nrld' }), + '\n', + JSON.stringify({ tis: 'fa\nte' }), + '\n', + ]; + + const promise = ajaxStream('', {}, req, { + url: '/test/endpoint', + onResponse: handler, + }); + + messages.forEach(sendText); + done(); + + await promise; + expect(handler).toHaveBeenCalledTimes(2); + expect(handler).toHaveBeenCalledWith({ hello: 'wo\nrld' }); + expect(handler).toHaveBeenCalledWith({ tis: 'fa\nte' }); + }); + it('handles partial messages', async () => { const handler = jest.fn(() => ({})); const { req, sendText, done } = mockRequest(); - const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'] - .map(m => `${m.length}:${m}`) - .join(''); + const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'].join(''); const promise = ajaxStream('', {}, req, { url: '/test/endpoint', @@ -117,7 +139,7 @@ describe('ajaxStream', () => { it('rejects if the payload contains invalid JSON', async () => { const handler = jest.fn(() => ({})); const { req, sendText, done } = mockRequest(); - const messages = ['{ waut? }\n'].map(m => `${m.length}:${m}`).join(''); + const messages = ['{ waut? }\n'].join(''); const promise = ajaxStream('', {}, req, { url: '/test/endpoint', @@ -130,32 +152,12 @@ describe('ajaxStream', () => { expect(await promise.then(() => true).catch(() => false)).toBeFalsy(); }); - it('rejects if the delim is invalid', async () => { - const handler = jest.fn(() => ({})); - const { req, sendText, done } = mockRequest(); - const messages = '{ "hi": "there" }'; - - const promise = ajaxStream('', {}, req, { - url: '/test/endpoint', - onResponse: handler, - }); - - sendText(messages); - done(); - - expect(await promise.then(() => true).catch(({ message }) => message)).toMatch( - /invalid stream response/i - ); - }); - it('rejects if the handler throws', async () => { const handler = jest.fn(() => { throw new Error('DOH!'); }); const { req, sendText, done } = mockRequest(); - const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'] - .map(m => `${m.length}:${m}`) - .join(''); + const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'].join(''); const promise = ajaxStream('', {}, req, { url: '/test/endpoint', diff --git a/src/legacy/core_plugins/interpreter/public/canvas/ajax_stream/ajax_stream.ts b/src/legacy/core_plugins/interpreter/public/canvas/ajax_stream/ajax_stream.ts index 74e10e2a271b..867581081f82 100644 --- a/src/legacy/core_plugins/interpreter/public/canvas/ajax_stream/ajax_stream.ts +++ b/src/legacy/core_plugins/interpreter/public/canvas/ajax_stream/ajax_stream.ts @@ -64,34 +64,19 @@ function processBatchResponseStream(handler: BatchResponseHandler) { return (text: string) => { // While there's text to process... while (index < text.length) { - // Our messages are delimited by colon: len:json - const delim = ':'; + // We're using new line-delimited JSON. + const delim = '\n'; const delimIndex = text.indexOf(delim, index); - const payloadStart = delimIndex + delim.length; // We've got an incomplete batch length if (delimIndex < 0) { return; } - const rawLen = text.slice(index, delimIndex); - const payloadLen = parseInt(rawLen, 10); - const payloadEnd = payloadStart + payloadLen; - - // We've got an invalid batch message (e.g. one without a numeric length: prefix) - if (isNaN(payloadLen)) { - throw new Error(`Invalid stream response length: ${rawLen}`); - } - - // We've got an incomplete batch message - if (text.length < payloadEnd) { - return; - } - - const payload = JSON.parse(text.slice(payloadStart, payloadEnd)); + const payload = JSON.parse(text.slice(index, delimIndex)); handler(payload); - index = payloadEnd; + index = delimIndex + 1; } }; } diff --git a/src/legacy/core_plugins/interpreter/server/routes/server_functions.ts b/src/legacy/core_plugins/interpreter/server/routes/server_functions.ts index 63e59ad61371..e03ad361b555 100644 --- a/src/legacy/core_plugins/interpreter/server/routes/server_functions.ts +++ b/src/legacy/core_plugins/interpreter/server/routes/server_functions.ts @@ -75,7 +75,7 @@ function runServerFunctions(server: any) { // Send the initial headers. res.writeHead(200, { - 'Content-Type': 'text/plain', + 'Content-Type': 'application/x-ndjson', Connection: 'keep-alive', 'Transfer-Encoding': 'chunked', 'Cache-Control': 'no-cache', @@ -83,8 +83,7 @@ function runServerFunctions(server: any) { // Write a length-delimited response const streamResult = (result: any) => { - const payload = JSON.stringify(result) + '\n'; - res.write(`${payload.length}:${payload}`); + res.write(JSON.stringify(result) + '\n'); }; // Tries to run an interpreter function, and ensures a consistent error payload on failure.