UNPKG

9.03 kBTypeScriptView Raw
1import { AsyncOptionalCreatable, Duration, Env } from '@salesforce/kit/lib';
2import { AnyJson } from '@salesforce/ts-types/lib';
3import { Org } from '../org/org';
4import { CometClient, CometSubscription, Message, StatusResult, StreamingExtension, StreamProcessor } from './types';
5export { CometClient, CometSubscription, Message, StatusResult, StreamingExtension, StreamProcessor };
/**
 * Inner streaming client interface. This implements the Cometd behavior.
 * Also allows for mocking the functional behavior.
 */
export interface StreamingClientIfc {
    /**
     * Returns a comet client implementation.
     *
     * @param url The target url of the streaming service endpoint.
     * @returns The comet client to use for the given endpoint.
     */
    getCometClient: (url: string) => CometClient;
    /**
     * Sets the logger function for the CometClient.
     *
     * @param logLine The function that receives each log message as a string.
     */
    setLogger: (logLine: (message: string) => void) => void;
}
/**
 * Api wrapper to support Salesforce streaming. The client contains an internal implementation of a cometd specification.
 *
 * Salesforce client and timeout information
 *
 * Streaming API imposes two timeouts, as supported in the Bayeux protocol.
 *
 * Socket timeout: 110 seconds
 * A client receives events (JSON-formatted HTTP responses) while it waits on a connection. If no events are generated
 * and the client is still waiting, the connection times out after 110 seconds and the server closes the connection.
 * Clients should reconnect before two minutes to avoid the connection timeout.
 *
 * Reconnect timeout: 40 seconds
 * After receiving the events, a client needs to reconnect to receive the next set of events. If the reconnection
 * doesn't happen within 40 seconds, the server expires the subscription and the connection is closed. If this happens,
 * the client must start again and handshake, subscribe, and connect. Each Streaming API client logs into an instance
 * and maintains a session. When the client handshakes, connects, or subscribes, the session timeout is restarted. A
 * client session times out if the client doesn't reconnect to the server within 40 seconds after receiving a response
 * (an event, subscribe result, and so on).
 *
 * Note that these timeouts apply to the Streaming API client session and not the Salesforce authentication session. If
 * the client session times out, the authentication session remains active until the organization-specific timeout
 * policy goes into effect.
 *
 * ```
 * const streamProcessor = (message: JsonMap): StatusResult => {
 *   const payload = ensureJsonMap(message.payload);
 *   const id = ensureString(payload.id);
 *
 *   if (payload.status !== 'Active') {
 *     return { completed: false };
 *   }
 *
 *   return {
 *     completed: true,
 *     payload: id
 *   };
 * };
 *
 * const org = await Org.create();
 * const options = new StreamingClient.DefaultOptions(org, 'MyPushTopics', streamProcessor);
 *
 * const asyncStatusClient = await StreamingClient.create(options);
 *
 * await asyncStatusClient.handshake();
 *
 * const info: RequestInfo = {
 *   method: 'POST',
 *   url: `${org.getField(OrgFields.INSTANCE_URL)}/SomeService`,
 *   headers: { HEADER: 'HEADER_VALUE'},
 *   body: 'My content'
 * };
 *
 * await asyncStatusClient.subscribe(async () => {
 *   const connection = await org.getConnection();
 *   // Now that we are subscribed, we can initiate the request that will cause the events to start streaming.
 *   const requestResponse: JsonCollection = await connection.request(info);
 *   const id = ensureJsonMap(requestResponse).id;
 *   console.log(`this.id: ${JSON.stringify(ensureString(id), null, 4)}`);
 * });
 * ```
 */
export declare class StreamingClient extends AsyncOptionalCreatable<StreamingClient.Options> {
    // Private members are listed without types, as is usual for generated declaration output.
    private readonly targetUrl;
    private readonly options;
    private logger;
    private cometClient;
    /**
     * Constructor
     *
     * @param options Streaming client options
     * {@link AsyncCreatable.create}
     */
    constructor(options?: StreamingClient.Options);
    /**
     * Asynchronous initializer.
     */
    init(): Promise<void>;
    /**
     * Allows replaying of Streaming events starting with replayId.
     *
     * @param replayId The starting message id to replay from.
     */
    replay(replayId: number): void;
    /**
     * Provides a convenient way to handshake with the server endpoint before trying to subscribe.
     *
     * @returns A promise for the resulting {@link StreamingClient.ConnectionState}.
     */
    handshake(): Promise<StreamingClient.ConnectionState>;
    /**
     * Subscribe to streaming events. When the streaming processor that's set in the options completes execution it
     * returns a payload in the StatusResult object. The payload is just echoed here for convenience.
     *
     * **Throws** *{@link SfError}{ name: '{@link StreamingClient.TimeoutErrorType.SUBSCRIBE}'}* When the subscribe timeout occurs.
     *
     * @param streamInit This function should call the platform apis that result in streaming updates on push topics.
     * {@link StatusResult}
     */
    subscribe(streamInit?: () => Promise<void>): Promise<AnyJson | void>;
    /**
     * Handler for incoming streaming messages.
     *
     * @param message The message to process.
     * @param cb The callback. Failure to call this can cause the internal comet client to hang.
     */
    private incoming;
    private doTimeout;
    private disconnectClient;
    private disconnect;
    /**
     * Simple inner log wrapper
     *
     * @param message The message to log
     */
    private log;
}
export declare namespace StreamingClient {
    /**
     * Options for the StreamingClient
     *
     * @interface
     */
    interface Options {
        /**
         * The org streaming target.
         */
        org: Org;
        /**
         * The hard timeout that happens with subscribe
         */
        subscribeTimeout: Duration;
        /**
         * The hard timeout that happens with a handshake.
         */
        handshakeTimeout: Duration;
        /**
         * The streaming channel aka topic
         */
        channel: string;
        /**
         * The salesforce api version
         */
        apiVersion: string;
        /**
         * The function for processing streaming messages
         */
        streamProcessor: StreamProcessor;
        /**
         * The function for building the inner client impl. Allows for mocking.
         */
        streamingImpl: StreamingClientIfc;
    }
    /**
     * Default Streaming Options. Uses Faye as the cometd impl.
     */
    class DefaultOptions implements StreamingClient.Options {
        // NOTE(review): this constant is declared with the same string as
        // SFDX_ENABLE_FAYE_REQUEST_RESPONSE_LOGGING below, which looks like a copy/paste slip.
        // A declaration must mirror the implementation's value, so it is left unchanged here;
        // any correction belongs in the implementation source - TODO confirm.
        static readonly SFDX_ENABLE_FAYE_COOKIES_ALLOW_ALL_PATHS = "SFDX_ENABLE_FAYE_REQUEST_RESPONSE_LOGGING";
        static readonly SFDX_ENABLE_FAYE_REQUEST_RESPONSE_LOGGING = "SFDX_ENABLE_FAYE_REQUEST_RESPONSE_LOGGING";
        static readonly DEFAULT_SUBSCRIBE_TIMEOUT: Duration;
        static readonly DEFAULT_HANDSHAKE_TIMEOUT: Duration;
        apiVersion: string;
        org: Org;
        streamProcessor: StreamProcessor;
        subscribeTimeout: Duration;
        handshakeTimeout: Duration;
        channel: string;
        streamingImpl: StreamingClientIfc;
        /**
         * Constructor for DefaultStreamingOptions
         *
         * @param org The streaming target org
         * @param channel The streaming channel or topic. If the topic is a system topic then api 36.0 is used.
         * System topics are deprecated.
         * @param streamProcessor The function called that can process streaming messages.
         * @param envDep Optional Env instance; presumably the source of the environment settings
         * named by the static constants above - TODO confirm against the implementation.
         * @see {@link StatusResult}
         */
        constructor(org: Org, channel: string, streamProcessor: StreamProcessor, envDep?: Env);
        /**
         * Setter for the subscribe timeout.
         *
         * **Throws** An error if the newTime is less than the default time.
         *
         * @param newTime The new subscribe timeout.
         * {@link DefaultOptions.DEFAULT_SUBSCRIBE_TIMEOUT}
         */
        setSubscribeTimeout(newTime: Duration): void;
        /**
         * Setter for the handshake timeout.
         *
         * **Throws** An error if the newTime is less than the default time.
         *
         * @param newTime The new handshake timeout
         * {@link DefaultOptions.DEFAULT_HANDSHAKE_TIMEOUT}
         */
        setHandshakeTimeout(newTime: Duration): void;
    }
    /**
     * Connection state
     *
     * @see {@link StreamingClient.handshake}
     */
    enum ConnectionState {
        /**
         * Used to indicate that the streaming client is connected.
         */
        CONNECTED = 0
    }
    /**
     * Indicators to test error names for StreamingTimeouts
     */
    enum TimeoutErrorType {
        /**
         * To indicate the error occurred on handshake
         */
        HANDSHAKE = "GenericHandshakeTimeoutError",
        /**
         * To indicate the error occurred on subscribe
         */
        SUBSCRIBE = "GenericTimeoutError"
    }
}