Change ajax_stream to use new-line delimited JSON (#52797)
This commit is contained in:
parent
860be3c3eb
commit
ba2e2588a3
|
@ -26,7 +26,7 @@ describe('ajaxStream', () => {
|
|||
it('pulls items from the stream and calls the handler', async () => {
|
||||
const handler = jest.fn(() => ({}));
|
||||
const { req, sendText, done } = mockRequest();
|
||||
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'].map(m => `${m.length}:${m}`);
|
||||
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'];
|
||||
|
||||
const promise = ajaxStream('', {}, req, {
|
||||
url: '/test/endpoint',
|
||||
|
@ -43,12 +43,34 @@ describe('ajaxStream', () => {
|
|||
expect(handler).toHaveBeenCalledWith({ tis: 'fate' });
|
||||
});
|
||||
|
||||
it('handles newlines in values', async () => {
|
||||
const handler = jest.fn(() => ({}));
|
||||
const { req, sendText, done } = mockRequest();
|
||||
const messages = [
|
||||
JSON.stringify({ hello: 'wo\nrld' }),
|
||||
'\n',
|
||||
JSON.stringify({ tis: 'fa\nte' }),
|
||||
'\n',
|
||||
];
|
||||
|
||||
const promise = ajaxStream('', {}, req, {
|
||||
url: '/test/endpoint',
|
||||
onResponse: handler,
|
||||
});
|
||||
|
||||
messages.forEach(sendText);
|
||||
done();
|
||||
|
||||
await promise;
|
||||
expect(handler).toHaveBeenCalledTimes(2);
|
||||
expect(handler).toHaveBeenCalledWith({ hello: 'wo\nrld' });
|
||||
expect(handler).toHaveBeenCalledWith({ tis: 'fa\nte' });
|
||||
});
|
||||
|
||||
it('handles partial messages', async () => {
|
||||
const handler = jest.fn(() => ({}));
|
||||
const { req, sendText, done } = mockRequest();
|
||||
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n']
|
||||
.map(m => `${m.length}:${m}`)
|
||||
.join('');
|
||||
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'].join('');
|
||||
|
||||
const promise = ajaxStream('', {}, req, {
|
||||
url: '/test/endpoint',
|
||||
|
@ -117,7 +139,7 @@ describe('ajaxStream', () => {
|
|||
it('rejects if the payload contains invalid JSON', async () => {
|
||||
const handler = jest.fn(() => ({}));
|
||||
const { req, sendText, done } = mockRequest();
|
||||
const messages = ['{ waut? }\n'].map(m => `${m.length}:${m}`).join('');
|
||||
const messages = ['{ waut? }\n'].join('');
|
||||
|
||||
const promise = ajaxStream('', {}, req, {
|
||||
url: '/test/endpoint',
|
||||
|
@ -130,32 +152,12 @@ describe('ajaxStream', () => {
|
|||
expect(await promise.then(() => true).catch(() => false)).toBeFalsy();
|
||||
});
|
||||
|
||||
it('rejects if the delim is invalid', async () => {
|
||||
const handler = jest.fn(() => ({}));
|
||||
const { req, sendText, done } = mockRequest();
|
||||
const messages = '{ "hi": "there" }';
|
||||
|
||||
const promise = ajaxStream('', {}, req, {
|
||||
url: '/test/endpoint',
|
||||
onResponse: handler,
|
||||
});
|
||||
|
||||
sendText(messages);
|
||||
done();
|
||||
|
||||
expect(await promise.then(() => true).catch(({ message }) => message)).toMatch(
|
||||
/invalid stream response/i
|
||||
);
|
||||
});
|
||||
|
||||
it('rejects if the handler throws', async () => {
|
||||
const handler = jest.fn(() => {
|
||||
throw new Error('DOH!');
|
||||
});
|
||||
const { req, sendText, done } = mockRequest();
|
||||
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n']
|
||||
.map(m => `${m.length}:${m}`)
|
||||
.join('');
|
||||
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'].join('');
|
||||
|
||||
const promise = ajaxStream('', {}, req, {
|
||||
url: '/test/endpoint',
|
||||
|
|
|
@ -64,34 +64,19 @@ function processBatchResponseStream<T>(handler: BatchResponseHandler<T>) {
|
|||
return (text: string) => {
|
||||
// While there's text to process...
|
||||
while (index < text.length) {
|
||||
// Our messages are delimited by colon: len:json
|
||||
const delim = ':';
|
||||
// We're using new line-delimited JSON.
|
||||
const delim = '\n';
|
||||
const delimIndex = text.indexOf(delim, index);
|
||||
const payloadStart = delimIndex + delim.length;
|
||||
|
||||
// We've got an incomplete batch length
|
||||
if (delimIndex < 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const rawLen = text.slice(index, delimIndex);
|
||||
const payloadLen = parseInt(rawLen, 10);
|
||||
const payloadEnd = payloadStart + payloadLen;
|
||||
|
||||
// We've got an invalid batch message (e.g. one without a numeric length: prefix)
|
||||
if (isNaN(payloadLen)) {
|
||||
throw new Error(`Invalid stream response length: ${rawLen}`);
|
||||
}
|
||||
|
||||
// We've got an incomplete batch message
|
||||
if (text.length < payloadEnd) {
|
||||
return;
|
||||
}
|
||||
|
||||
const payload = JSON.parse(text.slice(payloadStart, payloadEnd));
|
||||
const payload = JSON.parse(text.slice(index, delimIndex));
|
||||
handler(payload);
|
||||
|
||||
index = payloadEnd;
|
||||
index = delimIndex + 1;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -75,7 +75,7 @@ function runServerFunctions(server: any) {
|
|||
|
||||
// Send the initial headers.
|
||||
res.writeHead(200, {
|
||||
'Content-Type': 'text/plain',
|
||||
'Content-Type': 'application/x-ndjson',
|
||||
Connection: 'keep-alive',
|
||||
'Transfer-Encoding': 'chunked',
|
||||
'Cache-Control': 'no-cache',
|
||||
|
@ -83,8 +83,7 @@ function runServerFunctions(server: any) {
|
|||
|
||||
// Write a length-delimited response
|
||||
const streamResult = (result: any) => {
|
||||
const payload = JSON.stringify(result) + '\n';
|
||||
res.write(`${payload.length}:${payload}`);
|
||||
res.write(JSON.stringify(result) + '\n');
|
||||
};
|
||||
|
||||
// Tries to run an interpreter function, and ensures a consistent error payload on failure.
|
||||
|
|
Loading…
Reference in a new issue