1 | ;
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | const tslib_1 = require("tslib");
|
4 | const through_1 = tslib_1.__importDefault(require("through"));
|
5 | const errors_1 = require("../errors");
|
/**
 * Base class for a single network peer built on three chained streams:
 * `stream` -> `transform` -> `buffer`.
 *
 * `buffer` is an internal `through` stream that is kept paused unless a
 * consumer (`receiveMessage` or `streamData`) is actively listening, so data
 * emitted by `transform` is held there until someone asks for it.
 *
 * This class calls `this.connectInternal()` and `this.closeInternal(hadError)`
 * but does not define them; they must be supplied by a subclass.
 */
class Peer {
    /**
     * @param options.endpoint  Identifier of the remote side; stored as-is and
     *                          passed to `ReceiveMessageTimeoutError`.
     * @param options.stream    Stream that `write` writes to and whose output is
     *                          piped into `transform`.
     * @param options.transform Stream piped between `stream` and the buffer.
     * @param options.onError   Called as `onError(peer, error)` from `onError`.
     * @param options.onClose   Called as `onClose(peer)` when `stream` emits
     *                          'close' (listener is attached in `connect`).
     */
    constructor(options) {
        this.endpoint = options.endpoint;
        this.mutableConnected = false;
        this.stream = options.stream;
        this.transform = options.transform;
        this.buffer = through_1.default();
        this.mutableConnecting = false;
        this.mutableClosed = false;
        this.onErrorInternal = options.onError;
        this.onCloseInternal = options.onClose;
        this.transform.on('error', this.onError.bind(this));
        this.buffer.on('error', this.onError.bind(this));
        // Start paused: nothing is consuming yet.
        this.buffer.pause();
        this.stream.pipe(this.transform).pipe(this.buffer);
    }
    /** True once `connect` has completed successfully and until `close` runs. */
    get connected() {
        return this.mutableConnected;
    }
    /**
     * Connects via `connectInternal()`. Does nothing when a connect is already
     * in progress or the peer is already connected.
     *
     * On success the 'error' and 'close' listeners are attached to `stream` and
     * the peer is marked connected. If `connectInternal()` rejects, the
     * rejection propagates to the caller and the peer stays disconnected. The
     * "connecting" flag is cleared in both cases.
     *
     * NOTE(review): each successful call attaches freshly bound listeners to
     * `stream`; nothing in this class removes them again.
     */
    async connect() {
        if (this.mutableConnecting || this.mutableConnected) {
            return;
        }
        this.mutableConnecting = true;
        this.mutableClosed = false;
        try {
            await this.connectInternal();
            this.stream.on('error', this.onError.bind(this));
            this.stream.on('close', this.onClose.bind(this));
            this.mutableConnected = true;
        }
        finally {
            this.mutableConnecting = false;
        }
    }
    /**
     * Closes the peer. Repeated calls are ignored once the peer is marked
     * closed.
     *
     * The peer is always marked closed, but the teardown (`closeInternal`,
     * unpiping, ending `transform` and `buffer`) only runs when the peer was
     * connecting or connected.
     *
     * @param hadError Optional; coerced to a boolean and forwarded to
     *                 `closeInternal`.
     */
    close(hadError) {
        if (this.mutableClosed) {
            return;
        }
        this.mutableClosed = true;
        if (!this.mutableConnecting && !this.mutableConnected) {
            return;
        }
        this.mutableConnecting = false;
        this.mutableConnected = false;
        this.closeInternal(!!hadError);
        this.stream.unpipe(this.transform);
        this.transform.unpipe(this.buffer);
        this.transform.end();
        this.buffer.end();
    }
    /**
     * Writes `buffer` to the underlying stream. The write is silently dropped
     * when the peer is not connected or has been closed. A synchronous throw
     * from `stream.write` is routed to `onError` instead of propagating.
     */
    write(buffer) {
        if (!this.mutableConnected || this.mutableClosed) {
            return;
        }
        try {
            this.stream.write(buffer);
        }
        catch (error) {
            this.onError(error);
        }
    }
    /**
     * Waits for the next 'data' event on the internal buffer.
     *
     * Resolves with that data. Rejects with the first 'error' emitted by
     * `stream`, `transform` or `buffer`, or with `ReceiveMessageTimeoutError`
     * when `timeoutMS` is given and elapses first. Whichever happens first
     * settles the promise; the temporary listeners are then removed, the
     * pending timeout (if any) is cancelled and the buffer is paused again.
     *
     * @param timeoutMS Optional timeout in milliseconds; no timeout when
     *                  `undefined`.
     */
    async receiveMessage(timeoutMS) {
        return new Promise((resolve, reject) => {
            let handled = false;
            let timer;
            const cleanup = () => {
                this.buffer.pause();
                handled = true;
                // Cancel the pending timeout so a settled call does not keep
                // a timer (and everything its closure references) alive.
                if (timer !== undefined) {
                    clearTimeout(timer);
                    timer = undefined;
                }
                this.stream.removeListener('error', onError);
                this.transform.removeListener('error', onError);
                this.buffer.removeListener('error', onError);
                this.buffer.removeListener('data', onDataReceived);
            };
            const onError = (error) => {
                if (!handled) {
                    cleanup();
                    reject(error);
                }
            };
            const onDataReceived = (data) => {
                if (!handled) {
                    cleanup();
                    resolve(data);
                }
            };
            this.stream.once('error', onError);
            this.transform.once('error', onError);
            this.buffer.once('error', onError);
            this.buffer.once('data', onDataReceived);
            this.buffer.resume();
            // `resume()` may have delivered data (or an error) synchronously,
            // in which case the call is already settled and no timer is needed.
            if (timeoutMS !== undefined && !handled) {
                timer = setTimeout(() => onError(new errors_1.ReceiveMessageTimeoutError(this.endpoint)), timeoutMS);
            }
        });
    }
    /**
     * Subscribes `onDataReceived` to every 'data' event of the internal buffer
     * and resumes the buffer.
     *
     * @returns An object whose `unsubscribe()` pauses the buffer and removes
     *          the listener.
     */
    streamData(onDataReceived) {
        this.buffer.on('data', onDataReceived);
        this.buffer.resume();
        return {
            unsubscribe: () => {
                this.buffer.pause();
                this.buffer.removeListener('data', onDataReceived);
            },
        };
    }
    /** Closes the peer with `hadError = true`, then notifies `options.onError`. */
    onError(error) {
        this.close(true);
        this.onErrorInternal(this, error);
    }
    /** Handler for the stream's 'close' event: marks closed, then notifies `options.onClose`. */
    onClose() {
        this.mutableClosed = true;
        this.onCloseInternal(this);
    }
}
|
120 | exports.Peer = Peer;
|
121 |
|
122 | //# sourceMappingURL=data:application/json;charset=utf8;base64,{"version":3,"sources":["Peer.ts"],"names":[],"mappings":";;;AACA,8DAA8B;AAC9B,sCAAuD;AAWvD,MAAsB,IAAI;IAWxB,YAAmB,OAA6B;QAC9C,IAAI,CAAC,QAAQ,GAAG,OAAO,CAAC,QAAQ,CAAC;QACjC,IAAI,CAAC,gBAAgB,GAAG,KAAK,CAAC;QAE9B,IAAI,CAAC,MAAM,GAAG,OAAO,CAAC,MAAM,CAAC;QAC7B,IAAI,CAAC,SAAS,GAAG,OAAO,CAAC,SAAS,CAAC;QACnC,IAAI,CAAC,MAAM,GAAG,iBAAO,EAAE,CAAC;QAExB,IAAI,CAAC,iBAAiB,GAAG,KAAK,CAAC;QAC/B,IAAI,CAAC,aAAa,GAAG,KAAK,CAAC;QAE3B,IAAI,CAAC,eAAe,GAAG,OAAO,CAAC,OAAO,CAAC;QACvC,IAAI,CAAC,eAAe,GAAG,OAAO,CAAC,OAAO,CAAC;QAEvC,IAAI,CAAC,SAAS,CAAC,EAAE,CAAC,OAAO,EAAE,IAAI,CAAC,OAAO,CAAC,IAAI,CAAC,IAAI,CAAC,CAAC,CAAC;QACpD,IAAI,CAAC,MAAM,CAAC,EAAE,CAAC,OAAO,EAAE,IAAI,CAAC,OAAO,CAAC,IAAI,CAAC,IAAI,CAAC,CAAC,CAAC;QACjD,IAAI,CAAC,MAAM,CAAC,KAAK,EAAE,CAAC;QACpB,IAAI,CAAC,MAAM,CAAC,IAAI,CAAC,IAAI,CAAC,SAAS,CAAC,CAAC,IAAI,CAAC,IAAI,CAAC,MAAM,CAAC,CAAC;IACrD,CAAC;IAED,IAAW,SAAS;QAClB,OAAO,IAAI,CAAC,gBAAgB,CAAC;IAC/B,CAAC;IAEM,KAAK,CAAC,OAAO;QAClB,IAAI,IAAI,CAAC,iBAAiB,IAAI,IAAI,CAAC,gBAAgB,EAAE;YACnD,OAAO;SACR;QACD,IAAI,CAAC,iBAAiB,GAAG,IAAI,CAAC;QAC9B,IAAI,CAAC,aAAa,GAAG,KAAK,CAAC;QAE3B,IAAI;YACF,MAAM,IAAI,CAAC,eAAe,EAAE,CAAC;YAC7B,IAAI,CAAC,MAAM,CAAC,EAAE,CAAC,OAAO,EAAE,IAAI,CAAC,OAAO,CAAC,IAAI,CAAC,IAAI,CAAC,CAAC,CAAC;YACjD,IAAI,CAAC,MAAM,CAAC,EAAE,CAAC,OAAO,EAAE,IAAI,CAAC,OAAO,CAAC,IAAI,CAAC,IAAI,CAAC,CAAC,CAAC;YACjD,IAAI,CAAC,gBAAgB,GAAG,IAAI,CAAC;SAC9B;gBAAS;YACR,IAAI,CAAC,iBAAiB,GAAG,KAAK,CAAC;SAChC;IACH,CAAC;IAEM,KAAK,CAAC,QAAkB;QAC7B,IAAI,IAAI,CAAC,aAAa,EAAE;YACtB,OAAO;SACR;QACD,IAAI,CAAC,aAAa,GAAG,IAAI,CAAC;QAE1B,IAAI,CAAC,IAAI,CAAC,iBAAiB,IAAI,CAAC,IAAI,CAAC,gBAAgB,EAAE;YACrD,OAAO;SACR;QACD,IAAI,CAAC,iBAAiB,GAAG,KAAK,CAAC;QAC/B,IAAI,CAAC,gBAAgB,GAAG,KAAK,CAAC;QAE9B,IAAI,CAAC,aAAa,CAAC,CAAC,CAAC,QAAQ,CAAC,CAAC;QAC/B,IAAI,CAAC,MAAM,CAAC,MAAM,CAAC,IAAI,CAAC,SAAS,CAAC,CAAC;QACnC,IAAI,CAAC,SAAS,CAAC,MAAM,CAAC,IAAI,CAAC,MAAM,CAAC,CAAC;QACnC,IAAI,CAAC,SAAS,CAAC,GAAG,EAAE,CAAC;QACrB,IAAI,CAAC,MAAM,CAAC,GAAG,EAAE,CAAC;IACpB,CA
AC;IAEM,KAAK,CAAC,MAAc;QACzB,IAAI,CAAC,IAAI,CAAC,gBAAgB,IAAI,IAAI,CAAC,aAAa,EAAE;YAChD,OAAO;SACR;QAED,IAAI;YACF,IAAI,CAAC,MAAM,CAAC,KAAK,CAAC,MAAM,CAAC,CAAC;SAC3B;QAAC,OAAO,KAAK,EAAE;YACd,IAAI,CAAC,OAAO,CAAC,KAAK,CAAC,CAAC;SACrB;IACH,CAAC;IAEM,KAAK,CAAC,cAAc,CAAC,SAAkB;QAC5C,OAAO,IAAI,OAAO,CAAU,CAAC,OAAO,EAAE,MAAM,EAAE,EAAE;YAC9C,IAAI,OAAO,GAAG,KAAK,CAAC;YACpB,MAAM,OAAO,GAAG,GAAG,EAAE;gBACnB,IAAI,CAAC,MAAM,CAAC,KAAK,EAAE,CAAC;gBACpB,OAAO,GAAG,IAAI,CAAC;gBAEf,IAAI,CAAC,MAAM,CAAC,cAAc,CAAC,OAAO,EAAE,OAAO,CAAC,CAAC;gBAE7C,IAAI,CAAC,SAAS,CAAC,cAAc,CAAC,OAAO,EAAE,OAAO,CAAC,CAAC;gBAEhD,IAAI,CAAC,MAAM,CAAC,cAAc,CAAC,OAAO,EAAE,OAAO,CAAC,CAAC;gBAE7C,IAAI,CAAC,MAAM,CAAC,cAAc,CAAC,MAAM,EAAE,cAAc,CAAC,CAAC;YACrD,CAAC,CAAC;YAEF,MAAM,OAAO,GAAG,CAAC,KAAY,EAAE,EAAE;gBAC/B,IAAI,CAAC,OAAO,EAAE;oBACZ,OAAO,EAAE,CAAC;oBACV,MAAM,CAAC,KAAK,CAAC,CAAC;iBACf;YACH,CAAC,CAAC;YAEF,MAAM,cAAc,GAAG,CAAC,IAAa,EAAE,EAAE;gBACvC,IAAI,CAAC,OAAO,EAAE;oBACZ,OAAO,EAAE,CAAC;oBACV,OAAO,CAAC,IAAI,CAAC,CAAC;iBACf;YACH,CAAC,CAAC;YAEF,IAAI,CAAC,MAAM,CAAC,IAAI,CAAC,OAAO,EAAE,OAAO,CAAC,CAAC;YACnC,IAAI,CAAC,SAAS,CAAC,IAAI,CAAC,OAAO,EAAE,OAAO,CAAC,CAAC;YACtC,IAAI,CAAC,MAAM,CAAC,IAAI,CAAC,OAAO,EAAE,OAAO,CAAC,CAAC;YACnC,IAAI,CAAC,MAAM,CAAC,IAAI,CAAC,MAAM,EAAE,cAAc,CAAC,CAAC;YACzC,IAAI,CAAC,MAAM,CAAC,MAAM,EAAE,CAAC;YAErB,IAAI,SAAS,KAAK,SAAS,EAAE;gBAC3B,UAAU,CAAC,GAAG,EAAE,CAAC,OAAO,CAAC,IAAI,mCAA0B,CAAC,IAAI,CAAC,QAAQ,CAAC,CAAC,EAAE,SAAS,CAAC,CAAC;aACrF;QACH,CAAC,CAAC,CAAC;IACL,CAAC;IAEM,UAAU,CAAC,cAAuC;QACvD,IAAI,CAAC,MAAM,CAAC,EAAE,CAAC,MAAM,EAAE,cAAc,CAAC,CAAC;QACvC,IAAI,CAAC,MAAM,CAAC,MAAM,EAAE,CAAC;QAErB,OAAO;YACL,WAAW,EAAE,GAAG,EAAE;gBAChB,IAAI,CAAC,MAAM,CAAC,KAAK,EAAE,CAAC;gBACpB,IAAI,CAAC,MAAM,CAAC,cAAc,CAAC,MAAM,EAAE,cAAc,CAAC,CAAC;YACrD,CAAC;SACF,CAAC;IACJ,CAAC;IAKS,OAAO,CAAC,KAAY;QAC5B,IAAI,CAAC,KAAK,CAAC,IAAI,CAAC,CAAC;QACjB,IAAI,CAAC,eAAe,CAAC,IAAI,EAAE,KAAK,CAAC,CAAC;IACpC,CAAC;IAEO,OAAO;QACb,IAAI,CAAC,aAAa,GAAG,IAAI,CAAC;QAC1B,IAAI,CAAC,eAAe,CAAC,IAAI,CAAC,CAAC;IAC7B,CAAC;CACF;AArJD,oBAqJC","file":"neo-
one-node-core/src/net/Peer.js","sourcesContent":["import { Duplex } from 'stream';\nimport through from 'through';\nimport { ReceiveMessageTimeoutError } from '../errors';\nimport { Endpoint } from './types';\n\nexport interface PeerOptions<Message> {\n  readonly endpoint: Endpoint;\n  readonly stream: Duplex;\n  readonly transform: Duplex;\n  readonly onError: (peer: Peer<Message>, error: Error) => void;\n  readonly onClose: (peer: Peer<Message>) => void;\n}\n\nexport abstract class Peer<Message> {\n  public readonly endpoint: Endpoint;\n  private readonly stream: Duplex;\n  private readonly transform: Duplex;\n  private readonly buffer: Duplex;\n  private mutableConnected: boolean;\n  private mutableConnecting: boolean;\n  private mutableClosed: boolean;\n  private readonly onErrorInternal: (peer: Peer<Message>, error: Error) => void;\n  private readonly onCloseInternal: (peer: Peer<Message>) => void;\n\n  public constructor(options: PeerOptions<Message>) {\n    this.endpoint = options.endpoint;\n    this.mutableConnected = false;\n\n    this.stream = options.stream;\n    this.transform = options.transform;\n    this.buffer = through();\n\n    this.mutableConnecting = false;\n    this.mutableClosed = false;\n\n    this.onErrorInternal = options.onError;\n    this.onCloseInternal = options.onClose;\n\n    this.transform.on('error', this.onError.bind(this));\n    this.buffer.on('error', this.onError.bind(this));\n    this.buffer.pause();\n    this.stream.pipe(this.transform).pipe(this.buffer);\n  }\n\n  public get connected(): boolean {\n    return this.mutableConnected;\n  }\n\n  public async connect(): Promise<void> {\n    if (this.mutableConnecting || this.mutableConnected) {\n      return;\n    }\n    this.mutableConnecting = true;\n    this.mutableClosed = false;\n\n    try {\n      await this.connectInternal();\n      this.stream.on('error', this.onError.bind(this));\n      this.stream.on('close', this.onClose.bind(this));\n      this.mutableConnected = 
true;\n    } finally {\n      this.mutableConnecting = false;\n    }\n  }\n\n  public close(hadError?: boolean): void {\n    if (this.mutableClosed) {\n      return;\n    }\n    this.mutableClosed = true;\n\n    if (!this.mutableConnecting && !this.mutableConnected) {\n      return;\n    }\n    this.mutableConnecting = false;\n    this.mutableConnected = false;\n\n    this.closeInternal(!!hadError);\n    this.stream.unpipe(this.transform);\n    this.transform.unpipe(this.buffer);\n    this.transform.end();\n    this.buffer.end();\n  }\n\n  public write(buffer: Buffer): void {\n    if (!this.mutableConnected || this.mutableClosed) {\n      return;\n    }\n\n    try {\n      this.stream.write(buffer);\n    } catch (error) {\n      this.onError(error);\n    }\n  }\n\n  public async receiveMessage(timeoutMS?: number): Promise<Message> {\n    return new Promise<Message>((resolve, reject) => {\n      let handled = false;\n      const cleanup = () => {\n        this.buffer.pause();\n        handled = true;\n        // eslint-disable-next-line\n        this.stream.removeListener('error', onError);\n        // eslint-disable-next-line\n        this.transform.removeListener('error', onError);\n        // eslint-disable-next-line\n        this.buffer.removeListener('error', onError);\n        // eslint-disable-next-line\n        this.buffer.removeListener('data', onDataReceived);\n      };\n\n      const onError = (error: Error) => {\n        if (!handled) {\n          cleanup();\n          reject(error);\n        }\n      };\n\n      const onDataReceived = (data: Message) => {\n        if (!handled) {\n          cleanup();\n          resolve(data);\n        }\n      };\n\n      this.stream.once('error', onError);\n      this.transform.once('error', onError);\n      this.buffer.once('error', onError);\n      this.buffer.once('data', onDataReceived);\n      this.buffer.resume();\n\n      if (timeoutMS !== undefined) {\n        setTimeout(() => onError(new 
ReceiveMessageTimeoutError(this.endpoint)), timeoutMS);\n      }\n    });\n  }\n\n  public streamData(onDataReceived: (data: Message) => void): { readonly unsubscribe: () => void } {\n    this.buffer.on('data', onDataReceived);\n    this.buffer.resume();\n\n    return {\n      unsubscribe: () => {\n        this.buffer.pause();\n        this.buffer.removeListener('data', onDataReceived);\n      },\n    };\n  }\n\n  protected abstract async connectInternal(): Promise<void>;\n  protected abstract closeInternal(hadError: boolean): void;\n\n  protected onError(error: Error): void {\n    this.close(true);\n    this.onErrorInternal(this, error);\n  }\n\n  private onClose(): void {\n    this.mutableClosed = true;\n    this.onCloseInternal(this);\n  }\n}\n"]}
|