UNPKG

94.8 kBJavaScriptView Raw
1var fs = require('fs'),
2 util = require('util')
3var unique_sockjs_string = '_connect_to_statebus_'
4
// Builds a fresh copy of the default server options for `bus`.
//
// A new object (with new nested objects) is produced on every call, so
// callers can mutate the result freely.  The default `client` hook makes
// each client bus shadow `bus`.
function default_options (bus) {
    var file_store_defaults = {
        save_delay: 250,
        filename: 'db',
        backup_dir: 'backups',
        prefix: '*'
    }
    var cert_paths = {
        private_key: 'certs/private-key',
        certificate: 'certs/certificate',
        certificate_bundle: 'certs/certificate-bundle'
    }
    var synced_dirs = [{state_path: 'files', fs_path: null}]
    var connection_rules = {include_users: true, edit_others: true}

    return {
        port: 'auto',
        backdoor: null,
        client: (c) => {c.shadows(bus)},
        file_store: file_store_defaults,
        sync_files: synced_dirs,
        serve: true,
        certs: cert_paths,
        connections: connection_rules,
        __secure: false
    }
}
18
// Merges the caller's `options` into `bus.options`.
//
// Top-level keys in `options` overwrite the existing values.  For the
// nested `file_store` and `certs` options, any sub-key the caller left out
// is filled in from the value `bus.options` held before the merge.  A
// truthy non-object value (e.g. `file_store: true`) is replaced by the full
// set of previous values; a falsy value is left alone so the feature stays
// disabled.
//
//   bus     - object exposing `options` and a `clone` function
//   options - optional object of overrides; may be omitted
function set_options (bus, options) {
    // Borrow hasOwnProperty so prototype-less option objects (or ones with
    // their own `hasOwnProperty` key) do not throw.
    var has_own = Object.prototype.hasOwnProperty

    // Snapshot the pre-merge values before they get overwritten
    var defaults = bus.clone(bus.options)
    options = options || {}
    for (var k in options)
        bus.options[k] = options[k]

    // Fill in defaults of nested options too
    var nested = ['file_store', 'certs']
    for (var i = 0; i < nested.length; i++) {
        var name = nested[i]
        if (!bus.options[name]) continue

        if (typeof bus.options[name] !== 'object')
            bus.options[name] = {}

        for (var k2 in defaults[name])
            if (!has_own.call(bus.options[name], k2))
                bus.options[name][k2] = defaults[name][k2]
    }
}
36
37function import_server (bus, options)
38{ var extra_methods = {
39
40 serve: function serve (options) {
41 var master = bus
42 bus.honk = 'statelog'
43 master.label = 'master'
44
45 // Initialize Options
46 set_options(bus, options)
47
48 var use_ssl = bus.options.certs && (
49 require('fs').existsSync(bus.options.certs.private_key)
50 || require('fs').existsSync(bus.options.certs.certificate)
51 || require('fs').existsSync(bus.options.certs.certificate_bundle))
52
53 function c (client, conn) {
54 client.honk = bus.honk
55 client.serves_auth(conn, master)
56 bus.options.client && bus.options.client(client, conn)
57 }
58 if (!bus.options.client) c = undefined // no client bus when programmer explicitly says so
59
60 if (bus.options.file_store)
61 bus.file_store()
62
63 // ******************************************
64 // ***** Create our own http server *********
65 bus.make_http_server({port: bus.options.port, use_ssl})
66 bus.sockjs_server(this.http_server, c) // Serve via sockjs on it
67 var express = require('express')
68 bus.express = express()
69 bus.http = express.Router()
70 bus.install_express(bus.express)
71
72 // use gzip compression if available
73 try { bus.http.use(require('compression')())
74 console.log('Enabled http compression!') } catch (e) {}
75
76
77 // Initialize new clients with an id. We put the client id on
78 // req.client, and also in a cookie for the browser to see.
79 if (bus.options.client)
80 bus.express.use(function (req, res, next) {
81 req.client = require('cookie').parse(req.headers.cookie || '').client
82 if (!req.client) {
83 req.client = (Math.random().toString(36).substring(2)
84 + Math.random().toString(36).substring(2)
85 + Math.random().toString(36).substring(2))
86
87 res.setHeader('Set-Cookie', 'client=' + req.client
88 + '; Expires=21 Oct 2025 00:0:00 GMT;')
89 }
90 next()
91 })
92
93 // Initialize file sync
94 ; (bus.options.sync_files || []).forEach( x => {
95 if (require('fs').existsSync(x.fs_path || x.state_path))
96 bus.sync_files(x.state_path, x.fs_path)
97 })
98
99 // User will put their routes in here
100 bus.express.use('/', bus.http)
101
102 // Add a fallback that goes to state
103 bus.express.get('*', function (req, res) {
104 // Make a temporary client bus
105 var cbus = bus.bus_for_http_client(req, res)
106
107 // Do the fetch
108 cbus.honk = 'statelog'
109 var singleton = req.path.match(/^\/code\//)
110 cbus.fetch_once(req.path.substr(1), (o) => {
111 var unwrap = (Object.keys(o).length === 2
112 && '_' in o
113 && typeof o._ === 'string')
114 // To do: translate pointers as keys
115 res.send(unwrap ? o._ : JSON.stringify(o))
116 cbus.delete_bus()
117 })
118 })
119
120 // Serve Client Coffee
121 bus.serve_client_coffee()
122
123 // Custom route
124 var OG_route = bus.route
125 bus.route = function(key, method, arg, opts) {
126 var count = OG_route(key, method, arg, opts)
127
128 // This whitelists anything we don't have a specific handler for,
129 // reflecting it to all clients!
130 if (count === 0 && method === 'to_save') {
131 bus.save.fire(arg, opts)
132 bus.route(arg.key, 'on_set_sync', arg, opts)
133 count++
134 }
135
136 return count
137 }
138
139 // Back door to the control room
140 if (bus.options.backdoor) {
141 bus.make_http_server({
142 port: bus.options.backdoor,
143 name: 'backdoor_http_server',
144 use_ssl: use_ssl
145 })
146 bus.sockjs_server(this.backdoor_http_server)
147 }
148 },
149
150 bus_for_http_client: function (req, res) {
151 var bus = this
152 if (!bus.bus_for_http_client.counter)
153 bus.bus_for_http_client.counter = 0
154 var cbus = require('./statebus')()
155 cbus.label = 'client_http' + bus.bus_for_http_client.counter++
156 cbus.master = bus
157
158 // Log in as the client
159 cbus.serves_auth({remoteAddress: req.connection.remoteAddress}, bus)
160 bus.options.client(cbus)
161 cbus.save({key: 'current_user', client: req.client})
162 return cbus
163 },
164
165 make_http_server: function make_http_server (options) {
166 options = options || {}
167 var fs = require('fs')
168
169 if (options.use_ssl) {
170 // Load with TLS/SSL
171 console.log('Encryption ON')
172
173 // use http2 compatible library if available
174 try {
175 var http = require('spdy')
176 console.log('Found spdy library. HTTP/2 enabled!')
177 } catch (e) {
178 var http = require('https')
179 }
180
181 var protocol = 'https'
182 var ssl_options = {
183 ca: (fs.existsSync(this.options.certs.certificate_bundle)
184 && require('split-ca')(this.options.certs.certificate_bundle)),
185 key: fs.readFileSync(this.options.certs.private_key),
186 cert: fs.readFileSync(this.options.certs.certificate),
187 ciphers: "ECDHE-RSA-AES256-SHA384:DHE-RSA-AES256-SHA384"
188 + ":ECDHE-RSA-AES256-SHA256:DHE-RSA-AES256-SHA256"
189 + ":ECDHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA256"
190 + ":HIGH:!aNULL:!eNULL:!EXPORT:!DES:!RC4:!MD5:!PSK:!SRP:!CAMELLIA",
191 honorCipherOrder: true}
192 }
193 else {
194 // Load unencrypted server
195 console.log('Encryption OFF')
196 var http = require('http')
197 var protocol = 'http'
198 var ssl_options = undefined
199 }
200
201 if (options.port === 'auto') {
202 var bind = require('./extras/tcp-bind')
203 function find_a_port () {
204 var next_port_attempt = 80
205 while (true)
206 try {
207 var result = bind(next_port_attempt)
208 bus.port = next_port_attempt
209 return result
210 } catch (e) {
211 if (next_port_attempt < 3006) next_port_attempt = 3006
212 else next_port_attempt++
213 }
214 }
215
216 var fd
217 if (options.use_ssl)
218 try {
219 fd = bind(443)
220 bus.port = 443
221 bus.redirect_port_80()
222 } catch (e) {fd = find_a_port()}
223 else fd = find_a_port()
224 var http_server = http.createServer(ssl_options)
225 http_server.listen({fd: fd}, () => {
226 console.log('Listening on '+protocol+'://<host>:'+bus.port)
227 })
228 }
229 else {
230 bus.port = bus.options.port
231 var http_server = http.createServer(ssl_options)
232 http_server.listen(bus.options.port, () => {
233 console.log('Listening on '+protocol+'://<host>:'+bus.port)
234 })
235 }
236
237 bus[options.name || 'http_server'] = http_server
238 },
239
240 redirect_port_80: function redirect_port_80 () {
241 var redirector = require('http')
242 redirector.createServer(function (req, res) {
243 res.writeHead(301, {"Location": "https://"+req.headers['host']+req.url})
244 res.end()
245 }).listen(80)
246 },
247
248 install_express: function install_express (express_app) {
249 this.http_server.on('request', // Install express
250 function (request, response) {
251 // But express should ignore all sockjs requests
252 if (!request.url.startsWith('/'+unique_sockjs_string+'/'))
253 express_app(request, response)
254 })
255
256 },
257 sockjs_server: function sockjs_server(httpserver, client_bus_func) {
258 var master = this
259 var client_num = 0
260 // var client_busses = {} // XXX work in progress
261 var log = master.log
262 if (client_bus_func) {
263 master.save({key: 'connections'}) // Clean out old sessions
264 var connections = master.fetch('connections')
265 }
266 var s = require('sockjs').createServer({
267 sockjs_url: 'https://cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js',
268 disconnect_delay: 600 * 1000,
269 heartbeat_delay: 6000 * 1000
270 })
271 s.on('connection', function(conn) {
272 if (client_bus_func) {
273 // To do for pooling client busses:
274 // - What do I do with connections? Do they pool at all?
275 // - Before creating a new bus here, check to see if there's
276 // an existing one in the pool, and re-use it if so.
277 // - Count the number of connections using a client.
278 // - When disconnecting, decrement the number, and if it gets
279 // to zero, delete the client bus.
280
281 connections[conn.id] = {client: conn.id, // client is deprecated
282 id: conn.id}
283 master.save(connections)
284
285 var client = require('./statebus')()
286 client.label = 'client' + client_num++
287 master.label = master.label || 'master'
288 client.master = master
289 client_bus_func(client, conn)
290 } else
291 var client = master
292
293 var our_fetches_in = {} // Every key that this socket has fetched
294 log('sockjs_s: New connection from', conn.remoteAddress)
295 function sockjs_pubber (obj, t) {
296 // log('sockjs_pubber:', obj, t)
297 var msg = {save: obj}
298 if (t.version) msg.version = t.version
299 if (t.parents) msg.parents = t.parents
300 if (t.patch) msg.patch = t.patch
301 if (t.patch) msg.save = msg.save.key
302 msg = JSON.stringify(msg)
303
304 if (global.network_delay) {
305 console.log('>>>> DELAYING!!!', global.network_delay)
306 obj = bus.clone(obj)
307 setTimeout(() => {conn.write(msg)}, global.network_delay)
308 } else
309 conn.write(msg)
310
311 log('sockjs_s: SENT a', msg, 'to client')
312 }
313 conn.on('data', function(message) {
314 log('sockjs_s:', message)
315 try {
316 message = JSON.parse(message)
317 var method = bus.message_method(message)
318
319 // Validate the message
320 if (!((method === 'fetch'
321 && master.validate(message, {fetch: 'string',
322 '?parent': 'string', '?version': 'string'}))
323 ||
324 (method === 'save'
325 && master.validate(message, {save: '*',
326 '?parents': 'array', '?version': 'string', '?patch': 'array'})
327 && (typeof(message.save) === 'string'
328 || (typeof(message.save) === 'object'
329 && typeof(message.save.key === 'string'))))
330 ||
331 (method === 'forget'
332 && master.validate(message, {forget: 'string'}))
333 ||
334 (method === 'delete'
335 && master.validate(message, {'delete': 'string'}))))
336 throw 'validation error'
337
338 } catch (e) {
339 for (var i=0; i<4; i++) console.error('#######')
340 console.error('Received bad sockjs message from '
341 + conn.remoteAddress +': ', message, e)
342 return
343 }
344
345 switch (method) {
346 case 'fetch':
347 our_fetches_in[message.fetch] = true
348 client.fetch(message.fetch, sockjs_pubber)
349 break
350 case 'forget':
351 delete our_fetches_in[message.forget]
352 client.forget(message.forget, sockjs_pubber)
353 break
354 case 'delete':
355 client.delete(message['delete'])
356 break
357 case 'save':
358 message.version = message.version || client.new_version()
359 if (message.patch) {
360 var o = bus.cache[message.save] || {key: message.save}
361 try {
362 message.save = bus.apply_patch(o, message.patch[0])
363 } catch (e) {
364 console.error('Received bad sockjs message from '
365 + conn.remoteAddress +': ', message, e)
366 return
367 }
368 }
369 client.save(message.save,
370 {version: message.version,
371 parents: message.parents,
372 patch: message.patch})
373 if (our_fetches_in[message.save.key]) { // Store what we've seen if we
374 // might have to publish it later
375 client.log('Adding', message.save.key+'#'+message.version,
376 'to pubber!')
377 sockjs_pubber.has_seen(client, message.save.key, message.version)
378 }
379 break
380 }
381
382 // validate that our fetches_in are all in the bus
383 for (var key in our_fetches_in)
384 if (!client.fetches_in.has(key, master.funk_key(sockjs_pubber)))
385 console.trace("***\n****\nFound errant key", key,
386 'when receiving a sockjs', method, 'of', message)
387 //log('sockjs_s: done with message')
388 })
389 conn.on('close', function() {
390 log('sockjs_s: disconnected from', conn.remoteAddress, conn.id, client.id)
391 for (var key in our_fetches_in)
392 client.forget(key, sockjs_pubber)
393 if (client_bus_func) {
394 delete connections[conn.id]; master.save(connections)
395 client.delete_bus()
396 }
397 })
398
399 // Define the /connection* state!
400 if (client_bus_func && !master.options.__secure) {
401
402 // A connection
403 client('connection/*').to_fetch = function (key, star) {
404 var id = star
405 var conn = master.fetch('connections')[id]
406 if (!conn) return {error: 'connection ' + id + ' does not exist'}
407
408 var result = master.clone(conn)
409 result.key = key
410 result.id = id
411 result.client = id // Deprecated
412
413 if (master.options.connections.include_users && result.user)
414 result.user = client.fetch(result.user.key)
415 return result
416 }
417 client('connection/*').to_save = function (o, star, t) {
418 // Check permissions before editing
419 if (star !== conn.id && !master.options.connections.edit_others) {
420 t.abort()
421 return
422 }
423 var connections = master.fetch('connections')
424 var result = client.clone(o)
425 var old = connections[star]
426 delete result.key
427 result.id = star
428 result.client = star // Deprecated
429 result.user = old.user
430 connections[star] = result
431 master.save(connections)
432 }
433
434 // Your connection
435 client('connection').to_fetch = function () {
436 // subscribe to changes in authentication
437 client.fetch('current_user')
438
439 var result = client.clone(client.fetch('connection/' + conn.id))
440 delete result.key
441 return result
442 }
443 client('connection').to_save = function (o) {
444 o = client.clone(o)
445 o.key = 'connection/' + conn.id
446 client.save(o)
447 }
448
449 // All connections
450 client('connections').to_save = function noop (t) {t.abort()}
451 client('connections').to_fetch = function () {
452 var result = []
453 var conns = master.fetch('connections')
454 for (var connid in conns)
455 if (connid !== 'key')
456 result.push(client.fetch('connection/' + connid))
457
458 return {all: result}
459 }
460 }
461 })
462
463 s.installHandlers(httpserver, {prefix:'/' + unique_sockjs_string})
464 },
465
466 make_websocket: function make_websocket (url) {
467 url = url.replace(/^state:\/\//, 'wss://')
468 url = url.replace(/^istate:\/\//, 'ws://')
469 url = url.replace(/^statei:\/\//, 'ws://')
470 WebSocket = require('websocket').w3cwebsocket
471 return new WebSocket(url+'/'+unique_sockjs_string+'/websocket')
472 },
473 client_creds: function client_creds (server_url) {
474 // Right now the server just creates a different random id each time
475 // it connects.
476 return {clientid: (Math.random().toString(36).substring(2)
477 + Math.random().toString(36).substring(2)
478 + Math.random().toString(36).substring(2))}
479 },
480
481 // Deprecated
482 ws_client: function (prefix, url, account) {
483 console.error('ws_client() is deprecated; use net_mount() instead')
484 bus.net_mount(prefix, url, account) },
485 // Deprecated
486 universal_ws_client: function () {
487 console.error('calling universal_ws_client is deprecated and no longer necessary') },
488
489 file_store: (function () {
490 // Make a database
491 var fs = require('fs')
492 var db = {}
493 var db_is_ok = false
494 var pending_save = null
495 var active
496 function file_store (prefix, delay_activate) {
497 prefix = prefix || bus.options.file_store.prefix
498 var filename = bus.options.file_store.filename,
499 backup_dir = bus.options.file_store.backup_dir
500
501 // Loading db
502 try {
503 if (fs.existsSync && !fs.existsSync(filename))
504 (fs.writeFileSync(filename, '{}'), bus.log('Made a new db file'))
505 db = JSON.parse(fs.readFileSync(filename))
506 db_is_ok = true
507 // If we save before anything else is connected, we'll get this
508 // into the cache but not affect anything else
509 bus.save.fire(global.pointerify ? inline_pointers(db) : db)
510 bus.log('Read db')
511 } catch (e) {
512 console.error(e)
513 console.error('bad db file')
514 }
515
516 // Saving db
517 function save_db() {
518 if (!db_is_ok) return
519
520 console.time('saved db')
521
522 fs.writeFile(filename+'.tmp', JSON.stringify(db, null, 1), function(err) {
523 if (err) {
524 console.error('Crap! DB IS DYING!!!!', err)
525 db_is_ok = false
526 } else
527 fs.rename(filename+'.tmp', filename, function (err) {
528 if (err) {
529 console.error('Crap !! DB IS DYING !!!!', err)
530 db_is_ok = false
531 } else {
532 console.timeEnd('saved db')
533 pending_save = null
534 }
535 })
536 })
537 }
538
539 function save_later() {
540 pending_save = pending_save || setTimeout(save_db, bus.options.file_store.save_delay)
541 }
542 active = !delay_activate
543
544 // Replaces every nested keyed object with {_key: <key>}
545 function abstract_pointers (o) {
546 o = bus.clone(o)
547 var result = {}
548 for (var k in o)
549 result[k] = bus.deep_map(o[k], (o) => {
550 if (o && o.key) return {_key: o.key}
551 else return o
552 })
553 return result
554 }
555 // ...and the inverse
556 function inline_pointers (db) {
557 return bus.deep_map(db, (o) => {
558 if (o && o._key)
559 return db[o._key]
560 else return o
561 })
562 }
563 function on_save (obj) {
564 db[obj.key] = global.pointerify ? abstract_pointers(obj) : obj
565 if (active) save_later()
566 }
567 on_save.priority = true
568 bus(prefix).on_save = on_save
569 bus(prefix).to_delete = function (key) {
570 delete db[key]
571 if (active) save_later()
572 }
573 file_store.activate = function () {
574 active = true
575 save_later()
576 }
577
578 // Handling errors
579 function recover (e) {
580 if (e) {
581 process.stderr.write("Uncaught Exception:\n");
582 process.stderr.write(e.stack + "\n");
583 }
584 if (pending_save) {
585 console.log('Saving db after crash')
586 console.time()
587 fs.writeFileSync(filename, JSON.stringify(db, null, 1))
588 console.log('Saved db after crash')
589 }
590 process.exit(1)
591 }
592 process.on('SIGINT', recover)
593 process.on('SIGTERM', recover)
594 process.on('uncaughtException', recover)
595
596 // Rotating backups
597 setInterval(
598 // This copies the current db over backups/db.<curr_date> every minute
599 function backup_db() {
600 if (!db_is_ok || !backup_dir) return
601 if (fs.existsSync && !fs.existsSync(backup_dir))
602 fs.mkdirSync(backup_dir)
603
604 var d = new Date()
605 var y = d.getYear() + 1900
606 var m = d.getMonth() + 1
607 if (m < 10) m = '0' + m
608 var day = d.getDate()
609 if (day < 10) day = '0' + day
610 var date = y + '-' + m + '-' + day
611
612 //bus.log('Backing up db on', date)
613
614 require('child_process').execFile(
615 '/bin/cp', [filename, backup_dir+'/'+filename+'.'+date])
616 },
617 1000 * 60 // Every minute
618 )
619 }
620
621 return file_store
622 })(),
623
624 firebase_store: function (prefix, firebase_ref) {
625 prefix = prefix || '*'
626
627 function encode_firebase_key(k) {
628 return encodeURIComponent(k).replace(/\./g, '%2E')
629 }
630
631 function decode_firebase_key(k) {
632 return decodeURIComponent(k.replace('%2E', '.'))
633 }
634
635 bus(prefix).to_fetch = function (key, t) {
636 firebase_ref.child(encode_firebase_key(key)).on('value', function (x) {
637 t.done(x.val() || {})
638 }, function (err) { t.abort() })
639 }
640
641 bus(prefix).on_save = function (o) {
642 firebase_ref.child(encode_firebase_key(o.key)).set(o)
643 }
644
645 // bus(prefix).to_save = function (o, t) {
646 // firebase_ref.child(encode_firebase_key(o.key)).set(o, (err) => {
647 // err ? t.abort() : t.done()
648 // })
649 // }
650
651 bus(prefix).to_delete = function (key, t) {
652 firebase_ref.child(encode_firebase_key(key)).set(null, (err) => {
653 err ? t.abort() : t.done()
654 })
655 }
656
657 bus(prefix).to_forget = function (key, t) {
658 firebase_ref.child(encode_firebase_key(key)).off()
659 }
660 },
661
662 lazy_sqlite_store: function lazy_sqlite_store (opts) {
663 if (!opts) opts = {}
664 opts.lazy = true
665 bus.sqlite_store(opts)
666 },
667 fast_load_sqlite_store: function sqlite_store (opts) {
668 if (!opts) opts = {}
669 opts.dont_fire = true
670 bus.sqlite_store(opts)
671 },
672 sqlite_store: function sqlite_store (opts) {
673 var prefix = '*'
674 var open_transaction = null
675
676 if (!opts) opts = {}
677 if (!opts.filename) opts.filename = 'db.sqlite'
678 if (!opts.hasOwnProperty('inline_pointers'))
679 opts.inline_pointers = global.pointerify
680
681 // Load the db on startup
682 try {
683 var db = bus.sqlite_store_db || new (require('better-sqlite3'))(opts.filename)
684 bus.sqlite_store_db = db
685 bus.sqlite_store.load_all = load_all
686 bus.sqlite_store.all_keys = all_keys
687
688 db.pragma('journal_mode = WAL')
689 db.prepare('create table if not exists cache (key text primary key, obj text)').run()
690
691 function all_keys () {
692 var result = []
693 for (var row of db.prepare('select key from cache').iterate())
694 result.push(row.key)
695 return result
696 }
697 function load_all (options) {
698 var temp_db = {}
699
700 for (var row of db.prepare('select * from cache').iterate()) {
701 var obj = JSON.parse(row.obj)
702 temp_db[obj.key] = obj
703 }
704
705 if (opts.inline_pointers)
706 temp_db = inline_pointers(temp_db)
707
708 for (var key in temp_db)
709 if (temp_db.hasOwnProperty(key)) {
710 if (options.dont_fire)
711 bus.cache[key] = temp_db[key]
712 else
713 bus.save.fire(temp_db[key])
714 temp_db[key] = undefined
715 }
716 }
717 if (!opts.lazy) load_all(opts)
718
719 bus.log('Read ' + opts.filename)
720 } catch (e) {
721 console.error(e)
722 console.error('Bad sqlite db')
723 }
724
725 function sqlite_get (key) {
726 var x = db.prepare('select * from cache where key = ?').get([key])
727 return x ? JSON.parse(x.obj) : {}
728 }
729 if (opts.lazy)
730 // Add fetch handler
731 bus(prefix).to_fetch = function (key, t) {
732 var x = (bus.cache[key] && !bus.pending_fetches[key])
733 || sqlite_get(key)
734 if (opts.inline_pointers) x = inline_pointers_singleobj(x)
735 x = bus.deep_map(x, (o) => o && o.key ? sqlite_get(o.key) : o)
736 t.done(x)
737 }
738
739
740 // Add save handlers
741 function on_save (obj) {
742 if (opts.inline_pointers)
743 obj = abstract_pointers(obj)
744
745 if (opts.use_transactions && !open_transaction){
746 console.time('save db')
747 db.prepare('BEGIN TRANSACTION').run()
748 }
749
750 db.prepare('replace into cache (key, obj) values (?, ?)').run(
751 [obj.key, JSON.stringify(obj)])
752
753 if (opts.use_transactions && !open_transaction) {
754 open_transaction = setTimeout(function(){
755 console.log('Committing transaction to database')
756 db.prepare('COMMIT').run()
757 open_transaction = false
758 console.timeEnd('save db')
759 })
760 }
761 }
762 if (opts.save_sync) {
763 var old_route = bus.route
764 bus.route = function (key, method, arg, t) {
765 if (method === 'to_save') on_save(arg)
766 return old_route(key, method, arg, t)
767 }
768 } else
769 bus(prefix).on_set_sync = on_save
770
771 bus(prefix).to_delete = function (key) {
772 if (opts.use_transactions && !open_transaction){
773 console.time('save db')
774 db.prepare('BEGIN TRANSACTION').run()
775 }
776 db.prepare('delete from cache where key = ?').run([key])
777 if (opts.use_transactions && !open_transaction)
778 open_transaction = setTimeout(function(){
779 console.log('committing')
780 db.prepare('COMMIT').run()
781 open_transaction = false
782 console.timeEnd('save db')
783 })
784 }
785
786 // Replaces every nested keyed object with {_key: <key>}
787 function abstract_pointers (o) {
788 o = bus.clone(o)
789 var result = {}
790 for (var k in o)
791 result[k] = bus.deep_map(o[k], (o) => {
792 if (o && o.key) return {_key: o.key}
793 else return o
794 })
795 return result
796 }
797 // ...and the inverse
798 function inline_pointers (db) {
799 return bus.deep_map(db, (o) => {
800 if (o && o._key)
801 return db[o._key]
802 else return o
803 })
804 }
805 function inline_pointers_singleobj (obj) {
806 return bus.deep_map(obj, (o) => (o && o._key)
807 ? bus.cache[o._key] : o)
808 }
809
810 // Rotating backups
811 setInterval(
812 // Copy the current db over backups/db.<curr_date> every minute
813 //
814 // Note: in future we might want to use db.backup():
815 // https://github.com/JoshuaWise/better-sqlite3/blob/master/docs/api.md#backupdestination-options---promise
816 function backup_db() {
817 if (opts.backups === false) return
818 var backup_dir = opts.backup_dir || 'backups'
819 if (fs.existsSync && !fs.existsSync(backup_dir))
820 fs.mkdirSync(backup_dir)
821
822 var d = new Date()
823 var y = d.getYear() + 1900
824 var m = d.getMonth() + 1
825 if (m < 10) m = '0' + m
826 var day = d.getDate()
827 if (day < 10) day = '0' + day
828 var date = y + '-' + m + '-' + day
829
830 require('child_process').execFile(
831 'sqlite3',
832 [opts.filename, '.backup '+"'"+backup_dir+'/'+opts.filename+'.'+date+"'"])
833 },
834 1000 * 60 // Every minute
835 )
836 },
837
838 pg_store: function pg_store (opts) {
839 opts = opts || {}
840 opts.prefix = opts.prefix || '*'
841
842 // Load the db on startup
843 try {
844 var db = new require('pg-native')()
845 bus.pg_db = db
846 bus.pg_save = pg_save
847 db.connectSync(opts.url)
848 db.querySync('create table if not exists store (key text primary key, value jsonb)')
849
850 var rows = db.querySync('select * from store')
851 rows.forEach(r => bus.save(inline_pointers(r.value, bus)))
852
853 bus.log('Read ' + opts.url)
854 } catch (e) {
855 console.error(e)
856 console.error('Bad pg db')
857 }
858
859 // Add save handlers
860 function pg_save (obj) {
861 obj = abstract_pointers(obj)
862
863 db.querySync('insert into store (key, value) values ($1, $2) '
864 + 'on conflict (key) do update set value = $2',
865 [obj.key, JSON.stringify(obj)])
866 }
867 pg_save.priority = true
868 bus(opts.prefix).on_save = pg_save
869 bus(opts.prefix).to_delete = function (key) {
870 db.query('delete from store where key = $1', [key])
871 }
872
873 // Replaces every nested keyed object with {_key: <key>}
874 function abstract_pointers (o) {
875 o = bus.clone(o)
876 var result = {}
877 for (var k in o)
878 result[k] = bus.deep_map(o[k], (x) => {
879 if (x && x.key) return {_key: x.key}
880 else return x
881 })
882 return result
883 }
884 // ...and the inverse
885 function inline_pointers (obj, bus) {
886 return bus.deep_map(obj, (o) => {
887 if (o && o._key) {
888 if (!bus.cache[o._key])
889 bus.cache[o._key] = {key: o._key}
890 return bus.cache[o._key]
891 } else return o
892 })
893 }
894 },
895
896 setup_usage_log (opts) {
897 bus.serve_time()
898 opts = opts || {}
899 opts.filename = opts.filename || 'db.sqlite'
900 var db = new (require('better-sqlite3'))(opts.filename)
901 bus.usage_log_db = db
902 //db.pragma('journal_mode = WAL')
903 db.prepare('create table if not exists usage (date integer, event text, details text)').run()
904 db.prepare('create index if not exists date_index on usage (date)').run()
905 var refresh_interval = 1000*60
906
907 var nots = ['details not like "%facebookexternalhit%"',
908 'details not like "%/apple-touch-icon%"',
909 'details not like "%Googlebot%"',
910 'details not like "%AdsBot-Google%"',
911 'details not like "%Google-Adwords-Instant%"',
912 'details not like "%Apache-HttpClient%"',
913 'details not like "%SafeDNSBot%"',
914 'details not like "%RevueBot%"',
915 'details not like "%MetaURI API%"',
916 'details not like "%redback/v%"',
917 'details not like "%Slackbot%"',
918 'details not like "%HTTP_Request2/%"',
919 'details not like "%python-requests/%"',
920 'details not like "%LightspeedSystemsCrawler/%"',
921 'details not like "%CipaCrawler/%"',
922 'details not like "%Twitterbot/%"',
923 'details not like "%Go-http-client/%"',
924 'details not like "%/cheese_service%"'
925 ].join(' and ')
926 bus.usage_log_nots = nots
927
928 // Aggregate all accesses by day, to get daily active users
929 bus('usage').to_fetch = () => {
930 bus.fetch('time/' + refresh_interval)
931 var days = []
932 var last_day
933 for (var row of db.prepare('select * from usage where '
934 + nots + ' order by date').iterate()) {
935 row.details = JSON.parse(row.details)
936 if (row.details.agent && row.details.agent.match(/bot/)) continue
937
938 var d = new Date(row.date * 1000)
939 var day = d.getFullYear() + '-' + (d.getMonth()+1) + '-' + d.getDate()
940 if (last_day !== day)
941 days.push({day: day, // Init
942 clients: new Set(),
943 ips: new Set(),
944 client_socket_opens: new Set(),
945 ip_socket_opens: new Set()
946 })
947 last_day = day
948
949 if (row.event === 'socket open') {
950 days[days.length-1].client_socket_opens.add(row.details.client)
951 days[days.length-1].ip_socket_opens.add(row.details.ip)
952 }
953 days[days.length-1].clients.add(row.details.client)
954 days[days.length-1].ips.add(row.details.ip)
955 }
956
957 for (var i=0; i<days.length; i++)
958 days[i] = {day: days[i].day,
959 ip_hits: days[i].ips.size,
960 client_hits: days[i].clients.size,
961 client_socket_opens: days[i].client_socket_opens.size,
962 ip_socket_opens: days[i].ip_socket_opens.size
963 }
964
965 return {_: days}
966 }
967
968 bus('recent_hits/*').to_fetch = (rest) => {
969 bus.fetch('time/' + refresh_interval)
970 var result = []
971 for (var row of db.prepare('select * from usage where '
972 + nots + ' order by date desc limit ?').iterate(
973 [parseInt(rest)])) {
974
975 row.details = JSON.parse(row.details)
976 if (row.details.agent && row.details.agent.match(/bot/)) continue
977
978 result.push({url: row.details.url, ip: row.details.ip, date: row.date})
979 }
980
981 return {_: result}
982 }
983
984 bus('recent_referers/*').to_fetch = (rest) => {
985 bus.fetch('time/' + refresh_interval)
986 var result = []
987 for (var row of db.prepare('select * from usage where '
988 + nots + ' order by date desc limit ?').iterate(
989 [parseInt(rest)])) {
990
991 row.details = JSON.parse(row.details)
992 if (row.details.agent && row.details.agent.match(/bot/)) continue
993
994 if (row.details.referer && !row.details.referer.match(/^https:\/\/cheeseburgertherapy.com/))
995 result.push({url: row.details.url, referer: row.details.referer,
996 date: row.date})
997 }
998
999 return {_: result}
1000 }
1001
1002
1003 function sock_open_time (sock_event) {
1004 var client = JSON.parse(sock_event.details).client
1005 if (!client) return null
1006
1007 var http_req = db.prepare('select * from usage where event = "http request" and date < ? and '
1008 + ' details like ? and '
1009 + nots + ' order by date desc limit 1').get([sock_event.date,
1010 '%'+client+'%'])
1011 if (!http_req) return null
1012
1013 var delay = sock_event.date - http_req.date
1014 var res = !delay || delay > 300 ? 'fail' : delay
1015 if (delay && delay < 300
1016 && JSON.parse(sock_event.details).ip
1017 !== JSON.parse(http_req.details).ip)
1018 console.error('Yuck!', delay, sock_event, http_req)
1019 return [res, JSON.parse(http_req.details).url]
1020 }
1021 function sock_open_times () {
1022 var opens = db.prepare('select * from usage where event = "socket open" and '
1023 + nots + ' order by date desc limit ?').all(500)
1024 var times = []
1025 for (var i=0; i<opens.length; i++) {
1026 times.push(sock_open_time(opens[i]))
1027 // Get the most recent http hit before this open, from the same client id
1028 // subract the times
1029 }
1030 return times
1031 }
1032
1033 bus('socket_load_times').to_fetch = () => {
1034 return {_: sock_open_times()}
1035 }
1036 },
1037 log_usage(event, details) {
1038 bus.usage_log_db.prepare('insert into usage (date, event, details) values (?, ?, ?)')
1039 .run([new Date().getTime()/1000,
1040 event,
1041 JSON.stringify(details)])
1042 },
1043
1044 serve_time () {
1045 if (bus('time*').to_fetch.length > 0)
1046 // Then it's already installed
1047 return
1048
1049 // Time ticker
1050 var timeouts = {}
1051 bus('time*').to_fetch =
1052 function (key, rest, t) {
1053 if (key === 'time') rest = '/1000'
1054 timeout = parseInt(rest.substr(1))
1055 function f () { bus.save.fire({key: key, time: Date.now()}) }
1056 timeouts[key] = setInterval(f, timeout)
1057 f()
1058 }
1059 bus('time*').to_forget =
1060 function (key, rest) {
1061 if (key === 'time') key = 'time/1000'
1062 clearTimeout(timeouts[key])
1063 }
1064 },
1065
1066 smtp (opts) {
1067 var bus = this
1068 // Listen for SMTP messages on port 25
1069 if (opts.domain) {
1070 var mailin = require('mailin')
1071 mailin.start({
1072 port: opts.port || 25,
1073 disableWebhook: true,
1074 host: opts.domain,
1075 smtpOptions: { SMTPBanner: opts.domain }
1076 })
1077 // Event emitted after a message was received and parsed.
1078 mailin.on('message', (connection, msg, raw) => {
1079 if (!msg.messageId) {
1080 console.log('Aborting message without id!', msg.subject, new Date().toLocaleString())
1081 return
1082 }
1083
1084 console.log(msg)
1085 console.log('Refs is', msg.references)
1086 console.log('Raw is', raw)
1087 var parent = msg.references
1088 if (parent) {
1089 if (Array.isArray(parent))
1090 parent = parent[0]
1091 var m = parent.match(/\<(.*)\>/)
1092 if (m && m[1])
1093 parent = m[1]
1094 }
1095 var from = msg.from
1096
1097 console.log('date is', msg.date, typeof(msg.date), msg.date.getTime())
1098 email = {
1099 key: "email/" + msg.messageId,
1100 _: {
1101 title: msg.subject,
1102 parent: parent && ("email/" + parent) || undefined,
1103 from: msg.from.address,
1104 to: msg.to.map(x=>x.address),
1105 cc: msg.cc.map(x=>x.address),
1106 date: msg.date.getTime() / 1000,
1107 text: msg.text,
1108 body: msg.text,
1109 html: msg.html
1110 }
1111 }
1112
1113 bus.save(email)
1114 })
1115 }
1116 },
1117
1118 serve_email (master, opts) {
1119 opts = opts || {}
1120 var client = this
1121 var email_regex = /^(([^<>()\[\]\\.,;:\s@"]+(\.[^<>()\[\]\\.,;:\s@"]+)*)|(".+"))@((\[[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}])|(([a-zA-Z\-0-9]+\.)+[a-zA-Z]{2,}))$/
1122 var peemail_regex = /^public$|^(([^<>()\[\]\\.,;:\s@"]+(\.[^<>()\[\]\\.,;:\s@"]+)*)|(".+"))@((\[[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}])|(([a-zA-Z\-0-9]+\.)+[a-zA-Z]{2,}))$/
1123 // var local_addie_regex = new RegExp('state://' + opts.domain + '/user/([^\/]+)')
1124 // Todo on Server:
1125 // - Connect with mailin, so we can receive SMTP shit
1126 //
1127 // - Is there security hole if users have a ? or a / in their name?
1128 // - Make the master.posts/ state not require /
1129 // - Make standard url tools for optional slashes, and ? query params
1130 // - Should a e.g. client to_save abort if it calls save that aborts?
1131 // - e.g. if master('posts*').to_save aborts
1132
1133 // Helpers
1134 function get_posts (args) {
1135 console.log('getting posts with', args)
1136 var can_see = [{cc: ['public']}]
1137 if (args.for)
1138 can_see.push({to: [args.for]},
1139 {cc: [args.for]},
1140 {from: [args.for]})
1141
1142 terms = '(' + can_see
1143 .map(x => "value @> '"+JSON.stringify({_:x})+"'")
1144 .join(' or ')
1145 + ')'
1146
1147 if (args.about) {
1148 // console.log('getting one about', args.about)
1149 var interested_in = [{to: [args.about]},
1150 {cc: [args.about]},
1151 {from: [args.about]}]
1152 terms += ' and (' + interested_in
1153 .map(x => "value @> '"+JSON.stringify({_:x})+"'")
1154 .join(' or ')
1155 + ')'
1156 }
1157
1158 var q = "select value from store where key like 'post/%' and " + terms
1159 q += " order by value #>'{_,date}' desc"
1160 if (args.to) q += ' limit ' + to
1161 console.log('with query', q)
1162 return master.pg_db.querySync(q).map(x=>x.value)
1163 }
1164 function post_children (post) {
1165 return master.pg_db.querySync(
1166 "select value from store where value #>'{_,parent}' = '"
1167 + JSON.stringify(post.key)
1168 + "' order by value #>'{_,date}' asc").map(x=>x.value)
1169 }
1170
1171 // function canonicalize_address (addie) {
1172 // if (opts.domain) {
1173 // // Try foo@gar.boo
1174 // var m = addie.match(email_regex)
1175 // if (m && m[5].toLowerCase() === opts.domain)
1176 // return 'user/' + m[1]
1177
1178 // // Try state://gar.boo/user/foo
1179 // m = addie.match(local_addie_regex)
1180 // if (m) return 'user/' + m[1]
1181 // }
1182 // return addie
1183 // }
1184
1185 // Define state on master
1186 if (master('posts_for/*').to_fetch.length === 0) {
1187 // Get posts for each user
1188 master('posts_for/*').to_fetch = (json) => {
1189 watch_for_dirt('posts-for/' + json.for)
1190 return {_: get_posts(json)}
1191 }
1192 // Saving any post will dirty the list for all users mentioned in
1193 // the post
1194 master('post/*').to_save = (old, New, t) => {
1195 // To do: diff the cc, to, and from lists, and only dirty
1196 // posts_for people who have been changed
1197
1198 if (!old._) old = {_:{to: [], from: [], cc: []}}
1199
1200 // old._.to = old._.to.map(a=>canonicalize_address(a))
1201 // old._.cc = old._.cc.map(a=>canonicalize_address(a))
1202 // old._.from = old._.from.map(a=>canonicalize_address(a))
1203 // New._.to = New._.to.map(a=>canonicalize_address(a))
1204 // New._.cc = New._.cc.map(a=>canonicalize_address(a))
1205 // New._.from = New._.from.map(a=>canonicalize_address(a))
1206
1207 var dirtied = old._.to.concat(New._.to)
1208 .concat(old._.cc).concat(New._.cc)
1209 .concat(old._.from).concat(New._.from)
1210 dirtied.forEach(u=>dirty('posts-for/' + u))
1211
1212 if (old._.parent) dirty(old._.parent)
1213 if (New._.parent) dirty(New._.parent)
1214
1215 t.done(New)
1216 }
1217 }
1218
1219 function user_addy (u) {
1220 return u.name + '@' + opts.domain
1221 }
1222 function current_addy () {
1223 var c = client.fetch('current_user')
1224 return c.logged_in ? user_addy(c.user) : 'public'
1225 }
1226 function is_author (post) {
1227 var from = post._.from
1228 var c = client.fetch('current_user')
1229 return from.includes(current_addy())
1230 }
1231 function can_see (post) {
1232 var c = client.fetch('current_user')
1233 var allowed = post._.to.concat(post._.cc).concat(post._.from)
1234 return allowed.includes(current_addy()) || allowed.includes('public')
1235 }
1236 var drtbus = master//require('./statebus')()
1237 function dirty (key) { drtbus.save({key: 'dirty-'+key, n: Math.random()}) }
1238 function watch_for_dirt (key) { drtbus.fetch('dirty-'+key) }
1239
1240 client('current_email').to_fetch = () => {
1241 return {_: current_addy()}
1242 }
1243
1244 // Define state on client
1245 client('posts*').to_fetch = (k, rest) => {
1246 var args = bus.parse(rest.substr(1))
1247 var c = client.fetch('current_user')
1248 args.for = current_addy()
1249 var e = master.fetch('posts_for/' + JSON.stringify(args))
1250 return {_: master.clone(e._).map(x=>client.fetch(x.key))}
1251 }
1252
1253 client('post/*').to_fetch = (k) => {
1254 var e = master.clone(master.fetch(k))
1255 if (!e._) return {}
1256 if (!can_see(e))
1257 return {}
1258
1259 e._.children = post_children(e).map(e=>e.key)
1260 watch_for_dirt(k)
1261
1262 return e
1263 }
1264
1265 client('post/*').to_save = (o, t) => {
1266 if (!(client.validate(o, {key: 'string',
1267 _: {to: 'array',
1268 cc: 'array',
1269 from: 'array',
1270 date: 'number',
1271 '?parent': 'string',
1272 body: 'string',
1273 '?title': 'string',
1274 '*': '*'}})
1275 && o._.to.every(a=>a.match(peemail_regex))
1276 && o._.cc.every(a=>a.match(peemail_regex))
1277 && o._.from.every(a=>a.match(peemail_regex)))) {
1278 console.error('post no be valid', o)
1279 t.abort()
1280 return
1281 }
1282
1283 var c = client.fetch('current_user')
1284
1285 // Make sure this user is an author
1286 var from = o._.from
1287 if (!is_author(o)) {
1288 console.error('User', current_addy(),
1289 'is not author', o._.from)
1290 t.abort()
1291 return
1292 }
1293 o = client.clone(o)
1294 delete o.children
1295
1296 master.save(o)
1297 t.done()
1298 }
1299
1300 client('post/*').to_delete = (key, o, t) => {
1301 // Validate
1302 if (!is_author(o)) {
1303 console.error('User', current_addy(),
1304 'is not author', o._.from)
1305 t.abort()
1306 return
1307 }
1308
1309 // master.delete(key)
1310 master.pg_db.query('delete from store where key = $1', [key])
1311
1312 // Dirty everybody
1313 var dirtied = o._.to.concat(o._.cc).concat(o._.from)
1314 dirtied.forEach(u => dirty('posts-for/' + u))
1315 if (o._.parent) dirty(o._.parent)
1316
1317 // To do: handle nested threads. Either detach them, or insert a
1318 // 'deleted' stub, or splice together
1319
1320 // Complete
1321 t.done()
1322 }
1323
1324 client('friends').to_fetch = t => {
1325 return {_: (master.fetch('users').all||[])
1326 .map(u=>client.fetch('email/' + user_addy(u)))
1327 .concat([client.fetch('email/public')])
1328 }
1329 }
1330
1331 client('email/*').to_fetch = (rest) => {
1332 var m = rest.match(email_regex)
1333 var result = {address: rest}
1334 if (rest === 'public') {
1335 result.name = 'public'
1336 }
1337 else if (m && m[5].toLowerCase() === opts.domain) {
1338 result.user = client.fetch('user/' + m[1])
1339 result.name = result.user.name
1340 result.pic = result.user.pic
1341 result.upgraded = true
1342 }
1343 else if (m) {
1344 result.name = m[1]
1345 result.upgraded = false
1346 }
1347
1348 return result
1349 }
1350 },
1351
1352 sqlite_query_server: function sqlite_query_server (db) {
1353 var fetch = bus.fetch
1354 bus('table_columns/*').to_fetch =
1355 function fetch_table_columns (key, rest) {
1356 if (typeof key !== 'string')
1357 console.log(handlers.hash)
1358 var table_name = rest
1359 var columns = fetch('sql/PRAGMA table_info(' + table_name + ')').rows.slice()
1360 var foreign_keys = fetch('table_foreign_keys/' + table_name)
1361 var column_info = {}
1362 for (var i=0;i< columns .length;i++) {
1363 var col = columns[i].name
1364 column_info[col] = columns[i]
1365 // if (col === 'customer' || col === 'customers')
1366 // console.log('FOR CUSTOMER, got', col, foreign_keys[col])
1367 column_info[col].foreign_key = foreign_keys[col] && foreign_keys[col].table
1368 columns[i] = col
1369 }
1370 columns.splice(columns[columns.indexOf('id')], 1)
1371 column_info['key'] = column_info['id']
1372 delete column_info['id']
1373 return {columns:columns, column_info:column_info}
1374 }
1375
1376
1377 bus('table_foreign_keys/*').to_fetch =
1378 function table_foreign_keys (key, rest) {
1379 var table_name = rest
1380 var foreign_keys = fetch('sql/PRAGMA foreign_key_list(' + table_name + ')').rows
1381 var result = {}
1382 for (var i=0;i< foreign_keys .length;i++)
1383 result[foreign_keys[i].from] = foreign_keys[i]
1384 delete result.id
1385 result.key = key
1386 return result
1387 }
1388
1389 bus('sql/*').to_fetch =
1390 function sql (key, rest) {
1391 fetch('timer/60000')
1392 var query = rest
1393 try { query = JSON.parse(query) }
1394 catch (e) { query = {stmt: query, args: []} }
1395
1396 db.all(query.stmt, query.args,
1397 function (err, rows) {
1398 if (rows) bus.save.fire({key:key, rows: rows})
1399 else console.error('Bad sqlite query', key, err)
1400 }.bind(this))
1401 }
1402 },
1403
1404 sqlite_table_server: function sqlite_table_server(db, table_name) {
1405 var save = bus.save, fetch = bus.fetch
1406 var table_columns = fetch('table_columns/'+table_name) // This will fail if used too soon
1407 var foreign_keys = fetch('table_foreign_keys/'+table_name)
1408 var remapped_keys = fetch('remapped_keys')
1409 remapped_keys.keys = remapped_keys.keys || {}
1410 remapped_keys.revs = remapped_keys.revs || {}
1411 function row_json (row) {
1412 var result = {key: table_name + '/' + row.id}
1413 for (var k in row)
1414 if (row.hasOwnProperty(k))
1415 result[k] = (foreign_keys[k] && row[k]
1416 ? foreign_keys[k].table + '/' + row[k]
1417 : result[k] = row[k])
1418 if (result.hasOwnProperty('other')) result.other = JSON.parse(result.other || '{}')
1419 delete result.id
1420 return result
1421 }
1422 function json_values (json) {
1423 var columns = table_columns.columns
1424 var result = []
1425 for (var i=0; i<columns.length; i++) {
1426 var col = columns[i]
1427 var val = json[col]
1428
1429 // JSONify the `other' column
1430 if (col === 'other')
1431 val = JSON.stringify(val || {})
1432
1433 // Convert foreign keys from /customer/3 to 3
1434 else if (foreign_keys[col] && typeof val === 'string') {
1435 val = remapped_keys.keys[val] || val
1436 val = json[columns[i]].split('/')[1]
1437 }
1438
1439 result.push(val)
1440 }
1441 return result
1442 }
1443 function render_table () {
1444 var result = []
1445 db.all('select * from ' + table_name, function (err, rows) {
1446 if (err) console.error('Problem with table!', table_name, err)
1447 for (var i=0; i<rows.length; i++)
1448 result[i] = row_json(rows[i])
1449 bus.save.fire({key: table_name, rows: result})
1450 })
1451 }
1452 function render_row(obj) {
1453 bus.save.fire(row_json(obj))
1454 if (remapped_keys.revs[obj.key]) {
1455 var alias = bus.clone(obj)
1456 alias.key = remapped_keys.revs[obj.key]
1457 bus.save.fire(row_json(alias))
1458 }
1459 }
1460
1461 // ************************
1462 // Handlers!
1463 // ************************
1464
1465 // Fetching the whole table, or a single row
1466 bus(table_name + '*').to_fetch = function (key, rest) {
1467 if (rest === '')
1468 // Return the whole table
1469 return render_table()
1470
1471 if (rest[0] !== '/') return {error: 'bad key: ' + key}
1472 key = remapped_keys.keys[key] || key
1473
1474 var id = key.split('/')[1]
1475 db.get('select * from '+table_name+' where rowid = ?',
1476 [id],
1477 function (err, row) {
1478 if (!row)
1479 { console.log('Row', id, "don't exist.", err); return }
1480
1481 render_row(row)
1482 }.bind(this))
1483 }
1484
1485 // Saving a row
1486 bus(table_name + '/*').to_save = function (obj, rest) {
1487 var columns = table_columns.columns
1488 var key = remapped_keys.keys[obj.key] || obj.key
1489
1490 // Compose the query statement
1491 var stmt = 'update ' + table_name + ' set '
1492 var rowid = rest
1493 var vals = json_values(obj)
1494 for (var i=0; i<columns.length; i++) {
1495 stmt += columns[i] + ' = ?'
1496 //vals.push(obj[columns[i]])
1497 if (i < columns.length - 1)
1498 stmt += ', '
1499 }
1500 stmt += ' where rowid = ?'
1501 vals.push(rowid)
1502
1503 // Run the query
1504 db.run(stmt, vals,
1505 function (e,r) {
1506 console.log('updated',e,r,key)
1507 bus.dirty(key)
1508 })
1509 }
1510
1511 // Inserting a new row
1512 bus('new/' + table_name + '/*').to_save = function (obj) {
1513 var columns = table_columns.columns
1514 var stmt = ('insert into ' + table_name + ' (' + columns.join(',')
1515 + ') values (' + new Array(columns.length).join('?,') + '?)')
1516 var values = json_values(obj)
1517
1518 console.log('Sqlite:' + stmt)
1519
1520 db.run(stmt, values, function (error) {
1521 if (error) console.log('INSERT error!', error)
1522 console.log('insert complete, got id', this.lastID)
1523 remapped_keys.keys[obj.key] = table_name + '/' + this.lastID
1524 remapped_keys.revs[remapped_keys.keys[obj.key]] = obj.key
1525 save(remapped_keys)
1526 render_table()
1527 })
1528 }
1529
1530 // Deleting a row
1531 bus(table_name + '/*').to_delete = function (key, rest) {
1532 if (remapped_keys.keys[key]) {
1533 var old_key = key
1534 var new_key = remapped_keys.keys[key]
1535 delete remapped_keys.keys[old_key]
1536 delete remapped_keys.revs[new_key]
1537 key = new_key
1538 }
1539
1540 var stmt = 'delete from ' + table_name + ' where rowid = ?'
1541 var rowid = rest
1542 console.log('DELETE', stmt)
1543 db.run(stmt, [rowid],
1544 function (err) {
1545 if (err) console.log('DELETE error', err)
1546 else console.log('delete complete')
1547 render_table() })
1548 }
1549 },
1550
1551 serves_auth: function serves_auth (conn, master) {
1552 var client = this // to keep me straight while programming
1553
1554 function logout (key) {
1555 var clients = master.fetch('logged_in_clients')
1556 for (var k in clients)
1557 if (k !== 'key')
1558 if (Object.keys(clients[k]).length === 0) {
1559 client.log('Found a deleted user. Removing.', k, clients[k])
1560 delete clients[k]
1561 master.save(clients)
1562 } else if (clients[k].key === key) {
1563 client.log('Logging out!', k, clients[k])
1564 delete clients[k]
1565 master.save(clients)
1566 }
1567 }
1568
1569 // Initialize master
1570 if (!master.auth_initialized) {
1571 master('users/passwords').to_fetch = function (k) {
1572 var result = {key: 'users/passwords'}
1573 var users = master.fetch('users')
1574 users.all = users.all || []
1575 for (var i=0; i<users.all.length; i++) {
1576 var u = master.fetch(users.all[i])
1577 if (!(u.login || u.name)) {
1578 console.error('upass: this user has bogus name/login', u.key, u.name, u.login)
1579 continue
1580 }
1581 var login = (u.login || u.name).toLowerCase()
1582 console.assert(login, 'Missing login for user', u)
1583 if (result.hasOwnProperty(login)) {
1584 console.error("upass: this user's name is bogus, dude.", u.key)
1585 continue
1586 }
1587 result[login] = {user: u.key, pass: u.pass}
1588 }
1589 return result
1590 }
1591 master.auth_initialized = true
1592 master.fetch('users/passwords')
1593
1594 master('user/*').to_delete = (key, t) => {
1595 master.log('Deleteinggg!!!', key)
1596 // Remove from users.all
1597 var users = master.fetch('users')
1598 users.all = users.all.filter(u => u.key && u.key !== key)
1599 master.save(users)
1600
1601 // Log out
1602 master.log('Logging out', key)
1603 logout(key)
1604
1605 // Remove connection
1606 master.log('Removing connections for', key)
1607 var conns = master.fetch('connections')
1608 for (var k in conns) {
1609 console.log('Trying key', k)
1610 if (k !== 'key')
1611 if (conns[k].user && !conns[k].user.key) {
1612 console.log('Cleaning keyless user', conss[k].user)
1613 delete conns[k].user
1614 master.save(conns)
1615 continue
1616 }
1617 }
1618
1619 master.log('Dirtying users/passwords for', key)
1620 master.dirty('users/passwords')
1621
1622 // Done.
1623 t.done()
1624 }
1625 }
1626
1627 // Authentication functions
1628 function authenticate (login, pass) {
1629 var userpass = master.fetch('users/passwords')[login.toLowerCase()]
1630 master.log('authenticate: we see',
1631 master.fetch('users/passwords'),
1632 userpass && userpass.pass,
1633 pass)
1634
1635 if (!(typeof login === 'string' && typeof pass === 'string')) return false
1636 if (login === 'key') return false
1637 if (!userpass || !userpass.pass) return null
1638
1639 //console.log('comparing passwords', pass, userpass.pass)
1640 if (require('bcrypt-nodejs').compareSync(pass, userpass.pass))
1641 return master.fetch(userpass.user)
1642 }
1643 function create_account (params) {
1644 if (typeof (params.login || params.name) !== 'string')
1645 throw 'no login or name'
1646 var login = (params.login || params.name).toLowerCase()
1647 if (!login ||
1648 !master.validate(params, {'?name': 'string', '?login': 'string',
1649 pass: 'string', '?email': 'string',
1650 '?key': undefined, '*': '*'}))
1651 throw 'invalid name, login, pass, or email'
1652
1653 var passes = master.fetch('users/passwords')
1654 if (passes.hasOwnProperty(login))
1655 throw 'there is already a user with that login or name'
1656
1657 // Hash password
1658 params.pass = require('bcrypt-nodejs').hashSync(params.pass)
1659
1660 // Choose account key
1661 var key = 'user/' + params.name
1662 if (!params.name)
1663 key = 'user/' + Math.random().toString(36).substring(7,13)
1664 while (master.cache.hasOwnProperty(key))
1665 key = 'user/' + Math.random().toString(36).substring(7,13)
1666
1667 // Make account object
1668 var new_account = {key: key,
1669 name: params.name,
1670 login: params.login,
1671 pass: params.pass,
1672 email: params.email }
1673 for (var k in new_account) if (!new_account[k]) delete new_account[k]
1674 master.save(new_account)
1675
1676 var users = master.fetch('users')
1677 users.all = users.all || []
1678 users.all.push(new_account)
1679 passes[login] = {user: new_account.key, pass: new_account.pass}
1680 master.save(users)
1681 master.save(passes)
1682 }
1683
1684 // Current User
1685 client('current_user').to_fetch = function (k) {
1686 client.log('* fetching: current_user')
1687 if (!conn.client) return
1688 var u = master.fetch('logged_in_clients')[conn.client]
1689 u = u && user_obj(u.key, true)
1690 return {user: u || null, logged_in: !!u}
1691 }
1692
1693 client('current_user').to_save = function (o, t) {
1694 function error (msg) {
1695 client.save.abort(o)
1696 var c = client.fetch('current_user')
1697 c.error = msg
1698 client.save(c)
1699 }
1700
1701 client.log('* saving: current_user!')
1702 if (o.client && !conn.client) {
1703 // Set the client
1704 conn.client = o.client
1705 client.client_id = o.client
1706 client.client_ip = conn.remoteAddress
1707
1708 if (conn.id) {
1709 var connections = master.fetch('connections')
1710 connections[conn.id].user = master.fetch('logged_in_clients')[conn.client]
1711 master.save(connections)
1712 }
1713 }
1714 else {
1715 if (o.create_account) {
1716 client.log('current_user: creating account')
1717 try {
1718 create_account(o.create_account)
1719 client.log('Success creating account!')
1720 var cu = client.fetch('current_user')
1721 cu.create_account = null
1722 client.save.fire(cu)
1723 } catch (e) {
1724 error('Cannot create that account because ' + e)
1725 return
1726 }
1727 }
1728
1729 if (o.login_as && conn.id) {
1730 // Then client is trying to log in
1731 client.log('current_user: trying to log in')
1732 var creds = o.login_as
1733 var login = creds.login || creds.name
1734 if (login && creds.pass) {
1735 // With a username and password
1736 var u = authenticate(login, creds.pass)
1737
1738 client.log('auth said:', u)
1739 if (u) {
1740 // Success!
1741 // Associate this user with this session
1742 // user.log('Logging the user in!', u)
1743
1744 var clients = master.fetch('logged_in_clients')
1745 var connections = master.fetch('connections')
1746
1747 clients[conn.client] = u
1748 connections[conn.id].user = u
1749
1750 master.save(clients)
1751 master.save(connections)
1752
1753 client.log('current_user: success logging in!')
1754 }
1755 else {
1756 error('Cannot log in with that information')
1757 return
1758 }
1759 }
1760 else {
1761 error('Cannot log in with that information')
1762 return
1763 }
1764 }
1765
1766 else if (o.logout && conn.id) {
1767 client.log('current_user: logging out')
1768 var clients = master.fetch('logged_in_clients')
1769 var connections = master.fetch('connections')
1770
1771 delete clients[conn.client]
1772 connections[conn.id].user = null
1773
1774 master.save(clients)
1775 master.save(connections)
1776 }
1777 }
1778
1779 t.refetch()
1780 }
1781 client('current_user').to_delete = function () {}
1782
1783 // Users have closet space at /user/<name>/*
1784 var closet_space_key = /^(user\/[^\/]+)\/.*/
1785 var private_closet_space_key = /^user\/[^\/]+\/private.*/
1786
1787 // User
1788 client('user/*').to_save = function (o) {
1789 var c = client.fetch('current_user')
1790 var user_key = o.key.match(/^user\/([^\/]+)/)
1791 user_key = user_key && ('user/' + user_key[1])
1792
1793 // Only the current user can touch themself.
1794 if (!c.logged_in || c.user.key !== user_key) {
1795 client.log('Only the current user can touch themself.',
1796 {logged_in: c.logged_in, as: c.user && c.user.key,
1797 touching: user_key})
1798 client.save.abort(o)
1799 return
1800 }
1801
1802 // Users have closet space at /user/<name>/*
1803 if (o.key.match(closet_space_key)) {
1804 client.log('saving closet data')
1805 master.save(o)
1806 return
1807 }
1808
1809 // Ok, then it must be a plain user
1810 console.assert(o.key.match(/^user\/[^\/]+$/))
1811
1812 // Validate types
1813 if (!client.validate(o, {key: 'string', '?login': 'string', '?name': 'string',
1814 '?pass': 'string', '?email': 'string', /*'?pic': 'string',*/
1815 '*':'*'})) {
1816 client.log('This user change fails validation.')
1817 client.save.abort(o)
1818 return
1819 }
1820
1821 // Rules for updating "login" and "name" attributes:
1822 // • If "login" isn't specified, then we use "name" as login
1823 // • That resulting login must be unique across all users
1824
1825 // There must be at least a login or a name
1826 var login = o.login || o.name
1827 if (!login) {
1828 client.log('User must have a login or a name')
1829 client.save.abort(o)
1830 return
1831 }
1832
1833 var u = master.fetch(o.key)
1834 var userpass = master.fetch('users/passwords')
1835
1836 // Validate that the login/name is not changed to something clobberish
1837 var old_login = u.login || u.name
1838 if (old_login.toLowerCase() !== login.toLowerCase()
1839 && userpass.hasOwnProperty(login)) {
1840 client.log('The login', login, 'is already taken. Aborting.')
1841 client.save.abort(o) // Abort
1842
1843 o = client.fetch(o.key) // Add error message
1844 o.error = 'The login "' + login + '" is already taken'
1845 client.save.fire(o)
1846
1847 return // And exit
1848 }
1849
1850 // Now we can update login and name
1851 u.login = o.login
1852 u.name = o.name
1853
1854 // Hash password
1855 o.pass = o.pass && require('bcrypt-nodejs').hashSync(o.pass)
1856 u.pass = o.pass || u.pass
1857
1858 // // Users can have pictures (remove this soon)
1859 // // Bug: if user changes name, this picture's url doesn't change.
1860 // if (o.pic && o.pic.indexOf('data:image') > -1) {
1861 // var img_type = o.pic.match(/^data:image\/(\w+);base64,/)[1]
1862 // var b64 = o.pic.replace(/^data:image\/\w+;base64,/, '')
1863 // var upload_dir = global.upload_dir
1864 // // ensure that the uploads directory exists
1865 // if (!fs.existsSync(upload_dir))
1866 // fs.mkdirSync(upload_dir)
1867
1868 // // bug: users with the same name can overwrite each other's files
1869 // u.pic = u.name + '.' + img_type
1870 // fs.writeFile(upload_dir + u.pic, b64, {encoding: 'base64'})
1871 // }
1872
1873 // For anything else, go ahead and add it to the user object
1874 var protected = {key:1, name:1, /*pic:1,*/ pass:1}
1875 for (var k in o)
1876 if (!protected.hasOwnProperty(k))
1877 u[k] = o[k]
1878 for (var k in u)
1879 if (!protected.hasOwnProperty(k) && !o.hasOwnProperty(k))
1880 delete u[k]
1881
1882 master.save(u)
1883 }
1884 client('user/*').to_fetch = function user_fetcher (k) {
1885 var c = client.fetch('current_user')
1886 client.log('* fetching:', k, 'as', c.user)
1887
1888 // Users have closet space at /user/<name>/*
1889 if (k.match(closet_space_key)) {
1890 var obj_user = k.match(closet_space_key)[1]
1891 if (k.match(private_closet_space_key)
1892 && (!c.user || obj_user !== c.user.key)) {
1893 client.log('hiding private closet data')
1894 return {}
1895 }
1896 client.log('fetching closet data')
1897 return client.clone(master.fetch(k))
1898 }
1899
1900 // Otherwise return the actual user
1901 return user_obj(k, c.logged_in && c.user.key === k)
1902 }
1903 client('user/*').to_delete = function () {}
1904 function user_obj (k, logged_in) {
1905 var o = master.clone(master.fetch(k))
1906 if (k.match(/^user\/([^\/]+)\/private\/(.*)$/))
1907 return logged_in ? o : {key: k}
1908
1909 delete o.pass
1910 if (!logged_in) {delete o.email; delete o.login}
1911 return o
1912 }
1913
1914 // Blacklist sensitive stuff on master, in case we have a shadow set up
1915 var blacklist = 'users users/passwords logged_in_clients'.split(' ')
1916 for (var i=0; i<blacklist.length; i++) {
1917 client(blacklist[i]).to_fetch = function () {}
1918 client(blacklist[i]).to_save = function () {}
1919 client(blacklist[i]).to_delete = function () {}
1920 client(blacklist[i]).to_forget = function () {}
1921 }
1922 },
1923
1924 persist: function (prefix_to_sync, validate) {
1925 var client = this
1926 var was_logged_in = undefined
1927
1928 function client_prefix (current_user) {
1929 return 'client/' + (current_user.logged_in
1930 ? current_user.user.key.substr('user/'.length)
1931 : client.client_id) + '/'
1932 }
1933
1934 function copy_client_to_user(client, user) {
1935 var old_prefix = 'client/' + client.client_id
1936 var new_prefix = 'client/' + user.key.substr('user/'.length)
1937
1938 var keys = client.master.fetch('persisted_keys/' + client.client_id)
1939 if (!keys.val) return
1940 for (var old_key in keys.val) {
1941 var new_key = new_prefix + old_key.substr(old_prefix.length)
1942 var o = client.clone(client.master.fetch(old_key))
1943 // Delete the old
1944 client.master.del(old_key)
1945
1946 var new_o = client.master.fetch(new_key)
1947 // If the new key doesn't clobber any existing data on the user...
1948 if (Object.keys(new_o).length === 1) {
1949 // Save the new
1950 o.key = new_key
1951 client.master.save(o)
1952 }
1953 }
1954 keys.val = {}
1955 client.master.save(keys)
1956
1957 // var cache = client.master.cache
1958
1959 // var keys = Object.keys(cache) // Make a copy
1960 // for (var i=0; i<keys.length; i++) { // Because we'll mutate
1961 // var old_key = keys[i] // As we iterate
1962
1963 // if (old_key.startsWith(old_prefix)) {
1964 // var new_key = new_prefix + old_key.substr(old_prefix.length)
1965 // var o = client.clone(cache[old_key])
1966 // // Delete the old
1967 // client.master.del(old_key)
1968
1969 // if (!(cache.hasOwnProperty(new_key))) {
1970 // // Save the new
1971 // o.key = new_key
1972 // client.master.save(o)
1973 // }
1974 // }
1975 // }
1976 }
1977
1978 // Copy client to user if we log in
1979 client(_=>{
1980 var c = client.fetch('current_user')
1981 if (client.loading()) return
1982 if (was_logged_in == false && c.logged_in)
1983 // User just logged in! Let's copy his stuff over
1984 copy_client_to_user(client, c.user)
1985 was_logged_in = c.logged_in
1986 })
1987
1988 client(prefix_to_sync).to_fetch = function (key) {
1989 var c = client.fetch('current_user')
1990 if (client.loading()) return
1991 var prefix = client_prefix(c)
1992
1993 // Get the state from master
1994 var obj = client.clone(client.master.fetch(prefix + key))
1995
1996 // Translate it back to client
1997 obj = client.deep_map(obj, function (o) {
1998 if (typeof o === 'object' && 'key' in o && typeof o.key === 'string')
1999 o.key = o.key.substr(prefix.length)
2000 return o
2001 })
2002 return obj
2003 }
2004
2005 client(prefix_to_sync).to_save = function (obj) {
2006 if (validate && !validate(obj)) {
2007 console.warn('Validation failed on', obj)
2008 client.save.abort(obj)
2009 return
2010 }
2011
2012 var c = client.fetch('current_user')
2013 if (client.loading()) return
2014 var prefix = client_prefix(c)
2015
2016 // Make it safe
2017 var p_keys = client_persisted_keys()
2018 obj = client.clone(obj)
2019 obj = client.deep_map(obj, function (o) {
2020 if (typeof o === 'object' && 'key' in o && typeof o.key === 'string') {
2021 o.key = prefix + o.key
2022 if (p_keys)
2023 p_keys.val[o.key] = true
2024 }
2025 return o
2026 })
2027
2028 // Save to master
2029 client.master.save(obj)
2030 p_keys && client.master.save(p_keys)
2031 }
2032
2033 client(prefix_to_sync).to_delete = function (k) {
2034 k = client_prefix(client.fetch('current_user')) + k
2035 client.master.delete(k)
2036
2037 var p_keys = client_persisted_keys()
2038 delete p_keys.val[k]
2039 client.master.save(p_keys)
2040 }
2041
2042 function client_persisted_keys () {
2043 if (client.fetch('current_user').logged_in) return
2044 var result = client.master.fetch('persisted_keys/' + client.client_id)
2045 if (result && !result.val) result.val = {}
2046 return result
2047 }
2048 },
2049
2050 shadows: function shadows (master_bus) {
2051 // Custom route
2052 var OG_route = bus.route
2053 bus.route = function(key, method, arg, t) {
2054 var count = OG_route(key, method, arg, t)
2055 // This forwards anything we don't have a specific handler for
2056 // to the global cache
2057 if (count === 0) {
2058 count++
2059 if (method === 'to_fetch')
2060 bus.run_handler(function get_from_master (k) {
2061 // console.log('DEFAULT FETCHing', k)
2062 var r = master_bus.fetch(k)
2063 // console.log('DEFAULT FETCHed', r)
2064 bus.save.fire(r, {version: master_bus.versions[r.key]})
2065 }, method, arg)
2066 else if (method === 'to_save')
2067 bus.run_handler(function save_to_master (o, t) {
2068 // console.log('DEFAULT ROUTE', t)
2069 master_bus.save(bus.clone(o), t)
2070 }, method, arg, {t: t})
2071 else if (method == 'to_delete')
2072 bus.run_handler(function delete_from_master (k, t) {
2073 master_bus.delete(k)
2074 return 'done'
2075 }, method, arg, {t: t})
2076 }
2077 return count
2078 }
2079 },
2080
2081 read_file: function init () {
2082 // The first time this is run, we initialize it by loading some
2083 // libraries
2084 var chokidar = require('chokidar')
2085 var watchers = {}
2086 var fs = require('fs')
2087
2088 // Now we redefine the function
2089 bus.read_file = bus.uncallback(
2090 function readFile (filename, encoding, cb) {
2091 fs.readFile(filename, (err, result) => {
2092 if (err) console.error('Error from read_file:', err)
2093 cb(null, ((result || '*error*').toString(encoding || undefined)))
2094 })
2095 },
2096 {
2097 callback_at: 2,
2098 start_watching: (args, dirty, del) => {
2099 var filename = args[0]
2100 console.log('## starting to watch', filename)
2101 watchers[filename] = chokidar.watch(filename, {
2102 atomic: true,
2103 disableGlobbing: true
2104 })
2105 watchers[filename].on('change', () => { dirty() })
2106 watchers[filename].on('add', () => { dirty() })
2107 watchers[filename].on('unlink', () => { del() })
2108 },
2109 stop_watching: (json) => {
2110 var filename = json[0]
2111 console.log('## stopping to watch', filename)
2112 // log('unwatching', filename)
2113 // To do: this should probably use.unwatch() instead.
2114 watchers[filename].close()
2115 delete watchers[filename]
2116 }
2117 })
2118 return bus.read_file.apply(bus, [].slice.call(arguments))
2119 },
2120
2121 // Synchronizes the recursive path starting with <state_path> to the
2122 // file or recursive directory structure at fs_path
2123 sync_files: function sync_files (state_path, file_path) {
2124 // To do:
2125 // - Hook up a to_delete handler
2126 // - recursively remove directories if all files gone
2127
2128 console.assert(state_path.substr(-1) !== '*'
2129 && (!file_path || file_path.substr(-1) !== '*'),
2130 'The sync_files paths should not end with *')
2131
2132 file_path = file_path || state_path
2133 var buffer = {}
2134 var full_file_path = require('path').join(__dirname, file_path)
2135
2136 bus(state_path + '*').to_fetch = (rest) => {
2137 // We DO want to handle:
2138 // - "foo"
2139 // - "foo/*"
2140 // But not:
2141 // - "foobar"
2142 if (rest.length>0 && rest[0] !== '/') return // Bail on e.g. "foobar"
2143
2144 var f = bus.read_file(file_path + rest, 'base64')
2145
2146 // Clear buffer of items after 1 second. If fs results are delayed
2147 // longer, we'll just deal with those flashbacks.
2148 for (k in buffer)
2149 if (new Date().getTime() - buffer[k] > 1 * 1000)
2150 delete buffer[k]
2151
2152 // If we are expecting this, skip the read
2153 // console.log('read file', typeof f == 'string' ? f.substr(0,40) + '..': f)
2154 if (buffer[f]) {
2155 console.log('skipping cause its in buffer')
2156 return
2157 }
2158
2159 return {_:f}
2160 }
2161
2162 bus(state_path + '/*').to_save = (o, rest, t) => {
2163 if (rest.length>0 && rest[0] !== '/') return
2164 var f = Buffer.from(o._, 'base64')
2165 require('fs').writeFile(file_path + rest, f)
2166 buffer[f] = new Date().getTime()
2167 t.done()
2168 }
2169
2170 bus.http.use('/'+state_path, require('express').static(full_file_path))
2171 },
2172
2173 // Installs a GET handler at route that gets state from a fetcher function
2174 // Note: Makes too many textbusses. Should re-use one.
2175 http_serve: function http_serve (route, fetcher) {
2176 var textbus = require('./statebus')()
2177 textbus.label = 'textbus'
2178 var watched = new Set()
2179 textbus('*').to_fetch = (filename, old) => {
2180 return {etag: Math.random() + '',
2181 _: fetcher(filename)}
2182 }
2183 bus.http.get(route, (req, res) => {
2184 var path = req.path
2185 var etag = textbus.cache[path] && textbus.cache[path].etag
2186 if (etag && req.get('If-None-Match') === etag) {
2187 res.status(304).end()
2188 return
2189 }
2190
2191 textbus.fetch(req.path) // So that textbus never clears the cache
2192 textbus.fetch(req.path, function cb (o) {
2193 res.setHeader('Cache-Control', 'public')
2194 // res.setHeader('Cache-Control', 'public, max-age='
2195 // + (60 * 60 * 24 * 30)) // 1 month
2196 res.setHeader('ETag', o.etag)
2197 res.setHeader('Access-Control-Allow-Origin', '*')
2198 res.setHeader('content-type', 'application/javascript')
2199 res.send(o._)
2200 textbus.forget(o.key, cb) // But we do want to forget the cb
2201 })
2202 })
2203 },
2204
2205 serve_client_coffee: function serve_client_coffee () {
2206 bus.http_serve('/client/:filename', (filename) => {
2207 filename = /\/client\/(.*)/.exec(filename)[0]
2208 var source_filename = filename.substr(1)
2209 var source = bus.read_file(source_filename)
2210 if (bus.loading()) throw 'loading'
2211 if (filename.match(/\.coffee$/)) {
2212
2213 try {
2214 var compiled = require('coffeescript').compile(source, {filename,
2215 bare: true,
2216 sourceMap: true})
2217 } catch (e) {
2218 console.error('Could not compile ' + e.toString())
2219 return 'console.error(' + JSON.stringify(e.toString()) + ')'
2220 }
2221
2222 var source_map = JSON.parse(compiled.v3SourceMap)
2223 source_map.sourcesContent = source
2224 compiled = 'window.dom = window.dom || {}\n' + compiled.js
2225 compiled = 'window.ui = window.ui || {}\n' + compiled
2226
2227 function btoa(s) { return new Buffer(s.toString(),'binary').toString('base64') }
2228
2229 // Base64 encode it
2230 compiled += '\n'
2231 compiled += '//# sourceMappingURL=data:application/json;base64,'
2232 compiled += btoa(JSON.stringify(source_map)) + '\n'
2233 compiled += '//# sourceURL=' + source_filename
2234 return compiled
2235 }
2236 else return source
2237 })
2238 },
2239
2240 serve_clientjs: function serve_clientjs (path) {
2241 path = path || 'client.js'
2242 bus.http.get('/' + path, (req, res) => {
2243 res.send(
2244 ['extras/coffee.js', 'extras/sockjs.js', 'extras/react.js',
2245 'statebus.js', 'client.js']
2246 .map((f) => fs.readFileSync('node_modules/statebus/' + f))
2247 .join(';\n'))
2248 })
2249 },
2250
2251 serve_wiki: () => {
2252 bus('edit/*').to_fetch = () => ({_: require('./extras/wiki.coffee').code})
2253 },
2254
2255 unix_socket_repl: function (filename) {
2256 var repl = require('repl')
2257 var net = require('net')
2258 var fs = require('fs')
2259 if (fs.existsSync && fs.existsSync(filename))
2260 fs.unlinkSync(filename)
2261 net.createServer(function (socket) {
2262 var r = repl.start({
2263 //prompt: '> '
2264 input: socket
2265 , output: socket
2266 , terminal: true
2267 //, useGlobal: false
2268 })
2269 r.on('exit', function () {
2270 socket.end()
2271 })
2272 r.context.socket = socket
2273 }).listen(filename)
2274 },
2275
2276 schema: function schema () {
2277 function url_tree (cache) {
2278 // The key tree looks like:
2279 //
2280 // {server: {thing: [obj1, obj2], shing: [obj1, obj2], ...}
2281 // client: {dong: [obj1, ...]}}
2282 //
2283 // And objects without a number, like 'shong' will go on:
2284 // key_tree.server.shong[null]
2285 var tree = {server: {}, client: {}}
2286 for (var key in cache) {
2287 var p = parse_key(key)
2288 if (!p) {
2289 console.log('The state dash can\'t deal with key', key)
2290 return null
2291 }
2292 tree[p.owner][p.name] || (tree[p.owner][p.name] = {})
2293 tree[p.owner][p.name][p.number || null] = cache[key]
2294 }
2295 return tree
2296 }
2297
2298 function parse_key (key) {
2299 var word = "([^/]+)"
2300 // Matching things like: "/new/name/number"
2301 // or: "/name/number"
2302 // or: "/name"
2303 // or: "name/number"
2304 // or: "name"
2305 // ... and you can optionally include a final slash.
2306 var regexp = new RegExp("(/)?(new/)?" +word+ "(/" +word+ ")?(/)?")
2307 var m = key.match(regexp)
2308 if (!m) return null
2309 // indices = [0: has_match, 1: server_owned, 2: is_new, 3: name, 5: number]
2310 var owner = m[1] ? 'server' : 'client'
2311 return m[0] && {owner:owner, 'new': m[2], name: m[3], number: m[5]}
2312 }
2313 schema.parse_key = parse_key
2314 schema.url_tree = url_tree
2315 return url_tree(bus.cache)
2316 }
2317}
2318 // Add methods to bus object
2319 for (var m in extra_methods)
2320 bus[m] = extra_methods[m]
2321
2322 bus.options = default_options(bus)
2323 set_options(bus, options)
2324
2325 // Automatically make state:// fetch over a websocket
2326 bus.net_automount()
2327 return bus
2328}
2329
2330
2331// Handy functions for writing tests on nodejs
// Registered test functions, in the order they were added
var tests = []

// Registers test function f.  It is called with a single `done` callback,
// and its name is what run_tests() matches against the command line.
function test (f) {tests.push(f)}

// Runs the registered tests, then exits the process.
//
// If a test name is given on the command line (process.argv[2]), only
// that test runs.  Otherwise all tests run one after another, each
// starting once the previous one has called `done`.
//
// Throws an Error if the named test was never registered.
function run_tests () {
    var wanted = process.argv[2]

    // Either run the test specified at command line
    if (wanted) {
        var chosen = tests.find((f) => f.name == wanted)
        // Without this check a mistyped name calls undefined(...) and
        // fails with an unhelpful TypeError.
        if (!chosen)
            throw new Error('No test named "' + wanted + '" is registered')
        chosen(()=>process.exit())
        return
    }

    // Or run all tests
    function run_next () {
        if (tests.length === 0) {
            console.log('\nDone with all tests.')
            process.exit()
            return
        }

        var f = tests.shift()
        delay_so_far = 0        // each test starts with a fresh delay() clock
        console.log('\nTesting:', f.name)
        f(function () {setTimeout(run_next)})
    }
    run_next()
}
// Like console.log(), but prints the formatted message with `pre` in
// front of every line of output.
function log () {
    var pre = ' '
    // A string pattern would only replace the first newline, leaving later
    // lines without the prefix; the global regex handles all of them.
    console.log(pre+util.format.apply(null,arguments).replace(/\n/g,'\n'+pre))
}
// Thin wrapper that forwards all of its arguments to console.assert()
function assert (...args) {
    console.assert(...args)
}
// Running total (ms) of every delay requested since the last reset
var delay_so_far = 0

// Schedules f to run `time` ms after the previously scheduled delay()
// call, by adding `time` to the running total.  Returns the timer.
function delay (time, f) {
    delay_so_far += time
    return setTimeout(f, delay_so_far)
}

// Resets the running total back to zero (and returns that zero)
delay.init = (_) => {
    delay_so_far = 0
    return delay_so_far
}
2366
2367
2368// Now export everything
// import_server(bus, options) installs the server methods onto a bus
module.exports.import_server = import_server

// run_server(bus, options) starts serving on a bus that has them
module.exports.run_server = function (bus, options) {
    bus.serve(options)
}
// Attaches the module-level conveniences to the statebus function itself:
// statebus.testing, statebus.serve() and statebus.repl().
module.exports.import_module = function (statebus) {
    statebus.testing = {
        test: test,
        run_tests: run_tests,
        log: log,
        assert: assert,
        delay: delay
    }

    // Makes a fresh bus, starts serving on it, and returns it
    statebus.serve = function serve (options) {
        var bus = statebus()
        require('./server').run_server(bus, options)
        return bus
    }

    // Handy repl.  Invoke with node -e 'require("statebus").repl("/tmp/foo")'
    statebus.repl = function (filename) {
        var net = require('net')
        var sock = net.connect(filename)
        var stdin = process.stdin

        // Wire the terminal and the socket together, both directions
        stdin.pipe(sock)
        sock.pipe(process.stdout)

        // Raw mode while connected
        sock.on('connect', function () {
            stdin.resume();
            stdin.setRawMode(true)
        })

        // Put the terminal back when the connection closes
        sock.on('close', function done () {
            stdin.setRawMode(false)
            stdin.pause()
            sock.removeListener('close', done)
        })

        stdin.on('end', function () {
            sock.destroy()
            console.log()
        })

        // A lone byte 4 (what Ctrl-D sends) ends the session
        stdin.on('data', function (b) {
            var is_ctrl_d = b.length === 1 && b[0] === 4
            if (is_ctrl_d)
                stdin.emit('end')
        })
    }
}