1 | 'use strict';
|
2 |
|
3 | const is = require('is-type-of');
|
4 | const gather = require('p-gather');
|
5 | const sleep = require('mz-modules/sleep');
|
6 | const utility = require('utility');
|
7 | const MixAll = require('./mix_all');
|
8 | const MQClientAPI = require('./mq_client_api');
|
9 | const MessageQueue = require('./message_queue');
|
10 | const PermName = require('./protocol/perm_name');
|
11 | const TopicPublishInfo = require('./producer/topic_publish_info');
|
12 |
|
13 | const instanceTable = new Map();
|
14 |
|
15 | class MQClient extends MQClientAPI {
|
16 |
|
17 | |
18 |
|
19 |
|
20 |
|
21 |
|
22 | constructor(clientConfig) {
|
23 | super(clientConfig.options);
|
24 |
|
25 | this._clientConfig = clientConfig;
|
26 | this._brokerAddrTable = new Map();
|
27 | this._consumerTable = new Map();
|
28 | this._producerTable = new Map();
|
29 | this._topicRouteTable = new Map();
|
30 |
|
31 | }
|
32 |
|
33 | |
34 |
|
35 |
|
36 | get clientId() {
|
37 | return this._clientConfig.clientId;
|
38 | }
|
39 |
|
40 | |
41 |
|
42 |
|
43 | get pollNameServerInteval() {
|
44 | return this._clientConfig.pollNameServerInteval;
|
45 | }
|
46 |
|
47 | |
48 |
|
49 |
|
50 | get heartbeatBrokerInterval() {
|
51 | return this._clientConfig.heartbeatBrokerInterval;
|
52 | }
|
53 |
|
54 | |
55 |
|
56 |
|
57 | get persistConsumerOffsetInterval() {
|
58 | return this._clientConfig.persistConsumerOffsetInterval;
|
59 | }
|
60 |
|
61 | |
62 |
|
63 |
|
64 | get rebalanceInterval() {
|
65 | return this._clientConfig.rebalanceInterval;
|
66 | }
|
67 |
|
68 | |
69 |
|
70 |
|
71 | async init() {
|
72 | await super.init();
|
73 | await this.updateAllTopicRouterInfo();
|
74 | await this.sendHeartbeatToAllBroker();
|
75 | await this.doRebalance();
|
76 |
|
77 | this.startScheduledTask('updateAllTopicRouterInfo', this.pollNameServerInteval);
|
78 | this.startScheduledTask('sendHeartbeatToAllBroker', this.heartbeatBrokerInterval);
|
79 | this.startScheduledTask('doRebalance', this.rebalanceInterval);
|
80 | this.startScheduledTask('persistAllConsumerOffset', this.persistConsumerOffsetInterval);
|
81 | }
|
82 |
|
83 | async close() {
|
84 | if (this._consumerTable.size || this._producerTable.size) {
|
85 | return;
|
86 | }
|
87 |
|
88 | await super.close();
|
89 |
|
90 | }
|
91 |
|
92 | |
93 |
|
94 |
|
95 |
|
96 |
|
97 |
|
98 |
|
99 | startScheduledTask(name, interval, delay) {
|
100 | (async () => {
|
101 | await sleep(delay || interval);
|
102 | while (this._inited) {
|
103 | try {
|
104 | this.logger.info('[mq:client] execute `%s` at %s', name, utility.YYYYMMDDHHmmss());
|
105 | await this[name]();
|
106 | } catch (err) {
|
107 | this.logger.error(err);
|
108 | }
|
109 | await sleep(interval);
|
110 | }
|
111 | })();
|
112 | }
|
113 |
|
114 | |
115 |
|
116 |
|
117 |
|
118 |
|
119 |
|
120 | registerConsumer(group, consumer) {
|
121 | if (this._consumerTable.has(group)) {
|
122 | this.logger.warn('[mq:client] the consumer group [%s] exist already.', group);
|
123 | return;
|
124 | }
|
125 | this._consumerTable.set(group, consumer);
|
126 | this.logger.info('[mq:client] new consumer has regitsered, group: %s', group);
|
127 | }
|
128 |
|
129 | |
130 |
|
131 |
|
132 |
|
133 |
|
134 | async unregisterConsumer(group) {
|
135 | this._consumerTable.delete(group);
|
136 | await this.unregister(null, group);
|
137 | this.logger.info('[mq:client] unregister consumer, group: %s', group);
|
138 | }
|
139 |
|
140 | |
141 |
|
142 |
|
143 |
|
144 |
|
145 |
|
146 | registerProducer(group, producer) {
|
147 | if (this._producerTable.has(group)) {
|
148 | this.logger.warn('[mq:client] the producer group [%s] exist already.', group);
|
149 | return;
|
150 | }
|
151 | this._producerTable.set(group, producer);
|
152 | this.logger.info('[mq:client] new producer has regitsered, group: %s', group);
|
153 | }
|
154 |
|
155 | |
156 |
|
157 |
|
158 |
|
159 |
|
160 | async unregisterProducer(group) {
|
161 | this._producerTable.delete(group);
|
162 | await this.unregister(group, null);
|
163 | this.logger.info('[mq:client] unregister producer, group: %s', group);
|
164 | }
|
165 |
|
166 | |
167 |
|
168 |
|
169 |
|
170 |
|
171 |
|
172 | async unregister(producerGroup, consumerGroup) {
|
173 | const brokerAddrTable = this._brokerAddrTable;
|
174 | for (const brokerName of brokerAddrTable.keys()) {
|
175 | const oneTable = brokerAddrTable.get(brokerName);
|
176 | if (!oneTable) {
|
177 | continue;
|
178 | }
|
179 |
|
180 | for (const id in oneTable) {
|
181 | const addr = oneTable[id];
|
182 | if (addr) {
|
183 | await this.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000);
|
184 | }
|
185 | }
|
186 | }
|
187 | }
|
188 |
|
189 | |
190 |
|
191 |
|
192 |
|
193 | async updateAllTopicRouterInfo() {
|
194 | const topics = [];
|
195 |
|
196 | for (const groupName of this._consumerTable.keys()) {
|
197 | const consumer = this._consumerTable.get(groupName);
|
198 | if (!consumer) {
|
199 | continue;
|
200 | }
|
201 | for (const topic of consumer.subscriptions.keys()) {
|
202 | topics.push(topic);
|
203 | }
|
204 | }
|
205 |
|
206 |
|
207 | for (const groupName of this._producerTable.keys()) {
|
208 | const producer = this._producerTable.get(groupName);
|
209 | if (!producer) {
|
210 | continue;
|
211 | }
|
212 | for (const topic of producer.publishTopicList) {
|
213 | topics.push(topic);
|
214 | }
|
215 | }
|
216 | this.logger.info('[mq:client] try to update all topic route info. topic: %j', topics);
|
217 | const ret = await gather(topics.map(topic => this.updateTopicRouteInfoFromNameServer(topic)));
|
218 | ret.forEach(data => {
|
219 | if (data.isError) {
|
220 | data.error.message = `[mq:client] updateAllTopicRouterInfo occurred error, ${data.error.message}`;
|
221 | this.emit('error', data.error);
|
222 | }
|
223 | });
|
224 | }
|
225 |
|
226 | |
227 |
|
228 |
|
229 |
|
230 |
|
231 |
|
232 | async updateTopicRouteInfoFromNameServer(topic, isDefault, defaultMQProducer) {
|
233 | this.logger.info('[mq:client] updateTopicRouteInfoFromNameServer() topic: %s, isDefault: %s', topic, !!isDefault);
|
234 | if (isDefault && defaultMQProducer) {
|
235 | const topicRouteData = await this.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.createTopicKey, 3000);
|
236 | if (topicRouteData) {
|
237 | for (const data of topicRouteData.queueDatas) {
|
238 | const queueNums =
|
239 | defaultMQProducer.defaultTopicQueueNums < data.readQueueNums ?
|
240 | defaultMQProducer.defaultTopicQueueNums : data.readQueueNums;
|
241 | data.readQueueNums = queueNums;
|
242 | data.writeQueueNums = queueNums;
|
243 | }
|
244 | this._refreshTopicRouteInfo(topic, topicRouteData);
|
245 | }
|
246 | } else {
|
247 | const topicRouteData = await this.getTopicRouteInfoFromNameServer(topic, 3000);
|
248 | if (topicRouteData) {
|
249 | this._refreshTopicRouteInfo(topic, topicRouteData);
|
250 | }
|
251 | }
|
252 | }
|
253 |
|
254 | _refreshTopicRouteInfo(topic, topicRouteData) {
|
255 | if (!topicRouteData) {
|
256 | return;
|
257 | }
|
258 |
|
259 |
|
260 |
|
261 |
|
262 |
|
263 |
|
264 |
|
265 |
|
266 |
|
267 |
|
268 |
|
269 |
|
270 |
|
271 |
|
272 |
|
273 |
|
274 |
|
275 |
|
276 | const prev = this._topicRouteTable.get(topic);
|
277 | const needUpdate = this._isRouteDataChanged(prev, topicRouteData) || this._isNeedUpdateTopicRouteInfo(topic);
|
278 |
|
279 | this.logger.info('[mq:client] refresh route data for topic: %s, route data: %j, needUpdate: %s', topic, topicRouteData, needUpdate);
|
280 | if (!needUpdate) {
|
281 | return;
|
282 | }
|
283 |
|
284 |
|
285 |
|
286 |
|
287 |
|
288 |
|
289 |
|
290 | for (const brokerData of topicRouteData.brokerDatas) {
|
291 | this._brokerAddrTable.set(brokerData.brokerName, brokerData.brokerAddrs);
|
292 | }
|
293 |
|
294 |
|
295 | const publishInfo = this._topicRouteData2TopicPublishInfo(topic, topicRouteData);
|
296 | publishInfo.haveTopicRouterInfo = true;
|
297 | for (const groupName of this._producerTable.keys()) {
|
298 | const producer = this._producerTable.get(groupName);
|
299 | if (producer) {
|
300 | producer.updateTopicPublishInfo(topic, publishInfo);
|
301 | }
|
302 | }
|
303 |
|
304 |
|
305 | const subscribeInfo = this._topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
|
306 | for (const groupName of this._consumerTable.keys()) {
|
307 | const consumer = this._consumerTable.get(groupName);
|
308 | if (consumer) {
|
309 | consumer.updateTopicSubscribeInfo(topic, subscribeInfo);
|
310 | }
|
311 | }
|
312 |
|
313 | this.logger.info('[mq:client] update topicRouteTable for topic: %s, with topicRouteData: %j', topic, topicRouteData);
|
314 | this._topicRouteTable.set(topic, topicRouteData);
|
315 | }
|
316 |
|
317 | _isRouteDataChanged(prev, current) {
|
318 | if (is.nullOrUndefined(prev) || is.nullOrUndefined(current)) {
|
319 | return true;
|
320 | }
|
321 |
|
322 | return JSON.stringify(prev) !== JSON.stringify(current);
|
323 | }
|
324 |
|
325 | _isNeedUpdateTopicRouteInfo(topic) {
|
326 |
|
327 | for (const groupName of this._producerTable.keys()) {
|
328 | const producer = this._producerTable.get(groupName);
|
329 | if (producer && producer.isPublishTopicNeedUpdate(topic)) {
|
330 | return true;
|
331 | }
|
332 | }
|
333 |
|
334 |
|
335 | for (const groupName of this._consumerTable.keys()) {
|
336 | const consumer = this._consumerTable.get(groupName);
|
337 | if (consumer && consumer.isSubscribeTopicNeedUpdate(topic)) {
|
338 | return true;
|
339 | }
|
340 | }
|
341 | return false;
|
342 | }
|
343 |
|
344 | _topicRouteData2TopicPublishInfo(topic, topicRouteData) {
|
345 | const info = new TopicPublishInfo();
|
346 |
|
347 | if (topicRouteData.orderTopicConf && topicRouteData.orderTopicConf.length) {
|
348 | const brokers = topicRouteData.orderTopicConf.split(';');
|
349 | for (const broker of brokers) {
|
350 | const item = broker.split(':');
|
351 | const nums = parseInt(item[1], 10);
|
352 | for (let i = 0; i < nums; i++) {
|
353 | info.messageQueueList.push(new MessageQueue(topic, item[0], i));
|
354 | }
|
355 | }
|
356 | info.orderTopic = true;
|
357 | } else {
|
358 | for (const queueData of topicRouteData.queueDatas) {
|
359 | if (PermName.isWriteable(queueData.perm)) {
|
360 | const brokerData = topicRouteData.brokerDatas.find(data => {
|
361 | return data.brokerName === queueData.brokerName;
|
362 | });
|
363 | if (!brokerData || !brokerData.brokerAddrs[MixAll.MASTER_ID]) {
|
364 | continue;
|
365 | }
|
366 | for (let i = 0, nums = queueData.writeQueueNums; i < nums; i++) {
|
367 | info.messageQueueList.push(new MessageQueue(topic, queueData.brokerName, i));
|
368 | }
|
369 | }
|
370 | }
|
371 | info.orderTopic = false;
|
372 | }
|
373 | return info;
|
374 | }
|
375 |
|
376 | _topicRouteData2TopicSubscribeInfo(topic, topicRouteData) {
|
377 | const messageQueueList = [];
|
378 | for (const queueData of topicRouteData.queueDatas) {
|
379 | if (PermName.isReadable(queueData.perm)) {
|
380 | for (let i = 0, nums = queueData.readQueueNums; i < nums; i++) {
|
381 | messageQueueList.push(new MessageQueue(topic, queueData.brokerName, i));
|
382 | }
|
383 | }
|
384 | }
|
385 | return messageQueueList;
|
386 | }
|
387 |
|
388 | |
389 |
|
390 |
|
391 |
|
392 | async sendHeartbeatToAllBroker() {
|
393 | this._cleanOfflineBroker();
|
394 |
|
395 | const heartbeatData = this._prepareHeartbeatData();
|
396 | const consumerEmpty = heartbeatData.consumerDataSet.length === 0;
|
397 | const producerEmpty = heartbeatData.producerDataSet.length === 0;
|
398 | if (consumerEmpty && producerEmpty) {
|
399 | this.logger.info('[mq:client] sending hearbeat, but no consumer and no producer');
|
400 | return;
|
401 | }
|
402 |
|
403 | const brokers = [];
|
404 | for (const brokerName of this._brokerAddrTable.keys()) {
|
405 | const oneTable = this._brokerAddrTable.get(brokerName);
|
406 | if (!oneTable) {
|
407 | continue;
|
408 | }
|
409 |
|
410 | for (const id in oneTable) {
|
411 | const addr = oneTable[id];
|
412 | if (!addr) {
|
413 | continue;
|
414 | }
|
415 |
|
416 |
|
417 | if (consumerEmpty && Number(id) !== MixAll.MASTER_ID) {
|
418 | continue;
|
419 | }
|
420 |
|
421 | brokers.push(addr);
|
422 | }
|
423 | }
|
424 | const ret = await gather(brokers.map(addr => this.sendHearbeat(addr, heartbeatData, 3000)));
|
425 | this.logger.info('[mq:client] send heartbeat: %j to : %j, and result: %j', heartbeatData, brokers, ret);
|
426 | }
|
427 |
|
428 | _prepareHeartbeatData() {
|
429 | const heartbeatData = {
|
430 | clientID: this.clientId,
|
431 | consumerDataSet: [],
|
432 | producerDataSet: [],
|
433 | };
|
434 |
|
435 |
|
436 | for (const groupName of this._consumerTable.keys()) {
|
437 | const consumer = this._consumerTable.get(groupName);
|
438 | if (consumer) {
|
439 | const subscriptionDataSet = [];
|
440 | for (const topic of consumer.subscriptions.keys()) {
|
441 | const data = consumer.subscriptions.get(topic);
|
442 | if (data && data.subscriptionData) {
|
443 | subscriptionDataSet.push(data.subscriptionData);
|
444 | }
|
445 | }
|
446 |
|
447 | heartbeatData.consumerDataSet.push({
|
448 | groupName: consumer.consumerGroup,
|
449 | consumeType: consumer.consumeType,
|
450 | messageModel: consumer.messageModel,
|
451 | consumeFromWhere: consumer.consumeFromWhere,
|
452 | subscriptionDataSet,
|
453 | unitMode: consumer.unitMode,
|
454 | });
|
455 | }
|
456 | }
|
457 |
|
458 |
|
459 | for (const groupName of this._producerTable.keys()) {
|
460 | const producer = this._producerTable.get(groupName);
|
461 | if (producer) {
|
462 | heartbeatData.producerDataSet.push({
|
463 | groupName,
|
464 | });
|
465 | }
|
466 | }
|
467 | return heartbeatData;
|
468 | }
|
469 |
|
470 | _cleanOfflineBroker() {
|
471 | for (const brokerName of this._brokerAddrTable.keys()) {
|
472 | const oneTable = this._brokerAddrTable.get(brokerName);
|
473 | let exists = false;
|
474 |
|
475 | for (const brokerId in oneTable) {
|
476 | const addr = oneTable[brokerId];
|
477 | if (this._isBrokerAddrExistInTopicRouteTable(addr)) {
|
478 | exists = true;
|
479 | } else {
|
480 | delete oneTable[brokerId];
|
481 | this.logger.info('[mq:client] the broker addr[%s %s] is offline, remove it', brokerName, addr);
|
482 | }
|
483 | }
|
484 |
|
485 | if (!exists) {
|
486 | this._brokerAddrTable.delete(brokerName);
|
487 | this.logger.info('[mq:client] the broker[%s] name\'s host is offline, remove it', brokerName);
|
488 | }
|
489 | }
|
490 | }
|
491 |
|
492 | _isBrokerAddrExistInTopicRouteTable(addr) {
|
493 | for (const topic of this._topicRouteTable.keys()) {
|
494 | const topicRouteData = this._topicRouteTable.get(topic);
|
495 | for (const brokerData of topicRouteData.brokerDatas) {
|
496 | for (const brokerId in brokerData.brokerAddrs) {
|
497 | if (brokerData.brokerAddrs[brokerId] === addr) {
|
498 | return true;
|
499 | }
|
500 | }
|
501 | }
|
502 | }
|
503 | return false;
|
504 | }
|
505 |
|
506 | |
507 |
|
508 |
|
509 |
|
510 | async doRebalance() {
|
511 | for (const groupName of this._consumerTable.keys()) {
|
512 | const consumer = this._consumerTable.get(groupName);
|
513 | if (consumer) {
|
514 | await consumer.doRebalance();
|
515 | }
|
516 | }
|
517 | }
|
518 |
|
519 | |
520 |
|
521 |
|
522 |
|
523 |
|
524 |
|
525 | async findConsumerIdList(topic, group) {
|
526 | let brokerAddr = this.findBrokerAddrByTopic(topic);
|
527 | if (!brokerAddr) {
|
528 | await this.updateTopicRouteInfoFromNameServer(topic);
|
529 | brokerAddr = this.findBrokerAddrByTopic(topic);
|
530 |
|
531 | if (!brokerAddr) {
|
532 | throw new Error(`The broker of topic[${topic}] not exist`);
|
533 | } else {
|
534 | return await this.getConsumerIdListByGroup(brokerAddr, group, 3000);
|
535 | }
|
536 | } else {
|
537 | return await this.getConsumerIdListByGroup(brokerAddr, group, 3000);
|
538 | }
|
539 | }
|
540 |
|
541 |
|
542 | findBrokerAddrByTopic(topic) {
|
543 | const topicRouteData = this._topicRouteTable.get(topic);
|
544 | if (topicRouteData) {
|
545 | const brokerDatas = topicRouteData.brokerDatas || [];
|
546 | const broker = brokerDatas[0];
|
547 | if (broker) {
|
548 |
|
549 | const addr = broker.brokerAddrs[MixAll.MASTER_ID];
|
550 | if (!addr) {
|
551 | for (const id in broker.brokerAddrs) {
|
552 | return broker.brokerAddrs[id];
|
553 | }
|
554 | }
|
555 | return addr;
|
556 | }
|
557 | }
|
558 | return null;
|
559 | }
|
560 |
|
561 | |
562 |
|
563 |
|
564 |
|
565 |
|
566 | findBrokerAddressInAdmin(brokerName) {
|
567 | const map = this._brokerAddrTable.get(brokerName);
|
568 | if (!map) {
|
569 | return null;
|
570 | }
|
571 | if (map[MixAll.MASTER_ID]) {
|
572 | return {
|
573 | brokerAddr: map[MixAll.MASTER_ID],
|
574 | slave: false,
|
575 | };
|
576 | }
|
577 | for (const id in map) {
|
578 | if (map[id]) {
|
579 | return {
|
580 | brokerAddr: map[id],
|
581 | slave: true,
|
582 | };
|
583 | }
|
584 | }
|
585 | return null;
|
586 | }
|
587 |
|
588 | findBrokerAddressInSubscribe(brokerName, brokerId, onlyThisBroker) {
|
589 | let brokerAddr = null;
|
590 | let slave = false;
|
591 | let found = false;
|
592 | const map = this._brokerAddrTable.get(brokerName);
|
593 |
|
594 | if (map) {
|
595 | brokerAddr = map[brokerId];
|
596 | slave = Number(brokerId) !== MixAll.MASTER_ID;
|
597 | found = !is.nullOrUndefined(brokerAddr);
|
598 |
|
599 |
|
600 | if (!found && !onlyThisBroker) {
|
601 | for (const id in map) {
|
602 | brokerAddr = map[id];
|
603 | slave = Number(id) !== MixAll.MASTER_ID;
|
604 | found = true;
|
605 | break;
|
606 | }
|
607 | }
|
608 | }
|
609 | if (found) {
|
610 | return {
|
611 | brokerAddr,
|
612 | slave,
|
613 | };
|
614 | }
|
615 | return null;
|
616 | }
|
617 |
|
618 |
|
619 | |
620 |
|
621 |
|
622 |
|
623 |
|
624 | async maxOffset(messageQueue) {
|
625 | let brokerAddr = this.findBrokerAddressInPublish(messageQueue.brokerName);
|
626 | if (!brokerAddr) {
|
627 | await this.updateTopicRouteInfoFromNameServer(messageQueue.topic);
|
628 | brokerAddr = this.findBrokerAddressInPublish(messageQueue.brokerName);
|
629 |
|
630 | if (!brokerAddr) {
|
631 | throw new Error(`The broker[${messageQueue.brokerName}] not exist`);
|
632 | } else {
|
633 | return await this.getMaxOffset(brokerAddr, messageQueue.topic, messageQueue.queueId, 3000);
|
634 | }
|
635 | } else {
|
636 | return await this.getMaxOffset(brokerAddr, messageQueue.topic, messageQueue.queueId, 3000);
|
637 | }
|
638 | }
|
639 |
|
640 |
|
641 | findBrokerAddressInPublish(brokerName) {
|
642 | const map = this._brokerAddrTable.get(brokerName);
|
643 | return map && map[MixAll.MASTER_ID];
|
644 | }
|
645 |
|
646 | async persistAllConsumerOffset() {
|
647 | for (const groupName of this._consumerTable.keys()) {
|
648 | const consumer = this._consumerTable.get(groupName);
|
649 | if (consumer) {
|
650 | await consumer.persistConsumerOffset();
|
651 | }
|
652 | }
|
653 | }
|
654 |
|
655 | async searchOffset(messageQueue, timestamp) {
|
656 | let brokerAddr = this.findBrokerAddressInPublish(messageQueue.brokerName);
|
657 | if (!brokerAddr) {
|
658 | await this.updateTopicRouteInfoFromNameServer(messageQueue.topic);
|
659 | brokerAddr = this.findBrokerAddressInPublish(messageQueue.brokerName);
|
660 |
|
661 | if (!brokerAddr) {
|
662 | throw new Error('The broker[' + messageQueue.brokerName + '] not exist');
|
663 | } else {
|
664 | return await super.searchOffset(brokerAddr, messageQueue.topic, messageQueue.queueId, timestamp, 3000);
|
665 | }
|
666 | } else {
|
667 | return await super.searchOffset(brokerAddr, messageQueue.topic, messageQueue.queueId, timestamp, 3000);
|
668 | }
|
669 | }
|
670 |
|
671 | static getAndCreateMQClient(clientConfig) {
|
672 | const clientId = clientConfig.clientId;
|
673 | let instance = instanceTable.get(clientId);
|
674 | if (!instance) {
|
675 | instance = new MQClient(clientConfig);
|
676 | instanceTable.set(clientId, instance);
|
677 | instance.once('close', () => {
|
678 | instanceTable.delete(clientId);
|
679 | });
|
680 | }
|
681 | return instance;
|
682 | }
|
683 | }
|
684 |
|
685 | module.exports = MQClient;
|