UNPKG

4.77 kBJavaScriptView Raw
1/**
2 * clouds connection
3 *
4 * @author 老雷<leizongmin@gmail.com>
5 */
6
7var define = require('./define');
8var utils = require('./utils');
9
10
11/**
12 * Clouds Connection
13 *
14 * @param {Object} options
15 * - {String} id
16 * - {Object} redis {host, port, db, prefix, password|auth_pass}
17 */
18function Connection (options) {
19 options = options || {};
20
21 var ns = this._ns = utils.createNamespace(options);
22 var id = this.id = options.id;
23 var debug = this._debug = utils.debug('connection:' + id);
24
25 this._prefix = ns('redis.prefix') || define.redisPrefix;
26
27 // create redis connection
28 this._redisConnections = {};
29 this._redisConnections.subscribe = utils.createRedisConnection(ns('redis'));
30 this._redisConnections.publish = utils.createRedisConnection(ns('redis'));
31
32 this._listen();
33}
34
35utils.inheritsEventEmitter(Connection);
36
37// 获得redis key
38Connection.prototype._key = function (key) {
39 var list = Array.prototype.slice.call(arguments);
40 if (this._prefix) list.unshift(this._prefix);
41 return list.join(':');
42};
43
44// 去掉redis key prefix
45Connection.prototype._stripPrefix = function (key) {
46 if (key.slice(0, this._prefix.length + 1) === this._prefix + ':') {
47 return key.slice(this._prefix.length + 1);
48 } else {
49 return key;
50 }
51};
52
53// 处理默认的回调函数
54Connection.prototype._callback = function (fn) {
55 if (typeof fn !== 'function') {
56 var debug = this._debug;
57 fn = function (err) {
58 debug('callback: err=%s, args=%s', err, Array.prototype.slice.call(arguments));
59 };
60 }
61 return fn;
62};
63
64// 取redis客户端
65Connection.prototype._redis = function (name) {
66 return this._redisConnections[name];
67};
68
69// 开始监听消息
70Connection.prototype._listen = function (callback) {
71 var me = this;
72 var key = this._key('L', this.id);
73 this._debug('start listen: key=%s', key);
74
75 this._redis('subscribe').on('message', function (channel, msg) {
76 me._debug('receive message: channel=%s, msg=%s', channel, msg);
77
78 if (channel !== key) {
79 me._debug(' - message from unknown channel: channel=%s', channel);
80 return;
81 }
82
83 me._debug('emit message: %s', msg);
84 me.emit('message', msg);
85 });
86
87 this._redis('subscribe').subscribe(key, function (err, count) {
88 if (!err) {
89 me._debug('subscribe succeed: channel=%s, count=%s', key, count);
90 setTimeout(function () {
91 me._debug('emit event: listen');
92 me.emit('listen');
93 }, define.emitListenDelay);
94 }
95 if (callback) callback(err, count);
96 });
97};
98
99/**
100 * 发送消息
101 *
102 * @param {String} receiver
103 * @param {String} data
104 * @param {Function} callback
105 */
106Connection.prototype.send = function (receiver, data, callback) {
107 var key = this._key('L', receiver);
108 this._debug('send: receiver=%s, key=%s', receiver, key);
109 this._redis('publish').publish(key, data, this._callback(callback));
110};
111
112/**
113 * 注册一个Key
114 *
115 * @param {String} key
116 * @param {Number} ttl
117 * @param {Function} callback
118 */
119Connection.prototype.registerKey = function (key, ttl, callback) {
120 var newKey = this._key(key);
121 this._debug('registerKey: key=%s, newKey=%s, ttl=%s', key, newKey, ttl);
122 this._redis('publish').setex(newKey, ttl, 1, function (err) {
123 callback(err, key);
124 });
125};
126
127/**
128 * 删除一个Key
129 *
130 * @param {String} key
131 * @param {Function} callback
132 */
133Connection.prototype.deleteKey = function (key, callback) {
134 var newKey = this._key(key);
135 this._debug('deleteKey: key=%s, newKey=%s', key, newKey);
136 this._redis('publish').del(newKey, function (err) {
137 callback(err, key);
138 });
139};
140
141/**
142 * 删除一组Key
143 *
144 * @param {Array} keys
145 * @param {Function} callback
146 */
147Connection.prototype.deleteKeys = function (keys, callback) {
148 var me = this;
149 this._debug('deleteKeys: keys=%s', keys);
150 var op = this._redis('publish').multi();
151 keys.forEach(function (key) {
152 op.del(me._key(key));
153 });
154 op.exec(function (err) {
155 callback(err, keys);
156 });
157};
158
159/**
160 * 列出符合要求的key
161 *
162 * @param {String} pattern
163 * @param {Function} callback
164 */
165Connection.prototype.keys = function (pattern, callback) {
166 var me = this;
167 var newKey = this._key(pattern);
168 this._debug('keys: pattern=%s, newKey=%s', pattern, newKey);
169 this._redis('publish').keys(newKey, function (err, keys) {
170 if (keys) {
171 keys = keys.map(function (k) {
172 return me._stripPrefix(k);
173 });
174 }
175 callback(err, keys);
176 });
177};
178
179/**
180 * 退出
181 *
182 * @param {Function} callback
183 */
184Connection.prototype.exit = function (callback) {
185 this._debug('exit');
186
187 // 关闭redis连接
188 this._debug('exit: close redis connection');
189 this._redis('publish').end();
190 this._redis('subscribe').end();
191
192 // 触发exit事件
193 this.emit('exit', this);
194
195 if (callback) callback();
196};
197
198
199module.exports = Connection;