1 | ;
|
2 | /*!
|
3 | * Copyright 2018 Google Inc. All Rights Reserved.
|
4 | *
|
5 | * Licensed under the Apache License, Version 2.0 (the "License");
|
6 | * you may not use this file except in compliance with the License.
|
7 | * You may obtain a copy of the License at
|
8 | *
|
9 | * http://www.apache.org/licenses/LICENSE-2.0
|
10 | *
|
11 | * Unless required by applicable law or agreed to in writing, software
|
12 | * distributed under the License is distributed on an "AS IS" BASIS,
|
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
14 | * See the License for the specific language governing permissions and
|
15 | * limitations under the License.
|
16 | */
|
17 | Object.defineProperty(exports, "__esModule", { value: true });
|
18 | exports.MessageStream = exports.ChannelError = exports.StatusError = void 0;
|
19 | const promisify_1 = require("@google-cloud/promisify");
|
20 | const google_gax_1 = require("google-gax");
|
21 | const isStreamEnded = require("is-stream-ended");
|
22 | const stream_1 = require("stream");
|
23 | const pull_retry_1 = require("./pull-retry");
|
24 | const default_options_1 = require("./default-options");
|
25 | /*!
|
26 | * Frequency to ping streams.
|
27 | */
|
28 | const KEEP_ALIVE_INTERVAL = 30000;
|
29 | /*!
|
30 | * Deadline for the stream.
|
31 | */
|
32 | const PULL_TIMEOUT = require('./v1/subscriber_client_config.json').interfaces['google.pubsub.v1.Subscriber'].methods.StreamingPull.timeout_millis;
|
33 | /*!
|
34 | * default stream options
|
35 | */
|
36 | const DEFAULT_OPTIONS = {
|
37 | highWaterMark: 0,
|
38 | maxStreams: default_options_1.defaultOptions.subscription.maxStreams,
|
39 | timeout: 300000,
|
40 | };
|
41 | /**
|
42 | * Error wrapper for gRPC status objects.
|
43 | *
|
44 | * @class
|
45 | *
|
46 | * @param {object} status The gRPC status object.
|
47 | */
|
48 | class StatusError extends Error {
|
49 | constructor(status) {
|
50 | super(status.details);
|
51 | this.code = status.code;
|
52 | this.details = status.details;
|
53 | this.metadata = status.metadata;
|
54 | }
|
55 | }
|
56 | exports.StatusError = StatusError;
|
57 | /**
|
58 | * Error thrown when we fail to open a channel for the message stream.
|
59 | *
|
60 | * @class
|
61 | *
|
62 | * @param {Error} err The original error.
|
63 | */
|
64 | class ChannelError extends Error {
|
65 | constructor(err) {
|
66 | super(`Failed to connect to channel. Reason: ${process.env.DEBUG_GRPC ? err.stack : err.message}`);
|
67 | this.code = err.message.includes('deadline')
|
68 | ? google_gax_1.grpc.status.DEADLINE_EXCEEDED
|
69 | : google_gax_1.grpc.status.UNKNOWN;
|
70 | this.details = err.message;
|
71 | this.metadata = new google_gax_1.grpc.Metadata();
|
72 | }
|
73 | }
|
74 | exports.ChannelError = ChannelError;
|
75 | /**
|
76 | * @typedef {object} MessageStreamOptions
|
77 | * @property {number} [highWaterMark=0] Configures the Buffer level for all
|
78 | * underlying streams. See
|
79 | * {@link https://nodejs.org/en/docs/guides/backpressuring-in-streams/} for
|
80 | * more details.
|
81 | * @property {number} [maxStreams=5] Number of streaming connections to make.
|
82 | * @property {number} [timeout=300000] Timeout for establishing a connection.
|
83 | */
|
84 | /**
|
85 | * Streaming class used to manage multiple StreamingPull requests.
|
86 | *
|
87 | * @private
|
88 | * @class
|
89 | *
|
90 | * @param {Subscriber} sub The parent subscriber.
|
91 | * @param {MessageStreamOptions} [options] The message stream options.
|
92 | */
|
93 | class MessageStream extends stream_1.PassThrough {
|
94 | constructor(sub, options = {}) {
|
95 | options = Object.assign({}, DEFAULT_OPTIONS, options);
|
96 | super({ objectMode: true, highWaterMark: options.highWaterMark });
|
97 | this._options = options;
|
98 | this._retrier = new pull_retry_1.PullRetry();
|
99 | this._streams = new Map();
|
100 | this._subscriber = sub;
|
101 | this._fillStreamPool();
|
102 | this._keepAliveHandle = setInterval(() => this._keepAlive(), KEEP_ALIVE_INTERVAL);
|
103 | this._keepAliveHandle.unref();
|
104 | }
|
105 | /**
|
106 | * Destroys the stream and any underlying streams.
|
107 | *
|
108 | * @param {error?} error An error to emit, if any.
|
109 | * @param {Function} callback Callback for completion of any destruction.
|
110 | * @private
|
111 | */
|
112 | _destroy(error, callback) {
|
113 | clearInterval(this._keepAliveHandle);
|
114 | for (const stream of this._streams.keys()) {
|
115 | this._removeStream(stream);
|
116 | stream.cancel();
|
117 | }
|
118 | callback(error);
|
119 | }
|
120 | /**
|
121 | * Adds a StreamingPull stream to the combined stream.
|
122 | *
|
123 | * @private
|
124 | *
|
125 | * @param {stream} stream The StreamingPull stream.
|
126 | */
|
127 | _addStream(stream) {
|
128 | this._setHighWaterMark(stream);
|
129 | this._streams.set(stream, false);
|
130 | stream
|
131 | .on('error', err => this._onError(stream, err))
|
132 | .once('status', status => this._onStatus(stream, status))
|
133 | .pipe(this, { end: false });
|
134 | }
|
135 | /**
|
136 | * Attempts to create and cache the desired number of StreamingPull requests.
|
137 | * gRPC does not supply a way to confirm that a stream is connected, so our
|
138 | * best bet is to open the streams and use the client.waitForReady() method to
|
139 | * confirm everything is ok.
|
140 | *
|
141 | * @private
|
142 | *
|
143 | * @returns {Promise}
|
144 | */
|
145 | async _fillStreamPool() {
|
146 | let client;
|
147 | try {
|
148 | client = await this._getClient();
|
149 | }
|
150 | catch (e) {
|
151 | const err = e;
|
152 | this.destroy(err);
|
153 | }
|
154 | if (this.destroyed) {
|
155 | return;
|
156 | }
|
157 | const deadline = Date.now() + PULL_TIMEOUT;
|
158 | const request = {
|
159 | subscription: this._subscriber.name,
|
160 | streamAckDeadlineSeconds: this._subscriber.ackDeadline,
|
161 | maxOutstandingMessages: this._subscriber.useLegacyFlowControl
|
162 | ? 0
|
163 | : this._subscriber.maxMessages,
|
164 | maxOutstandingBytes: this._subscriber.useLegacyFlowControl
|
165 | ? 0
|
166 | : this._subscriber.maxBytes,
|
167 | };
|
168 | delete this._fillHandle;
|
169 | for (let i = this._streams.size; i < this._options.maxStreams; i++) {
|
170 | const stream = client.streamingPull({ deadline });
|
171 | this._addStream(stream);
|
172 | stream.write(request);
|
173 | }
|
174 | try {
|
175 | await this._waitForClientReady(client);
|
176 | }
|
177 | catch (e) {
|
178 | const err = e;
|
179 | this.destroy(err);
|
180 | }
|
181 | }
|
182 | /**
|
183 | * It is critical that we keep as few `PullResponse` objects in memory as
|
184 | * possible to reduce the number of potential redeliveries. Because of this we
|
185 | * want to bypass gax for StreamingPull requests to avoid creating a Duplexify
|
186 | * stream, doing so essentially doubles the size of our readable buffer.
|
187 | *
|
188 | * @private
|
189 | *
|
190 | * @returns {Promise.<object>}
|
191 | */
|
192 | async _getClient() {
|
193 | const client = await this._subscriber.getClient();
|
194 | client.initialize();
|
195 | return client.subscriberStub;
|
196 | }
|
197 | /**
|
198 | * Since we do not use the streams to ack/modAck messages, they will close
|
199 | * by themselves unless we periodically send empty messages.
|
200 | *
|
201 | * @private
|
202 | */
|
203 | _keepAlive() {
|
204 | this._streams.forEach((receivedStatus, stream) => {
|
205 | // its possible that a status event fires off (signaling the rpc being
|
206 | // closed) but the stream hasn't drained yet, writing to this stream will
|
207 | // result in a `write after end` error
|
208 | if (!receivedStatus) {
|
209 | stream.write({});
|
210 | }
|
211 | });
|
212 | }
|
213 | /**
|
214 | * Once the stream has nothing left to read, we'll remove it and attempt to
|
215 | * refill our stream pool if needed.
|
216 | *
|
217 | * @private
|
218 | *
|
219 | * @param {Duplex} stream The ended stream.
|
220 | * @param {object} status The stream status.
|
221 | */
|
222 | _onEnd(stream, status) {
|
223 | this._removeStream(stream);
|
224 | if (this._fillHandle) {
|
225 | return;
|
226 | }
|
227 | if (this._retrier.retry(status)) {
|
228 | const delay = this._retrier.createTimeout();
|
229 | this._fillHandle = setTimeout(() => this._fillStreamPool(), delay);
|
230 | }
|
231 | else if (!this._streams.size) {
|
232 | this.destroy(new StatusError(status));
|
233 | }
|
234 | }
|
235 | /**
|
236 | * gRPC will usually emit a status as a ServiceError via `error` event before
|
237 | * it emits the status itself. In order to cut back on emitted errors, we'll
|
238 | * wait a tick on error and ignore it if the status has been received.
|
239 | *
|
240 | * @private
|
241 | *
|
242 | * @param {stream} stream The stream that errored.
|
243 | * @param {Error} err The error.
|
244 | */
|
245 | async _onError(stream, err) {
|
246 | await promisify_1.promisify(setImmediate)();
|
247 | const code = err.code;
|
248 | const receivedStatus = this._streams.get(stream) !== false;
|
249 | if (typeof code !== 'number' || !receivedStatus) {
|
250 | this.emit('error', err);
|
251 | }
|
252 | }
|
253 | /**
|
254 | * gRPC streams will emit a status event once the connection has been
|
255 | * terminated. This is preferable to end/close events because we'll receive
|
256 | * information as to why the stream closed and if it is safe to open another.
|
257 | *
|
258 | * @private
|
259 | *
|
260 | * @param {stream} stream The stream that was closed.
|
261 | * @param {object} status The status message stating why it was closed.
|
262 | */
|
263 | _onStatus(stream, status) {
|
264 | if (this.destroyed) {
|
265 | return;
|
266 | }
|
267 | this._streams.set(stream, true);
|
268 | if (isStreamEnded(stream)) {
|
269 | this._onEnd(stream, status);
|
270 | }
|
271 | else {
|
272 | stream.once('end', () => this._onEnd(stream, status));
|
273 | stream.push(null);
|
274 | }
|
275 | }
|
276 | /**
|
277 | * Removes a stream from the combined stream.
|
278 | *
|
279 | * @private
|
280 | *
|
281 | * @param {stream} stream The stream to remove.
|
282 | */
|
283 | _removeStream(stream) {
|
284 | stream.unpipe(this);
|
285 | this._streams.delete(stream);
|
286 | }
|
287 | /**
|
288 | * Neither gRPC or gax allow for the highWaterMark option to be specified.
|
289 | * However using the default value (16) it is possible to end up with a lot of
|
290 | * PullResponse objects stored in internal buffers. If this were to happen
|
291 | * and the client were slow to process messages, we could potentially see a
|
292 | * very large number of redeliveries happen before the messages even made it
|
293 | * to the client.
|
294 | *
|
295 | * @private
|
296 | *
|
297 | * @param {Duplex} stream The duplex stream to adjust the
|
298 | * highWaterMarks for.
|
299 | */
|
300 | _setHighWaterMark(stream) {
|
301 | stream._readableState.highWaterMark = this._options.highWaterMark;
|
302 | }
|
303 | /**
|
304 | * Promisified version of gRPCs Client#waitForReady function.
|
305 | *
|
306 | * @private
|
307 | *
|
308 | * @param {object} client The gRPC client to wait for.
|
309 | * @returns {Promise}
|
310 | */
|
311 | async _waitForClientReady(client) {
|
312 | const deadline = Date.now() + this._options.timeout;
|
313 | try {
|
314 | await promisify_1.promisify(client.waitForReady).call(client, deadline);
|
315 | }
|
316 | catch (e) {
|
317 | const err = e;
|
318 | throw new ChannelError(err);
|
319 | }
|
320 | }
|
321 | }
|
322 | exports.MessageStream = MessageStream;
|
323 | //# sourceMappingURL=message-stream.js.map |
\ | No newline at end of file |