UNPKG

6.64 kBJavaScriptView Raw
1'use strict';
2
3const assert = require('assert');
4const Base = require('sdk-base');
5const logger = require('./logger');
6const Channel = require('./channel');
7
/**
 * Fallback option values. The RemotingClient constructor merges these
 * beneath the caller-supplied options, so any key the caller passes wins.
 */
const defaultOptions = {
  // default log module, loaded from ./logger
  logger: logger,
  // response timeout used when invoke() is called without an explicit one
  // (presumably milliseconds - confirm against Channel#invoke)
  responseTimeout: 30 * 1000,
};
12
class RemotingClient extends Base {

  /**
   * rocketmq remoting client
   *
   * Keeps one Channel per server address and a list of name server
   * addresses that is either fixed (`nameSrv`) or fetched over HTTP
   * (`onsAddr`).
   *
   * @param {Object} options
   *   - {HttpClient} httpclient - http request client
   *   - {String} [onsAddr] - URL the name server address list is fetched from
   *   - {String|Array<String>} [nameSrv] - fixed name server address(es); when set, onsAddr is never requested
   *   - {String} [unitName] - when set, `-<unitName>?nofix=1` is appended to onsAddr
   *   - {Number} [connectTimeout] - timeout of the address list request, 10000 when not set
   *   - {Object} [logger] - log module
   *   - {Number} [responseTimeout] - tcp response timeout
   * @class
   */
  constructor(options) {
    assert(options.onsAddr || options.nameSrv, '[RemotingClient] options.onsAddr or options.nameSrv one of them is required');
    assert(options.httpclient, '[RemotingClient] options.httpclient is required');
    super(Object.assign({ initMethod: 'init' }, defaultOptions, options));
    // position of the most recently used name server in _namesrvAddrList
    this._index = 0;
    this._inited = false;
    this._namesrvAddrList = [];
    // server address => Channel
    this._channels = new Map();
    // state of updateNameServerAddressListThrottle()
    this._updatingNameServerAddressList = false;
    this._needToUpdatingNameServerAddressList = false;
  }

  /**
   * @property {HttpClient} RemotingClient#httpclient
   */
  get httpclient() {
    return this.options.httpclient;
  }

  /**
   * @property {Object} RemotingClient#logger
   */
  get logger() {
    return this.options.logger;
  }

  /**
   * @property {Number} RemotingClient#responseTimeout
   */
  get responseTimeout() {
    return this.options.responseTimeout;
  }

  /**
   * @property {String} RemotingClient#unitName
   */
  get unitName() {
    return this.options.unitName;
  }

  /**
   * start the client: load the name server address list, then mark the
   * client as initialized
   * @return {void}
   */
  async init() {
    // get name server address at first
    await this.updateNameServerAddressList();
    this._inited = true;
    this.logger.info('[mq:remoting_client] remoting client started');
  }

  /**
   * close the client
   *
   * Does nothing when the client was never initialized. Otherwise every
   * known channel is dropped from the registry; channels reporting `isOK`
   * are closed and awaited, the others are only forgotten. Finally 'close'
   * is emitted and all listeners of this client are removed.
   * @return {void}
   */
  async close() {
    if (!this._inited) {
      return;
    }

    // wait all channel close
    const addrs = Array.from(this._channels.keys());
    await Promise.all(addrs.map(addr => {
      return new Promise(resolve => {
        const channel = this._channels.get(addr);
        if (channel && channel.isOK) {
          channel.once('close', resolve);
          channel.close();
        } else {
          resolve();
        }
        this._channels.delete(addr);
      });
    }));

    this._inited = false;
    this.emit('close');
    this.removeAllListeners();

    this.logger.info('[mq:remoting_client] remoting client is closed');
  }

  /**
   * default error handler, re-emits the error on this client
   * @param {Error} err - error object
   * @return {void}
   */
  error(err) {
    this.emit('error', err);
  }

  /**
   * handle a closed channel: forget it and refresh the name server list
   *
   * The registry entry is removed only when it still refers to the channel
   * that closed (same clientId), so a replacement channel created for the
   * same address in the meantime is kept.
   * @param {String} addr - server address of the closed channel
   * @param {Channel} channel - the channel that closed
   * @return {void} rejects when the refresh it performs fails
   */
  async handleClose(addr, channel) {
    const current = this._channels.get(addr);
    if (current && current.clientId === channel.clientId) {
      this._channels.delete(addr);
    }
    await this.updateNameServerAddressListThrottle();
  }

  /**
   * refresh the name server address list, at most one refresh at a time
   *
   * A call made while a refresh is running only records that another
   * refresh is wanted and returns immediately; the running call starts
   * that follow-up refresh once it has finished.
   * @return {void} rejects when the refresh performed by this call fails
   */
  async updateNameServerAddressListThrottle() {
    if (this._updatingNameServerAddressList) {
      // if it is in the process of updating and there is a new update task
      // it needs to be recorded at this time and update again at the end to avoid missing the last update.
      this._needToUpdatingNameServerAddressList = true;
      return;
    }
    this._updatingNameServerAddressList = true;
    try {
      await this.updateNameServerAddressList();
    } finally {
      // runs on success and on failure, before the failure propagates
      this._updatingNameServerAddressList = false;
      if (this._needToUpdatingNameServerAddressList) {
        this._needToUpdatingNameServerAddressList = false;
        // The follow-up refresh is not awaited, so nobody could observe its
        // rejection; log it here instead of leaving an unhandled rejection.
        this.updateNameServerAddressListThrottle()
          .catch(err => this.logger.warn(err));
      }
    }
  }

  /**
   * fetch name server address list
   *
   * With `options.nameSrv` the configured address(es) are used as is.
   * Otherwise the list is requested from `options.onsAddr`; the response
   * body is expected to hold addresses separated by ';'. Empty entries are
   * ignored, and when no address remains the current list is kept.
   * @return {void}
   * @throws {Error} when the response status is not 200
   */
  async updateNameServerAddressList() {
    const nameSrv = this.options.nameSrv;
    if (nameSrv) {
      this._namesrvAddrList = Array.isArray(nameSrv) ? nameSrv : [ nameSrv ];
      return;
    }
    let url = this.options.onsAddr;
    if (this.unitName) {
      url = url + '-' + this.unitName + '?nofix=1';
    }
    const ret = await this.httpclient.request(url, {
      timeout: this.options.connectTimeout || 10000,
    });
    if (ret.status !== 200) {
      throw new Error('[mq:remoting_client] fetch name server addresses failed, ret.statusCode: ' + ret.status);
    }
    const addrs = ret.data.toString().trim();
    // ''.split(';') yields [''] and a trailing ';' yields an empty item,
    // so empty entries have to be dropped before the list is trusted.
    const newList = addrs.split(';')
      .map(item => item.trim())
      .filter(item => item.length > 0);
    if (newList.length === 0) {
      this.logger.warn('[mq:remoting_client] fetch name server addresses got an empty list, keep using current addresses');
      return;
    }
    this._namesrvAddrList = newList;
    this.logger.info('[mq:remoting_client] fetch name server addresses successfully, address: %s', addrs);
  }

  /**
   * invoke command
   *
   * Waits for the client to be ready first when it is not initialized yet.
   * @param {String} addr - server address, a falsy value selects a name server
   * @param {RemotingCommand} command - remoting command
   * @param {Number} [timeout] - response timeout, falls back to responseTimeout
   * @return {Object} response
   */
  async invoke(addr, command, timeout) {
    if (!this._inited) {
      await this.ready();
    }
    return await this.getAndCreateChannel(addr)
      .invoke(command, timeout || this.responseTimeout);
  }

  /**
   * invoke command for namesrv, return success at least once success
   *
   * Makes at most one attempt per currently known name server address.
   * Each attempt passes no address, so getAndCreateChannel() moves on to
   * the next name server. When no address is known, no attempt is made and
   * an empty object is returned.
   * @param {RemotingCommand} command - remoting command
   * @param {Number} [timeout] - response timeout
   * @return {Object} response
   * @throws {Error} the error of the last attempt when every attempt failed
   */
  async invokeForNameSrvAtLeastOnce(command, timeout) {
    let response = {};
    let count = this._namesrvAddrList.length;
    let err = null;
    while (count--) {
      try {
        response = await this.invoke(null, command, timeout);
        // one success is enough, the earlier invoke errors are not thrown
        err = null;
        break;
      } catch (e) {
        err = e;
        this.logger.warn(err);
      }
    }
    if (err) {
      throw err;
    }
    return response;
  }

  /**
   * invoke command without response
   *
   * Waits for the client to be ready first when it is not initialized yet.
   * @param {String} addr - server address
   * @param {RemotingCommand} command - remoting command
   * @return {void}
   */
  async invokeOneway(addr, command) {
    if (!this._inited) {
      await this.ready();
    }
    await this.getAndCreateChannel(addr).invokeOneway(command);
  }

  /**
   * get request channel from address
   *
   * Without an address the next name server (round robin) is used. The
   * registered channel is returned when it reports `isOK`; otherwise a new
   * channel is created, registered and wired to this client's events.
   * @param {String} [addr] - server address
   * @return {Channel} request channel
   * @throws {Error} when no address is given and no name server is known
   */
  getAndCreateChannel(addr) {
    if (!addr) {
      const total = this._namesrvAddrList.length;
      if (total === 0) {
        // `% 0` would turn _index into NaN and create a channel for undefined
        throw new Error('[mq:remoting_client] no name server address available');
      }
      this._index = (this._index + 1) % total;
      addr = this._namesrvAddrList[this._index];
    }
    let channel = this._channels.get(addr);
    if (channel && channel.isOK) {
      return channel;
    }
    channel = new Channel(addr, this.options);
    this._channels.set(addr, channel);
    const closedChannel = channel;
    channel.once('close', () => {
      // handleClose() is async and an event emitter ignores the returned
      // promise, so a failed refresh is logged here rather than becoming an
      // unhandled rejection.
      this.handleClose(addr, closedChannel)
        .catch(err => this.logger.warn(err));
    });
    channel.on('error', err => this.error(err));
    channel.on('request', (request, address) => this.emit('request', request, address));
    return channel;
  }
}
244
245module.exports = RemotingClient;