UNPKG

90.7 kBJavaScriptView Raw
1var fs = require('fs'),
2 util = require('util')
3var unique_sockjs_string = '_connect_to_statebus_'
4
/**
 * Build a fresh object holding the default server options.
 *
 * A new object (with new nested objects) is produced on every call, so
 * callers may mutate the result freely.
 *
 * @param bus  The bus that the default `client` callback makes each
 *             client bus shadow.
 * @returns    The default options object.
 */
function default_options (bus) {
    var file_store_defaults = {
        save_delay: 250,
        filename: 'db',
        backup_dir: 'backups',
        prefix: '*'
    }
    var cert_paths = {
        private_key: 'certs/private-key',
        certificate: 'certs/certificate',
        certificate_bundle: 'certs/certificate-bundle'
    }

    return {
        port: 'auto',
        backdoor: null,
        // By default, every client bus shadows the master bus
        client: (c) => { c.shadows(bus) },
        file_store: file_store_defaults,
        sync_files: [{state_path: 'files', fs_path: null}],
        serve: true,
        certs: cert_paths,
        connections: {include_users: true, edit_others: true},
        __secure: false
    }
}
18
/**
 * Merge user-supplied options into `bus.options`.
 *
 * Top-level keys in `options` overwrite the matching key of
 * `bus.options`.  For the nested option groups (`file_store`, `certs`
 * and `connections`), any sub-key the caller left out is then filled in
 * from the value `bus.options` held before this call.  A nested group
 * set to a falsy value is treated as "disabled" and left alone; a truthy
 * non-object value (e.g. `true`) is replaced by the full set of defaults.
 *
 * Note that nested objects passed in by the caller are stored by
 * reference, so they are the objects that get the defaults filled in.
 *
 * @param bus      Bus whose `options` object is updated in place.  Must
 *                 provide `clone()`.
 * @param options  Optional overrides.
 */
function set_options (bus, options) {
    // Snapshot the options as they were before the overrides; these are
    // the defaults for the nested groups below.
    var defaults = bus.clone(bus.options)
    options = options || {}
    for (var k in options)
        bus.options[k] = options[k]

    // Fill in defaults of nested options too.  `connections` is included
    // so that passing only one of its flags does not silently drop the
    // other one.
    var nested_groups = ['file_store', 'certs', 'connections']
    nested_groups.forEach(function (group) {
        var value = bus.options[group]
        if (!value) return                  // Explicitly disabled

        if (typeof value !== 'object')      // e.g. `file_store: true`
            value = bus.options[group] = {}

        var fallback = defaults[group]
        for (var k2 in fallback)
            if (!Object.prototype.hasOwnProperty.call(value, k2))
                value[k2] = fallback[k2]
    })
}
36
37function import_server (bus, options)
38{ var extra_methods = {
39
40 serve: function serve (options) {
41 var master = bus
42 bus.honk = 'statelog'
43 master.label = 'master'
44
45 // Initialize Options
46 set_options(bus, options)
47
48 var use_ssl = bus.options.certs && (
49 require('fs').existsSync(bus.options.certs.private_key)
50 || require('fs').existsSync(bus.options.certs.certificate)
51 || require('fs').existsSync(bus.options.certs.certificate_bundle))
52
53 function c (client, conn) {
54 client.honk = bus.honk
55 client.serves_auth(conn, master)
56 bus.options.client && bus.options.client(client, conn)
57 }
58 if (!bus.options.client) c = undefined // no client bus when programmer explicitly says so
59
60 if (bus.options.file_store)
61 bus.file_store()
62
63 // ******************************************
64 // ***** Create our own http server *********
65 bus.make_http_server({port: bus.options.port, use_ssl})
66 bus.sockjs_server(this.http_server, c) // Serve via sockjs on it
67 var express = require('express')
68 bus.express = express()
69 bus.http = express.Router()
70 bus.install_express(bus.express)
71
72 // use gzip compression if available
73 try { bus.http.use(require('compression')())
74 console.log('Enabled http compression!') } catch (e) {}
75
76
77 // Initialize new clients with an id. We put the client id on
78 // req.client, and also in a cookie for the browser to see.
79 if (bus.options.client)
80 bus.express.use(function (req, res, next) {
81 req.client = require('cookie').parse(req.headers.cookie || '').client
82 if (!req.client) {
83 req.client = (Math.random().toString(36).substring(2)
84 + Math.random().toString(36).substring(2)
85 + Math.random().toString(36).substring(2))
86
87 res.setHeader('Set-Cookie', 'client=' + req.client
88 + '; Expires=21 Oct 2025 00:0:00 GMT;')
89 }
90 next()
91 })
92
93 // Initialize file sync
94 ; (bus.options.sync_files || []).forEach( x => {
95 if (require('fs').existsSync(x.fs_path || x.state_path))
96 bus.sync_files(x.state_path, x.fs_path)
97 })
98
99 // User will put their routes in here
100 bus.express.use('/', bus.http)
101
102 // Add a fallback that goes to state
103 bus.express.get('*', function (req, res) {
104 // Make a temporary client bus
105 var cbus = bus.bus_for_http_client(req, res)
106
107 // Do the fetch
108 cbus.honk = 'statelog'
109 var singleton = req.path.match(/^\/code\//)
110 cbus.fetch_once(req.path.substr(1), (o) => {
111 var unwrap = (Object.keys(o).length === 2
112 && '_' in o
113 && typeof o._ === 'string')
114 // To do: translate pointers as keys
115 res.send(unwrap ? o._ : JSON.stringify(o))
116 cbus.delete_bus()
117 })
118 })
119
120 // Serve Client Coffee
121 bus.serve_client_coffee()
122
123 // Custom route
124 var OG_route = bus.route
125 bus.route = function(key, method, arg, opts) {
126 var count = OG_route(key, method, arg, opts)
127
128 // This whitelists anything we don't have a specific handler for,
129 // reflecting it to all clients!
130 if (count === 0 && method === 'to_save') {
131 bus.save.fire(arg, opts)
132 count++
133 }
134
135 return count
136 }
137
138 // Back door to the control room
139 if (bus.options.backdoor) {
140 bus.make_http_server({
141 port: bus.options.backdoor,
142 name: 'backdoor_http_server',
143 use_ssl: use_ssl
144 })
145 bus.sockjs_server(this.backdoor_http_server)
146 }
147 },
148
149 bus_for_http_client: function (req, res) {
150 var bus = this
151 if (!bus.bus_for_http_client.counter)
152 bus.bus_for_http_client.counter = 0
153 var cbus = require('./statebus')()
154 cbus.label = 'client_http' + bus.bus_for_http_client.counter++
155 cbus.master = bus
156
157 // Log in as the client
158 cbus.serves_auth({client: req.client,
159 remoteAddress: req.connection.remoteAddress},
160 bus)
161 bus.options.client(cbus)
162 cbus.save({key: 'current_user', client: req.client})
163 return cbus
164 },
165
166 make_http_server: function make_http_server (options) {
167 options = options || {}
168 var fs = require('fs')
169
170 if (options.use_ssl) {
171 // Load with TLS/SSL
172 console.log('Encryption ON')
173
174 // use http2 compatible library if available
175 try {
176 var http = require('spdy')
177 console.log('Found spdy library. HTTP/2 enabled!')
178 } catch (e) {
179 var http = require('https')
180 }
181
182 var protocol = 'https'
183 var ssl_options = {
184 ca: (fs.existsSync(this.options.certs.certificate_bundle)
185 && require('split-ca')(this.options.certs.certificate_bundle)),
186 key: fs.readFileSync(this.options.certs.private_key),
187 cert: fs.readFileSync(this.options.certs.certificate),
188 ciphers: "ECDHE-RSA-AES256-SHA384:DHE-RSA-AES256-SHA384"
189 + ":ECDHE-RSA-AES256-SHA256:DHE-RSA-AES256-SHA256"
190 + ":ECDHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA256"
191 + ":HIGH:!aNULL:!eNULL:!EXPORT:!DES:!RC4:!MD5:!PSK:!SRP:!CAMELLIA",
192 honorCipherOrder: true}
193 }
194 else {
195 // Load unencrypted server
196 console.log('Encryption OFF')
197 var http = require('http')
198 var protocol = 'http'
199 var ssl_options = undefined
200 }
201
202 if (options.port === 'auto') {
203 var bind = require('./extras/tcp-bind')
204 function find_a_port () {
205 var next_port_attempt = 80
206 while (true)
207 try {
208 var result = bind(next_port_attempt)
209 bus.port = next_port_attempt
210 return result
211 } catch (e) {
212 if (next_port_attempt < 3006) next_port_attempt = 3006
213 else next_port_attempt++
214 }
215 }
216
217 var fd
218 if (options.use_ssl)
219 try {
220 fd = bind(443)
221 bus.port = 443
222 bus.redirect_port_80()
223 } catch (e) {fd = find_a_port()}
224 else fd = find_a_port()
225 var http_server = http.createServer(ssl_options)
226 http_server.listen({fd: fd}, () => {
227 console.log('Listening on '+protocol+'://<host>:'+bus.port)
228 })
229 }
230 else {
231 bus.port = bus.options.port
232 var http_server = http.createServer(ssl_options)
233 http_server.listen(bus.options.port, () => {
234 console.log('Listening on '+protocol+'://<host>:'+bus.port)
235 })
236 }
237
238 bus[options.name || 'http_server'] = http_server
239 },
240
241 redirect_port_80: function redirect_port_80 () {
242 var redirector = require('http')
243 redirector.createServer(function (req, res) {
244 res.writeHead(301, {"Location": "https://"+req.headers['host']+req.url})
245 res.end()
246 }).listen(80)
247 },
248
249 install_express: function install_express (express_app) {
250 this.http_server.on('request', // Install express
251 function (request, response) {
252 // But express should ignore all sockjs requests
253 if (!request.url.startsWith('/'+unique_sockjs_string+'/'))
254 express_app(request, response)
255 })
256
257 },
258 sockjs_server: function sockjs_server(httpserver, client_bus_func) {
259 var master = this
260 var client_num = 0
261 // var client_busses = {} // XXX work in progress
262 var log = master.log
263 if (client_bus_func) {
264 master.save({key: 'connections'}) // Clean out old sessions
265 var connections = master.fetch('connections')
266 }
267 var s = require('sockjs').createServer({
268 sockjs_url: 'https://cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js',
269 disconnect_delay: 600 * 1000,
270 heartbeat_delay: 6000 * 1000
271 })
272 s.on('connection', function(conn) {
273 if (client_bus_func) {
274 // To do for pooling client busses:
275 // - What do I do with connections? Do they pool at all?
276 // - Before creating a new bus here, check to see if there's
277 // an existing one in the pool, and re-use it if so.
278 // - Count the number of connections using a client.
279 // - When disconnecting, decrement the number, and if it gets
280 // to zero, delete the client bus.
281
282 connections[conn.id] = {client: conn.id, // client is deprecated
283 id: conn.id}
284 master.save(connections)
285
286 var client = require('./statebus')()
287 client.label = 'client' + client_num++
288 master.label = master.label || 'master'
289 client.master = master
290 client_bus_func(client, conn)
291 } else
292 var client = master
293
294 var our_fetches_in = {} // Every key that this socket has fetched
295 log('sockjs_s: New connection from', conn.remoteAddress)
296 function sockjs_pubber (obj, t) {
297 // log('sockjs_pubber:', obj, t)
298 var msg = {save: obj}
299 if (t.version) msg.version = t.version
300 if (t.parents) msg.parents = t.parents
301 if (t.patch) msg.patch = t.patch
302 if (t.patch) msg.save = msg.save.key
303 msg = JSON.stringify(msg)
304
305 if (global.network_delay) {
306 console.log('>>>> DELAYING!!!', global.network_delay)
307 obj = bus.clone(obj)
308 setTimeout(() => {conn.write(msg)}, global.network_delay)
309 } else
310 conn.write(msg)
311
312 log('sockjs_s: SENT a', msg, 'to client')
313 }
314 conn.on('data', function(message) {
315 log('sockjs_s:', message)
316 try {
317 message = JSON.parse(message)
318 var method = bus.message_method(message)
319
320 // Validate the message
321 if (!((method === 'fetch'
322 && master.validate(message, {fetch: 'string',
323 '?parent': 'string', '?version': 'string'}))
324 ||
325 (method === 'save'
326 && master.validate(message, {save: '*',
327 '?parents': 'array', '?version': 'string', '?patch': 'array'})
328 && (typeof(message.save) === 'string'
329 || (typeof(message.save) === 'object'
330 && typeof(message.save.key === 'string'))))
331 ||
332 (method === 'forget'
333 && master.validate(message, {forget: 'string'}))
334 ||
335 (method === 'delete'
336 && master.validate(message, {'delete': 'string'}))))
337 throw 'validation error'
338
339 } catch (e) {
340 for (var i=0; i<4; i++) console.error('#######')
341 console.error('Received bad sockjs message from '
342 + conn.remoteAddress +': ', message, e)
343 return
344 }
345
346 switch (method) {
347 case 'fetch':
348 our_fetches_in[message.fetch] = true
349 client.fetch(message.fetch, sockjs_pubber)
350 break
351 case 'forget':
352 delete our_fetches_in[message.forget]
353 client.forget(message.forget, sockjs_pubber)
354 break
355 case 'delete':
356 client.delete(message['delete'])
357 break
358 case 'save':
359 message.version = message.version || client.new_version()
360 if (message.patch) {
361 var o = bus.cache[message.save] || {key: message.save}
362 try {
363 message.save = bus.apply_patch(o, message.patch[0])
364 } catch (e) {
365 console.error('Received bad sockjs message from '
366 + conn.remoteAddress +': ', message, e)
367 return
368 }
369 }
370 client.save(message.save,
371 {version: message.version,
372 parents: message.parents,
373 patch: message.patch})
374 if (our_fetches_in[message.save.key]) { // Store what we've seen if we
375 // might have to publish it later
376 client.log('Adding', message.save.key+'#'+message.version,
377 'to pubber!')
378 sockjs_pubber.has_seen(client, message.save.key, message.version)
379 }
380 break
381 }
382
383 // validate that our fetches_in are all in the bus
384 for (var key in our_fetches_in)
385 if (!client.fetches_in.has(key, master.funk_key(sockjs_pubber)))
386 console.trace("***\n****\nFound errant key", key,
387 'when receiving a sockjs', method, 'of', message)
388 //log('sockjs_s: done with message')
389 })
390 conn.on('close', function() {
391 log('sockjs_s: disconnected from', conn.remoteAddress, conn.id, client.id)
392 for (var key in our_fetches_in)
393 client.forget(key, sockjs_pubber)
394 if (client_bus_func) {
395 delete connections[conn.id]; master.save(connections)
396 client.delete_bus()
397 }
398 })
399
400 // Define the /connection* state!
401 if (client_bus_func && !master.options.__secure) {
402
403 // A connection
404 client('connection/*').to_fetch = function (key, star) {
405 var id = star
406 var conn = master.fetch('connections')[id]
407 if (!conn) return {error: 'connection ' + id + ' does not exist'}
408
409 var result = master.clone(conn)
410 result.key = key
411 result.id = id
412 result.client = id // Deprecated
413
414 if (master.options.connections.include_users && result.user)
415 result.user = client.fetch(result.user.key)
416 return result
417 }
418 client('connection/*').to_save = function (o, star, t) {
419 // Check permissions before editing
420 if (star !== conn.id && !master.options.connections.edit_others) {
421 t.abort()
422 return
423 }
424 var connections = master.fetch('connections')
425 var result = client.clone(o)
426 var old = connections[star]
427 delete result.key
428 result.id = star
429 result.client = star // Deprecated
430 result.user = old.user
431 connections[star] = result
432 master.save(connections)
433 }
434
435 // Your connection
436 client('connection').to_fetch = function () {
437 // subscribe to changes in authentication
438 client.fetch('current_user')
439
440 var result = client.clone(client.fetch('connection/' + conn.id))
441 delete result.key
442 return result
443 }
444 client('connection').to_save = function (o) {
445 o = client.clone(o)
446 o.key = 'connection/' + conn.id
447 client.save(o)
448 }
449
450 // All connections
451 client('connections').to_save = function noop (t) {t.abort()}
452 client('connections').to_fetch = function () {
453 var result = []
454 var conns = master.fetch('connections')
455 for (var connid in conns)
456 if (connid !== 'key')
457 result.push(client.fetch('connection/' + connid))
458
459 return {all: result}
460 }
461 }
462 })
463
464 s.installHandlers(httpserver, {prefix:'/' + unique_sockjs_string})
465 },
466
467 make_websocket: function make_websocket (url) {
468 url = url.replace(/^state:\/\//, 'wss://')
469 url = url.replace(/^istate:\/\//, 'ws://')
470 url = url.replace(/^statei:\/\//, 'ws://')
471 WebSocket = require('websocket').w3cwebsocket
472 return new WebSocket(url+'/'+unique_sockjs_string+'/websocket')
473 },
474 client_creds: function client_creds (server_url) {
475 // Right now the server just creates a different random id each time
476 // it connects.
477 return {clientid: (Math.random().toString(36).substring(2)
478 + Math.random().toString(36).substring(2)
479 + Math.random().toString(36).substring(2))}
480 },
481
482 // Deprecated
483 ws_client: function (prefix, url, account) {
484 console.error('ws_client() is deprecated; use net_mount() instead')
485 bus.net_mount(prefix, url, account) },
486 // Deprecated
487 universal_ws_client: function () {
488 console.error('calling universal_ws_client is deprecated and no longer necessary') },
489
490 file_store: (function () {
491 // Make a database
492 var fs = require('fs')
493 var db = {}
494 var db_is_ok = false
495 var pending_save = null
496 var active
497 function file_store (prefix, delay_activate) {
498 prefix = prefix || bus.options.file_store.prefix
499 var filename = bus.options.file_store.filename,
500 backup_dir = bus.options.file_store.backup_dir
501
502 // Loading db
503 try {
504 if (fs.existsSync && !fs.existsSync(filename))
505 (fs.writeFileSync(filename, '{}'), bus.log('Made a new db file'))
506 db = JSON.parse(fs.readFileSync(filename))
507 db_is_ok = true
508 // If we save before anything else is connected, we'll get this
509 // into the cache but not affect anything else
510 bus.save.fire(global.pointerify ? inline_pointers(db) : db)
511 bus.log('Read db')
512 } catch (e) {
513 console.error(e)
514 console.error('bad db file')
515 }
516
517 // Saving db
518 function save_db() {
519 if (!db_is_ok) return
520
521 console.time('saved db')
522
523 fs.writeFile(filename+'.tmp', JSON.stringify(db, null, 1), function(err) {
524 if (err) {
525 console.error('Crap! DB IS DYING!!!!', err)
526 db_is_ok = false
527 } else
528 fs.rename(filename+'.tmp', filename, function (err) {
529 if (err) {
530 console.error('Crap !! DB IS DYING !!!!', err)
531 db_is_ok = false
532 } else {
533 console.timeEnd('saved db')
534 pending_save = null
535 }
536 })
537 })
538 }
539
540 function save_later() {
541 pending_save = pending_save || setTimeout(save_db, bus.options.file_store.save_delay)
542 }
543 active = !delay_activate
544
545 // Replaces every nested keyed object with {_key: <key>}
546 function abstract_pointers (o) {
547 o = bus.clone(o)
548 var result = {}
549 for (var k in o)
550 result[k] = bus.deep_map(o[k], (o) => {
551 if (o && o.key) return {_key: o.key}
552 else return o
553 })
554 return result
555 }
556 // ...and the inverse
557 function inline_pointers (db) {
558 return bus.deep_map(db, (o) => {
559 if (o && o._key)
560 return db[o._key]
561 else return o
562 })
563 }
564 function on_save (obj) {
565 db[obj.key] = global.pointerify ? abstract_pointers(obj) : obj
566 if (active) save_later()
567 }
568 on_save.priority = true
569 bus(prefix).on_save = on_save
570 bus(prefix).to_delete = function (key) {
571 delete db[key]
572 if (active) save_later()
573 }
574 file_store.activate = function () {
575 active = true
576 save_later()
577 }
578
579 // Handling errors
580 function recover (e) {
581 if (e) {
582 process.stderr.write("Uncaught Exception:\n");
583 process.stderr.write(e.stack + "\n");
584 }
585 if (pending_save) {
586 console.log('Saving db after crash')
587 console.time()
588 fs.writeFileSync(filename, JSON.stringify(db, null, 1))
589 console.log('Saved db after crash')
590 }
591 process.exit(1)
592 }
593 process.on('SIGINT', recover)
594 process.on('SIGTERM', recover)
595 process.on('uncaughtException', recover)
596
597 // Rotating backups
598 setInterval(
599 // This copies the current db over backups/db.<curr_date> every minute
600 function backup_db() {
601 if (!db_is_ok || !backup_dir) return
602 if (fs.existsSync && !fs.existsSync(backup_dir))
603 fs.mkdirSync(backup_dir)
604
605 var d = new Date()
606 var y = d.getYear() + 1900
607 var m = d.getMonth() + 1
608 if (m < 10) m = '0' + m
609 var day = d.getDate()
610 if (day < 10) day = '0' + day
611 var date = y + '-' + m + '-' + day
612
613 //bus.log('Backing up db on', date)
614
615 require('child_process').execFile(
616 '/bin/cp', [filename, backup_dir+'/'+filename+'.'+date])
617 },
618 1000 * 60 // Every minute
619 )
620 }
621
622 return file_store
623 })(),
624
625 firebase_store: function (prefix, firebase_ref) {
626 prefix = prefix || '*'
627
628 function encode_firebase_key(k) {
629 return encodeURIComponent(k).replace(/\./g, '%2E')
630 }
631
632 function decode_firebase_key(k) {
633 return decodeURIComponent(k.replace('%2E', '.'))
634 }
635
636 bus(prefix).to_fetch = function (key, t) {
637 firebase_ref.child(encode_firebase_key(key)).on('value', function (x) {
638 t.done(x.val() || {})
639 }, function (err) { t.abort() })
640 }
641
642 bus(prefix).on_save = function (o) {
643 firebase_ref.child(encode_firebase_key(o.key)).set(o)
644 }
645
646 // bus(prefix).to_save = function (o, t) {
647 // firebase_ref.child(encode_firebase_key(o.key)).set(o, (err) => {
648 // err ? t.abort() : t.done()
649 // })
650 // }
651
652 bus(prefix).to_delete = function (key, t) {
653 firebase_ref.child(encode_firebase_key(key)).set(null, (err) => {
654 err ? t.abort() : t.done()
655 })
656 }
657
658 bus(prefix).to_forget = function (key, t) {
659 firebase_ref.child(encode_firebase_key(key)).off()
660 }
661 },
662
663 lazy_sqlite_store: function lazy_sqlite_store (opts) {
664 if (!opts) opts = {}
665 opts.lazy = true
666 bus.sqlite_store(opts)
667 },
668 fast_load_sqlite_store: function sqlite_store (opts) {
669 if (!opts) opts = {}
670 opts.dont_fire = true
671 bus.sqlite_store(opts)
672 },
673 sqlite_store: function sqlite_store (opts) {
674 var prefix = '*'
675 var open_transaction = null
676
677 if (!opts) opts = {}
678 if (!opts.filename) opts.filename = 'db.sqlite'
679 if (!opts.hasOwnProperty('inline_pointers'))
680 opts.inline_pointers = global.pointerify
681
682 // Load the db on startup
683 try {
684 var db = bus.sqlite_store_db || new (require('better-sqlite3'))(opts.filename)
685 bus.sqlite_store_db = db
686 bus.sqlite_store.load_all = load_all
687 bus.sqlite_store.all_keys = all_keys
688
689 db.pragma('journal_mode = WAL')
690 db.prepare('create table if not exists cache (key text primary key, obj text)').run()
691
692 function all_keys () {
693 var result = []
694 for (var row of db.prepare('select key from cache').iterate())
695 result.push(row.key)
696 return result
697 }
698 function load_all (options) {
699 var temp_db = {}
700
701 for (var row of db.prepare('select * from cache').iterate()) {
702 var obj = JSON.parse(row.obj)
703 temp_db[obj.key] = obj
704 }
705
706 if (opts.inline_pointers)
707 temp_db = inline_pointers(temp_db)
708
709 for (var key in temp_db)
710 if (temp_db.hasOwnProperty(key)) {
711 if (options.dont_fire)
712 bus.cache[key] = temp_db[key]
713 else
714 bus.save.fire(temp_db[key])
715 temp_db[key] = undefined
716 }
717 }
718 if (!opts.lazy) load_all(opts)
719
720 bus.log('Read ' + opts.filename)
721 } catch (e) {
722 console.error(e)
723 console.error('Bad sqlite db')
724 }
725
726 function sqlite_get (key) {
727 var x = db.prepare('select * from cache where key = ?').get([key])
728 return x ? JSON.parse(x.obj) : {}
729 }
730 if (opts.lazy)
731 // Add fetch handler
732 bus(prefix).to_fetch = function (key, t) {
733 var x = (bus.cache[key] && !bus.pending_fetches[key])
734 || sqlite_get(key)
735 if (opts.inline_pointers) x = inline_pointers_singleobj(x)
736 x = bus.deep_map(x, (o) => o && o.key ? sqlite_get(o.key) : o)
737 t.done(x)
738 }
739
740
741 // Add save handlers
742 function on_save (obj) {
743 if (opts.inline_pointers)
744 obj = abstract_pointers(obj)
745
746 if (opts.use_transactions && !open_transaction){
747 console.time('save db')
748 db.prepare('BEGIN TRANSACTION').run()
749 }
750
751 db.prepare('replace into cache (key, obj) values (?, ?)').run(
752 [obj.key, JSON.stringify(obj)])
753
754 if (opts.use_transactions && !open_transaction) {
755 open_transaction = setTimeout(function(){
756 console.log('Committing transaction to database')
757 db.prepare('COMMIT').run()
758 open_transaction = false
759 console.timeEnd('save db')
760 })
761 }
762 }
763 if (opts.save_sync) {
764 var old_route = bus.route
765 bus.route = function (key, method, arg, t) {
766 if (method === 'to_save') on_save(arg)
767 return old_route(key, method, arg, t)
768 }
769 } else {
770 on_save.priority = true
771 bus(prefix).on_save = on_save
772 }
773 bus(prefix).to_delete = function (key) {
774 if (opts.use_transactions && !open_transaction){
775 console.time('save db')
776 db.prepare('BEGIN TRANSACTION').run()
777 }
778 db.prepare('delete from cache where key = ?').run([key])
779 if (opts.use_transactions && !open_transaction)
780 open_transaction = setTimeout(function(){
781 console.log('committing')
782 db.prepare('COMMIT').run()
783 open_transaction = false
784 console.timeEnd('save db')
785 })
786 }
787
788 // Replaces every nested keyed object with {_key: <key>}
789 function abstract_pointers (o) {
790 o = bus.clone(o)
791 var result = {}
792 for (var k in o)
793 result[k] = bus.deep_map(o[k], (o) => {
794 if (o && o.key) return {_key: o.key}
795 else return o
796 })
797 return result
798 }
799 // ...and the inverse
800 function inline_pointers (db) {
801 return bus.deep_map(db, (o) => {
802 if (o && o._key)
803 return db[o._key]
804 else return o
805 })
806 }
807 function inline_pointers_singleobj (obj) {
808 return bus.deep_map(obj, (o) => (o && o._key)
809 ? bus.cache[o._key] : o)
810 }
811
812 // Rotating backups
813 setInterval(
814 // Copy the current db over backups/db.<curr_date> every minute
815 function backup_db() {
816 if (opts.backups === false) return
817 var backup_dir = opts.backup_dir || 'backups'
818 if (fs.existsSync && !fs.existsSync(backup_dir))
819 fs.mkdirSync(backup_dir)
820
821 var d = new Date()
822 var y = d.getYear() + 1900
823 var m = d.getMonth() + 1
824 if (m < 10) m = '0' + m
825 var day = d.getDate()
826 if (day < 10) day = '0' + day
827 var date = y + '-' + m + '-' + day
828
829 require('child_process').execFile(
830 'sqlite3',
831 [opts.filename, '.backup '+"'"+backup_dir+'/'+opts.filename+'.'+date+"'"])
832 },
833 1000 * 60 // Every minute
834 )
835 },
836
837 pg_store: function pg_store (opts) {
838 opts = opts || {}
839 opts.prefix = opts.prefix || '*'
840
841 // Load the db on startup
842 try {
843 var db = new require('pg-native')()
844 bus.pg_db = db
845 bus.pg_save = pg_save
846 db.connectSync(opts.url)
847 db.querySync('create table if not exists store (key text primary key, value jsonb)')
848
849 var rows = db.querySync('select * from store')
850 rows.forEach(r => bus.save(inline_pointers(r.value, bus)))
851
852 bus.log('Read ' + opts.url)
853 } catch (e) {
854 console.error(e)
855 console.error('Bad pg db')
856 }
857
858 // Add save handlers
859 function pg_save (obj) {
860 obj = abstract_pointers(obj)
861
862 db.querySync('insert into store (key, value) values ($1, $2) '
863 + 'on conflict (key) do update set value = $2',
864 [obj.key, JSON.stringify(obj)])
865 }
866 pg_save.priority = true
867 bus(opts.prefix).on_save = pg_save
868 bus(opts.prefix).to_delete = function (key) {
869 db.query('delete from store where key = $1', [key])
870 }
871
872 // Replaces every nested keyed object with {_key: <key>}
873 function abstract_pointers (o) {
874 o = bus.clone(o)
875 var result = {}
876 for (var k in o)
877 result[k] = bus.deep_map(o[k], (x) => {
878 if (x && x.key) return {_key: x.key}
879 else return x
880 })
881 return result
882 }
883 // ...and the inverse
884 function inline_pointers (obj, bus) {
885 return bus.deep_map(obj, (o) => {
886 if (o && o._key) {
887 if (!bus.cache[o._key])
888 bus.cache[o._key] = {key: o._key}
889 return bus.cache[o._key]
890 } else return o
891 })
892 }
893 },
894
895 setup_usage_log (opts) {
896 bus.serve_time()
897 opts = opts || {}
898 opts.filename = opts.filename || 'db.sqlite'
899 var db = new (require('better-sqlite3'))(opts.filename)
900 bus.usage_log_db = db
901 //db.pragma('journal_mode = WAL')
902 db.prepare('create table if not exists usage (date integer, event text, details text)').run()
903 db.prepare('create index if not exists date_index on usage (date)').run()
904 var refresh_interval = 1000*60
905
906 var nots = ['details not like "%facebookexternalhit%"',
907 'details not like "%/apple-touch-icon%"',
908 'details not like "%Googlebot%"',
909 'details not like "%AdsBot-Google%"',
910 'details not like "%Google-Adwords-Instant%"',
911 'details not like "%Apache-HttpClient%"',
912 'details not like "%SafeDNSBot%"',
913 'details not like "%RevueBot%"',
914 'details not like "%MetaURI API%"',
915 'details not like "%redback/v%"',
916 'details not like "%Slackbot%"',
917 'details not like "%HTTP_Request2/%"',
918 'details not like "%python-requests/%"',
919 'details not like "%LightspeedSystemsCrawler/%"',
920 'details not like "%CipaCrawler/%"',
921 'details not like "%Twitterbot/%"',
922 'details not like "%Go-http-client/%"',
923 'details not like "%/cheese_service%"'
924 ].join(' and ')
925 bus.usage_log_nots = nots
926
927 // Aggregate all accesses by day, to get daily active users
928 bus('usage').to_fetch = () => {
929 bus.fetch('time/' + refresh_interval)
930 var days = []
931 var last_day
932 for (var row of db.prepare('select * from usage where '
933 + nots + ' order by date').iterate()) {
934 row.details = JSON.parse(row.details)
935 if (row.details.agent && row.details.agent.match(/bot/)) continue
936
937 var d = new Date(row.date * 1000)
938 var day = d.getFullYear() + '-' + (d.getMonth()+1) + '-' + d.getDate()
939 if (last_day !== day)
940 days.push({day: day, // Init
941 clients: new Set(),
942 ips: new Set(),
943 client_socket_opens: new Set(),
944 ip_socket_opens: new Set()
945 })
946 last_day = day
947
948 if (row.event === 'socket open') {
949 days[days.length-1].client_socket_opens.add(row.details.client)
950 days[days.length-1].ip_socket_opens.add(row.details.ip)
951 }
952 days[days.length-1].clients.add(row.details.client)
953 days[days.length-1].ips.add(row.details.ip)
954 }
955
956 for (var i=0; i<days.length; i++)
957 days[i] = {day: days[i].day,
958 ip_hits: days[i].ips.size,
959 client_hits: days[i].clients.size,
960 client_socket_opens: days[i].client_socket_opens.size,
961 ip_socket_opens: days[i].ip_socket_opens.size
962 }
963
964 return {_: days}
965 }
966
967 bus('recent_hits/*').to_fetch = (rest) => {
968 bus.fetch('time/' + refresh_interval)
969 var result = []
970 for (var row of db.prepare('select * from usage where '
971 + nots + ' order by date desc limit ?').iterate(
972 [parseInt(rest)])) {
973
974 row.details = JSON.parse(row.details)
975 if (row.details.agent && row.details.agent.match(/bot/)) continue
976
977 result.push({url: row.details.url, ip: row.details.ip, date: row.date})
978 }
979
980 return {_: result}
981 }
982
983 bus('recent_referers/*').to_fetch = (rest) => {
984 bus.fetch('time/' + refresh_interval)
985 var result = []
986 for (var row of db.prepare('select * from usage where '
987 + nots + ' order by date desc limit ?').iterate(
988 [parseInt(rest)])) {
989
990 row.details = JSON.parse(row.details)
991 if (row.details.agent && row.details.agent.match(/bot/)) continue
992
993 if (row.details.referer && !row.details.referer.match(/^https:\/\/cheeseburgertherapy.com/))
994 result.push({url: row.details.url, referer: row.details.referer,
995 date: row.date})
996 }
997
998 return {_: result}
999 }
1000
1001
1002 function sock_open_time (sock_event) {
1003 var client = JSON.parse(sock_event.details).client
1004 if (!client) return null
1005
1006 var http_req = db.prepare('select * from usage where event = "http request" and date < ? and '
1007 + ' details like ? and '
1008 + nots + ' order by date desc limit 1').get([sock_event.date,
1009 '%'+client+'%'])
1010 if (!http_req) return null
1011
1012 var delay = sock_event.date - http_req.date
1013 var res = !delay || delay > 300 ? 'fail' : delay
1014 if (delay && delay < 300
1015 && JSON.parse(sock_event.details).ip
1016 !== JSON.parse(http_req.details).ip)
1017 console.error('Yuck!', delay, sock_event, http_req)
1018 return [res, JSON.parse(http_req.details).url]
1019 }
1020 function sock_open_times () {
1021 var opens = db.prepare('select * from usage where event = "socket open" and '
1022 + nots + ' order by date desc limit ?').all(500)
1023 var times = []
1024 for (var i=0; i<opens.length; i++) {
1025 times.push(sock_open_time(opens[i]))
1026 // Get the most recent http hit before this open, from the same client id
1027 // subract the times
1028 }
1029 return times
1030 }
1031
1032 bus('socket_load_times').to_fetch = () => {
1033 return {_: sock_open_times()}
1034 }
1035 },
1036 log_usage(event, details) {
1037 bus.usage_log_db.prepare('insert into usage (date, event, details) values (?, ?, ?)')
1038 .run([new Date().getTime()/1000,
1039 event,
1040 JSON.stringify(details)])
1041 },
1042
1043 serve_time () {
1044 if (bus('time*').to_fetch.length > 0)
1045 // Then it's already installed
1046 return
1047
1048 // Time ticker
1049 var timeouts = {}
1050 bus('time*').to_fetch =
1051 function (key, rest, t) {
1052 if (key === 'time') rest = '/1000'
1053 timeout = parseInt(rest.substr(1))
1054 function f () { bus.save.fire({key: key, time: Date.now()}) }
1055 timeouts[key] = setInterval(f, timeout)
1056 f()
1057 }
1058 bus('time*').to_forget =
1059 function (key, rest) {
1060 if (key === 'time') key = 'time/1000'
1061 clearTimeout(timeouts[key])
1062 }
1063 },
1064
1065 smtp (opts) {
1066 var bus = this
1067 // Listen for SMTP messages on port 25
1068 if (opts.domain) {
1069 var mailin = require('mailin')
1070 mailin.start({
1071 port: opts.port || 25,
1072 disableWebhook: true,
1073 host: opts.domain,
1074 smtpOptions: { SMTPBanner: opts.domain }
1075 })
1076 // Event emitted after a message was received and parsed.
1077 mailin.on('message', (connection, msg, raw) => {
1078 if (!msg.messageId) {
1079 console.log('Aborting message without id!', msg.subject, new Date().toLocaleString())
1080 return
1081 }
1082
1083 console.log(msg)
1084 console.log('Refs is', msg.references)
1085 console.log('Raw is', raw)
1086 var parent = msg.references
1087 if (parent) {
1088 if (Array.isArray(parent))
1089 parent = parent[0]
1090 var m = parent.match(/\<(.*)\>/)
1091 if (m && m[1])
1092 parent = m[1]
1093 }
1094 var from = msg.from
1095
1096 console.log('date is', msg.date, typeof(msg.date), msg.date.getTime())
1097 email = {
1098 key: "email/" + msg.messageId,
1099 _: {
1100 title: msg.subject,
1101 parent: parent && ("email/" + parent) || undefined,
1102 from: msg.from.address,
1103 to: msg.to.map(x=>x.address),
1104 cc: msg.cc.map(x=>x.address),
1105 date: msg.date.getTime() / 1000,
1106 text: msg.text,
1107 body: msg.text,
1108 html: msg.html
1109 }
1110 }
1111
1112 bus.save(email)
1113 })
1114 }
1115 },
1116
1117 serve_email (master, opts) {
1118 opts = opts || {}
1119 var client = this
1120 var email_regex = /^(([^<>()\[\]\\.,;:\s@"]+(\.[^<>()\[\]\\.,;:\s@"]+)*)|(".+"))@((\[[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}])|(([a-zA-Z\-0-9]+\.)+[a-zA-Z]{2,}))$/
1121 var peemail_regex = /^public$|^(([^<>()\[\]\\.,;:\s@"]+(\.[^<>()\[\]\\.,;:\s@"]+)*)|(".+"))@((\[[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}])|(([a-zA-Z\-0-9]+\.)+[a-zA-Z]{2,}))$/
1122 // var local_addie_regex = new RegExp('state://' + opts.domain + '/user/([^\/]+)')
1123 // Todo on Server:
1124 // - Connect with mailin, so we can receive SMTP shit
1125 //
1126 // - Is there security hole if users have a ? or a / in their name?
1127 // - Make the master.posts/ state not require /
1128 // - Make standard url tools for optional slashes, and ? query params
1129 // - Should a e.g. client to_save abort if it calls save that aborts?
1130 // - e.g. if master('posts*').to_save aborts
1131
1132 // Helpers
1133 function get_posts (args) {
1134 console.log('getting posts with', args)
1135 var can_see = [{cc: ['public']}]
1136 if (args.for)
1137 can_see.push({to: [args.for]},
1138 {cc: [args.for]},
1139 {from: [args.for]})
1140
1141 terms = '(' + can_see
1142 .map(x => "value @> '"+JSON.stringify({_:x})+"'")
1143 .join(' or ')
1144 + ')'
1145
1146 if (args.about) {
1147 // console.log('getting one about', args.about)
1148 var interested_in = [{to: [args.about]},
1149 {cc: [args.about]},
1150 {from: [args.about]}]
1151 terms += ' and (' + interested_in
1152 .map(x => "value @> '"+JSON.stringify({_:x})+"'")
1153 .join(' or ')
1154 + ')'
1155 }
1156
1157 var q = "select value from store where key like 'post/%' and " + terms
1158 q += " order by value #>'{_,date}' desc"
1159 if (args.to) q += ' limit ' + to
1160 console.log('with query', q)
1161 return master.pg_db.querySync(q).map(x=>x.value)
1162 }
1163 function post_children (post) {
1164 return master.pg_db.querySync(
1165 "select value from store where value #>'{_,parent}' = '"
1166 + JSON.stringify(post.key)
1167 + "' order by value #>'{_,date}' asc").map(x=>x.value)
1168 }
1169
1170 // function canonicalize_address (addie) {
1171 // if (opts.domain) {
1172 // // Try foo@gar.boo
1173 // var m = addie.match(email_regex)
1174 // if (m && m[5].toLowerCase() === opts.domain)
1175 // return 'user/' + m[1]
1176
1177 // // Try state://gar.boo/user/foo
1178 // m = addie.match(local_addie_regex)
1179 // if (m) return 'user/' + m[1]
1180 // }
1181 // return addie
1182 // }
1183
1184 // Define state on master
1185 if (master('posts_for/*').to_fetch.length === 0) {
1186 // Get posts for each user
1187 master('posts_for/*').to_fetch = (json) => {
1188 watch_for_dirt('posts-for/' + json.for)
1189 return {_: get_posts(json)}
1190 }
1191 // Saving any post will dirty the list for all users mentioned in
1192 // the post
1193 master('post/*').to_save = (old, New, t) => {
1194 // To do: diff the cc, to, and from lists, and only dirty
1195 // posts_for people who have been changed
1196
1197 if (!old._) old = {_:{to: [], from: [], cc: []}}
1198
1199 // old._.to = old._.to.map(a=>canonicalize_address(a))
1200 // old._.cc = old._.cc.map(a=>canonicalize_address(a))
1201 // old._.from = old._.from.map(a=>canonicalize_address(a))
1202 // New._.to = New._.to.map(a=>canonicalize_address(a))
1203 // New._.cc = New._.cc.map(a=>canonicalize_address(a))
1204 // New._.from = New._.from.map(a=>canonicalize_address(a))
1205
1206 var dirtied = old._.to.concat(New._.to)
1207 .concat(old._.cc).concat(New._.cc)
1208 .concat(old._.from).concat(New._.from)
1209 dirtied.forEach(u=>dirty('posts-for/' + u))
1210
1211 if (old._.parent) dirty(old._.parent)
1212 if (New._.parent) dirty(New._.parent)
1213
1214 t.done(New)
1215 }
1216 }
1217
1218 function user_addy (u) {
1219 return u.name + '@' + opts.domain
1220 }
1221 function current_addy () {
1222 var c = client.fetch('current_user')
1223 return c.logged_in ? user_addy(c.user) : 'public'
1224 }
1225 function is_author (post) {
1226 var from = post._.from
1227 var c = client.fetch('current_user')
1228 return from.includes(current_addy())
1229 }
1230 function can_see (post) {
1231 var c = client.fetch('current_user')
1232 var allowed = post._.to.concat(post._.cc).concat(post._.from)
1233 return allowed.includes(current_addy()) || allowed.includes('public')
1234 }
1235 var drtbus = master//require('./statebus')()
1236 function dirty (key) { drtbus.save({key: 'dirty-'+key, n: Math.random()}) }
1237 function watch_for_dirt (key) { drtbus.fetch('dirty-'+key) }
1238
1239 client('current_email').to_fetch = () => {
1240 return {_: current_addy()}
1241 }
1242
1243 // Define state on client
1244 client('posts*').to_fetch = (k, rest) => {
1245 var args = bus.parse(rest.substr(1))
1246 var c = client.fetch('current_user')
1247 args.for = current_addy()
1248 var e = master.fetch('posts_for/' + JSON.stringify(args))
1249 return {_: master.clone(e._).map(x=>client.fetch(x.key))}
1250 }
1251
1252 client('post/*').to_fetch = (k) => {
1253 var e = master.clone(master.fetch(k))
1254 if (!e._) return {}
1255 if (!can_see(e))
1256 return {}
1257
1258 e._.children = post_children(e).map(e=>e.key)
1259 watch_for_dirt(k)
1260
1261 return e
1262 }
1263
1264 client('post/*').to_save = (o, t) => {
1265 if (!(client.validate(o, {key: 'string',
1266 _: {to: 'array',
1267 cc: 'array',
1268 from: 'array',
1269 date: 'number',
1270 '?parent': 'string',
1271 body: 'string',
1272 '?title': 'string',
1273 '*': '*'}})
1274 && o._.to.every(a=>a.match(peemail_regex))
1275 && o._.cc.every(a=>a.match(peemail_regex))
1276 && o._.from.every(a=>a.match(peemail_regex)))) {
1277 console.error('post no be valid', o)
1278 t.abort()
1279 return
1280 }
1281
1282 var c = client.fetch('current_user')
1283
1284 // Make sure this user is an author
1285 var from = o._.from
1286 if (!is_author(o)) {
1287 console.error('User', current_addy(),
1288 'is not author', o._.from)
1289 t.abort()
1290 return
1291 }
1292 o = client.clone(o)
1293 delete o.children
1294
1295 master.save(o)
1296 t.done()
1297 }
1298
1299 client('post/*').to_delete = (key, o, t) => {
1300 // Validate
1301 if (!is_author(o)) {
1302 console.error('User', current_addy(),
1303 'is not author', o._.from)
1304 t.abort()
1305 return
1306 }
1307
1308 // master.delete(key)
1309 master.pg_db.query('delete from store where key = $1', [key])
1310
1311 // Dirty everybody
1312 var dirtied = o._.to.concat(o._.cc).concat(o._.from)
1313 dirtied.forEach(u => dirty('posts-for/' + u))
1314 if (o._.parent) dirty(o._.parent)
1315
1316 // To do: handle nested threads. Either detach them, or insert a
1317 // 'deleted' stub, or splice together
1318
1319 // Complete
1320 t.done()
1321 }
1322
1323 client('friends').to_fetch = t => {
1324 return {_: (master.fetch('users').all||[])
1325 .map(u=>client.fetch('email/' + user_addy(u)))
1326 .concat([client.fetch('email/public')])
1327 }
1328 }
1329
1330 client('email/*').to_fetch = (rest) => {
1331 var m = rest.match(email_regex)
1332 var result = {address: rest}
1333 if (rest === 'public') {
1334 result.name = 'public'
1335 }
1336 else if (m && m[5].toLowerCase() === opts.domain) {
1337 result.user = client.fetch('user/' + m[1])
1338 result.name = result.user.name
1339 result.pic = result.user.pic
1340 result.upgraded = true
1341 }
1342 else if (m) {
1343 result.name = m[1]
1344 result.upgraded = false
1345 }
1346
1347 return result
1348 }
1349 },
1350
1351 sqlite_query_server: function sqlite_query_server (db) {
1352 var fetch = bus.fetch
1353 bus('table_columns/*').to_fetch =
1354 function fetch_table_columns (key, rest) {
1355 if (typeof key !== 'string')
1356 console.log(handlers.hash)
1357 var table_name = rest
1358 var columns = fetch('sql/PRAGMA table_info(' + table_name + ')').rows.slice()
1359 var foreign_keys = fetch('table_foreign_keys/' + table_name)
1360 var column_info = {}
1361 for (var i=0;i< columns .length;i++) {
1362 var col = columns[i].name
1363 column_info[col] = columns[i]
1364 // if (col === 'customer' || col === 'customers')
1365 // console.log('FOR CUSTOMER, got', col, foreign_keys[col])
1366 column_info[col].foreign_key = foreign_keys[col] && foreign_keys[col].table
1367 columns[i] = col
1368 }
1369 columns.splice(columns[columns.indexOf('id')], 1)
1370 column_info['key'] = column_info['id']
1371 delete column_info['id']
1372 return {columns:columns, column_info:column_info}
1373 }
1374
1375
1376 bus('table_foreign_keys/*').to_fetch =
1377 function table_foreign_keys (key, rest) {
1378 var table_name = rest
1379 var foreign_keys = fetch('sql/PRAGMA foreign_key_list(' + table_name + ')').rows
1380 var result = {}
1381 for (var i=0;i< foreign_keys .length;i++)
1382 result[foreign_keys[i].from] = foreign_keys[i]
1383 delete result.id
1384 result.key = key
1385 return result
1386 }
1387
1388 bus('sql/*').to_fetch =
1389 function sql (key, rest) {
1390 fetch('timer/60000')
1391 var query = rest
1392 try { query = JSON.parse(query) }
1393 catch (e) { query = {stmt: query, args: []} }
1394
1395 db.all(query.stmt, query.args,
1396 function (err, rows) {
1397 if (rows) bus.save.fire({key:key, rows: rows})
1398 else console.error('Bad sqlite query', key, err)
1399 }.bind(this))
1400 }
1401 },
1402
1403 sqlite_table_server: function sqlite_table_server(db, table_name) {
1404 var save = bus.save, fetch = bus.fetch
1405 var table_columns = fetch('table_columns/'+table_name) // This will fail if used too soon
1406 var foreign_keys = fetch('table_foreign_keys/'+table_name)
1407 var remapped_keys = fetch('remapped_keys')
1408 remapped_keys.keys = remapped_keys.keys || {}
1409 remapped_keys.revs = remapped_keys.revs || {}
1410 function row_json (row) {
1411 var result = {key: table_name + '/' + row.id}
1412 for (var k in row)
1413 if (row.hasOwnProperty(k))
1414 result[k] = (foreign_keys[k] && row[k]
1415 ? foreign_keys[k].table + '/' + row[k]
1416 : result[k] = row[k])
1417 if (result.hasOwnProperty('other')) result.other = JSON.parse(result.other || '{}')
1418 delete result.id
1419 return result
1420 }
1421 function json_values (json) {
1422 var columns = table_columns.columns
1423 var result = []
1424 for (var i=0; i<columns.length; i++) {
1425 var col = columns[i]
1426 var val = json[col]
1427
1428 // JSONify the `other' column
1429 if (col === 'other')
1430 val = JSON.stringify(val || {})
1431
1432 // Convert foreign keys from /customer/3 to 3
1433 else if (foreign_keys[col] && typeof val === 'string') {
1434 val = remapped_keys.keys[val] || val
1435 val = json[columns[i]].split('/')[1]
1436 }
1437
1438 result.push(val)
1439 }
1440 return result
1441 }
1442 function render_table () {
1443 var result = []
1444 db.all('select * from ' + table_name, function (err, rows) {
1445 if (err) console.error('Problem with table!', table_name, err)
1446 for (var i=0; i<rows.length; i++)
1447 result[i] = row_json(rows[i])
1448 bus.save.fire({key: table_name, rows: result})
1449 })
1450 }
1451 function render_row(obj) {
1452 bus.save.fire(row_json(obj))
1453 if (remapped_keys.revs[obj.key]) {
1454 var alias = bus.clone(obj)
1455 alias.key = remapped_keys.revs[obj.key]
1456 bus.save.fire(row_json(alias))
1457 }
1458 }
1459
1460 // ************************
1461 // Handlers!
1462 // ************************
1463
1464 // Fetching the whole table, or a single row
1465 bus(table_name + '*').to_fetch = function (key, rest) {
1466 if (rest === '')
1467 // Return the whole table
1468 return render_table()
1469
1470 if (rest[0] !== '/') return {error: 'bad key: ' + key}
1471 key = remapped_keys.keys[key] || key
1472
1473 var id = key.split('/')[1]
1474 db.get('select * from '+table_name+' where rowid = ?',
1475 [id],
1476 function (err, row) {
1477 if (!row)
1478 { console.log('Row', id, "don't exist.", err); return }
1479
1480 render_row(row)
1481 }.bind(this))
1482 }
1483
1484 // Saving a row
1485 bus(table_name + '/*').to_save = function (obj, rest) {
1486 var columns = table_columns.columns
1487 var key = remapped_keys.keys[obj.key] || obj.key
1488
1489 // Compose the query statement
1490 var stmt = 'update ' + table_name + ' set '
1491 var rowid = rest
1492 var vals = json_values(obj)
1493 for (var i=0; i<columns.length; i++) {
1494 stmt += columns[i] + ' = ?'
1495 //vals.push(obj[columns[i]])
1496 if (i < columns.length - 1)
1497 stmt += ', '
1498 }
1499 stmt += ' where rowid = ?'
1500 vals.push(rowid)
1501
1502 // Run the query
1503 db.run(stmt, vals,
1504 function (e,r) {
1505 console.log('updated',e,r,key)
1506 bus.dirty(key)
1507 })
1508 }
1509
1510 // Inserting a new row
1511 bus('new/' + table_name + '/*').to_save = function (obj) {
1512 var columns = table_columns.columns
1513 var stmt = ('insert into ' + table_name + ' (' + columns.join(',')
1514 + ') values (' + new Array(columns.length).join('?,') + '?)')
1515 var values = json_values(obj)
1516
1517 console.log('Sqlite:' + stmt)
1518
1519 db.run(stmt, values, function (error) {
1520 if (error) console.log('INSERT error!', error)
1521 console.log('insert complete, got id', this.lastID)
1522 remapped_keys.keys[obj.key] = table_name + '/' + this.lastID
1523 remapped_keys.revs[remapped_keys.keys[obj.key]] = obj.key
1524 save(remapped_keys)
1525 render_table()
1526 })
1527 }
1528
1529 // Deleting a row
1530 bus(table_name + '/*').to_delete = function (key, rest) {
1531 if (remapped_keys.keys[key]) {
1532 var old_key = key
1533 var new_key = remapped_keys.keys[key]
1534 delete remapped_keys.keys[old_key]
1535 delete remapped_keys.revs[new_key]
1536 key = new_key
1537 }
1538
1539 var stmt = 'delete from ' + table_name + ' where rowid = ?'
1540 var rowid = rest
1541 console.log('DELETE', stmt)
1542 db.run(stmt, [rowid],
1543 function (err) {
1544 if (err) console.log('DELETE error', err)
1545 else console.log('delete complete')
1546 render_table() })
1547 }
1548 },
1549
1550 serves_auth: function serves_auth (conn, master) {
1551 var client = this // to keep me straight while programming
1552
1553 // Initialize master
1554 if (master('users/passwords').to_fetch.length === 0) {
1555 master('users/passwords').to_fetch = function (k) {
1556 var result = {key: 'users/passwords'}
1557 var users = master.fetch('users')
1558 users.all = users.all || []
1559 for (var i=0; i<users.all.length; i++) {
1560 var u = master.fetch(users.all[i])
1561 var login = (u.login || u.name).toLowerCase()
1562 console.assert(login, 'Missing login for user', u)
1563 if (result.hasOwnProperty(login)) {
1564 console.error("upass: this user's name is bogus, dude.", u.key)
1565 continue
1566 }
1567 result[login] = {user: u.key, pass: u.pass}
1568 }
1569 return result
1570 }
1571
1572 }
1573
1574 // Authentication functions
1575 function authenticate (login, pass) {
1576 var userpass = master.fetch('users/passwords')[login.toLowerCase()]
1577 master.log('authenticate: we see',
1578 master.fetch('users/passwords'),
1579 userpass && userpass.pass,
1580 pass)
1581
1582 if (!(typeof login === 'string' && typeof pass === 'string')) return false
1583 if (login === 'key') return false
1584 if (!userpass || !userpass.pass) return null
1585
1586 //console.log('comparing passwords', pass, userpass.pass)
1587 if (require('bcrypt-nodejs').compareSync(pass, userpass.pass))
1588 return master.fetch(userpass.user)
1589 }
1590 function create_account (params) {
1591 if (typeof (params.login || params.name) !== 'string')
1592 throw 'no login or name'
1593 var login = (params.login || params.name).toLowerCase()
1594 if (!login ||
1595 !master.validate(params, {'?name': 'string', '?login': 'string',
1596 pass: 'string', '?email': 'string',
1597 '?key': undefined, '*': '*'}))
1598 throw 'invalid name, login, pass, or email'
1599
1600 var passes = master.fetch('users/passwords')
1601 if (passes.hasOwnProperty(login))
1602 throw 'there is already a user with that login or name'
1603
1604 // Hash password
1605 params.pass = require('bcrypt-nodejs').hashSync(params.pass)
1606
1607 // Choose account key
1608 var key = 'user/' + params.name
1609 if (!params.name)
1610 key = 'user/' + Math.random().toString(36).substring(7,13)
1611 while (master.cache.hasOwnProperty(key))
1612 key = 'user/' + Math.random().toString(36).substring(7,13)
1613
1614 // Make account object
1615 var new_account = {key: key,
1616 name: params.name,
1617 login: params.login,
1618 pass: params.pass,
1619 email: params.email }
1620 for (var k in new_account) if (!new_account[k]) delete new_account[k]
1621
1622 var users = master.fetch('users')
1623 users.all = users.all || []
1624 users.all.push(new_account)
1625 passes[login] = {user: new_account.key, pass: new_account.pass}
1626 master.save(users)
1627 master.save(passes)
1628 }
1629
1630 // Current User
1631 client('current_user').to_fetch = function (k) {
1632 client.log('* fetching: current_user')
1633 if (!conn.client) return
1634 var u = master.fetch('logged_in_clients')[conn.client]
1635 u = u && user_obj(u.key, true)
1636 return {user: u || null, logged_in: !!u}
1637 }
1638
1639 client('current_user').to_save = function (o, t) {
1640 function error (msg) {
1641 client.save.abort(o)
1642 var c = client.fetch('current_user')
1643 c.error = msg
1644 client.save(c)
1645 }
1646
1647 client.log('* saving: current_user!')
1648 if (o.client && !conn.client) {
1649 // Set the client
1650 conn.client = o.client
1651 client.client_id = o.client
1652 client.client_ip = conn.remoteAddress
1653
1654 var connections = master.fetch('connections')
1655 connections[conn.id].user = master.fetch('logged_in_clients')[conn.client]
1656 master.save(connections)
1657 }
1658 else {
1659 if (o.create_account) {
1660 client.log('current_user: creating account')
1661 try {
1662 create_account(o.create_account)
1663 client.log('Success creating account!')
1664 var cu = client.fetch('current_user')
1665 cu.create_account = null
1666 client.save.fire(cu)
1667 } catch (e) {
1668 error('Cannot create that account because ' + e)
1669 return
1670 }
1671 }
1672
1673 if (o.login_as) {
1674 // Then client is trying to log in
1675 client.log('current_user: trying to log in')
1676 var creds = o.login_as
1677 var login = creds.login || creds.name
1678 if (login && creds.pass) {
1679 // With a username and password
1680 var u = authenticate(login, creds.pass)
1681
1682 client.log('auth said:', u)
1683 if (u) {
1684 // Success!
1685 // Associate this user with this session
1686 // user.log('Logging the user in!', u)
1687
1688 var clients = master.fetch('logged_in_clients')
1689 var connections = master.fetch('connections')
1690
1691 clients[conn.client] = u
1692 connections[conn.id].user = u
1693
1694 master.save(clients)
1695 master.save(connections)
1696
1697 client.log('current_user: success logging in!')
1698 }
1699 else {
1700 error('Cannot log in with that information')
1701 return
1702 }
1703 }
1704 else {
1705 error('Cannot log in with that information')
1706 return
1707 }
1708 }
1709
1710 else if (o.logout) {
1711 client.log('current_user: logging out')
1712 var clients = master.fetch('logged_in_clients')
1713 var connections = master.fetch('connections')
1714
1715 delete clients[conn.client]
1716 connections[conn.id].user = null
1717
1718 master.save(clients)
1719 master.save(connections)
1720 }
1721 }
1722
1723 t.refetch()
1724 }
1725 client('current_user').to_delete = function () {}
1726
1727 // Users have closet space at /user/<name>/*
1728 var closet_space_key = /^(user\/[^\/]+)\/.*/
1729 var private_closet_space_key = /^user\/[^\/]+\/private.*/
1730
1731 // User
1732 client('user/*').to_save = function (o) {
1733 var c = client.fetch('current_user')
1734 var user_key = o.key.match(/^user\/([^\/]+)/)
1735 user_key = user_key && ('user/' + user_key[1])
1736
1737 // Only the current user can touch themself.
1738 if (!c.logged_in || c.user.key !== user_key) {
1739 client.log('Only the current user can touch themself.',
1740 {logged_in: c.logged_in, as: c.user && c.user.key,
1741 touching: user_key})
1742 client.save.abort(o)
1743 return
1744 }
1745
1746 // Users have closet space at /user/<name>/*
1747 if (o.key.match(closet_space_key)) {
1748 client.log('saving closet data')
1749 master.save(o)
1750 return
1751 }
1752
1753 // Ok, then it must be a plain user
1754 console.assert(o.key.match(/^user\/[^\/]+$/))
1755
1756 // Validate types
1757 if (!client.validate(o, {key: 'string', '?login': 'string', '?name': 'string',
1758 '?pass': 'string', '?email': 'string', /*'?pic': 'string',*/
1759 '*':'*'})) {
1760 client.log('This user change fails validation.')
1761 client.save.abort(o)
1762 return
1763 }
1764
1765 // Rules for updating "login" and "name" attributes:
1766 // • If "login" isn't specified, then we use "name" as login
1767 // • That resulting login must be unique across all users
1768
1769 // There must be at least a login or a name
1770 var login = o.login || o.name
1771 if (!login) {
1772 client.log('User must have a login or a name')
1773 client.save.abort(o)
1774 return
1775 }
1776
1777 var u = master.fetch(o.key)
1778 var userpass = master.fetch('users/passwords')
1779
1780 // Validate that the login/name is not changed to something clobberish
1781 var old_login = u.login || u.name
1782 if (old_login.toLowerCase() !== login.toLowerCase()
1783 && userpass.hasOwnProperty(login)) {
1784 client.log('The login', login, 'is already taken. Aborting.')
1785 client.save.abort(o) // Abort
1786
1787 o = client.fetch(o.key) // Add error message
1788 o.error = 'The login "' + login + '" is already taken'
1789 client.save.fire(o)
1790
1791 return // And exit
1792 }
1793
1794 // Now we can update login and name
1795 u.login = o.login
1796 u.name = o.name
1797
1798 // Hash password
1799 o.pass = o.pass && require('bcrypt-nodejs').hashSync(o.pass)
1800 u.pass = o.pass || u.pass
1801
1802 // // Users can have pictures (remove this soon)
1803 // // Bug: if user changes name, this picture's url doesn't change.
1804 // if (o.pic && o.pic.indexOf('data:image') > -1) {
1805 // var img_type = o.pic.match(/^data:image\/(\w+);base64,/)[1]
1806 // var b64 = o.pic.replace(/^data:image\/\w+;base64,/, '')
1807 // var upload_dir = global.upload_dir
1808 // // ensure that the uploads directory exists
1809 // if (!fs.existsSync(upload_dir))
1810 // fs.mkdirSync(upload_dir)
1811
1812 // // bug: users with the same name can overwrite each other's files
1813 // u.pic = u.name + '.' + img_type
1814 // fs.writeFile(upload_dir + u.pic, b64, {encoding: 'base64'})
1815 // }
1816
1817 // For anything else, go ahead and add it to the user object
1818 var protected = {key:1, name:1, /*pic:1,*/ pass:1}
1819 for (var k in o)
1820 if (!protected.hasOwnProperty(k))
1821 u[k] = o[k]
1822 for (var k in u)
1823 if (!protected.hasOwnProperty(k) && !o.hasOwnProperty(k))
1824 delete u[k]
1825
1826 master.save(u)
1827 }
1828 client('user/*').to_fetch = function user_fetcher (k) {
1829 var c = client.fetch('current_user')
1830 client.log('* fetching:', k, 'as', c.user)
1831
1832 // Users have closet space at /user/<name>/*
1833 if (k.match(closet_space_key)) {
1834 var obj_user = k.match(closet_space_key)[1]
1835 if (k.match(private_closet_space_key)
1836 && (!c.user || obj_user !== c.user.key)) {
1837 client.log('hiding private closet data')
1838 return {}
1839 }
1840 client.log('fetching closet data')
1841 return client.clone(master.fetch(k))
1842 }
1843
1844 // Otherwise return the actual user
1845 return user_obj(k, c.logged_in && c.user.key === k)
1846 }
1847 client('user/*').to_delete = function () {}
1848 function user_obj (k, logged_in) {
1849 var o = master.clone(master.fetch(k))
1850 if (k.match(/^user\/([^\/]+)\/private\/(.*)$/))
1851 return logged_in ? o : {key: k}
1852
1853 delete o.pass
1854 if (!logged_in) {delete o.email; delete o.login}
1855 return o
1856 }
1857
1858 // Blacklist sensitive stuff on master, in case we have a shadow set up
1859 var blacklist = 'users users/passwords logged_in_clients'.split(' ')
1860 for (var i=0; i<blacklist.length; i++) {
1861 client(blacklist[i]).to_fetch = function () {}
1862 client(blacklist[i]).to_save = function () {}
1863 client(blacklist[i]).to_delete = function () {}
1864 client(blacklist[i]).to_forget = function () {}
1865 }
1866 },
1867
1868 persist: function (prefix_to_sync, validate) {
1869 var client = this
1870 var was_logged_in = false
1871
1872 function client_prefix (current_user) {
1873 return 'client/' + (current_user.logged_in
1874 ? current_user.user.key.substr('user/'.length)
1875 : client.client_id)
1876 }
1877
1878 function copy_client_to_user(client, user) {
1879 var old_prefix = 'client/' + client.client_id
1880 var new_prefix = 'client/' + user.key.substr('user/'.length)
1881 var cache = client.master.cache
1882
1883 var keys = Object.keys(cache) // Make a copy
1884 for (var i=0; i<keys.length; i++) { // Because we'll mutate
1885 var old_key = keys[i] // As we iterate
1886
1887 if (old_key.startsWith(old_prefix)) {
1888 var new_key = new_prefix + old_key.substr(old_prefix.length)
1889 var o = client.clone(cache[old_key])
1890 // Delete the old
1891 client.master.del(old_key)
1892
1893 if (!(cache.hasOwnProperty(new_key))) {
1894 // Save the new
1895 o.key = new_key
1896 client.master.save(o)
1897 }
1898 }
1899 }
1900 }
1901
1902 client(prefix_to_sync).to_fetch = function (key) {
1903 var c = client.fetch('current_user')
1904 if (client.loading()) return
1905 var prefix = client_prefix(c)
1906
1907 if (!was_logged_in && c.logged_in)
1908 // User just logged in! Let's copy his stuff over
1909 copy_client_to_user(client, c.user)
1910 was_logged_in = c.logged_in
1911
1912 // Get the state from master
1913 var obj = client.clone(client.master.fetch(prefix + key))
1914
1915 // Translate it back to client
1916 obj = client.deep_map(obj, function (o) {
1917 if (typeof o === 'object' && 'key' in o && typeof o.key === 'string')
1918 o.key = o.key.substr(prefix.length)
1919 return o
1920 })
1921 return obj
1922 }
1923
1924 client(prefix_to_sync).to_save = function (obj) {
1925 if (validate && !validate(obj)) {
1926 console.warn('Validation failed on', obj)
1927 client.save.abort(obj)
1928 return
1929 }
1930
1931 var c = client.fetch('current_user')
1932 if (client.loading()) return
1933 var prefix = client_prefix(c)
1934
1935 // Make it safe
1936 obj = client.clone(obj)
1937 obj = client.deep_map(obj, function (o) {
1938 if (typeof o === 'object' && 'key' in o && typeof o.key === 'string')
1939 o.key = prefix + o.key
1940 return o
1941 })
1942
1943 // Save to master
1944 client.master.save(obj)
1945 }
1946
1947 client(prefix_to_sync).to_delete = function (k) {
1948 client.master.delete(client_prefix(client.fetch('current_user')) + k)
1949 }
1950 },
1951
1952 shadows: function shadows (master_bus) {
1953 // Custom route
1954 var OG_route = bus.route
1955 bus.route = function(key, method, arg, t) {
1956 var count = OG_route(key, method, arg, t)
1957 // This forwards anything we don't have a specific handler for
1958 // to the global cache
1959 if (count === 0) {
1960 count++
1961 if (method === 'to_fetch')
1962 bus.run_handler(function get_from_master (k) {
1963 // console.log('DEFAULT FETCHing', k)
1964 var r = master_bus.fetch(k)
1965 // console.log('DEFAULT FETCHed', r)
1966 bus.save.fire(r, {version: master_bus.versions[r.key]})
1967 }, method, arg)
1968 else if (method === 'to_save')
1969 bus.run_handler(function save_to_master (o, t) {
1970 // console.log('DEFAULT ROUTE', t)
1971 master_bus.save(bus.clone(o), t)
1972 }, method, arg, {t: t})
1973 else if (method == 'to_delete')
1974 bus.run_handler(function delete_from_master (k, t) {
1975 master_bus.delete(k)
1976 return 'done'
1977 }, method, arg, {t: t})
1978 }
1979 return count
1980 }
1981 },
1982
1983 read_file: function init () {
1984 // The first time this is run, we initialize it by loading some
1985 // libraries
1986 var chokidar = require('chokidar')
1987 var watchers = {}
1988 var fs = require('fs')
1989
1990 // Now we redefine the function
1991 bus.read_file = bus.uncallback(
1992 function readFile (filename, encoding, cb) {
1993 fs.readFile(filename, (err, result) => {
1994 if (err) console.error('Error from read_file:', err)
1995 cb(null, ((result || '*error*').toString(encoding || undefined)))
1996 })
1997 },
1998 {
1999 callback_at: 2,
2000 start_watching: (args, dirty, del) => {
2001 var filename = args[0]
2002 console.log('## starting to watch', filename)
2003 watchers[filename] = chokidar.watch(filename, {
2004 atomic: true,
2005 disableGlobbing: true
2006 })
2007 watchers[filename].on('change', () => { dirty() })
2008 watchers[filename].on('add', () => { dirty() })
2009 watchers[filename].on('unlink', () => { del() })
2010 },
2011 stop_watching: (json) => {
2012 var filename = json[0]
2013 console.log('## stopping to watch', filename)
2014 // log('unwatching', filename)
2015 // To do: this should probably use.unwatch() instead.
2016 watchers[filename].close()
2017 delete watchers[filename]
2018 }
2019 })
2020 return bus.read_file.apply(bus, [].slice.call(arguments))
2021 },
2022
2023 // Synchronizes the recursive path starting with <state_path> to the
2024 // file or recursive directory structure at fs_path
2025 sync_files: function sync_files (state_path, file_path) {
2026 // To do:
2027 // - Hook up a to_delete handler
2028 // - recursively remove directories if all files gone
2029
2030 console.assert(state_path.substr(-1) !== '*'
2031 && (!file_path || file_path.substr(-1) !== '*'),
2032 'The sync_files paths should not end with *')
2033
2034 file_path = file_path || state_path
2035 var buffer = {}
2036 var full_file_path = require('path').join(__dirname, file_path)
2037
2038 bus(state_path + '*').to_fetch = (rest) => {
2039 // We DO want to handle:
2040 // - "foo"
2041 // - "foo/*"
2042 // But not:
2043 // - "foobar"
2044 if (rest.length>0 && rest[0] !== '/') return // Bail on e.g. "foobar"
2045
2046 var f = bus.read_file(file_path + rest, 'base64')
2047
2048 // Clear buffer of items after 1 second. If fs results are delayed
2049 // longer, we'll just deal with those flashbacks.
2050 for (k in buffer)
2051 if (new Date().getTime() - buffer[k] > 1 * 1000)
2052 delete buffer[k]
2053
2054 // If we are expecting this, skip the read
2055 // console.log('read file', typeof f == 'string' ? f.substr(0,40) + '..': f)
2056 if (buffer[f]) {
2057 console.log('skipping cause its in buffer')
2058 return
2059 }
2060
2061 return {_:f}
2062 }
2063
2064 bus(state_path + '/*').to_save = (o, rest, t) => {
2065 if (rest.length>0 && rest[0] !== '/') return
2066 var f = Buffer.from(o._, 'base64')
2067 require('fs').writeFile(file_path + rest, f)
2068 buffer[f] = new Date().getTime()
2069 t.done()
2070 }
2071
2072 bus.http.use('/'+state_path, require('express').static(full_file_path))
2073 },
2074
2075 // Installs a GET handler at route that gets state from a fetcher function
2076 // Note: Makes too many textbusses. Should re-use one.
2077 http_serve: function http_serve (route, fetcher) {
2078 var textbus = require('./statebus')()
2079 textbus.label = 'textbus'
2080 var watched = new Set()
2081 textbus('*').to_fetch = (filename, old) => {
2082 return {etag: Math.random() + '',
2083 _: fetcher(filename)}
2084 }
2085 bus.http.get(route, (req, res) => {
2086 var path = req.path
2087 var etag = textbus.cache[path] && textbus.cache[path].etag
2088 if (etag && req.get('If-None-Match') === etag) {
2089 res.status(304).end()
2090 return
2091 }
2092
2093 textbus.fetch(req.path) // So that textbus never clears the cache
2094 textbus.fetch(req.path, function cb (o) {
2095 res.setHeader('Cache-Control', 'public')
2096 // res.setHeader('Cache-Control', 'public, max-age='
2097 // + (60 * 60 * 24 * 30)) // 1 month
2098 res.setHeader('ETag', o.etag)
2099 res.setHeader('content-type', 'application/javascript')
2100 res.send(o._)
2101 textbus.forget(o.key, cb) // But we do want to forget the cb
2102 })
2103 })
2104 },
2105
2106 serve_client_coffee: function serve_client_coffee () {
2107 bus.http_serve('/client/:filename', (filename) => {
2108 filename = /\/client\/(.*)/.exec(filename)[0]
2109 var source_filename = filename.substr(1)
2110 var source = bus.read_file(source_filename)
2111 if (filename.match(/\.coffee$/)) {
2112
2113 try {
2114 var compiled = require('coffeescript').compile(source, {filename,
2115 bare: true,
2116 sourceMap: true})
2117 } catch (e) {
2118 if (!bus.loading())
2119 console.error('Could not compile ' + e.toString())
2120 return 'console.error(' + JSON.stringify(e.toString()) + ')'
2121 }
2122
2123 var source_map = JSON.parse(compiled.v3SourceMap)
2124 source_map.sourcesContent = source
2125 compiled = 'window.dom = window.dom || {}\n' + compiled.js
2126 compiled = 'window.ui = window.ui || {}\n' + compiled
2127
2128 function btoa(s) { return new Buffer(s.toString(),'binary').toString('base64') }
2129
2130 // Base64 encode it
2131 compiled += '\n'
2132 compiled += '//# sourceMappingURL=data:application/json;base64,'
2133 compiled += btoa(JSON.stringify(source_map)) + '\n'
2134 compiled += '//# sourceURL=' + source_filename
2135 return compiled
2136 }
2137 else return source
2138 })
2139 },
2140
2141 serve_clientjs: function serve_clientjs (path) {
2142 path = path || 'client.js'
2143 bus(path).to_fetch = () =>
2144 ({_:
2145 ['extras/coffee.js', 'extras/sockjs.js', 'extras/react.js',
2146 'statebus.js', 'client.js']
2147 .map((f) => bus.read_file('node_modules/statebus/' + f))
2148 .join(';\n')})
2149 },
2150
2151 serve_wiki: () => {
2152 bus('edit/*').to_fetch = () => ({_: require('./extras/wiki.coffee').code})
2153 },
2154
2155 unix_socket_repl: function (filename) {
2156 var repl = require('repl')
2157 var net = require('net')
2158 var fs = require('fs')
2159 if (fs.existsSync && fs.existsSync(filename))
2160 fs.unlinkSync(filename)
2161 net.createServer(function (socket) {
2162 var r = repl.start({
2163 //prompt: '> '
2164 input: socket
2165 , output: socket
2166 , terminal: true
2167 //, useGlobal: false
2168 })
2169 r.on('exit', function () {
2170 socket.end()
2171 })
2172 r.context.socket = socket
2173 }).listen(filename)
2174 },
2175
2176 schema: function schema () {
2177 function url_tree (cache) {
2178 // The key tree looks like:
2179 //
2180 // {server: {thing: [obj1, obj2], shing: [obj1, obj2], ...}
2181 // client: {dong: [obj1, ...]}}
2182 //
2183 // And objects without a number, like 'shong' will go on:
2184 // key_tree.server.shong[null]
2185 var tree = {server: {}, client: {}}
2186 for (var key in cache) {
2187 var p = parse_key(key)
2188 if (!p) {
2189 console.log('The state dash can\'t deal with key', key)
2190 return null
2191 }
2192 tree[p.owner][p.name] || (tree[p.owner][p.name] = {})
2193 tree[p.owner][p.name][p.number || null] = cache[key]
2194 }
2195 return tree
2196 }
2197
2198 function parse_key (key) {
2199 var word = "([^/]+)"
2200 // Matching things like: "/new/name/number"
2201 // or: "/name/number"
2202 // or: "/name"
2203 // or: "name/number"
2204 // or: "name"
2205 // ... and you can optionally include a final slash.
2206 var regexp = new RegExp("(/)?(new/)?" +word+ "(/" +word+ ")?(/)?")
2207 var m = key.match(regexp)
2208 if (!m) return null
2209 // indices = [0: has_match, 1: server_owned, 2: is_new, 3: name, 5: number]
2210 var owner = m[1] ? 'server' : 'client'
2211 return m[0] && {owner:owner, 'new': m[2], name: m[3], number: m[5]}
2212 }
2213 schema.parse_key = parse_key
2214 schema.url_tree = url_tree
2215 return url_tree(bus.cache)
2216 }
2217}
2218 // Add methods to bus object
2219 for (var m in extra_methods)
2220 bus[m] = extra_methods[m]
2221
2222 bus.options = default_options(bus)
2223 set_options(bus, options)
2224
2225 // Automatically make state:// fetch over a websocket
2226 bus.net_automount()
2227 return bus
2228}
2229
2230
2231// Handy functions for writing tests on nodejs
// The test functions registered so far, in registration order
var tests = []

// Registers the test function f by appending it to `tests`
function test (f) {
    tests.push(f)
}
// Runs the registered tests, then exits the process.
//
// If a test name is given as the first command-line argument, only the test
// function with that name is run.  Throws an Error if no registered test has
// that name.
//
// Otherwise every test in `tests` is run in order.  Each test is called with
// a callback that it must invoke when finished; the next test then starts on
// a fresh timer tick, with delay_so_far reset to 0.
function run_tests () {
    var requested = process.argv[2]

    // Either run the test specified at command line
    if (requested) {
        var chosen = tests.find((f) => f.name === requested)
        if (!chosen)
            throw new Error('No test named "' + requested + '". Known tests: '
                            + tests.map((f) => f.name).join(', '))
        chosen(() => process.exit())
        return
    }

    // Or run all tests
    function run_next () {
        if (tests.length === 0) {
            console.log('\nDone with all tests.')
            process.exit()
            return
        }

        var f = tests.shift()
        delay_so_far = 0
        console.log('\nTesting:', f.name)
        f(function () {setTimeout(run_next)})
    }
    run_next()
}
// Prints its arguments (formatted like console.log does, via util.format)
// with every line of the output indented by `pre`.
function log (...args) {
    var pre = ' '
    // A global regex is needed here: replacing the string '\n' would only
    // indent the first continuation line.
    console.log(pre + util.format(...args).replace(/\n/g, '\n' + pre))
}
// Forwards all of its arguments to console.assert
function assert (...args) {
    console.assert(...args)
}
// Running total (in ms) of all the delays requested since the last reset
var delay_so_far = 0

// Adds <time> to the running total, then schedules f to run that total
// number of milliseconds from now.  Returns the setTimeout handle.
function delay (time, f) {
    delay_so_far += time
    return setTimeout(f, delay_so_far)
}

// Resets the running total to zero, and returns 0
delay.init = (_) => {
    delay_so_far = 0
    return delay_so_far
}
2266
2267
2268// Now export everything
// import_server(bus, options): installs the server methods onto a bus
module.exports.import_server = import_server

// run_server(bus, options): starts serving the bus with these options
module.exports.run_server = function (bus, options) {
    bus.serve(options)
}
// Adds the server-side extras onto the statebus module object:
//
//   statebus.testing   the helpers {test, run_tests, log, assert, delay}
//   statebus.serve     makes a new bus, serves it with <options>, returns it
//   statebus.repl      pipes this process's stdin/stdout through the socket
//                      at <filename>
module.exports.import_module = function (statebus) {
    statebus.testing = {test, run_tests, log, assert, delay}

    statebus.serve = function serve (options) {
        var bus = statebus()
        require('./server').run_server(bus, options)
        return bus
    }

    // Handy repl.  Invoke with node -e 'require("statebus").repl("/tmp/foo")'
    statebus.repl = function (filename) {
        var net = require('net')
        var sock = net.connect(filename)
        var stdin = process.stdin

        // setRawMode only exists when stdin is a terminal.  Skip it when
        // stdin is piped or redirected, instead of crashing.
        function set_raw_mode (enabled) {
            if (stdin.isTTY && typeof stdin.setRawMode === 'function')
                stdin.setRawMode(enabled)
        }

        stdin.pipe(sock)
        sock.pipe(process.stdout)

        sock.on('connect', function () {
            stdin.resume()
            set_raw_mode(true)
        })

        sock.on('close', function done () {
            set_raw_mode(false)
            stdin.pause()
            sock.removeListener('close', done)
        })

        stdin.on('end', function () {
            sock.destroy()
            console.log()
        })

        // A lone byte 4 (ctrl-d) ends the session
        stdin.on('data', function (b) {
            if (b.length === 1 && b[0] === 4) {
                stdin.emit('end')
            }
        })
    }
}