UNPKG

10.9 kBJavaScriptView Raw
1"use strict";
2/*!
3 * Copyright 2018 Google Inc. All Rights Reserved.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17Object.defineProperty(exports, "__esModule", { value: true });
18exports.MessageStream = exports.ChannelError = exports.StatusError = void 0;
19const promisify_1 = require("@google-cloud/promisify");
20const google_gax_1 = require("google-gax");
21const isStreamEnded = require("is-stream-ended");
22const stream_1 = require("stream");
23const pull_retry_1 = require("./pull-retry");
24const default_options_1 = require("./default-options");
/*!
 * How often, in milliseconds, the open streams are pinged to keep them alive.
 */
const KEEP_ALIVE_INTERVAL = 30000;
/*!
 * Deadline for a StreamingPull call, in milliseconds. The value is read from
 * the subscriber client configuration rather than being hard-coded here.
 */
const subscriberClientConfig = require('./v1/subscriber_client_config.json');
const subscriberMethods = subscriberClientConfig.interfaces['google.pubsub.v1.Subscriber'].methods;
const PULL_TIMEOUT = subscriberMethods.StreamingPull.timeout_millis;
/*!
 * Option values applied when the caller does not override them.
 */
const defaultSubscriptionOptions = default_options_1.defaultOptions.subscription;
const DEFAULT_OPTIONS = {
    highWaterMark: 0,
    maxStreams: defaultSubscriptionOptions.maxStreams,
    timeout: 300000,
};
/**
 * Error wrapper for gRPC status objects.
 *
 * The status' `details` string becomes the error message, and its `code`,
 * `details` and `metadata` fields are copied onto the error instance.
 *
 * @class
 *
 * @param {object} status The gRPC status object.
 */
class StatusError extends Error {
    constructor(status) {
        const { code, details, metadata } = status;
        super(details);
        Object.assign(this, { code, details, metadata });
    }
}
56exports.StatusError = StatusError;
/**
 * Error thrown when we fail to open a channel for the message stream.
 *
 * The message embeds the original error's message, or its full stack when
 * the `DEBUG_GRPC` environment variable is set. The code is
 * `DEADLINE_EXCEEDED` when the original message mentions "deadline" and
 * `UNKNOWN` otherwise; `details` is the original message and `metadata` is a
 * fresh, empty gRPC `Metadata` object.
 *
 * @class
 *
 * @param {Error} err The original error.
 */
class ChannelError extends Error {
    constructor(err) {
        const reason = process.env.DEBUG_GRPC ? err.stack : err.message;
        super(`Failed to connect to channel. Reason: ${reason}`);
        const hitDeadline = err.message.includes('deadline');
        const { grpc } = google_gax_1;
        if (hitDeadline) {
            this.code = grpc.status.DEADLINE_EXCEEDED;
        }
        else {
            this.code = grpc.status.UNKNOWN;
        }
        this.details = err.message;
        this.metadata = new grpc.Metadata();
    }
}
74exports.ChannelError = ChannelError;
75/**
76 * @typedef {object} MessageStreamOptions
77 * @property {number} [highWaterMark=0] Configures the Buffer level for all
78 * underlying streams. See
79 * {@link https://nodejs.org/en/docs/guides/backpressuring-in-streams/} for
80 * more details.
81 * @property {number} [maxStreams=5] Number of streaming connections to make.
82 * @property {number} [timeout=300000] Timeout for establishing a connection.
83 */
/**
 * Streaming class used to manage multiple StreamingPull requests.
 *
 * Every underlying StreamingPull stream is piped into this PassThrough (in
 * object mode), so consumers read a single combined stream. Streams that
 * close are replaced after a back-off delay when the retrier allows it;
 * otherwise, once no streams remain, this stream is destroyed with a
 * StatusError.
 *
 * @private
 * @class
 *
 * @param {Subscriber} sub The parent subscriber.
 * @param {MessageStreamOptions} [options] The message stream options.
 */
class MessageStream extends stream_1.PassThrough {
    constructor(sub, options = {}) {
        options = Object.assign({}, DEFAULT_OPTIONS, options);
        super({ objectMode: true, highWaterMark: options.highWaterMark });
        this._options = options;
        this._retrier = new pull_retry_1.PullRetry();
        // Maps each open StreamingPull stream to a flag telling whether its
        // `status` event has already been received.
        this._streams = new Map();
        this._subscriber = sub;
        this._fillStreamPool();
        this._keepAliveHandle = setInterval(() => this._keepAlive(), KEEP_ALIVE_INTERVAL);
        // The keep-alive timer alone must not keep the process running.
        this._keepAliveHandle.unref();
    }
    /**
     * Destroys the stream and any underlying streams.
     *
     * Stops the keep-alive pings, cancels a pending pool refill (if any), and
     * detaches and cancels every open StreamingPull stream.
     *
     * @param {error?} error An error to emit, if any.
     * @param {Function} callback Callback for completion of any destruction.
     * @private
     */
    _destroy(error, callback) {
        clearInterval(this._keepAliveHandle);
        // A refill scheduled by _onEnd() may still be pending. Cancel it so the
        // timer neither keeps the event loop alive nor fetches a client on
        // behalf of a stream that no longer exists.
        clearTimeout(this._fillHandle);
        delete this._fillHandle;
        for (const stream of this._streams.keys()) {
            this._removeStream(stream);
            stream.cancel();
        }
        callback(error);
    }
    /**
     * Adds a StreamingPull stream to the combined stream.
     *
     * @private
     *
     * @param {stream} stream The StreamingPull stream.
     */
    _addStream(stream) {
        this._setHighWaterMark(stream);
        // `false`: no status has been received for this stream yet.
        this._streams.set(stream, false);
        stream
            .on('error', err => this._onError(stream, err))
            .once('status', status => this._onStatus(stream, status))
            // `end: false` so one underlying stream ending does not end the
            // combined stream.
            .pipe(this, { end: false });
    }
    /**
     * Attempts to create and cache the desired number of StreamingPull requests.
     * gRPC does not supply a way to confirm that a stream is connected, so our
     * best bet is to open the streams and use the client.waitForReady() method to
     * confirm everything is ok.
     *
     * Does nothing when the stream has already been destroyed. Failing to
     * obtain the client, or to become ready in time, destroys the stream with
     * the corresponding error.
     *
     * @private
     *
     * @returns {Promise}
     */
    async _fillStreamPool() {
        // Nothing to refill once destroyed; avoid touching the client at all.
        if (this.destroyed) {
            return;
        }
        let client;
        try {
            client = await this._getClient();
        }
        catch (e) {
            const err = e;
            this.destroy(err);
        }
        // Covers both the failure above and a destroy() that happened while
        // the client was being fetched.
        if (this.destroyed) {
            return;
        }
        const deadline = Date.now() + PULL_TIMEOUT;
        const request = {
            subscription: this._subscriber.name,
            streamAckDeadlineSeconds: this._subscriber.ackDeadline,
            // With legacy flow control both limits are sent as 0.
            maxOutstandingMessages: this._subscriber.useLegacyFlowControl
                ? 0
                : this._subscriber.maxMessages,
            maxOutstandingBytes: this._subscriber.useLegacyFlowControl
                ? 0
                : this._subscriber.maxBytes,
        };
        // The scheduled refill (if any) is now running; allow _onEnd() to
        // schedule the next one.
        delete this._fillHandle;
        for (let i = this._streams.size; i < this._options.maxStreams; i++) {
            const stream = client.streamingPull({ deadline });
            this._addStream(stream);
            stream.write(request);
        }
        try {
            await this._waitForClientReady(client);
        }
        catch (e) {
            const err = e;
            this.destroy(err);
        }
    }
    /**
     * It is critical that we keep as few `PullResponse` objects in memory as
     * possible to reduce the number of potential redeliveries. Because of this we
     * want to bypass gax for StreamingPull requests to avoid creating a Duplexify
     * stream, doing so essentially doubles the size of our readable buffer.
     *
     * @private
     *
     * @returns {Promise.<object>} The subscriber stub of the initialized client.
     */
    async _getClient() {
        const client = await this._subscriber.getClient();
        client.initialize();
        return client.subscriberStub;
    }
    /**
     * Since we do not use the streams to ack/modAck messages, they will close
     * by themselves unless we periodically send empty messages.
     *
     * @private
     */
    _keepAlive() {
        this._streams.forEach((receivedStatus, stream) => {
            // its possible that a status event fires off (signaling the rpc being
            // closed) but the stream hasn't drained yet, writing to this stream will
            // result in a `write after end` error
            if (!receivedStatus) {
                stream.write({});
            }
        });
    }
    /**
     * Once the stream has nothing left to read, we'll remove it and attempt to
     * refill our stream pool if needed.
     *
     * If a refill is already scheduled nothing more happens. Otherwise a refill
     * is scheduled when the retrier accepts the status; when it does not and no
     * streams are left, the stream is destroyed with a StatusError.
     *
     * @private
     *
     * @param {Duplex} stream The ended stream.
     * @param {object} status The stream status.
     */
    _onEnd(stream, status) {
        this._removeStream(stream);
        if (this._fillHandle) {
            return;
        }
        if (this._retrier.retry(status)) {
            const delay = this._retrier.createTimeout();
            this._fillHandle = setTimeout(() => this._fillStreamPool(), delay);
        }
        else if (!this._streams.size) {
            this.destroy(new StatusError(status));
        }
    }
    /**
     * gRPC will usually emit a status as a ServiceError via `error` event before
     * it emits the status itself. In order to cut back on emitted errors, we'll
     * wait a tick on error and ignore it if the status has been received.
     *
     * The error is re-emitted on this stream when it has no numeric `code`, or
     * when the stream is still tracked and has not received its status.
     *
     * @private
     *
     * @param {stream} stream The stream that errored.
     * @param {Error} err The error.
     */
    async _onError(stream, err) {
        await promisify_1.promisify(setImmediate)();
        const code = err.code;
        // A stream that was already removed (lookup yields `undefined`) counts
        // as having received its status.
        const receivedStatus = this._streams.get(stream) !== false;
        if (typeof code !== 'number' || !receivedStatus) {
            this.emit('error', err);
        }
    }
    /**
     * gRPC streams will emit a status event once the connection has been
     * terminated. This is preferable to end/close events because we'll receive
     * information as to why the stream closed and if it is safe to open another.
     *
     * Ignored once this stream has been destroyed.
     *
     * @private
     *
     * @param {stream} stream The stream that was closed.
     * @param {object} status The status message stating why it was closed.
     */
    _onStatus(stream, status) {
        if (this.destroyed) {
            return;
        }
        this._streams.set(stream, true);
        if (isStreamEnded(stream)) {
            this._onEnd(stream, status);
        }
        else {
            // Defer the clean-up until the stream's `end` event, and push the
            // end-of-stream marker so that event is emitted.
            stream.once('end', () => this._onEnd(stream, status));
            stream.push(null);
        }
    }
    /**
     * Removes a stream from the combined stream.
     *
     * @private
     *
     * @param {stream} stream The stream to remove.
     */
    _removeStream(stream) {
        stream.unpipe(this);
        this._streams.delete(stream);
    }
    /**
     * Neither gRPC or gax allow for the highWaterMark option to be specified.
     * However using the default value (16) it is possible to end up with a lot of
     * PullResponse objects stored in internal buffers. If this were to happen
     * and the client were slow to process messages, we could potentially see a
     * very large number of redeliveries happen before the messages even made it
     * to the client.
     *
     * @private
     *
     * @param {Duplex} stream The duplex stream to adjust the
     *     highWaterMarks for.
     */
    _setHighWaterMark(stream) {
        // Only the readable side is adjusted, by writing to the stream's
        // internal readable state.
        stream._readableState.highWaterMark = this._options.highWaterMark;
    }
    /**
     * Promisified version of gRPCs Client#waitForReady function.
     *
     * @private
     *
     * @param {object} client The gRPC client to wait for.
     * @returns {Promise} Resolves once the client reports it is ready.
     * @throws {ChannelError} If waiting fails, e.g. the `timeout` option
     *     elapses first.
     */
    async _waitForClientReady(client) {
        const deadline = Date.now() + this._options.timeout;
        try {
            await promisify_1.promisify(client.waitForReady).call(client, deadline);
        }
        catch (e) {
            const err = e;
            throw new ChannelError(err);
        }
    }
}
322exports.MessageStream = MessageStream;
323//# sourceMappingURL=message-stream.js.map
\No newline at end of file