UNPKG

3.7 kBJavaScriptView Raw
1/**
2 * clouds server
3 *
4 * @author 老雷<leizongmin@gmail.com>
5 */
6
7var define = require('./define');
8var utils = require('./utils');
9
10
11/**
12 * Clouds Server
13 *
14 * @param {Object} options
15 * - {Object} redis {host, port, db, prefix}
16 * - {Number} heartbeat (s)
17 */
18function CloudsServer (options) {
19 options = options || {};
20 if (!(options.heartbeat > 0)) options.heartbeat = define.heartbeat;
21
22 // 初始化CloudsBase
23 options.type = 'server';
24 CloudsServer.super_.call(this, options);
25
26 var me = this;
27
28 this._heartbeat = options.heartbeat;
29
30 this._services = {};
31 this._messages = {};
32
33 this._heartbeatTid = setInterval(function () {
34 me._keepHeartbeat();
35 }, this._ns('heartbeat') * 1000);
36
37 // 退出时的清理工作
38 this.once('exit', function (me) {
39 // 删除所有相关key
40 var key = me._key('*' + me.id + '*');
41 me._debug('exit: query all related redis keys=%s', key);
42 me._connection.keys(key, function (err, list) {
43 if (err) return callback(err);
44 if (Array.isArray(list) && list.length > 0) {
45
46 me._debug('exit: delete all related redis keys=%s', list);
47 me._connection.deleteKeys(list, function (err) {
48 if (err) return callback(err);
49
50 delKeysSuccess();
51 });
52
53 } else {
54 delKeysSuccess();
55 }
56
57 function delKeysSuccess () {
58 // 停止定时器
59 me._debug('exit: clear timer');
60 clearInterval(me._heartbeatTid);
61 }
62 });
63 });
64
65 // 处理调用请求
66 this._setHandler('message.c', this._handleCallService);
67}
68
69utils.inheritsBase(CloudsServer);
70
71/**
72 * 注册服务
73 *
74 * @param {String} method
75 * @param {Function} handle
76 * @param {Function} callback
77 */
78CloudsServer.prototype.register = function (method, handle, callback) {
79 this._debug('register: %s => %s', method, handle);
80
81 this._services[method] = handle;
82 this._resetServiceScore(method, callback);
83
84 return this;
85};
86
87// 重新注册注册服务到Redis的可用服务器列表
88CloudsServer.prototype._resetServiceScore = function (method, callback) {
89 var key = this._key('S', method, this.id);
90 this._debug('reset service score: %s, key=%s', method, key);
91
92 this._connection.registerKey(key, this._heartbeat * 2, this._callback(callback));
93};
94
95// 保持服务在Redis的可用服务器列表
96CloudsServer.prototype._keepServiceScore = function (method, callback) {
97 var me = this;
98 var key = this._key('S', method, this.id);
99 me._debug('keep service score: %s, key=%s', method, key);
100
101 me._connection.registerKey(key, this._heartbeat * 2, this._callback(callback));
102};
103
104// 心跳
105CloudsServer.prototype._keepHeartbeat = function () {
106 var me = this;
107 me._debug('heartbeat');
108 Object.keys(me._services).forEach(function (n) {
109 me._keepServiceScore(n);
110 });
111};
112
113// 处理服务调用请求
114CloudsServer.prototype._handleCallService = function (msg) {
115 var me = this;
116 this._debug('handle call service: %s %s', msg.data.m, msg.data.a);
117
118 var fn = me._services[msg.data.m];
119 if (typeof fn !== 'function') {
120 return me._responseResult(msg, new Error('service handler not found'));
121 }
122
123 fn.apply(null, msg.data.a.concat(function (err) {
124 var args = Array.prototype.slice.call(arguments, 1);
125 me._responseResult(msg, err, args);
126 }));
127};
128
129// 返回结果
130CloudsServer.prototype._responseResult = function (sourceMsg, err, args, callback) {
131 var key = this._key(sourceMsg.sender);
132 this._debug('response result: client=%s, key=%s, err=%s, args=%s', sourceMsg.sender, key, err, args);
133
134 var msg = this._protocol.packResult(sourceMsg.data.i, err, args);
135
136 this._connection.send(key, msg.raw, this._callback(callback));
137};
138
139
140
141module.exports = CloudsServer;