UNPKG

9.01 kBJavaScriptView Raw
1/**
2 * clouds client
3 *
4 * @author 老雷<leizongmin@gmail.com>
5 */
6
7var define = require('./define');
8var utils = require('./utils');
9
10
11/**
12 * Clouds Client
13 *
14 * @param {Object} options
15 * - {Object} redis {host, port, db, prefix}
16 * - {Number} timeout (s)
17 * - {Boolean} notAutoCleanRemoteServer
18 * - {Number} serverMaxAge
19 */
20function CloudsClient (options) {
21 options = options || {};
22 if (!(options.timeout > 0)) options.timeout = define.timeout;
23 if (!(options.serverMaxAge > 0)) options.serverMaxAge = define.serverMaxAge;
24
25 // 初始化CloudsBase
26 options.type = 'client';
27 CloudsClient.super_.call(this, options);
28
29 this._timeout = options.timeout;
30 this._serverMaxAge = this._ns('serverMaxAge') || define.serverMaxAge;
31
32 this._messages = {};
33 this._servers = {};
34 this._serversTimestamp = {};
35 this._findOneServerLastIndex = 0;
36
37 this._autoClearServerList();
38
39 // 处理调用结果
40 this._setHandler('message.r', this._handleResult);
41 // 清理本地服务器列表
42 this._setHandler('clear_server_list', this._handleClearServerList);
43 // 删除本地服务器缓存
44 this._setHandler('remove_server', this._handleRemoveServer);
45 // 保存服务器列表到缓存
46 this._setHandler('save_server_list', this._handleSaveServerList);
47 // 查找一个可用的服务器
48 this._setHandler('find_one_server', this._handleFindOneServer);
49}
50
51utils.inheritsBase(CloudsClient);
52
53// 自动清理本地的服务器列表
54CloudsClient.prototype._autoClearServerList = function () {
55 var me = this;
56 me._clearServerListTid = setInterval(function () {
57 me._debug('start clear server list');
58 me._getHandler('clear_server_list').call(me);
59 }, me._serverMaxAge * 500);
60};
61
62// 清理本地的服务器列表
63CloudsClient.prototype._handleClearServerList = function () {
64 var me = this;
65 var t = Date.now() - me._serverMaxAge * 1000;
66 Object.keys(me._servers).forEach(function (method) {
67 if (me._serversTimestamp[method] <= t) {
68 me._debug('clear server list: %s [%s]', method, me._servers[method].length);
69 delete me._servers[method];
70 delete me._serversTimestamp[method];
71 }
72 });
73};
74
75/**
76 * 调用服务
77 *
78 * @param {String} method
79 * @param {Array} args
80 * @param {Function} callback
81 */
82CloudsClient.prototype.call = function (method, args, callback) {
83 var me = this;
84 var msg = me._protocol.packCall(method, args);
85 var messageId = msg.params.d.i;
86 this._debug('call: %s => %s, args=%s', method, messageId, args);
87
88 // 寻找一个可用的服务器
89 var serverId;
90 me._findOneServer(method, function (err, ret) {
91 if (err) return cb(err);
92
93 serverId = ret;
94 me._setMessageHandler(messageId, cb);
95 me._sendCallRequest(serverId, msg.raw);
96 });
97
98 var hasCallback = false;
99
100 // 保证只回调一次
101 function cb () {
102 me._debug('call [%s]: on callback => %s', method, callback);
103
104 if (hasCallback) {
105 me._debug('call [%s]: has callback');
106 return;
107 }
108
109 hasCallback = true;
110 clearTimeout(tid);
111 callback.apply(null, arguments);
112 }
113
114 // 检查是否调用超时
115 function checkTimeout () {
116 me._debug('call: callback on timeout, method=%s, args=%s', method, args);
117 var err = new Error('timeout');
118 err.code = 'CLOUDS_CALL_SERVICE_TIMEOUT';
119
120 // 清除消息处理程序
121 me._removeMessageHandler(messageId);
122
123 // 清除当前服务器
124 me._removeServer(serverId, method);
125
126 return cb(err);
127 }
128
129 var tid = setTimeout(checkTimeout, me._timeout * 1000);
130
131 return this;
132};
133
134/**
135 * 返回一个调用指定服务的函数
136 *
137 * @param {String} method
138 * @param {Number} retry
139 * @param {Number} onRetry 格式:function (arg1, arg2, ... callback) { callback(arg1, arg2, ...); }
140 * @return {Function}
141 */
142CloudsClient.prototype.bind = function (method, retry, onRetry) {
143 var me = this;
144 retry = Number(retry);
145 if (!(retry > 0)) retry = 0;
146 me._debug('bind: method=%s, retry=%s, onRetry=%s', method, retry, onRetry);
147
148 var timeout = me._timeout * 1000;
149 if (!(timeout > 0)) timeout = 0;
150
151 if (typeof onRetry !== 'function') {
152 onRetry = define.onRetry;
153 }
154
155 return function () {
156
157 var args = Array.prototype.slice.call(arguments);
158 var callback = args.pop();
159 if (typeof callback !== 'function') {
160 throw new Error('must provide a callback function');
161 }
162
163 var retryCounter = 0;
164
165 function onCallback (err) {
166 if (err && retryCounter < retry) {
167 if (err.code === 'CLOUDS_CALL_SERVICE_TIMEOUT') {
168 return tryAgain();
169 }
170 if (err.code === 'CLOUDS_NO_AVAILABLE_SERVER') {
171 return setTimeout(tryAgain, timeout);
172 }
173 }
174 callback.apply(null, arguments);
175
176 function tryAgain () {
177 retryCounter++;
178 me._debug('[bind]call [%s]: retry, counter=%s', method, retryCounter);
179 start();
180 }
181 }
182
183 function start () {
184 me._debug('[bind]call [%s]: start, timeout=%s', timeout);
185 onRetry.apply(null, args.concat(call));
186 }
187
188 function call () {
189 var args = Array.prototype.slice.call(arguments);
190 me.call(method, args, onCallback);
191 }
192
193 start();
194 };
195};
196
197// 寻找一个指定服务可用的服务器ID
198CloudsClient.prototype._findOneServer = function (method, callback) {
199 this._debug('find one server: %s', method);
200 this._getHandler('find_one_server').call(this, method, callback);
201};
202
203CloudsClient.prototype._handleFindOneServer = function (method, callback) {
204 var me = this;
205
206 if (!Array.isArray(me._servers[method])) me._servers[method] = [];
207 if (me._servers[method].length < 1) {
208
209 var key = me._key('S', method, '*');
210 me._connection.keys(key, function (err, list) {
211 if (err) return callback(err);
212 if (!Array.isArray(list)) list = [];
213
214 list = list.map(function (item) {
215 // 取得key中的id
216 return item.split(':').pop();
217 });
218
219 me._saveServerList(method, list);
220 returnOneServer();
221 });
222
223 } else {
224 returnOneServer();
225 }
226
227 function returnOneServer () {
228
229 var len = me._servers[method].length;
230 if (len < 1) {
231 var err = new Error('no available server');
232 err.code = 'CLOUDS_NO_AVAILABLE_SERVER';
233 return callback(err);
234 }
235
236 // 依次取其中一个服务器的ID
237 if (me._findOneServerLastIndex >= me._servers[method].length) {
238 me._findOneServerLastIndex = 0;
239 }
240 var i = me._findOneServerLastIndex;
241 var id= me._servers[method][i];
242 me._findOneServerLastIndex++;
243
244 me._debug('find one server: serverId=%s, method=%s', id, method);
245 return callback(null, id);
246 }
247};
248
249// 发送服务调用请求
250CloudsClient.prototype._sendCallRequest = function (serverId, msg, callback) {
251 var key = this._key(serverId);
252 this._debug('send call request: server=%s, key=%s', serverId, key);
253
254 this._connection.send(key, msg, this._callback(callback));
255};
256
257// 保存可用服务器列表
258CloudsClient.prototype._saveServerList = function (method, list) {
259 this._debug('save server list: [%s] <= %s', method, list);
260 this._getHandler('save_server_list').call(this, method, list);
261};
262
263CloudsClient.prototype._handleSaveServerList = function (method, list) {
264 this._servers[method] = list || [];
265 this._serversTimestamp[method] = Date.now();
266};
267
268// 将指定服务的服务器从可用列表中删除
269CloudsClient.prototype._removeServer = function (serverId, method) {
270 this._debug('remove server from local list: [%s] %s', serverId, method);
271 this._getHandler('remove_server').call(this, serverId, method);
272};
273
274CloudsClient.prototype._handleRemoveServer = function (serverId, method) {
275 if (this._servers[method] && this._servers[method].length > 0) {
276 this._servers[method] = this._servers[method].filter(function (item) {
277 return (item !== serverId);
278 });
279 }
280
281 if (!this._ns('notAutoCleanRemoteServer')) {
282 this._debug('remove server from remote list');
283 this._connection.deleteKey(this._key('S', method, serverId), this._callback());
284 }
285};
286
287// 设置指定消息ID的处理程序
288CloudsClient.prototype._setMessageHandler = function (id, handler) {
289 this._debug('set message handler: %s => %s', id, handler);
290 this._messages[id] = handler;
291};
292
293// 取指定消息ID的处理程序
294CloudsClient.prototype._getMessageHandler = function (id) {
295 var fn = this._messages[id];
296 this._debug('get message handler: %s <= %s', id, fn);
297 return fn;
298};
299
300// 删除指定消息ID的处理程序
301CloudsClient.prototype._removeMessageHandler = function (id) {
302 this._debug('remove message handler: id=%s', id);
303 delete this._messages[id];
304};
305
306// 处理调用结果
307CloudsClient.prototype._handleResult = function (msg) {
308 var fn = this._getMessageHandler(msg.data.i);
309 this._debug('handle call result: #%s %s => %s', msg.data.i, msg.data.r, fn);
310 if (typeof fn !== 'function') {
311 return this._debug('unknown message id: %s', msg.data.i);
312 }
313
314 this._removeMessageHandler(msg.data.i);
315
316 fn.apply(null, [msg.data.e || null].concat(msg.data.r));
317};
318
319
320module.exports = CloudsClient;