UNPKG

15.5 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3const tslib_1 = require("tslib");
4const through_1 = tslib_1.__importDefault(require("through"));
5const errors_1 = require("../errors");
6class Peer {
7 constructor(options) {
8 this.endpoint = options.endpoint;
9 this.mutableConnected = false;
10 this.stream = options.stream;
11 this.transform = options.transform;
12 this.buffer = through_1.default();
13 this.mutableConnecting = false;
14 this.mutableClosed = false;
15 this.onErrorInternal = options.onError;
16 this.onCloseInternal = options.onClose;
17 this.transform.on('error', this.onError.bind(this));
18 this.buffer.on('error', this.onError.bind(this));
19 this.buffer.pause();
20 this.stream.pipe(this.transform).pipe(this.buffer);
21 }
22 get connected() {
23 return this.mutableConnected;
24 }
25 async connect() {
26 if (this.mutableConnecting || this.mutableConnected) {
27 return;
28 }
29 this.mutableConnecting = true;
30 this.mutableClosed = false;
31 try {
32 await this.connectInternal();
33 this.stream.on('error', this.onError.bind(this));
34 this.stream.on('close', this.onClose.bind(this));
35 this.mutableConnected = true;
36 }
37 finally {
38 this.mutableConnecting = false;
39 }
40 }
41 close(hadError) {
42 if (this.mutableClosed) {
43 return;
44 }
45 this.mutableClosed = true;
46 if (!this.mutableConnecting && !this.mutableConnected) {
47 return;
48 }
49 this.mutableConnecting = false;
50 this.mutableConnected = false;
51 this.closeInternal(!!hadError);
52 this.stream.unpipe(this.transform);
53 this.transform.unpipe(this.buffer);
54 this.transform.end();
55 this.buffer.end();
56 }
57 write(buffer) {
58 if (!this.mutableConnected || this.mutableClosed) {
59 return;
60 }
61 try {
62 this.stream.write(buffer);
63 }
64 catch (error) {
65 this.onError(error);
66 }
67 }
68 async receiveMessage(timeoutMS) {
69 return new Promise((resolve, reject) => {
70 let handled = false;
71 const cleanup = () => {
72 this.buffer.pause();
73 handled = true;
74 this.stream.removeListener('error', onError);
75 this.transform.removeListener('error', onError);
76 this.buffer.removeListener('error', onError);
77 this.buffer.removeListener('data', onDataReceived);
78 };
79 const onError = (error) => {
80 if (!handled) {
81 cleanup();
82 reject(error);
83 }
84 };
85 const onDataReceived = (data) => {
86 if (!handled) {
87 cleanup();
88 resolve(data);
89 }
90 };
91 this.stream.once('error', onError);
92 this.transform.once('error', onError);
93 this.buffer.once('error', onError);
94 this.buffer.once('data', onDataReceived);
95 this.buffer.resume();
96 if (timeoutMS !== undefined) {
97 setTimeout(() => onError(new errors_1.ReceiveMessageTimeoutError(this.endpoint)), timeoutMS);
98 }
99 });
100 }
101 streamData(onDataReceived) {
102 this.buffer.on('data', onDataReceived);
103 this.buffer.resume();
104 return {
105 unsubscribe: () => {
106 this.buffer.pause();
107 this.buffer.removeListener('data', onDataReceived);
108 },
109 };
110 }
111 onError(error) {
112 this.close(true);
113 this.onErrorInternal(this, error);
114 }
115 onClose() {
116 this.mutableClosed = true;
117 this.onCloseInternal(this);
118 }
119}
120exports.Peer = Peer;
121
122//# sourceMappingURL=data:application/json;charset=utf8;base64,{"version":3,"sources":["Peer.ts"],"names":[],"mappings":";;;AACA,8DAA8B;AAC9B,sCAAuD;AAWvD,MAAsB,IAAI;IAWxB,YAAmB,OAA6B;QAC9C,IAAI,CAAC,QAAQ,GAAG,OAAO,CAAC,QAAQ,CAAC;QACjC,IAAI,CAAC,gBAAgB,GAAG,KAAK,CAAC;QAE9B,IAAI,CAAC,MAAM,GAAG,OAAO,CAAC,MAAM,CAAC;QAC7B,IAAI,CAAC,SAAS,GAAG,OAAO,CAAC,SAAS,CAAC;QACnC,IAAI,CAAC,MAAM,GAAG,iBAAO,EAAE,CAAC;QAExB,IAAI,CAAC,iBAAiB,GAAG,KAAK,CAAC;QAC/B,IAAI,CAAC,aAAa,GAAG,KAAK,CAAC;QAE3B,IAAI,CAAC,eAAe,GAAG,OAAO,CAAC,OAAO,CAAC;QACvC,IAAI,CAAC,eAAe,GAAG,OAAO,CAAC,OAAO,CAAC;QAEvC,IAAI,CAAC,SAAS,CAAC,EAAE,CAAC,OAAO,EAAE,IAAI,CAAC,OAAO,CAAC,IAAI,CAAC,IAAI,CAAC,CAAC,CAAC;QACpD,IAAI,CAAC,MAAM,CAAC,EAAE,CAAC,OAAO,EAAE,IAAI,CAAC,OAAO,CAAC,IAAI,CAAC,IAAI,CAAC,CAAC,CAAC;QACjD,IAAI,CAAC,MAAM,CAAC,KAAK,EAAE,CAAC;QACpB,IAAI,CAAC,MAAM,CAAC,IAAI,CAAC,IAAI,CAAC,SAAS,CAAC,CAAC,IAAI,CAAC,IAAI,CAAC,MAAM,CAAC,CAAC;IACrD,CAAC;IAED,IAAW,SAAS;QAClB,OAAO,IAAI,CAAC,gBAAgB,CAAC;IAC/B,CAAC;IAEM,KAAK,CAAC,OAAO;QAClB,IAAI,IAAI,CAAC,iBAAiB,IAAI,IAAI,CAAC,gBAAgB,EAAE;YACnD,OAAO;SACR;QACD,IAAI,CAAC,iBAAiB,GAAG,IAAI,CAAC;QAC9B,IAAI,CAAC,aAAa,GAAG,KAAK,CAAC;QAE3B,IAAI;YACF,MAAM,IAAI,CAAC,eAAe,EAAE,CAAC;YAC7B,IAAI,CAAC,MAAM,CAAC,EAAE,CAAC,OAAO,EAAE,IAAI,CAAC,OAAO,CAAC,IAAI,CAAC,IAAI,CAAC,CAAC,CAAC;YACjD,IAAI,CAAC,MAAM,CAAC,EAAE,CAAC,OAAO,EAAE,IAAI,CAAC,OAAO,CAAC,IAAI,CAAC,IAAI,CAAC,CAAC,CAAC;YACjD,IAAI,CAAC,gBAAgB,GAAG,IAAI,CAAC;SAC9B;gBAAS;YACR,IAAI,CAAC,iBAAiB,GAAG,KAAK,CAAC;SAChC;IACH,CAAC;IAEM,KAAK,CAAC,QAAkB;QAC7B,IAAI,IAAI,CAAC,aAAa,EAAE;YACtB,OAAO;SACR;QACD,IAAI,CAAC,aAAa,GAAG,IAAI,CAAC;QAE1B,IAAI,CAAC,IAAI,CAAC,iBAAiB,IAAI,CAAC,IAAI,CAAC,gBAAgB,EAAE;YACrD,OAAO;SACR;QACD,IAAI,CAAC,iBAAiB,GAAG,KAAK,CAAC;QAC/B,IAAI,CAAC,gBAAgB,GAAG,KAAK,CAAC;QAE9B,IAAI,CAAC,aAAa,CAAC,CAAC,CAAC,QAAQ,CAAC,CAAC;QAC/B,IAAI,CAAC,MAAM,CAAC,MAAM,CAAC,IAAI,CAAC,SAAS,CAAC,CAAC;QACnC,IAAI,CAAC,SAAS,CAAC,MAAM,CAAC,IAAI,CAAC,MAAM,CAAC,CAAC;QACnC,IAAI,CAAC,SAAS,CAAC,GAAG,EAAE,CAAC;QACrB,IAAI,CAAC,MAAM,CAAC,GAAG,EAAE,CAAC;IACpB,CAAC;IAEM,KAAK,CAAC,MAAc;QACzB,IAAI,CAAC,IAAI,CAAC,gBAAgB,IAAI,IAAI,CAAC,aAAa,EAAE;YAChD,OAAO;SACR;QAED,IAAI;YACF,IAAI,CAAC,MAAM,CAAC,KAAK,CAAC,MAAM,CAAC,CAAC;SAC3B;QAAC,OAAO,KAAK,EAAE;YACd,IAAI,CAAC,OAAO,CAAC,KAAK,CAAC,CAAC;SACrB;IACH,CAAC;IAEM,KAAK,CAAC,cAAc,CAAC,SAAkB;QAC5C,OAAO,IAAI,OAAO,CAAU,CAAC,OAAO,EAAE,MAAM,EAAE,EAAE;YAC9C,IAAI,OAAO,GAAG,KAAK,CAAC;YACpB,MAAM,OAAO,GAAG,GAAG,EAAE;gBACnB,IAAI,CAAC,MAAM,CAAC,KAAK,EAAE,CAAC;gBACpB,OAAO,GAAG,IAAI,CAAC;gBAEf,IAAI,CAAC,MAAM,CAAC,cAAc,CAAC,OAAO,EAAE,OAAO,CAAC,CAAC;gBAE7C,IAAI,CAAC,SAAS,CAAC,cAAc,CAAC,OAAO,EAAE,OAAO,CAAC,CAAC;gBAEhD,IAAI,CAAC,MAAM,CAAC,cAAc,CAAC,OAAO,EAAE,OAAO,CAAC,CAAC;gBAE7C,IAAI,CAAC,MAAM,CAAC,cAAc,CAAC,MAAM,EAAE,cAAc,CAAC,CAAC;YACrD,CAAC,CAAC;YAEF,MAAM,OAAO,GAAG,CAAC,KAAY,EAAE,EAAE;gBAC/B,IAAI,CAAC,OAAO,EAAE;oBACZ,OAAO,EAAE,CAAC;oBACV,MAAM,CAAC,KAAK,CAAC,CAAC;iBACf;YACH,CAAC,CAAC;YAEF,MAAM,cAAc,GAAG,CAAC,IAAa,EAAE,EAAE;gBACvC,IAAI,CAAC,OAAO,EAAE;oBACZ,OAAO,EAAE,CAAC;oBACV,OAAO,CAAC,IAAI,CAAC,CAAC;iBACf;YACH,CAAC,CAAC;YAEF,IAAI,CAAC,MAAM,CAAC,IAAI,CAAC,OAAO,EAAE,OAAO,CAAC,CAAC;YACnC,IAAI,CAAC,SAAS,CAAC,IAAI,CAAC,OAAO,EAAE,OAAO,CAAC,CAAC;YACtC,IAAI,CAAC,MAAM,CAAC,IAAI,CAAC,OAAO,EAAE,OAAO,CAAC,CAAC;YACnC,IAAI,CAAC,MAAM,CAAC,IAAI,CAAC,MAAM,EAAE,cAAc,CAAC,CAAC;YACzC,IAAI,CAAC,MAAM,CAAC,MAAM,EAAE,CAAC;YAErB,IAAI,SAAS,KAAK,SAAS,EAAE;gBAC3B,UAAU,CAAC,GAAG,EAAE,CAAC,OAAO,CAAC,IAAI,mCAA0B,CAAC,IAAI,CAAC,QAAQ,CAAC,CAAC,EAAE,SAAS,CAAC,CAAC;aACrF;QACH,CAAC,CAAC,CAAC;IACL,CAAC;IAEM,UAAU,CAAC,cAAuC;QACvD,IAAI,CAAC,MAAM,CAAC,EAAE,CAAC,MAAM,EAAE,cAAc,CAAC,CAAC;QACvC,IAAI,CAAC,MAAM,CAAC,MAAM,EAAE,CAAC;QAErB,OAAO;YACL,WAAW,EAAE,GAAG,EAAE;gBAChB,IAAI,CAAC,MAAM,CAAC,KAAK,EAAE,CAAC;gBACpB,IAAI,CAAC,MAAM,CAAC,cAAc,CAAC,MAAM,EAAE,cAAc,CAAC,CAAC;YACrD,CAAC;SACF,CAAC;IACJ,CAAC;IAKS,OAAO,CAAC,KAAY;QAC5B,IAAI,CAAC,KAAK,CAAC,IAAI,CAAC,CAAC;QACjB,IAAI,CAAC,eAAe,CAAC,IAAI,EAAE,KAAK,CAAC,CAAC;IACpC,CAAC;IAEO,OAAO;QACb,IAAI,CAAC,aAAa,GAAG,IAAI,CAAC;QAC1B,IAAI,CAAC,eAAe,CAAC,IAAI,CAAC,CAAC;IAC7B,CAAC;CACF;AArJD,oBAqJC","file":"neo-one-node-core/src/net/Peer.js","sourcesContent":["import { Duplex } from 'stream';\nimport through from 'through';\nimport { ReceiveMessageTimeoutError } from '../errors';\nimport { Endpoint } from './types';\n\nexport interface PeerOptions<Message> {\n  readonly endpoint: Endpoint;\n  readonly stream: Duplex;\n  readonly transform: Duplex;\n  readonly onError: (peer: Peer<Message>, error: Error) => void;\n  readonly onClose: (peer: Peer<Message>) => void;\n}\n\nexport abstract class Peer<Message> {\n  public readonly endpoint: Endpoint;\n  private readonly stream: Duplex;\n  private readonly transform: Duplex;\n  private readonly buffer: Duplex;\n  private mutableConnected: boolean;\n  private mutableConnecting: boolean;\n  private mutableClosed: boolean;\n  private readonly onErrorInternal: (peer: Peer<Message>, error: Error) => void;\n  private readonly onCloseInternal: (peer: Peer<Message>) => void;\n\n  public constructor(options: PeerOptions<Message>) {\n    this.endpoint = options.endpoint;\n    this.mutableConnected = false;\n\n    this.stream = options.stream;\n    this.transform = options.transform;\n    this.buffer = through();\n\n    this.mutableConnecting = false;\n    this.mutableClosed = false;\n\n    this.onErrorInternal = options.onError;\n    this.onCloseInternal = options.onClose;\n\n    this.transform.on('error', this.onError.bind(this));\n    this.buffer.on('error', this.onError.bind(this));\n    this.buffer.pause();\n    this.stream.pipe(this.transform).pipe(this.buffer);\n  }\n\n  public get connected(): boolean {\n    return this.mutableConnected;\n  }\n\n  public async connect(): Promise<void> {\n    if (this.mutableConnecting || this.mutableConnected) {\n      return;\n    }\n    this.mutableConnecting = true;\n    this.mutableClosed = false;\n\n    try {\n      await this.connectInternal();\n      this.stream.on('error', this.onError.bind(this));\n      this.stream.on('close', this.onClose.bind(this));\n      this.mutableConnected = true;\n    } finally {\n      this.mutableConnecting = false;\n    }\n  }\n\n  public close(hadError?: boolean): void {\n    if (this.mutableClosed) {\n      return;\n    }\n    this.mutableClosed = true;\n\n    if (!this.mutableConnecting && !this.mutableConnected) {\n      return;\n    }\n    this.mutableConnecting = false;\n    this.mutableConnected = false;\n\n    this.closeInternal(!!hadError);\n    this.stream.unpipe(this.transform);\n    this.transform.unpipe(this.buffer);\n    this.transform.end();\n    this.buffer.end();\n  }\n\n  public write(buffer: Buffer): void {\n    if (!this.mutableConnected || this.mutableClosed) {\n      return;\n    }\n\n    try {\n      this.stream.write(buffer);\n    } catch (error) {\n      this.onError(error);\n    }\n  }\n\n  public async receiveMessage(timeoutMS?: number): Promise<Message> {\n    return new Promise<Message>((resolve, reject) => {\n      let handled = false;\n      const cleanup = () => {\n        this.buffer.pause();\n        handled = true;\n        // eslint-disable-next-line\n        this.stream.removeListener('error', onError);\n        // eslint-disable-next-line\n        this.transform.removeListener('error', onError);\n        // eslint-disable-next-line\n        this.buffer.removeListener('error', onError);\n        // eslint-disable-next-line\n        this.buffer.removeListener('data', onDataReceived);\n      };\n\n      const onError = (error: Error) => {\n        if (!handled) {\n          cleanup();\n          reject(error);\n        }\n      };\n\n      const onDataReceived = (data: Message) => {\n        if (!handled) {\n          cleanup();\n          resolve(data);\n        }\n      };\n\n      this.stream.once('error', onError);\n      this.transform.once('error', onError);\n      this.buffer.once('error', onError);\n      this.buffer.once('data', onDataReceived);\n      this.buffer.resume();\n\n      if (timeoutMS !== undefined) {\n        setTimeout(() => onError(new ReceiveMessageTimeoutError(this.endpoint)), timeoutMS);\n      }\n    });\n  }\n\n  public streamData(onDataReceived: (data: Message) => void): { readonly unsubscribe: () => void } {\n    this.buffer.on('data', onDataReceived);\n    this.buffer.resume();\n\n    return {\n      unsubscribe: () => {\n        this.buffer.pause();\n        this.buffer.removeListener('data', onDataReceived);\n      },\n    };\n  }\n\n  protected abstract async connectInternal(): Promise<void>;\n  protected abstract closeInternal(hadError: boolean): void;\n\n  protected onError(error: Error): void {\n    this.close(true);\n    this.onErrorInternal(this, error);\n  }\n\n  private onClose(): void {\n    this.mutableClosed = true;\n    this.onCloseInternal(this);\n  }\n}\n"]}