UNPKG

29 kBJavaScriptView Raw
1"use strict";
2/*
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18Object.defineProperty(exports, "__esModule", { value: true });
19exports.ChannelImplementation = void 0;
20const call_stream_1 = require("./call-stream");
21const channel_credentials_1 = require("./channel-credentials");
22const resolving_load_balancer_1 = require("./resolving-load-balancer");
23const subchannel_pool_1 = require("./subchannel-pool");
24const picker_1 = require("./picker");
25const constants_1 = require("./constants");
26const filter_stack_1 = require("./filter-stack");
27const call_credentials_filter_1 = require("./call-credentials-filter");
28const deadline_filter_1 = require("./deadline-filter");
29const compression_filter_1 = require("./compression-filter");
30const resolver_1 = require("./resolver");
31const logging_1 = require("./logging");
32const max_message_size_filter_1 = require("./max-message-size-filter");
33const http_proxy_1 = require("./http_proxy");
34const uri_parser_1 = require("./uri-parser");
35const connectivity_state_1 = require("./connectivity-state");
36const channelz_1 = require("./channelz");
/**
 * Largest delay accepted by the Node.js timer functions (2^31 - 1 ms).
 * See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
 */
const MAX_TIMEOUT_TIME = 2147483647;
/* Module-wide counter backing getNewCallNumber(). */
let nextCallNumber = 0;
/**
 * Hand out the next call number and advance the module-wide counter.
 * The counter restarts at 0 once it reaches Number.MAX_SAFE_INTEGER.
 * @returns The call number assigned to the caller.
 */
function getNewCallNumber() {
    const assigned = nextCallNumber++;
    if (nextCallNumber >= Number.MAX_SAFE_INTEGER) {
        nextCallNumber = 0;
    }
    return assigned;
}
/**
 * Channel implementation: resolves the target through a
 * ResolvingLoadBalancer, queues calls until a config selector and a usable
 * pick are available, and starts call streams on the picked subchannel.
 */
class ChannelImplementation {
    /**
     * @param target Target name of the channel; must be a string that
     *     parses as a URI and maps to a registered resolver scheme.
     * @param credentials A ChannelCredentials instance.
     * @param options Channel options object. A missing (null/undefined)
     *     value is treated as an empty options object.
     * @throws TypeError if an argument has the wrong type.
     * @throws Error if the target cannot be parsed or has no default scheme.
     */
    constructor(target, credentials, options) {
        this.credentials = credentials;
        this.connectivityState = connectivity_state_1.ConnectivityState.IDLE;
        this.currentPicker = new picker_1.UnavailablePicker();
        /**
         * Calls queued up to get a call config. Should only be populated before the
         * first time the resolver returns a result, which includes the ConfigSelector.
         */
        this.configSelectionQueue = [];
        this.pickQueue = [];
        this.connectivityStateWatchers = [];
        this.configSelector = null;
        /**
         * This is the error from the name resolver if it failed most recently. It
         * is only used to end calls that start while there is no config selector
         * and the name resolver is in backoff, so it should be nulled if
         * configSelector becomes set or the channel state becomes anything other
         * than TRANSIENT_FAILURE.
         */
        this.currentResolutionError = null;
        // Channelz info
        this.channelzEnabled = true;
        this.callTracker = new channelz_1.ChannelzCallTracker();
        this.childrenTracker = new channelz_1.ChannelzChildrenTracker();
        if (typeof target !== 'string') {
            throw new TypeError('Channel target must be a string');
        }
        if (!(credentials instanceof channel_credentials_1.ChannelCredentials)) {
            throw new TypeError('Channel credentials must be a ChannelCredentials object');
        }
        if (options) {
            if (typeof options !== 'object') {
                throw new TypeError('Channel options must be an object');
            }
        }
        /* The checks above allow options to be omitted, so normalize a missing
         * value here instead of failing on the property reads below. */
        const channelOptions = options == null ? {} : options;
        this.options = channelOptions;
        this.originalTarget = target;
        const originalTargetUri = uri_parser_1.parseUri(target);
        if (originalTargetUri === null) {
            throw new Error(`Could not parse target name "${target}"`);
        }
        /* This ensures that the target has a scheme that is registered with the
         * resolver */
        const defaultSchemeMapResult = resolver_1.mapUriDefaultScheme(originalTargetUri);
        if (defaultSchemeMapResult === null) {
            throw new Error(`Could not find a default scheme for target name "${target}"`);
        }
        /* Long-period no-op timer that is ref'd while calls are queued and
         * unref'd otherwise (see callRefTimerRef / callRefTimerUnref). */
        this.callRefTimer = setInterval(() => { }, MAX_TIMEOUT_TIME);
        if (this.callRefTimer.unref != null) {
            this.callRefTimer.unref();
        }
        if (this.options['grpc.enable_channelz'] === 0) {
            this.channelzEnabled = false;
        }
        this.channelzTrace = new channelz_1.ChannelzTrace();
        this.channelzRef = channelz_1.registerChannelzChannel(target, () => this.getChannelzInfo(), this.channelzEnabled);
        if (this.channelzEnabled) {
            this.channelzTrace.addTrace('CT_INFO', 'Channel created');
        }
        if (this.options['grpc.default_authority']) {
            this.defaultAuthority = this.options['grpc.default_authority'];
        }
        else {
            this.defaultAuthority = resolver_1.getDefaultAuthority(defaultSchemeMapResult);
        }
        const proxyMapResult = http_proxy_1.mapProxyName(defaultSchemeMapResult, channelOptions);
        this.target = proxyMapResult.target;
        this.options = Object.assign({}, this.options, proxyMapResult.extraOptions);
        /* The global boolean parameter to getSubchannelPool has the inverse meaning to what
         * the grpc.use_local_subchannel_pool channel option means. */
        const useLocalPoolOption = channelOptions['grpc.use_local_subchannel_pool'];
        this.subchannelPool = subchannel_pool_1.getSubchannelPool((useLocalPoolOption == null ? 0 : useLocalPoolOption) === 0);
        const channelControlHelper = {
            createSubchannel: (subchannelAddress, subchannelArgs) => {
                const subchannel = this.subchannelPool.getOrCreateSubchannel(this.target, subchannelAddress, Object.assign({}, this.options, subchannelArgs), this.credentials);
                if (this.channelzEnabled) {
                    this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef());
                }
                return subchannel;
            },
            updateState: (connectivityState, picker) => {
                this.currentPicker = picker;
                /* Retry every queued pick against the new picker. The queue is
                 * swapped out first because tryPick may push picks back onto it. */
                const queueCopy = this.pickQueue.slice();
                this.pickQueue = [];
                this.callRefTimerUnref();
                for (const { callStream, callMetadata, callConfig, dynamicFilters } of queueCopy) {
                    this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
                }
                this.updateState(connectivityState);
            },
            requestReresolution: () => {
                // This should never be called.
                throw new Error('Resolving load balancer should never call requestReresolution');
            },
            addChannelzChild: (child) => {
                if (this.channelzEnabled) {
                    this.childrenTracker.refChild(child);
                }
            },
            removeChannelzChild: (child) => {
                if (this.channelzEnabled) {
                    this.childrenTracker.unrefChild(child);
                }
            }
        };
        /* Invoked by the resolving load balancer with a config selector when
         * name resolution succeeds. */
        const onSuccessfulResolution = (configSelector) => {
            if (this.channelzEnabled) {
                this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded');
            }
            this.configSelector = configSelector;
            this.currentResolutionError = null;
            /* We process the queue asynchronously to ensure that the corresponding
             * load balancer update has completed. */
            process.nextTick(() => {
                const localQueue = this.configSelectionQueue;
                this.configSelectionQueue = [];
                this.callRefTimerUnref();
                for (const { callStream, callMetadata } of localQueue) {
                    this.tryGetConfig(callStream, callMetadata);
                }
                this.configSelectionQueue = [];
            });
        };
        /* Invoked by the resolving load balancer with a status when name
         * resolution fails. Queued calls are ended with that status unless
         * they are waitForReady, in which case they stay queued. */
        const onFailedResolution = (status) => {
            if (this.channelzEnabled) {
                this.channelzTrace.addTrace('CT_WARNING', 'Address resolution failed with code ' + status.code + ' and details "' + status.details + '"');
            }
            if (this.configSelectionQueue.length > 0) {
                this.trace('Name resolution failed with calls queued for config selection');
            }
            if (this.configSelector === null) {
                this.currentResolutionError = status;
            }
            const localQueue = this.configSelectionQueue;
            this.configSelectionQueue = [];
            this.callRefTimerUnref();
            for (const { callStream, callMetadata } of localQueue) {
                if (callMetadata.getOptions().waitForReady) {
                    this.callRefTimerRef();
                    this.configSelectionQueue.push({ callStream, callMetadata });
                }
                else {
                    callStream.cancelWithStatus(status.code, status.details);
                }
            }
        };
        this.resolvingLoadBalancer = new resolving_load_balancer_1.ResolvingLoadBalancer(this.target, channelControlHelper, channelOptions, onSuccessfulResolution, onFailedResolution);
        this.filterStackFactory = new filter_stack_1.FilterStackFactory([
            new call_credentials_filter_1.CallCredentialsFilterFactory(this),
            new deadline_filter_1.DeadlineFilterFactory(this),
            new max_message_size_filter_1.MaxMessageSizeFilterFactory(this.options),
            new compression_filter_1.CompressionFilterFactory(this, this.options),
        ]);
        this.trace('Channel constructed with options ' + JSON.stringify(channelOptions, undefined, 2));
        /* Log where the channel was constructed; the first line of the stack
         * (the error header) is dropped. */
        const error = new Error();
        const constructionStack = error.stack == null
            ? undefined
            : error.stack.substring(error.stack.indexOf('\n') + 1);
        logging_1.trace(constants_1.LogVerbosity.DEBUG, 'channel_stacktrace', '(' + this.channelzRef.id + ') ' + 'Channel constructed \n' + constructionStack);
    }
    /**
     * Build the channelz info snapshot for this channel.
     * @returns An object with the original target, the current connectivity
     *     state, the trace, the call tracker and the child lists.
     */
    getChannelzInfo() {
        return {
            target: this.originalTarget,
            state: this.connectivityState,
            trace: this.channelzTrace,
            callTracker: this.callTracker,
            children: this.childrenTracker.getChildLists()
        };
    }
    /**
     * Emit a 'channel' trace line prefixed with the channelz id and target.
     * @param text Message to log.
     * @param verbosityOverride Optional verbosity; defaults to DEBUG.
     */
    trace(text, verbosityOverride) {
        const verbosity = verbosityOverride == null ? constants_1.LogVerbosity.DEBUG : verbosityOverride;
        logging_1.trace(verbosity, 'channel', '(' + this.channelzRef.id + ') ' + uri_parser_1.uriToString(this.target) + ' ' + text);
    }
    /**
     * Ref the call timer unless it reports that it is already ref'd.
     */
    callRefTimerRef() {
        const timer = this.callRefTimer;
        // If the hasRef function does not exist, always run the code
        if (timer.hasRef == null || !timer.hasRef()) {
            this.trace('callRefTimer.ref | configSelectionQueue.length=' +
                this.configSelectionQueue.length +
                ' pickQueue.length=' +
                this.pickQueue.length);
            if (timer.ref != null) {
                timer.ref();
            }
        }
    }
    /**
     * Unref the call timer unless it reports that it is already unref'd.
     */
    callRefTimerUnref() {
        const timer = this.callRefTimer;
        // If the hasRef function does not exist, always run the code
        if (timer.hasRef == null || timer.hasRef()) {
            this.trace('callRefTimer.unref | configSelectionQueue.length=' +
                this.configSelectionQueue.length +
                ' pickQueue.length=' +
                this.pickQueue.length);
            if (timer.unref != null) {
                timer.unref();
            }
        }
    }
    /**
     * Queue a pick to be retried on the next picker update and ref the call
     * timer.
     */
    pushPick(callStream, callMetadata, callConfig, dynamicFilters) {
        this.pickQueue.push({ callStream, callMetadata, callConfig, dynamicFilters });
        this.callRefTimerRef();
    }
    /**
     * Check the picker output for the given call and corresponding metadata,
     * and take any relevant actions. Should not be called while iterating
     * over pickQueue.
     * @param callStream
     * @param callMetadata
     * @param callConfig Call config from the config selector; its
     *     pickInformation is passed to the picker.
     * @param dynamicFilters Filters handed to the subchannel together with
     *     the pick's own extra filters when the call stream is started.
     * @throws Error if the picker returns an unknown pickResultType.
     */
    tryPick(callStream, callMetadata, callConfig, dynamicFilters) {
        const pickResult = this.currentPicker.pick({
            metadata: callMetadata,
            extraPickInfo: callConfig.pickInformation,
        });
        const subchannelString = pickResult.subchannel ?
            '(' + pickResult.subchannel.getChannelzRef().id + ') ' + pickResult.subchannel.getAddress() :
            '' + pickResult.subchannel;
        const pickStatus = pickResult.status;
        const pickStatusCode = pickStatus == null ? undefined : pickStatus.code;
        const pickStatusDetails = pickStatus == null ? undefined : pickStatus.details;
        this.trace('Pick result for call [' +
            callStream.getCallNumber() +
            ']: ' +
            picker_1.PickResultType[pickResult.pickResultType] +
            ' subchannel: ' +
            subchannelString +
            ' status: ' + pickStatusCode +
            ' ' + pickStatusDetails);
        switch (pickResult.pickResultType) {
            case picker_1.PickResultType.COMPLETE:
                if (pickResult.subchannel === null) {
                    // End the call with an error
                    callStream.cancelWithStatus(constants_1.Status.UNAVAILABLE, 'Request dropped by load balancing policy');
                    break;
                }
                /* If the subchannel is not in the READY state, that indicates a bug
                 * somewhere in the load balancer or picker. So, we log an error and
                 * queue the pick to be tried again later. */
                if (pickResult.subchannel.getConnectivityState() !==
                    connectivity_state_1.ConnectivityState.READY) {
                    logging_1.log(constants_1.LogVerbosity.ERROR, 'Error: COMPLETE pick result subchannel ' +
                        subchannelString +
                        ' has state ' +
                        connectivity_state_1.ConnectivityState[pickResult.subchannel.getConnectivityState()]);
                    this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
                    break;
                }
                /* We need to clone the callMetadata here because the transparent
                 * retry code in the promise resolution handler use the same
                 * callMetadata object, so it needs to stay unmodified */
                callStream.filterStack
                    .sendMetadata(Promise.resolve(callMetadata.clone()))
                    .then((finalMetadata) => {
                    const subchannelState = pickResult.subchannel.getConnectivityState();
                    if (subchannelState !== connectivity_state_1.ConnectivityState.READY) {
                        /* The logic for doing this here is the same as in the catch
                         * block below */
                        this.trace('Picked subchannel ' +
                            subchannelString +
                            ' has state ' +
                            connectivity_state_1.ConnectivityState[subchannelState] +
                            ' after metadata filters. Retrying pick', constants_1.LogVerbosity.INFO);
                        this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
                        return;
                    }
                    try {
                        const pickExtraFilters = pickResult.extraFilterFactories.map(factory => factory.createFilter(callStream));
                        pickResult.subchannel.getRealSubchannel().startCallStream(finalMetadata, callStream, [...dynamicFilters, ...pickExtraFilters]);
                        /* If we reach this point, the call stream has started
                         * successfully */
                        if (callConfig.onCommitted != null) {
                            callConfig.onCommitted();
                        }
                        if (pickResult.onCallStarted != null) {
                            pickResult.onCallStarted();
                        }
                    }
                    catch (error) {
                        const errorCode = error.code;
                        if (errorCode === 'ERR_HTTP2_GOAWAY_SESSION' ||
                            errorCode === 'ERR_HTTP2_INVALID_SESSION') {
                            /* An error here indicates that something went wrong with
                             * the picked subchannel's http2 stream right before we
                             * tried to start the stream. We are handling a promise
                             * result here, so this is asynchronous with respect to the
                             * original tryPick call, so calling it again is not
                             * recursive. We call tryPick immediately instead of
                             * queueing this pick again because handling the queue is
                             * triggered by state changes, and we want to immediately
                             * check if the state has already changed since the
                             * previous tryPick call. We do this instead of cancelling
                             * the stream because the correct behavior may be
                             * re-queueing instead, based on the logic in the rest of
                             * tryPick */
                            this.trace('Failed to start call on picked subchannel ' +
                                subchannelString +
                                ' with error ' +
                                error.message +
                                '. Retrying pick', constants_1.LogVerbosity.INFO);
                            this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
                        }
                        else {
                            this.trace('Failed to start call on picked subchannel ' +
                                subchannelString +
                                ' with error ' +
                                error.message +
                                '. Ending call', constants_1.LogVerbosity.INFO);
                            callStream.cancelWithStatus(constants_1.Status.INTERNAL, `Failed to start HTTP/2 stream with error: ${error.message}`);
                        }
                    }
                }, (error) => {
                    // We assume the error code isn't 0 (Status.OK)
                    callStream.cancelWithStatus(typeof error.code === 'number' ? error.code : constants_1.Status.UNKNOWN, `Getting metadata from plugin failed with error: ${error.message}`);
                });
                break;
            case picker_1.PickResultType.QUEUE:
                this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
                break;
            case picker_1.PickResultType.TRANSIENT_FAILURE:
                if (callMetadata.getOptions().waitForReady) {
                    this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
                }
                else {
                    callStream.cancelWithStatus(pickResult.status.code, pickResult.status.details);
                }
                break;
            case picker_1.PickResultType.DROP:
                callStream.cancelWithStatus(pickResult.status.code, pickResult.status.details);
                break;
            default:
                throw new Error(`Invalid state: unknown pickResultType ${pickResult.pickResultType}`);
        }
    }
    /**
     * Remove a watcher object from connectivityStateWatchers, if present.
     * @param watcherObject The exact object previously pushed onto the list.
     */
    removeConnectivityStateWatcher(watcherObject) {
        const watcherIndex = this.connectivityStateWatchers.indexOf(watcherObject);
        if (watcherIndex >= 0) {
            this.connectivityStateWatchers.splice(watcherIndex, 1);
        }
    }
    /**
     * Record a connectivity state change and notify the watchers whose
     * expected state differs from the new state.
     * @param newState The new connectivity state.
     */
    updateState(newState) {
        logging_1.trace(constants_1.LogVerbosity.DEBUG, 'connectivity_state', '(' + this.channelzRef.id + ') ' +
            uri_parser_1.uriToString(this.target) +
            ' ' +
            connectivity_state_1.ConnectivityState[this.connectivityState] +
            ' -> ' +
            connectivity_state_1.ConnectivityState[newState]);
        if (this.channelzEnabled) {
            this.channelzTrace.addTrace('CT_INFO', connectivity_state_1.ConnectivityState[this.connectivityState] + ' -> ' + connectivity_state_1.ConnectivityState[newState]);
        }
        this.connectivityState = newState;
        /* Iterate over a copy because matching watchers are removed from the
         * live list inside the loop. */
        const watchersCopy = this.connectivityStateWatchers.slice();
        for (const watcherObject of watchersCopy) {
            if (newState !== watcherObject.currentState) {
                if (watcherObject.timer) {
                    clearTimeout(watcherObject.timer);
                }
                this.removeConnectivityStateWatcher(watcherObject);
                watcherObject.callback();
            }
        }
        if (newState !== connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE) {
            this.currentResolutionError = null;
        }
    }
    /**
     * Select the call config for a stream and hand it to tryPick, or queue
     * the stream when no config selector is available yet.
     * @param stream The call stream to route.
     * @param metadata The call's metadata.
     */
    tryGetConfig(stream, metadata) {
        if (stream.getStatus() !== null) {
            /* If the stream has a status, it has already finished and we don't need
             * to take any more actions on it. */
            return;
        }
        if (this.configSelector === null) {
            /* This branch will only be taken at the beginning of the channel's life,
             * before the resolver ever returns a result. So, the
             * ResolvingLoadBalancer may be idle and if so it needs to be kicked
             * because it now has a pending request. */
            this.resolvingLoadBalancer.exitIdle();
            if (this.currentResolutionError && !metadata.getOptions().waitForReady) {
                stream.cancelWithStatus(this.currentResolutionError.code, this.currentResolutionError.details);
            }
            else {
                this.configSelectionQueue.push({
                    callStream: stream,
                    callMetadata: metadata,
                });
                this.callRefTimerRef();
            }
            return;
        }
        const callConfig = this.configSelector(stream.getMethod(), metadata);
        if (callConfig.status !== constants_1.Status.OK) {
            stream.cancelWithStatus(callConfig.status, 'Failed to route call to method ' + stream.getMethod());
            return;
        }
        if (callConfig.methodConfig.timeout) {
            const deadline = new Date();
            deadline.setSeconds(deadline.getSeconds() + callConfig.methodConfig.timeout.seconds);
            deadline.setMilliseconds(deadline.getMilliseconds() +
                callConfig.methodConfig.timeout.nanos / 1000000);
            stream.setConfigDeadline(deadline);
            // Refreshing the filters makes the deadline filter pick up the new deadline
            stream.filterStack.refresh();
        }
        if (callConfig.dynamicFilterFactories.length > 0) {
            /* These dynamicFilters are the mechanism for implementing gRFC A39:
             * https://github.com/grpc/proposal/blob/master/A39-xds-http-filters.md
             * We run them here instead of with the rest of the filters because
             * that spec says "the xDS HTTP filters will run in between name
             * resolution and load balancing".
             *
             * We use the filter stack here to simplify the multi-filter async
             * waterfall logic, but we pass along the underlying list of filters
             * to avoid having nested filter stacks when combining it with the
             * original filter stack. We do not pass along the original filter
             * factory list because these filters may need to persist data
             * between sending headers and other operations. */
            const dynamicFilterStackFactory = new filter_stack_1.FilterStackFactory(callConfig.dynamicFilterFactories);
            const dynamicFilterStack = dynamicFilterStackFactory.createFilter(stream);
            dynamicFilterStack.sendMetadata(Promise.resolve(metadata)).then(filteredMetadata => {
                this.tryPick(stream, filteredMetadata, callConfig, dynamicFilterStack.getFilters());
            }, (error) => {
                /* Without this handler a failing dynamic filter would leave an
                 * unhandled rejection and the call would never finish.
                 * We assume the error code isn't 0 (Status.OK) */
                stream.cancelWithStatus(typeof error.code === 'number' ? error.code : constants_1.Status.UNKNOWN, `Running dynamic filters failed with error: ${error.message}`);
            });
        }
        else {
            this.tryPick(stream, metadata, callConfig, []);
        }
    }
    /**
     * Start routing a call stream, using a clone of the given metadata.
     */
    _startCallStream(stream, metadata) {
        this.tryGetConfig(stream, metadata.clone());
    }
    /**
     * Shut the channel down: destroy the load balancer, move to SHUTDOWN,
     * stop the call timer and release channelz and subchannel pool resources.
     */
    close() {
        this.resolvingLoadBalancer.destroy();
        this.updateState(connectivity_state_1.ConnectivityState.SHUTDOWN);
        clearInterval(this.callRefTimer);
        if (this.channelzEnabled) {
            channelz_1.unregisterChannelzRef(this.channelzRef);
        }
        this.subchannelPool.unrefUnusedSubchannels();
    }
    /**
     * @returns The channel's target (after proxy mapping) as a string.
     */
    getTarget() {
        return uri_parser_1.uriToString(this.target);
    }
    /**
     * Get the current connectivity state.
     * @param tryToConnect If truthy, ask the load balancer to exit idle.
     * @returns The state as it was before any connection attempt was requested.
     */
    getConnectivityState(tryToConnect) {
        const connectivityState = this.connectivityState;
        if (tryToConnect) {
            this.resolvingLoadBalancer.exitIdle();
        }
        return connectivityState;
    }
    /**
     * Register a one-shot watcher that is called when the channel's state
     * differs from currentState, or with an Error once the deadline passes.
     * @param currentState The state the caller believes the channel is in.
     * @param deadline A Date or a value accepted by the Date constructor;
     *     Infinity means no deadline.
     * @param callback Called with no arguments on a state change, or with
     *     an Error if the deadline passes first.
     * @throws Error if the channel has been shut down.
     */
    watchConnectivityState(currentState, deadline, callback) {
        if (this.connectivityState === connectivity_state_1.ConnectivityState.SHUTDOWN) {
            throw new Error('Channel has been shut down');
        }
        const watcherObject = {
            currentState,
            callback,
            timer: null,
        };
        if (deadline !== Infinity) {
            const deadlineDate = deadline instanceof Date ? deadline : new Date(deadline);
            const now = new Date();
            if (deadline === -Infinity || deadlineDate <= now) {
                process.nextTick(callback, new Error('Deadline passed without connectivity state change'));
                return;
            }
            /* Node timers cannot represent delays above MAX_TIMEOUT_TIME: a
             * larger value makes the timer fire almost immediately. So the
             * wait is split into chunks of at most MAX_TIMEOUT_TIME, and the
             * deadline error is only reported after the last chunk. */
            let remainingMs = deadlineDate.getTime() - now.getTime();
            const armTimer = () => {
                const delay = Math.min(remainingMs, MAX_TIMEOUT_TIME);
                remainingMs -= delay;
                watcherObject.timer = setTimeout(() => {
                    if (remainingMs > 0) {
                        armTimer();
                        return;
                    }
                    this.removeConnectivityStateWatcher(watcherObject);
                    callback(new Error('Deadline passed without connectivity state change'));
                }, delay);
            };
            armTimer();
        }
        this.connectivityStateWatchers.push(watcherObject);
    }
    /**
     * Get the channelz reference object for this channel. The returned value is
     * garbage if channelz is disabled for this channel.
     * @returns
     */
    getChannelzRef() {
        return this.channelzRef;
    }
    /**
     * Create a call stream on this channel.
     * @param method Method name; must be a string.
     * @param deadline Deadline as a number or Date.
     * @param host Optional host; defaults to the channel's default authority.
     * @param parentCall Optional parent call, passed through in the call options.
     * @param propagateFlags Optional flags; defaults to Propagate.DEFAULTS.
     * @returns The new Http2CallStream.
     * @throws TypeError if method or deadline has the wrong type.
     * @throws Error if the channel has been shut down.
     */
    createCall(method, deadline, host, parentCall, propagateFlags) {
        if (typeof method !== 'string') {
            throw new TypeError('Channel#createCall: method must be a string');
        }
        if (!(typeof deadline === 'number' || deadline instanceof Date)) {
            throw new TypeError('Channel#createCall: deadline must be a number or Date');
        }
        if (this.connectivityState === connectivity_state_1.ConnectivityState.SHUTDOWN) {
            throw new Error('Channel has been shut down');
        }
        const callNumber = getNewCallNumber();
        this.trace('createCall [' +
            callNumber +
            '] method="' +
            method +
            '", deadline=' +
            deadline);
        const finalOptions = {
            deadline: deadline,
            flags: propagateFlags == null ? constants_1.Propagate.DEFAULTS : propagateFlags,
            host: host == null ? this.defaultAuthority : host,
            parentCall: parentCall,
        };
        const stream = new call_stream_1.Http2CallStream(method, this, finalOptions, this.filterStackFactory, this.credentials._getCallCredentials(), callNumber);
        if (this.channelzEnabled) {
            this.callTracker.addCallStarted();
            stream.addStatusWatcher(status => {
                if (status.code === constants_1.Status.OK) {
                    this.callTracker.addCallSucceeded();
                }
                else {
                    this.callTracker.addCallFailed();
                }
            });
        }
        return stream;
    }
}
556exports.ChannelImplementation = ChannelImplementation;
557//# sourceMappingURL=channel.js.map
\No newline at end of file