UNPKG

9.64 kBTypeScriptView Raw
1/// <reference types="node" />
2/// <reference types="node" />
3import { IAuthPacket, IConnackPacket, IDisconnectPacket, IPublishPacket, ISubscribePacket, Packet, QoS, IConnectPacket } from 'mqtt-packet';
4import { IMessageIdProvider } from './default-message-id-provider';
5import { DuplexOptions } from 'readable-stream';
6import Store, { IStore } from './store';
7import { ClientOptions } from 'ws';
8import { ClientRequestArgs } from 'http';
9import { DoneCallback, ErrorWithReasonCode, IStream, StreamBuilder, TimerVariant, VoidCallback } from './shared';
10import { TypedEventEmitter } from './TypedEmitter';
11import PingTimer from './PingTimer';
/**
 * Protocol identifiers accepted wherever a protocol is configured
 * (see `IClientOptions.protocol` and `IClientOptions.defaultProtocol`).
 */
export type MqttProtocol = 'wss' | 'ws' | 'mqtt' | 'mqtts' | 'tcp' | 'ssl' | 'wx' | 'wxs' | 'ali' | 'alis';
/**
 * Zero-argument callback type used by `IClientPublishOptions.cbStorePut`.
 */
export type StorePutCallback = () => void;
/**
 * Key / certificate related options, inherited by `IClientOptions`.
 *
 * NOTE(review): the field names mirror Node's TLS connection options and are
 * presumably forwarded to the TLS layer for secure protocols; confirm against
 * the connection builders, which are not part of this declaration.
 */
export interface ISecureClientOptions {
    /** Key material given inline (string/Buffer, or arrays of either). */
    key?: string | string[] | Buffer | Buffer[] | any[];
    /** Path alternative to `key`. */
    keyPath?: string;
    /** Certificate material given inline. */
    cert?: string | string[] | Buffer | Buffer[];
    /** Path alternative to `cert`. */
    certPath?: string;
    /** CA material given inline. */
    ca?: string | string[] | Buffer | Buffer[];
    /** One or more paths, alternative to `ca`. */
    caPaths?: string | string[];
    rejectUnauthorized?: boolean;
    ALPNProtocols?: string[] | Buffer[] | Uint8Array[] | Buffer | Uint8Array;
}
/**
 * Signature of a custom acknowledgement handler
 * (the type of `IClientOptions.customHandleAcks`).
 *
 * @param topic - Topic string passed to the handler.
 * @param message - Payload as a `Buffer`.
 * @param packet - The associated packet (untyped here).
 * @param cb - Completion callback; takes an `Error` or a number first, and
 *   optionally a numeric `code` second.
 */
export type AckHandler = (topic: string, message: Buffer, packet: any, cb: (error: Error | number, code?: number) => void) => void;
/**
 * Options accepted by the `MqttClient` constructor, in addition to the
 * key/certificate fields inherited from `ISecureClientOptions`.
 *
 * Every field is optional. This declaration carries no default values; what
 * happens when a field is omitted is decided by the implementation.
 */
export interface IClientOptions extends ISecureClientOptions {
    encoding?: BufferEncoding;
    browserBufferSize?: number;
    binary?: boolean;
    my?: any;
    manualConnect?: boolean;
    /** Partial AUTH packet; field types come from `mqtt-packet`. */
    authPacket?: Partial<IAuthPacket>;
    writeCache?: boolean;
    servername?: string;
    defaultProtocol?: MqttProtocol;
    query?: Record<string, string>;
    auth?: string;
    /** Custom acknowledgement hook; see `AckHandler` for the callback contract. */
    customHandleAcks?: AckHandler;
    port?: number;
    host?: string;
    hostname?: string;
    path?: string;
    protocol?: MqttProtocol;
    /** Accepts `ws` client options, Node `http` request args, or `readable-stream` duplex options. */
    wsOptions?: ClientOptions | ClientRequestArgs | DuplexOptions;
    reconnectPeriod?: number;
    connectTimeout?: number;
    incomingStore?: IStore;
    outgoingStore?: IStore;
    queueQoSZero?: boolean;
    /** Variadic logging function. */
    log?: (...args: any[]) => void;
    autoUseTopicAlias?: boolean;
    autoAssignTopicAlias?: boolean;
    reschedulePings?: boolean;
    /**
     * List of host/port entries, each with an optional protocol.
     *
     * NOTE(review): the inline protocol union omits `'ali'` and `'alis'`, which
     * `MqttProtocol` includes. Confirm whether this is intentional before
     * replacing it with `MqttProtocol`.
     */
    servers?: Array<{
        host: string;
        port: number;
        protocol?: 'wss' | 'ws' | 'mqtt' | 'mqtts' | 'tcp' | 'ssl' | 'wx' | 'wxs';
    }>;
    resubscribe?: boolean;
    /** Receives the URL, these options and the client; returns a URL string. */
    transformWsUrl?: (url: string, options: IClientOptions, client: MqttClient) => string;
    /** Factory receiving the URL, the websocket sub-protocols and these options; the return value is untyped. */
    createWebsocket?: (url: string, websocketSubProtocols: string[], options: IClientOptions) => any;
    messageIdProvider?: IMessageIdProvider;
    browserBufferTimeout?: number;
    objectMode?: boolean;
    clientId?: string;
    /** Typed from the CONNECT packet definition in `mqtt-packet`. */
    protocolVersion?: IConnectPacket['protocolVersion'];
    /** Typed from the CONNECT packet definition in `mqtt-packet`. */
    protocolId?: IConnectPacket['protocolId'];
    clean?: boolean;
    keepalive?: number;
    username?: string;
    password?: Buffer | string;
    /** Typed from the CONNECT packet definition in `mqtt-packet`. */
    will?: IConnectPacket['will'];
    /** Typed from the CONNECT packet definition in `mqtt-packet`. */
    properties?: IConnectPacket['properties'];
    timerVariant?: TimerVariant;
}
/**
 * Optional settings for `MqttClient.publish` / `MqttClient.publishAsync`.
 */
export interface IClientPublishOptions {
    qos?: QoS;
    retain?: boolean;
    dup?: boolean;
    /** Typed from the PUBLISH packet definition in `mqtt-packet`. */
    properties?: IPublishPacket['properties'];
    /** Zero-argument callback; presumably invoked once the packet has been put in the store (confirm in the implementation). */
    cbStorePut?: StorePutCallback;
}
/**
 * Pair of optional stores, typed with the concrete `Store` class.
 *
 * NOTE(review): `MqttClient.reconnect` in this file takes
 * `Pick<IClientOptions, 'incomingStore' | 'outgoingStore'>` (i.e. `IStore`)
 * rather than this interface; confirm where this type is still used.
 */
export interface IClientReconnectOptions {
    incomingStore?: Store;
    outgoingStore?: Store;
}
/**
 * Carries only the optional `properties` bag of a subscription, typed from
 * the SUBSCRIBE packet definition in `mqtt-packet`.
 */
export interface IClientSubscribeProperties {
    properties?: ISubscribePacket['properties'];
}
/**
 * Per-subscription options. `qos` is required; the remaining flags are
 * optional, as is the inherited `properties` field.
 *
 * NOTE(review): `nl`, `rap` and `rh` look like the MQTT 5 subscription
 * options (No Local, Retain As Published, Retain Handling); confirm against
 * the `mqtt-packet` documentation.
 */
export interface IClientSubscribeOptions extends IClientSubscribeProperties {
    qos: QoS;
    nl?: boolean;
    rap?: boolean;
    rh?: number;
}
/**
 * Subscription options together with the topic they apply to.
 */
export interface ISubscriptionRequest extends IClientSubscribeOptions {
    topic: string;
}
/**
 * Entry of the `granted` list handed to `ClientSubscribeCallback` and
 * resolved by `subscribeAsync`.
 *
 * Same shape as `ISubscriptionRequest` minus `properties`, with `qos`
 * widened to also allow the value `128`.
 */
export interface ISubscriptionGrant extends Omit<ISubscriptionRequest, 'qos' | 'properties'> {
    qos: QoS | 128;
}
/**
 * Object form of a subscription request: keys are topics, values are the
 * options for that topic. Intersected with an optional boolean `resubscribe`
 * member.
 */
export type ISubscriptionMap = {
    [topic: string]: IClientSubscribeOptions;
} & {
    resubscribe?: boolean;
};
// Re-export the packet types that appear in the callback signatures below.
export { IConnackPacket, IDisconnectPacket, IPublishPacket, Packet };
/** Listener for the `connect` event; receives the CONNACK packet. */
export type OnConnectCallback = (packet: IConnackPacket) => void;
/** Listener for the `disconnect` event; receives the DISCONNECT packet. */
export type OnDisconnectCallback = (packet: IDisconnectPacket) => void;
/** Callback for `subscribe`; `granted` is optional and `err` may be `null`. */
export type ClientSubscribeCallback = (err: Error | null, granted?: ISubscriptionGrant[]) => void;
/** Listener for the `message` event: topic, payload buffer and the PUBLISH packet. */
export type OnMessageCallback = (topic: string, payload: Buffer, packet: IPublishPacket) => void;
/** Listener for the `packetsend` and `packetreceive` events. */
export type OnPacketCallback = (packet: Packet) => void;
/** Listener for the `close` event; takes no arguments. */
export type OnCloseCallback = () => void;
/** Listener for the `error` event. */
export type OnErrorCallback = (error: Error | ErrorWithReasonCode) => void;
/** Callback taking an optional error and an optional packet. */
export type PacketCallback = (error?: Error, packet?: Packet) => any;
/** Callback taking only an optional error. */
export type CloseCallback = (error?: Error) => void;
/**
 * Event-name to listener-signature map used as the type argument of
 * `TypedEventEmitter` in the `MqttClient` declaration.
 */
export interface MqttClientEventCallbacks {
    connect: OnConnectCallback;
    message: OnMessageCallback;
    packetsend: OnPacketCallback;
    packetreceive: OnPacketCallback;
    disconnect: OnDisconnectCallback;
    error: OnErrorCallback;
    close: OnCloseCallback;
    end: VoidCallback;
    reconnect: VoidCallback;
    offline: VoidCallback;
    outgoingEmpty: VoidCallback;
}
/**
 * Declaration of the MQTT client class.
 *
 * The client is an event emitter whose events and listener signatures are
 * described by `MqttClientEventCallbacks`. Most operations come in two
 * flavours: a callback-based method that returns the client, and an
 * `…Async` variant that returns a promise.
 *
 * Private members are listed without types because declaration emit strips
 * the types of private members.
 */
export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbacks> {
    static VERSION: any;
    // Connection state flags.
    connected: boolean;
    disconnecting: boolean;
    disconnected: boolean;
    reconnecting: boolean;
    incomingStore: IStore;
    outgoingStore: IStore;
    /** The options object typed as `IClientOptions`. */
    options: IClientOptions;
    queueQoSZero: boolean;
    _reconnectCount: number;
    log: (...args: any[]) => void;
    messageIdProvider: IMessageIdProvider;
    /** Map from a numeric key (presumably the message id; confirm) to its `volatile` flag and completion callback. */
    outgoing: Record<number, {
        volatile: boolean;
        cb: (err: Error, packet?: Packet) => void;
    }>;
    /** Map from a numeric message id to a list of topic strings. */
    messageIdToTopic: Record<number, string[]>;
    noop: (error?: any) => void;
    pingResp: number;
    pingTimer: PingTimer;
    stream: IStream;
    /** List of packets paired with their callbacks. */
    queue: {
        packet: Packet;
        cb: PacketCallback;
    }[];
    private streamBuilder;
    private _resubscribeTopics;
    private connackTimer;
    private reconnectTimer;
    private _storeProcessing;
    private _packetIdsDuringStoreProcessing;
    private _storeProcessingQueue;
    private _firstConnection;
    private topicAliasRecv;
    private topicAliasSend;
    private _deferredReconnect;
    private connackPacket;
    /** Returns a client id string. */
    static defaultId(): string;
    /**
     * @param streamBuilder - Stream factory of type `StreamBuilder`.
     * @param options - Client options.
     */
    constructor(streamBuilder: StreamBuilder, options: IClientOptions);
    /** Handler for an AUTH packet; completion is signalled through `callback`. */
    handleAuth(packet: IAuthPacket, callback: PacketCallback): void;
    /** Handler for a PUBLISH packet; completion is signalled through `callback`. */
    handleMessage(packet: IPublishPacket, callback: DoneCallback): void;
    private _nextId;
    /** Returns the last message id as a number. */
    getLastMessageId(): number;
    /** Returns `this`, so calls can be chained. */
    connect(): this;
    /**
     * Publish `message` on `topic`.
     *
     * @param topic - Topic to publish on.
     * @param message - Payload as a string or `Buffer`.
     * @param opts - Optional publish options.
     * @param callback - Optional callback taking an optional error and packet.
     * @returns The client.
     */
    publish(topic: string, message: string | Buffer): MqttClient;
    publish(topic: string, message: string | Buffer, callback?: PacketCallback): MqttClient;
    publish(topic: string, message: string | Buffer, opts?: IClientPublishOptions, callback?: PacketCallback): MqttClient;
    /**
     * Promise-returning variant of `publish`.
     *
     * @returns A promise for a packet, or `undefined`.
     */
    publishAsync(topic: string, message: string | Buffer): Promise<Packet | undefined>;
    publishAsync(topic: string, message: string | Buffer, opts?: IClientPublishOptions): Promise<Packet | undefined>;
    /**
     * Subscribe to one topic, a list of topics, or a topic-to-options map.
     *
     * @param topicObject - Topic string, array of topics, or `ISubscriptionMap`.
     * @param opts - Optional subscription options or just subscription properties.
     * @param callback - Optional callback receiving an error (or `null`) and the granted list.
     * @returns The client.
     */
    subscribe(topicObject: string | string[] | ISubscriptionMap): MqttClient;
    subscribe(topicObject: string | string[] | ISubscriptionMap, callback?: ClientSubscribeCallback): MqttClient;
    subscribe(topicObject: string | string[] | ISubscriptionMap, opts?: IClientSubscribeOptions | IClientSubscribeProperties): MqttClient;
    subscribe(topicObject: string | string[] | ISubscriptionMap, opts?: IClientSubscribeOptions | IClientSubscribeProperties, callback?: ClientSubscribeCallback): MqttClient;
    /**
     * Promise-returning variant of `subscribe`.
     *
     * @returns A promise for the list of subscription grants.
     */
    subscribeAsync(topicObject: string | string[] | ISubscriptionMap): Promise<ISubscriptionGrant[]>;
    subscribeAsync(topicObject: string | string[] | ISubscriptionMap, opts?: IClientSubscribeOptions | IClientSubscribeProperties): Promise<ISubscriptionGrant[]>;
    /**
     * Unsubscribe from one topic or a list of topics.
     *
     * @param topic - Topic string or array of topics.
     * @param opts - Optional options, typed as `IClientSubscribeOptions`.
     * @param callback - Optional callback taking an optional error and packet.
     * @returns The client.
     */
    unsubscribe(topic: string | string[]): MqttClient;
    unsubscribe(topic: string | string[], opts?: IClientSubscribeOptions): MqttClient;
    unsubscribe(topic: string | string[], callback?: PacketCallback): MqttClient;
    unsubscribe(topic: string | string[], opts?: IClientSubscribeOptions, callback?: PacketCallback): MqttClient;
    /**
     * Promise-returning variant of `unsubscribe`.
     *
     * @returns A promise for a packet, or `undefined`.
     */
    unsubscribeAsync(topic: string | string[]): Promise<Packet | undefined>;
    unsubscribeAsync(topic: string | string[], opts?: IClientSubscribeOptions): Promise<Packet | undefined>;
    /**
     * End the client.
     *
     * @param force - Optional boolean flag.
     * @param opts - Optional partial DISCONNECT packet.
     * @param cb - Optional completion callback.
     * @returns The client.
     */
    end(cb?: DoneCallback): MqttClient;
    end(force?: boolean): MqttClient;
    end(opts?: Partial<IDisconnectPacket>, cb?: DoneCallback): MqttClient;
    end(force?: boolean, cb?: DoneCallback): MqttClient;
    end(force?: boolean, opts?: Partial<IDisconnectPacket>, cb?: DoneCallback): MqttClient;
    /**
     * Promise-returning variant of `end`.
     */
    endAsync(): Promise<void>;
    endAsync(force?: boolean): Promise<void>;
    endAsync(opts?: Partial<IDisconnectPacket>): Promise<void>;
    endAsync(force?: boolean, opts?: Partial<IDisconnectPacket>): Promise<void>;
    /**
     * @param messageId - Numeric id of the outgoing message.
     * @returns The client.
     */
    removeOutgoingMessage(messageId: number): MqttClient;
    /**
     * @param opts - Optionally supplies `incomingStore` and/or `outgoingStore`.
     * @returns The client.
     */
    reconnect(opts?: Pick<IClientOptions, 'incomingStore' | 'outgoingStore'>): MqttClient;
    private _flushVolatile;
    private _flush;
    private _removeTopicAliasAndRecoverTopicName;
    private _checkDisconnecting;
    private _reconnect;
    private _setupReconnect;
    private _clearReconnect;
    private _cleanUp;
    private _storeAndSend;
    private _applyTopicAlias;
    private _noop;
    private _writePacket;
    private _sendPacket;
    private _storePacket;
    private _setupPingTimer;
    private _destroyPingTimer;
    private _shiftPingInterval;
    private _reschedulePing;
    private _checkPing;
    private _sendPing;
    private _resubscribe;
    private _onConnect;
    private _invokeStoreProcessingQueue;
    private _invokeAllStoreProcessingQueue;
    private _flushStoreProcessingQueue;
    private _removeOutgoingAndStoreMessage;
}