UNPKG

4.19 kBJavaScriptView Raw
1var EventEmitter = require ('events').EventEmitter,
2 SocketIo = require('socket.io'),
3 util = require ('util'),
4 flow = require ('../flow'),
5 fs = require ('fs');
6
/**
 * @class initiator.socket
 * @extends events.EventEmitter
 *
 * Initiates WebSocket server-related dataflows.
 *
 * Copies the port, optional SSL material, flow definitions and router out of
 * `config`, then starts the server right away by calling `listen()`.
 *
 * @param {Object} config initiator configuration
 * @param {Number} config.port port passed to socket.io (required)
 * @param {Object} [config.ssl] when present, `ssl.key` and `ssl.cert` are
 * file paths whose contents become the `key` / `cert` server options
 * @param {Array} [config.workflows] flow definitions; `config.dataflows` and
 * `config.flows` are used as fallbacks, in that order
 * @param {String} [config.router] name of a function exported by the main
 * module or of a method of this initiator; `defaultRouter` when omitted
 * @param {*} [config.timer] stored as `this.timer`
 *
 * @throws {String} when `port` is missing or the named router is not found
 */
var socket = module.exports = function (config) {

	var self = this;

	if (!config.port) {
		throw "you must define 'port' key for socket initiator";
	}

	self.port = config.port;

	// key and cert are read synchronously here, so an unreadable path
	// throws from the constructor
	self.opts = config.ssl ? {
		key: fs.readFileSync(config.ssl.key).toString(),
		cert: fs.readFileSync(config.ssl.cert).toString()
	} : {};

	self.flows = config.workflows || config.dataflows || config.flows;
	self.timer = config.timer;

	// router is function in main module or initiator method.
	// process.mainModule is undefined when there is no entry script, so
	// fall back to an empty export table instead of dereferencing it.
	var mainExports = (process.mainModule && process.mainModule.exports) || {};

	if (config.router === void 0) {
		self.router = self.defaultRouter;
	} else if (mainExports[config.router]) {
		self.router = mainExports[config.router];
	} else if (self[config.router]) {
		self.router = self[config.router];
	} else {
		throw "we cannot find " + config.router + " router method within initiator or function in main module";
	}

	// - - - start

	self.listen();
};
53
// socket instances get the EventEmitter prototype methods (on, emit, ...)
util.inherits(socket, EventEmitter);
55
56util.extend (socket.prototype, {
57
58 listen: function () {
59
60 var self = this;
61
62 var socketIo = self.socketIo = SocketIo.listen(self.port, self.opts);
63
64 socketIo.set('transports', ['websocket']);
65 if (!self.log) socketIo.disable('log');
66
67 socketIo.sockets.on('connection', function (socket) {
68
69 if (self.log) console.log('Socket server connection ' + socket.id);
70
71 socket.on('message', function(msg) {
72 self.processMessage(socket, msg);
73 });
74
75 socket.on('disconnect', function () {
76 if (self.log) console.log('Socket server disconnection ' + socket.id);
77 });
78 });
79
80 console.log('Socket server running on ' + self.port + ' port');
81
82 self.emit ('ready', this.server);
83 },
84
85 processMessage: function (socket, message) {
86
87 var self = this;
88
89 if (this.log) console.log('processMessage', socket.id, message);
90
91 var re = /^([A-Z0-9a-z\/]+)(:(.+))?$/;
92 var match = message.match(re);
93
94 if (match && match[1]) {
95
96 var route = match[1];
97 var rawData = match[3],
98 data = {};
99
100 if (rawData) {
101 try {
102 data = JSON.parse(rawData);
103 } catch (e) {
104 data.raw = rawData;
105 }
106 }
107
108 var query = {
109 route: route,
110 data: data
111 };
112
113 this.router(query, socket);
114
115 } else {
116 if (self.log) console.log('Socket initiator: Strange formatted message');
117 }
118 },
119
120 defaultRouter: function (query, socket) {
121
122 var self = this,
123 df,
124 route = query.route;
125
126 if (self.flows.constructor == Array) {
127
128 self.flows.every (function (item) {
129
130 var match = route.match(item.route);
131
132 if (match && match[0] == route) { //exact match
133
134 if (self.log) console.log ('socket match to ' + route);
135
136 df = new flow (
137 util.extend (true, {}, item),
138 {
139 query: query,
140 socket: socket
141 }
142 );
143
144 df.on ('completed', function (df) {
145 self.runPresenter (df, 'completed', socket);
146 });
147
148 df.on ('failed', function (df) {
149 self.runPresenter (df, 'failed', socket);
150 });
151
152 self.emit ("detected", query, socket);
153
154 if (df.ready) df.run();
155
156 return false;
157 }
158
159 return true;
160
161 });
162
163 if (!df) {
164 self.emit ("unknown", query, socket);
165 }
166 }
167
168 return df;
169 },
170
171 runPresenter: function (df, state, socket) {
172
173 var self = this;
174
175 if (!df.presenter) return;
176
177 var presenter = df.presenter,
178 header,
179 vars,
180 err;
181
182 try {
183
184 header = (presenter.header.interpolate(df, false, true).length == 0) ?
185 presenter.header : presenter.header.interpolate(df);
186
187 vars = presenter.vars.interpolate(df);
188
189 } catch (e) {
190 err = {error: 'No message'};
191 state = 'failed';
192 }
193
194 if (state == 'completed') {
195
196 var msg = header + ':' + JSON.stringify(vars);
197
198 if (presenter.broadcast) {
199 self.socketIo.sockets.send(msg);
200 } else {
201 socket.send(msg);
202 }
203
204 } else {
205
206 socket.send('error:'+ JSON.stringify(err));
207 }
208 }
209});