// UNPKG - 4.89 kB - JavaScript - View Raw
/**
 * clouds connection
 *
 * @author 老雷<leizongmin@gmail.com>
 */
6
7var define = require('./define');
8var utils = require('./utils');
9
10
/**
 * Clouds Connection
 *
 * Creates two Redis connections from the same settings (one stored under
 * "subscribe", one under "publish") and immediately starts listening on
 * this connection's own channel.
 *
 * @param {Object} options
 *   - {String} id
 *   - {Object} redis {host, port, db, prefix, auth_pass}
 */
function Connection (options) {
  options = options || {};

  var ns = utils.createNamespace(options);
  this._ns = ns;
  this.id = options.id;
  this._debug = utils.debug('connection:' + this.id);

  // fall back to the library-wide default prefix when none is configured
  this._prefix = ns('redis.prefix') || define.redisPrefix;

  // both clients are built from the very same redis options
  var connect = function () {
    return utils.createRedisConnection(ns('redis.host'), ns('redis.port'), ns('redis.db'), ns('redis.auth_pass'));
  };
  this._redisConnections = {
    subscribe: connect(),
    publish: connect()
  };

  this._listen();
}

// give Connection the event emitter interface (emit / on ...)
utils.inheritsEventEmitter(Connection);
36
// Build a redis key: all arguments joined with ':', preceded by the
// configured prefix when there is one.
Connection.prototype._key = function (key) {
  var parts = this._prefix ? [this._prefix] : [];
  for (var i = 0; i < arguments.length; i++) {
    parts.push(arguments[i]);
  }
  return parts.join(':');
};
43
/**
 * Remove the redis key prefix (the inverse of `_key`).
 *
 * @param {String} key full redis key
 * @return {String} the key without the leading "<prefix>:", or the key
 *                  unchanged when it does not start with it
 */
Connection.prototype._stripPrefix = function (key) {
  // `_key` prepends nothing when the prefix is empty/unset, so there is
  // nothing to strip either; without this guard a key that merely starts
  // with ':' would lose its first character (and an unset prefix would throw).
  if (!this._prefix) return key;

  var head = this._prefix + ':';
  if (key.slice(0, head.length) === head) {
    return key.slice(head.length);
  }
  return key;
};
52
// Normalise an optional callback: a function is returned untouched,
// anything else is replaced by a function that only writes a debug line.
Connection.prototype._callback = function (fn) {
  if (typeof fn === 'function') return fn;

  var log = this._debug;
  return function (err) {
    log('callback: err=%s, args=%s', err, Array.prototype.slice.call(arguments));
  };
};
63
// Look up one of the redis clients created by the constructor
// ("subscribe" or "publish").
Connection.prototype._redis = function (name) {
  var clients = this._redisConnections;
  return clients[name];
};
68
// Start listening for messages: subscribe to this connection's own channel
// and re-emit what arrives on it as "message" events.
Connection.prototype._listen = function (callback) {
  var self = this;
  var channelKey = self._key('L', self.id);
  self._debug('start listen: key=%s', channelKey);

  var subscriber = self._redis('subscribe');

  function announceListen () {
    self._debug('emit event: listen');
    self.emit('listen');
  }

  // once the subscription is confirmed, emit "listen" after a configured delay
  subscriber.on('subscribe', function (channel, count) {
    self._debug('subscribe succeed: channel=%s, count=%s', channel, count);
    setTimeout(announceListen, define.emitListenDelay);
  });

  // forward only messages that arrive on our own channel
  subscriber.on('message', function (channel, msg) {
    self._debug('receive message: channel=%s, msg=%s', channel, msg);

    if (channel === channelKey) {
      self._debug('emit message: %s', msg);
      self.emit('message', msg);
    } else {
      self._debug(' - message from unknown channel: channel=%s', channel);
    }
  });

  subscriber.subscribe(channelKey, self._callback(callback));
};
97
/**
 * Send a message by publishing it on the receiver's channel
 *
 * @param {String} receiver id of the receiving connection
 * @param {String} data
 * @param {Function} callback optional
 */
Connection.prototype.send = function (receiver, data, callback) {
  var channel = this._key('L', receiver);
  this._debug('send: receiver=%s, key=%s', receiver, channel);

  var publisher = this._redis('publish');
  publisher.publish(channel, data, this._callback(callback));
};
110
/**
 * Register a key: store the value 1 under the prefixed key with an expiry
 *
 * @param {String} key
 * @param {Number} ttl expiry handed to redis SETEX
 * @param {Function} callback optional, called with (err, key) where key is
 *                            the original un-prefixed key
 */
Connection.prototype.registerKey = function (key, ttl, callback) {
  // a missing callback must not throw inside the redis reply handler;
  // fall back to the default debug callback like send() does
  var done = this._callback(callback);
  var newKey = this._key(key);
  this._debug('registerKey: key=%s, newKey=%s, ttl=%s', key, newKey, ttl);
  this._redis('publish').setex(newKey, ttl, 1, function (err) {
    done(err, key);
  });
};
125
/**
 * Delete a key
 *
 * @param {String} key
 * @param {Function} callback optional, called with (err, key) where key is
 *                            the original un-prefixed key
 */
Connection.prototype.deleteKey = function (key, callback) {
  // a missing callback must not throw inside the redis reply handler;
  // fall back to the default debug callback like send() does
  var done = this._callback(callback);
  var newKey = this._key(key);
  this._debug('deleteKey: key=%s, newKey=%s', key, newKey);
  this._redis('publish').del(newKey, function (err) {
    done(err, key);
  });
};
139
/**
 * Delete a group of keys in a single MULTI/EXEC batch
 *
 * @param {Array} keys un-prefixed keys
 * @param {Function} callback optional, called with (err, keys) where keys is
 *                            the array that was passed in
 */
Connection.prototype.deleteKeys = function (keys, callback) {
  var me = this;
  // a missing callback must not throw inside the redis reply handler;
  // fall back to the default debug callback like send() does
  var done = this._callback(callback);
  this._debug('deleteKeys: keys=%s', keys);
  var op = this._redis('publish').multi();
  keys.forEach(function (key) {
    op.del(me._key(key));
  });
  op.exec(function (err) {
    done(err, keys);
  });
};
157
/**
 * List the keys matching a pattern
 *
 * The pattern is prefixed before it is sent to redis KEYS, and the prefix is
 * stripped again from every key in the reply.
 *
 * @param {String} pattern
 * @param {Function} callback optional, called with (err, keys)
 */
Connection.prototype.keys = function (pattern, callback) {
  var me = this;
  // a missing callback must not throw inside the redis reply handler;
  // fall back to the default debug callback like send() does
  var done = this._callback(callback);
  var newKey = this._key(pattern);
  this._debug('keys: pattern=%s, newKey=%s', pattern, newKey);
  this._redis('publish').keys(newKey, function (err, keys) {
    // on error there may be no reply to map over
    if (keys) {
      keys = keys.map(function (k) {
        return me._stripPrefix(k);
      });
    }
    done(err, keys);
  });
};
177
/**
 * Exit: close both redis connections and emit the "exit" event
 *
 * @param {Function} callback optional, called without arguments
 */
Connection.prototype.exit = function (callback) {
  var self = this;
  self._debug('exit');

  // close the redis connections, publish client first
  self._debug('exit: close redis connection');
  ['publish', 'subscribe'].forEach(function (name) {
    self._redis(name).end();
  });

  // emit the exit event with this connection as its argument
  self.emit('exit', self);

  if (callback) callback();
};
196
197
198module.exports = Connection;