UNPKG

3.99 kBJavaScriptView Raw
1"use strict";
2const stream_1 = require("stream");
/**
 * Builds the read handler for one side of a two-stream comparison.
 *
 * Each invocation pulls one chunk from `stream.stream`, converts it to a
 * Buffer, checks it against the bytes the other side has read but not yet
 * matched, and then hands the turn to whichever side is behind.
 *
 * @param {StreamState} stream State of the side this handler reads from.
 * @param {StreamState} otherStream State of the opposite side.
 * @param {Function(boolean)} resolve Called with `false` as soon as the
 *   streams are known to differ.
 * @return {Function()} Handler that takes no arguments; used both as a
 *   `readable` listener and as the state's `read` method.
 */
const createReadFn = (stream, otherStream, resolve) => {
    return () => {
        let data = stream.stream.read();
        // Only null/undefined mean "nothing buffered". A plain falsy test
        // would also discard real object-mode chunks such as 0, false or ''.
        if (data == null) {
            return stream.stream.once('readable', stream.read);
        }
        // Make sure `data` is a buffer so both sides are compared byte-wise.
        if (!Buffer.isBuffer(data)) {
            const text = typeof data === 'object'
                ? JSON.stringify(data)
                : data.toString();
            data = Buffer.from(text);
        }
        const newPos = stream.pos + data.length;
        if (stream.pos < otherStream.pos) {
            // The other side is ahead: `otherStream.data` holds its unmatched
            // tail. Compare only the overlap and keep whatever is left over
            // on each side for the next round.
            const overlap = Math.min(data.length, otherStream.data.length);
            const ownPart = data.subarray(0, overlap);
            const otherPart = otherStream.data.subarray(0, overlap);
            stream.data = data.subarray(overlap);
            otherStream.data = otherStream.data.subarray(overlap);
            if (!ownPart.equals(otherPart)) {
                return resolve(false);
            }
        }
        else {
            // Nothing to compare against yet; park the chunk for the other side.
            stream.data = data;
        }
        stream.pos = newPos;
        if (newPos > otherStream.pos) {
            if (otherStream.ended) {
                // This stream still has data past the point where the other
                // one ended, so it is longer than the other one.
                return resolve(false);
            }
            // This side is now ahead, so let the other side catch up.
            otherStream.read();
        }
        else {
            // Still behind (or level): keep reading from this side.
            stream.read();
        }
    };
};
/**
 * Builds the `end` handler for one side of a two-stream comparison.
 *
 * When this side finishes first, the other side is asked to read again so it
 * can either finish too or turn out to be longer. When both sides have
 * finished, the result is whether they consumed the same number of bytes.
 *
 * @param {StreamState} stream State of the side that has just ended.
 * @param {StreamState} otherStream State of the opposite side.
 * @param {Function(boolean)} resolve Receives the final equality verdict.
 */
const createOnEndFn = (stream, otherStream, resolve) => {
    const handleEnd = () => {
        stream.ended = true;
        if (!otherStream.ended) {
            // The other side is still going; let it run to completion.
            otherStream.read();
            return;
        }
        // Both sides are done: equal only if their positions match.
        const sameLength = stream.pos === otherStream.pos;
        resolve(sameLength);
    };
    return handleEnd;
};
79module.exports = (stream1, stream2) => new Promise((resolve, reject) => {
80 const readStream1 = stream1.pipe(new stream_1.PassThrough({ objectMode: true }));
81 const readStream2 = stream2.pipe(new stream_1.PassThrough({ objectMode: true }));
82 const cleanup = (equal) => {
83 stream1.removeListener('error', reject);
84 readStream1.removeListener('end', onend1);
85 readStream1.removeListener('readable', streamState1.read);
86 stream2.removeListener('error', reject);
87 readStream2.removeListener('end', onend2);
88 readStream1.removeListener('readable', streamState2.read);
89 resolve(equal);
90 };
91 const streamState1 = {
92 id: 1,
93 stream: readStream1,
94 data: null,
95 pos: 0,
96 ended: false,
97 };
98 const streamState2 = {
99 id: 2,
100 stream: readStream2,
101 data: null,
102 pos: 0,
103 ended: false,
104 };
105 streamState1.read = createReadFn(streamState1, streamState2, cleanup);
106 streamState2.read = createReadFn(streamState2, streamState1, cleanup);
107 const onend1 = createOnEndFn(streamState1, streamState2, cleanup);
108 const onend2 = createOnEndFn(streamState2, streamState1, cleanup);
109 stream1.on('error', reject);
110 readStream1.on('end', onend1);
111 stream2.on('error', reject);
112 readStream2.on('end', onend2);
113 // Start by reading from the first stream.
114 streamState1.stream.once('readable', streamState1.read);
115});
116//# sourceMappingURL=index.js.map
\No newline at end of file