UNPKG

5.17 kBJavaScriptView Raw
1var EventEmitter = require('events').EventEmitter;
2var StreamTopic = require('zetta-events-stream-protocol').StreamTopic;
3var ObjectStream = require('zetta-streams').ObjectStream;
4var deviceFormatter = require('./api_formats/siren/device.siren');
5
6function socketFdFromEnv(env) {
7 if (env.request && env.request.connection && env.request.connection.socket && env.request.connection.socket._handle) {
8 return env.request.connection.socket._handle.fd;
9 } else {
10 return null;
11 }
12}
13
14var PubSub = module.exports = function() {
15 this.emitter = new EventEmitter();
16
17 // Keep from warning poping up
18 this.emitter.setMaxListeners(Infinity);
19
20 this._listeners = {};
21
22 // sendcache ensures only one event is sent to cloud connection for a topic subscription
23 // this can happen now because of wildcards and regexes in topics
24 this._sendCache = {}; // { <socketFd>: [event1, event2, ... ] }
25 this._maxCacheSize = 100; // keep list of last 100 events per peer connection
26};
27
28PubSub.prototype.publish = function(topic, data, fromRemote) {
29 fromRemote = !!(fromRemote);
30 var x = decodeURIComponent(topic);
31 this.emitter.emit(x, data, fromRemote);
32 this.emitter.emit('_data', x, data, fromRemote);
33};
34
35PubSub.prototype.subscribe = function(topic, callback) {
36 var self = this;
37 if (typeof topic === 'string') {
38 topic = StreamTopic.parse(topic);
39 }
40
41 var f = function(t, data, fromRemote) {
42 if (topic.match(t)) {
43 if (typeof callback === 'function') {
44 self._onCallback(topic, t, data, fromRemote, callback);
45 } else if (typeof callback === 'object') {
46 // Only send to peer if event did not come from a downstream peer
47 if (!fromRemote) {
48 self._onResponse(topic, t, data, fromRemote, callback);
49 }
50 }
51 }
52 };
53
54 this.emitter.on('_data', f);
55
56 if (!this._listeners[topic.hash()]) {
57 this._listeners[topic.hash()] = [];
58 }
59
60 this._listeners[topic.hash()].push({ listener: callback, actual: f });
61};
62
63PubSub.prototype.unsubscribe = function(topic, listener) {
64 if (typeof topic === 'string') {
65 topic = StreamTopic.parse(topic);
66 }
67
68 if (!this._listeners[topic.hash()]) {
69 return;
70 }
71
72 var found = -1;
73 this._listeners[topic.hash()].some(function(l, idx) {
74 if (l.listener === listener) {
75 found = idx;
76 return true;
77 }
78 });
79
80 if (found === -1) {
81 return;
82 }
83
84 if (typeof listener === 'object') {
85 var underlyingSocketFd = socketFdFromEnv(listener);
86 if (underlyingSocketFd !== null) {
87 delete this._sendCache[underlyingSocketFd];
88 }
89 listener.response.end(); // end response for push request
90 }
91
92 this.emitter.removeListener('_data', this._listeners[topic.hash()][found].actual);
93 this._listeners[topic.hash()].splice(found, 1);
94
95 if (this._listeners[topic.hash()].length === 0) {
96 delete this._listeners[topic.hash()];
97 }
98};
99
100PubSub.prototype._onCallback = function(topic, sourceTopic, data, fromRemote, cb) {
101 var self = this;
102 cb(topic, data, sourceTopic, fromRemote);
103};
104
105
106// topic: StreamTopic that was used to subscribe
107// sourceTopic: topic string emitted
108// data...
109// env: argo env for the subscription request
110PubSub.prototype._onResponse = function(topic, sourceTopic, data, fromRemote, env) {
111 var underlyingSocketFd = socketFdFromEnv(env);
112 if (this._sendCache[underlyingSocketFd] === undefined) {
113 this._sendCache[underlyingSocketFd] = [];
114 }
115
116 if (this._sendCache[underlyingSocketFd].indexOf(data) >= 0) {
117 return;
118 } else {
119 this._sendCache[underlyingSocketFd].push(data);
120 if (this._sendCache[underlyingSocketFd].length > this._maxCacheSize) {
121 this._sendCache[underlyingSocketFd].shift();
122 }
123 }
124
125 var self = this;
126 var encoding = '';
127 if(Buffer.isBuffer(data)) {
128 encoding = 'application/octet-stream';
129 } else if (data.query && data.device) {
130 var serverId = env.route.params.serverId;
131 var loader = { path: '/servers/' + encodeURIComponent(serverId) };
132 data = deviceFormatter({ loader: loader, env: env, model: data.device });
133 encoding = 'application/json';
134 data = new Buffer(JSON.stringify(data));
135 } else if (typeof data == 'object') {
136 encoding = 'application/json';
137
138 // used for _peer/connect _peer/disconnect
139 if (sourceTopic.indexOf('_peer/') === 0 && typeof data.peer === 'object') {
140 data = ObjectStream.format(sourceTopic, data.peer.properties());
141 }
142
143 try {
144 data = new Buffer(JSON.stringify(data));
145 } catch (err) {
146 console.error(err, err.stack);
147 return;
148 }
149 } else {
150 console.error('PubSub._onResponse encoding not set.');
151 }
152 var stream = env.response.push('/' + sourceTopic, { 'Host': encodeURIComponent(serverId) + '.unreachable.zettajs.io',
153 'Content-Length': data.length,
154 'Content-Type': encoding
155 });
156
157 stream.on('error', function(err) {
158 if (err.code === 'RST_STREAM' && err.status === 3) {
159 stream.end();
160 } else {
161 console.error('PubSub._onCallback', err);
162 }
163 });
164
165 stream.end(data);
166};
167