UNPKG

4.92 kBJavaScriptView Raw
1var EventEmitter = require ('events').EventEmitter,
2 SocketIo = require ('socket.io'),
3 util = require ('util'),
4 flow = require ('../flow'),
5 fs = require ('fs');
6
/**
 * @class initiator.socket
 * @extends events.EventEmitter
 *
 * Initiator which launches a socket.io server and starts dataflows
 * for the socket events it receives.
 *
 * @param {Object} config initiator configuration, handed over to `init`
 * @param {Object} initiators other initiators, handed over to `init`
 */
var SocketInitiator = module.exports = function (config, initiators) {
	// Initiators are started in random order, and socket.io may be attached
	// to the server owned by the http initiator. The actual initialization
	// is therefore postponed to the next tick, after that initiator is set up.
	var deferredInit = this.init.bind (this, config, initiators);

	process.nextTick (deferredInit);
};

util.inherits (SocketInitiator, EventEmitter);

// registry shared by all instances; `listen` stores one entry per flow url
SocketInitiator.connections = {};
25
/**
 * Reads the initiator configuration, resolves the router and starts
 * the socket.io server by calling `listen`.
 *
 * @param {Object} config initiator configuration. Keys read here:
 *   `useHttpServer`, `port`, `opts`, `ssl` (`key` and `cert` file paths),
 *   `transports`, `verbose`, `workflows` / `dataflows` / `flows`,
 *   `timer` and `router`.
 * @param {Object} initiators other initiators; `initiators.http.server`
 *   is used when `config.useHttpServer` is set
 * @throws {String} when neither `useHttpServer` nor `port` is configured,
 *   or when `config.router` names neither a main module export
 *   nor a member of this initiator
 */
SocketInitiator.prototype.init = function (config, initiators) {

	if (config.useHttpServer) {
		this.httpServer = initiators.http.server;
	} else if (config.port) {
		this.port = config.port;
	} else {
		throw "you must define 'port' key or use existing http initiator ('useHttpServer' key) for socket.io";
	}

	this.opts = config.opts || {};

	if (config.ssl) {
		this.opts.key = fs.readFileSync (config.ssl.key).toString ();
		this.opts.cert = fs.readFileSync (config.ssl.cert).toString ();
	}

	if (config.transports) {
		this.opts.transports = config.transports;
	}

	if (config.verbose) {
		this.verbose = true;
	}

	this.flows = config.workflows || config.dataflows || config.flows;
	this.timer = config.timer;

	// router is a function exported by the main module or an initiator method

	// `process.mainModule` can be undefined, so it must not be
	// dereferenced blindly
	var mainModule = process.mainModule;
	var mainExports = (mainModule && mainModule.exports) || {};

	if (config.router === undefined) {
		this.router = this.defaultRouter;
	} else if (mainExports[config.router]) {
		this.router = mainExports[config.router];
	} else if (this[config.router]) {
		// the lookup has to go through `this`: no `self` is declared here
		this.router = this[config.router];
	} else {
		throw "we cannot find " + config.router + " router method within initiator or function in main module";
	}

	// - - - start

	this.listen ();

};
72
/**
 * Creates the socket.io server (attached to the http server when one was
 * configured, otherwise listening on the configured port), opens one
 * namespace per configured flow and emits `ready` when done.
 *
 * For a flow that has an `events` map every listed event is routed to
 * `processMessage` together with its own flow data; for any other flow
 * only the `message` event is routed, with the whole flow as flow data.
 */
SocketInitiator.prototype.listen = function () {

	var self = this;

	var target = self.httpServer || self.port;
	var io = SocketIo (target, self.opts);

	self.socketIo = io;

	// `disable` has no effect on new socket.io, hence the existence check
	if (!self.verbose && io.disable) {
		io.disable ('log');
	}

	// wires up a single client which connected to the namespace of `flowName`
	function onConnection (flowName, socket) {
		if (self.verbose) {
			console.log ('new socket.io connection ' + socket.id + ', scope: ' + socket.nsp.name);
		}

		var flowData = self.flows[flowName];
		var events = flowData.events;

		if (events) {
			var eventNames = Object.keys (events);
			for (var j = 0; j < eventNames.length; j++) {
				var eventName = eventNames[j];
				socket.on (eventName, self.processMessage.bind (self, eventName, events[eventName], socket));
			}
		} else {
			socket.on ('message', self.processMessage.bind (self, 'message', flowData, socket));
		}

		socket.on ('disconnect', function () {
			if (self.verbose) {
				console.log ('socket.io client disconnected ' + socket.id);
			}
		});
	}

	var flowNames = Object.keys (self.flows);

	for (var i = 0; i < flowNames.length; i++) {
		var flowName = flowNames[i];
		// namespaces always start with a slash
		var flowUrl = flowName.charAt (0) === '/' ? flowName : '/' + flowName;

		var namespace = io.of (flowUrl).on ('connection', onConnection.bind (null, flowName));

		SocketInitiator.connections[flowUrl] = namespace;
	}

	if (self.httpServer) {
		console.log ('socket.io server is attached to http initiator');
	} else {
		console.log ('socket.io server is running on ' + self.port + ' port');
	}

	self.emit ('ready', self);
};
116
/**
 * Passes a received socket event on to the configured router.
 *
 * @param {String} eventName name of the socket event
 * @param {Object} flowData flow description bound to that event
 * @param {Object} socket socket the event came from
 * @param {*} message payload of the event
 */
SocketInitiator.prototype.processMessage = function (eventName, flowData, socket, message) {

	if (this.verbose) {
		var scope = socket.nsp.name;
		console.log ('processMessage', eventName, scope, socket.id, message);
	}

	// the router is invoked as a method of the initiator
	var route = this.router;
	route.call (this, eventName, flowData, socket, message);
};
125
/**
 * Router used when the configuration names no other one: creates a flow
 * from a copy of the flow data and starts it.
 *
 * Emits `detected` with the message and the socket.
 *
 * @param {String} eventName name of the socket event (not used here)
 * @param {Object} flowData flow description; copied with `util.extend`
 *   before it is handed to the flow
 * @param {Object} socket socket the event came from
 * @param {*} message payload of the event
 * @return {Object} the created flow
 */
SocketInitiator.prototype.defaultRouter = function (eventName, flowData, socket, message) {

	var self = this;

	var flowConfig = util.extend (true, {}, flowData);
	var flowParams = {
		message: message,
		socket: socket
	};

	var df = new flow (flowConfig, flowParams);

	// both final states of the flow are answered through the presenter
	['completed', 'failed'].forEach (function (state) {
		df.on (state, self.runPresenter.bind (self, df, state, socket));
	});

	self.emit ("detected", message, socket);

	if (df.ready) {
		df.runDelayed ();
	}

	return df;
};
146
/**
 * Sends the outcome of a finished flow back over the socket.
 *
 * Does nothing when the flow has no presenter. On success the message
 * `<header>:<JSON of vars>` is sent, to every socket of the server when
 * `presenter.broadcast` is set and to the originating socket otherwise.
 * On failure, including a failed interpolation of the presenter,
 * `error:<JSON>` is sent to the originating socket.
 *
 * @param {Object} df finished flow; `df.presenter` supplies `header`,
 *   `vars` and the optional `broadcast` flag
 * @param {String} state `'completed'` or `'failed'`
 * @param {Object} socket socket which started the flow
 */
SocketInitiator.prototype.runPresenter = function (df, state, socket) {

	var presenter = df.presenter;

	if (!presenter) return;

	var header,
		vars,
		finalState = state,
		// Used for every failure. Without a default, a flow that failed on
		// its own produced the unparseable message "error:undefined".
		err = {error: 'No message'};

	try {

		// when there is nothing to interpolate the header is used as it is
		header = (presenter.header.interpolate (df, false, true).length === 0) ?
			presenter.header : presenter.header.interpolate (df);

		vars = presenter.vars.interpolate (df);

	} catch (e) {
		if (this.verbose) console.log ('socket.io presenter interpolation failed', e);
		finalState = 'failed';
	}

	if (finalState === 'completed') {

		var msg = header + ':' + JSON.stringify (vars);

		if (presenter.broadcast) {
			this.socketIo.sockets.send (msg);
		} else {
			socket.send (msg);
		}

	} else {

		socket.send ('error:' + JSON.stringify (err));
	}
};
185