UNPKG

7.71 kBJavaScriptView Raw
1/* eslint-disable require-jsdoc, no-underscore-dangle */
2'use strict';
3var Gun = require('../../gun');
4var http = require('../http');
5var url = require('url');
6var WS = require('ws');
7var WSS = WS.Server;
8var attach = require('./server-push');
9
10// Handles server to server sync.
11require('./client.js');
12
13Gun.on('opt', function (at) {
14 this.to.next(at);
15 var gun = at.gun, opt = at.opt;
16 gun.__ = at.root._;
17 gun.__.opt.ws = opt.ws = gun.__.opt.ws || opt.ws || {};
18
19 function start (server, port, app) {
20 if (app && app.use) {
21 app.use(gun.wsp.server);
22 }
23 server = gun.__.opt.ws.server = gun.__.opt.ws.server || opt.ws.server || server;
24
25 if (!gun.wsp.ws) {
26 gun.wsp.ws = new WSS(gun.__.opt.ws);
27 attach(gun, gun.wsp.ws);
28 }
29
30 gun.wsp.ws = gun.wsp.ws || new WSS(gun.__.opt.ws);
31 require('./ws')(gun.wsp.ws, function (req, res) {
32 var ws = this;
33 req.headers['gun-sid'] = ws.sid = ws.sid ? ws.sid : req.headers['gun-sid'];
34 ws.sub = ws.sub || gun.wsp.on('network', function (msg) {
35 var ev = this; ev.to.next(msg);
36 if (!ws || !ws.send || !ws._socket || !ws._socket.writable) { return ev.off(); }
37 if (!msg || (msg.headers && msg.headers['gun-sid'] === ws.sid)) { return; }
38 if (msg && msg.headers) { delete msg.headers['ws-rid']; }
39 // TODO: BUG? ^ What if other peers want to ack? Do they use the ws-rid or a gun declared id?
40 try { ws.send(Gun.text.ify(msg));
41 } catch (e) {} // juuuust in case.
42 });
43 gun.wsp.wire(req, res);
44 }, {headers: {'ws-rid': 1, 'gun-sid': 1}});
45 gun.__.opt.ws.port = gun.__.opt.ws.port || opt.ws.port || port || 80;
46 }
47 var wsp = gun.wsp = gun.wsp || function (server) {
48 if (!server) { return gun; }
49 if (Gun.fns.is(server.address)) {
50 if (server.address()) {
51 start(server, server.address().port);
52 return gun;
53 }
54 }
55 if (Gun.fns.is(server.get) && server.get('port')) {
56 start(server, server.get('port'));
57 return gun;
58 }
59 var listen = server.listen;
60 server.listen = function (port) {
61 var serve = listen.apply(server, arguments);
62 start(serve, port, server);
63 return serve;
64 };
65 return gun;
66 };
67 gun.wsp.on = gun.wsp.on || Gun.on;
68 gun.wsp.regex = gun.wsp.regex || opt.route || opt.path || /^\/gun/i;
69 gun.wsp.poll = gun.wsp.poll || opt.poll || 1;
70 gun.wsp.pull = gun.wsp.pull || opt.pull || gun.wsp.poll * 1000;
71 gun.wsp.server = gun.wsp.server || function (req, res, next) { // http
72 next = next || function () {};
73 if (!req || !res) { return next(), false; }
74 if (!req.url) { return next(), false; }
75 if (!req.method) { return next(), false; }
76 var msg = {};
77 msg.url = url.parse(req.url, true);
78 if (!gun.wsp.regex.test(msg.url.pathname)) { return next(), false; } // TODO: BUG! If the option isn't a regex then this will fail!
79 if (msg.url.pathname.replace(gun.wsp.regex, '').slice(0, 3).toLowerCase() === '.js') {
80 res.writeHead(200, {'Content-Type': 'text/javascript'});
81 res.end(gun.wsp.js = gun.wsp.js || require('fs').readFileSync(__dirname + '/../../gun.js')); // gun server is caching the gun library for the client
82 return true;
83 }
84
85 if (!req.upgrade) {
86 next();
87 return false;
88 }
89
90 return http(req, res, function (req, res) {
91 if (!req) { return next(); }
92 var stream, cb = res = require('../jsonp')(req, res);
93 if (req.headers && (stream = req.headers['gun-sid'])) {
94 stream = (gun.wsp.peers = gun.wsp.peers || {})[stream] = gun.wsp.peers[stream] || {sid: stream};
95 stream.drain = stream.drain || function (res) {
96 if (!res || !stream || !stream.queue || !stream.queue.length) { return; }
97 res({headers: {'gun-sid': stream.sid}, body: stream.queue });
98 stream.off = setTimeout(function () { stream = null; }, gun.wsp.pull);
99 stream.reply = stream.queue = null;
100 return true;
101 };
102 stream.sub = stream.sub || gun.wsp.on('network', function (req) {
103 var ev = this; ev.to.next(req);
104 if (!stream) { return ev.off(); } // self cleans up after itself!
105 if (!req || (req.headers && req.headers['gun-sid'] === stream.sid)) { return; }
106 (stream.queue = stream.queue || []).push(req);
107 stream.drain(stream.reply);
108 });
109 cb = function (r) { (r.headers || {}).poll = gun.wsp.poll; res(r); };
110 clearTimeout(stream.off);
111 if (req.headers.pull) {
112 if (stream.drain(cb)) { return; }
113 return stream.reply = cb;
114 }
115 }
116 gun.wsp.wire(req, cb);
117 }), true;
118 };
119 if ((gun.__.opt.maxSockets = opt.maxSockets || gun.__.opt.maxSockets) !== false) {
120 require('https').globalAgent.maxSockets = require('http').globalAgent.maxSockets = gun.__.opt.maxSockets || Infinity;
121 }
122 gun.wsp.msg = gun.wsp.msg || function (id) {
123 if (!id) {
124 return gun.wsp.msg.debounce[id = Gun.text.random(9)] = Gun.time.is(), id;
125 }
126 clearTimeout(gun.wsp.msg.clear);
127 gun.wsp.msg.clear = setTimeout(function () {
128 var now = Gun.time.is();
129 Gun.obj.map(gun.wsp.msg.debounce, function (t, id) {
130 if ((now - t) < (1000 * 60 * 5)) { return; }
131 Gun.obj.del(gun.wsp.msg.debounce, id);
132 });
133 }, 500);
134 if (id = gun.wsp.msg.debounce[id]) {
135 return gun.wsp.msg.debounce[id] = Gun.time.is(), id;
136 }
137 gun.wsp.msg.debounce[id] = Gun.time.is();
138 return;
139 };
140 gun.wsp.msg.debounce = gun.wsp.msg.debounce || {};
141 gun.wsp.wire = gun.wsp.wire || (function () {
142 // all streams, technically PATCH but implemented as
143 // PUT or POST, are forwarded to other trusted peers
144 // except for the ones that are listed in the message
145 // as having already been sent to.
146 // all states, implemented with GET, are replied to the
147 // source that asked for it.
148 function tran (req, res) {
149 if (!req || !res || !req.body || !req.headers) {
150 return;
151 }
152 if (req.url) {
153 req.url = url.format(req.url);
154 }
155 // var msg = req.body;
156 gun.on('in', req.body);
157 // // AUTH for non-replies.
158 // if(gun.wsp.msg(msg['#'])){ return }
159 // gun.wsp.on('network', Gun.obj.copy(req));
160 // if(msg['@']){ return } // no need to process.
161 // if(msg['$'] && msg['$']['#']){ return tran.get(req, res) }
162 // //if(Gun.is.lex(msg['$'])){ return tran.get(req, res) }
163 // else { return tran.put(req, res) }
164 // cb({body: {hello: 'world'}});
165 // // TODO: BUG! server put should push.
166 }
167 tran.get = function (req, cb) {
168 var body = req.body;
169 var lex = body.$;
170 var reply = {
171 headers: { 'Content-Type': tran.json },
172 };
173
174 var graph = gun.Back(Infinity)._.graph;
175 var node = graph[lex['#']];
176 var result = Gun.graph.ify(node);
177
178 if (node) {
179 cb({
180 headers: reply.headers,
181 body: {
182 '#': gun.wsp.msg(),
183 '@': body['#'],
184 '$': result,
185 },
186 });
187
188 return;
189 }
190
191 gun.on('out', {
192 gun: gun,
193 get: lex,
194 req: 1,
195 '#': body['#'] || Gun.on.ask(function (at, ev) {
196 ev.off();
197 var graph = at.put;
198 return cb({
199 headers: reply.headers,
200 body: {
201 '#': gun.wsp.msg(),
202 '@': body['#'],
203 '$': graph,
204 '!': at.err,
205 },
206 });
207 }),
208 });
209 };
210
211 tran.put = function (req, cb) {
212 // NOTE: It is highly recommended you do your own PUT/POSTs
213 // through your own API that then saves to gun manually.
214 // This will give you much more fine-grain control over
215 // security, transactions, and what not.
216 var body = req.body;
217 var graph = body.$;
218 var reply = {
219 headers: { 'Content-Type': tran.json },
220 };
221
222 gun.on('out', {
223 gun: gun,
224 put: graph,
225 '#': Gun.on.ask(function (ack, ev) {
226 ev.off();
227 return cb({
228 headers: reply.headers,
229 body: {
230 '#': gun.wsp.msg(),
231 '@': body['#'],
232 '$': ack,
233 '!': ack.err,
234 },
235 });
236 }),
237 });
238 };
239
240 tran.json = 'application/json';
241 return tran;
242 }());
243
244 if (opt.server) {
245 wsp(opt.server);
246 }
247});