// UNPKG
//
// 2.75 kB, JavaScript, View Raw
/**
 * Module dependencies
 */

var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var parser = require('hyper-json-immutable-parse');

/**
 * Connection cache, keyed by websocket URI.
 *
 * A Connection stores itself here when it is constructed and is removed
 * again when it closes, so `exports.open` can hand out one shared
 * connection per URI.
 */

var hosts = {};

/**
 * The module itself is an event emitter. Connections emit 'push' events
 * on it, so a listener registered here sees pushes from every connection.
 */

exports = module.exports = new EventEmitter();
12
13exports.open = function(params, WS) {
14 var uri = (params.protocol || 'ws:').replace('http', 'ws') + '//' +
15 params.host +
16 (params.port ? ':' + params.port : '') +
17 (params.wspath || '/?wait1');
18
19 return hosts[uri] || new Connection(uri, WS);
20};
21
/**
 * A websocket connection to a single URI, registered in the `hosts` cache.
 *
 * Socket events are re-emitted on the connection as 'open', 'close' and
 * 'error'; incoming messages are handed to `_onmessage`. Requests queued in
 * `_pending` before the socket opened are sent as one JSON array on 'open'.
 * 'error' events are logged with `console.log`.
 *
 * @param {String} uri websocket URI; also the cache key
 * @param {Function} WS constructor invoked as `new WS(uri)`
 */

function Connection(uri, WS) {
  var self = this;
  var ws = self.ws = new WS(uri/*, 'wait1'*/);
  self._id = 0;
  self._pending = [];

  // Remove this connection from the cache. The identity check keeps a late
  // 'close' event from evicting a newer connection stored under the same uri.
  function evict() {
    if (hosts[uri] === self) delete hosts[uri];
  }

  // methods

  // Evict before closing: a closing socket never fires 'open' again, so
  // anything queued on it would never be sent. Dropping it from the cache
  // right away lets the next lookup create a fresh connection.
  self.close = function() {
    evict();
    return ws.close.apply(ws, arguments);
  };

  // events
  ws.onopen = self.emit.bind(self, 'open');
  ws.onclose = self.emit.bind(self, 'close');
  ws.onmessage = self._onmessage.bind(self);
  ws.onerror = self.emit.bind(self, 'error');

  self.on('close', evict);
  self.on('open', function() {
    // flush everything that was queued while the socket was not open
    if (self._pending.length) {
      ws.send(JSON.stringify(self._pending));
      self._pending = [];
    }
  });
  self.on('error', function(err) {
    console.log('ERROR', err);
  });
  hosts[uri] = self;
}
inherits(Connection, EventEmitter);
52
/**
 * Handle a message event from the socket.
 *
 * The payload is parsed as JSON (with the module's `parser` as reviver) and
 * treated as a list of entries; a payload that is not an array is handled as
 * a single entry. An entry whose first element is -1 is emitted as 'push' on
 * the module emitter; any other entry is emitted on this connection as
 * 'RESPONSE_<first element>'. Elements 1 to 3 are passed as the event
 * arguments.
 *
 * A payload that cannot be parsed is reported as an 'error' event on the
 * connection and dropped, instead of throwing out of the socket handler.
 *
 * @param {Object} evt message event; `evt.data` holds the JSON text
 */

Connection.prototype._onmessage = function(evt) {
  var msg;
  try {
    msg = JSON.parse(evt.data, parser);
  } catch (err) {
    this.emit('error', err);
    return;
  }

  if (!Array.isArray(msg)) msg = [msg];
  for (var i = 0, res; i < msg.length; i++) {
    res = msg[i];
    if (res[0] === -1) exports.emit('push', res[1], res[2], res[3]);
    else this.emit('RESPONSE_' + res[0], res[1], res[2], res[3]);
  }
};
62
/**
 * Send a request over the connection, or queue it until the socket opens.
 *
 * The request is encoded as `[id, method, path, headers, query]`, with
 * `body` appended when it is truthy. `path` is split on "/" and its first
 * segment is dropped (the empty string before a leading slash), along with
 * one further empty segment if present. When a body is sent, a
 * "content-type: application/json" header is added to a copy of `headers`,
 * so the caller's object is never modified and may be omitted.
 *
 * @param {String} method
 * @param {String} path
 * @param {Object} [headers]
 * @param {Object} [query]
 * @param {*} [body] only sent when truthy
 * @param {Number} [timeout] passed through to `thunk`
 * @return {EventEmitter} the emitter returned by `thunk` for this request
 */

Connection.prototype.send = function(method, path, headers, query, body, timeout) {
  var self = this;
  var id = self._id++;

  var segments = path.split('/');
  segments.shift();
  if (!segments[0]) segments.shift();

  var outHeaders = headers;
  if (body) {
    // copy rather than mutate; for-in over a missing headers object is a no-op
    outHeaders = {};
    for (var key in headers) {
      if (Object.prototype.hasOwnProperty.call(headers, key)) outHeaders[key] = headers[key];
    }
    outHeaders['content-type'] = 'application/json';
  }

  var req = body ?
    [id, method, segments, outHeaders, query, body] :
    [id, method, segments, outHeaders, query];

  // register the response listener before anything goes out on the wire
  var res = self.thunk(id, timeout, method);

  // TODO buffer requests for bulked goodness
  var ws = self.ws;
  if (ws.readyState === 1) ws.send(JSON.stringify([req]));
  else self._pending.push(req);

  return res;
};
83
/**
 * Create the emitter that represents one in-flight request.
 *
 * The returned emitter fires 'response' (status, headers, body) when this
 * connection emits 'RESPONSE_<id>', or 'timeout' if that does not happen
 * within `timeout` milliseconds, whichever comes first and at most one of
 * the two. For POST, PUT and DELETE the response is also emitted as 'push'
 * on the module emitter, even if it arrives after the timeout.
 *
 * @param {Number} id request id the response will carry
 * @param {Number} [timeout] milliseconds to wait; a falsy value means 10000
 * @param {String} method request method
 * @return {EventEmitter} emitter with an `id` property
 */

Connection.prototype.thunk = function(id, timeout, method) {
  var self = this;
  var res = new EventEmitter();
  var event = 'RESPONSE_' + id;
  var mutating = ['POST', 'PUT', 'DELETE'].indexOf(method) !== -1;

  res.id = id;

  var timer = setTimeout(function() {
    if (res._done) return;
    res._done = true;
    // Nothing is left for the listener to do once the request has timed out,
    // so drop it instead of leaking one listener per unanswered request.
    // Mutating requests keep theirs so a late response still produces a push.
    if (!mutating) self.removeListener(event, onresponse);
    res.emit('timeout');
  }, timeout || 10000);

  function onresponse(status, headers, body) {
    if (mutating) exports.emit('push', status, headers, body);
    if (res._done) return;
    // settle before emitting so a throwing listener cannot leave the timer armed
    res._done = true;
    clearTimeout(timer);
    res.emit('response', status, headers, body);
  }

  self.once(event, onresponse);

  return res;
};