1 | 'use strict';
|
2 |
|
3 | const is = require('is-type-of');
|
4 | const crypto = require('crypto');
|
5 | const Base = require('tcp-base');
|
6 | const promisify = require('util').promisify;
|
7 | const RemotingCommand = require('./protocol/command/remoting_command');
|
8 |
|
9 | class Channel extends Base {
|
10 | |
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 | constructor(address, options = {}) {
|
20 |
|
21 | const arr = address.split(':');
|
22 |
|
23 | options.accessKey = options.accessKey || options.accessKeyId;
|
24 | options.secretKey = options.secretKey || options.accessKeySecret;
|
25 | super(Object.assign({
|
26 | host: arr[0],
|
27 | port: arr[1],
|
28 | headerLength: 4,
|
29 | needHeartbeat: false,
|
30 | }, options));
|
31 | this.sendPromise = promisify(this.send);
|
32 | }
|
33 |
|
34 | get accessKey() {
|
35 | return this.options.accessKey;
|
36 | }
|
37 |
|
38 | get secretKey() {
|
39 | return this.options.secretKey;
|
40 | }
|
41 |
|
42 | get onsChannel() {
|
43 | return 'ALIYUN';
|
44 | }
|
45 |
|
46 | |
47 |
|
48 |
|
49 |
|
50 |
|
51 | getBodyLength(header) {
|
52 | return header.readInt32BE(0);
|
53 | }
|
54 |
|
55 | decode(body, header) {
|
56 | const command = RemotingCommand.decode(Buffer.concat([ header, body ]));
|
57 | return {
|
58 | id: command.opaque,
|
59 | isResponse: command.isResponseType,
|
60 | data: command,
|
61 | };
|
62 | }
|
63 |
|
64 | beforeRequest(command) {
|
65 | if (!this.accessKey || !this.secretKey) {
|
66 | return;
|
67 | }
|
68 |
|
69 | const header = command.customHeader;
|
70 | const map = new Map();
|
71 | map.set('AccessKey', this.accessKey);
|
72 | map.set('OnsChannel', this.onsChannel);
|
73 | if (header) {
|
74 | for (const field in header) {
|
75 | if (!is.nullOrUndefined(header[field])) {
|
76 | map.set(field, header[field].toString());
|
77 | }
|
78 | }
|
79 | }
|
80 |
|
81 | let val = '';
|
82 | const fields = Array.from(map.keys()).sort();
|
83 | for (const key of fields) {
|
84 | if (key !== 'Signature') {
|
85 | val += map.get(key);
|
86 | }
|
87 | }
|
88 | let total = Buffer.from(val, 'utf8');
|
89 | const bodyLength = command.body ? command.body.length : 0;
|
90 | if (bodyLength) {
|
91 | total = Buffer.concat([ total, command.body ], total.length + bodyLength);
|
92 | }
|
93 | const hmac = crypto.createHmac('sha1', this.secretKey);
|
94 | const signature = hmac.update(total).digest('base64');
|
95 | command.extFields.Signature = signature;
|
96 | command.extFields.AccessKey = this.accessKey;
|
97 | command.extFields.OnsChannel = this.onsChannel;
|
98 | }
|
99 |
|
100 | |
101 |
|
102 |
|
103 |
|
104 |
|
105 |
|
106 | invoke(command, timeout) {
|
107 | this.beforeRequest(command);
|
108 | return this.sendPromise({
|
109 | id: command.opaque,
|
110 | data: command.encode(),
|
111 | timeout,
|
112 | }).catch(err => {
|
113 |
|
114 | if (err.name === 'ResponseTimeoutError') {
|
115 | this.close();
|
116 | }
|
117 | throw err;
|
118 | });
|
119 | }
|
120 |
|
121 | |
122 |
|
123 |
|
124 |
|
125 |
|
126 | invokeOneway(command) {
|
127 | this.beforeRequest(command);
|
128 | return this.sendPromise({
|
129 | id: command.opaque,
|
130 | data: command.encode(),
|
131 | oneway: true,
|
132 | });
|
133 | }
|
134 | }
|
135 |
|
136 | module.exports = Channel;
|