UNPKG

3.48 kBJavaScriptView Raw
1'use strict';
2
3const is = require('is-type-of');
4const crypto = require('crypto');
5const Base = require('tcp-base');
6const promisify = require('util').promisify;
7const RemotingCommand = require('./protocol/command/remoting_command');
8
9class Channel extends Base {
10 /**
11 * rocketmq tcp channel object
12 * @param {String} address - server address
13 * @param {Object} options
14 * - {String} accessKey
15 * - {String} secretKey
16 * - {String} onsChannel
17 * @class
18 */
19 constructor(address, options = {}) {
20 // 10.18.214.201:8080
21 const arr = address.split(':');
22 // support alias: accessKeyId and accessKeySecret
23 options.accessKey = options.accessKey || options.accessKeyId;
24 options.secretKey = options.secretKey || options.accessKeySecret;
25 super(Object.assign({
26 host: arr[0],
27 port: arr[1],
28 headerLength: 4,
29 needHeartbeat: false,
30 }, options));
31 this.sendPromise = promisify(this.send);
32 }
33
34 get accessKey() {
35 return this.options.accessKey;
36 }
37
38 get secretKey() {
39 return this.options.secretKey;
40 }
41
42 get onsChannel() {
43 return 'ALIYUN';
44 }
45
46 /**
47 * Get packet length from header
48 * @param {Buffer} header - packet header
49 * @return {Number} bodyLength
50 */
51 getBodyLength(header) {
52 return header.readInt32BE(0);
53 }
54
55 decode(body, header) {
56 const command = RemotingCommand.decode(Buffer.concat([ header, body ]));
57 return {
58 id: command.opaque,
59 isResponse: command.isResponseType,
60 data: command,
61 };
62 }
63
64 beforeRequest(command) {
65 if (!this.accessKey || !this.secretKey) {
66 return;
67 }
68
69 const header = command.customHeader;
70 const map = new Map();
71 map.set('AccessKey', this.accessKey);
72 map.set('OnsChannel', this.onsChannel);
73 if (header) {
74 for (const field in header) {
75 if (!is.nullOrUndefined(header[field])) {
76 map.set(field, header[field].toString());
77 }
78 }
79 }
80
81 let val = '';
82 const fields = Array.from(map.keys()).sort();
83 for (const key of fields) {
84 if (key !== 'Signature') {
85 val += map.get(key);
86 }
87 }
88 let total = Buffer.from(val, 'utf8');
89 const bodyLength = command.body ? command.body.length : 0;
90 if (bodyLength) {
91 total = Buffer.concat([ total, command.body ], total.length + bodyLength);
92 }
93 const hmac = crypto.createHmac('sha1', this.secretKey);
94 const signature = hmac.update(total).digest('base64');
95 command.extFields.Signature = signature;
96 command.extFields.AccessKey = this.accessKey;
97 command.extFields.OnsChannel = this.onsChannel;
98 }
99
100 /**
101 * invoke rocketmq api
102 * @param {RemotingCommand} command - remoting command
103 * @param {Number} timeout - response timeout
104 * @return {Object} response
105 */
106 invoke(command, timeout) {
107 this.beforeRequest(command);
108 return this.sendPromise({
109 id: command.opaque,
110 data: command.encode(),
111 timeout,
112 }).catch(err => {
113 // TODO: not sure whether is work ?
114 if (err.name === 'ResponseTimeoutError') {
115 this.close();
116 }
117 throw err;
118 });
119 }
120
121 /**
122 * invoke rocketmq api without need response
123 * @param {RemotingCommand} command - remoting command
124 * @return {Promise} Promise
125 */
126 invokeOneway(command) {
127 this.beforeRequest(command);
128 return this.sendPromise({
129 id: command.opaque,
130 data: command.encode(),
131 oneway: true,
132 });
133 }
134}
135
136module.exports = Channel;