1 | ;
|
2 | /*!
|
3 | * Copyright 2018 Google Inc. All Rights Reserved.
|
4 | *
|
5 | * Licensed under the Apache License, Version 2.0 (the "License");
|
6 | * you may not use this file except in compliance with the License.
|
7 | * You may obtain a copy of the License at
|
8 | *
|
9 | * http://www.apache.org/licenses/LICENSE-2.0
|
10 | *
|
11 | * Unless required by applicable law or agreed to in writing, software
|
12 | * distributed under the License is distributed on an "AS IS" BASIS,
|
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
14 | * See the License for the specific language governing permissions and
|
15 | * limitations under the License.
|
16 | */
|
17 | Object.defineProperty(exports, "__esModule", { value: true });
|
18 | exports.MessageStream = exports.ChannelError = exports.StatusError = void 0;
|
19 | const promisify_1 = require("@google-cloud/promisify");
|
20 | const google_gax_1 = require("google-gax");
|
21 | const isStreamEnded = require("is-stream-ended");
|
22 | const stream_1 = require("stream");
|
23 | const pull_retry_1 = require("./pull-retry");
|
24 | const default_options_1 = require("./default-options");
|
/*!
 * How often, in milliseconds, each open stream is pinged to keep it alive.
 */
const KEEP_ALIVE_INTERVAL = 30 * 1000;
/*!
 * Deadline for a StreamingPull call, in milliseconds, read from the
 * generated subscriber client configuration.
 */
const { timeout_millis: PULL_TIMEOUT } = require('./v1/subscriber_client_config.json').interfaces['google.pubsub.v1.Subscriber'].methods.StreamingPull;
/*!
 * Options applied to a message stream when the caller does not override them.
 */
const DEFAULT_OPTIONS = {
    // No buffering in the combined stream or the underlying streams.
    highWaterMark: 0,
    // Number of streaming connections to open.
    maxStreams: default_options_1.defaultOptions.subscription.maxStreams,
    // Milliseconds to wait for the channel to become ready (5 minutes).
    timeout: 5 * 60 * 1000,
};
|
/**
 * Error built from a gRPC status object. The status `details` become the
 * error message, and `code`, `details` and `metadata` are copied onto the
 * error instance.
 *
 * @class
 *
 * @param {object} status The gRPC status object.
 */
class StatusError extends Error {
    constructor(status) {
        const { details } = status;
        super(details);
        Object.assign(this, {
            code: status.code,
            details,
            metadata: status.metadata,
        });
    }
}
|
56 | exports.StatusError = StatusError;
|
/**
 * Error raised when a channel for the message stream could not be opened.
 *
 * The message embeds the original error's message, or its stack trace when
 * the DEBUG_GRPC environment variable is set. The code is DEADLINE_EXCEEDED
 * when the original message mentions "deadline", otherwise UNKNOWN.
 *
 * @class
 *
 * @param {Error} err The original error.
 */
class ChannelError extends Error {
    constructor(err) {
        const reason = process.env.DEBUG_GRPC ? err.stack : err.message;
        super(`Failed to connect to channel. Reason: ${reason}`);
        const timedOut = err.message.includes('deadline');
        const { grpc } = google_gax_1;
        if (timedOut) {
            this.code = grpc.status.DEADLINE_EXCEEDED;
        }
        else {
            this.code = grpc.status.UNKNOWN;
        }
        this.details = err.message;
        this.metadata = new grpc.Metadata();
    }
}
|
74 | exports.ChannelError = ChannelError;
|
75 | /**
|
76 | * @typedef {object} MessageStreamOptions
|
77 | * @property {number} [highWaterMark=0] Configures the Buffer level for all
|
78 | * underlying streams. See
|
79 | * {@link https://nodejs.org/en/docs/guides/backpressuring-in-streams/} for
|
80 | * more details.
|
81 | * @property {number} [maxStreams=5] Number of streaming connections to make.
|
82 | * @property {number} [timeout=300000] Timeout for establishing a connection.
|
83 | */
|
/**
 * Streaming class used to manage multiple StreamingPull requests.
 *
 * Every underlying StreamingPull stream is piped into this PassThrough, so
 * consumers read a single combined object-mode stream.
 *
 * @private
 * @class
 *
 * @param {Subscriber} sub The parent subscriber.
 * @param {MessageStreamOptions} [options] The message stream options.
 */
class MessageStream extends stream_1.PassThrough {
    constructor(sub, options = {}) {
        const merged = Object.assign({}, DEFAULT_OPTIONS, options);
        super({ objectMode: true, highWaterMark: merged.highWaterMark });
        this._options = merged;
        this._retrier = new pull_retry_1.PullRetry();
        this._streams = new Map();
        this._subscriber = sub;
        // Fire-and-forget: _fillStreamPool reports its own failures through
        // destroy(err) rather than rejecting.
        this._fillStreamPool();
        this._keepAliveHandle = setInterval(() => this._keepAlive(), KEEP_ALIVE_INTERVAL);
        // The keep-alive timer must not keep the process running by itself.
        this._keepAliveHandle.unref();
    }
    /**
     * Destroys the stream and any underlying streams. Calling it on an
     * already destroyed stream is a no-op.
     *
     * @param {error?} error An error to emit, if any.
     * @returns {MessageStream} This stream, matching the Node stream API.
     * @private
     */
    destroy(error) {
        // We can't assume Node has taken care of this in <14.
        if (!this.destroyed) {
            super.destroy(error ? error : undefined);
        }
        return this;
    }
    /**
     * Destroys the stream and any underlying streams: stops the keep-alive
     * timer, cancels a pending pool refill, and unpipes and cancels every
     * tracked stream.
     *
     * @param {error?} error An error to emit, if any.
     * @param {Function} callback Callback for completion of any destruction.
     * @private
     */
    _destroy(error, callback) {
        this.destroyed = true;
        clearInterval(this._keepAliveHandle);
        // A refill scheduled by _onEnd would otherwise outlive the stream and
        // keep the event loop busy until it fires.
        if (this._fillHandle) {
            clearTimeout(this._fillHandle);
            delete this._fillHandle;
        }
        for (const stream of this._streams.keys()) {
            this._removeStream(stream);
            stream.cancel();
        }
        callback(error);
    }
    /**
     * Adds a StreamingPull stream to the combined stream.
     *
     * @private
     *
     * @param {stream} stream The StreamingPull stream.
     */
    _addStream(stream) {
        this._setHighWaterMark(stream);
        // The map value records whether a status has been received yet.
        this._streams.set(stream, false);
        stream
            .on('error', err => this._onError(stream, err))
            .once('status', status => this._onStatus(stream, status))
            // end: false keeps the combined stream open when one source ends.
            .pipe(this, { end: false });
    }
    /**
     * Attempts to create and cache the desired number of StreamingPull requests.
     * gRPC does not supply a way to confirm that a stream is connected, so our
     * best bet is to open the streams and use the client.waitForReady() method to
     * confirm everything is ok.
     *
     * Any failure destroys the message stream with the error; the returned
     * promise itself does not reject.
     *
     * @private
     *
     * @returns {Promise}
     */
    async _fillStreamPool() {
        let client;
        try {
            client = await this._getClient();
        }
        catch (e) {
            this.destroy(e);
        }
        // Covers both a failed client lookup and a destroy() that happened
        // while the lookup was pending.
        if (this.destroyed) {
            return;
        }
        const deadline = Date.now() + PULL_TIMEOUT;
        const useLegacyFlowControl = this._subscriber.useLegacyFlowControl;
        const request = {
            subscription: this._subscriber.name,
            streamAckDeadlineSeconds: this._subscriber.ackDeadline,
            maxOutstandingMessages: useLegacyFlowControl
                ? 0
                : this._subscriber.maxMessages,
            maxOutstandingBytes: useLegacyFlowControl
                ? 0
                : this._subscriber.maxBytes,
        };
        // The scheduled refill (if any) is running now; allow a new one.
        delete this._fillHandle;
        try {
            // Opening streams is inside the try so a synchronous failure is
            // reported through destroy() instead of an unhandled rejection.
            for (let i = this._streams.size; i < this._options.maxStreams; i++) {
                const stream = client.streamingPull({ deadline });
                this._addStream(stream);
                stream.write(request);
            }
            await this._waitForClientReady(client);
        }
        catch (e) {
            this.destroy(e);
        }
    }
    /**
     * It is critical that we keep as few `PullResponse` objects in memory as
     * possible to reduce the number of potential redeliveries. Because of this we
     * want to bypass gax for StreamingPull requests to avoid creating a Duplexify
     * stream, doing so essentially doubles the size of our readable buffer.
     *
     * @private
     *
     * @returns {Promise.<object>} The subscriber stub of the initialized client.
     */
    async _getClient() {
        const client = await this._subscriber.getClient();
        client.initialize();
        return client.subscriberStub;
    }
    /**
     * Since we do not use the streams to ack/modAck messages, they will close
     * by themselves unless we periodically send empty messages.
     *
     * @private
     */
    _keepAlive() {
        this._streams.forEach((receivedStatus, stream) => {
            // It's possible that a status event fires off (signaling the rpc being
            // closed) but the stream hasn't drained yet, writing to this stream will
            // result in a `write after end` error.
            if (!receivedStatus) {
                stream.write({});
            }
        });
    }
    /**
     * Once the stream has nothing left to read, we'll remove it and attempt to
     * refill our stream pool if needed. If the status is not retryable and no
     * streams remain, the message stream is destroyed with a StatusError.
     *
     * @private
     *
     * @param {Duplex} stream The ended stream.
     * @param {object} status The stream status.
     */
    _onEnd(stream, status) {
        this._removeStream(stream);
        // Nothing to schedule once destroyed, or while a refill is pending.
        if (this.destroyed || this._fillHandle) {
            return;
        }
        if (this._retrier.retry(status)) {
            const delay = this._retrier.createTimeout();
            this._fillHandle = setTimeout(() => this._fillStreamPool(), delay);
        }
        else if (!this._streams.size) {
            this.destroy(new StatusError(status));
        }
    }
    /**
     * gRPC will usually emit a status as a ServiceError via `error` event before
     * it emits the status itself. In order to cut back on emitted errors, we'll
     * wait a tick on error and ignore it if the status has been received.
     *
     * @private
     *
     * @param {stream} stream The stream that errored.
     * @param {Error} err The error.
     */
    async _onError(stream, err) {
        await promisify_1.promisify(setImmediate)();
        const code = err.code;
        // Anything other than an explicit `false` counts as "status received",
        // including a stream that has already been removed from the map.
        const receivedStatus = this._streams.get(stream) !== false;
        if (typeof code !== 'number' || !receivedStatus) {
            this.emit('error', err);
        }
    }
    /**
     * gRPC streams will emit a status event once the connection has been
     * terminated. This is preferable to end/close events because we'll receive
     * information as to why the stream closed and if it is safe to open another.
     *
     * @private
     *
     * @param {stream} stream The stream that was closed.
     * @param {object} status The status message stating why it was closed.
     */
    _onStatus(stream, status) {
        if (this.destroyed) {
            return;
        }
        this._streams.set(stream, true);
        if (isStreamEnded(stream)) {
            this._onEnd(stream, status);
        }
        else {
            stream.once('end', () => this._onEnd(stream, status));
            // Signal end-of-data so 'end' fires after buffered data is read.
            stream.push(null);
        }
    }
    /**
     * Removes a stream from the combined stream.
     *
     * @private
     *
     * @param {stream} stream The stream to remove.
     */
    _removeStream(stream) {
        stream.unpipe(this);
        this._streams.delete(stream);
    }
    /**
     * Neither gRPC nor gax allow for the highWaterMark option to be specified.
     * However using the default value (16) it is possible to end up with a lot of
     * PullResponse objects stored in internal buffers. If this were to happen
     * and the client were slow to process messages, we could potentially see a
     * very large number of redeliveries happen before the messages even made it
     * to the client.
     *
     * @private
     *
     * @param {Duplex} stream The duplex stream to adjust the
     *     highWaterMarks for.
     */
    _setHighWaterMark(stream) {
        stream._readableState.highWaterMark = this._options.highWaterMark;
    }
    /**
     * Promisified version of gRPCs Client#waitForReady function.
     *
     * @private
     *
     * @param {object} client The gRPC client to wait for.
     * @returns {Promise} Resolves once the client is ready.
     * @throws {ChannelError} If waitForReady reports an error.
     */
    async _waitForClientReady(client) {
        const deadline = Date.now() + this._options.timeout;
        try {
            await promisify_1.promisify(client.waitForReady).call(client, deadline);
        }
        catch (e) {
            throw new ChannelError(e);
        }
    }
}
|
333 | exports.MessageStream = MessageStream;
|
334 | //# sourceMappingURL=message-stream.js.map |
\ | No newline at end of file |