// UNPKG file view — 2.85 kB, JavaScript (raw view)
'use strict';

// Third-party base class (provides `this.options` as used by ClientConfig below).
const Base = require('sdk-base');
// Used to look up the local IP for the default `clientIP` option.
const address = require('address');
// Project-local constants; RETRY_GROUP_TOPIC_PREFIX is read by formatTopic().
const MixAll = require('./mix_all');

// Default client options; caller-supplied options are layered on top of these
// in the ClientConfig constructor.
// NOTE(review): the *Interval values are presumably milliseconds — confirm
// against the timers that consume them.
const defaultOptions = {
  instanceName: 'DEFAULT',
  // The misspelling ("Inteval") is part of the public option name; keep it.
  pollNameServerInteval: 30 * 1000,
  heartbeatBrokerInterval: 30 * 1000,
  persistConsumerOffsetInterval: 5 * 1000,
  rebalanceInterval: 10 * 1000,
  // Resolved once, when this module is first loaded.
  clientIP: address.ip(),
  unitMode: false,
  unitName: null,
  // Self-created Alibaba Cloud instances require a namespace ("ns") prefix.
  namespace: '',
  // Public cloud, production: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
  // Public cloud, public beta: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
  // Hangzhou finance cloud: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
  // Hangzhou/Shenzhen cloud: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal
  onsAddr: 'http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet',
  // https://help.aliyun.com/document_detail/102895.html — Alibaba Cloud product
  // update: instance-based deployments are supported.
  // nameSrv: 'onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80',
  onsChannel: 'ALIYUN', // CLOUD, ALIYUN, ALL
};

/**
 * Common configuration shared by Producer and Consumer clients.
 *
 * The merged options are passed to the `Base` constructor; every getter below
 * reads its value back from `this.options`.
 */
class ClientConfig extends Base {

  /**
   * @param {Object} [options] - values that override the module defaults
   *   - {String} instanceName - instance name
   *   - {Number} pollNameServerInteval - name server synchronisation interval
   *   - {Number} heartbeatBrokerInterval - heartbeat interval
   *   - {Number} persistConsumerOffsetInterval - interval for persisting consume progress
   *   - {Boolean} unitMode - whether the subscription group is unitized
   */
  constructor(options) {
    // Merge into a fresh object so neither the defaults nor the caller's
    // object is mutated.
    super(Object.assign({}, defaultOptions, options));
    // Kept as an own property because changeInstanceNameToPID() may rewrite it.
    this.instanceName = this.options.instanceName;
  }

  /**
   * Client identifier: `<clientIP>@<instanceName>`, with `@<unitName>` appended
   * when a unit name is configured.
   * @return {String} client id
   */
  get clientId() {
    return this.unitName ?
      `${this.options.clientIP}@${this.instanceName}@${this.unitName}` :
      `${this.options.clientIP}@${this.instanceName}`;
  }

  /** @return {Number} configured `pollNameServerInteval` option */
  get pollNameServerInteval() {
    return this.options.pollNameServerInteval;
  }

  /** @return {Number} configured `heartbeatBrokerInterval` option */
  get heartbeatBrokerInterval() {
    return this.options.heartbeatBrokerInterval;
  }

  /** @return {Number} configured `persistConsumerOffsetInterval` option */
  get persistConsumerOffsetInterval() {
    return this.options.persistConsumerOffsetInterval;
  }

  /** @return {Number} configured `rebalanceInterval` option */
  get rebalanceInterval() {
    return this.options.rebalanceInterval;
  }

  /** @return {Boolean} configured `unitMode` option */
  get unitMode() {
    return this.options.unitMode;
  }

  /** @return {String|null} configured `unitName` option */
  get unitName() {
    return this.options.unitName;
  }

  /** @return {String} configured `namespace` option ('' when unset) */
  get namespace() {
    return this.options.namespace;
  }

  /**
   * Qualify a topic name with the configured namespace (`<namespace>%<topic>`).
   *
   * The topic is returned unchanged when no namespace is configured, when it
   * already carries the `<namespace>%` prefix, or when it starts with
   * `MixAll.RETRY_GROUP_TOPIC_PREFIX`.
   *
   * @param {String} topic - topic name
   * @return {String} the topic, namespace-qualified where applicable
   */
  formatTopic(topic) {
    const namespace = this.namespace;
    if (!namespace) {
      return topic;
    }
    // Match on the full "<namespace>%" prefix: a topic that merely begins with
    // the same characters as the namespace (e.g. "NS_orders" under namespace
    // "NS") is not yet qualified and must still be prefixed.
    const prefix = `${namespace}%`;
    if (topic.startsWith(prefix) || topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
      return topic;
    }
    return `${prefix}${topic}`;
  }

  /**
   * Replace the instance name with the current process id, but only while the
   * instance name still has the default value 'DEFAULT'.
   */
  changeInstanceNameToPID() {
    if (this.instanceName === 'DEFAULT') {
      this.instanceName = String(process.pid);
    }
  }
}

// The class itself is the module's sole export.
module.exports = ClientConfig;