1 | 'use strict';
|
2 |
|
3 | const JSON2 = require('JSON2');
|
4 | const bytes = require('bytes');
|
5 | const fmt = require('util').format;
|
6 | const ByteBuffer = require('byte');
|
7 | const MessageQueue = require('./message_queue');
|
8 | const RemotingClient = require('./remoting_client');
|
9 | const PullStatus = require('./consumer/pull_status');
|
10 | const SendStatus = require('./producer/send_status');
|
11 | const RequestCode = require('./protocol/request_code');
|
12 | const ResponseCode = require('./protocol/response_code');
|
13 | const MessageConst = require('./message/message_const');
|
14 | const MessageDecoder = require('./message/message_decoder');
|
15 | const RemotingCommand = require('./protocol/command/remoting_command');
|
16 |
|
17 |
|
18 |
|
// Name-server KV namespace that getProjectGroupByIp() passes to getKVConfigByValue().
19 | const NAMESPACE_PROJECT_CONFIG = 'PROJECT_CONFIG';
|
// util.format() template for the project-group marker: `%s` receives
// `projectGroupPrefix` and each `%%` is rendered as a literal `%`.
// NOTE: despite the "PREFIX" name, _buildWithProjectGroup() appends the
// formatted marker to the END of the group/topic name.
20 | const VIRTUAL_APPGROUP_PREFIX = '%%PROJECT_%s%%';
|
21 |
|
// Module-level buffer sized with bytes('1m'). pullMessage() resets it, fills it
// with the response body and hands it to MessageDecoder.decodes(), so it is
// shared by every MQClientAPI instance in the process.
22 | const byteBuffer = ByteBuffer.allocate(bytes('1m'));
|
23 |
|
24 |
|
25 |
|
/**
 * Request/response wrappers for the name server and broker commands used by
 * the MQ client. Every method builds a RemotingCommand, sends it through the
 * transport methods inherited from RemotingClient and converts the response
 * into a plain value, or raises an `MQClientException` via `_defaultHandler`.
 */
class MQClientAPI extends RemotingClient {

  /**
   * @param {Object} options - forwarded unchanged to the RemotingClient constructor
   */
  constructor(options) {
    super(options);

    // While this stays null, group and topic names are sent to the server unmodified.
    this.projectGroupPrefix = null;
  }

  /**
   * Initialize the client; currently just delegates to RemotingClient#init.
   */
  async init() {
    await super.init();
  }

  /**
   * Look up the project group configured for an ip address.
   * Best effort: a failed lookup is logged and reported as `null`, never thrown.
   *
   * @param {String} ip - value to look up in the PROJECT_CONFIG namespace
   * @param {Number} timeoutMillis - forwarded to the name server invocation
   * @return {String|null} the configured value, or null when the lookup failed
   */
  async getProjectGroupByIp(ip, timeoutMillis) {
    try {
      return await this.getKVConfigByValue(NAMESPACE_PROJECT_CONFIG, ip, timeoutMillis);
    } catch (err) {
      err.message = `[mq:api] Can not get project config from server, ${err.message}`;
      this.logger.error(err);
      return null;
    }
  }

  /**
   * Send GET_KV_CONFIG_BY_VALUE to the name server.
   *
   * @param {String} namespace - config namespace
   * @param {String} value - sent as the `key` field of the request header
   * @param {Number} timeoutMillis - forwarded to the name server invocation
   * @return {String} `value` of the decoded response header (falsy when there is no header)
   * @throws {Error} MQClientException for any non-SUCCESS response code
   */
  async getKVConfigByValue(namespace, value, timeoutMillis) {
    const requestHeader = {
      namespace,
      key: value,
    };
    const request = RemotingCommand.createRequestCommand(RequestCode.GET_KV_CONFIG_BY_VALUE, requestHeader);
    const response = await this.invokeForNameSrvAtLeastOnce(request, timeoutMillis);
    if (response.code === ResponseCode.SUCCESS) {
      const responseHeader = response.decodeCommandCustomHeader();
      return responseHeader && responseHeader.value;
    }
    this._defaultHandler(request, response);
  }

  /**
   * Fetch route data for a topic from the name server WITHOUT applying the
   * project-group marker to the topic name.
   *
   * @param {String} topic - topic name, sent as given
   * @param {Number} timeoutMillis - forwarded to the name server invocation
   * @return {Object} parsed route data with `queueDatas` / `brokerDatas` sorted
   *   by brokerName; undefined when a SUCCESS response carries no body
   * @throws {Error} MQClientException for any non-SUCCESS response code
   */
  async getDefaultTopicRouteInfoFromNameServer(topic, timeoutMillis) {
    const requestHeader = {
      topic,
    };

    const request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
    const response = await this.invokeForNameSrvAtLeastOnce(request, timeoutMillis);
    switch (response.code) {
      case ResponseCode.SUCCESS:
      {
        const body = response.body;
        if (body) {
          const text = body.toString();
          this.logger.info('[mq:client_api] get Topic [%s] RouteInfoFromNameServer: %s', topic, text);
          return this._parseTopicRouteData(text);
        }
        break;
      }
      case ResponseCode.TOPIC_NOT_EXIST:
        this.logger.info('[mq:client_api] get Topic [%s] RouteInfoFromNameServer is not exist value', topic);
        // fall through: after logging, a missing topic is raised like any other failure
      default:
        this._defaultHandler(request, response);
        break;
    }
  }

  /**
   * Unregister a client from a broker.
   *
   * @param {String} addr - broker address
   * @param {String} clientId - sent as `clientID`
   * @param {String} producerGroup - producer group; a non-string value (e.g. null) is sent as is
   * @param {String} consumerGroup - consumer group; a non-string value (e.g. null) is sent as is
   * @param {Number} timeoutMillis - forwarded to invoke()
   * @throws {Error} MQClientException for any non-SUCCESS response code
   */
  async unregisterClient(addr, clientId, producerGroup, consumerGroup, timeoutMillis) {
    const requestHeader = {
      clientID: clientId,
      producerGroup: this._buildWithProjectGroup(producerGroup),
      consumerGroup: this._buildWithProjectGroup(consumerGroup),
    };
    const request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader);
    const response = await this.invoke(addr, request, timeoutMillis);

    if (response.code !== ResponseCode.SUCCESS) {
      this._defaultHandler(request, response);
    }
  }

  /**
   * Fetch route data for a topic from the name server; the project-group
   * marker is applied to the topic name first.
   *
   * @param {String} topic - topic name
   * @param {Number} timeoutMillis - forwarded to the name server invocation
   * @return {Object} parsed route data with `queueDatas` / `brokerDatas` sorted
   *   by brokerName; undefined when a SUCCESS response carries no body
   * @throws {Error} MQClientException for any non-SUCCESS response code
   */
  async getTopicRouteInfoFromNameServer(topic, timeoutMillis) {
    topic = this._buildWithProjectGroup(topic);
    const request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, {
      topic,
    });
    const response = await this.invokeForNameSrvAtLeastOnce(request, timeoutMillis);
    switch (response.code) {
      case ResponseCode.SUCCESS:
      {
        const body = response.body;
        if (body) {
          return this._parseTopicRouteData(body.toString());
        }
        break;
      }
      case ResponseCode.TOPIC_NOT_EXIST:
        this.logger.warn('[mq:client_api] get Topic [%s] RouteInfoFromNameServer is not exist value', topic);
        // fall through: after logging, a missing topic is raised like any other failure
      default:
        this._defaultHandler(request, response);
        break;
    }
  }

  /**
   * Send a heartbeat to a broker.
   *
   * NOTE: when a project group is configured, the group and topic names inside
   * `heartbeatData` are rewritten IN PLACE. Repeated calls are harmless because
   * the marker is only appended when it is not already present.
   *
   * @param {String} addr - broker address
   * @param {Object} heartbeatData - object with iterable `consumerDataSet`
   *   (each entry having an iterable `subscriptionDataSet`) and `producerDataSet`;
   *   serialized with JSON.stringify as the request body
   * @param {Number} timeout - forwarded to invoke()
   * @throws {Error} MQClientException for any non-SUCCESS response code
   */
  async sendHearbeat(addr, heartbeatData, timeout) {
    if (this.projectGroupPrefix) {
      for (const consumerData of heartbeatData.consumerDataSet) {
        consumerData.groupName = this._buildWithProjectGroup(consumerData.groupName);
        for (const subscriptionData of consumerData.subscriptionDataSet) {
          subscriptionData.topic = this._buildWithProjectGroup(subscriptionData.topic);
        }
      }
      for (const producerData of heartbeatData.producerDataSet) {
        producerData.groupName = this._buildWithProjectGroup(producerData.groupName);
      }
    }
    const request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
    request.body = Buffer.from(JSON.stringify(heartbeatData));
    const response = await this.invoke(addr, request, timeout);
    if (response.code !== ResponseCode.SUCCESS) {
      this._defaultHandler(request, response);
    }
  }

  /**
   * Send UPDATE_CONSUMER_OFFSET through invokeOneway(); no response is inspected.
   * `requestHeader.consumerGroup` and `requestHeader.topic` are rewritten in place.
   *
   * @param {String} brokerAddr - broker address
   * @param {Object} requestHeader - header carrying at least consumerGroup and topic
   */
  async updateConsumerOffsetOneway(brokerAddr, requestHeader) {
    requestHeader.consumerGroup = this._buildWithProjectGroup(requestHeader.consumerGroup);
    requestHeader.topic = this._buildWithProjectGroup(requestHeader.topic);

    const request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
    await this.invokeOneway(brokerAddr, request);
  }

  /**
   * Query the consume offset stored on the broker.
   * `requestHeader.consumerGroup` and `requestHeader.topic` are rewritten in place.
   *
   * @param {String} brokerAddr - broker address
   * @param {Object} requestHeader - header carrying at least consumerGroup and topic
   * @param {Number} timeoutMillis - forwarded to invoke()
   * @return {Number} offset from the response header
   * @throws {Error} MQClientException for any non-SUCCESS response code
   */
  async queryConsumerOffset(brokerAddr, requestHeader, timeoutMillis) {
    requestHeader.consumerGroup = this._buildWithProjectGroup(requestHeader.consumerGroup);
    requestHeader.topic = this._buildWithProjectGroup(requestHeader.topic);

    const request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader);
    const response = await this.invoke(brokerAddr, request, timeoutMillis);
    if (response.code === ResponseCode.SUCCESS) {
      const responseHeader = response.decodeCommandCustomHeader();
      return Number(responseHeader.offset.toString());
    }
    this._defaultHandler(request, response);
  }

  /**
   * Send GET_MAX_OFFSET for a queue.
   *
   * @param {String} addr - broker address
   * @param {String} topic - topic name
   * @param {Number} queueId - queue id
   * @param {Number} timeoutMillis - forwarded to invoke()
   * @return {Number} offset from the response header (falsy when there is no header)
   * @throws {Error} MQClientException for any non-SUCCESS response code
   */
  async getMaxOffset(addr, topic, queueId, timeoutMillis) {
    const requestHeader = {
      topic: this._buildWithProjectGroup(topic),
      queueId,
    };
    const request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader);
    const response = await this.invoke(addr, request, timeoutMillis);

    if (response.code === ResponseCode.SUCCESS) {
      const responseHeader = response.decodeCommandCustomHeader();
      return responseHeader && Number(responseHeader.offset);
    }
    this._defaultHandler(request, response);
  }

  /**
   * Send SEARCH_OFFSET_BY_TIMESTAMP for a queue.
   *
   * @param {String} addr - broker address
   * @param {String} topic - topic name
   * @param {Number} queueId - queue id
   * @param {Number} timestamp - sent as the `timestamp` header field
   * @param {Number} timeoutMillis - forwarded to invoke()
   * @return {Number} offset from the response header (falsy when there is no header)
   * @throws {Error} MQClientException for any non-SUCCESS response code
   */
  async searchOffset(addr, topic, queueId, timestamp, timeoutMillis) {
    const requestHeader = {
      topic: this._buildWithProjectGroup(topic),
      queueId,
      timestamp,
    };
    const request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader);
    const response = await this.invoke(addr, request, timeoutMillis);
    if (response.code === ResponseCode.SUCCESS) {
      const responseHeader = response.decodeCommandCustomHeader();
      return responseHeader && Number(responseHeader.offset);
    }
    this._defaultHandler(request, response);
  }

  /**
   * Fetch the consumer id list of a consumer group from a broker.
   *
   * @param {String} addr - broker address
   * @param {String} consumerGroup - consumer group name
   * @param {Number} timeoutMillis - forwarded to invoke()
   * @return {Array} `consumerIdList` of the parsed body; undefined when a
   *   SUCCESS response carries no body
   * @throws {Error} MQClientException for any non-SUCCESS response code
   */
  async getConsumerIdListByGroup(addr, consumerGroup, timeoutMillis) {
    const requestHeader = {
      consumerGroup: this._buildWithProjectGroup(consumerGroup),
    };
    const request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader);
    const response = await this.invoke(addr, request, timeoutMillis);
    if (response.code !== ResponseCode.SUCCESS) {
      this._defaultHandler(request, response);
      return;
    }
    if (response.body) {
      const body = JSON2.parse(response.body.toString());
      return body.consumerIdList;
    }
  }

  /**
   * Pull messages from a broker.
   * `requestHeader.consumerGroup` and `requestHeader.topic` are rewritten in place.
   *
   * @param {String} brokerAddr - broker address
   * @param {Object} requestHeader - pull request header carrying at least consumerGroup and topic
   * @param {Number} timeoutMillis - forwarded to invoke()
   * @return {Object} `{ pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList, suggestWhichBrokerId }`
   * @throws {Error} MQClientException for response codes other than SUCCESS,
   *   PULL_NOT_FOUND, PULL_RETRY_IMMEDIATELY and PULL_OFFSET_MOVED
   */
  async pullMessage(brokerAddr, requestHeader, timeoutMillis) {
    requestHeader.consumerGroup = this._buildWithProjectGroup(requestHeader.consumerGroup);
    requestHeader.topic = this._buildWithProjectGroup(requestHeader.topic);

    const request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
    const response = await this.invoke(brokerAddr, request, timeoutMillis);
    let pullStatus = PullStatus.NO_NEW_MSG;
    switch (response.code) {
      case ResponseCode.SUCCESS:
        pullStatus = PullStatus.FOUND;
        break;
      case ResponseCode.PULL_NOT_FOUND:
        pullStatus = PullStatus.NO_NEW_MSG;
        break;
      case ResponseCode.PULL_RETRY_IMMEDIATELY:
        pullStatus = PullStatus.NO_MATCHED_MSG;
        break;
      case ResponseCode.PULL_OFFSET_MOVED:
        pullStatus = PullStatus.OFFSET_ILLEGAL;
        break;
      default:
        this._defaultHandler(request, response);
        break;
    }

    const responseHeader = response.decodeCommandCustomHeader();
    let msgList = [];
    if (pullStatus === PullStatus.FOUND) {
      // The shared module-level buffer is reset, filled and decoded with no
      // `await` in between, so overlapping pulls cannot interleave here.
      byteBuffer.reset();
      byteBuffer.put(response.body).flip();
      msgList = MessageDecoder.decodes(byteBuffer);

      const minOffset = responseHeader.minOffset.toString();
      const maxOffset = responseHeader.maxOffset.toString();
      for (const msg of msgList) {
        // Hand messages back under the topic name the caller uses.
        msg.topic = this._clearProjectGroup(msg.topic);
        msg.properties[MessageConst.PROPERTY_MIN_OFFSET] = minOffset;
        msg.properties[MessageConst.PROPERTY_MAX_OFFSET] = maxOffset;
      }
    }

    return {
      pullStatus,
      nextBeginOffset: Number(responseHeader.nextBeginOffset),
      minOffset: Number(responseHeader.minOffset),
      maxOffset: Number(responseHeader.maxOffset),
      msgFoundList: msgList,
      suggestWhichBrokerId: responseHeader.suggestWhichBrokerId,
    };
  }

  /**
   * Send UPDATE_AND_CREATE_TOPIC to a broker.
   *
   * @param {String} addr - broker address
   * @param {String} defaultTopic - sent as the `defaultTopic` header field
   * @param {Object} topicConfig - provides topicName, readQueueNums, writeQueueNums,
   *   perm, topicFilterType, topicSysFlag and order
   * @param {Number} timeoutMillis - forwarded to invoke()
   * @throws {Error} MQClientException for any non-SUCCESS response code
   */
  async createTopic(addr, defaultTopic, topicConfig, timeoutMillis) {
    const requestHeader = {
      topic: this._buildWithProjectGroup(topicConfig.topicName),
      defaultTopic,
      readQueueNums: topicConfig.readQueueNums,
      writeQueueNums: topicConfig.writeQueueNums,
      perm: topicConfig.perm,
      topicFilterType: topicConfig.topicFilterType,
      topicSysFlag: topicConfig.topicSysFlag,
      order: topicConfig.order,
    };
    const request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
    const response = await this.invoke(addr, request, timeoutMillis);
    if (response.code !== ResponseCode.SUCCESS) {
      this._defaultHandler(request, response);
    }
  }

  /**
   * Send a message to a broker with the SEND_MESSAGE_V2 command.
   * `msg.topic`, `requestHeader.producerGroup` and `requestHeader.topic` are
   * rewritten in place with the project-group marker.
   *
   * @param {String} brokerAddr - broker address
   * @param {String} brokerName - used to build the returned MessageQueue
   * @param {Object} msg - message; `msg.body` becomes the request body
   * @param {Object} requestHeader - long-named send header, mapped to the a..l fields of the V2 header
   * @param {Number} timeoutMillis - forwarded to invoke()
   * @return {Object} `{ sendStatus, msgId, messageQueue, queueOffset }`
   * @throws {Error} MQClientException for response codes other than SUCCESS,
   *   FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT and SLAVE_NOT_AVAILABLE
   */
  async sendMessage(brokerAddr, brokerName, msg, requestHeader, timeoutMillis) {
    msg.topic = this._buildWithProjectGroup(msg.topic);
    requestHeader.producerGroup = this._buildWithProjectGroup(requestHeader.producerGroup);
    requestHeader.topic = this._buildWithProjectGroup(requestHeader.topic);

    const requestHeaderV2 = {
      a: requestHeader.producerGroup,
      b: requestHeader.topic,
      c: requestHeader.defaultTopic,
      d: requestHeader.defaultTopicQueueNums,
      e: requestHeader.queueId,
      f: requestHeader.sysFlag,
      g: requestHeader.bornTimestamp,
      h: requestHeader.flag,
      i: requestHeader.properties,
      j: requestHeader.reconsumeTimes,
      k: requestHeader.unitMode,
      l: requestHeader.maxReconsumeTimes,
    };
    const request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
    request.body = msg.body;
    const response = await this.invoke(brokerAddr, request, timeoutMillis);
    let sendStatus = SendStatus.SEND_OK;
    switch (response.code) {
      case ResponseCode.FLUSH_DISK_TIMEOUT:
        sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;
        break;
      case ResponseCode.FLUSH_SLAVE_TIMEOUT:
        sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT;
        break;
      case ResponseCode.SLAVE_NOT_AVAILABLE:
        sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;
        break;
      case ResponseCode.SUCCESS:
        sendStatus = SendStatus.SEND_OK;
        break;
      default:
        this._defaultHandler(request, response);
        break;
    }
    const responseHeader = response.decodeCommandCustomHeader();
    const messageQueue = new MessageQueue(msg.topic, brokerName, responseHeader.queueId);
    // The returned queue carries the topic name without the project-group marker.
    messageQueue.topic = this._clearProjectGroup(messageQueue.topic);

    return {
      sendStatus,
      msgId: responseHeader.msgId,
      messageQueue,
      queueOffset: Number(responseHeader.queueOffset),
    };
  }

  /**
   * Send CONSUMER_SEND_MSG_BACK for a message.
   *
   * The project-group marker is applied to the group and the origin topic, as
   * every other broker request in this class does; `msg` itself is not modified.
   *
   * @param {String} brokerAddr - broker address
   * @param {Object} msg - message providing commitLogOffset, msgId and topic
   * @param {String} consumerGroup - consumer group name
   * @param {Number} delayLevel - sent as the `delayLevel` header field
   * @param {Number} timeoutMillis - forwarded to invoke()
   * @param {Number} maxConsumeRetryTimes - sent as `maxReconsumeTimes`
   * @throws {Error} MQClientException for any non-SUCCESS response code
   */
  async consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, timeoutMillis, maxConsumeRetryTimes) {
    const requestHeader = {
      offset: msg.commitLogOffset,
      group: this._buildWithProjectGroup(consumerGroup),
      delayLevel,
      originMsgId: msg.msgId,
      originTopic: this._buildWithProjectGroup(msg.topic),
      unitMode: false,
      maxReconsumeTimes: maxConsumeRetryTimes,
    };
    const request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
    const response = await this.invoke(brokerAddr, request, timeoutMillis);
    if (response.code !== ResponseCode.SUCCESS) {
      this._defaultHandler(request, response);
    }
  }

  /**
   * Parse a topic-route JSON payload and sort its `queueDatas` and
   * `brokerDatas` arrays by brokerName.
   *
   * @param {String} text - JSON text of the response body
   * @return {Object} the parsed route data
   */
  _parseTopicRouteData(text) {
    const routerInfoData = JSON2.parse(text);
    routerInfoData.queueDatas.sort(compare);
    routerInfoData.brokerDatas.sort(compare);
    return routerInfoData;
  }

  /**
   * Raise an `MQClientException` built from a response.
   *
   * @param {Object} request - the request that was sent (currently unused)
   * @param {Object} response - its `remark` becomes the message, its `code` the error code
   * @throws {Error} always
   */
  _defaultHandler(request, response) {
    const err = new Error(response.remark);
    err.name = 'MQClientException';
    err.code = response.code;
    throw err;
  }

  /**
   * Append the project-group marker to a group or topic name.
   *
   * @param {String} origin - group or topic name
   * @return {String} `origin` with the marker appended exactly once; `origin`
   *   unchanged when no project group is configured or when it is not a string
   *   (e.g. a null group passed to unregisterClient)
   */
  _buildWithProjectGroup(origin) {
    if (!this.projectGroupPrefix || typeof origin !== 'string') {
      return origin;
    }
    const marker = fmt(VIRTUAL_APPGROUP_PREFIX, this.projectGroupPrefix);
    return origin.endsWith(marker) ? origin : origin + marker;
  }

  /**
   * Strip the project-group marker from the end of a group or topic name.
   *
   * @param {String} origin - group or topic name
   * @return {String} `origin` without the trailing marker; `origin` unchanged
   *   when no project group is configured, when it is not a string, or when it
   *   does not end with the marker
   */
  _clearProjectGroup(origin) {
    // Without this guard a null prefix would be formatted into the marker text.
    if (!this.projectGroupPrefix || typeof origin !== 'string') {
      return origin;
    }
    const marker = fmt(VIRTUAL_APPGROUP_PREFIX, this.projectGroupPrefix);
    if (origin.endsWith(marker)) {
      return origin.slice(0, origin.length - marker.length);
    }
    return origin;
  }
}
|
575 |
|
576 | module.exports = MQClientAPI;
|
577 |
|
578 |
|
579 |
|
/**
 * Array#sort comparator that orders route entries by their `brokerName`
 * using the `>` / `<` operators.
 *
 * @param {Object} routerA - entry with a `brokerName` property
 * @param {Object} routerB - entry with a `brokerName` property
 * @return {Number} 1 when A sorts after B, -1 when A sorts before B, otherwise 0
 */
function compare(routerA, routerB) {
  const { brokerName: left } = routerA;
  const { brokerName: right } = routerB;
  if (left > right) {
    return 1;
  }
  return left < right ? -1 : 0;
}
|