UNPKG

20.2 kBJavaScriptView Raw
1'use strict';
2
3const JSON2 = require('JSON2');
4const bytes = require('bytes');
5const fmt = require('util').format;
6const ByteBuffer = require('byte');
7const MessageQueue = require('./message_queue');
8const RemotingClient = require('./remoting_client');
9const PullStatus = require('./consumer/pull_status');
10const SendStatus = require('./producer/send_status');
11const RequestCode = require('./protocol/request_code');
12const ResponseCode = require('./protocol/response_code');
13const MessageConst = require('./message/message_const');
14const MessageDecoder = require('./message/message_decoder');
15const RemotingCommand = require('./protocol/command/remoting_command');
16
17// const localIp = require('address').ip();
18// const NAMESPACE_ORDER_TOPIC_CONFIG = 'ORDER_TOPIC_CONFIG';
// Namespace handed to getKVConfigByValue() by getProjectGroupByIp() when it
// looks up the project group configured for an ip address.
19const NAMESPACE_PROJECT_CONFIG = 'PROJECT_CONFIG';
// util.format template for the project-group marker: `%%` renders as a literal
// `%` and `%s` is filled with `projectGroupPrefix`. Despite the name, the
// rendered marker is appended to group/topic names as a suffix.
20const VIRTUAL_APPGROUP_PREFIX = '%%PROJECT_%s%%';
21
// Single module-level buffer reused by pullMessage() to decode response bodies.
// Sized with bytes('1m') — presumably 1 MiB; confirm against the `bytes` package.
22const byteBuffer = ByteBuffer.allocate(bytes('1m'));
23
24/* eslint no-fallthrough:0 */
25
class MQClientAPI extends RemotingClient {

  /**
   * Metaq api wrapper
   * @param {Object} options
   *   - {HttpClient} httpclient - http request client
   *   - {Object} [logger] - log module
   *   - {Number} [responseTimeout] - tcp response timeout
   * @class
   */
  constructor(options) {
    super(options);

    // virtual env project group; stays null unless something assigns it
    // (the lookup in init() is currently commented out)
    this.projectGroupPrefix = null;
  }

  /**
   * start the client
   * @return {void}
   */
  async init() {
    await super.init();
    // this.projectGroupPrefix = await this.getProjectGroupByIp(localIp, 3000);
  }

  /**
   * get project group prefix by ip address
   *
   * Never rejects: any failure is logged through `this.logger.error` and
   * reported to the caller as `null`.
   *
   * @param {String} ip - ip address
   * @param {Number} [timeoutMillis] - timeout in milliseconds
   * @return {String} prefix, or null when the lookup failed
   */
  async getProjectGroupByIp(ip, timeoutMillis) {
    try {
      return await this.getKVConfigByValue(NAMESPACE_PROJECT_CONFIG, ip, timeoutMillis);
    } catch (err) {
      err.message = `[mq:api] Can not get project config from server, ${err.message}`;
      this.logger.error(err);
      return null;
    }
  }

  /**
   * get key-value config
   * @param {String} namespace - config namespace
   * @param {String} value - sent to the server in the `key` field of the request header
   * @param {Number} [timeoutMillis] - timeout in milliseconds
   * @return {String} config value (the `value` field of the response header)
   * @throws {Error} MQClientException when the response code is not SUCCESS
   */
  async getKVConfigByValue(namespace, value, timeoutMillis) {
    const requestHeader = {
      namespace,
      key: value,
    };
    const request = RemotingCommand.createRequestCommand(RequestCode.GET_KV_CONFIG_BY_VALUE, requestHeader);
    const response = await this.invoke(null, request, timeoutMillis);
    switch (response.code) {
      case ResponseCode.SUCCESS:
      {
        const responseHeader = response.decodeCommandCustomHeader();
        return responseHeader && responseHeader.value;
      }
      default:
        this._defaultHandler(request, response);
        break;
    }
  }

  /**
   * get route info of topic
   *
   * Unlike getTopicRouteInfoFromNameServer() this neither decorates the topic
   * with the project group nor retries; a failing invoke rejects directly.
   *
   * @param {String} topic - topic
   * @param {Number} [timeoutMillis] - timeout in milliseconds
   * @return {Object} router info; undefined when the response is SUCCESS but has no body
   * @throws {Error} MQClientException when the response code is not SUCCESS
   */
  async getDefaultTopicRouteInfoFromNameServer(topic, timeoutMillis) {
    const requestHeader = {
      topic,
    };

    const request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
    const response = await this.invoke(null, request, timeoutMillis);
    switch (response.code) {
      case ResponseCode.SUCCESS:
      {
        const body = response.body;
        if (body) {
          this.logger.info('[mq:client_api] get Topic [%s] RouteInfoFromNameServer: %s', topic, body.toString());
          return this._decodeRouteInfo(body);
        }
        break;
      }
      case ResponseCode.TOPIC_NOT_EXIST:
        this.logger.info('[mq:client_api] get Topic [%s] RouteInfoFromNameServer is not exist value', topic);
        // intentional fall through: a missing topic is logged and then raised
      default:
        this._defaultHandler(request, response);
        break;
    }
  }

  /**
   * notify broker that client is offline
   * @param {String} addr - broker address
   * @param {String} clientId - clientId
   * @param {String} producerGroup - producer group name
   * @param {String} consumerGroup - consumer group name
   * @param {Number} [timeoutMillis] - timeout in milliseconds
   * @return {void}
   * @throws {Error} MQClientException when the response code is not SUCCESS
   */
  async unregisterClient(addr, clientId, producerGroup, consumerGroup, timeoutMillis) {
    producerGroup = this._buildWithProjectGroup(producerGroup);
    consumerGroup = this._buildWithProjectGroup(consumerGroup);

    const requestHeader = {
      clientID: clientId,
      producerGroup,
      consumerGroup,
    };
    const request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader);
    const response = await this.invoke(addr, request, timeoutMillis);

    switch (response.code) {
      case ResponseCode.SUCCESS:
        break;
      default:
        this._defaultHandler(request, response);
        break;
    }
  }

  /**
   * get route info from name server
   *
   * The request is attempted at most `this._namesrvAddrList.length` times; each
   * failed attempt is logged as a warning and the first response that arrives
   * is used. When no attempt yields a response, an MQClientException carrying
   * the last failure's message is thrown, with the original error available as
   * `cause`.
   *
   * @param {String} topic - topic
   * @param {Number} [timeoutMillis] - timeout in milliseconds (applied per attempt)
   * @return {Object} route info; undefined when the response is SUCCESS but has no body
   * @throws {Error} MQClientException when every attempt failed or the response code is not SUCCESS
   */
  async getTopicRouteInfoFromNameServer(topic, timeoutMillis) {
    topic = this._buildWithProjectGroup(topic);
    const request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, {
      topic,
    });

    let response;
    let received = false;
    let lastError = null;
    let count = this._namesrvAddrList.length;
    while (count--) {
      try {
        response = await this.invoke(null, request, timeoutMillis);
        received = true;
        break;
      } catch (err) {
        lastError = err;
        this.logger.warn(err);
      }
    }

    if (!received) {
      // Surface the real reason instead of an exception whose message is `undefined`.
      const reason = lastError ? lastError.message : 'no name server address available';
      const err = new Error(`[mq:client_api] get Topic [${topic}] RouteInfoFromNameServer failed, ${reason}`);
      err.name = 'MQClientException';
      if (lastError) {
        err.cause = lastError;
      }
      throw err;
    }

    switch (response.code) {
      case ResponseCode.SUCCESS:
      {
        const body = response.body;
        if (body) {
          return this._decodeRouteInfo(body);
        }
        break;
      }
      case ResponseCode.TOPIC_NOT_EXIST:
        this.logger.warn('[mq:client_api] get Topic [%s] RouteInfoFromNameServer is not exist value', topic);
        // intentional fall through: a missing topic is logged and then raised
      default:
        this._defaultHandler(request, response);
        break;
    }
  }

  /**
   * send heartbeat
   *
   * When a project group prefix is set, the group names and subscription
   * topics inside `heartbeatData` are decorated in place before sending.
   *
   * @param {String} addr - broker address
   * @param {Object} heartbeatData - heartbeat data (serialized with JSON.stringify as the request body)
   * @param {Number} [timeout] - timeout in milliseconds
   * @return {void}
   * @throws {Error} MQClientException when the response code is not SUCCESS
   */
  async sendHearbeat(addr, heartbeatData, timeout) {
    if (this.projectGroupPrefix) {
      for (const consumerData of heartbeatData.consumerDataSet) {
        consumerData.groupName = this._buildWithProjectGroup(consumerData.groupName);
        for (const subscriptionData of consumerData.subscriptionDataSet) {
          subscriptionData.topic = this._buildWithProjectGroup(subscriptionData.topic);
        }
      }
      for (const producerData of heartbeatData.producerDataSet) {
        producerData.groupName = this._buildWithProjectGroup(producerData.groupName);
      }
    }
    const request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
    request.body = Buffer.from(JSON.stringify(heartbeatData));
    const response = await this.invoke(addr, request, timeout);
    if (response.code !== ResponseCode.SUCCESS) {
      this._defaultHandler(request, response);
    }
  }

  /**
   * update consumer offset
   *
   * Sent with invokeOneway(), so no response is inspected here.
   * `requestHeader.consumerGroup` and `requestHeader.topic` are rewritten in place.
   *
   * @param {String} brokerAddr - broker address
   * @param {Object} requestHeader - request header
   * @return {void}
   */
  async updateConsumerOffsetOneway(brokerAddr, requestHeader) {
    requestHeader.consumerGroup = this._buildWithProjectGroup(requestHeader.consumerGroup);
    requestHeader.topic = this._buildWithProjectGroup(requestHeader.topic);

    const request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
    await this.invokeOneway(brokerAddr, request);
  }

  /**
   * query consume offset
   *
   * `requestHeader.consumerGroup` and `requestHeader.topic` are rewritten in place.
   *
   * @param {String} brokerAddr - broker address
   * @param {Object} requestHeader - request header
   * @param {Number} [timeoutMillis] - timeout in milliseconds
   * @return {Number} offset
   * @throws {Error} MQClientException when the response code is not SUCCESS
   */
  async queryConsumerOffset(brokerAddr, requestHeader, timeoutMillis) {
    requestHeader.consumerGroup = this._buildWithProjectGroup(requestHeader.consumerGroup);
    requestHeader.topic = this._buildWithProjectGroup(requestHeader.topic);

    const request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader);
    const response = await this.invoke(brokerAddr, request, timeoutMillis);
    switch (response.code) {
      case ResponseCode.SUCCESS:
      {
        const responseHeader = response.decodeCommandCustomHeader();
        return Number(responseHeader.offset.toString());
      }
      default:
        this._defaultHandler(request, response);
        break;
    }
  }

  /**
   * get current max offset of queue
   * @param {String} addr - broker address
   * @param {String} topic - topic
   * @param {Number} queueId - queue id
   * @param {Number} [timeoutMillis] - timeout in milliseconds
   * @return {Number} offset
   * @throws {Error} MQClientException when the response code is not SUCCESS
   */
  async getMaxOffset(addr, topic, queueId, timeoutMillis) {
    topic = this._buildWithProjectGroup(topic);
    const requestHeader = {
      topic,
      queueId,
    };
    const request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader);
    const response = await this.invoke(addr, request, timeoutMillis);

    switch (response.code) {
      case ResponseCode.SUCCESS:
      {
        const responseHeader = response.decodeCommandCustomHeader();
        // todo:
        return responseHeader && Number(responseHeader.offset);
      }
      default:
        this._defaultHandler(request, response);
        break;
    }
  }

  /**
   * search consume offset by timestamp
   * @param {String} addr - broker address
   * @param {String} topic - topic
   * @param {Number} queueId - queue id
   * @param {String} timestamp - timestamp used to query
   * @param {Number} [timeoutMillis] - timeout in milliseconds
   * @return {Number} offset
   * @throws {Error} MQClientException when the response code is not SUCCESS
   */
  async searchOffset(addr, topic, queueId, timestamp, timeoutMillis) {
    topic = this._buildWithProjectGroup(topic);
    const requestHeader = {
      topic,
      queueId,
      timestamp,
    };
    const request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader);
    const response = await this.invoke(addr, request, timeoutMillis);
    switch (response.code) {
      case ResponseCode.SUCCESS:
      {
        const responseHeader = response.decodeCommandCustomHeader();
        // todo:
        return responseHeader && Number(responseHeader.offset);
      }
      default:
        this._defaultHandler(request, response);
        break;
    }
  }

  /**
   * get all consumer's id in same group
   * @param {String} addr - broker address
   * @param {String} consumerGroup - consumer group
   * @param {Number} [timeoutMillis] - timeout in milliseconds
   * @return {Array} consumer list; undefined when the response is SUCCESS but has no body
   * @throws {Error} MQClientException when the response code is not SUCCESS
   */
  async getConsumerIdListByGroup(addr, consumerGroup, timeoutMillis) {
    consumerGroup = this._buildWithProjectGroup(consumerGroup);
    const requestHeader = {
      consumerGroup,
    };
    const request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader);
    const response = await this.invoke(addr, request, timeoutMillis);
    switch (response.code) {
      case ResponseCode.SUCCESS:
        if (response.body) {
          const body = JSON2.parse(response.body.toString());
          return body.consumerIdList;
        }
        break;
      default:
        this._defaultHandler(request, response);
        break;
    }
  }

  /**
   * pull message from broker
   *
   * `requestHeader.consumerGroup` and `requestHeader.topic` are rewritten in
   * place. Found messages are decoded through the shared module-level
   * `byteBuffer`; the project-group marker is stripped from each message topic
   * and the min/max offsets from the response header are copied into the
   * message properties.
   *
   * @param {String} brokerAddr - broker address
   * @param {Object} requestHeader - request header
   * @param {Number} [timeoutMillis] - timeout in milliseconds
   * @return {Object} pull result: { pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList, suggestWhichBrokerId }
   * @throws {Error} MQClientException when the response code is not one of the handled pull codes
   */
  async pullMessage(brokerAddr, requestHeader, timeoutMillis) {
    requestHeader.consumerGroup = this._buildWithProjectGroup(requestHeader.consumerGroup);
    requestHeader.topic = this._buildWithProjectGroup(requestHeader.topic);

    const request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
    const response = await this.invoke(brokerAddr, request, timeoutMillis);
    let pullStatus = PullStatus.NO_NEW_MSG;
    switch (response.code) {
      case ResponseCode.SUCCESS:
        pullStatus = PullStatus.FOUND;
        break;
      case ResponseCode.PULL_NOT_FOUND:
        pullStatus = PullStatus.NO_NEW_MSG;
        break;
      case ResponseCode.PULL_RETRY_IMMEDIATELY:
        pullStatus = PullStatus.NO_MATCHED_MSG;
        break;
      case ResponseCode.PULL_OFFSET_MOVED:
        pullStatus = PullStatus.OFFSET_ILLEGAL;
        break;
      default:
        this._defaultHandler(request, response);
        break;
    }

    const responseHeader = response.decodeCommandCustomHeader();
    let msgList = [];
    if (pullStatus === PullStatus.FOUND) {
      // reset/put/decode run synchronously, so the shared buffer is not
      // interleaved with another pull between these statements
      byteBuffer.reset();
      byteBuffer.put(response.body).flip();
      msgList = MessageDecoder.decodes(byteBuffer);

      for (const msg of msgList) {
        msg.topic = this._clearProjectGroup(msg.topic);
        msg.properties[MessageConst.PROPERTY_MIN_OFFSET] = responseHeader.minOffset.toString();
        msg.properties[MessageConst.PROPERTY_MAX_OFFSET] = responseHeader.maxOffset.toString();
      }
    }

    return {
      pullStatus,
      nextBeginOffset: Number(responseHeader.nextBeginOffset),
      minOffset: Number(responseHeader.minOffset),
      maxOffset: Number(responseHeader.maxOffset),
      msgFoundList: msgList,
      suggestWhichBrokerId: responseHeader.suggestWhichBrokerId,
    };
  }

  /**
   * create topic
   * @param {String} addr - broker address
   * @param {String} defaultTopic - default topic: TBW102
   * @param {Object} topicConfig - new topic config
   * @param {Number} [timeoutMillis] - timeout in milliseconds
   * @return {void}
   * @throws {Error} MQClientException when the response code is not SUCCESS
   */
  async createTopic(addr, defaultTopic, topicConfig, timeoutMillis) {
    const topicWithProjectGroup = this._buildWithProjectGroup(topicConfig.topicName);
    const requestHeader = {
      topic: topicWithProjectGroup,
      defaultTopic,
      readQueueNums: topicConfig.readQueueNums,
      writeQueueNums: topicConfig.writeQueueNums,
      perm: topicConfig.perm,
      topicFilterType: topicConfig.topicFilterType,
      topicSysFlag: topicConfig.topicSysFlag,
      order: topicConfig.order,
    };
    const request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
    const response = await this.invoke(addr, request, timeoutMillis);
    switch (response.code) {
      case ResponseCode.SUCCESS:
        return;
      default:
        this._defaultHandler(request, response);
        break;
    }
  }

  /**
   * send message
   *
   * `msg.topic`, `requestHeader.producerGroup` and `requestHeader.topic` are
   * rewritten in place with the project-group marker. The header is then
   * re-keyed to the single-letter fields used with SEND_MESSAGE_V2.
   *
   * @param {String} brokerAddr - broker address
   * @param {String} brokerName - broker name
   * @param {Message} msg - msg object
   * @param {Object} requestHeader - request header
   * @param {Number} [timeoutMillis] - timeout in milliseconds
   * @return {Object} sendResult: { sendStatus, msgId, messageQueue, queueOffset }
   * @throws {Error} MQClientException when the response code is not one of the handled send codes
   */
  async sendMessage(brokerAddr, brokerName, msg, requestHeader, timeoutMillis) {
    msg.topic = this._buildWithProjectGroup(msg.topic);
    requestHeader.producerGroup = this._buildWithProjectGroup(requestHeader.producerGroup);
    requestHeader.topic = this._buildWithProjectGroup(requestHeader.topic);

    const requestHeaderV2 = {
      a: requestHeader.producerGroup,
      b: requestHeader.topic,
      c: requestHeader.defaultTopic,
      d: requestHeader.defaultTopicQueueNums,
      e: requestHeader.queueId,
      f: requestHeader.sysFlag,
      g: requestHeader.bornTimestamp,
      h: requestHeader.flag,
      i: requestHeader.properties,
      j: requestHeader.reconsumeTimes,
      k: requestHeader.unitMode,
      l: requestHeader.maxReconsumeTimes,
    };
    const request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
    request.body = msg.body;
    const response = await this.invoke(brokerAddr, request, timeoutMillis);
    let sendStatus = SendStatus.SEND_OK;
    switch (response.code) {
      case ResponseCode.FLUSH_DISK_TIMEOUT:
        sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;
        break;
      case ResponseCode.FLUSH_SLAVE_TIMEOUT:
        sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT;
        break;
      case ResponseCode.SLAVE_NOT_AVAILABLE:
        sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;
        break;
      case ResponseCode.SUCCESS:
        sendStatus = SendStatus.SEND_OK;
        break;
      default:
        this._defaultHandler(request, response);
        break;
    }
    const responseHeader = response.decodeCommandCustomHeader();
    const messageQueue = new MessageQueue(msg.topic, brokerName, responseHeader.queueId);
    // the queue handed back to the caller carries the undecorated topic
    messageQueue.topic = this._clearProjectGroup(messageQueue.topic);

    return {
      sendStatus,
      msgId: responseHeader.msgId,
      messageQueue,
      queueOffset: Number(responseHeader.queueOffset),
    };
  }

  /**
   * consumer send message back
   *
   * NOTE(review): unlike the other requests, the group and topic are sent
   * without the project-group marker — confirm this is intended.
   *
   * @param {String} brokerAddr - broker address
   * @param {Message} msg - message object
   * @param {String} consumerGroup - consumer group
   * @param {Number} delayLevel - delay level
   * @param {Number} timeoutMillis - timeout in millis
   * @param {Number} maxConsumeRetryTimes - max retry times
   * @return {void}
   * @throws {Error} MQClientException when the response code is not SUCCESS
   */
  async consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, timeoutMillis, maxConsumeRetryTimes) {
    const requestHeader = {
      offset: msg.commitLogOffset,
      group: consumerGroup,
      delayLevel,
      originMsgId: msg.msgId,
      originTopic: msg.topic,
      unitMode: false,
      maxReconsumeTimes: maxConsumeRetryTimes,
    };
    const request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
    const response = await this.invoke(brokerAddr, request, timeoutMillis);
    switch (response.code) {
      case ResponseCode.SUCCESS:
        return;
      default:
        this._defaultHandler(request, response);
        break;
    }
  }

  // * viewMessage(brokerAddr, phyoffset, timeoutMillis) {
  //   const requestHeader = {
  //     offset: phyoffset,
  //   };
  //   const request = RemotingCommand.createRequestCommand(RequestCode.VIEW_MESSAGE_BY_ID, requestHeader);
  //   const response = await this.invoke(brokerAddr, request, timeoutMillis);
  //   switch (response.code) {
  //     case ResponseCode.SUCCESS:
  //     {
  //       const byteBuffer = ByteBuffer.wrap(response.body);
  //       const messageExt = MessageDecoder.decode(byteBuffer);
  //       // clear the projectGroupPrefix that belongs to the virtual runtime environment
  //       if (this.projectGroupPrefix) {
  //         messageExt.topic = this._clearProjectGroup(messageExt.topic, this.projectGroupPrefix);
  //       }
  //       return messageExt;
  //     }
  //     default:
  //       this._defaultHandler(request, response);
  //       break;
  //   }
  // }

  /**
   * default handler: turn an unexpected response into a thrown MQClientException
   * @param {Object} request - the request that was sent (not read here)
   * @param {Object} response - response whose `remark` becomes the message and `code` the error code
   * @throws {Error} always
   */
  _defaultHandler(request, response) {
    const err = new Error(response.remark);
    err.name = 'MQClientException';
    err.code = response.code;
    throw err;
  }

  /**
   * parse a route-info response body and sort its entries by broker name
   * @param {Buffer} body - raw response body
   * @return {Object} route info with `queueDatas` and `brokerDatas` sorted in place
   */
  _decodeRouteInfo(body) {
    // JSON.parse does not work here (presumably the payload is not strict JSON),
    // hence the JSON2 parser
    const routerInfoData = JSON2.parse(body.toString());
    // sort
    routerInfoData.queueDatas.sort(compare);
    routerInfoData.brokerDatas.sort(compare);
    return routerInfoData;
  }

  /**
   * append the project-group marker to a group/topic name
   *
   * No-op when no project group is configured, when `origin` is not a string
   * (e.g. an absent producer/consumer group), or when the marker is already
   * present — so applying it repeatedly is safe.
   *
   * @param {String} origin - group or topic name
   * @return {String} decorated name, or `origin` unchanged
   */
  _buildWithProjectGroup(origin) {
    if (!this.projectGroupPrefix || typeof origin !== 'string') {
      return origin;
    }
    const marker = fmt(VIRTUAL_APPGROUP_PREFIX, this.projectGroupPrefix);
    return origin.endsWith(marker) ? origin : origin + marker;
  }

  /**
   * strip the project-group marker from a group/topic name
   *
   * The prefix is checked before formatting: formatting a null prefix would
   * produce the literal marker `%PROJECT_null%` and strip names that merely
   * happen to end with that text.
   *
   * @param {String} origin - group or topic name
   * @return {String} name without the trailing marker, or `origin` unchanged
   */
  _clearProjectGroup(origin) {
    if (!this.projectGroupPrefix || typeof origin !== 'string') {
      return origin;
    }
    const marker = fmt(VIRTUAL_APPGROUP_PREFIX, this.projectGroupPrefix);
    if (origin.endsWith(marker)) {
      return origin.slice(0, origin.length - marker.length);
    }
    return origin;
  }
}

module.exports = MQClientAPI;
584
585module.exports = MQClientAPI;
586
587// Helper
588// ---------------
/**
 * Sort comparator ordering route entries by their `brokerName`.
 * @param {Object} routerA - first entry
 * @param {Object} routerB - second entry
 * @return {Number} 1 when routerA sorts after routerB, -1 when before, 0 otherwise
 */
function compare(routerA, routerB) {
  const left = routerA.brokerName;
  const right = routerB.brokerName;
  // (greater ? 1 : 0) - (less ? 1 : 0): yields 1, -1, or 0 when neither holds
  return Number(left > right) - Number(left < right);
}