UNPKG

1.93 kBJavaScriptView Raw
1/**
2 * Copyright 2018 Google Inc. All rights reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16const {helper} = require('./helper');
17const EventEmitter = require('events');
18
/**
 * Message transport over a pair of streams using NUL-delimited framing.
 *
 * Outgoing messages are written to `pipeWrite` followed by a single '\0'.
 * Incoming bytes from `pipeRead` are split on '\0', and every complete frame
 * is emitted as a `'message'` event carrying the decoded string.
 */
class Pipe extends EventEmitter {
  /**
   * @param {!NodeJS.WritableStream} pipeWrite stream that outgoing messages are written to
   * @param {!NodeJS.ReadableStream} pipeRead stream whose 'data' events carry incoming bytes
   */
  constructor(pipeWrite, pipeRead) {
    super();
    this._pipeWrite = pipeWrite;
    // Raw bytes of the message that has not been terminated by '\0' yet.
    // Bytes (not strings) are kept so that a multi-byte UTF-8 character split
    // across two 'data' chunks is decoded correctly once the frame is complete.
    /** @type {!Array<!Buffer>} */
    this._pendingChunks = [];
    // The registration handle is stored so close() can detach the listener.
    this._eventListeners = [
      helper.addEventListener(pipeRead, 'data', buffer => this._dispatch(buffer))
    ];
  }

  /**
   * Writes `message` to the outgoing stream, followed by the '\0' terminator.
   *
   * @param {string} message
   */
  send(message) {
    this._pipeWrite.write(message);
    this._pipeWrite.write('\0');
  }

  /**
   * Consumes one incoming chunk: emits a 'message' event for every frame the
   * chunk completes and stashes any unterminated tail for the next chunk.
   *
   * @param {!Buffer} buffer
   */
  _dispatch(buffer) {
    // Tolerate string/Uint8Array chunks by normalising them to a Buffer.
    const chunk = Buffer.isBuffer(buffer) ? buffer : Buffer.from(buffer);

    let end = chunk.indexOf('\0');
    if (end === -1) {
      // No terminator in this chunk: the whole chunk belongs to the pending frame.
      this._pendingChunks.push(chunk);
      return;
    }

    // First frame = previously buffered bytes + bytes up to the first terminator.
    // Decoding happens only here, on the complete frame. Byte 0x00 never appears
    // inside a multi-byte UTF-8 sequence, so splitting on it cannot cut a character.
    this._pendingChunks.push(chunk.subarray(0, end));
    const message = Buffer.concat(this._pendingChunks).toString();
    // Reset before emitting so the pending state is consistent while listeners run.
    this._pendingChunks = [];
    this.emit('message', message);

    // Any further frames are fully contained in this chunk.
    let start = end + 1;
    end = chunk.indexOf('\0', start);
    while (end !== -1) {
      this.emit('message', chunk.toString(undefined, start, end));
      start = end + 1;
      end = chunk.indexOf('\0', start);
    }

    // Whatever follows the last terminator is the beginning of the next frame.
    if (start < chunk.length)
      this._pendingChunks.push(chunk.subarray(start));
  }

  /**
   * Drops the reference to the outgoing stream and detaches the 'data'
   * listener that was registered in the constructor.
   */
  close() {
    this._pipeWrite = null;
    helper.removeEventListeners(this._eventListeners);
  }
}
68
69module.exports = Pipe;