UNPKG

5.01 kBJavaScriptView Raw
1'use strict';
2
3const assert = require('assert');
4const Base = require('sdk-base');
5const logger = require('./logger');
6const Channel = require('./channel');
7
/**
 * Fallback settings for RemotingClient.
 *
 * The constructor merges these beneath the caller-supplied options with
 * Object.assign, so any key given by the caller wins. The object is only ever
 * read as a merge source; it is frozen so that no code path can accidentally
 * alter the defaults shared by every client instance.
 */
const defaultOptions = Object.freeze({
  // log module used when the caller does not provide one
  logger,
  // response timeout used by invoke() when no per-call timeout is given
  // (presumably milliseconds - confirm against Channel#invoke)
  responseTimeout: 30000,
});
12
class RemotingClient extends Base {

  /**
   * rocketmq remoting client
   *
   * Resolves the name server address list (either fixed via `nameSrv` or
   * fetched over HTTP from `onsAddr`) and keeps one Channel per server address.
   *
   * @param {Object} options
   *   - {String} [onsAddr] - URL requested through httpclient to obtain the name server list
   *   - {String|Array<String>} [nameSrv] - fixed name server address(es); when set, onsAddr is never requested
   *   - {HttpClient} httpclient - http request client
   *   - {Object} [logger] - log module
   *   - {Number} [responseTimeout] - tcp response timeout
   *   - {Number} [connectTimeout] - timeout for the onsAddr request, defaults to 10000
   * @class
   */
  constructor(options) {
    // guard `options` itself so a missing argument yields the assertion message instead of a TypeError
    assert(options && (options.onsAddr || options.nameSrv), '[RemotingClient] options.onsAddr or options.nameSrv one of them is required');
    assert(options.httpclient, '[RemotingClient] options.httpclient is required');
    super(Object.assign({ initMethod: 'init' }, defaultOptions, options));
    // position of the name server handed out last by getAndCreateChannel()
    this._index = 0;
    this._inited = false;
    this._namesrvAddrList = [];
    // server address => Channel
    this._channels = new Map();
  }

  /**
   * @property {HttpClient} RemotingClient#httpclient
   */
  get httpclient() {
    return this.options.httpclient;
  }

  /**
   * @property {Object} RemotingClient#logger
   */
  get logger() {
    return this.options.logger;
  }

  /**
   * @property {Number} RemotingClient#responseTimeout
   */
  get responseTimeout() {
    return this.options.responseTimeout;
  }

  /**
   * start the client
   * @return {Promise<void>} settles once the name server list has been loaded
   */
  async init() {
    // get name server address at first
    await this.updateNameServerAddressList();
    this._inited = true;
    this.logger.info('[mq:remoting_client] remoting client started');
  }

  /**
   * close the client; does nothing when the client was never started
   * @return {Promise<void>} settles after every healthy channel emitted 'close'
   */
  async close() {
    if (!this._inited) {
      return;
    }

    // wait all channel close
    const pending = [];
    for (const addr of Array.from(this._channels.keys())) {
      const channel = this._channels.get(addr);
      this._channels.delete(addr);
      // channels that are not OK are simply dropped, there is nothing to wait for
      if (channel && channel.isOK) {
        pending.push(new Promise(resolve => {
          channel.once('close', resolve);
          channel.close();
        }));
      }
    }
    await Promise.all(pending);

    this._inited = false;
    this.emit('close');
    this.removeAllListeners();

    this.logger.info('[mq:remoting_client] remoting client is closed');
  }

  /**
   * default error handler
   * @param {Error} err - error object
   * @return {void}
   */
  error(err) {
    this.emit('error', err);
  }

  /**
   * handle the 'close' event of a channel
   *
   * The cached entry is removed only when it is still the channel that closed
   * (compared by clientId), so a replacement channel registered for the same
   * address is left alone.
   *
   * @param {String} addr - server address the channel was created for
   * @param {Channel} channel - the channel that closed
   * @return {Promise<void>} rejects when refreshing the name server list fails
   */
  async handleClose(addr, channel) {
    const current = this._channels.get(addr);
    if (current && current.clientId === channel.clientId) {
      this._channels.delete(addr);
    }
    // refresh latest server list
    await this.updateNameServerAddressList();
  }

  /**
   * fetch name server address list
   *
   * With `options.nameSrv` the list is taken from the options directly.
   * Otherwise `options.onsAddr` is requested and the body is parsed as a
   * ';'-separated list. Blank entries are discarded; if nothing usable is
   * left, the previously known list is kept.
   *
   * @return {Promise<void>} rejects when the response status is not 200
   */
  async updateNameServerAddressList() {
    const { nameSrv, onsAddr, connectTimeout } = this.options;
    if (nameSrv) {
      this._namesrvAddrList = Array.isArray(nameSrv) ? nameSrv : [ nameSrv ];
      return;
    }
    const ret = await this.httpclient.request(onsAddr, {
      timeout: connectTimeout || 10000,
    });
    if (ret.status !== 200) {
      throw new Error('[mq:remoting_client] fetch name server addresses failed, ret.statusCode: ' + ret.status);
    }
    const addrs = ret.data.toString().trim();
    // ''.split(';') is [''], so the length check below only works after blank entries are removed
    const newList = addrs.split(';')
      .map(item => item.trim())
      .filter(item => item.length > 0);
    if (newList.length) {
      this._namesrvAddrList = newList;
    }
    this.logger.info('[mq:remoting_client] fetch name server addresses successfully, address: %s', addrs);
  }

  /**
   * invoke command
   * @param {String} addr - server address
   * @param {RemotingCommand} command - remoting command
   * @param {Number} [timeout] - response timeout
   * @return {Promise<Object>} response
   */
  async invoke(addr, command, timeout) {
    if (!this._inited) {
      await this.ready();
    }
    return await this.getAndCreateChannel(addr)
      .invoke(command, timeout || this.responseTimeout);
  }

  /**
   * invoke command without response
   * @param {String} addr - server address
   * @param {RemotingCommand} command - remoting command
   * @return {Promise<void>} settles when the channel accepted the command
   */
  async invokeOneway(addr, command) {
    if (!this._inited) {
      await this.ready();
    }
    await this.getAndCreateChannel(addr).invokeOneway(command);
  }

  /**
   * get request channel from address
   *
   * Without `addr` the next name server is picked in rotation. A cached
   * channel is reused while it reports `isOK`; otherwise a new one is created,
   * cached and wired to this client's 'error' and 'request' events.
   *
   * @param {String} [addr] - server address
   * @return {Channel} request channel
   * @throws {Error} when no addr is given and the name server list is empty
   */
  getAndCreateChannel(addr) {
    let target = addr;
    if (!target) {
      const total = this._namesrvAddrList.length;
      if (total === 0) {
        // `x % 0` is NaN: it would stick in _index forever and produce an undefined address
        throw new Error('[mq:remoting_client] no name server address available');
      }
      this._index = (this._index + 1) % total;
      target = this._namesrvAddrList[this._index];
    }
    const cached = this._channels.get(target);
    if (cached && cached.isOK) {
      return cached;
    }
    const channel = new Channel(target, this.options);
    this._channels.set(target, channel);
    channel.once('close', (...args) => {
      // handleClose is async: without a catch a failed refresh becomes an unhandled rejection
      Promise.resolve(this.handleClose(target, channel, ...args))
        .catch(err => this.error(err));
    });
    channel.on('error', err => this.error(err));
    channel.on('request', (request, address) => this.emit('request', request, address));
    return channel;
  }
}
186
187module.exports = RemotingClient;