1 |
|
2 |
|
3 |
|
4 |
|
5 | var inherits = require('util').inherits;
|
6 | var EventEmitter = require('events').EventEmitter;
|
7 | var parser = require('hyper-json-immutable-parse');
|
8 |
|
9 | var hosts = {};
|
10 |
|
11 | exports = module.exports = new EventEmitter();
|
12 |
|
13 | exports.open = function(params, WS) {
|
14 | var uri = (params.protocol || 'ws:').replace('http', 'ws') + '//' +
|
15 | params.host +
|
16 | (params.port ? ':' + params.port : '') +
|
17 | (params.wspath || '/?wait1');
|
18 |
|
19 | return hosts[uri] || new Connection(uri, WS);
|
20 | };
|
21 |
|
/**
 * A request/response channel over a single websocket.
 *
 * Creates the socket with `new WS(uri)`, re-emits its open/close/error
 * callbacks as 'open', 'close' and 'error' events on this object, and
 * registers itself in the module-level `hosts` cache under `uri` until the
 * socket closes.
 *
 * @constructor
 * @param {string} uri - websocket URI handed to `WS`; also the cache key.
 * @param {Function} WS - socket constructor. The instance must provide
 *   `send` and `close` and accept `onopen`, `onclose`, `onmessage` and
 *   `onerror` handler assignments.
 */
function Connection(uri, WS) {
  var self = this;

  // Run the parent constructor so the emitter state is initialised up front
  // instead of relying on EventEmitter's lazy setup.
  EventEmitter.call(self);

  var ws = self.ws = new WS(uri);
  self._id = 0;        // id handed to the next request
  self._pending = [];  // requests queued until the socket opens

  self.close = ws.close.bind(ws);

  // Forward socket callbacks as events; whatever argument the socket passes
  // to the callback is forwarded after the event name.
  ws.onopen = self.emit.bind(self, 'open');
  ws.onclose = self.emit.bind(self, 'close');
  ws.onmessage = self._onmessage.bind(self);
  ws.onerror = self.emit.bind(self, 'error');

  self.on('close', function() {
    // Drop the cache entry so a later lookup for this URI misses.
    delete hosts[uri];
  });

  self.on('open', function() {
    if (!self._pending.length) return;
    // Flush everything queued while the socket was not open as one batch.
    ws.send(JSON.stringify(self._pending));
    self._pending = [];
  });

  // Keeping an 'error' listener attached stops EventEmitter from throwing
  // when an 'error' event is emitted.
  self.on('error', function(err) {
    console.log('ERROR', err);
  });

  hosts[uri] = self;
}
inherits(Connection, EventEmitter);
|
52 |
|
/**
 * Handle one raw websocket message.
 *
 * `evt.data` is parsed as JSON using the module's `parser` reviver. The
 * result is treated as a list of frames (a non-array value is wrapped in a
 * one-element list). For each frame, element 0 selects the target:
 *   - -1 emits 'push' on the module-level emitter;
 *   - any other value emits 'RESPONSE_<id>' on this connection.
 * Elements 1 to 3 of the frame are forwarded as the event arguments.
 *
 * A message that cannot be parsed is reported through this connection's
 * 'error' event instead of throwing out of the socket callback.
 *
 * @param {{data: string}} evt - message event delivered by the socket.
 */
Connection.prototype._onmessage = function(evt) {
  var msg;
  try {
    msg = JSON.parse(evt.data, parser);
  } catch (err) {
    // Malformed payload: surface it as an event and keep the socket
    // callback from throwing.
    this.emit('error', err);
    return;
  }

  var frames = Array.isArray(msg) ? msg : [msg];
  for (var i = 0; i < frames.length; i++) {
    var res = frames[i];
    if (res[0] === -1) exports.emit('push', res[1], res[2], res[3]);
    else this.emit('RESPONSE_' + res[0], res[1], res[2], res[3]);
  }
};
|
62 |
|
/**
 * Send a request over the websocket, or queue it until the socket opens.
 *
 * The request goes out as the frame
 * `[id, method, pathSegments, headers, query]`, with `body` appended when it
 * is truthy. When a body is present the frame's headers carry
 * 'content-type: application/json'; the caller's `headers` object is copied
 * first, so it is never modified and may be omitted.
 *
 * @param {string} method - request method, e.g. 'GET' or 'POST'.
 * @param {string} path - '/'-separated path. The first segment (the empty
 *   string before a leading slash) is dropped, and the next one too if it is
 *   empty.
 * @param {Object} [headers] - request headers.
 * @param {*} query - placed in the frame unchanged.
 * @param {*} [body] - payload; only sent when truthy.
 * @param {number} [timeout] - milliseconds to wait, passed to `thunk`.
 * @returns {EventEmitter} the emitter returned by `thunk` for this request.
 */
Connection.prototype.send = function(method, path, headers, query, body, timeout) {
  var self = this;
  var id = self._id++;

  var segments = path.split('/');
  segments.shift();
  if (!segments[0]) segments.shift();

  var outHeaders = headers;
  if (body) {
    // Work on a copy: setting the content type must not leak into the
    // caller's object, and a missing `headers` must not throw.
    outHeaders = {};
    for (var name in headers) {
      if (Object.prototype.hasOwnProperty.call(headers, name)) {
        outHeaders[name] = headers[name];
      }
    }
    outHeaders['content-type'] = 'application/json';
  }

  var req = [id, method, segments, outHeaders, query];
  if (body) req.push(body);

  // Register the response/timeout handling before the frame can go out.
  var res = self.thunk(id, timeout, method);

  var ws = self.ws;
  // readyState 1 is OPEN in the WebSocket API; in any other state the frame
  // waits in the queue that is flushed on 'open'.
  if (ws.readyState === 1) ws.send(JSON.stringify([req]));
  else self._pending.push(req);

  return res;
};
|
83 |
|
/**
 * Create the per-request emitter that reports the outcome of request `id`.
 *
 * The returned emitter fires exactly one of:
 *   - 'response' (status, headers, body) when 'RESPONSE_<id>' is emitted on
 *     this connection before the timeout;
 *   - 'timeout' when no response arrived within `timeout` milliseconds.
 *
 * For POST, PUT and DELETE the response is additionally re-broadcast as a
 * 'push' event on the module-level emitter, even if it arrives after the
 * timeout.
 *
 * @param {number} id - request id; the response event is 'RESPONSE_' + id.
 * @param {number} [timeout] - milliseconds to wait; falsy values use 10000.
 * @param {string} method - request method, used to decide on the 'push'.
 * @returns {EventEmitter} emitter carrying `id`, 'response' and 'timeout'.
 */
Connection.prototype.thunk = function(id, timeout, method) {
  var self = this;
  var res = new EventEmitter();
  var eventName = 'RESPONSE_' + id;
  var isMutation = ['POST', 'PUT', 'DELETE'].indexOf(method) !== -1;

  res.id = id;

  var timer = setTimeout(function() {
    if (res._done) return;
    // Settle first so a throwing 'timeout' listener cannot leave the request
    // half-finished.
    res._done = true;
    // The response handler would be a no-op from here on; remove it so
    // requests that never get an answer do not leave a listener behind.
    self.removeListener(eventName, onResponse);
    res.emit('timeout');
  }, timeout || 10000);

  // Registered before the response handler so the 'push' broadcast happens
  // before 'response' is delivered.
  if (isMutation) {
    self.once(eventName, function(status, headers, body) {
      exports.emit('push', status, headers, body);
    });
  }

  function onResponse(status, headers, body) {
    if (res._done) return;
    // Settle and disarm the timer before notifying, so a throwing listener
    // cannot cause a spurious 'timeout' later.
    res._done = true;
    clearTimeout(timer);
    res.emit('response', status, headers, body);
  }
  self.once(eventName, onResponse);

  return res;
};
|