1 |
|
2 | const Base = require('sdk-base');
|
3 | const util = require('util');
|
4 | const ready = require('get-ready');
|
5 | const copy = require('copy-to');
|
6 | const currentIP = require('address').ip();
|
7 |
|
// Names of the scheduling strategies accepted through `options.schedule`.
// `RR` is the default: reads rotate over the clients that are marked available.
const RR = 'roundRobin';
// `MS`: reads go to the first client, in declaration order, that is marked available.
const MS = 'masterSlave';
|
10 |
|
11 | module.exports = function (OssClient) {
|
/**
 * Cluster client that wraps one `OssClient` per entry of `options.cluster`.
 *
 * May be called with or without `new`.
 *
 * @param {Object} options
 * @param {Object[]} options.cluster - per-node client options; `timeout`,
 *   `agent` and `urllib` are picked from the top-level options and copied onto
 *   each entry (the entry objects are modified in place)
 * @param {String} [options.schedule] - scheduling strategy, defaults to `RR`
 * @param {Boolean} [options.masterOnly] - coerced to a boolean
 * @param {Number} [options.heartbeatInterval] - milliseconds between
 *   availability checks, defaults to 10000
 * @param {Boolean} [options.ignoreStatusFile] - passed to the first
 *   availability check run by `_init`
 * @throws {Error} when `options.cluster` is not an array
 */
function Client(options) {
  const calledWithNew = this instanceof Client;
  if (!calledWithNew) {
    return new Client(options);
  }

  const hasCluster = Boolean(options) && Array.isArray(options.cluster);
  if (!hasCluster) {
    throw new Error('require options.cluster to be an array');
  }

  Base.call(this);

  this.clients = [];
  this.availables = {};

  // Every node starts out marked as available; the heartbeat corrects this later.
  for (const [position, nodeOptions] of options.cluster.entries()) {
    copy(options).pick('timeout', 'agent', 'urllib').to(nodeOptions);
    this.clients.push(new OssClient(nodeOptions));
    this.availables[position] = true;
  }

  this.schedule = options.schedule || RR;

  this.masterOnly = Boolean(options.masterOnly);
  this.index = 0;

  const heartbeatInterval = options.heartbeatInterval || 10000;
  this._checkAvailableLock = false;
  // Periodic checks always pass `true`, i.e. they never rewrite the status file.
  const heartbeat = this._checkAvailable.bind(this, true);
  this._timerId = this._deferInterval(heartbeat, heartbeatInterval);
  this._ignoreStatusFile = options.ignoreStatusFile || false;
  this._init();
}
|
44 |
|
// `proto` is the shared prototype every cluster method below is attached to.
const { prototype: proto } = Client;
// Client extends the sdk-base `Base` class.
util.inherits(Client, Base);
// Mix get-ready into the prototype (presumably the source of the `ready()`
// method used by `_init` - confirm against the get-ready documentation).
ready.mixin(proto);
|
48 |
|
// Read-style methods: served by a single client, with failover to the others.
const GET_METHODS = [
  'head',
  'get',
  'getStream',
  'list',
  'getACL'
];

// Write-style methods: executed against every client of the cluster.
const PUT_METHODS = [
  'put',
  'putStream',
  'delete',
  'deleteMulti',
  'copy',
  'putMeta',
  'putACL'
];

GET_METHODS.forEach((method) => {
  /**
   * Run `method` on the client picked by `chooseAvailable()`; if that call
   * fails, try each remaining client in declaration order.
   *
   * An error whose `status` lies in [200, 500) is rethrown immediately and no
   * other client is tried. Any other error (no status, or a status outside
   * that range) moves on to the next client.
   *
   * @param {...*} args - forwarded untouched to the underlying client method
   * @returns {Promise<*>} the result of the first client call that succeeds
   * @throws the first error with a status in [200, 500), or the last error
   *   seen with ' (all clients are down)' appended to its message
   */
  proto[method] = async function (...args) {
    const preferred = this.chooseAvailable();
    const candidates = [preferred, ...this.clients.filter(other => other !== preferred)];

    let lastError;
    for (const candidate of candidates) {
      try {
        // Invoke the method on the candidate itself so the request really goes
        // to that node rather than being re-bound to the client that failed.
        return await candidate[method](...args);
      } catch (err) {
        if (err.status && err.status >= 200 && err.status < 500) {
          throw err;
        }
        lastError = err;
      }
    }

    lastError.message += ' (all clients are down)';
    throw lastError;
  };
});
|
102 |
|
103 |
|
PUT_METHODS.forEach((method) => {
  /**
   * Run `method` on every client concurrently and wait for all of them.
   *
   * @param {...*} args - forwarded untouched to each client
   * @returns {Promise<*>} the result reported by the first client in
   *   `this.clients`; rejects as soon as any client call rejects
   */
  proto[method] = async function (...args) {
    const pending = this.clients.map(node => node[method](...args));
    const results = await Promise.all(pending);
    const [first] = results;
    return first;
  };
});
|
110 |
|
/**
 * Delegate `signatureUrl` to the client picked by `chooseAvailable()`.
 *
 * @param {...*} args - forwarded untouched (the object name comes first)
 * @returns {*} whatever the chosen client's `signatureUrl` returns
 */
proto.signatureUrl = function signatureUrl(/* name */...args) {
  return this.chooseAvailable().signatureUrl(...args);
};
|
115 |
|
/**
 * Delegate `getObjectUrl` to the client picked by `chooseAvailable()`.
 *
 * @param {...*} args - forwarded untouched (object name, then base URL)
 * @returns {*} whatever the chosen client's `getObjectUrl` returns
 */
proto.getObjectUrl = function getObjectUrl(/* name, baseUrl */...args) {
  return this.chooseAvailable().getObjectUrl(...args);
};
|
120 |
|
/**
 * Kick off the first availability check without blocking the caller.
 *
 * Once the check completes the instance is flagged ready via `ready(true)`;
 * if the check (or the ready call) fails, the error is emitted as an 'error'
 * event instead of being thrown.
 */
proto._init = function _init() {
  const bootstrap = async () => {
    await this._checkAvailable(this._ignoreStatusFile);
    this.ready(true);
  };

  bootstrap().catch((err) => {
    this.emit('error', err);
  });
};
|
130 |
|
/**
 * Probe every client and refresh `this.availables`.
 *
 * The probe target is a per-host status object whose name embeds the local IP
 * address. Unless `ignoreStatusFile` is truthy, that object is first written
 * through the cluster-wide `put`. Each client is then probed with
 * `_checkStatus` up to three times and is recorded as unavailable only when
 * all three probes fail.
 *
 * Only one check runs at a time: a call made while another check holds
 * `_checkAvailableLock` returns without probing.
 *
 * @param {Boolean} ignoreStatusFile - when truthy, skip writing the status object
 * @returns {Promise<void>}
 * @emits error - a `CheckAvailableError` listing the status-object URL of
 *   every client found down
 */
proto._checkAvailable = async function _checkAvailable(ignoreStatusFile) {
  const MAX_PROBES = 3;
  const name = `._ali-oss/check.status.${currentIP}.txt`;
  if (!ignoreStatusFile) {
    await this.put(name, Buffer.from(`check available started at ${Date()}`));
  }

  if (this._checkAvailableLock) {
    return;
  }
  this._checkAvailableLock = true;

  const downStatusFiles = [];
  try {
    for (let i = 0; i < this.clients.length; i++) {
      const client = this.clients[i];

      let available = false;
      for (let probe = 0; probe < MAX_PROBES && !available; probe++) {
        available = await this._checkStatus(client, name);
      }

      // Record the verdict before building the URL so that a failure while
      // formatting the report cannot leave a stale availability flag behind.
      this.availables[i] = available;
      if (!available) {
        downStatusFiles.push(client._objectUrl(name));
      }
    }
  } finally {
    // Always release the lock; otherwise one unexpected throw would make every
    // later heartbeat return early and the availability map would never refresh.
    this._checkAvailableLock = false;
  }

  if (downStatusFiles.length > 0) {
    const err = new Error(`${downStatusFiles.length} data node down, please check status file: ${downStatusFiles.join(', ')}`);
    err.name = 'CheckAvailableError';
    this.emit('error', err);
  }
};
|
169 |
|
/**
 * Probe a single client by issuing `head` for the status object.
 *
 * @param {Object} client - client to probe
 * @param {String} name - name of the status object
 * @returns {Promise<Boolean>} `true` when `head` succeeds or fails with a
 *   status in [200, 500); `false` when the error carries no status or a
 *   status outside that range
 */
proto._checkStatus = async function _checkStatus(client, name) {
  try {
    await client.head(name);
  } catch (err) {
    const unreachable = !err.status || err.status >= 500 || err.status < 200;
    return !unreachable;
  }
  return true;
};
|
182 |
|
/**
 * Pick the client that should serve the next read.
 *
 * - master/slave schedule: the first client when `masterOnly` is set,
 *   otherwise the first client (in declaration order) marked available.
 * - any other schedule: round-robin over the clients, skipping the ones
 *   marked unavailable; at most one full lap is attempted.
 *
 * When no client is marked available the first client is returned anyway.
 *
 * @returns {Object} the chosen client (`undefined` if there are no clients)
 */
proto.chooseAvailable = function chooseAvailable() {
  const { clients, availables } = this;

  if (this.schedule === MS) {
    if (!this.masterOnly) {
      const firstUp = clients.findIndex((_, position) => availables[position]);
      if (firstUp !== -1) {
        return clients[firstUp];
      }
    }
    return clients[0];
  }

  for (let remaining = clients.length; remaining > 0; remaining--) {
    const position = this._nextRRIndex();
    if (availables[position]) {
      return clients[position];
    }
  }

  return clients[0];
};
|
210 |
|
/**
 * Return the current round-robin position and advance the cursor,
 * wrapping back to 0 once it reaches the number of clients.
 *
 * @returns {Number} the position to use for this pick
 */
proto._nextRRIndex = function _nextRRIndex() {
  const current = this.index;
  const following = current + 1;
  this.index = following >= this.clients.length ? 0 : following;
  return current;
};
|
218 |
|
/**
 * Default completion callback: rethrow the error when there is one.
 *
 * @param {*} [err] - error to rethrow; falsy values are ignored
 * @throws {*} `err` itself when it is truthy
 */
proto._error = function error(err) {
  if (!err) {
    return;
  }
  throw err;
};
|
222 |
|
/**
 * Build a zero-argument function that runs `gen` with `ctx` as `this` and
 * reports the outcome through a callback.
 *
 * @param {Object} ctx - `this` value used when calling `gen`
 * @param {Function} gen - function returning a promise
 * @param {Function} [cb] - called with no arguments on success and with the
 *   rejection reason on failure; falls back to `this._error`, looked up the
 *   first time the returned function runs
 * @returns {Function} the wrapper to hand to a timer
 */
proto._createCallback = function _createCallback(ctx, gen, cb) {
  let handler = cb;
  return () => {
    handler = handler || this._error;
    const pending = gen.call(ctx);
    pending.then(() => {
      handler();
    }, handler);
  };
};
|
/**
 * Schedule `gen` to run every `timeout` milliseconds.
 *
 * @param {Function} gen - promise-returning function, run with this instance as `this`
 * @param {Number} timeout - interval in milliseconds
 * @param {Function} [cb] - completion callback handed to `_createCallback`
 * @returns {*} the handle returned by `setInterval`
 */
proto._deferInterval = function _deferInterval(gen, timeout, cb) {
  const tick = this._createCallback(this, gen, cb);
  return setInterval(tick, timeout);
};
|
234 |
|
/**
 * Stop the periodic availability check and forget its timer handle.
 */
proto.close = function close() {
  const timer = this._timerId;
  this._timerId = null;
  clearInterval(timer);
};
|
239 |
|
240 | return Client;
|
241 | };
|