UNPKG

6.24 kBTypeScriptView Raw
1/*!
2 * Copyright 2018 Google Inc. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16/// <reference types="node" />
17import { grpc } from 'google-gax';
18import { PassThrough } from 'stream';
19import { Subscriber } from './subscriber';
20/**
21 * Error wrapper for gRPC status objects. Implements `grpc.ServiceError`,
22 * declaring the `code`, `details` and `metadata` properties of that contract.
23 * @class
24 *
25 * @param {grpc.StatusObject} status The gRPC status object to wrap.
26 */
27export declare class StatusError extends Error implements grpc.ServiceError {
28 code: grpc.status;
29 details: string;
30 metadata: grpc.Metadata;
31 constructor(status: grpc.StatusObject);
32}
33/**
34 * Error thrown when we fail to open a channel for the message stream.
35 * Implements `grpc.ServiceError` (`code`, `details` and `metadata`).
36 * @class
37 *
38 * @param {Error} err The original error.
39 */
40export declare class ChannelError extends Error implements grpc.ServiceError {
41 code: grpc.status;
42 details: string;
43 metadata: grpc.Metadata;
44 constructor(err: Error);
45}
46export interface MessageStreamOptions {
47 highWaterMark?: number;
48 maxStreams?: number;
49 timeout?: number;
50}
51/**
52 * @typedef {object} MessageStreamOptions
53 * @property {number} [highWaterMark=0] Buffer level applied to all
54 * underlying streams. See
55 * {@link https://nodejs.org/en/docs/guides/backpressuring-in-streams/}.
56 * @property {number} [maxStreams=5] Number of streaming connections to make.
57 * @property {number} [timeout=300000] Timeout for establishing a connection.
58 * NOTE(review): the unit is presumably milliseconds; confirm.
59 */
60/**
61 * Streaming class used to manage multiple StreamingPull requests, exposing
62 * them as a single combined `PassThrough` stream.
63 * @private
64 * @class
65 *
66 * @param {Subscriber} sub The parent subscriber.
67 * @param {MessageStreamOptions} [options] The message stream options.
68 */
69export declare class MessageStream extends PassThrough {
70 private _keepAliveHandle;
71 private _fillHandle?;
72 private _options;
73 private _retrier;
74 private _streams;
75 private _subscriber;
76 constructor(sub: Subscriber, options?: MessageStreamOptions);
77 /**
78 * Destroys the stream and any underlying streams.
79 *
80 * @param {?Error} [error] An error to emit, if any.
81 * @private
82 */
83 destroy(error?: Error | null): void;
84 /**
85 * Destroys the stream and any underlying streams, then reports completion
86 * through `callback`.
87 * @param {?Error} error An error to emit, if any.
88 * @param {Function} callback Called with an error or null once done.
89 * @private
90 */
91 _destroy(error: Error | null, callback: (error: Error | null) => void): void;
92 /**
93 * Adds a StreamingPull stream to the combined stream.
94 *
95 * @private
96 *
97 * @param {stream} stream The StreamingPull stream.
98 */
99 private _addStream;
100 /**
101 * Attempts to create and cache the desired number of StreamingPull requests.
102 * gRPC does not supply a way to confirm that a stream is connected, so our
103 * best bet is to open the streams and use the client.waitForReady() method to
104 * confirm everything is ok.
105 *
106 * @private
107 *
108 * @returns {Promise}
109 */
110 private _fillStreamPool;
111 /**
112 * It is critical that we keep as few `PullResponse` objects in memory as
113 * possible to reduce the number of potential redeliveries. Because of this we
114 * want to bypass gax for StreamingPull requests to avoid creating a Duplexify
115 * stream; doing so essentially doubles the size of our readable buffer.
116 *
117 * @private
118 *
119 * @returns {Promise.<object>}
120 */
121 private _getClient;
122 /**
123 * Since we do not use the streams to ack/modAck messages, they will close
124 * by themselves unless we periodically send empty messages.
125 *
126 * @private
127 */
128 private _keepAlive;
129 /**
130 * Once the stream has nothing left to read, we'll remove it and attempt to
131 * refill our stream pool if needed.
132 *
133 * @private
134 *
135 * @param {Duplex} stream The ended stream.
136 * @param {object} status The stream status.
137 */
138 private _onEnd;
139 /**
140 * gRPC will usually emit a status as a ServiceError via `error` event before
141 * it emits the status itself. In order to cut back on emitted errors, we'll
142 * wait a tick on error and ignore it if the status has been received.
143 *
144 * @private
145 *
146 * @param {stream} stream The stream that errored.
147 * @param {Error} err The error.
148 */
149 private _onError;
150 /**
151 * gRPC streams will emit a status event once the connection has been
152 * terminated. This is preferable to end/close events because we'll receive
153 * information as to why the stream closed and if it is safe to open another.
154 *
155 * @private
156 *
157 * @param {stream} stream The stream that was closed.
158 * @param {object} status The status message stating why it was closed.
159 */
160 private _onStatus;
161 /**
162 * Removes a stream from the combined stream.
163 *
164 * @private
165 *
166 * @param {stream} stream The stream to remove.
167 */
168 private _removeStream;
169 /**
170 * Neither gRPC nor gax allows for the highWaterMark option to be specified.
171 * However, using the default value (16) it is possible to end up with a lot of
172 * PullResponse objects stored in internal buffers. If this were to happen
173 * and the client were slow to process messages, we could potentially see a
174 * very large number of redeliveries happen before the messages even made it
175 * to the client.
176 *
177 * @private
178 *
179 * @param {Duplex} stream The duplex stream to adjust the
180 * highWaterMarks for.
181 */
182 private _setHighWaterMark;
183 /**
184 * Promisified version of gRPC's Client#waitForReady function.
185 *
186 * @private
187 *
188 * @param {object} client The gRPC client to wait for.
189 * @returns {Promise}
190 */
191 private _waitForClientReady;
192}