UNPKG

20.1 kBJavaScriptView Raw
1'use strict';
2
3const JSON2 = require('JSON2');
4const bytes = require('bytes');
5const fmt = require('util').format;
6const ByteBuffer = require('byte');
7const MessageQueue = require('./message_queue');
8const RemotingClient = require('./remoting_client');
9const PullStatus = require('./consumer/pull_status');
10const SendStatus = require('./producer/send_status');
11const RequestCode = require('./protocol/request_code');
12const ResponseCode = require('./protocol/response_code');
13const MessageConst = require('./message/message_const');
14const MessageDecoder = require('./message/message_decoder');
15const RemotingCommand = require('./protocol/command/remoting_command');
16
17// const localIp = require('address').ip();
18// const NAMESPACE_ORDER_TOPIC_CONFIG = 'ORDER_TOPIC_CONFIG';
19const NAMESPACE_PROJECT_CONFIG = 'PROJECT_CONFIG';
20const VIRTUAL_APPGROUP_PREFIX = '%%PROJECT_%s%%';
21
22const byteBuffer = ByteBuffer.allocate(bytes('1m'));
23
24/* eslint no-fallthrough:0 */
25
26class MQClientAPI extends RemotingClient {
27
28 /**
29 * Metaq api wrapper
30 * @param {Object} options
31 * - {HttpClient} httpclient - http request client
32 * - {Object} [logger] - log module
33 * - {Number} [responseTimeout] - tcp response timeout
34 * @class
35 */
36 constructor(options) {
37 super(options);
38
39 // virtual env project group
40 this.projectGroupPrefix = null;
41 }
42
43 /**
44 * start the client
45 * @return {void}
46 */
47 async init() {
48 await super.init();
49 // this.projectGroupPrefix = await this.getProjectGroupByIp(localIp, 3000);
50 }
51
52 /**
53 * get project group prefix by ip address
54 * @param {String} ip - ip address
55 * @param {Number} [timeoutMillis] - timeout in milliseconds
56 * @return {String} prefix
57 */
58 async getProjectGroupByIp(ip, timeoutMillis) {
59 try {
60 return await this.getKVConfigByValue(NAMESPACE_PROJECT_CONFIG, ip, timeoutMillis);
61 } catch (err) {
62 err.message = `[mq:api] Can not get project config from server, ${err.message}`;
63 this.logger.error(err);
64 return null;
65 }
66 }
67
68 /**
69 * get key-value config
70 * @param {String} namespace - config namespace
71 * @param {String} value - config key
72 * @param {Number} [timeoutMillis] - timeout in milliseconds
73 * @return {String} config value
74 */
75 async getKVConfigByValue(namespace, value, timeoutMillis) {
76 const requestHeader = {
77 namespace,
78 key: value,
79 };
80 const request = RemotingCommand.createRequestCommand(RequestCode.GET_KV_CONFIG_BY_VALUE, requestHeader);
81 const response = await this.invokeForNameSrvAtLeastOnce(request, timeoutMillis);
82 switch (response.code) {
83 case ResponseCode.SUCCESS:
84 {
85 const responseHeader = response.decodeCommandCustomHeader();
86 return responseHeader && responseHeader.value;
87 }
88 default:
89 this._defaultHandler(request, response);
90 break;
91 }
92 }
93
94 /**
95 * get route info of topic
96 * @param {String} topic - topic
97 * @param {Number} [timeoutMillis] - timeout in milliseconds
98 * @return {Object} router info
99 */
100 async getDefaultTopicRouteInfoFromNameServer(topic, timeoutMillis) {
101 const requestHeader = {
102 topic,
103 };
104
105 const request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
106 const response = await this.invokeForNameSrvAtLeastOnce(request, timeoutMillis);
107 switch (response.code) {
108 case ResponseCode.SUCCESS:
109 {
110 const body = response.body;
111 if (body) {
112 this.logger.info('[mq:client_api] get Topic [%s] RouteInfoFromNameServer: %s', topic, body.toString());
113 // JSON.parse dose not work here
114 const routerInfoData = JSON2.parse(body.toString());
115 // sort
116 routerInfoData.queueDatas.sort(compare);
117 routerInfoData.brokerDatas.sort(compare);
118 return routerInfoData;
119 }
120 break;
121 }
122 case ResponseCode.TOPIC_NOT_EXIST:
123 this.logger.info('[mq:client_api] get Topic [%s] RouteInfoFromNameServer is not exist value', topic);
124 default:
125 this._defaultHandler(request, response);
126 break;
127 }
128 }
129
130 /**
131 * notify broker that client is offline
132 * @param {String} addr - brokder address
133 * @param {String} clientId - clientId
134 * @param {String} producerGroup - producer group name
135 * @param {String} consumerGroup - consumer group name
136 * @param {Number} [timeoutMillis] - timeout in milliseconds
137 * @return {void}
138 */
139 async unregisterClient(addr, clientId, producerGroup, consumerGroup, timeoutMillis) {
140 producerGroup = this._buildWithProjectGroup(producerGroup);
141 consumerGroup = this._buildWithProjectGroup(consumerGroup);
142
143 const requestHeader = {
144 clientID: clientId,
145 producerGroup,
146 consumerGroup,
147 };
148 const request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader);
149 const response = await this.invoke(addr, request, timeoutMillis);
150
151 switch (response.code) {
152 case ResponseCode.SUCCESS:
153 break;
154 default:
155 this._defaultHandler(request, response);
156 break;
157 }
158 }
159
160 /**
161 * get route info from name server
162 * @param {String} topic - topic
163 * @param {Number} [timeoutMillis] - timeout in milliseconds
164 * @return {Object} route info
165 */
166 async getTopicRouteInfoFromNameServer(topic, timeoutMillis) {
167 topic = this._buildWithProjectGroup(topic);
168 const request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, {
169 topic,
170 });
171 const response = await this.invokeForNameSrvAtLeastOnce(request, timeoutMillis);
172 switch (response.code) {
173 case ResponseCode.SUCCESS:
174 {
175 const body = response.body;
176 if (body) {
177 const routerInfoData = JSON2.parse(body.toString());
178 // sort
179 routerInfoData.queueDatas.sort(compare);
180 routerInfoData.brokerDatas.sort(compare);
181 return routerInfoData;
182 }
183 break;
184 }
185 case ResponseCode.TOPIC_NOT_EXIST:
186 this.logger.warn('[mq:client_api] get Topic [%s] RouteInfoFromNameServer is not exist value', topic);
187 default:
188 this._defaultHandler(request, response);
189 break;
190 }
191 }
192
193 /**
194 * send heartbeat
195 * @param {String} addr - broker address
196 * @param {Object} heartbeatData - heartbeat data
197 * @param {Number} [timeout] - timeout in milliseconds
198 * @return {void}
199 */
200 async sendHearbeat(addr, heartbeatData, timeout) {
201 if (this.projectGroupPrefix) {
202 for (const consumerData of heartbeatData.consumerDataSet) {
203 consumerData.groupName = this._buildWithProjectGroup(consumerData.groupName);
204 for (const subscriptionData of consumerData.subscriptionDataSet) {
205 subscriptionData.topic = this._buildWithProjectGroup(subscriptionData.topic);
206 }
207 }
208 for (const producerData of heartbeatData.producerDataSet) {
209 producerData.groupName = this._buildWithProjectGroup(producerData.groupName);
210 }
211 }
212 const request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
213 request.body = Buffer.from(JSON.stringify(heartbeatData));
214 const response = await this.invoke(addr, request, timeout);
215 if (response.code !== ResponseCode.SUCCESS) {
216 this._defaultHandler(request, response);
217 }
218 }
219
220 /**
221 * update consumer offset
222 * @param {String} brokerAddr - broker address
223 * @param {Object} requestHeader - request header
224 * @return {void}
225 */
226 async updateConsumerOffsetOneway(brokerAddr, requestHeader) {
227 requestHeader.consumerGroup = this._buildWithProjectGroup(requestHeader.consumerGroup);
228 requestHeader.topic = this._buildWithProjectGroup(requestHeader.topic);
229
230 const request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
231 await this.invokeOneway(brokerAddr, request);
232 }
233
234 /**
235 * query consume offset
236 * @param {String} brokerAddr - broker address
237 * @param {Object} requestHeader - request header
238 * @param {Number} [timeoutMillis] - timeout in milliseconds
239 * @return {Number} offset
240 */
241 async queryConsumerOffset(brokerAddr, requestHeader, timeoutMillis) {
242 requestHeader.consumerGroup = this._buildWithProjectGroup(requestHeader.consumerGroup);
243 requestHeader.topic = this._buildWithProjectGroup(requestHeader.topic);
244
245 const request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader);
246 const response = await this.invoke(brokerAddr, request, timeoutMillis);
247 switch (response.code) {
248 case ResponseCode.SUCCESS:
249 {
250 const responseHeader = response.decodeCommandCustomHeader();
251 return Number(responseHeader.offset.toString());
252 }
253 default:
254 this._defaultHandler(request, response);
255 break;
256 }
257 }
258
259 /**
260 * get current max offset of queue
261 * @param {String} addr - broker address
262 * @param {String} topic - topic
263 * @param {Number} queueId - queue id
264 * @param {Number} [timeoutMillis] - timeout in milliseconds
265 * @return {Number} offset
266 */
267 async getMaxOffset(addr, topic, queueId, timeoutMillis) {
268 topic = this._buildWithProjectGroup(topic);
269 const requestHeader = {
270 topic,
271 queueId,
272 };
273 const request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader);
274 const response = await this.invoke(addr, request, timeoutMillis);
275
276 switch (response.code) {
277 case ResponseCode.SUCCESS:
278 {
279 const responseHeader = response.decodeCommandCustomHeader();
280 // todo:
281 return responseHeader && Number(responseHeader.offset);
282 }
283 default:
284 this._defaultHandler(request, response);
285 break;
286 }
287 }
288
289 /**
290 * search consume offset by timestamp
291 * @param {String} addr - broker address
292 * @param {String} topic - topic
293 * @param {Number} queueId - queue id
294 * @param {String} timestamp - timestamp used to query
295 * @param {Number} [timeoutMillis] - timeout in milliseconds
296 * @return {Number} offset
297 */
298 async searchOffset(addr, topic, queueId, timestamp, timeoutMillis) {
299 topic = this._buildWithProjectGroup(topic);
300 const requestHeader = {
301 topic,
302 queueId,
303 timestamp,
304 };
305 const request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader);
306 const response = await this.invoke(addr, request, timeoutMillis);
307 switch (response.code) {
308 case ResponseCode.SUCCESS:
309 {
310 const responseHeader = response.decodeCommandCustomHeader();
311 // todo:
312 return responseHeader && Number(responseHeader.offset);
313 }
314 default:
315 this._defaultHandler(request, response);
316 break;
317 }
318 }
319
320 /**
321 * get all consumer's id in same group
322 * @param {String} addr - broker address
323 * @param {String} consumerGroup - consumer group
324 * @param {Number} [timeoutMillis] - timeout in milliseconds
325 * @return {Array} consumer list
326 */
327 async getConsumerIdListByGroup(addr, consumerGroup, timeoutMillis) {
328 consumerGroup = this._buildWithProjectGroup(consumerGroup);
329 const requestHeader = {
330 consumerGroup,
331 };
332 const request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader);
333 const response = await this.invoke(addr, request, timeoutMillis);
334 switch (response.code) {
335 case ResponseCode.SUCCESS:
336 if (response.body) {
337 const body = JSON2.parse(response.body.toString());
338 return body.consumerIdList;
339 }
340 break;
341 default:
342 this._defaultHandler(request, response);
343 break;
344 }
345 }
346
347 /**
348 * pull message from broker
349 * @param {String} brokerAddr - broker address
350 * @param {Object} requestHeader - request header
351 * @param {Number} [timeoutMillis] - timeout in milliseconds
352 * @return {Object} pull result
353 */
354 async pullMessage(brokerAddr, requestHeader, timeoutMillis) {
355 requestHeader.consumerGroup = this._buildWithProjectGroup(requestHeader.consumerGroup);
356 requestHeader.topic = this._buildWithProjectGroup(requestHeader.topic);
357
358 const request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
359 const response = await this.invoke(brokerAddr, request, timeoutMillis);
360 let pullStatus = PullStatus.NO_NEW_MSG;
361 switch (response.code) {
362 case ResponseCode.SUCCESS:
363 pullStatus = PullStatus.FOUND;
364 break;
365 case ResponseCode.PULL_NOT_FOUND:
366 pullStatus = PullStatus.NO_NEW_MSG;
367 break;
368 case ResponseCode.PULL_RETRY_IMMEDIATELY:
369 pullStatus = PullStatus.NO_MATCHED_MSG;
370 break;
371 case ResponseCode.PULL_OFFSET_MOVED:
372 pullStatus = PullStatus.OFFSET_ILLEGAL;
373 break;
374 default:
375 this._defaultHandler(request, response);
376 break;
377 }
378
379 const responseHeader = response.decodeCommandCustomHeader();
380 let msgList = [];
381 if (pullStatus === PullStatus.FOUND) {
382 byteBuffer.reset();
383 byteBuffer.put(response.body).flip();
384 msgList = MessageDecoder.decodes(byteBuffer);
385
386 for (const msg of msgList) {
387 msg.topic = this._clearProjectGroup(msg.topic);
388 msg.properties[MessageConst.PROPERTY_MIN_OFFSET] = responseHeader.minOffset.toString();
389 msg.properties[MessageConst.PROPERTY_MAX_OFFSET] = responseHeader.maxOffset.toString();
390 }
391 }
392
393 return {
394 pullStatus,
395 nextBeginOffset: Number(responseHeader.nextBeginOffset),
396 minOffset: Number(responseHeader.minOffset),
397 maxOffset: Number(responseHeader.maxOffset),
398 msgFoundList: msgList,
399 suggestWhichBrokerId: responseHeader.suggestWhichBrokerId,
400 };
401 }
402
403 /**
404 * create topic
405 * @param {String} addr - broker address
406 * @param {String} defaultTopic - default topic: TBW102
407 * @param {Object} topicConfig - new topic config
408 * @param {Number} [timeoutMillis] - timeout in milliseconds
409 * @return {void}
410 */
411 async createTopic(addr, defaultTopic, topicConfig, timeoutMillis) {
412 const topicWithProjectGroup = this._buildWithProjectGroup(topicConfig.topicName);
413 const requestHeader = {
414 topic: topicWithProjectGroup,
415 defaultTopic,
416 readQueueNums: topicConfig.readQueueNums,
417 writeQueueNums: topicConfig.writeQueueNums,
418 perm: topicConfig.perm,
419 topicFilterType: topicConfig.topicFilterType,
420 topicSysFlag: topicConfig.topicSysFlag,
421 order: topicConfig.order,
422 };
423 const request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
424 const response = await this.invoke(addr, request, timeoutMillis);
425 switch (response.code) {
426 case ResponseCode.SUCCESS:
427 return;
428 default:
429 this._defaultHandler(request, response);
430 break;
431 }
432 }
433
434 /**
435 * send message
436 * @param {String} brokerAddr - broker address
437 * @param {String} brokerName - broker name
438 * @param {Message} msg - msg object
439 * @param {Object} requestHeader - request header
440 * @param {Number} [timeoutMillis] - timeout in milliseconds
441 * @return {Object} sendResult
442 */
443 async sendMessage(brokerAddr, brokerName, msg, requestHeader, timeoutMillis) {
444 msg.topic = this._buildWithProjectGroup(msg.topic);
445 requestHeader.producerGroup = this._buildWithProjectGroup(requestHeader.producerGroup);
446 requestHeader.topic = this._buildWithProjectGroup(requestHeader.topic);
447
448 const requestHeaderV2 = {
449 a: requestHeader.producerGroup,
450 b: requestHeader.topic,
451 c: requestHeader.defaultTopic,
452 d: requestHeader.defaultTopicQueueNums,
453 e: requestHeader.queueId,
454 f: requestHeader.sysFlag,
455 g: requestHeader.bornTimestamp,
456 h: requestHeader.flag,
457 i: requestHeader.properties,
458 j: requestHeader.reconsumeTimes,
459 k: requestHeader.unitMode,
460 l: requestHeader.maxReconsumeTimes,
461 };
462 const request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
463 request.body = msg.body;
464 const response = await this.invoke(brokerAddr, request, timeoutMillis);
465 let sendStatus = SendStatus.SEND_OK;
466 switch (response.code) {
467 case ResponseCode.FLUSH_DISK_TIMEOUT:
468 sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;
469 break;
470 case ResponseCode.FLUSH_SLAVE_TIMEOUT:
471 sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT;
472 break;
473 case ResponseCode.SLAVE_NOT_AVAILABLE:
474 sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;
475 break;
476 case ResponseCode.SUCCESS:
477 sendStatus = SendStatus.SEND_OK;
478 break;
479 default:
480 this._defaultHandler(request, response);
481 break;
482 }
483 const responseHeader = response.decodeCommandCustomHeader();
484 const messageQueue = new MessageQueue(msg.topic, brokerName, responseHeader.queueId);
485 messageQueue.topic = this._clearProjectGroup(messageQueue.topic);
486
487 return {
488 sendStatus,
489 msgId: responseHeader.msgId,
490 messageQueue,
491 queueOffset: Number(responseHeader.queueOffset),
492 };
493 }
494
495 /**
496 * consumer send message back
497 * @param {String} brokerAddr - broker address
498 * @param {Message} msg - message object
499 * @param {String} consumerGroup - consumer group
500 * @param {Number} delayLevel - delay level
501 * @param {Number} timeoutMillis - timeout in millis
502 * @param {Number} maxConsumeRetryTimes - max retry times
503 */
504 async consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, timeoutMillis, maxConsumeRetryTimes) {
505 const requestHeader = {
506 offset: msg.commitLogOffset,
507 group: consumerGroup,
508 delayLevel,
509 originMsgId: msg.msgId,
510 originTopic: msg.topic,
511 unitMode: false,
512 maxReconsumeTimes: maxConsumeRetryTimes,
513 };
514 const request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
515 const response = await this.invoke(brokerAddr, request, timeoutMillis);
516 switch (response.code) {
517 case ResponseCode.SUCCESS:
518 return;
519 default:
520 this._defaultHandler(request, response);
521 break;
522 }
523 }
524
525 // * viewMessage(brokerAddr, phyoffset, timeoutMillis) {
526 // const requestHeader = {
527 // offset: phyoffset,
528 // };
529 // const request = RemotingCommand.createRequestCommand(RequestCode.VIEW_MESSAGE_BY_ID, requestHeader);
530 // const response = await this.invoke(brokerAddr, request, timeoutMillis);
531 // switch (response.code) {
532 // case ResponseCode.SUCCESS:
533 // {
534 // const byteBuffer = ByteBuffer.wrap(response.body);
535 // const messageExt = MessageDecoder.decode(byteBuffer);
536 // // 清除虚拟运行环境相关的projectGroupPrefix
537 // if (this.projectGroupPrefix) {
538 // messageExt.topic = this._clearProjectGroup(messageExt.topic, this.projectGroupPrefix);
539 // }
540 // return messageExt;
541 // }
542 // default:
543 // this._defaultHandler(request, response);
544 // break;
545 // }
546 // }
547
548 // default handler
549 _defaultHandler(request, response) {
550 const err = new Error(response.remark);
551 err.name = 'MQClientException';
552 err.code = response.code;
553 throw err;
554 }
555
556 _buildWithProjectGroup(origin) {
557 if (this.projectGroupPrefix) {
558 const prefix = fmt(VIRTUAL_APPGROUP_PREFIX, this.projectGroupPrefix);
559 if (!origin.endsWith(prefix)) {
560 return origin + prefix;
561 }
562 return origin;
563 }
564 return origin;
565 }
566
567 _clearProjectGroup(origin) {
568 const prefix = fmt(VIRTUAL_APPGROUP_PREFIX, this.projectGroupPrefix);
569 if (prefix && origin.endsWith(prefix)) {
570 return origin.slice(0, origin.lastIndexOf(prefix));
571 }
572 return origin;
573 }
574}
575
576module.exports = MQClientAPI;
577
578// Helper
579// ---------------
580function compare(routerA, routerB) {
581 if (routerA.brokerName > routerB.brokerName) {
582 return 1;
583 } else if (routerA.brokerName < routerB.brokerName) {
584 return -1;
585 }
586 return 0;
587}