[optimize] inject publicPath at request time (#14007)

* [optimize] inject publicPath at request time

* [optimize/getFileHash] finish doc block

* [optimize/bundlesRoute] correct return value doc type

* [optimize/bundleRoute] use more descriptive name for file hash cache

* [optimize/dynamicAssetResponse] add more details to doc

* [utils/createReplaceStream] trim the buffer based on the length of toReplace, not replacement

* [utils/createReplaceStream] add inline docs

* [utils/createReplaceStream] write unit tests

* [optimize/bundleRoute] expect supports buffers

* [optimize/bundleRoute/basePublicPath/tests] add happy path

* [optimize/bundlesRoute/tests] verify content-type header

* [optimize/bundlesRoute] use '

(cherry picked from commit 1ea82fa869)
This commit is contained in:
Spencer 2017-10-03 01:51:59 -07:00 committed by spalger
parent 3973a54086
commit 702514eba1
23 changed files with 765 additions and 21 deletions

View file

@ -148,6 +148,7 @@
"less": "2.7.1",
"less-loader": "2.2.3",
"lodash": "3.10.1",
"lru-cache": "4.1.1",
"markdown-it": "8.3.2",
"minimatch": "2.0.10",
"mkdirp": "0.5.1",
@ -190,6 +191,7 @@
"rimraf": "2.4.3",
"rison-node": "1.0.0",
"rjs-repack-loader": "1.0.6",
"rxjs": "5.4.3",
"script-loader": "0.6.1",
"semver": "5.1.0",
"style-loader": "0.12.3",

View file

@ -11,6 +11,8 @@ import { defaults, transform } from 'lodash';
import { fromRoot } from '../utils';
import pkg from '../../package.json';
import { PUBLIC_PATH_PLACEHOLDER } from './public_path_placeholder';
import { setLoaderQueryParam, makeLoaderString } from './loaders';
const babelExclude = [/[\/\\](webpackShims|node_modules|bower_components)[\/\\]/];
@ -18,7 +20,6 @@ const babelExclude = [/[\/\\](webpackShims|node_modules|bower_components)[\/\\]/
export default class BaseOptimizer {
constructor(opts) {
this.env = opts.env;
this.urlBasePath = opts.urlBasePath;
this.bundles = opts.bundles;
this.profile = opts.profile || false;
@ -100,7 +101,7 @@ export default class BaseOptimizer {
path: this.env.workingDir,
filename: '[name].bundle.js',
sourceMapFilename: '[file].map',
publicPath: `${this.urlBasePath || ''}/bundles/`,
publicPath: PUBLIC_PATH_PLACEHOLDER,
devtoolModuleFilenameTemplate: '[absolute-resource-path]'
},

View file

@ -0,0 +1,321 @@
import { resolve } from 'path';
import { readFileSync } from 'fs';
import crypto from 'crypto';
import Chance from 'chance';
import expect from 'expect.js';
import Hapi from 'hapi';
import Inert from 'inert';
import sinon from 'sinon';
import { createBundlesRoute } from '../bundles_route';
import { PUBLIC_PATH_PLACEHOLDER } from '../../public_path_placeholder';
const chance = new Chance();
const outputFixture = resolve(__dirname, './fixtures/output');
function replaceAll(source, replace, replaceWith) {
return source.split(replace).join(replaceWith);
}
describe('optimizer/bundle route', () => {
const sandbox = sinon.sandbox.create();
function createServer(options = {}) {
const {
bundlesPath = outputFixture,
basePublicPath = ''
} = options;
const server = new Hapi.Server();
server.connection({ port: 0 });
server.register([Inert]);
server.route(createBundlesRoute({
bundlesPath,
basePublicPath,
}));
return server;
}
afterEach(() => sandbox.restore());
describe('validation', () => {
it('validates that bundlesPath is an absolute path', () => {
expect(() => {
createBundlesRoute({
bundlesPath: null,
basePublicPath: ''
});
}).to.throwError(/absolute path/);
expect(() => {
createBundlesRoute({
bundlesPath: './relative',
basePublicPath: ''
});
}).to.throwError(/absolute path/);
expect(() => {
createBundlesRoute({
bundlesPath: 1234,
basePublicPath: ''
});
}).to.throwError(/absolute path/);
expect(() => {
createBundlesRoute({
bundlesPath: '/absolute/path',
basePublicPath: ''
});
}).to.not.throwError();
});
it('validates that basePublicPath is valid', () => {
expect(() => {
createBundlesRoute({
bundlesPath: '/bundles',
basePublicPath: 123
});
}).to.throwError(/string/);
expect(() => {
createBundlesRoute({
bundlesPath: '/bundles',
basePublicPath: {}
});
}).to.throwError(/string/);
expect(() => {
createBundlesRoute({
bundlesPath: '/bundles',
basePublicPath: '/a/'
});
}).to.throwError(/start and not end with a \//);
expect(() => {
createBundlesRoute({
bundlesPath: '/bundles',
basePublicPath: 'a/'
});
}).to.throwError(/start and not end with a \//);
expect(() => {
createBundlesRoute({
bundlesPath: '/bundles',
basePublicPath: '/a'
});
}).to.not.throwError();
expect(() => {
createBundlesRoute({
bundlesPath: '/bundles',
basePublicPath: ''
});
}).to.not.throwError();
});
});
describe('image', () => {
it('responds with exact file data', async () => {
const server = createServer();
const response = await server.inject({
url: '/bundles/image.png'
});
expect(response.statusCode).to.be(200);
const image = readFileSync(resolve(outputFixture, 'image.png'));
expect(response.headers).to.have.property('content-length', image.length);
expect(response.headers).to.have.property('content-type', 'image/png');
expect(image).to.eql(response.rawPayload);
});
});
describe('js file without placeholder', () => {
it('responds with no content-length and exact file data', async () => {
const server = createServer();
const response = await server.inject({
url: '/bundles/no_placeholder.js'
});
expect(response.statusCode).to.be(200);
expect(response.headers).to.not.have.property('content-length');
expect(response.headers).to.have.property('content-type', 'application/javascript; charset=utf-8');
expect(readFileSync(resolve(outputFixture, 'no_placeholder.js')))
.to.eql(response.rawPayload);
});
});
describe('js file with placeholder', () => {
it('responds with no content-length and modified file data', async () => {
const basePublicPath = `/${chance.word()}`;
const server = createServer({ basePublicPath });
const response = await server.inject({
url: '/bundles/with_placeholder.js'
});
expect(response.statusCode).to.be(200);
const source = readFileSync(resolve(outputFixture, 'with_placeholder.js'), 'utf8');
expect(response.headers).to.not.have.property('content-length');
expect(response.headers).to.have.property('content-type', 'application/javascript; charset=utf-8');
expect(response.result.indexOf(source)).to.be(-1);
expect(response.result).to.be(replaceAll(
source,
PUBLIC_PATH_PLACEHOLDER,
`${basePublicPath}/bundles/`
));
});
});
describe('css file without placeholder', () => {
it('responds with no content-length and exact file data', async () => {
const server = createServer();
const response = await server.inject({
url: '/bundles/no_placeholder.css'
});
expect(response.statusCode).to.be(200);
expect(response.headers).to.not.have.property('content-length');
expect(response.headers).to.have.property('content-type', 'text/css; charset=utf-8');
expect(readFileSync(resolve(outputFixture, 'no_placeholder.css')))
.to.eql(response.rawPayload);
});
});
describe('css file with placeholder', () => {
it('responds with no content-length and modified file data', async () => {
const basePublicPath = `/${chance.word()}`;
const server = createServer({ basePublicPath });
const response = await server.inject({
url: '/bundles/with_placeholder.css'
});
expect(response.statusCode).to.be(200);
const source = readFileSync(resolve(outputFixture, 'with_placeholder.css'), 'utf8');
expect(response.headers).to.not.have.property('content-length');
expect(response.headers).to.have.property('content-type', 'text/css; charset=utf-8');
expect(response.result.indexOf(source)).to.be(-1);
expect(response.result).to.be(replaceAll(
source,
PUBLIC_PATH_PLACEHOLDER,
`${basePublicPath}/bundles/`
));
});
});
describe('js file outside bundlesPath', () => {
it('responds with a 403', async () => {
const server = createServer();
const response = await server.inject({
url: '/bundles/../outside_output.js'
});
expect(response.statusCode).to.be(403);
expect(response.result).to.eql({
error: 'Forbidden',
message: 'Forbidden',
statusCode: 403
});
});
});
describe('missing js file', () => {
it('responds with 404', async () => {
const server = createServer();
const response = await server.inject({
url: '/bundles/non_existant.js'
});
expect(response.statusCode).to.be(404);
expect(response.result).to.eql({
error: 'Not Found',
message: 'Not Found',
statusCode: 404
});
});
});
describe('missing bundlesPath', () => {
it('responds with 404', async () => {
const server = createServer({
bundlesPath: resolve(__dirname, 'fixtures/not_really_output')
});
const response = await server.inject({
url: '/bundles/with_placeholder.js'
});
expect(response.statusCode).to.be(404);
expect(response.result).to.eql({
error: 'Not Found',
message: 'Not Found',
statusCode: 404
});
});
});
describe('etag', () => {
it('only calculates hash of file on first request', async () => {
const createHash = sandbox.spy(crypto, 'createHash');
const server = createServer();
sinon.assert.notCalled(createHash);
const resp1 = await server.inject({
url: '/bundles/no_placeholder.js'
});
sinon.assert.calledOnce(createHash);
createHash.reset();
expect(resp1.statusCode).to.be(200);
const resp2 = await server.inject({
url: '/bundles/no_placeholder.js'
});
sinon.assert.notCalled(createHash);
expect(resp2.statusCode).to.be(200);
});
it('is unique per basePublicPath although content is the same', async () => {
const basePublicPath1 = `/${chance.word()}`;
const basePublicPath2 = `/${chance.word()}`;
const [resp1, resp2] = await Promise.all([
createServer({ basePublicPath: basePublicPath1 }).inject({
url: '/bundles/no_placeholder.js'
}),
createServer({ basePublicPath: basePublicPath2 }).inject({
url: '/bundles/no_placeholder.js'
}),
]);
expect(resp1.statusCode).to.be(200);
expect(resp2.statusCode).to.be(200);
expect(resp1.rawPayload).to.eql(resp2.rawPayload);
expect(resp1.headers.etag).to.be.a('string');
expect(resp2.headers.etag).to.be.a('string');
expect(resp1.headers.etag).to.not.eql(resp2.headers.etag);
});
});
describe('cache control', () => {
it('responds with 304 when etag and last modified are sent back', async () => {
const server = createServer();
const resp = await server.inject({
url: '/bundles/with_placeholder.js'
});
expect(resp.statusCode).to.be(200);
const resp2 = await server.inject({
url: '/bundles/with_placeholder.js',
headers: {
'if-modified-since': resp.headers['last-modified'],
'if-none-match': resp.headers.etag
}
});
expect(resp2.statusCode).to.be(304);
expect(resp2.result).to.have.length(0);
});
});
});

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.3 KiB

View file

@ -0,0 +1,3 @@
body {
background-color: goldenrod;
}

View file

@ -0,0 +1 @@
console.log('chunk2');

View file

@ -0,0 +1,3 @@
body {
background-image: url(__REPLACE_WITH_PUBLIC_PATH__/image.png);
}

View file

@ -0,0 +1 @@
console.log('__REPLACE_WITH_PUBLIC_PATH__');

View file

@ -0,0 +1 @@
console.log('outside output');

View file

@ -0,0 +1,66 @@
import { isAbsolute, extname } from 'path';
import LruCache from 'lru-cache';
import { createDynamicAssetResponse } from './dynamic_asset_response';
/**
* Creates a route that serves files from `bundlesPath`. If the
* file is js or css then it is searched for instances of
* PUBLIC_PATH_PLACEHOLDER and replaces them with `publicPath`.
* @param {Object} options
* @property {string} options.bundlesPath
* @property {string} options.basePublicPath
* @return {Hapi.RouteConfig}
*/
export function createBundlesRoute({ bundlesPath, basePublicPath }) {
// rather than calculate the fileHash on every request, we
// provide a cache object to `createDynamicAssetResponse()` that
// will store the 100 most recently used hashes.
const fileHashCache = new LruCache(100);
if (typeof bundlesPath !== 'string' || !isAbsolute(bundlesPath)) {
throw new TypeError('bundlesPath must be an absolute path to the directory containing the bundles');
}
if (typeof basePublicPath !== 'string') {
throw new TypeError('basePublicPath must be a string');
}
if (!basePublicPath.match(/(^$|^\/.*[^\/]$)/)) {
throw new TypeError('basePublicPath must be empty OR start and not end with a /');
}
return {
method: 'GET',
path: '/bundles/{path*}',
config: {
auth: false,
ext: {
onPreHandler: {
method(request, reply) {
const ext = extname(request.params.path);
if (ext !== '.js' && ext !== '.css') {
return reply.continue();
}
reply(createDynamicAssetResponse({
request,
bundlesPath,
fileHashCache,
publicPath: `${basePublicPath}/bundles/`
}));
}
}
},
},
handler: {
directory: {
path: bundlesPath,
listing: false,
lookupCompressed: true,
}
}
};
}

View file

@ -0,0 +1,90 @@
import { resolve } from 'path';
import { open, fstat, createReadStream, close } from 'fs';
import Boom from 'boom';
import { fromNode as fcb } from 'bluebird';
import { getFileHash } from './file_hash';
import { replacePlaceholder } from '../public_path_placeholder';
/**
* Create a Hapi response for the requested path. This is designed
* to replicate a subset of the features provided by Hapi's Inert
* plugin including:
* - ensure path is not traversing out of the bundle directory
* - manage use file descriptors for file access to efficiently
* interact with the file multiple times in each request
* - generate and cache etag for the file
* - write correct headers to response for client-side caching
* and invalidation
* - stream file to response
*
* It differs from Inert in some important ways:
* - the PUBLIC_PATH_PLACEHOLDER is replaced with the correct
* public path as the response is streamed
* - cached hash/etag is based on the file on disk, but modified
* by the public path so that individual public paths have
* different etags, but can share a cache
*
* @param {Object} options
* @property {Hapi.Request} options.request
* @property {string} options.bundlesPath
* @property {string} options.publicPath
* @property {LruCache} options.fileHashCache
*/
export async function createDynamicAssetResponse(options) {
const {
request,
bundlesPath,
publicPath,
fileHashCache,
} = options;
let fd;
try {
const path = resolve(bundlesPath, request.params.path);
// prevent path traversal, only process paths that resolve within bundlesPath
if (!path.startsWith(bundlesPath)) {
return Boom.forbidden(null, 'EACCES');
}
// we use and manage a file descriptor mostly because
// that's what Inert does, and since we are accessing
// the file 2 or 3 times per request it seems logical
fd = await fcb(cb => open(path, 'r', cb));
const stat = await fcb(cb => fstat(fd, cb));
const hash = await getFileHash(fileHashCache, path, stat, fd);
const read = createReadStream(null, {
fd,
start: 0,
autoClose: true
});
fd = null; // read stream is now responsible for fd
const response = request.generateResponse(replacePlaceholder(read, publicPath));
response.code(200);
response.etag(`${hash}-${publicPath}`);
response.header('last-modified', stat.mtime.toUTCString());
response.type(request.server.mime.path(path).type);
return response;
} catch (error) {
if (fd) {
try {
await fcb(cb => close(fd, cb));
} catch (error) {
// ignore errors from close, we already have one to report
// and it's very likely they are the same
}
}
if (error.code === 'ENOENT') {
return Boom.notFound();
}
return Boom.boomify(error);
}
}

View file

@ -0,0 +1,45 @@
import { createHash } from 'crypto';
import { createReadStream } from 'fs';
import Rx from 'rxjs/Rx';
const $fromEvent = Rx.Observable.fromEvent;
const $throw = Rx.Observable.throw;
/**
* Get the hash of a file via a file descriptor
* @param {LruCache} cache
* @param {string} path
* @param {Fs.Stat} stat
* @param {Fs.FileDescriptor} fd
* @return {Promise<string>}
*/
export async function getFileHash(cache, path, stat, fd) {
const key = `${path}:${stat.ino}:${stat.size}:${stat.mtime.getTime()}`;
const cached = cache.get(key);
if (cached) {
return await cached;
}
const hash = createHash('sha1');
const read = createReadStream(null, {
fd,
start: 0,
autoClose: false
});
const promise = $fromEvent(read, 'data')
.merge($fromEvent(read, 'error').mergeMap($throw))
.takeUntil($fromEvent(read, 'end'))
.forEach(chunk => hash.update(chunk))
.then(() => hash.digest('hex'))
.catch((error) => {
// don't cache failed attempts
cache.del(key);
throw error;
});
cache.set(key, promise);
return await promise;
}

View file

@ -0,0 +1 @@
export { createBundlesRoute } from './bundles_route';

View file

@ -1,4 +1,5 @@
import FsOptimizer from './fs_optimizer';
import { createBundlesRoute } from './bundles_route';
export default async (kbnServer, server, config) => {
if (!config.get('optimize.enabled')) return;
@ -17,7 +18,11 @@ export default async (kbnServer, server, config) => {
}
const bundles = kbnServer.bundles;
server.exposeStaticDir('/bundles/{path*}', bundles.env.workingDir);
server.route(createBundlesRoute({
bundlesPath: bundles.env.workingDir,
basePublicPath: config.get('server.basePath')
}));
await bundles.writeEntryFiles();
// in prod, only bundle when someing is missing or invalid
@ -37,7 +42,6 @@ export default async (kbnServer, server, config) => {
env: bundles.env,
bundles: bundles,
profile: config.get('optimize.profile'),
urlBasePath: config.get('server.basePath'),
sourceMaps: config.get('optimize.sourceMaps'),
unsafeCache: config.get('optimize.unsafeCache'),
});

View file

@ -3,6 +3,8 @@ import WeirdControlFlow from './weird_control_flow';
import { once } from 'lodash';
import { join } from 'path';
import { createBundlesRoute } from '../bundles_route';
export default class LazyOptimizer extends BaseOptimizer {
constructor(opts) {
super(opts);
@ -67,20 +69,22 @@ export default class LazyOptimizer extends BaseOptimizer {
return join(this.compiler.outputPath, relativePath);
}
bindToServer(server) {
server.route({
path: '/bundles/{asset*}',
method: 'GET',
handler: async (request, reply) => {
try {
const path = await this.getPath(request.params.asset);
return reply.file(path);
} catch (error) {
console.log(error.stack);
return reply(error);
}
}
bindToServer(server, basePath) {
// calling `build.get()` resolves when the build is
// "stable" (the compiler is not running) so this pauses
// all requests received while the compiler is running
// and lets the continue once it is done.
server.ext('onRequest', (request, reply) => {
this.build.get()
.then(() => reply.continue())
.catch(reply);
});
server.route(createBundlesRoute({
bundlesPath: this.compiler.outputPath,
basePublicPath: basePath
}));
}
logRunStart() {

View file

@ -4,7 +4,8 @@ import { fromNode } from 'bluebird';
import registerHapiPlugins from '../../server/http/register_hapi_plugins';
export default class LazyServer {
constructor(host, port, optimizer) {
constructor(host, port, basePath, optimizer) {
this.basePath = basePath;
this.optimizer = optimizer;
this.server = new Server();
@ -18,7 +19,7 @@ export default class LazyServer {
async init() {
await this.optimizer.init();
this.optimizer.bindToServer(this.server);
this.optimizer.bindToServer(this.server, this.basePath);
await fromNode(cb => this.server.start(cb));
}
}

View file

@ -5,6 +5,7 @@ export default async (kbnServer, kibanaHapiServer, config) => {
const server = new LazyServer(
config.get('optimize.lazyHost'),
config.get('optimize.lazyPort'),
config.get('server.basePath'),
new LazyOptimizer({
log: (tags, data) => kibanaHapiServer.log(tags, data),
env: kbnServer.bundles.env,
@ -12,7 +13,6 @@ export default async (kbnServer, kibanaHapiServer, config) => {
profile: config.get('optimize.profile'),
sourceMaps: config.get('optimize.sourceMaps'),
prebuild: config.get('optimize.lazyPrebuild'),
urlBasePath: config.get('server.basePath'),
unsafeCache: config.get('optimize.unsafeCache'),
})
);

View file

@ -0,0 +1,29 @@
import { createReplaceStream } from '../utils';
import Rx from 'rxjs/Rx';
const $fromEvent = Rx.Observable.fromEvent;
export const PUBLIC_PATH_PLACEHOLDER = '__REPLACE_WITH_PUBLIC_PATH__';
export function replacePlaceholder(read, replacement) {
const replace = createReplaceStream(PUBLIC_PATH_PLACEHOLDER, replacement);
// handle errors on the read stream by proxying them
// to the replace stream so that the consumer can
// choose what to do with them.
$fromEvent(read, 'error')
.take(1)
.takeUntil($fromEvent(read, 'end'))
.forEach(error => {
replace.emit('error', error);
replace.end();
});
replace.close = () => {
read.unpipe();
read.close();
};
return read.pipe(replace);
}

View file

@ -27,7 +27,6 @@ export default async (kbnServer, server, config) => {
const bundlerEnv = new UiBundlerEnv(config.get('optimize.bundleDir'));
bundlerEnv.addContext('env', config.get('env.name'));
bundlerEnv.addContext('urlBasePath', config.get('server.basePath'));
bundlerEnv.addContext('sourceMaps', config.get('optimize.sourceMaps'));
bundlerEnv.addContext('kbnVersion', config.get('pkg.version'));
bundlerEnv.addContext('buildNum', config.get('pkg.buildNum'));

View file

@ -25,6 +25,7 @@ export {
createReduceStream,
createSplitStream,
createMapStream,
createReplaceStream,
} from './streams';
export {

View file

@ -0,0 +1,104 @@
import expect from 'expect.js';
import {
createReplaceStream,
createConcatStream,
createPromiseFromStreams,
createListStream,
createMapStream,
} from '../';
async function concatToString(streams) {
return await createPromiseFromStreams([
...streams,
createMapStream(buff => buff.toString('utf8')),
createConcatStream('')
]);
}
describe('replaceStream', () => {
it('produces buffers when it receives buffers', async () => {
const chunks = await createPromiseFromStreams([
createListStream([Buffer.from('foo'), Buffer.from('bar')]),
createReplaceStream('o', '0'),
createConcatStream([])
]);
chunks.forEach(chunk => {
expect(chunk).to.be.a(Buffer);
});
});
it('produces buffers when it receives strings', async () => {
const chunks = await createPromiseFromStreams([
createListStream(['foo', 'bar']),
createReplaceStream('o', '0'),
createConcatStream([])
]);
chunks.forEach(chunk => {
expect(chunk).to.be.a(Buffer);
});
});
it('expects toReplace to be a string', () => {
expect(() => createReplaceStream(Buffer.from('foo')))
.to.throwError(error => {
expect(error.message).to.match(/be a string/);
});
});
it('replaces multiple single-char instances in a single chunk', async () => {
expect(await concatToString([
createListStream([Buffer.from('f00 bar')]),
createReplaceStream('0', 'o'),
])).to.be('foo bar');
});
it('replaces multiple single-char instances in multiple chunks', async () => {
expect(await concatToString([
createListStream([Buffer.from('f0'), Buffer.from('0 bar')]),
createReplaceStream('0', 'o'),
])).to.be('foo bar');
});
it('replaces single multi-char instances in single chunks', async () => {
expect(await concatToString([
createListStream([Buffer.from('f0'), Buffer.from('0 bar')]),
createReplaceStream('0', 'o'),
])).to.be('foo bar');
});
it('replaces multiple multi-char instances in single chunks', async () => {
expect(await concatToString([
createListStream([Buffer.from('foo ba'), Buffer.from('r b'), Buffer.from('az bar')]),
createReplaceStream('bar', '*'),
])).to.be('foo * baz *');
});
it('replaces multi-char instance that stretches multiple chunks', async () => {
expect(await concatToString([
createListStream([
Buffer.from('foo supe'),
Buffer.from('rcalifra'),
Buffer.from('gilistic'),
Buffer.from('expialid'),
Buffer.from('ocious bar'),
]),
createReplaceStream('supercalifragilisticexpialidocious', '*'),
])).to.be('foo * bar');
});
it('ignores missing multi-char instance', async () => {
expect(await concatToString([
createListStream([
Buffer.from('foo supe'),
Buffer.from('rcalifra'),
Buffer.from('gili stic'),
Buffer.from('expialid'),
Buffer.from('ocious bar'),
]),
createReplaceStream('supercalifragilisticexpialidocious', '*'),
])).to.be('foo supercalifragili sticexpialidocious bar');
});
});

View file

@ -6,3 +6,4 @@ export { createJsonParseStream, createJsonStringifyStream } from './json_streams
export { createPromiseFromStreams } from './promise_from_streams';
export { createConcatStream } from './concat_stream';
export { createMapStream } from './map_stream';
export { createReplaceStream } from './replace_stream';

View file

@ -0,0 +1,66 @@
import { Transform } from 'stream';
export function createReplaceStream(toReplace, replacement) {
if (typeof toReplace !== 'string') {
throw new TypeError('toReplace must be a string');
}
let buffer = Buffer.alloc(0);
return new Transform({
objectMode: false,
async transform(value, enc, done) {
try {
buffer = Buffer.concat([buffer, value], buffer.length + value.length);
while (true) {
// try to find the next instance of `toReplace` in buffer
const index = buffer.indexOf(toReplace);
// if there is no next instance, break
if (index === -1) {
break;
}
// flush everything to the left of the next instance
// of `toReplace`
this.push(buffer.slice(0, index));
// then flush an instance of `replacement`
this.push(replacement);
// and finally update the buffer to include everything
// to the right of `toReplace`, dropping to replace from the buffer
buffer = buffer.slice(index + toReplace.length);
}
// until now we have only flushed data that is to the left
// of a discovered instance of `toReplace`. If `toReplace` is
// never found this would lead to us buffering the entire stream.
//
// Instead, we only keep enough buffer to complete a potentially
// patial instance of `toReplace`
if (buffer.length > toReplace.length) {
// the entire buffer except the last `toReplace.length` bytes
// so that if all but one byte from `toReplace` is in the buffer,
// and the next chunk delivers the necessary byte, the buffer will then
// contain a complete `toReplace` token.
this.push(buffer.slice(0, buffer.length - toReplace.length));
buffer = buffer.slice(-toReplace.length);
}
done();
} catch (err) {
done(err);
}
},
flush(callback) {
if (buffer.length) {
this.push(buffer);
}
buffer = null;
callback();
}
});
}