UNPKG

6.05 kBTypeScriptView Raw
1/*!
2 * Copyright 2018 Google Inc. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16/// <reference types="node" />
17import { grpc } from 'google-gax';
18import { PassThrough } from 'stream';
19import { Subscriber } from './subscriber';
20/**
21 * Error wrapper for gRPC status objects.
22 *
23 * @class
24 *
25 * @param {object} status The gRPC status object.
26 */
27export declare class StatusError extends Error implements grpc.ServiceError {
28 code: grpc.status;
29 details: string;
30 metadata: grpc.Metadata;
31 constructor(status: grpc.StatusObject);
32}
33/**
34 * Error thrown when we fail to open a channel for the message stream.
35 *
36 * @class
37 *
38 * @param {Error} err The original error.
39 */
40export declare class ChannelError extends Error implements grpc.ServiceError {
41 code: grpc.status;
42 details: string;
43 metadata: grpc.Metadata;
44 constructor(err: Error);
45}
46export interface MessageStreamOptions {
47 highWaterMark?: number;
48 maxStreams?: number;
49 timeout?: number;
50}
51/**
52 * @typedef {object} MessageStreamOptions
53 * @property {number} [highWaterMark=0] Configures the Buffer level for all
54 * underlying streams. See
55 * {@link https://nodejs.org/en/docs/guides/backpressuring-in-streams/} for
56 * more details.
57 * @property {number} [maxStreams=5] Number of streaming connections to make.
58 * @property {number} [timeout=300000] Timeout for establishing a connection.
59 */
60/**
61 * Streaming class used to manage multiple StreamingPull requests.
62 *
63 * @private
64 * @class
65 *
66 * @param {Subscriber} sub The parent subscriber.
67 * @param {MessageStreamOptions} [options] The message stream options.
68 */
69export declare class MessageStream extends PassThrough {
70 private _keepAliveHandle;
71 private _fillHandle?;
72 private _options;
73 private _retrier;
74 private _streams;
75 private _subscriber;
76 constructor(sub: Subscriber, options?: MessageStreamOptions);
77 /**
78 * Destroys the stream and any underlying streams.
79 *
80 * @param {error?} error An error to emit, if any.
81 * @param {Function} callback Callback for completion of any destruction.
82 * @private
83 */
84 _destroy(error: Error | null, callback: (error: Error | null) => void): void;
85 /**
86 * Adds a StreamingPull stream to the combined stream.
87 *
88 * @private
89 *
90 * @param {stream} stream The StreamingPull stream.
91 */
92 private _addStream;
93 /**
94 * Attempts to create and cache the desired number of StreamingPull requests.
95 * gRPC does not supply a way to confirm that a stream is connected, so our
96 * best bet is to open the streams and use the client.waitForReady() method to
97 * confirm everything is ok.
98 *
99 * @private
100 *
101 * @returns {Promise}
102 */
103 private _fillStreamPool;
104 /**
105 * It is critical that we keep as few `PullResponse` objects in memory as
106 * possible to reduce the number of potential redeliveries. Because of this we
107 * want to bypass gax for StreamingPull requests to avoid creating a Duplexify
108 * stream, doing so essentially doubles the size of our readable buffer.
109 *
110 * @private
111 *
112 * @returns {Promise.<object>}
113 */
114 private _getClient;
115 /**
116 * Since we do not use the streams to ack/modAck messages, they will close
117 * by themselves unless we periodically send empty messages.
118 *
119 * @private
120 */
121 private _keepAlive;
122 /**
123 * Once the stream has nothing left to read, we'll remove it and attempt to
124 * refill our stream pool if needed.
125 *
126 * @private
127 *
128 * @param {Duplex} stream The ended stream.
129 * @param {object} status The stream status.
130 */
131 private _onEnd;
132 /**
133 * gRPC will usually emit a status as a ServiceError via `error` event before
134 * it emits the status itself. In order to cut back on emitted errors, we'll
135 * wait a tick on error and ignore it if the status has been received.
136 *
137 * @private
138 *
139 * @param {stream} stream The stream that errored.
140 * @param {Error} err The error.
141 */
142 private _onError;
143 /**
144 * gRPC streams will emit a status event once the connection has been
145 * terminated. This is preferable to end/close events because we'll receive
146 * information as to why the stream closed and if it is safe to open another.
147 *
148 * @private
149 *
150 * @param {stream} stream The stream that was closed.
151 * @param {object} status The status message stating why it was closed.
152 */
153 private _onStatus;
154 /**
155 * Removes a stream from the combined stream.
156 *
157 * @private
158 *
159 * @param {stream} stream The stream to remove.
160 */
161 private _removeStream;
162 /**
163 * Neither gRPC or gax allow for the highWaterMark option to be specified.
164 * However using the default value (16) it is possible to end up with a lot of
165 * PullResponse objects stored in internal buffers. If this were to happen
166 * and the client were slow to process messages, we could potentially see a
167 * very large number of redeliveries happen before the messages even made it
168 * to the client.
169 *
170 * @private
171 *
172 * @param {Duplex} stream The duplex stream to adjust the
173 * highWaterMarks for.
174 */
175 private _setHighWaterMark;
176 /**
177 * Promisified version of gRPCs Client#waitForReady function.
178 *
179 * @private
180 *
181 * @param {object} client The gRPC client to wait for.
182 * @returns {Promise}
183 */
184 private _waitForClientReady;
185}