UNPKG

11.2 kBJavaScriptView Raw
1"use strict";
2/*!
3 * Copyright 2018 Google Inc. All Rights Reserved.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17Object.defineProperty(exports, "__esModule", { value: true });
18exports.MessageStream = exports.ChannelError = exports.StatusError = void 0;
19const promisify_1 = require("@google-cloud/promisify");
20const google_gax_1 = require("google-gax");
21const isStreamEnded = require("is-stream-ended");
22const stream_1 = require("stream");
23const pull_retry_1 = require("./pull-retry");
24const default_options_1 = require("./default-options");
25/*!
26 * Frequency to ping streams.
27 */
28const KEEP_ALIVE_INTERVAL = 30000;
29/*!
30 * Deadline for the stream.
31 */
32const PULL_TIMEOUT = require('./v1/subscriber_client_config.json').interfaces['google.pubsub.v1.Subscriber'].methods.StreamingPull.timeout_millis;
33/*!
34 * default stream options
35 */
36const DEFAULT_OPTIONS = {
37 highWaterMark: 0,
38 maxStreams: default_options_1.defaultOptions.subscription.maxStreams,
39 timeout: 300000,
40};
41/**
42 * Error wrapper for gRPC status objects.
43 *
44 * @class
45 *
46 * @param {object} status The gRPC status object.
47 */
48class StatusError extends Error {
49 constructor(status) {
50 super(status.details);
51 this.code = status.code;
52 this.details = status.details;
53 this.metadata = status.metadata;
54 }
55}
56exports.StatusError = StatusError;
57/**
58 * Error thrown when we fail to open a channel for the message stream.
59 *
60 * @class
61 *
62 * @param {Error} err The original error.
63 */
64class ChannelError extends Error {
65 constructor(err) {
66 super(`Failed to connect to channel. Reason: ${process.env.DEBUG_GRPC ? err.stack : err.message}`);
67 this.code = err.message.includes('deadline')
68 ? google_gax_1.grpc.status.DEADLINE_EXCEEDED
69 : google_gax_1.grpc.status.UNKNOWN;
70 this.details = err.message;
71 this.metadata = new google_gax_1.grpc.Metadata();
72 }
73}
74exports.ChannelError = ChannelError;
75/**
76 * @typedef {object} MessageStreamOptions
77 * @property {number} [highWaterMark=0] Configures the Buffer level for all
78 * underlying streams. See
79 * {@link https://nodejs.org/en/docs/guides/backpressuring-in-streams/} for
80 * more details.
81 * @property {number} [maxStreams=5] Number of streaming connections to make.
82 * @property {number} [timeout=300000] Timeout for establishing a connection.
83 */
84/**
85 * Streaming class used to manage multiple StreamingPull requests.
86 *
87 * @private
88 * @class
89 *
90 * @param {Subscriber} sub The parent subscriber.
91 * @param {MessageStreamOptions} [options] The message stream options.
92 */
93class MessageStream extends stream_1.PassThrough {
94 constructor(sub, options = {}) {
95 options = Object.assign({}, DEFAULT_OPTIONS, options);
96 super({ objectMode: true, highWaterMark: options.highWaterMark });
97 this._options = options;
98 this._retrier = new pull_retry_1.PullRetry();
99 this._streams = new Map();
100 this._subscriber = sub;
101 this._fillStreamPool();
102 this._keepAliveHandle = setInterval(() => this._keepAlive(), KEEP_ALIVE_INTERVAL);
103 this._keepAliveHandle.unref();
104 }
105 /**
106 * Destroys the stream and any underlying streams.
107 *
108 * @param {error?} error An error to emit, if any.
109 * @private
110 */
111 destroy(error) {
112 // We can't assume Node has taken care of this in <14.
113 if (this.destroyed) {
114 return;
115 }
116 super.destroy(error ? error : undefined);
117 }
118 /**
119 * Destroys the stream and any underlying streams.
120 *
121 * @param {error?} error An error to emit, if any.
122 * @param {Function} callback Callback for completion of any destruction.
123 * @private
124 */
125 _destroy(error, callback) {
126 this.destroyed = true;
127 clearInterval(this._keepAliveHandle);
128 for (const stream of this._streams.keys()) {
129 this._removeStream(stream);
130 stream.cancel();
131 }
132 callback(error);
133 }
134 /**
135 * Adds a StreamingPull stream to the combined stream.
136 *
137 * @private
138 *
139 * @param {stream} stream The StreamingPull stream.
140 */
141 _addStream(stream) {
142 this._setHighWaterMark(stream);
143 this._streams.set(stream, false);
144 stream
145 .on('error', err => this._onError(stream, err))
146 .once('status', status => this._onStatus(stream, status))
147 .pipe(this, { end: false });
148 }
149 /**
150 * Attempts to create and cache the desired number of StreamingPull requests.
151 * gRPC does not supply a way to confirm that a stream is connected, so our
152 * best bet is to open the streams and use the client.waitForReady() method to
153 * confirm everything is ok.
154 *
155 * @private
156 *
157 * @returns {Promise}
158 */
159 async _fillStreamPool() {
160 let client;
161 try {
162 client = await this._getClient();
163 }
164 catch (e) {
165 this.destroy(e);
166 }
167 if (this.destroyed) {
168 return;
169 }
170 const deadline = Date.now() + PULL_TIMEOUT;
171 const request = {
172 subscription: this._subscriber.name,
173 streamAckDeadlineSeconds: this._subscriber.ackDeadline,
174 maxOutstandingMessages: this._subscriber.useLegacyFlowControl
175 ? 0
176 : this._subscriber.maxMessages,
177 maxOutstandingBytes: this._subscriber.useLegacyFlowControl
178 ? 0
179 : this._subscriber.maxBytes,
180 };
181 delete this._fillHandle;
182 for (let i = this._streams.size; i < this._options.maxStreams; i++) {
183 const stream = client.streamingPull({ deadline });
184 this._addStream(stream);
185 stream.write(request);
186 }
187 try {
188 await this._waitForClientReady(client);
189 }
190 catch (e) {
191 this.destroy(e);
192 }
193 }
194 /**
195 * It is critical that we keep as few `PullResponse` objects in memory as
196 * possible to reduce the number of potential redeliveries. Because of this we
197 * want to bypass gax for StreamingPull requests to avoid creating a Duplexify
198 * stream, doing so essentially doubles the size of our readable buffer.
199 *
200 * @private
201 *
202 * @returns {Promise.<object>}
203 */
204 async _getClient() {
205 const client = await this._subscriber.getClient();
206 client.initialize();
207 return client.subscriberStub;
208 }
209 /**
210 * Since we do not use the streams to ack/modAck messages, they will close
211 * by themselves unless we periodically send empty messages.
212 *
213 * @private
214 */
215 _keepAlive() {
216 this._streams.forEach((receivedStatus, stream) => {
217 // its possible that a status event fires off (signaling the rpc being
218 // closed) but the stream hasn't drained yet, writing to this stream will
219 // result in a `write after end` error
220 if (!receivedStatus) {
221 stream.write({});
222 }
223 });
224 }
225 /**
226 * Once the stream has nothing left to read, we'll remove it and attempt to
227 * refill our stream pool if needed.
228 *
229 * @private
230 *
231 * @param {Duplex} stream The ended stream.
232 * @param {object} status The stream status.
233 */
234 _onEnd(stream, status) {
235 this._removeStream(stream);
236 if (this._fillHandle) {
237 return;
238 }
239 if (this._retrier.retry(status)) {
240 const delay = this._retrier.createTimeout();
241 this._fillHandle = setTimeout(() => this._fillStreamPool(), delay);
242 }
243 else if (!this._streams.size) {
244 this.destroy(new StatusError(status));
245 }
246 }
247 /**
248 * gRPC will usually emit a status as a ServiceError via `error` event before
249 * it emits the status itself. In order to cut back on emitted errors, we'll
250 * wait a tick on error and ignore it if the status has been received.
251 *
252 * @private
253 *
254 * @param {stream} stream The stream that errored.
255 * @param {Error} err The error.
256 */
257 async _onError(stream, err) {
258 await promisify_1.promisify(setImmediate)();
259 const code = err.code;
260 const receivedStatus = this._streams.get(stream) !== false;
261 if (typeof code !== 'number' || !receivedStatus) {
262 this.emit('error', err);
263 }
264 }
265 /**
266 * gRPC streams will emit a status event once the connection has been
267 * terminated. This is preferable to end/close events because we'll receive
268 * information as to why the stream closed and if it is safe to open another.
269 *
270 * @private
271 *
272 * @param {stream} stream The stream that was closed.
273 * @param {object} status The status message stating why it was closed.
274 */
275 _onStatus(stream, status) {
276 if (this.destroyed) {
277 return;
278 }
279 this._streams.set(stream, true);
280 if (isStreamEnded(stream)) {
281 this._onEnd(stream, status);
282 }
283 else {
284 stream.once('end', () => this._onEnd(stream, status));
285 stream.push(null);
286 }
287 }
288 /**
289 * Removes a stream from the combined stream.
290 *
291 * @private
292 *
293 * @param {stream} stream The stream to remove.
294 */
295 _removeStream(stream) {
296 stream.unpipe(this);
297 this._streams.delete(stream);
298 }
299 /**
300 * Neither gRPC or gax allow for the highWaterMark option to be specified.
301 * However using the default value (16) it is possible to end up with a lot of
302 * PullResponse objects stored in internal buffers. If this were to happen
303 * and the client were slow to process messages, we could potentially see a
304 * very large number of redeliveries happen before the messages even made it
305 * to the client.
306 *
307 * @private
308 *
309 * @param {Duplex} stream The duplex stream to adjust the
310 * highWaterMarks for.
311 */
312 _setHighWaterMark(stream) {
313 stream._readableState.highWaterMark = this._options.highWaterMark;
314 }
315 /**
316 * Promisified version of gRPCs Client#waitForReady function.
317 *
318 * @private
319 *
320 * @param {object} client The gRPC client to wait for.
321 * @returns {Promise}
322 */
323 async _waitForClientReady(client) {
324 const deadline = Date.now() + this._options.timeout;
325 try {
326 await promisify_1.promisify(client.waitForReady).call(client, deadline);
327 }
328 catch (e) {
329 throw new ChannelError(e);
330 }
331 }
332}
333exports.MessageStream = MessageStream;
334//# sourceMappingURL=message-stream.js.map
\No newline at end of file