1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 | var define = require('./define');
|
8 | var utils = require('./utils');
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | function Connection (options) {
|
19 | options = options || {};
|
20 |
|
21 | var ns = this._ns = utils.createNamespace(options);
|
22 | var id = this.id = options.id;
|
23 | var debug = this._debug = utils.debug('connection:' + id);
|
24 |
|
25 | this._prefix = ns('redis.prefix') || define.redisPrefix;
|
26 |
|
27 |
|
28 | this._redisConnections = {};
|
29 | this._redisConnections.subscribe = utils.createRedisConnection(ns('redis.host'), ns('redis.port'), ns('redis.db'), ns('redis.auth_pass'));
|
30 | this._redisConnections.publish = utils.createRedisConnection(ns('redis.host'), ns('redis.port'), ns('redis.db'), ns('redis.auth_pass'));
|
31 |
|
32 | this._listen();
|
33 | }
|
34 |
|
35 | utils.inheritsEventEmitter(Connection);
|
36 |
|
37 |
|
38 | Connection.prototype._key = function (key) {
|
39 | var list = Array.prototype.slice.call(arguments);
|
40 | if (this._prefix) list.unshift(this._prefix);
|
41 | return list.join(':');
|
42 | };
|
43 |
|
44 |
|
45 | Connection.prototype._stripPrefix = function (key) {
|
46 | if (key.slice(0, this._prefix.length + 1) === this._prefix + ':') {
|
47 | return key.slice(this._prefix.length + 1);
|
48 | } else {
|
49 | return key;
|
50 | }
|
51 | };
|
52 |
|
53 |
|
54 | Connection.prototype._callback = function (fn) {
|
55 | if (typeof fn !== 'function') {
|
56 | var debug = this._debug;
|
57 | fn = function (err) {
|
58 | debug('callback: err=%s, args=%s', err, Array.prototype.slice.call(arguments));
|
59 | };
|
60 | }
|
61 | return fn;
|
62 | };
|
63 |
|
64 |
|
65 | Connection.prototype._redis = function (name) {
|
66 | return this._redisConnections[name];
|
67 | };
|
68 |
|
69 |
|
70 | Connection.prototype._listen = function (callback) {
|
71 | var me = this;
|
72 | var key = this._key('L', this.id);
|
73 | this._debug('start listen: key=%s', key);
|
74 |
|
75 | this._redis('subscribe').on('subscribe', function (channel, count) {
|
76 | me._debug('subscribe succeed: channel=%s, count=%s', channel, count);
|
77 | setTimeout(function () {
|
78 | me._debug('emit event: listen');
|
79 | me.emit('listen');
|
80 | }, define.emitListenDelay);
|
81 | });
|
82 |
|
83 | this._redis('subscribe').on('message', function (channel, msg) {
|
84 | me._debug('receive message: channel=%s, msg=%s', channel, msg);
|
85 |
|
86 | if (channel !== key) {
|
87 | me._debug(' - message from unknown channel: channel=%s', channel);
|
88 | return;
|
89 | }
|
90 |
|
91 | me._debug('emit message: %s', msg);
|
92 | me.emit('message', msg);
|
93 | });
|
94 |
|
95 | this._redis('subscribe').subscribe(key, this._callback(callback));
|
96 | };
|
97 |
|
98 |
|
99 |
|
100 |
|
101 |
|
102 |
|
103 |
|
104 |
|
105 | Connection.prototype.send = function (receiver, data, callback) {
|
106 | var key = this._key('L', receiver);
|
107 | this._debug('send: receiver=%s, key=%s', receiver, key);
|
108 | this._redis('publish').publish(key, data, this._callback(callback));
|
109 | };
|
110 |
|
111 |
|
112 |
|
113 |
|
114 |
|
115 |
|
116 |
|
117 |
|
118 | Connection.prototype.registerKey = function (key, ttl, callback) {
|
119 | var newKey = this._key(key);
|
120 | this._debug('registerKey: key=%s, newKey=%s, ttl=%s', key, newKey, ttl);
|
121 | this._redis('publish').setex(newKey, ttl, 1, function (err) {
|
122 | callback(err, key);
|
123 | });
|
124 | };
|
125 |
|
126 |
|
127 |
|
128 |
|
129 |
|
130 |
|
131 |
|
132 | Connection.prototype.deleteKey = function (key, callback) {
|
133 | var newKey = this._key(key);
|
134 | this._debug('deleteKey: key=%s, newKey=%s', key, newKey);
|
135 | this._redis('publish').del(newKey, function (err) {
|
136 | callback(err, key);
|
137 | });
|
138 | };
|
139 |
|
140 |
|
141 |
|
142 |
|
143 |
|
144 |
|
145 |
|
146 | Connection.prototype.deleteKeys = function (keys, callback) {
|
147 | var me = this;
|
148 | this._debug('deleteKeys: keys=%s', keys);
|
149 | var op = this._redis('publish').multi();
|
150 | keys.forEach(function (key) {
|
151 | op.del(me._key(key));
|
152 | });
|
153 | op.exec(function (err) {
|
154 | callback(err, keys);
|
155 | });
|
156 | };
|
157 |
|
158 |
|
159 |
|
160 |
|
161 |
|
162 |
|
163 |
|
164 | Connection.prototype.keys = function (pattern, callback) {
|
165 | var me = this;
|
166 | var newKey = this._key(pattern);
|
167 | this._debug('keys: pattern=%s, newKey=%s', pattern, newKey);
|
168 | this._redis('publish').keys(newKey, function (err, keys) {
|
169 | if (keys) {
|
170 | keys = keys.map(function (k) {
|
171 | return me._stripPrefix(k);
|
172 | });
|
173 | }
|
174 | callback(err, keys);
|
175 | });
|
176 | };
|
177 |
|
178 |
|
179 |
|
180 |
|
181 |
|
182 |
|
183 | Connection.prototype.exit = function (callback) {
|
184 | this._debug('exit');
|
185 |
|
186 |
|
187 | this._debug('exit: close redis connection');
|
188 | this._redis('publish').end();
|
189 | this._redis('subscribe').end();
|
190 |
|
191 |
|
192 | this.emit('exit', this);
|
193 |
|
194 | if (callback) callback();
|
195 | };
|
196 |
|
197 |
|
198 | module.exports = Connection;
|