1 | var EventEmitter = require ('events').EventEmitter,
|
2 | SocketIo = require ('socket.io'),
|
3 | util = require ('util'),
|
4 | flow = require ('../flow'),
|
5 | fs = require ('fs');
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
/**
 * Socket.io initiator. Instances are event emitters.
 *
 * Construction only schedules `init` for the next tick; no configuration is
 * read and no server is started synchronously. Presumably this gives the
 * creator a chance to attach listeners (e.g. for the 'ready' event emitted
 * at the end of `listen`) before initialization runs.
 *
 * @param {Object} config     initiator configuration, handed to `init` unchanged
 * @param {Object} initiators other initiators, handed to `init` unchanged
 */
var SocketInitiator = module.exports = function (config, initiators) {

	// Run the base constructor so the emitter state is set up on the instance
	// itself rather than relying on EventEmitter's lazy initialization.
	EventEmitter.call (this);

	process.nextTick (this.init.bind (this, config, initiators));
};

// Registry shared by all instances: namespace url -> socket.io namespace,
// filled in by `listen`.
SocketInitiator.connections = {};

util.inherits (SocketInitiator, EventEmitter);
|
25 |
|
/**
 * Reads the configuration, resolves the router and starts listening.
 *
 * Recognized `config` keys (as read below): `useHttpServer`, `port`, `opts`,
 * `ssl.key` / `ssl.cert` (file paths, read synchronously), `transports`,
 * `verbose`, `workflows` | `dataflows` | `flows`, `timer`, `router`.
 *
 * Router resolution order when `config.router` is given:
 *   1. a function exported under that name by the main module,
 *   2. a method with that name on this initiator.
 * When `config.router` is undefined, `defaultRouter` is used.
 *
 * @param {Object} config     initiator configuration
 * @param {Object} initiators other initiators; `initiators.http.server` is
 *                            used when `config.useHttpServer` is set
 * @throws {string} when neither `port` nor `useHttpServer` is configured, or
 *                  when the named router cannot be found (plain strings are
 *                  thrown, as before)
 */
SocketInitiator.prototype.init = function (config, initiators) {

	if (config.useHttpServer) {
		this.httpServer = initiators.http.server;
	} else if (config.port) {
		this.port = config.port;
	} else {
		throw "you must define 'port' key or use existing http initiator ('useHttpServer' key) for socket.io";
	}

	// NOTE: when `config.opts` is supplied it is used as is, so the ssl and
	// transports settings below are written onto the caller's object.
	this.opts = config.opts || {};

	if (config.ssl) {
		this.opts.key = fs.readFileSync (config.ssl.key).toString ();
		this.opts.cert = fs.readFileSync (config.ssl.cert).toString ();
	}

	if (config.transports) {
		this.opts.transports = config.transports;
	}

	if (config.verbose) {
		this.verbose = true;
	}

	this.flows = config.workflows || config.dataflows || config.flows;
	this.timer = config.timer;

	// `process.mainModule` can be undefined (e.g. no CommonJS entry script),
	// so fall back to an empty export table instead of crashing.
	var mainExports = (process.mainModule && process.mainModule.exports) || {};

	if (config.router === undefined) {
		this.router = this.defaultRouter;
	} else if (mainExports[config.router]) {
		this.router = mainExports[config.router];
	} else if (this[config.router]) {
		// was `self[config.router]`: `self` is not defined in this function
		this.router = this[config.router];
	} else {
		throw "we cannot find " + config.router + " router method within initiator or function in main module";
	}

	this.listen ();
};
|
72 |
|
/**
 * Creates the socket.io server and wires every configured flow to a namespace.
 *
 * The server is attached to `this.httpServer` when present, otherwise it is
 * created on `this.port`. For each key of `this.flows` a namespace is opened
 * (a leading '/' is added when missing) and stored in
 * `SocketInitiator.connections` under its url. On each new connection:
 *   - if the flow has an `events` map, one handler per event name is
 *     registered, each bound to that event's flow description;
 *   - otherwise a single 'message' handler is registered for the whole flow.
 * Emits 'ready' with the initiator once everything is registered.
 */
SocketInitiator.prototype.listen = function () {

	var self = this;

	var io = self.socketIo = SocketIo (self.httpServer || self.port, self.opts);

	// `disable` is feature-tested because not every server object has it.
	if (!self.verbose && io.disable) {
		io.disable ('log');
	}

	// Registers the per-socket handlers for one flow.
	function onConnection (flowName, socket) {
		if (self.verbose) console.log ('new socket.io connection ' + socket.id + ', scope: ' + socket.nsp.name);

		var eventMap = self.flows[flowName].events;

		if (eventMap) {
			Object.keys (eventMap).forEach (function (eventName) {
				var handler = self.processMessage.bind (self, eventName, eventMap[eventName], socket);
				socket.on (eventName, handler);
			});
		} else {
			socket.on ('message', self.processMessage.bind (self, 'message', self.flows[flowName], socket));
		}

		socket.on ('disconnect', function () {
			if (self.verbose) console.log ('socket.io client disconnected ' + socket.id);
		});
	}

	Object.keys (self.flows).forEach (function (flowName) {
		var namespaceUrl = flowName.charAt (0) === '/' ? flowName : '/' + flowName;
		var namespace = io.of (namespaceUrl).on ('connection', onConnection.bind (null, flowName));

		SocketInitiator.connections[namespaceUrl] = namespace;
	});

	var statusLine = self.httpServer
		? 'socket.io server is attached to http initiator'
		: 'socket.io server is running on ' + self.port + ' port';
	console.log (statusLine);

	self.emit ('ready', self);
};
|
116 |
|
/**
 * Socket event handler: logs the incoming message when `verbose` is set and
 * hands it to the configured router, called as a method of this initiator.
 *
 * @param {string} eventName name of the socket event that fired
 * @param {Object} flowData  flow description bound to this event
 * @param {Object} socket    socket the message arrived on
 * @param {*}      message   payload sent by the client
 * @returns {*} whatever the router returns (`defaultRouter` returns the flow)
 */
SocketInitiator.prototype.processMessage = function (eventName, flowData, socket, message) {

	if (this.verbose) console.log ('processMessage', eventName, socket.nsp.name, socket.id, message);

	return this.router (eventName, flowData, socket, message);
};
|
125 |
|
/**
 * Router used when the configuration names none: builds a flow for the
 * incoming message and starts it.
 *
 * The flow gets a copy of `flowData` (made with `util.extend`, which is not
 * part of stock Node `util` - presumably added elsewhere in the project,
 * TODO confirm) and a context holding the message and the socket.
 * `runPresenter` is subscribed to both the 'completed' and 'failed' events,
 * then 'detected' is emitted on the initiator before the flow is started.
 *
 * @param {string} eventName socket event name (not used by this router)
 * @param {Object} flowData  flow description to copy
 * @param {Object} socket    socket the message arrived on
 * @param {*}      message   payload sent by the client
 * @returns {Object} the created flow
 */
SocketInitiator.prototype.defaultRouter = function (eventName, flowData, socket, message) {

	var flowConfig = util.extend (true, {}, flowData);
	var flowContext = {
		message: message,
		socket: socket
	};

	var df = new flow (flowConfig, flowContext);

	['completed', 'failed'].forEach (function (state) {
		df.on (state, this.runPresenter.bind (this, df, state, socket));
	}, this);

	this.emit ("detected", message, socket);

	// The flow is only started when it reports itself ready.
	if (df.ready) {
		df.runDelayed ();
	}

	return df;
};
|
146 |
|
/**
 * Sends the outcome of a finished flow back over socket.io.
 *
 * Does nothing when the flow has no `presenter`. Otherwise the presenter's
 * `header` and `vars` are interpolated against the flow; if interpolation
 * throws, the outcome is treated as failed.
 *
 *   - completed: sends `<header>:<JSON of vars>`, to all sockets of the
 *     server's default `sockets` namespace when `presenter.broadcast` is set,
 *     otherwise only to the originating socket;
 *   - anything else: sends `error:<JSON of error payload>` to the originating
 *     socket.
 *
 * @param {Object} df     the flow; `df.presenter` is read
 * @param {string} state  'completed' or 'failed'
 * @param {Object} socket originating socket
 */
SocketInitiator.prototype.runPresenter = function (df, state, socket) {

	if (!df.presenter) return;

	var presenter = df.presenter,
		header,
		vars,
		// Default failure payload. Previously this was only assigned inside
		// the catch below, so a flow that failed without an interpolation
		// error produced the literal text "error:undefined".
		err = {error: 'No message'};

	try {

		// If the `(df, false, true)` interpolation of the header comes back
		// empty, the header is used as is; otherwise it is interpolated normally.
		header = (presenter.header.interpolate (df, false, true).length === 0) ?
			presenter.header : presenter.header.interpolate (df);

		vars = presenter.vars.interpolate (df);

	} catch (e) {
		// Interpolation problems are reported to the client as a failure.
		state = 'failed';
	}

	if (state === 'completed') {

		var msg = header + ':' + JSON.stringify (vars);

		if (presenter.broadcast) {
			this.socketIo.sockets.send (msg);
		} else {
			socket.send (msg);
		}

	} else {

		socket.send ('error:' + JSON.stringify (err));
	}
};
|
185 |
|