UNPKG

8.15 kBJavaScriptView Raw
1"use strict";
2/* --------------------------------------------------------------------------------------------
3 * Copyright (c) Microsoft Corporation. All rights reserved.
4 * Licensed under the MIT License. See License.txt in the project root for license information.
5 * ------------------------------------------------------------------------------------------ */
6Object.defineProperty(exports, "__esModule", { value: true });
7exports.ReadableStreamMessageReader = exports.AbstractMessageReader = exports.MessageReader = void 0;
8const ral_1 = require("./ral");
9const Is = require("./is");
10const events_1 = require("./events");
11const semaphore_1 = require("./semaphore");
var MessageReader;
(function (MessageReader) {
    // Members a value must expose as functions to be treated as a MessageReader.
    const requiredFunctions = ['listen', 'dispose', 'onError', 'onClose', 'onPartialMessage'];
    /**
     * Structural check for the MessageReader shape.
     *
     * @param value - The value to inspect.
     * @returns The falsy input itself when `value` is falsy; otherwise whether
     * every required member passes `Is.func`.
     */
    function is(value) {
        const candidate = value;
        if (!candidate) {
            return candidate;
        }
        return requiredFunctions.every((member) => Is.func(candidate[member]));
    }
    MessageReader.is = is;
})(MessageReader || (exports.MessageReader = MessageReader = {}));
/**
 * Base class for message readers. It owns three emitters (error, close and
 * partial message) and exposes their events plus `fire*` helpers for
 * subclasses.
 */
class AbstractMessageReader {
    constructor() {
        this.errorEmitter = new events_1.Emitter();
        this.closeEmitter = new events_1.Emitter();
        this.partialMessageEmitter = new events_1.Emitter();
    }
    /**
     * Disposes every emitter created by the constructor, including the
     * partial message emitter.
     */
    dispose() {
        this.errorEmitter.dispose();
        this.closeEmitter.dispose();
        this.partialMessageEmitter.dispose();
    }
    /** Event exposed by the error emitter. */
    get onError() {
        return this.errorEmitter.event;
    }
    /**
     * Fires the error event. The argument is normalized through `asError`
     * first, so listeners always receive an `Error` instance.
     *
     * @param error - An `Error` or any other value describing the failure.
     */
    fireError(error) {
        this.errorEmitter.fire(this.asError(error));
    }
    /** Event exposed by the close emitter. */
    get onClose() {
        return this.closeEmitter.event;
    }
    /** Fires the close event with `undefined` as payload. */
    fireClose() {
        this.closeEmitter.fire(undefined);
    }
    /** Event exposed by the partial message emitter. */
    get onPartialMessage() {
        return this.partialMessageEmitter.event;
    }
    /**
     * Fires the partial message event.
     *
     * @param info - Payload forwarded unchanged to the listeners.
     */
    firePartialMessage(info) {
        this.partialMessageEmitter.fire(info);
    }
    /**
     * Converts an arbitrary value into an `Error`.
     *
     * @param error - The value to convert; may be `null` or `undefined`.
     * @returns `error` itself when it already is an `Error`; otherwise a new
     * `Error` whose text contains `error.message` when that is a string, and
     * 'unknown' when it is not (or when `error` is nullish).
     */
    asError(error) {
        if (error instanceof Error) {
            return error;
        }
        // Optional chaining keeps a nullish `error` from throwing a TypeError
        // while an error is being reported.
        const reason = error?.message;
        return new Error(`Reader received error. Reason: ${Is.string(reason) ? reason : 'unknown'}`);
    }
}
58exports.AbstractMessageReader = AbstractMessageReader;
var ResolvedMessageReaderOptions;
(function (ResolvedMessageReaderOptions) {
    /**
     * Normalizes reader options into a fully populated record.
     *
     * @param options - Nothing (`undefined` / `null`), a charset string, or an
     * options object that may carry `charset`, `contentDecoder`,
     * `contentDecoders`, `contentTypeDecoder` and `contentTypeDecoders`.
     * @returns An object with `charset`, `contentDecoder` (possibly
     * `undefined`), `contentTypeDecoder`, and two maps (`contentDecoders`,
     * `contentTypeDecoders`) keyed by each decoder's `name`.
     */
    function fromOptions(options) {
        let charset;
        let contentDecoder;
        const contentDecoders = new Map();
        let contentTypeDecoder;
        const contentTypeDecoders = new Map();
        // `== null` covers both undefined and null, so a null argument falls
        // back to the defaults instead of throwing on property access.
        if (options == null || typeof options === 'string') {
            charset = options ?? 'utf-8';
        }
        else {
            charset = options.charset ?? 'utf-8';
            if (options.contentDecoder !== undefined) {
                contentDecoder = options.contentDecoder;
                contentDecoders.set(contentDecoder.name, contentDecoder);
            }
            if (options.contentDecoders !== undefined) {
                // Entries from the list overwrite a same-named single decoder
                // in the map; `contentDecoder` itself is left as is.
                for (const decoder of options.contentDecoders) {
                    contentDecoders.set(decoder.name, decoder);
                }
            }
            if (options.contentTypeDecoder !== undefined) {
                contentTypeDecoder = options.contentTypeDecoder;
                contentTypeDecoders.set(contentTypeDecoder.name, contentTypeDecoder);
            }
            if (options.contentTypeDecoders !== undefined) {
                for (const decoder of options.contentTypeDecoders) {
                    contentTypeDecoders.set(decoder.name, decoder);
                }
            }
        }
        if (contentTypeDecoder === undefined) {
            // No content type decoder supplied: use the runtime's
            // application/json decoder and register it as well.
            contentTypeDecoder = (0, ral_1.default)().applicationJson.decoder;
            contentTypeDecoders.set(contentTypeDecoder.name, contentTypeDecoder);
        }
        return { charset, contentDecoder, contentDecoders, contentTypeDecoder, contentTypeDecoders };
    }
    ResolvedMessageReaderOptions.fromOptions = fromOptions;
})(ResolvedMessageReaderOptions || (ResolvedMessageReaderOptions = {}));
/**
 * Message reader that consumes chunks from a readable stream, frames them
 * using a `content-length` header, decodes each body and hands the decoded
 * message to the callback passed to `listen`.
 */
class ReadableStreamMessageReader extends AbstractMessageReader {
    /**
     * @param readable - Stream object exposing `onData`, `onError` and
     * `onClose` registration functions.
     * @param options - Optional charset string or options object; resolved
     * through `ResolvedMessageReaderOptions.fromOptions`.
     */
    constructor(readable, options) {
        super();
        this.readable = readable;
        this.options = ResolvedMessageReaderOptions.fromOptions(options);
        this.buffer = (0, ral_1.default)().messageBuffer.create(this.options.charset);
        this._partialMessageTimeout = 10000;
        // -1 means "no header parsed yet for the next message".
        this.nextMessageLength = -1;
        this.messageToken = 0;
        this.readSemaphore = new semaphore_1.Semaphore(1);
    }
    /** Timeout handed to the partial message timer; values <= 0 disable it. */
    set partialMessageTimeout(timeout) {
        this._partialMessageTimeout = timeout;
    }
    get partialMessageTimeout() {
        return this._partialMessageTimeout;
    }
    /**
     * Disposes the emitters of the base class and cancels a pending partial
     * message timer. Without the cancellation the timer keeps re-arming
     * itself after the reader has been disposed.
     */
    dispose() {
        super.dispose();
        this.clearPartialMessageTimer();
    }
    /**
     * Resets the framing state, stores the callback and subscribes to the
     * stream's data, error and close notifications.
     *
     * @param callback - Invoked with every decoded message.
     * @returns Whatever `readable.onData` returned.
     */
    listen(callback) {
        this.nextMessageLength = -1;
        this.messageToken = 0;
        this.partialMessageTimer = undefined;
        this.callback = callback;
        const result = this.readable.onData((data) => {
            this.onData(data);
        });
        // NOTE(review): only the onData registration is returned to the
        // caller; the results of onError / onClose are dropped - confirm
        // that this is intended.
        this.readable.onError((error) => this.fireError(error));
        this.readable.onClose(() => this.fireClose());
        return result;
    }
    /**
     * Appends a chunk to the buffer and extracts as many complete messages
     * as the buffer currently holds. Synchronous failures are reported via
     * the error event instead of being thrown.
     *
     * @param data - Chunk received from the stream.
     */
    onData(data) {
        try {
            this.buffer.append(data);
            while (true) {
                if (this.nextMessageLength === -1) {
                    const headers = this.buffer.tryReadHeaders(true);
                    if (!headers) {
                        return;
                    }
                    const contentLength = headers.get('content-length');
                    if (!contentLength) {
                        this.fireError(new Error(`Header must provide a Content-Length property.\n${JSON.stringify(Object.fromEntries(headers))}`));
                        return;
                    }
                    // Explicit radix: the header value is always decimal.
                    const length = Number.parseInt(contentLength, 10);
                    if (Number.isNaN(length)) {
                        this.fireError(new Error(`Content-Length value must be a number. Got ${contentLength}`));
                        return;
                    }
                    // A negative length is never valid, and -1 would collide
                    // with the "no header parsed" sentinel.
                    if (length < 0) {
                        this.fireError(new Error(`Content-Length value must not be negative. Got ${contentLength}`));
                        return;
                    }
                    this.nextMessageLength = length;
                }
                const body = this.buffer.tryReadBody(this.nextMessageLength);
                if (body === undefined) {
                    /** We haven't received the full message yet. */
                    this.setPartialMessageTimer();
                    return;
                }
                this.clearPartialMessageTimer();
                this.nextMessageLength = -1;
                // Make sure that we convert one received message after the
                // other. Otherwise it could happen that a decoding of a second
                // smaller message finished before the decoding of a first larger
                // message and then we would deliver the second message first.
                this.readSemaphore.lock(async () => {
                    const bytes = this.options.contentDecoder !== undefined
                        ? await this.options.contentDecoder.decode(body)
                        : body;
                    const message = await this.options.contentTypeDecoder.decode(bytes, this.options);
                    this.callback(message);
                }).catch((error) => {
                    this.fireError(error);
                });
            }
        }
        catch (error) {
            this.fireError(error);
        }
    }
    /** Cancels the pending partial message timer, if there is one. */
    clearPartialMessageTimer() {
        if (this.partialMessageTimer) {
            this.partialMessageTimer.dispose();
            this.partialMessageTimer = undefined;
        }
    }
    /**
     * (Re)starts the partial message timer. Does nothing beyond clearing the
     * previous timer when the configured timeout is <= 0.
     */
    setPartialMessageTimer() {
        this.clearPartialMessageTimer();
        if (this._partialMessageTimeout <= 0) {
            return;
        }
        this.partialMessageTimer = (0, ral_1.default)().timer.setTimeout((token, timeout) => {
            this.partialMessageTimer = undefined;
            // NOTE(review): nothing in this class changes messageToken after
            // it is set to 0, so this check always passes here - confirm.
            if (token === this.messageToken) {
                this.firePartialMessage({ messageToken: token, waitingTime: timeout });
                // Re-arm so listeners keep being notified while waiting.
                this.setPartialMessageTimer();
            }
        }, this._partialMessageTimeout, this.messageToken, this._partialMessageTimeout);
    }
}
197exports.ReadableStreamMessageReader = ReadableStreamMessageReader;