// Source listing retrieved via UNPKG (6.48 kB, JavaScript).
2const Base = require('sdk-base');
3const util = require('util');
4const ready = require('get-ready');
5const copy = require('copy-to');
6const currentIP = require('address').ip();
7
8const RR = 'roundRobin';
9const MS = 'masterSlave';
10
11module.exports = function (OssClient) {
/**
 * Cluster client that fans a single logical client out over several
 * OssClient instances (one per entry of `options.cluster`).
 *
 * May be called with or without `new`.
 *
 * @param {Object} options
 * @param {Object[]} options.cluster - per-node option objects handed to `OssClient`.
 * @param {*} [options.timeout] - shared default, applied to nodes that do not set it.
 * @param {*} [options.agent] - shared default, applied to nodes that do not set it.
 * @param {*} [options.urllib] - shared default, applied to nodes that do not set it.
 * @param {string} [options.schedule='roundRobin'] - 'roundRobin' or 'masterSlave'.
 * @param {boolean} [options.masterOnly=false] - only read from master (first node).
 * @param {number} [options.heartbeatInterval=10000] - delay passed to setInterval
 *   between availability checks.
 * @param {boolean} [options.ignoreStatusFile=false] - when truthy, the initial
 *   availability check does not write the status file first.
 * @throws {Error} when `options.cluster` is not an array.
 */
function Client(options) {
  if (!(this instanceof Client)) {
    return new Client(options);
  }

  if (!options || !Array.isArray(options.cluster)) {
    throw new Error('require options.cluster to be an array');
  }

  Base.call(this);

  this.clients = [];
  this.availables = {};

  for (let i = 0; i < options.cluster.length; i++) {
    // Work on a shallow copy so the shared defaults are never written into
    // the caller-owned config objects (which may be reused for another client).
    const opt = Object.assign({}, options.cluster[i]);
    copy(options).pick('timeout', 'agent', 'urllib').to(opt);
    this.clients.push(new OssClient(opt));
    // every node is assumed to be up until a check says otherwise
    this.availables[i] = true;
  }

  this.schedule = options.schedule || RR;
  // only read from master, default is false
  this.masterOnly = !!options.masterOnly;
  // cursor used by round-robin scheduling
  this.index = 0;

  const heartbeatInterval = options.heartbeatInterval || 10000;
  this._checkAvailableLock = false;
  // periodic heartbeat; `true` means "do not rewrite the status file"
  this._timerId = this._deferInterval(this._checkAvailable.bind(this, true), heartbeatInterval);
  this._ignoreStatusFile = options.ignoreStatusFile || false;
  this._init();
}
44
// Client inherits from the sdk-base `Base` class; `proto` is the shared
// prototype that every method below is attached to.
util.inherits(Client, Base);
const proto = Client.prototype;
// get-ready mixin (presumably what provides the `ready()` used by `_init`).
ready.mixin(proto);

// Read operations: served by one node, with failover to the others.
const GET_METHODS = ['head', 'get', 'getStream', 'list', 'getACL'];

// Write operations: sent to every node of the cluster.
const PUT_METHODS = ['put', 'putStream', 'delete', 'deleteMulti', 'copy', 'putMeta', 'putACL'];
66
// Install the read methods. Each one tries the node picked by
// `chooseAvailable()` first and then fails over to every other node in
// `this.clients` order.
GET_METHODS.forEach((method) => {
  /**
   * @param {...*} args - forwarded unchanged to the underlying client method.
   * @returns {Promise<*>} result of the first node that answers.
   * @throws the node's error as-is for statuses 200..499, otherwise the last
   *   error seen, with ' (all clients are down)' appended to its message.
   */
  proto[method] = async function (...args) {
    const preferred = this.chooseAvailable();
    const candidates = [preferred].concat(this.clients.filter(c => c !== preferred));
    let lastError;

    /* eslint no-await-in-loop: [0] */
    for (const candidate of candidates) {
      try {
        // Call the method ON the candidate; binding `this` to the node that
        // already failed would just send the retry to the same broken node.
        return await candidate[method](...args);
      } catch (err) {
        if (err.status && err.status >= 200 && err.status < 500) {
          // 200 ~ 499 belong to normal response, don't try again
          throw err;
        }
        // < 200 || >= 500 need to retry from other cluster node
        lastError = err;
      }
    }

    lastError.message += ' (all clients are down)';
    throw lastError;
  };
});
102
// Install the write methods: the call is issued on every cluster node in
// parallel and only succeeds when all of them succeed (Promise.all rejects
// on the first failure). The result of the first node is returned.
for (const method of PUT_METHODS) {
  proto[method] = async function (...args) {
    const pending = this.clients.map(node => node[method](...args));
    const results = await Promise.all(pending);
    return results[0];
  };
}
110
/**
 * Delegate `signatureUrl` to the node selected by `chooseAvailable()`.
 * All arguments (name, ...) are forwarded unchanged.
 */
proto.signatureUrl = function signatureUrl(/* name */...args) {
  return this.chooseAvailable().signatureUrl(...args);
};
115
/**
 * Delegate `getObjectUrl` to the node selected by `chooseAvailable()`.
 * All arguments (name, baseUrl) are forwarded unchanged.
 */
proto.getObjectUrl = function getObjectUrl(/* name, baseUrl */...args) {
  return this.chooseAvailable().getObjectUrl(...args);
};
120
/**
 * Kick off the first availability check in the background. Marks the client
 * ready once it completes; any failure is re-emitted as an 'error' event.
 */
proto._init = function _init() {
  const bootstrap = async () => {
    await this._checkAvailable(this._ignoreStatusFile);
    this.ready(true);
  };

  bootstrap().catch((err) => {
    this.emit('error', err);
  });
};
130
/**
 * Probe every cluster node and record the outcome in `this.availables`.
 *
 * Each node is probed up to three times via `_checkStatus`; a node is marked
 * unavailable only when all three probes fail. If any node is down, an
 * 'error' event named 'CheckAvailableError' is emitted with the status-file
 * URLs of the failing nodes.
 *
 * @param {boolean} ignoreStatusFile - when falsy, the status file is first
 *   written to the cluster via `this.put` (a failing write rejects).
 * @returns {Promise<void>} resolves without probing when a check is already
 *   in progress.
 */
proto._checkAvailable = async function _checkAvailable(ignoreStatusFile) {
  const name = `._ali-oss/check.status.${currentIP}.txt`;
  if (!ignoreStatusFile) {
    // only start will try to write the file
    await this.put(name, Buffer.from(`check available started at ${Date()}`));
  }

  if (this._checkAvailableLock) {
    return;
  }
  this._checkAvailableLock = true;

  const maxAttempts = 3;
  const downStatusFiles = [];
  try {
    for (let i = 0; i < this.clients.length; i++) {
      const client = this.clients[i];
      let available = false;
      /* eslint no-await-in-loop: [0] */
      for (let attempt = 0; attempt < maxAttempts && !available; attempt++) {
        available = await this._checkStatus(client, name);
      }
      if (!available) {
        downStatusFiles.push(client._objectUrl(name));
      }
      this.availables[i] = available;
    }
  } finally {
    // Always release the lock: if it stayed set after an exception, every
    // later heartbeat would return early and availability would go stale.
    this._checkAvailableLock = false;
  }

  if (downStatusFiles.length > 0) {
    const err = new Error(`${downStatusFiles.length} data node down, please check status file: ${downStatusFiles.join(', ')}`);
    err.name = 'CheckAvailableError';
    this.emit('error', err);
  }
};
169
/**
 * Probe one node by issuing `head` on the status file.
 *
 * @param {Object} client - the node to probe.
 * @param {string} name - object name of the status file.
 * @returns {Promise<boolean>} false when the request fails without a status,
 *   or with a status >= 500 or < 200; true otherwise (so a 404 still counts
 *   as available).
 */
proto._checkStatus = async function _checkStatus(client, name) {
  try {
    await client.head(name);
    return true;
  } catch (err) {
    const code = err.status;
    const unreachable = !code || code >= 500 || code < 200;
    return !unreachable;
  }
};
182
/**
 * Pick the node that should serve the next read.
 *
 * - master/slave schedule: the first node when `masterOnly` is set, otherwise
 *   the first node currently flagged available.
 * - any other schedule (round robin): advance the round-robin cursor, at most
 *   once per node, until an available node is found.
 *
 * When no node is flagged available, the first node is returned anyway.
 *
 * @returns {Object} the selected client.
 */
proto.chooseAvailable = function chooseAvailable() {
  const nodes = this.clients;

  if (this.schedule === MS) {
    // only read from master
    if (this.masterOnly) {
      return nodes[0];
    }
    const firstUp = nodes.findIndex((_, position) => this.availables[position]);
    // all down: fall back to the first node
    return firstUp === -1 ? nodes[0] : nodes[firstUp];
  }

  // round robin
  for (let remaining = nodes.length; remaining > 0; remaining--) {
    const candidate = this._nextRRIndex();
    if (this.availables[candidate]) {
      return nodes[candidate];
    }
  }
  // all down: fall back to the first node
  return nodes[0];
};
210
/**
 * Return the current round-robin cursor and advance it, wrapping back to 0
 * once it reaches the number of clients.
 *
 * @returns {number} the index to use for this turn.
 */
proto._nextRRIndex = function _nextRRIndex() {
  const current = this.index;
  const following = current + 1;
  this.index = following >= this.clients.length ? 0 : following;
  return current;
};
218
/**
 * Default completion callback: rethrows the error it is given, does nothing
 * when called without one.
 */
proto._error = function error(err) {
  if (!err) {
    return;
  }
  throw err;
};
222
/**
 * Wrap a promise-returning function into a zero-argument callback.
 *
 * Each invocation calls `gen` with `ctx` as `this`; on fulfilment the
 * completion callback is called with no arguments, on rejection with the
 * error. The completion callback defaults to `this._error`.
 *
 * NOTE: nothing catches an exception thrown by the completion callback, so
 * with the default (`_error`, which rethrows) a rejection of `gen` ends up
 * as a rejected promise that nobody handles.
 *
 * @param {Object} ctx - `this` value for `gen`.
 * @param {Function} gen - function returning a promise.
 * @param {Function} [cb] - completion callback `(err?) => void`.
 * @returns {Function} the wrapped callback.
 */
proto._createCallback = function _createCallback(ctx, gen, cb) {
  let done = cb;
  return () => {
    done = done || this._error;
    const onFulfilled = () => {
      done();
    };
    gen.call(ctx).then(onFulfilled, done);
  };
};
/**
 * Run a promise-returning function repeatedly via `setInterval`.
 *
 * @param {Function} gen - function returning a promise, called with this client as `this`.
 * @param {number} timeout - delay handed to `setInterval`.
 * @param {Function} [cb] - completion callback, see `_createCallback`.
 * @returns {*} the timer handle returned by `setInterval`.
 */
proto._deferInterval = function _deferInterval(gen, timeout, cb) {
  const tick = this._createCallback(this, gen, cb);
  return setInterval(tick, timeout);
};
234
/**
 * Stop the periodic availability check and forget its timer handle.
 */
proto.close = function close() {
  const timer = this._timerId;
  this._timerId = null;
  clearInterval(timer);
};
239
240 return Client;
241};