UNPKG

11.4 kBTypeScriptView Raw
1/// <reference types="node" />
2import { EventEmitter } from 'events';
3import { AsyncOptionalCreatable, Duration, Env } from '@salesforce/kit/lib';
4import { AnyFunction, AnyJson, JsonMap } from '@salesforce/ts-types/lib';
5import { Org } from '../org';
6import { StatusResult } from './client';
/**
 * Types for defining extensions.
 *
 * Both hooks are optional, so an extension may implement either one or both.
 */
export interface StreamingExtension {
    /**
     * Extension for the outgoing message.
     *
     * @param message The message.
     * @param callback The callback to invoke after the message is processed.
     */
    outgoing?: (message: JsonMap, callback: AnyFunction) => void;
    /**
     * Extension for the incoming message.
     *
     * @param message The message.
     * @param callback The callback to invoke after the message is processed.
     */
    incoming?: (message: JsonMap, callback: AnyFunction) => void;
}
/**
 * Function type for processing streaming messages.
 *
 * @param message The streaming message to process.
 * @returns The StatusResult produced for the message.
 */
export declare type StreamProcessor = (message: JsonMap) => StatusResult;
/**
 * Comet client interface. This is to allow for mocking the inner streaming Cometd implementation.
 * The Faye implementation is used by default but it could be used to adapt another Cometd impl.
 */
export declare abstract class CometClient extends EventEmitter {
    /**
     * Disable polling features.
     *
     * @param label Polling feature label.
     */
    abstract disable(label: string): void;
    /**
     * Add a custom extension to the underlying client.
     *
     * @param extension The json function for the extension.
     */
    abstract addExtension(extension: StreamingExtension): void;
    /**
     * Sets an http header name/value.
     *
     * @param name The header name.
     * @param value The header value.
     */
    abstract setHeader(name: string, value: string): void;
    /**
     * Handshake with the streaming channel.
     *
     * @param callback Callback for the handshake when it successfully completes. The handshake should throw
     * errors when errors are encountered.
     */
    abstract handshake(callback: () => void): void;
    /**
     * Subscribes to Comet topics. Subscribe should perform a handshake if one hasn't been performed yet.
     *
     * @param channel The topic to subscribe to.
     * @param callback The callback to execute once a message has been received.
     * @returns The subscription object for the channel.
     */
    abstract subscribe(channel: string, callback: (message: JsonMap) => void): CometSubscription;
    /**
     * Method to call to disconnect the client from the server.
     */
    abstract disconnect(): void;
}
/**
 * Inner streaming client interface. This implements the Cometd behavior.
 * Also allows for mocking the functional behavior.
 */
export interface StreamingClientIfc {
    /**
     * Returns a comet client implementation.
     *
     * @param url The target url of the streaming service endpoint.
     */
    getCometClient: (url: string) => CometClient;
    /**
     * Sets the logger function for the CometClient.
     *
     * @param logLine A function that receives each log message as a string.
     */
    setLogger: (logLine: (message: string) => void) => void;
}
/**
 * The subscription object returned from the cometd subscribe object.
 */
export interface CometSubscription {
    /**
     * Registers a no-argument callback on the subscription.
     * NOTE(review): presumably invoked once the subscription succeeds — confirm against the comet client impl.
     *
     * @param callback The function to register.
     */
    callback(callback: () => void): void;
    /**
     * Registers a callback that receives an Error.
     * NOTE(review): presumably invoked when the subscription fails — confirm against the comet client impl.
     *
     * @param callback The function to register; it is passed the error.
     */
    errback(callback: (error: Error) => void): void;
}
/**
 * Api wrapper to support Salesforce streaming. The client contains an internal implementation of a cometd specification.
 *
 * Salesforce client and timeout information
 *
 * Streaming API imposes two timeouts, as supported in the Bayeux protocol.
 *
 * Socket timeout: 110 seconds
 * A client receives events (JSON-formatted HTTP responses) while it waits on a connection. If no events are generated
 * and the client is still waiting, the connection times out after 110 seconds and the server closes the connection.
 * Clients should reconnect before two minutes to avoid the connection timeout.
 *
 * Reconnect timeout: 40 seconds
 * After receiving the events, a client needs to reconnect to receive the next set of events. If the reconnection
 * doesn't happen within 40 seconds, the server expires the subscription and the connection is closed. If this happens,
 * the client must start again and handshake, subscribe, and connect. Each Streaming API client logs into an instance
 * and maintains a session. When the client handshakes, connects, or subscribes, the session timeout is restarted. A
 * client session times out if the client doesn't reconnect to the server within 40 seconds after receiving a response
 * (an event, subscribe result, and so on).
 *
 * Note that these timeouts apply to the Streaming API client session and not the Salesforce authentication session. If
 * the client session times out, the authentication session remains active until the organization-specific timeout
 * policy goes into effect.
 *
 * ```
 * const streamProcessor = (message: JsonMap): StatusResult => {
 *     const payload = ensureJsonMap(message.payload);
 *     const id = ensureString(payload.id);
 *
 *     if (payload.status !== 'Active') {
 *         return { completed: false };
 *     }
 *
 *     return {
 *         completed: true,
 *         payload: id
 *     };
 * };
 *
 * const org = await Org.create({});
 * const options = new StreamingClient.DefaultOptions(org, 'MyPushTopics', streamProcessor);
 *
 * const asyncStatusClient = await StreamingClient.create(options);
 *
 * await asyncStatusClient.handshake();
 *
 * const info: RequestInfo = {
 *     method: 'POST',
 *     url: `${org.getField(OrgFields.INSTANCE_URL)}/SomeService`,
 *     headers: { HEADER: 'HEADER_VALUE'},
 *     body: 'My content'
 * };
 *
 * await asyncStatusClient.subscribe(async () => {
 *     const connection = await org.getConnection();
 *     // Now that we are subscribed, we can initiate the request that will cause the events to start streaming.
 *     const requestResponse: JsonCollection = await connection.request(info);
 *     const id = ensureJsonMap(requestResponse).id;
 *     console.log(`this.id: ${JSON.stringify(ensureString(id), null, 4)}`);
 * });
 * ```
 */
export declare class StreamingClient extends AsyncOptionalCreatable<StreamingClient.Options> {
    private readonly targetUrl;
    private readonly options;
    private logger;
    private cometClient;
    /**
     * Constructor
     *
     * @param options Streaming client options
     * {@link AsyncCreatable.create}
     */
    constructor(options?: StreamingClient.Options);
    /**
     * Asynchronous initializer.
     */
    init(): Promise<void>;
    /**
     * Allows replaying of Streaming events starting with replayId.
     *
     * @param replayId The starting message id to replay from.
     */
    replay(replayId: number): void;
    /**
     * Provides a convenient way to handshake with the server endpoint before trying to subscribe.
     *
     * @returns A promise for the resulting {@link StreamingClient.ConnectionState}.
     */
    handshake(): Promise<StreamingClient.ConnectionState>;
    /**
     * Subscribe to streaming events. When the streaming processor that's set in the options completes execution it
     * returns a payload in the StatusResult object. The payload is just echoed here for convenience.
     *
     * **Throws** *{@link SfdxError}{ name: '{@link StreamingClient.TimeoutErrorType.SUBSCRIBE}'}* When the subscribe timeout occurs.
     *
     * @param streamInit This function should call the platform apis that result in streaming updates on push topics.
     * {@link StatusResult}
     */
    subscribe(streamInit?: () => Promise<void>): Promise<AnyJson | void>;
    /**
     * Handler for incoming streaming messages.
     *
     * @param message The message to process.
     * @param cb The callback. Failure to call this can cause the internal comet client to hang.
     */
    private incoming;
    private doTimeout;
    private disconnectClient;
    private disconnect;
    /**
     * Simple inner log wrapper
     *
     * @param message The message to log
     */
    private log;
}
export declare namespace StreamingClient {
    /**
     * Options for the StreamingClient
     *
     * @interface
     */
    interface Options {
        /**
         * The org streaming target.
         */
        org: Org;
        /**
         * The hard timeout that happens with subscribe
         */
        subscribeTimeout: Duration;
        /**
         * The hard timeout that happens with a handshake.
         */
        handshakeTimeout: Duration;
        /**
         * The streaming channel aka topic
         */
        channel: string;
        /**
         * The salesforce api version
         */
        apiVersion: string;
        /**
         * The function for processing streaming messages
         */
        streamProcessor: StreamProcessor;
        /**
         * The function for building the inner client impl. Allows for mocking.
         */
        streamingImpl: StreamingClientIfc;
    }
    /**
     * Default Streaming Options. Uses Faye as the cometd impl.
     */
    class DefaultOptions implements StreamingClient.Options {
        /**
         * Environment variable name for the Faye "cookies allow all paths" setting.
         *
         * NOTE(review): this declaration must mirror the constant in the compiled implementation. Confirm the
         * implementation does not reuse the request/response logging variable name for this constant.
         */
        static readonly SFDX_ENABLE_FAYE_COOKIES_ALLOW_ALL_PATHS = "SFDX_ENABLE_FAYE_COOKIES_ALLOW_ALL_PATHS";
        /**
         * Environment variable name for the Faye request/response logging setting.
         */
        static readonly SFDX_ENABLE_FAYE_REQUEST_RESPONSE_LOGGING = "SFDX_ENABLE_FAYE_REQUEST_RESPONSE_LOGGING";
        static readonly DEFAULT_SUBSCRIBE_TIMEOUT: Duration;
        static readonly DEFAULT_HANDSHAKE_TIMEOUT: Duration;
        apiVersion: string;
        org: Org;
        streamProcessor: StreamProcessor;
        subscribeTimeout: Duration;
        handshakeTimeout: Duration;
        channel: string;
        streamingImpl: StreamingClientIfc;
        /**
         * Constructor for DefaultStreamingOptions
         *
         * @param org The streaming target org
         * @param channel The streaming channel or topic. If the topic is a system topic then api 36.0 is used.
         * System topics are deprecated.
         * @param streamProcessor The function called that can process streaming messages.
         * @param envDep Optional Env instance. NOTE(review): presumably used to read the SFDX_ENABLE_FAYE_*
         * environment variables — confirm against the implementation.
         * @see {@link StatusResult}
         */
        constructor(org: Org, channel: string, streamProcessor: StreamProcessor, envDep?: Env);
        /**
         * Setter for the subscribe timeout.
         *
         * **Throws** An error if the newTime is less than the default time.
         *
         * @param newTime The new subscribe timeout.
         * {@link DefaultOptions.DEFAULT_SUBSCRIBE_TIMEOUT}
         */
        setSubscribeTimeout(newTime: Duration): void;
        /**
         * Setter for the handshake timeout.
         *
         * **Throws** An error if the newTime is less than the default time.
         *
         * @param newTime The new handshake timeout
         * {@link DefaultOptions.DEFAULT_HANDSHAKE_TIMEOUT}
         */
        setHandshakeTimeout(newTime: Duration): void;
    }
    /**
     * Connection state
     *
     * @see {@link StreamingClient.handshake}
     */
    enum ConnectionState {
        /**
         * Used to indicate that the streaming client is connected.
         */
        CONNECTED = 0
    }
    /**
     * Indicators to test error names for StreamingTimeouts
     */
    enum TimeoutErrorType {
        /**
         * To indicate the error occurred on handshake
         */
        HANDSHAKE = "genericHandshakeTimeoutMessage",
        /**
         * To indicate the error occurred on subscribe
         */
        SUBSCRIBE = "genericTimeoutMessage"
    }
}