UNPKG

20.5 kBJavaScriptView Raw
1'use strict';
2
3const is = require('is-type-of');
4const gather = require('co-gather');
5const sleep = require('mz-modules/sleep');
6const utility = require('utility');
7const MixAll = require('./mix_all');
8const MQClientAPI = require('./mq_client_api');
9const MessageQueue = require('./message_queue');
10const PermName = require('./protocol/perm_name');
11const TopicPublishInfo = require('./producer/topic_publish_info');
12
13const instanceTable = new Map();
14
15class MQClient extends MQClientAPI {
16
17 /**
18 * metaq client
19 * @param {Object} clientConfig -
20 * @class
21 */
22 constructor(clientConfig) {
23 super(clientConfig.options);
24
25 this._clientConfig = clientConfig;
26 this._brokerAddrTable = new Map();
27 this._consumerTable = new Map();
28 this._producerTable = new Map();
29 this._topicRouteTable = new Map();
30 // this.API.on('command', this.handleServerRequest.bind(this));
31 }
32
33 /**
34 * @property {String} MQClient#clientId
35 */
36 get clientId() {
37 return this._clientConfig.clientId;
38 }
39
40 /**
41 * @property {Number} MQClient#pollNameServerInteval
42 */
43 get pollNameServerInteval() {
44 return this._clientConfig.pollNameServerInteval;
45 }
46
47 /**
48 * @property {Number} MQClient#heartbeatBrokerInterval
49 */
50 get heartbeatBrokerInterval() {
51 return this._clientConfig.heartbeatBrokerInterval;
52 }
53
54 /**
55 * @property {Number} MQClient#persistConsumerOffsetInterval
56 */
57 get persistConsumerOffsetInterval() {
58 return this._clientConfig.persistConsumerOffsetInterval;
59 }
60
61 /**
62 * @property {Number} MQClient#rebalanceInterval
63 */
64 get rebalanceInterval() {
65 return this._clientConfig.rebalanceInterval;
66 }
67
68 /**
69 * start the client
70 */
71 async init() {
72 await super.init();
73 await this.updateAllTopicRouterInfo();
74 await this.sendHeartbeatToAllBroker();
75 await this.doRebalance();
76
77 this.startScheduledTask('updateAllTopicRouterInfo', this.pollNameServerInteval);
78 this.startScheduledTask('sendHeartbeatToAllBroker', this.heartbeatBrokerInterval);
79 this.startScheduledTask('doRebalance', this.rebalanceInterval);
80 this.startScheduledTask('persistAllConsumerOffset', this.persistConsumerOffsetInterval);
81 }
82
83 async close() {
84 if (this._consumerTable.size || this._producerTable.size) {
85 return;
86 }
87
88 await super.close();
89 // todo:
90 }
91
92 /**
93 * start a schedule task
94 * @param {String} name - method name
95 * @param {Number} interval - schedule interval
96 * @param {Number} [delay] - delay time interval
97 * @return {void}
98 */
99 startScheduledTask(name, interval, delay) {
100 (async () => {
101 await sleep(delay || interval);
102 while (this._inited) {
103 try {
104 this.logger.info('[mq:client] execute `%s` at %s', name, utility.YYYYMMDDHHmmss());
105 await this[name]();
106 } catch (err) {
107 this.logger.error(err);
108 }
109 await sleep(interval);
110 }
111 })();
112 }
113
114 /**
115 * regitser consumer
116 * @param {String} group - consumer group name
117 * @param {Consumer} consumer - consumer instance
118 * @return {void}
119 */
120 registerConsumer(group, consumer) {
121 if (this._consumerTable.has(group)) {
122 this.logger.warn('[mq:client] the consumer group [%s] exist already.', group);
123 return;
124 }
125 this._consumerTable.set(group, consumer);
126 this.logger.info('[mq:client] new consumer has regitsered, group: %s', group);
127 }
128
129 /**
130 * unregister consumer
131 * @param {String} group - consumer group name
132 * @return {void}
133 */
134 async unregisterConsumer(group) {
135 this._consumerTable.delete(group);
136 await this.unregister(null, group);
137 this.logger.info('[mq:client] unregister consumer, group: %s', group);
138 }
139
140 /**
141 * register producer
142 * @param {String} group - producer group name
143 * @param {Producer} producer - producer
144 * @return {void}
145 */
146 registerProducer(group, producer) {
147 if (this._producerTable.has(group)) {
148 this.logger.warn('[mq:client] the producer group [%s] exist already.', group);
149 return;
150 }
151 this._producerTable.set(group, producer);
152 this.logger.info('[mq:client] new producer has regitsered, group: %s', group);
153 }
154
155 /**
156 * unregister producer
157 * @param {String} group - producer group name
158 * @return {void}
159 */
160 async unregisterProducer(group) {
161 this._producerTable.delete(group);
162 await this.unregister(group, null);
163 this.logger.info('[mq:client] unregister producer, group: %s', group);
164 }
165
166 /**
167 * notify all broker that producer or consumer is offline
168 * @param {String} producerGroup - producer group name
169 * @param {String} consumerGroup - consumer group name
170 * @return {void}
171 */
172 async unregister(producerGroup, consumerGroup) {
173 const brokerAddrTable = this._brokerAddrTable;
174 for (const brokerName of brokerAddrTable.keys()) {
175 const oneTable = brokerAddrTable.get(brokerName);
176 if (!oneTable) {
177 continue;
178 }
179
180 for (const id in oneTable) {
181 const addr = oneTable[id];
182 if (addr) {
183 await this.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000);
184 }
185 }
186 }
187 }
188
189 /**
190 * update all router info
191 * @return {void}
192 */
193 async updateAllTopicRouterInfo() {
194 const topics = [];
195 // consumer
196 for (const groupName of this._consumerTable.keys()) {
197 const consumer = this._consumerTable.get(groupName);
198 if (!consumer) {
199 continue;
200 }
201 for (const topic of consumer.subscriptions.keys()) {
202 topics.push(topic);
203 }
204 }
205
206 // producer
207 for (const groupName of this._producerTable.keys()) {
208 const producer = this._producerTable.get(groupName);
209 if (!producer) {
210 continue;
211 }
212 for (const topic of producer.publishTopicList) {
213 topics.push(topic);
214 }
215 }
216 this.logger.info('[mq:client] try to update all topic route info. topic: %j', topics);
217 const ret = await gather(topics.map(topic => () => this.updateTopicRouteInfoFromNameServer(topic)));
218 ret.forEach(data => {
219 if (data.isError) {
220 data.error.message = `[mq:client] updateAllTopicRouterInfo occurred error, ${data.error.message}`;
221 this.emit('error', data.error);
222 }
223 });
224 }
225
226 /**
227 * update topic route info
228 * @param {String} topic - topic
229 * @param {Boolean} [isDefault] - is default or not
230 * @param {Producer} [defaultMQProducer] - producer
231 */
232 async updateTopicRouteInfoFromNameServer(topic, isDefault, defaultMQProducer) {
233 this.logger.info('[mq:client] updateTopicRouteInfoFromNameServer() topic: %s, isDefault: %s', topic, !!isDefault);
234 if (isDefault && defaultMQProducer) {
235 const topicRouteData = await this.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.createTopicKey, 3000);
236 if (topicRouteData) {
237 for (const data of topicRouteData.queueDatas) {
238 const queueNums =
239 defaultMQProducer.defaultTopicQueueNums < data.readQueueNums ?
240 defaultMQProducer.defaultTopicQueueNums : data.readQueueNums;
241 data.readQueueNums = queueNums;
242 data.writeQueueNums = queueNums;
243 }
244 this._refreshTopicRouteInfo(topic, topicRouteData);
245 }
246 } else {
247 const topicRouteData = await this.getTopicRouteInfoFromNameServer(topic, 3000);
248 if (topicRouteData) {
249 this._refreshTopicRouteInfo(topic, topicRouteData);
250 }
251 }
252 }
253
254 _refreshTopicRouteInfo(topic, topicRouteData) {
255 if (!topicRouteData) {
256 return;
257 }
258
259 // @example
260 // topicRouteData => {
261 // "brokerDatas": [{
262 // "brokerAddrs": {
263 // "0": "10.218.145.166:10911"
264 // },
265 // "brokerName": "taobaodaily-02"
266 // }],
267 // "filterServerTable": {},
268 // "queueDatas": [{
269 // "brokerName": "taobaodaily-02",
270 // "perm": 6,
271 // "readQueueNums": 8,
272 // "topicSynFlag": 0,
273 // "writeQueueNums": 8
274 // }]
275 // }
276 const prev = this._topicRouteTable.get(topic);
277 const needUpdate = this._isRouteDataChanged(prev, topicRouteData) || this._isNeedUpdateTopicRouteInfo(topic);
278
279 this.logger.info('[mq:client] refresh route data for topic: %s, route data: %j, needUpdate: %s', topic, topicRouteData, needUpdate);
280 if (!needUpdate) {
281 return;
282 }
283
284 // @example:
285 // this.brokerAddrTable => {
286 // "taobaodaily-02": {
287 // "0": "10.218.145.166:10911"
288 // }
289 // }
290 for (const brokerData of topicRouteData.brokerDatas) {
291 this._brokerAddrTable.set(brokerData.brokerName, brokerData.brokerAddrs);
292 }
293
294 // update producer route data
295 const publishInfo = this._topicRouteData2TopicPublishInfo(topic, topicRouteData);
296 publishInfo.haveTopicRouterInfo = true;
297 for (const groupName of this._producerTable.keys()) {
298 const producer = this._producerTable.get(groupName);
299 if (producer) {
300 producer.updateTopicPublishInfo(topic, publishInfo);
301 }
302 }
303
304 // 更新订阅队列信息
305 const subscribeInfo = this._topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
306 for (const groupName of this._consumerTable.keys()) {
307 const consumer = this._consumerTable.get(groupName);
308 if (consumer) {
309 consumer.updateTopicSubscribeInfo(topic, subscribeInfo);
310 }
311 }
312
313 this.logger.info('[mq:client] update topicRouteTable for topic: %s, with topicRouteData: %j', topic, topicRouteData);
314 this._topicRouteTable.set(topic, topicRouteData);
315 }
316
317 _isRouteDataChanged(prev, current) {
318 if (is.nullOrUndefined(prev) || is.nullOrUndefined(current)) {
319 return true;
320 }
321 // todo: performance enhance ?
322 return JSON.stringify(prev) !== JSON.stringify(current);
323 }
324
325 _isNeedUpdateTopicRouteInfo(topic) {
326 // producer
327 for (const groupName of this._producerTable.keys()) {
328 const producer = this._producerTable.get(groupName);
329 if (producer && producer.isPublishTopicNeedUpdate(topic)) {
330 return true;
331 }
332 }
333
334 // consumer
335 for (const groupName of this._consumerTable.keys()) {
336 const consumer = this._consumerTable.get(groupName);
337 if (consumer && consumer.isSubscribeTopicNeedUpdate(topic)) {
338 return true;
339 }
340 }
341 return false;
342 }
343
344 _topicRouteData2TopicPublishInfo(topic, topicRouteData) {
345 const info = new TopicPublishInfo();
346 // 顺序消息
347 if (topicRouteData.orderTopicConf && topicRouteData.orderTopicConf.length) {
348 const brokers = topicRouteData.orderTopicConf.split(';');
349 for (const broker of brokers) {
350 const item = broker.split(':');
351 const nums = parseInt(item[1], 10);
352 for (let i = 0; i < nums; i++) {
353 info.messageQueueList.push(new MessageQueue(topic, item[0], i));
354 }
355 }
356 info.orderTopic = true;
357 } else { // 非顺序消息
358 for (const queueData of topicRouteData.queueDatas) {
359 if (PermName.isWriteable(queueData.perm)) {
360 const brokerData = topicRouteData.brokerDatas.find(data => {
361 return data.brokerName === queueData.brokerName;
362 });
363 if (!brokerData || !brokerData.brokerAddrs[MixAll.MASTER_ID]) {
364 continue;
365 }
366 for (let i = 0, nums = queueData.writeQueueNums; i < nums; i++) {
367 info.messageQueueList.push(new MessageQueue(topic, queueData.brokerName, i));
368 }
369 }
370 }
371 info.orderTopic = false;
372 }
373 return info;
374 }
375
376 _topicRouteData2TopicSubscribeInfo(topic, topicRouteData) {
377 const messageQueueList = [];
378 for (const queueData of topicRouteData.queueDatas) {
379 if (PermName.isReadable(queueData.perm)) {
380 for (let i = 0, nums = queueData.readQueueNums; i < nums; i++) {
381 messageQueueList.push(new MessageQueue(topic, queueData.brokerName, i));
382 }
383 }
384 }
385 return messageQueueList;
386 }
387
388 /**
389 * send heartbeat to all brokers
390 * @return {void}
391 */
392 async sendHeartbeatToAllBroker() {
393 this._cleanOfflineBroker();
394
395 const heartbeatData = this._prepareHeartbeatData();
396 const consumerEmpty = heartbeatData.consumerDataSet.length === 0;
397 const producerEmpty = heartbeatData.producerDataSet.length === 0;
398 if (consumerEmpty && producerEmpty) {
399 this.logger.info('[mq:client] sending hearbeat, but no consumer and no producer');
400 return;
401 }
402
403 const brokers = [];
404 for (const brokerName of this._brokerAddrTable.keys()) {
405 const oneTable = this._brokerAddrTable.get(brokerName);
406 if (!oneTable) {
407 continue;
408 }
409
410 for (const id in oneTable) {
411 const addr = oneTable[id];
412 if (!addr) {
413 continue;
414 }
415
416 // 说明只有Producer,则不向Slave发心跳
417 if (consumerEmpty && Number(id) !== MixAll.MASTER_ID) {
418 continue;
419 }
420
421 brokers.push(addr);
422 }
423 }
424 const ret = await gather(brokers.map(addr => () => this.sendHearbeat(addr, heartbeatData, 3000)));
425 this.logger.info('[mq:client] send heartbeat: %j to : %j, and result: %j', heartbeatData, brokers, ret);
426 }
427
428 _prepareHeartbeatData() {
429 const heartbeatData = {
430 clientID: this.clientId,
431 consumerDataSet: [],
432 producerDataSet: [],
433 };
434
435 // Consumer
436 for (const groupName of this._consumerTable.keys()) {
437 const consumer = this._consumerTable.get(groupName);
438 if (consumer) {
439 const subscriptionDataSet = [];
440 for (const topic of consumer.subscriptions.keys()) {
441 const data = consumer.subscriptions.get(topic);
442 if (data && data.subscriptionData) {
443 subscriptionDataSet.push(data.subscriptionData);
444 }
445 }
446
447 heartbeatData.consumerDataSet.push({
448 groupName: consumer.consumerGroup,
449 consumeType: consumer.consumeType,
450 messageModel: consumer.messageModel,
451 consumeFromWhere: consumer.consumeFromWhere,
452 subscriptionDataSet,
453 unitMode: consumer.unitMode,
454 });
455 }
456 }
457
458 // Producer
459 for (const groupName of this._producerTable.keys()) {
460 const producer = this._producerTable.get(groupName);
461 if (producer) {
462 heartbeatData.producerDataSet.push({
463 groupName,
464 });
465 }
466 }
467 return heartbeatData;
468 }
469
470 _cleanOfflineBroker() {
471 for (const brokerName of this._brokerAddrTable.keys()) {
472 const oneTable = this._brokerAddrTable.get(brokerName);
473 let exists = false;
474
475 for (const brokerId in oneTable) {
476 const addr = oneTable[brokerId];
477 if (this._isBrokerAddrExistInTopicRouteTable(addr)) {
478 exists = true;
479 } else {
480 delete oneTable[brokerId];
481 this.logger.info('[mq:client] the broker addr[%s %s] is offline, remove it', brokerName, addr);
482 }
483 }
484
485 if (!exists) {
486 this._brokerAddrTable.delete(brokerName);
487 this.logger.info('[mq:client] the broker[%s] name\'s host is offline, remove it', brokerName);
488 }
489 }
490 }
491
492 _isBrokerAddrExistInTopicRouteTable(addr) {
493 for (const topic of this._topicRouteTable.keys()) {
494 const topicRouteData = this._topicRouteTable.get(topic);
495 for (const brokerData of topicRouteData.brokerDatas) {
496 for (const brokerId in brokerData.brokerAddrs) {
497 if (brokerData.brokerAddrs[brokerId] === addr) {
498 return true;
499 }
500 }
501 }
502 }
503 return false;
504 }
505
506 /**
507 * rebalance
508 * @return {void}
509 */
510 async doRebalance() {
511 for (const groupName of this._consumerTable.keys()) {
512 const consumer = this._consumerTable.get(groupName);
513 if (consumer) {
514 await consumer.doRebalance();
515 }
516 }
517 }
518
519 /**
520 * get all consumer list of topic
521 * @param {String} topic - topic
522 * @param {String} group - consumer group
523 * @return {Array} consumer list
524 */
525 async findConsumerIdList(topic, group) {
526 let brokerAddr = this.findBrokerAddrByTopic(topic);
527 if (!brokerAddr) {
528 await this.updateTopicRouteInfoFromNameServer(topic);
529 brokerAddr = this.findBrokerAddrByTopic(topic);
530
531 if (!brokerAddr) {
532 throw new Error(`The broker of topic[${topic}] not exist`);
533 } else {
534 return await this.getConsumerIdListByGroup(brokerAddr, group, 3000);
535 }
536 } else {
537 return await this.getConsumerIdListByGroup(brokerAddr, group, 3000);
538 }
539 }
540
541 // get broker address by topic
542 findBrokerAddrByTopic(topic) {
543 const topicRouteData = this._topicRouteTable.get(topic);
544 if (topicRouteData) {
545 const brokerDatas = topicRouteData.brokerDatas || [];
546 const broker = brokerDatas[0];
547 if (broker) {
548 // master first, not found try slave
549 const addr = broker.brokerAddrs[MixAll.MASTER_ID];
550 if (!addr) {
551 for (const id in broker.brokerAddrs) {
552 return broker.brokerAddrs[id];
553 }
554 }
555 return addr;
556 }
557 }
558 return null;
559 }
560
561 /**
562 * find broker address
563 * @param {String} brokerName - broker name
564 * @return {Object} broker info
565 */
566 findBrokerAddressInAdmin(brokerName) {
567 const map = this._brokerAddrTable.get(brokerName);
568 if (!map) {
569 return null;
570 }
571 if (map[MixAll.MASTER_ID]) {
572 return {
573 brokerAddr: map[MixAll.MASTER_ID],
574 slave: false,
575 };
576 }
577 for (const id in map) {
578 if (map[id]) {
579 return {
580 brokerAddr: map[id],
581 slave: true,
582 };
583 }
584 }
585 return null;
586 }
587
588 findBrokerAddressInSubscribe(brokerName, brokerId, onlyThisBroker) {
589 let brokerAddr = null;
590 let slave = false;
591 let found = false;
592 const map = this._brokerAddrTable.get(brokerName);
593
594 if (map) {
595 brokerAddr = map[brokerId];
596 slave = Number(brokerId) !== MixAll.MASTER_ID;
597 found = !is.nullOrUndefined(brokerAddr);
598
599 // 尝试寻找其他Broker
600 if (!found && !onlyThisBroker) {
601 for (const id in map) {
602 brokerAddr = map[id];
603 slave = Number(id) !== MixAll.MASTER_ID;
604 found = true;
605 break;
606 }
607 }
608 }
609 if (found) {
610 return {
611 brokerAddr,
612 slave,
613 };
614 }
615 return null;
616 }
617
618
619 /**
620 * get current offset of message queue
621 * @param {MessageQueue} messageQueue - message queue
622 * @return {Number} offset
623 */
624 async maxOffset(messageQueue) {
625 let brokerAddr = this.findBrokerAddressInPublish(messageQueue.brokerName);
626 if (!brokerAddr) {
627 await this.updateTopicRouteInfoFromNameServer(messageQueue.topic);
628 brokerAddr = this.findBrokerAddressInPublish(messageQueue.brokerName);
629
630 if (!brokerAddr) {
631 throw new Error(`The broker[${messageQueue.brokerName}] not exist`);
632 } else {
633 return await this.getMaxOffset(brokerAddr, messageQueue.topic, messageQueue.queueId, 3000);
634 }
635 } else {
636 return await this.getMaxOffset(brokerAddr, messageQueue.topic, messageQueue.queueId, 3000);
637 }
638 }
639
640 // should be master
641 findBrokerAddressInPublish(brokerName) {
642 const map = this._brokerAddrTable.get(brokerName);
643 return map && map[MixAll.MASTER_ID];
644 }
645
646 async persistAllConsumerOffset() {
647 for (const groupName of this._consumerTable.keys()) {
648 const consumer = this._consumerTable.get(groupName);
649 if (consumer) {
650 await consumer.persistConsumerOffset();
651 }
652 }
653 }
654
655 async searchOffset(messageQueue, timestamp) {
656 let brokerAddr = this.findBrokerAddressInPublish(messageQueue.brokerName);
657 if (!brokerAddr) {
658 await this.updateTopicRouteInfoFromNameServer(messageQueue.topic);
659 brokerAddr = this.findBrokerAddressInPublish(messageQueue.brokerName);
660
661 if (!brokerAddr) {
662 throw new Error('The broker[' + messageQueue.brokerName + '] not exist');
663 } else {
664 return await super.searchOffset(brokerAddr, messageQueue.topic, messageQueue.queueId, timestamp, 3000);
665 }
666 } else {
667 return await super.searchOffset(brokerAddr, messageQueue.topic, messageQueue.queueId, timestamp, 3000);
668 }
669 }
670
671 static getAndCreateMQClient(clientConfig) {
672 const clientId = clientConfig.clientId;
673 let instance = instanceTable.get(clientId);
674 if (!instance) {
675 instance = new MQClient(clientConfig);
676 instanceTable.set(clientId, instance);
677 instance.once('close', () => {
678 instanceTable.delete(clientId);
679 });
680 }
681 return instance;
682 }
683}
684
685module.exports = MQClient;