1 | var EventEmitter = require ('events').EventEmitter,
|
2 | SocketIo = require('socket.io'),
|
3 | util = require ('util'),
|
4 | flow = require ('../flow'),
|
5 | fs = require ('fs');
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 | var socket = module.exports = function (config) {
|
14 |
|
15 |
|
16 | var self = this;
|
17 |
|
18 | if (!config.port) {
|
19 | throw "you must define 'port' key for http initiator";
|
20 | } else {
|
21 | this.port = config.port;
|
22 | }
|
23 |
|
24 | if (config.ssl) {
|
25 | this.opts = {
|
26 | key : fs.readFileSync(config.ssl.key).toString(),
|
27 | cert : fs.readFileSync(config.ssl.cert).toString()
|
28 | };
|
29 | } else {
|
30 | this.opts = {};
|
31 | }
|
32 |
|
33 | this.flows = config.workflows || config.dataflows || config.flows;
|
34 | self.timer = config.timer;
|
35 | self.router = config.router;
|
36 |
|
37 |
|
38 |
|
39 | if (config.router === void 0) {
|
40 | self.router = self.defaultRouter;
|
41 | } else if (process.mainModule.exports[config.router]) {
|
42 | self.router = process.mainModule.exports[config.router];
|
43 | } else if (self[config.router]) {
|
44 | self.router = this[config.router];
|
45 | } else {
|
46 | throw "we cannot find " + config.router + " router method within initiator or function in main module";
|
47 | }
|
48 |
|
49 |
|
50 |
|
51 | self.listen();
|
52 | }
|
53 |
|
54 | util.inherits (socket, EventEmitter);
|
55 |
|
56 | util.extend (socket.prototype, {
|
57 |
|
58 | listen: function () {
|
59 |
|
60 | var self = this;
|
61 |
|
62 | var socketIo = self.socketIo = SocketIo.listen(self.port, self.opts);
|
63 |
|
64 | socketIo.set('transports', ['websocket']);
|
65 | if (!self.log) socketIo.disable('log');
|
66 |
|
67 | socketIo.sockets.on('connection', function (socket) {
|
68 |
|
69 | if (self.log) console.log('Socket server connection ' + socket.id);
|
70 |
|
71 | socket.on('message', function(msg) {
|
72 | self.processMessage(socket, msg);
|
73 | });
|
74 |
|
75 | socket.on('disconnect', function () {
|
76 | if (self.log) console.log('Socket server disconnection ' + socket.id);
|
77 | });
|
78 | });
|
79 |
|
80 | console.log('Socket server running on ' + self.port + ' port');
|
81 |
|
82 | self.emit ('ready', this.server);
|
83 | },
|
84 |
|
85 | processMessage: function (socket, message) {
|
86 |
|
87 | var self = this;
|
88 |
|
89 | if (this.log) console.log('processMessage', socket.id, message);
|
90 |
|
91 | var re = /^([A-Z0-9a-z\/]+)(:(.+))?$/;
|
92 | var match = message.match(re);
|
93 |
|
94 | if (match && match[1]) {
|
95 |
|
96 | var route = match[1];
|
97 | var rawData = match[3],
|
98 | data = {};
|
99 |
|
100 | if (rawData) {
|
101 | try {
|
102 | data = JSON.parse(rawData);
|
103 | } catch (e) {
|
104 | data.raw = rawData;
|
105 | }
|
106 | }
|
107 |
|
108 | var query = {
|
109 | route: route,
|
110 | data: data
|
111 | };
|
112 |
|
113 | this.router(query, socket);
|
114 |
|
115 | } else {
|
116 | if (self.log) console.log('Socket initiator: Strange formatted message');
|
117 | }
|
118 | },
|
119 |
|
120 | defaultRouter: function (query, socket) {
|
121 |
|
122 | var self = this,
|
123 | df,
|
124 | route = query.route;
|
125 |
|
126 | if (self.flows.constructor == Array) {
|
127 |
|
128 | self.flows.every (function (item) {
|
129 |
|
130 | var match = route.match(item.route);
|
131 |
|
132 | if (match && match[0] == route) {
|
133 |
|
134 | if (self.log) console.log ('socket match to ' + route);
|
135 |
|
136 | df = new flow (
|
137 | util.extend (true, {}, item),
|
138 | {
|
139 | query: query,
|
140 | socket: socket
|
141 | }
|
142 | );
|
143 |
|
144 | df.on ('completed', function (df) {
|
145 | self.runPresenter (df, 'completed', socket);
|
146 | });
|
147 |
|
148 | df.on ('failed', function (df) {
|
149 | self.runPresenter (df, 'failed', socket);
|
150 | });
|
151 |
|
152 | self.emit ("detected", query, socket);
|
153 |
|
154 | if (df.ready) df.run();
|
155 |
|
156 | return false;
|
157 | }
|
158 |
|
159 | return true;
|
160 |
|
161 | });
|
162 |
|
163 | if (!df) {
|
164 | self.emit ("unknown", query, socket);
|
165 | }
|
166 | }
|
167 |
|
168 | return df;
|
169 | },
|
170 |
|
171 | runPresenter: function (df, state, socket) {
|
172 |
|
173 | var self = this;
|
174 |
|
175 | if (!df.presenter) return;
|
176 |
|
177 | var presenter = df.presenter,
|
178 | header,
|
179 | vars,
|
180 | err;
|
181 |
|
182 | try {
|
183 |
|
184 | header = (presenter.header.interpolate(df, false, true).length == 0) ?
|
185 | presenter.header : presenter.header.interpolate(df);
|
186 |
|
187 | vars = presenter.vars.interpolate(df);
|
188 |
|
189 | } catch (e) {
|
190 | err = {error: 'No message'};
|
191 | state = 'failed';
|
192 | }
|
193 |
|
194 | if (state == 'completed') {
|
195 |
|
196 | var msg = header + ':' + JSON.stringify(vars);
|
197 |
|
198 | if (presenter.broadcast) {
|
199 | self.socketIo.sockets.send(msg);
|
200 | } else {
|
201 | socket.send(msg);
|
202 | }
|
203 |
|
204 | } else {
|
205 |
|
206 | socket.send('error:'+ JSON.stringify(err));
|
207 | }
|
208 | }
|
209 | });
|