1 | "use strict";
|
2 | const stream_1 = require("stream");
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
/**
 * Builds the read routine for one side of a two-stream comparison.
 *
 * Each invocation pulls one chunk from `stream.stream`, normalizes it to a
 * Buffer, compares it against the bytes the other side has read but not yet
 * matched, and then lets whichever side is behind read next.
 *
 * @param {Object} stream - State of the side this routine reads from. Uses
 *   `stream` (the readable), `data`, `pos` and `read` (this routine itself,
 *   which the caller assigns onto the state object).
 * @param {Object} otherStream - State of the opposite side; same shape, plus
 *   `ended`.
 * @param {Function} resolve - Called with `false` as soon as the two sides
 *   are known to differ.
 * @returns {Function} Zero-argument read routine.
 */
const createReadFn = (stream, otherStream, resolve) => {
  return () => {
    let data = stream.stream.read();
    // `read()` reports "nothing buffered" with null. Check for that explicitly:
    // in object mode falsy chunks (0, '', false) are real data, and a plain
    // truthiness test would consume and silently drop them.
    if (data === null || data === undefined) {
      return stream.stream.once('readable', stream.read);
    }

    // Normalize every chunk to a Buffer so both sides compare byte-wise.
    if (!Buffer.isBuffer(data)) {
      if (typeof data === 'object') {
        data = JSON.stringify(data);
      } else {
        data = data.toString();
      }
      data = Buffer.from(data);
    }

    const newPos = stream.pos + data.length;
    if (stream.pos < otherStream.pos) {
      // This side is behind: the other side holds unmatched bytes in `data`.
      // Compare the overlapping prefix and keep only the remainders.
      const overlap = Math.min(data.length, otherStream.data.length);
      const ownHead = data.subarray(0, overlap);
      const otherHead = otherStream.data.subarray(0, overlap);
      stream.data = data.subarray(overlap);
      otherStream.data = otherStream.data.subarray(overlap);

      if (!ownHead.equals(otherHead)) {
        return resolve(false);
      }
    } else {
      // Nothing to compare against yet; hold the chunk for the other side.
      stream.data = data;
    }

    stream.pos = newPos;
    if (newPos > otherStream.pos) {
      if (otherStream.ended) {
        // This side produced more bytes than the finished other side did.
        return resolve(false);
      }

      // This side is now ahead, so the other side reads next.
      otherStream.read();
    } else {
      // Still behind or level: keep reading from this side.
      stream.read();
    }
  };
};
|
61 |
|
62 |
|
63 |
|
64 |
|
65 |
|
66 |
|
67 |
|
/**
 * Builds the 'end' handler for one side of a two-stream comparison.
 *
 * The handler marks this side as ended. If the other side is still open it
 * is asked to read again; once both sides have ended the comparison settles
 * with whether both consumed the same number of bytes.
 *
 * @param {Object} stream - State of the side that ended (`ended`, `pos`).
 * @param {Object} otherStream - State of the opposite side (`ended`, `pos`,
 *   `read`).
 * @param {Function} resolve - Receives the final boolean verdict.
 * @returns {Function} Zero-argument 'end' handler.
 */
const createOnEndFn = (stream, otherStream, resolve) => () => {
  stream.ended = true;

  if (!otherStream.ended) {
    // The other side is still open: let it keep reading.
    otherStream.read();
    return;
  }

  // Both sides are done; equal only if their positions match.
  const samePosition = stream.pos === otherStream.pos;
  resolve(samePosition);
};
|
79 | module.exports = (stream1, stream2) => new Promise((resolve, reject) => {
|
80 | const readStream1 = stream1.pipe(new stream_1.PassThrough({ objectMode: true }));
|
81 | const readStream2 = stream2.pipe(new stream_1.PassThrough({ objectMode: true }));
|
82 | const cleanup = (equal) => {
|
83 | stream1.removeListener('error', reject);
|
84 | readStream1.removeListener('end', onend1);
|
85 | readStream1.removeListener('readable', streamState1.read);
|
86 | stream2.removeListener('error', reject);
|
87 | readStream2.removeListener('end', onend2);
|
88 | readStream1.removeListener('readable', streamState2.read);
|
89 | resolve(equal);
|
90 | };
|
91 | const streamState1 = {
|
92 | id: 1,
|
93 | stream: readStream1,
|
94 | data: null,
|
95 | pos: 0,
|
96 | ended: false,
|
97 | };
|
98 | const streamState2 = {
|
99 | id: 2,
|
100 | stream: readStream2,
|
101 | data: null,
|
102 | pos: 0,
|
103 | ended: false,
|
104 | };
|
105 | streamState1.read = createReadFn(streamState1, streamState2, cleanup);
|
106 | streamState2.read = createReadFn(streamState2, streamState1, cleanup);
|
107 | const onend1 = createOnEndFn(streamState1, streamState2, cleanup);
|
108 | const onend2 = createOnEndFn(streamState2, streamState1, cleanup);
|
109 | stream1.on('error', reject);
|
110 | readStream1.on('end', onend1);
|
111 | stream2.on('error', reject);
|
112 | readStream2.on('end', onend2);
|
113 |
|
114 | streamState1.stream.once('readable', streamState1.read);
|
115 | });
|
116 |
|
\ | No newline at end of file |