UNPKG

92.6 kBJavaScriptView Raw
1var fs = require('fs'),
2 util = require('util')
3var unique_sockjs_string = '_connect_to_statebus_'
4
/**
 * Build the default option set for a statebus server.
 *
 * @param {object} bus  The bus these options are for.  It is only captured
 *                      by the default `client` callback.
 * @returns {object} A freshly-allocated options object; nested objects are
 *                   new on every call, so callers may mutate the result.
 */
function default_options (bus) {
    // Default client setup: the client bus shadows this bus.
    var shadow_this_bus = (c) => { c.shadows(bus) }

    // Settings for the JSON file store.
    var file_store_defaults = {
        save_delay: 250,
        filename: 'db',
        backup_dir: 'backups',
        prefix: '*'
    }

    // Where the TLS key material is looked up.
    var cert_paths = {
        private_key: 'certs/private-key',
        certificate: 'certs/certificate',
        certificate_bundle: 'certs/certificate-bundle'
    }

    var result = {}
    result.port = 'auto'
    result.backdoor = null
    result.client = shadow_this_bus
    result.file_store = file_store_defaults
    result.sync_files = [{state_path: 'files', fs_path: null}]
    result.serve = true
    result.certs = cert_paths
    result.connections = {include_users: true, edit_others: true}
    result.__secure = false
    return result
}
18
/**
 * Merge user-supplied options into `bus.options`.
 *
 * Top-level keys in `options` overwrite the existing values.  For the nested
 * option groups `file_store` and `certs`, any sub-key the caller did not
 * specify is filled in from the value the group had before the merge.  A
 * nested group set to a falsy value (e.g. `file_store: false`) is left alone;
 * a truthy non-object value (e.g. `file_store: true`) is replaced by the full
 * set of previous values.
 *
 * @param {object} bus      Must provide `clone()` and an `options` object.
 * @param {object} [options] Overrides; may be omitted.
 */
function set_options (bus, options) {
    // Use the prototype's hasOwnProperty so that option objects without a
    // prototype (or with a key named "hasOwnProperty") do not break the merge.
    var has_own = Object.prototype.hasOwnProperty

    // Snapshot the pre-merge options; they supply the nested defaults below.
    var defaults = bus.clone(bus.options)
    options = options || {}

    for (var k in options)
        bus.options[k] = options[k]

    // Fill in defaults of nested options too
    var nested_groups = ['file_store', 'certs']
    for (var i = 0; i < nested_groups.length; i++) {
        var group = nested_groups[i]

        // Falsy means the feature was switched off (or never configured)
        if (!bus.options[group]) continue

        if (typeof bus.options[group] !== 'object')
            bus.options[group] = {}

        for (var k2 in defaults[group])
            if (!has_own.call(bus.options[group], k2))
                bus.options[group][k2] = defaults[group][k2]
    }
}
36
37function import_server (bus, options)
38{ var extra_methods = {
39
40 serve: function serve (options) {
41 var master = bus
42 bus.honk = 'statelog'
43 master.label = 'master'
44
45 // Initialize Options
46 set_options(bus, options)
47
48 var use_ssl = bus.options.certs && (
49 require('fs').existsSync(bus.options.certs.private_key)
50 || require('fs').existsSync(bus.options.certs.certificate)
51 || require('fs').existsSync(bus.options.certs.certificate_bundle))
52
53 function c (client, conn) {
54 client.honk = bus.honk
55 client.serves_auth(conn, master)
56 bus.options.client && bus.options.client(client, conn)
57 }
58 if (!bus.options.client) c = undefined // no client bus when programmer explicitly says so
59
60 if (bus.options.file_store)
61 bus.file_store()
62
63 // ******************************************
64 // ***** Create our own http server *********
65 bus.make_http_server({port: bus.options.port, use_ssl})
66 bus.sockjs_server(this.http_server, c) // Serve via sockjs on it
67 var express = require('express')
68 bus.express = express()
69 bus.http = express.Router()
70 bus.install_express(bus.express)
71
72 // use gzip compression if available
73 try { bus.http.use(require('compression')())
74 console.log('Enabled http compression!') } catch (e) {}
75
76
77 // Initialize new clients with an id. We put the client id on
78 // req.client, and also in a cookie for the browser to see.
79 if (bus.options.client)
80 bus.express.use(function (req, res, next) {
81 req.client = require('cookie').parse(req.headers.cookie || '').client
82 if (!req.client) {
83 req.client = (Math.random().toString(36).substring(2)
84 + Math.random().toString(36).substring(2)
85 + Math.random().toString(36).substring(2))
86
87 res.setHeader('Set-Cookie', 'client=' + req.client
88 + '; Expires=21 Oct 2025 00:0:00 GMT;')
89 }
90 next()
91 })
92
93 // Initialize file sync
94 ; (bus.options.sync_files || []).forEach( x => {
95 if (require('fs').existsSync(x.fs_path || x.state_path))
96 bus.sync_files(x.state_path, x.fs_path)
97 })
98
99 // User will put their routes in here
100 bus.express.use('/', bus.http)
101
102 // Add a fallback that goes to state
103 bus.express.get('*', function (req, res) {
104 // Make a temporary client bus
105 var cbus = bus.bus_for_http_client(req, res)
106
107 // Do the fetch
108 cbus.honk = 'statelog'
109 var singleton = req.path.match(/^\/code\//)
110 cbus.fetch_once(req.path.substr(1), (o) => {
111 var unwrap = (Object.keys(o).length === 2
112 && '_' in o
113 && typeof o._ === 'string')
114 // To do: translate pointers as keys
115 res.send(unwrap ? o._ : JSON.stringify(o))
116 cbus.delete_bus()
117 })
118 })
119
120 // Serve Client Coffee
121 bus.serve_client_coffee()
122
123 // Custom route
124 var OG_route = bus.route
125 bus.route = function(key, method, arg, opts) {
126 var count = OG_route(key, method, arg, opts)
127
128 // This whitelists anything we don't have a specific handler for,
129 // reflecting it to all clients!
130 if (count === 0 && method === 'to_save') {
131 bus.save.fire(arg, opts)
132 count++
133 }
134
135 return count
136 }
137
138 // Back door to the control room
139 if (bus.options.backdoor) {
140 bus.make_http_server({
141 port: bus.options.backdoor,
142 name: 'backdoor_http_server',
143 use_ssl: use_ssl
144 })
145 bus.sockjs_server(this.backdoor_http_server)
146 }
147 },
148
149 bus_for_http_client: function (req, res) {
150 var bus = this
151 if (!bus.bus_for_http_client.counter)
152 bus.bus_for_http_client.counter = 0
153 var cbus = require('./statebus')()
154 cbus.label = 'client_http' + bus.bus_for_http_client.counter++
155 cbus.master = bus
156
157 // Log in as the client
158 cbus.serves_auth({remoteAddress: req.connection.remoteAddress}, bus)
159 bus.options.client(cbus)
160 cbus.save({key: 'current_user', client: req.client})
161 return cbus
162 },
163
164 make_http_server: function make_http_server (options) {
165 options = options || {}
166 var fs = require('fs')
167
168 if (options.use_ssl) {
169 // Load with TLS/SSL
170 console.log('Encryption ON')
171
172 // use http2 compatible library if available
173 try {
174 var http = require('spdy')
175 console.log('Found spdy library. HTTP/2 enabled!')
176 } catch (e) {
177 var http = require('https')
178 }
179
180 var protocol = 'https'
181 var ssl_options = {
182 ca: (fs.existsSync(this.options.certs.certificate_bundle)
183 && require('split-ca')(this.options.certs.certificate_bundle)),
184 key: fs.readFileSync(this.options.certs.private_key),
185 cert: fs.readFileSync(this.options.certs.certificate),
186 ciphers: "ECDHE-RSA-AES256-SHA384:DHE-RSA-AES256-SHA384"
187 + ":ECDHE-RSA-AES256-SHA256:DHE-RSA-AES256-SHA256"
188 + ":ECDHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA256"
189 + ":HIGH:!aNULL:!eNULL:!EXPORT:!DES:!RC4:!MD5:!PSK:!SRP:!CAMELLIA",
190 honorCipherOrder: true}
191 }
192 else {
193 // Load unencrypted server
194 console.log('Encryption OFF')
195 var http = require('http')
196 var protocol = 'http'
197 var ssl_options = undefined
198 }
199
200 if (options.port === 'auto') {
201 var bind = require('./extras/tcp-bind')
202 function find_a_port () {
203 var next_port_attempt = 80
204 while (true)
205 try {
206 var result = bind(next_port_attempt)
207 bus.port = next_port_attempt
208 return result
209 } catch (e) {
210 if (next_port_attempt < 3006) next_port_attempt = 3006
211 else next_port_attempt++
212 }
213 }
214
215 var fd
216 if (options.use_ssl)
217 try {
218 fd = bind(443)
219 bus.port = 443
220 bus.redirect_port_80()
221 } catch (e) {fd = find_a_port()}
222 else fd = find_a_port()
223 var http_server = http.createServer(ssl_options)
224 http_server.listen({fd: fd}, () => {
225 console.log('Listening on '+protocol+'://<host>:'+bus.port)
226 })
227 }
228 else {
229 bus.port = bus.options.port
230 var http_server = http.createServer(ssl_options)
231 http_server.listen(bus.options.port, () => {
232 console.log('Listening on '+protocol+'://<host>:'+bus.port)
233 })
234 }
235
236 bus[options.name || 'http_server'] = http_server
237 },
238
239 redirect_port_80: function redirect_port_80 () {
240 var redirector = require('http')
241 redirector.createServer(function (req, res) {
242 res.writeHead(301, {"Location": "https://"+req.headers['host']+req.url})
243 res.end()
244 }).listen(80)
245 },
246
247 install_express: function install_express (express_app) {
248 this.http_server.on('request', // Install express
249 function (request, response) {
250 // But express should ignore all sockjs requests
251 if (!request.url.startsWith('/'+unique_sockjs_string+'/'))
252 express_app(request, response)
253 })
254
255 },
256 sockjs_server: function sockjs_server(httpserver, client_bus_func) {
257 var master = this
258 var client_num = 0
259 // var client_busses = {} // XXX work in progress
260 var log = master.log
261 if (client_bus_func) {
262 master.save({key: 'connections'}) // Clean out old sessions
263 var connections = master.fetch('connections')
264 }
265 var s = require('sockjs').createServer({
266 sockjs_url: 'https://cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js',
267 disconnect_delay: 600 * 1000,
268 heartbeat_delay: 6000 * 1000
269 })
270 s.on('connection', function(conn) {
271 if (client_bus_func) {
272 // To do for pooling client busses:
273 // - What do I do with connections? Do they pool at all?
274 // - Before creating a new bus here, check to see if there's
275 // an existing one in the pool, and re-use it if so.
276 // - Count the number of connections using a client.
277 // - When disconnecting, decrement the number, and if it gets
278 // to zero, delete the client bus.
279
280 connections[conn.id] = {client: conn.id, // client is deprecated
281 id: conn.id}
282 master.save(connections)
283
284 var client = require('./statebus')()
285 client.label = 'client' + client_num++
286 master.label = master.label || 'master'
287 client.master = master
288 client_bus_func(client, conn)
289 } else
290 var client = master
291
292 var our_fetches_in = {} // Every key that this socket has fetched
293 log('sockjs_s: New connection from', conn.remoteAddress)
294 function sockjs_pubber (obj, t) {
295 // log('sockjs_pubber:', obj, t)
296 var msg = {save: obj}
297 if (t.version) msg.version = t.version
298 if (t.parents) msg.parents = t.parents
299 if (t.patch) msg.patch = t.patch
300 if (t.patch) msg.save = msg.save.key
301 msg = JSON.stringify(msg)
302
303 if (global.network_delay) {
304 console.log('>>>> DELAYING!!!', global.network_delay)
305 obj = bus.clone(obj)
306 setTimeout(() => {conn.write(msg)}, global.network_delay)
307 } else
308 conn.write(msg)
309
310 log('sockjs_s: SENT a', msg, 'to client')
311 }
312 conn.on('data', function(message) {
313 log('sockjs_s:', message)
314 try {
315 message = JSON.parse(message)
316 var method = bus.message_method(message)
317
318 // Validate the message
319 if (!((method === 'fetch'
320 && master.validate(message, {fetch: 'string',
321 '?parent': 'string', '?version': 'string'}))
322 ||
323 (method === 'save'
324 && master.validate(message, {save: '*',
325 '?parents': 'array', '?version': 'string', '?patch': 'array'})
326 && (typeof(message.save) === 'string'
327 || (typeof(message.save) === 'object'
328 && typeof(message.save.key === 'string'))))
329 ||
330 (method === 'forget'
331 && master.validate(message, {forget: 'string'}))
332 ||
333 (method === 'delete'
334 && master.validate(message, {'delete': 'string'}))))
335 throw 'validation error'
336
337 } catch (e) {
338 for (var i=0; i<4; i++) console.error('#######')
339 console.error('Received bad sockjs message from '
340 + conn.remoteAddress +': ', message, e)
341 return
342 }
343
344 switch (method) {
345 case 'fetch':
346 our_fetches_in[message.fetch] = true
347 client.fetch(message.fetch, sockjs_pubber)
348 break
349 case 'forget':
350 delete our_fetches_in[message.forget]
351 client.forget(message.forget, sockjs_pubber)
352 break
353 case 'delete':
354 client.delete(message['delete'])
355 break
356 case 'save':
357 message.version = message.version || client.new_version()
358 if (message.patch) {
359 var o = bus.cache[message.save] || {key: message.save}
360 try {
361 message.save = bus.apply_patch(o, message.patch[0])
362 } catch (e) {
363 console.error('Received bad sockjs message from '
364 + conn.remoteAddress +': ', message, e)
365 return
366 }
367 }
368 client.save(message.save,
369 {version: message.version,
370 parents: message.parents,
371 patch: message.patch})
372 if (our_fetches_in[message.save.key]) { // Store what we've seen if we
373 // might have to publish it later
374 client.log('Adding', message.save.key+'#'+message.version,
375 'to pubber!')
376 sockjs_pubber.has_seen(client, message.save.key, message.version)
377 }
378 break
379 }
380
381 // validate that our fetches_in are all in the bus
382 for (var key in our_fetches_in)
383 if (!client.fetches_in.has(key, master.funk_key(sockjs_pubber)))
384 console.trace("***\n****\nFound errant key", key,
385 'when receiving a sockjs', method, 'of', message)
386 //log('sockjs_s: done with message')
387 })
388 conn.on('close', function() {
389 log('sockjs_s: disconnected from', conn.remoteAddress, conn.id, client.id)
390 for (var key in our_fetches_in)
391 client.forget(key, sockjs_pubber)
392 if (client_bus_func) {
393 delete connections[conn.id]; master.save(connections)
394 client.delete_bus()
395 }
396 })
397
398 // Define the /connection* state!
399 if (client_bus_func && !master.options.__secure) {
400
401 // A connection
402 client('connection/*').to_fetch = function (key, star) {
403 var id = star
404 var conn = master.fetch('connections')[id]
405 if (!conn) return {error: 'connection ' + id + ' does not exist'}
406
407 var result = master.clone(conn)
408 result.key = key
409 result.id = id
410 result.client = id // Deprecated
411
412 if (master.options.connections.include_users && result.user)
413 result.user = client.fetch(result.user.key)
414 return result
415 }
416 client('connection/*').to_save = function (o, star, t) {
417 // Check permissions before editing
418 if (star !== conn.id && !master.options.connections.edit_others) {
419 t.abort()
420 return
421 }
422 var connections = master.fetch('connections')
423 var result = client.clone(o)
424 var old = connections[star]
425 delete result.key
426 result.id = star
427 result.client = star // Deprecated
428 result.user = old.user
429 connections[star] = result
430 master.save(connections)
431 }
432
433 // Your connection
434 client('connection').to_fetch = function () {
435 // subscribe to changes in authentication
436 client.fetch('current_user')
437
438 var result = client.clone(client.fetch('connection/' + conn.id))
439 delete result.key
440 return result
441 }
442 client('connection').to_save = function (o) {
443 o = client.clone(o)
444 o.key = 'connection/' + conn.id
445 client.save(o)
446 }
447
448 // All connections
449 client('connections').to_save = function noop (t) {t.abort()}
450 client('connections').to_fetch = function () {
451 var result = []
452 var conns = master.fetch('connections')
453 for (var connid in conns)
454 if (connid !== 'key')
455 result.push(client.fetch('connection/' + connid))
456
457 return {all: result}
458 }
459 }
460 })
461
462 s.installHandlers(httpserver, {prefix:'/' + unique_sockjs_string})
463 },
464
465 make_websocket: function make_websocket (url) {
466 url = url.replace(/^state:\/\//, 'wss://')
467 url = url.replace(/^istate:\/\//, 'ws://')
468 url = url.replace(/^statei:\/\//, 'ws://')
469 WebSocket = require('websocket').w3cwebsocket
470 return new WebSocket(url+'/'+unique_sockjs_string+'/websocket')
471 },
472 client_creds: function client_creds (server_url) {
473 // Right now the server just creates a different random id each time
474 // it connects.
475 return {clientid: (Math.random().toString(36).substring(2)
476 + Math.random().toString(36).substring(2)
477 + Math.random().toString(36).substring(2))}
478 },
479
480 // Deprecated
481 ws_client: function (prefix, url, account) {
482 console.error('ws_client() is deprecated; use net_mount() instead')
483 bus.net_mount(prefix, url, account) },
484 // Deprecated
485 universal_ws_client: function () {
486 console.error('calling universal_ws_client is deprecated and no longer necessary') },
487
488 file_store: (function () {
489 // Make a database
490 var fs = require('fs')
491 var db = {}
492 var db_is_ok = false
493 var pending_save = null
494 var active
495 function file_store (prefix, delay_activate) {
496 prefix = prefix || bus.options.file_store.prefix
497 var filename = bus.options.file_store.filename,
498 backup_dir = bus.options.file_store.backup_dir
499
500 // Loading db
501 try {
502 if (fs.existsSync && !fs.existsSync(filename))
503 (fs.writeFileSync(filename, '{}'), bus.log('Made a new db file'))
504 db = JSON.parse(fs.readFileSync(filename))
505 db_is_ok = true
506 // If we save before anything else is connected, we'll get this
507 // into the cache but not affect anything else
508 bus.save.fire(global.pointerify ? inline_pointers(db) : db)
509 bus.log('Read db')
510 } catch (e) {
511 console.error(e)
512 console.error('bad db file')
513 }
514
515 // Saving db
516 function save_db() {
517 if (!db_is_ok) return
518
519 console.time('saved db')
520
521 fs.writeFile(filename+'.tmp', JSON.stringify(db, null, 1), function(err) {
522 if (err) {
523 console.error('Crap! DB IS DYING!!!!', err)
524 db_is_ok = false
525 } else
526 fs.rename(filename+'.tmp', filename, function (err) {
527 if (err) {
528 console.error('Crap !! DB IS DYING !!!!', err)
529 db_is_ok = false
530 } else {
531 console.timeEnd('saved db')
532 pending_save = null
533 }
534 })
535 })
536 }
537
538 function save_later() {
539 pending_save = pending_save || setTimeout(save_db, bus.options.file_store.save_delay)
540 }
541 active = !delay_activate
542
543 // Replaces every nested keyed object with {_key: <key>}
544 function abstract_pointers (o) {
545 o = bus.clone(o)
546 var result = {}
547 for (var k in o)
548 result[k] = bus.deep_map(o[k], (o) => {
549 if (o && o.key) return {_key: o.key}
550 else return o
551 })
552 return result
553 }
554 // ...and the inverse
555 function inline_pointers (db) {
556 return bus.deep_map(db, (o) => {
557 if (o && o._key)
558 return db[o._key]
559 else return o
560 })
561 }
562 function on_save (obj) {
563 db[obj.key] = global.pointerify ? abstract_pointers(obj) : obj
564 if (active) save_later()
565 }
566 on_save.priority = true
567 bus(prefix).on_save = on_save
568 bus(prefix).to_delete = function (key) {
569 delete db[key]
570 if (active) save_later()
571 }
572 file_store.activate = function () {
573 active = true
574 save_later()
575 }
576
577 // Handling errors
578 function recover (e) {
579 if (e) {
580 process.stderr.write("Uncaught Exception:\n");
581 process.stderr.write(e.stack + "\n");
582 }
583 if (pending_save) {
584 console.log('Saving db after crash')
585 console.time()
586 fs.writeFileSync(filename, JSON.stringify(db, null, 1))
587 console.log('Saved db after crash')
588 }
589 process.exit(1)
590 }
591 process.on('SIGINT', recover)
592 process.on('SIGTERM', recover)
593 process.on('uncaughtException', recover)
594
595 // Rotating backups
596 setInterval(
597 // This copies the current db over backups/db.<curr_date> every minute
598 function backup_db() {
599 if (!db_is_ok || !backup_dir) return
600 if (fs.existsSync && !fs.existsSync(backup_dir))
601 fs.mkdirSync(backup_dir)
602
603 var d = new Date()
604 var y = d.getYear() + 1900
605 var m = d.getMonth() + 1
606 if (m < 10) m = '0' + m
607 var day = d.getDate()
608 if (day < 10) day = '0' + day
609 var date = y + '-' + m + '-' + day
610
611 //bus.log('Backing up db on', date)
612
613 require('child_process').execFile(
614 '/bin/cp', [filename, backup_dir+'/'+filename+'.'+date])
615 },
616 1000 * 60 // Every minute
617 )
618 }
619
620 return file_store
621 })(),
622
623 firebase_store: function (prefix, firebase_ref) {
624 prefix = prefix || '*'
625
626 function encode_firebase_key(k) {
627 return encodeURIComponent(k).replace(/\./g, '%2E')
628 }
629
630 function decode_firebase_key(k) {
631 return decodeURIComponent(k.replace('%2E', '.'))
632 }
633
634 bus(prefix).to_fetch = function (key, t) {
635 firebase_ref.child(encode_firebase_key(key)).on('value', function (x) {
636 t.done(x.val() || {})
637 }, function (err) { t.abort() })
638 }
639
640 bus(prefix).on_save = function (o) {
641 firebase_ref.child(encode_firebase_key(o.key)).set(o)
642 }
643
644 // bus(prefix).to_save = function (o, t) {
645 // firebase_ref.child(encode_firebase_key(o.key)).set(o, (err) => {
646 // err ? t.abort() : t.done()
647 // })
648 // }
649
650 bus(prefix).to_delete = function (key, t) {
651 firebase_ref.child(encode_firebase_key(key)).set(null, (err) => {
652 err ? t.abort() : t.done()
653 })
654 }
655
656 bus(prefix).to_forget = function (key, t) {
657 firebase_ref.child(encode_firebase_key(key)).off()
658 }
659 },
660
661 lazy_sqlite_store: function lazy_sqlite_store (opts) {
662 if (!opts) opts = {}
663 opts.lazy = true
664 bus.sqlite_store(opts)
665 },
666 fast_load_sqlite_store: function sqlite_store (opts) {
667 if (!opts) opts = {}
668 opts.dont_fire = true
669 bus.sqlite_store(opts)
670 },
671 sqlite_store: function sqlite_store (opts) {
672 var prefix = '*'
673 var open_transaction = null
674
675 if (!opts) opts = {}
676 if (!opts.filename) opts.filename = 'db.sqlite'
677 if (!opts.hasOwnProperty('inline_pointers'))
678 opts.inline_pointers = global.pointerify
679
680 // Load the db on startup
681 try {
682 var db = bus.sqlite_store_db || new (require('better-sqlite3'))(opts.filename)
683 bus.sqlite_store_db = db
684 bus.sqlite_store.load_all = load_all
685 bus.sqlite_store.all_keys = all_keys
686
687 db.pragma('journal_mode = WAL')
688 db.prepare('create table if not exists cache (key text primary key, obj text)').run()
689
690 function all_keys () {
691 var result = []
692 for (var row of db.prepare('select key from cache').iterate())
693 result.push(row.key)
694 return result
695 }
696 function load_all (options) {
697 var temp_db = {}
698
699 for (var row of db.prepare('select * from cache').iterate()) {
700 var obj = JSON.parse(row.obj)
701 temp_db[obj.key] = obj
702 }
703
704 if (opts.inline_pointers)
705 temp_db = inline_pointers(temp_db)
706
707 for (var key in temp_db)
708 if (temp_db.hasOwnProperty(key)) {
709 if (options.dont_fire)
710 bus.cache[key] = temp_db[key]
711 else
712 bus.save.fire(temp_db[key])
713 temp_db[key] = undefined
714 }
715 }
716 if (!opts.lazy) load_all(opts)
717
718 bus.log('Read ' + opts.filename)
719 } catch (e) {
720 console.error(e)
721 console.error('Bad sqlite db')
722 }
723
724 function sqlite_get (key) {
725 var x = db.prepare('select * from cache where key = ?').get([key])
726 return x ? JSON.parse(x.obj) : {}
727 }
728 if (opts.lazy)
729 // Add fetch handler
730 bus(prefix).to_fetch = function (key, t) {
731 var x = (bus.cache[key] && !bus.pending_fetches[key])
732 || sqlite_get(key)
733 if (opts.inline_pointers) x = inline_pointers_singleobj(x)
734 x = bus.deep_map(x, (o) => o && o.key ? sqlite_get(o.key) : o)
735 t.done(x)
736 }
737
738
739 // Add save handlers
740 function on_save (obj) {
741 if (opts.inline_pointers)
742 obj = abstract_pointers(obj)
743
744 if (opts.use_transactions && !open_transaction){
745 console.time('save db')
746 db.prepare('BEGIN TRANSACTION').run()
747 }
748
749 db.prepare('replace into cache (key, obj) values (?, ?)').run(
750 [obj.key, JSON.stringify(obj)])
751
752 if (opts.use_transactions && !open_transaction) {
753 open_transaction = setTimeout(function(){
754 console.log('Committing transaction to database')
755 db.prepare('COMMIT').run()
756 open_transaction = false
757 console.timeEnd('save db')
758 })
759 }
760 }
761 if (opts.save_sync) {
762 var old_route = bus.route
763 bus.route = function (key, method, arg, t) {
764 if (method === 'to_save') on_save(arg)
765 return old_route(key, method, arg, t)
766 }
767 } else {
768 on_save.priority = true
769 bus(prefix).on_save = on_save
770 }
771 bus(prefix).to_delete = function (key) {
772 if (opts.use_transactions && !open_transaction){
773 console.time('save db')
774 db.prepare('BEGIN TRANSACTION').run()
775 }
776 db.prepare('delete from cache where key = ?').run([key])
777 if (opts.use_transactions && !open_transaction)
778 open_transaction = setTimeout(function(){
779 console.log('committing')
780 db.prepare('COMMIT').run()
781 open_transaction = false
782 console.timeEnd('save db')
783 })
784 }
785
786 // Replaces every nested keyed object with {_key: <key>}
787 function abstract_pointers (o) {
788 o = bus.clone(o)
789 var result = {}
790 for (var k in o)
791 result[k] = bus.deep_map(o[k], (o) => {
792 if (o && o.key) return {_key: o.key}
793 else return o
794 })
795 return result
796 }
797 // ...and the inverse
798 function inline_pointers (db) {
799 return bus.deep_map(db, (o) => {
800 if (o && o._key)
801 return db[o._key]
802 else return o
803 })
804 }
805 function inline_pointers_singleobj (obj) {
806 return bus.deep_map(obj, (o) => (o && o._key)
807 ? bus.cache[o._key] : o)
808 }
809
810 // Rotating backups
811 setInterval(
812 // Copy the current db over backups/db.<curr_date> every minute
813 //
814 // Note: in future we might want to use db.backup():
815 // https://github.com/JoshuaWise/better-sqlite3/blob/master/docs/api.md#backupdestination-options---promise
816 function backup_db() {
817 if (opts.backups === false) return
818 var backup_dir = opts.backup_dir || 'backups'
819 if (fs.existsSync && !fs.existsSync(backup_dir))
820 fs.mkdirSync(backup_dir)
821
822 var d = new Date()
823 var y = d.getYear() + 1900
824 var m = d.getMonth() + 1
825 if (m < 10) m = '0' + m
826 var day = d.getDate()
827 if (day < 10) day = '0' + day
828 var date = y + '-' + m + '-' + day
829
830 require('child_process').execFile(
831 'sqlite3',
832 [opts.filename, '.backup '+"'"+backup_dir+'/'+opts.filename+'.'+date+"'"])
833 },
834 1000 * 60 // Every minute
835 )
836 },
837
838 pg_store: function pg_store (opts) {
839 opts = opts || {}
840 opts.prefix = opts.prefix || '*'
841
842 // Load the db on startup
843 try {
844 var db = new require('pg-native')()
845 bus.pg_db = db
846 bus.pg_save = pg_save
847 db.connectSync(opts.url)
848 db.querySync('create table if not exists store (key text primary key, value jsonb)')
849
850 var rows = db.querySync('select * from store')
851 rows.forEach(r => bus.save(inline_pointers(r.value, bus)))
852
853 bus.log('Read ' + opts.url)
854 } catch (e) {
855 console.error(e)
856 console.error('Bad pg db')
857 }
858
859 // Add save handlers
860 function pg_save (obj) {
861 obj = abstract_pointers(obj)
862
863 db.querySync('insert into store (key, value) values ($1, $2) '
864 + 'on conflict (key) do update set value = $2',
865 [obj.key, JSON.stringify(obj)])
866 }
867 pg_save.priority = true
868 bus(opts.prefix).on_save = pg_save
869 bus(opts.prefix).to_delete = function (key) {
870 db.query('delete from store where key = $1', [key])
871 }
872
873 // Replaces every nested keyed object with {_key: <key>}
874 function abstract_pointers (o) {
875 o = bus.clone(o)
876 var result = {}
877 for (var k in o)
878 result[k] = bus.deep_map(o[k], (x) => {
879 if (x && x.key) return {_key: x.key}
880 else return x
881 })
882 return result
883 }
884 // ...and the inverse
885 function inline_pointers (obj, bus) {
886 return bus.deep_map(obj, (o) => {
887 if (o && o._key) {
888 if (!bus.cache[o._key])
889 bus.cache[o._key] = {key: o._key}
890 return bus.cache[o._key]
891 } else return o
892 })
893 }
894 },
895
896 setup_usage_log (opts) {
897 bus.serve_time()
898 opts = opts || {}
899 opts.filename = opts.filename || 'db.sqlite'
900 var db = new (require('better-sqlite3'))(opts.filename)
901 bus.usage_log_db = db
902 //db.pragma('journal_mode = WAL')
903 db.prepare('create table if not exists usage (date integer, event text, details text)').run()
904 db.prepare('create index if not exists date_index on usage (date)').run()
905 var refresh_interval = 1000*60
906
907 var nots = ['details not like "%facebookexternalhit%"',
908 'details not like "%/apple-touch-icon%"',
909 'details not like "%Googlebot%"',
910 'details not like "%AdsBot-Google%"',
911 'details not like "%Google-Adwords-Instant%"',
912 'details not like "%Apache-HttpClient%"',
913 'details not like "%SafeDNSBot%"',
914 'details not like "%RevueBot%"',
915 'details not like "%MetaURI API%"',
916 'details not like "%redback/v%"',
917 'details not like "%Slackbot%"',
918 'details not like "%HTTP_Request2/%"',
919 'details not like "%python-requests/%"',
920 'details not like "%LightspeedSystemsCrawler/%"',
921 'details not like "%CipaCrawler/%"',
922 'details not like "%Twitterbot/%"',
923 'details not like "%Go-http-client/%"',
924 'details not like "%/cheese_service%"'
925 ].join(' and ')
926 bus.usage_log_nots = nots
927
928 // Aggregate all accesses by day, to get daily active users
929 bus('usage').to_fetch = () => {
930 bus.fetch('time/' + refresh_interval)
931 var days = []
932 var last_day
933 for (var row of db.prepare('select * from usage where '
934 + nots + ' order by date').iterate()) {
935 row.details = JSON.parse(row.details)
936 if (row.details.agent && row.details.agent.match(/bot/)) continue
937
938 var d = new Date(row.date * 1000)
939 var day = d.getFullYear() + '-' + (d.getMonth()+1) + '-' + d.getDate()
940 if (last_day !== day)
941 days.push({day: day, // Init
942 clients: new Set(),
943 ips: new Set(),
944 client_socket_opens: new Set(),
945 ip_socket_opens: new Set()
946 })
947 last_day = day
948
949 if (row.event === 'socket open') {
950 days[days.length-1].client_socket_opens.add(row.details.client)
951 days[days.length-1].ip_socket_opens.add(row.details.ip)
952 }
953 days[days.length-1].clients.add(row.details.client)
954 days[days.length-1].ips.add(row.details.ip)
955 }
956
957 for (var i=0; i<days.length; i++)
958 days[i] = {day: days[i].day,
959 ip_hits: days[i].ips.size,
960 client_hits: days[i].clients.size,
961 client_socket_opens: days[i].client_socket_opens.size,
962 ip_socket_opens: days[i].ip_socket_opens.size
963 }
964
965 return {_: days}
966 }
967
968 bus('recent_hits/*').to_fetch = (rest) => {
969 bus.fetch('time/' + refresh_interval)
970 var result = []
971 for (var row of db.prepare('select * from usage where '
972 + nots + ' order by date desc limit ?').iterate(
973 [parseInt(rest)])) {
974
975 row.details = JSON.parse(row.details)
976 if (row.details.agent && row.details.agent.match(/bot/)) continue
977
978 result.push({url: row.details.url, ip: row.details.ip, date: row.date})
979 }
980
981 return {_: result}
982 }
983
984 bus('recent_referers/*').to_fetch = (rest) => {
985 bus.fetch('time/' + refresh_interval)
986 var result = []
987 for (var row of db.prepare('select * from usage where '
988 + nots + ' order by date desc limit ?').iterate(
989 [parseInt(rest)])) {
990
991 row.details = JSON.parse(row.details)
992 if (row.details.agent && row.details.agent.match(/bot/)) continue
993
994 if (row.details.referer && !row.details.referer.match(/^https:\/\/cheeseburgertherapy.com/))
995 result.push({url: row.details.url, referer: row.details.referer,
996 date: row.date})
997 }
998
999 return {_: result}
1000 }
1001
1002
1003 function sock_open_time (sock_event) {
1004 var client = JSON.parse(sock_event.details).client
1005 if (!client) return null
1006
1007 var http_req = db.prepare('select * from usage where event = "http request" and date < ? and '
1008 + ' details like ? and '
1009 + nots + ' order by date desc limit 1').get([sock_event.date,
1010 '%'+client+'%'])
1011 if (!http_req) return null
1012
1013 var delay = sock_event.date - http_req.date
1014 var res = !delay || delay > 300 ? 'fail' : delay
1015 if (delay && delay < 300
1016 && JSON.parse(sock_event.details).ip
1017 !== JSON.parse(http_req.details).ip)
1018 console.error('Yuck!', delay, sock_event, http_req)
1019 return [res, JSON.parse(http_req.details).url]
1020 }
1021 function sock_open_times () {
1022 var opens = db.prepare('select * from usage where event = "socket open" and '
1023 + nots + ' order by date desc limit ?').all(500)
1024 var times = []
1025 for (var i=0; i<opens.length; i++) {
1026 times.push(sock_open_time(opens[i]))
1027 // Get the most recent http hit before this open, from the same client id
1028 // subract the times
1029 }
1030 return times
1031 }
1032
1033 bus('socket_load_times').to_fetch = () => {
1034 return {_: sock_open_times()}
1035 }
1036 },
1037 log_usage(event, details) {
1038 bus.usage_log_db.prepare('insert into usage (date, event, details) values (?, ?, ?)')
1039 .run([new Date().getTime()/1000,
1040 event,
1041 JSON.stringify(details)])
1042 },
1043
1044 serve_time () {
1045 if (bus('time*').to_fetch.length > 0)
1046 // Then it's already installed
1047 return
1048
1049 // Time ticker
1050 var timeouts = {}
1051 bus('time*').to_fetch =
1052 function (key, rest, t) {
1053 if (key === 'time') rest = '/1000'
1054 timeout = parseInt(rest.substr(1))
1055 function f () { bus.save.fire({key: key, time: Date.now()}) }
1056 timeouts[key] = setInterval(f, timeout)
1057 f()
1058 }
1059 bus('time*').to_forget =
1060 function (key, rest) {
1061 if (key === 'time') key = 'time/1000'
1062 clearTimeout(timeouts[key])
1063 }
1064 },
1065
1066 smtp (opts) {
1067 var bus = this
1068 // Listen for SMTP messages on port 25
1069 if (opts.domain) {
1070 var mailin = require('mailin')
1071 mailin.start({
1072 port: opts.port || 25,
1073 disableWebhook: true,
1074 host: opts.domain,
1075 smtpOptions: { SMTPBanner: opts.domain }
1076 })
1077 // Event emitted after a message was received and parsed.
1078 mailin.on('message', (connection, msg, raw) => {
1079 if (!msg.messageId) {
1080 console.log('Aborting message without id!', msg.subject, new Date().toLocaleString())
1081 return
1082 }
1083
1084 console.log(msg)
1085 console.log('Refs is', msg.references)
1086 console.log('Raw is', raw)
1087 var parent = msg.references
1088 if (parent) {
1089 if (Array.isArray(parent))
1090 parent = parent[0]
1091 var m = parent.match(/\<(.*)\>/)
1092 if (m && m[1])
1093 parent = m[1]
1094 }
1095 var from = msg.from
1096
1097 console.log('date is', msg.date, typeof(msg.date), msg.date.getTime())
1098 email = {
1099 key: "email/" + msg.messageId,
1100 _: {
1101 title: msg.subject,
1102 parent: parent && ("email/" + parent) || undefined,
1103 from: msg.from.address,
1104 to: msg.to.map(x=>x.address),
1105 cc: msg.cc.map(x=>x.address),
1106 date: msg.date.getTime() / 1000,
1107 text: msg.text,
1108 body: msg.text,
1109 html: msg.html
1110 }
1111 }
1112
1113 bus.save(email)
1114 })
1115 }
1116 },
1117
1118 serve_email (master, opts) {
1119 opts = opts || {}
1120 var client = this
1121 var email_regex = /^(([^<>()\[\]\\.,;:\s@"]+(\.[^<>()\[\]\\.,;:\s@"]+)*)|(".+"))@((\[[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}])|(([a-zA-Z\-0-9]+\.)+[a-zA-Z]{2,}))$/
1122 var peemail_regex = /^public$|^(([^<>()\[\]\\.,;:\s@"]+(\.[^<>()\[\]\\.,;:\s@"]+)*)|(".+"))@((\[[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}])|(([a-zA-Z\-0-9]+\.)+[a-zA-Z]{2,}))$/
1123 // var local_addie_regex = new RegExp('state://' + opts.domain + '/user/([^\/]+)')
1124 // Todo on Server:
1125 // - Connect with mailin, so we can receive SMTP shit
1126 //
1127 // - Is there security hole if users have a ? or a / in their name?
1128 // - Make the master.posts/ state not require /
1129 // - Make standard url tools for optional slashes, and ? query params
1130 // - Should a e.g. client to_save abort if it calls save that aborts?
1131 // - e.g. if master('posts*').to_save aborts
1132
1133 // Helpers
// Queries master.pg_db for the posts visible to one reader, newest first.
//
// args.for:   address of the reader; posts cc'd to 'public' are always
//             visible, and so are posts to/cc/from this address
// args.about: optional; keep only posts to/cc/from this address
// args.to:    optional; maximum number of posts to return
// Returns the array of post objects stored in the `value` column.
function get_posts (args) {
    console.log('getting posts with', args)

    // Renders "value contains this JSON" with the JSON as a quoted SQL
    // literal.  Quotes are doubled because args.about comes from the
    // client.  (Assumes the server's standard_conforming_strings is on,
    // so backslashes need no escaping -- TODO confirm.)
    function contains (x) {
        return "value @> '" + JSON.stringify({_:x}).replace(/'/g, "''") + "'"
    }
    function any_of (filters) {
        return '(' + filters.map(contains).join(' or ') + ')'
    }

    var can_see = [{cc: ['public']}]
    if (args.for)
        can_see.push({to: [args.for]},
                     {cc: [args.for]},
                     {from: [args.for]})
    var terms = any_of(can_see)

    if (args.about)
        terms += ' and ' + any_of([{to: [args.about]},
                                   {cc: [args.about]},
                                   {from: [args.about]}])

    var q = "select value from store where key like 'post/%' and " + terms
    q += " order by value #>'{_,date}' desc"

    // Only a non-negative integer may reach the query text
    var limit = parseInt(args.to, 10)
    if (limit >= 0) q += ' limit ' + limit

    console.log('with query', q)
    return master.pg_db.querySync(q).map(x=>x.value)
}
// Returns the posts in master.pg_db whose `_.parent` equals post.key,
// oldest first.
function post_children (post) {
    // post.key is chosen by clients, so double any single quotes before
    // embedding it in the SQL literal
    var parent_json = JSON.stringify(post.key).replace(/'/g, "''")
    return master.pg_db.querySync(
        "select value from store where value #>'{_,parent}' = '"
            + parent_json
            + "' order by value #>'{_,date}' asc").map(x=>x.value)
}
1170
1171 // function canonicalize_address (addie) {
1172 // if (opts.domain) {
1173 // // Try foo@gar.boo
1174 // var m = addie.match(email_regex)
1175 // if (m && m[5].toLowerCase() === opts.domain)
1176 // return 'user/' + m[1]
1177
1178 // // Try state://gar.boo/user/foo
1179 // m = addie.match(local_addie_regex)
1180 // if (m) return 'user/' + m[1]
1181 // }
1182 // return addie
1183 // }
1184
// Define state on master
// The handlers are installed once per master: the guard checks whether a
// 'posts_for/*' fetch handler is already present.
if (master('posts_for/*').to_fetch.length === 0) {
    // Get posts for each user
    // The fetch also reads the 'dirty-posts-for/<address>' state (via
    // watch_for_dirt), which the save handler below re-saves whenever a
    // post mentioning that address changes.
    master('posts_for/*').to_fetch = (json) => {
        watch_for_dirt('posts-for/' + json.for)
        return {_: get_posts(json)}
    }
    // Saving any post will dirty the list for all users mentioned in
    // the post
    master('post/*').to_save = (old, New, t) => {
        // To do: diff the cc, to, and from lists, and only dirty
        // posts_for people who have been changed

        // A post with no previous content is treated as having no addresses
        if (!old._) old = {_:{to: [], from: [], cc: []}}

        // old._.to = old._.to.map(a=>canonicalize_address(a))
        // old._.cc = old._.cc.map(a=>canonicalize_address(a))
        // old._.from = old._.from.map(a=>canonicalize_address(a))
        // New._.to = New._.to.map(a=>canonicalize_address(a))
        // New._.cc = New._.cc.map(a=>canonicalize_address(a))
        // New._.from = New._.from.map(a=>canonicalize_address(a))

        // Every address on either version of the post gets its list dirtied
        var dirtied = old._.to.concat(New._.to)
            .concat(old._.cc).concat(New._.cc)
            .concat(old._.from).concat(New._.from)
        dirtied.forEach(u=>dirty('posts-for/' + u))

        // The parent post is dirtied too, on both the old and new version
        if (old._.parent) dirty(old._.parent)
        if (New._.parent) dirty(New._.parent)

        t.done(New)
    }
}
1218
// Address of a user object on this server: <name>@<opts.domain>
function user_addy (u) {
    return u.name + '@' + opts.domain
}

// Address of whoever is logged in on this client, or 'public' if nobody is
function current_addy () {
    var current = client.fetch('current_user')
    if (!current.logged_in) return 'public'
    return user_addy(current.user)
}

// True when the current address appears in the post's `from` list
function is_author (post) {
    var authors = post._.from
    client.fetch('current_user')
    return authors.includes(current_addy())
}

// True when the post is addressed to, cc'd to, or from the current address,
// or mentions 'public' in any of those lists
function can_see (post) {
    client.fetch('current_user')
    var allowed = post._.to.concat(post._.cc).concat(post._.from)
    var me = current_addy()
    return allowed.includes(me) || allowed.includes('public')
}

// Dirt is tracked as 'dirty-<key>' state on the master bus: dirty() saves a
// fresh random number there, and watch_for_dirt() fetches it.
var drtbus = master//require('./statebus')()
function dirty (key) {
    drtbus.save({key: 'dirty-'+key, n: Math.random()})
}
function watch_for_dirt (key) {
    drtbus.fetch('dirty-'+key)
}
1239
// The address of the current user (or 'public')
client('current_email').to_fetch = () => {
    return {_: current_addy()}
}

// Define state on client

// 'posts<?args>': the posts visible to the current user.  The arguments
// after the first character of `rest` are parsed, `for` is forced to the
// current address, and the list itself comes from master's 'posts_for/'.
client('posts*').to_fetch = (k, rest) => {
    var args = bus.parse(rest.substr(1))
    client.fetch('current_user')
    args.for = current_addy()

    var listing = master.fetch('posts_for/' + JSON.stringify(args))
    var posts = master.clone(listing._)
    return {_: posts.map(x=>client.fetch(x.key))}
}

// A single post, with the keys of its replies attached as `_.children`.
// Returns {} when the post is empty or the current user may not see it.
client('post/*').to_fetch = (k) => {
    var e = master.clone(master.fetch(k))
    if (!e._ || !can_see(e))
        return {}

    var replies = post_children(e)
    e._.children = replies.map(e=>e.key)
    watch_for_dirt(k)

    return e
}
1264
// Saves a post on behalf of the current user.  The save is aborted when the
// post fails validation or the current address is not in its `from` list.
client('post/*').to_save = (o, t) => {
    // Every address must be a string that is 'public' or looks like email
    function valid_addresses (list) {
        return list.every(a => typeof a === 'string' && a.match(peemail_regex))
    }

    if (!(client.validate(o, {key: 'string',
                              _: {to: 'array',
                                  cc: 'array',
                                  from: 'array',
                                  date: 'number',
                                  '?parent': 'string',
                                  body: 'string',
                                  '?title': 'string',
                                  '*': '*'}})
          && valid_addresses(o._.to)
          && valid_addresses(o._.cc)
          && valid_addresses(o._.from))) {
        console.error('post no be valid', o)
        t.abort()
        return
    }

    // Make sure this user is an author
    if (!is_author(o)) {
        console.error('User', current_addy(),
                      'is not author', o._.from)
        t.abort()
        return
    }

    // The list of replies is attached to `_` when a post is fetched; it is
    // derived from the other posts, so it must not be stored with this one
    o = client.clone(o)
    delete o.children
    delete o._.children

    master.save(o)
    t.done()
}
1299
// Deletes a post on behalf of the current user, who must be one of its
// authors; otherwise the delete is aborted.
client('post/*').to_delete = (key, o, t) => {
    // Validate
    var authorized = is_author(o)
    if (!authorized) {
        console.error('User', current_addy(),
                      'is not author', o._.from)
        t.abort()
        return
    }

    // The row is removed straight from the database
    master.pg_db.query('delete from store where key = $1', [key])

    // Dirty everybody mentioned on the post, and its parent
    var mentioned = o._.to.concat(o._.cc).concat(o._.from)
    for (var i = 0; i < mentioned.length; i++)
        dirty('posts-for/' + mentioned[i])
    if (o._.parent) dirty(o._.parent)

    // To do: handle nested threads.  Either detach them, or insert a
    // 'deleted' stub, or splice together

    // Complete
    t.done()
}
1323
// 'friends': the 'email/<address>' state of every user on master, followed
// by 'email/public'.
client('friends').to_fetch = t => {
    var users = master.fetch('users').all || []
    var friends = users.map(u=>client.fetch('email/' + user_addy(u)))
    return {_: friends.concat([client.fetch('email/public')])}
}
1330
// 'email/<address>': a display record for an address.
//   - 'public' gets the name 'public'
//   - an address on opts.domain is "upgraded": it carries the local user's
//     object, name and pic
//   - any other well-formed address is named by the part before the @
//   - anything else gets only its `address` field
client('email/*').to_fetch = (rest) => {
    var m = rest.match(email_regex)
    var result = {address: rest}

    if (rest === 'public') {
        result.name = 'public'
        return result
    }
    if (!m) return result

    var is_local = m[5].toLowerCase() === opts.domain
    if (is_local) {
        result.user = client.fetch('user/' + m[1])
        result.name = result.user.name
        result.pic = result.user.pic
        result.upgraded = true
    }
    else {
        result.name = m[1]
        result.upgraded = false
    }
    return result
}
1350 },
1351
1352 sqlite_query_server: function sqlite_query_server (db) {
1353 var fetch = bus.fetch
1354 bus('table_columns/*').to_fetch =
1355 function fetch_table_columns (key, rest) {
1356 if (typeof key !== 'string')
1357 console.log(handlers.hash)
1358 var table_name = rest
1359 var columns = fetch('sql/PRAGMA table_info(' + table_name + ')').rows.slice()
1360 var foreign_keys = fetch('table_foreign_keys/' + table_name)
1361 var column_info = {}
1362 for (var i=0;i< columns .length;i++) {
1363 var col = columns[i].name
1364 column_info[col] = columns[i]
1365 // if (col === 'customer' || col === 'customers')
1366 // console.log('FOR CUSTOMER, got', col, foreign_keys[col])
1367 column_info[col].foreign_key = foreign_keys[col] && foreign_keys[col].table
1368 columns[i] = col
1369 }
1370 columns.splice(columns[columns.indexOf('id')], 1)
1371 column_info['key'] = column_info['id']
1372 delete column_info['id']
1373 return {columns:columns, column_info:column_info}
1374 }
1375
1376
1377 bus('table_foreign_keys/*').to_fetch =
1378 function table_foreign_keys (key, rest) {
1379 var table_name = rest
1380 var foreign_keys = fetch('sql/PRAGMA foreign_key_list(' + table_name + ')').rows
1381 var result = {}
1382 for (var i=0;i< foreign_keys .length;i++)
1383 result[foreign_keys[i].from] = foreign_keys[i]
1384 delete result.id
1385 result.key = key
1386 return result
1387 }
1388
1389 bus('sql/*').to_fetch =
1390 function sql (key, rest) {
1391 fetch('timer/60000')
1392 var query = rest
1393 try { query = JSON.parse(query) }
1394 catch (e) { query = {stmt: query, args: []} }
1395
1396 db.all(query.stmt, query.args,
1397 function (err, rows) {
1398 if (rows) bus.save.fire({key:key, rows: rows})
1399 else console.error('Bad sqlite query', key, err)
1400 }.bind(this))
1401 }
1402 },
1403
1404 sqlite_table_server: function sqlite_table_server(db, table_name) {
1405 var save = bus.save, fetch = bus.fetch
1406 var table_columns = fetch('table_columns/'+table_name) // This will fail if used too soon
1407 var foreign_keys = fetch('table_foreign_keys/'+table_name)
1408 var remapped_keys = fetch('remapped_keys')
1409 remapped_keys.keys = remapped_keys.keys || {}
1410 remapped_keys.revs = remapped_keys.revs || {}
// Converts a database row into its state object:
//   - the key is '<table_name>/<row.id>', and the id field is dropped
//   - a non-empty foreign-key column becomes '<foreign table>/<value>'
//   - an `other` column is parsed from JSON text (empty means {})
function row_json (row) {
    var json = {key: table_name + '/' + row.id}

    Object.keys(row).forEach(function (column) {
        var value = row[column]
        var reference = foreign_keys[column]
        json[column] = (reference && value)
            ? reference.table + '/' + value
            : value
    })

    if (json.hasOwnProperty('other'))
        json.other = JSON.parse(json.other || '{}')
    delete json.id
    return json
}
// Converts a state object into the list of values for the table's columns,
// in the order of table_columns.columns:
//   - the `other` column is stored as JSON text (missing means {})
//   - a foreign key such as 'customer/3' is stored as '3'; a key that was
//     remapped when its row was inserted is translated first
function json_values (json) {
    var columns = table_columns.columns
    var result = []
    for (var i=0; i<columns.length; i++) {
        var col = columns[i]
        var val = json[col]

        // JSONify the `other' column
        if (col === 'other')
            val = JSON.stringify(val || {})

        // Convert foreign keys from customer/3 to 3
        else if (foreign_keys[col] && typeof val === 'string') {
            val = remapped_keys.keys[val] || val
            // Split the remapped key, not the original value
            val = val.split('/')[1]
        }

        result.push(val)
    }
    return result
}
// Reads every row of the table and fires the result on the bus as
// {key: <table_name>, rows: [...]}.  Nothing is fired if the query fails.
function render_table () {
    db.all('select * from ' + table_name, function (err, rows) {
        if (err || !rows) {
            // There are no rows to render after a failed query
            console.error('Problem with table!', table_name, err)
            return
        }
        bus.save.fire({key: table_name, rows: rows.map(function (row) {
            return row_json(row)
        })})
    })
}
// Fires one row on the bus.  If remapped_keys.revs has an entry for
// obj.key, the row is fired a second time under that other key.
function render_row(obj) {
    bus.save.fire(row_json(obj))

    var alias_key = remapped_keys.revs[obj.key]
    if (!alias_key) return

    var alias = bus.clone(obj)
    alias.key = alias_key
    bus.save.fire(row_json(alias))
}
1460
1461 // ************************
1462 // Handlers!
1463 // ************************
1464
1465 // Fetching the whole table, or a single row
// Fetching the whole table ('<table>'), or a single row ('<table>/<id>')
bus(table_name + '*').to_fetch = function (key, rest) {
    // Return the whole table
    if (rest === '') return render_table()

    if (rest[0] !== '/') return {error: 'bad key: ' + key}

    // A key that was remapped at insert time resolves to its real row
    key = remapped_keys.keys[key] || key
    var id = key.split('/')[1]

    db.get('select * from '+table_name+' where rowid = ?', [id],
           function (err, row) {
               if (row)
                   render_row(row)
               else
                   console.log('Row', id, "don't exist.", err)
           })
}
1484
1485 // Saving a row
// Saving a row: updates every column of the row whose rowid is `rest`,
// then dirties the row's key so it gets fetched again.
bus(table_name + '/*').to_save = function (obj, rest) {
    var columns = table_columns.columns
    var key = remapped_keys.keys[obj.key] || obj.key
    var rowid = rest

    // Compose the query statement: one "<column> = ?" per column
    var vals = json_values(obj)
    var assignments = columns.map(function (column) { return column + ' = ?' })
    var stmt = 'update ' + table_name + ' set '
        + assignments.join(', ')
        + ' where rowid = ?'
    vals.push(rowid)

    // Run the query
    db.run(stmt, vals, function (e,r) {
        console.log('updated',e,r,key)
        bus.dirty(key)
    })
}
1510
1511 // Inserting a new row
// Inserting a new row.  The object is saved under a 'new/<table>/...' key;
// once the row exists, that key is remembered as an alias of
// '<table>/<new id>' and the table is re-rendered.
bus('new/' + table_name + '/*').to_save = function (obj) {
    var columns = table_columns.columns
    var stmt = ('insert into ' + table_name + ' (' + columns.join(',')
                + ') values (' + new Array(columns.length).join('?,') + '?)')
    var values = json_values(obj)

    console.log('Sqlite:' + stmt)

    // Must stay a plain function: the id of the new row is read from `this`
    db.run(stmt, values, function (error) {
        if (error) {
            // No row was created, so there is no id to remap the key to
            console.log('INSERT error!', error)
            return
        }
        console.log('insert complete, got id', this.lastID)
        remapped_keys.keys[obj.key] = table_name + '/' + this.lastID
        remapped_keys.revs[remapped_keys.keys[obj.key]] = obj.key
        save(remapped_keys)
        render_table()
    })
}
1529
1530 // Deleting a row
// Deleting a row: forgets any alias recorded for the key, deletes the row
// whose rowid is `rest`, and re-renders the table whether or not the
// delete succeeded.
bus(table_name + '/*').to_delete = function (key, rest) {
    var real_key = remapped_keys.keys[key]
    if (real_key) {
        delete remapped_keys.keys[key]
        delete remapped_keys.revs[real_key]
        key = real_key
    }

    var rowid = rest
    var stmt = 'delete from ' + table_name + ' where rowid = ?'
    console.log('DELETE', stmt)

    db.run(stmt, [rowid], function (err) {
        if (err)
            console.log('DELETE error', err)
        else
            console.log('delete complete')
        render_table()
    })
}
1549 },
1550
1551 serves_auth: function serves_auth (conn, master) {
1552 var client = this // to keep me straight while programming
1553
// Initialize master
// Installs, once per master, the 'users/passwords' state: an index from
// lowercased login (or name, when a user has no login) to
// {user: <user key>, pass: <stored password>}.
if (!master.auth_initialized) {
    master('users/passwords').to_fetch = function (k) {
        var result = {key: 'users/passwords'}
        var users = master.fetch('users')
        users.all = users.all || []
        for (var i=0; i<users.all.length; i++) {
            var u = master.fetch(users.all[i])

            // Report a user without a usable login and leave it out of the
            // index, instead of throwing while lowercasing it
            var raw_login = u.login || u.name
            console.assert(raw_login, 'Missing login for user', u)
            if (typeof raw_login !== 'string') continue
            var login = raw_login.toLowerCase()

            // The first user to claim a login keeps it.  This also rejects
            // the login 'key', which would collide with the state's own key.
            if (result.hasOwnProperty(login)) {
                console.error("upass: this user's name is bogus, dude.", u.key)
                continue
            }
            result[login] = {user: u.key, pass: u.pass}
        }
        return result
    }
    master.auth_initialized = true
    master.fetch('users/passwords')
}
1575
1576 // Authentication functions
// Checks a login and password against master's 'users/passwords' index.
// Returns:
//   false      when login or pass is not a string, or login is 'key'
//   null       when no user with a password has that login
//   undefined  when the password does not match
//   the user object from master when the password matches
function authenticate (login, pass) {
    // Check the types before touching the values: both come from the client
    if (!(typeof login === 'string' && typeof pass === 'string')) return false
    if (login === 'key') return false

    // Look only at the index's own entries, never at inherited properties
    var passwords = master.fetch('users/passwords')
    var name = login.toLowerCase()
    var userpass = Object.prototype.hasOwnProperty.call(passwords, name)
        ? passwords[name]
        : undefined

    // Passwords and hashes are deliberately kept out of the log
    master.log('authenticate: login', name,
               userpass ? 'has an account' : 'has no account')

    if (!userpass || !userpass.pass) return null

    if (require('bcrypt-nodejs').compareSync(pass, userpass.pass))
        return master.fetch(userpass.user)
}
// Creates a user on master from {name, login, pass, email}; only `pass`
// and one of name/login are required.  Throws a string describing the
// problem when the parameters are invalid or the login is taken.
// The account key is 'user/<name>' when a name is given and that key is
// not yet in master's cache; otherwise a random key is used.
// Note that params.pass is replaced by its hash.
function create_account (params) {
    var raw_login = params.login || params.name
    if (typeof raw_login !== 'string')
        throw 'no login or name'

    var login = raw_login.toLowerCase()
    var schema = {'?name': 'string', '?login': 'string',
                  pass: 'string', '?email': 'string',
                  '?key': undefined, '*': '*'}
    if (!login || !master.validate(params, schema))
        throw 'invalid name, login, pass, or email'

    var passes = master.fetch('users/passwords')
    if (passes.hasOwnProperty(login))
        throw 'there is already a user with that login or name'

    // Hash password
    params.pass = require('bcrypt-nodejs').hashSync(params.pass)

    // Choose account key
    function random_key () {
        return 'user/' + Math.random().toString(36).substring(7,13)
    }
    var key = params.name ? 'user/' + params.name : random_key()
    while (master.cache.hasOwnProperty(key))
        key = random_key()

    // Make account object, leaving out the fields that are empty
    var new_account = {key: key,
                       name: params.name,
                       login: params.login,
                       pass: params.pass,
                       email: params.email }
    Object.keys(new_account).forEach(function (field) {
        if (!new_account[field]) delete new_account[field]
    })

    // Register the account in the user list and the password index
    var users = master.fetch('users')
    users.all = users.all || []
    users.all.push(new_account)
    passes[login] = {user: new_account.key, pass: new_account.pass}
    master.save(users)
    master.save(passes)
}
1631
1632 // Current User
// 'current_user': who is logged in on this connection.  Returns nothing
// until the connection has a client id; after that, {user, logged_in},
// where user is null when nobody is logged in.
client('current_user').to_fetch = function (k) {
    client.log('* fetching: current_user')
    if (!conn.client) return

    var logged_in_as = master.fetch('logged_in_clients')[conn.client]
    var u = logged_in_as && user_obj(logged_in_as.key, true)
    return {user: u || null, logged_in: !!u}
}
1640
// Saving 'current_user' is how a client talks to the auth system.  Depending
// on the fields of the saved object `o`, it:
//   - registers the connection's client id (o.client, first time only)
//   - creates an account (o.create_account)
//   - logs in (o.login_as) or logs out (o.logout); both need conn.id
// A failure aborts the save and puts a message in current_user.error.
// On success, current_user is refetched.
client('current_user').to_save = function (o, t) {
    // Abort this save and report msg through current_user.error
    function error (msg) {
        client.save.abort(o)
        var c = client.fetch('current_user')
        c.error = msg
        client.save(c)
    }

    client.log('* saving: current_user!')
    if (o.client && !conn.client) {
        // Set the client
        conn.client = o.client
        client.client_id = o.client
        client.client_ip = conn.remoteAddress

        // Record on the connection which user (if any) this client id is
        // already logged in as
        if (conn.id) {
            var connections = master.fetch('connections')
            connections[conn.id].user = master.fetch('logged_in_clients')[conn.client]
            master.save(connections)
        }
    }
    else {
        if (o.create_account) {
            client.log('current_user: creating account')
            try {
                create_account(o.create_account)
                client.log('Success creating account!')
                // Clear the request from the client's copy of the state
                var cu = client.fetch('current_user')
                cu.create_account = null
                client.save.fire(cu)
            } catch (e) {
                error('Cannot create that account because ' + e)
                return
            }
        }

        // A login may accompany an account creation in the same save
        if (o.login_as && conn.id) {
            // Then client is trying to log in
            client.log('current_user: trying to log in')
            var creds = o.login_as
            var login = creds.login || creds.name
            if (login && creds.pass) {
                // With a username and password
                var u = authenticate(login, creds.pass)

                client.log('auth said:', u)
                if (u) {
                    // Success!
                    // Associate this user with this session
                    // user.log('Logging the user in!', u)

                    var clients = master.fetch('logged_in_clients')
                    var connections = master.fetch('connections')

                    clients[conn.client] = u
                    connections[conn.id].user = u

                    master.save(clients)
                    master.save(connections)

                    client.log('current_user: success logging in!')
                }
                else {
                    error('Cannot log in with that information')
                    return
                }
            }
            else {
                error('Cannot log in with that information')
                return
            }
        }

        else if (o.logout && conn.id) {
            client.log('current_user: logging out')
            var clients = master.fetch('logged_in_clients')
            var connections = master.fetch('connections')

            delete clients[conn.client]
            connections[conn.id].user = null

            master.save(clients)
            master.save(connections)
        }
    }

    t.refetch()
}
// Deleting current_user does nothing
client('current_user').to_delete = function () {}
1730
1731 // Users have closet space at /user/<name>/*
1732 var closet_space_key = /^(user\/[^\/]+)\/.*/
1733 var private_closet_space_key = /^user\/[^\/]+\/private.*/
1734
1735 // User
// Saving 'user/<name>' or anything below it.  Only the logged-in user may
// save their own user object and closet space ('user/<name>/...'); any
// other save is aborted.  Closet space is saved to master as-is.  For the
// user object itself, the login/name must remain unique and a new password
// is hashed before it is stored.
client('user/*').to_save = function (o) {
    var c = client.fetch('current_user')
    var user_key = o.key.match(/^user\/([^\/]+)/)
    user_key = user_key && ('user/' + user_key[1])

    // Only the current user can touch themself.
    if (!c.logged_in || c.user.key !== user_key) {
        client.log('Only the current user can touch themself.',
                   {logged_in: c.logged_in, as: c.user && c.user.key,
                    touching: user_key})
        client.save.abort(o)
        return
    }

    // Users have closet space at /user/<name>/*
    if (o.key.match(closet_space_key)) {
        client.log('saving closet data')
        master.save(o)
        return
    }

    // Ok, then it must be a plain user
    console.assert(o.key.match(/^user\/[^\/]+$/))

    // Validate types
    if (!client.validate(o, {key: 'string', '?login': 'string', '?name': 'string',
                             '?pass': 'string', '?email': 'string', /*'?pic': 'string',*/
                             '*':'*'})) {
        client.log('This user change fails validation.')
        client.save.abort(o)
        return
    }

    // Rules for updating "login" and "name" attributes:
    //   • If "login" isn't specified, then we use "name" as login
    //   • That resulting login must be unique across all users

    // There must be at least a login or a name
    var login = o.login || o.name
    if (!login) {
        client.log('User must have a login or a name')
        client.save.abort(o)
        return
    }

    var u = master.fetch(o.key)
    var userpass = master.fetch('users/passwords')

    // Validate that the login/name is not changed to something clobberish.
    // The index in users/passwords is keyed by lowercased login, so the
    // lookup must be lowercased too or "Bob" could take over "bob".
    var old_login = u.login || u.name || ''
    var new_login = login.toLowerCase()
    if (old_login.toLowerCase() !== new_login
        && userpass.hasOwnProperty(new_login)) {
        client.log('The login', login, 'is already taken. Aborting.')
        client.save.abort(o)               // Abort

        o = client.fetch(o.key)            // Add error message
        o.error = 'The login "' + login + '" is already taken'
        client.save.fire(o)

        return                             // And exit
    }

    // Now we can update login and name
    u.login = o.login
    u.name  = o.name

    // Hash password
    o.pass = o.pass && require('bcrypt-nodejs').hashSync(o.pass)
    u.pass = o.pass || u.pass

    // (Saving user pictures used to live here; that code is disabled.)

    // For anything else, go ahead and add it to the user object, and drop
    // whatever the saved object no longer has
    var protected_fields = {key:1, name:1, /*pic:1,*/ pass:1}
    for (var k in o)
        if (!protected_fields.hasOwnProperty(k))
            u[k] = o[k]
    for (var k in u)
        if (!protected_fields.hasOwnProperty(k) && !o.hasOwnProperty(k))
            delete u[k]

    master.save(u)
}
// Fetching 'user/<name>' or anything below it.  Closet space is readable by
// anyone, except 'user/<name>/private...', which only that user can read.
// The user object itself is filtered through user_obj().
client('user/*').to_fetch = function user_fetcher (k) {
    var c = client.fetch('current_user')
    client.log('* fetching:', k, 'as', c.user)

    // Users have closet space at /user/<name>/*
    var closet = k.match(closet_space_key)
    if (closet) {
        var obj_user = closet[1]
        var is_owner = c.user && obj_user === c.user.key
        if (k.match(private_closet_space_key) && !is_owner) {
            client.log('hiding private closet data')
            return {}
        }
        client.log('fetching closet data')
        return client.clone(master.fetch(k))
    }

    // Otherwise return the actual user
    return user_obj(k, c.logged_in && c.user.key === k)
}
// Deleting a user does nothing
client('user/*').to_delete = function () {}
// Returns a copy of master's state at k that is safe to show.
//   - below 'user/<name>/private/', the whole object is shown to its
//     logged-in owner, and only {key} to anyone else
//   - otherwise `pass` is always removed, and `email` and `login` are
//     removed too unless logged_in is set
function user_obj (k, logged_in) {
    var o = master.clone(master.fetch(k))

    var is_private = k.match(/^user\/([^\/]+)\/private\/(.*)$/)
    if (is_private) {
        if (logged_in) return o
        return {key: k}
    }

    delete o.pass
    if (!logged_in) {
        delete o.email
        delete o.login
    }
    return o
}
1861
// Blacklist sensitive stuff on master, in case we have a shadow set up:
// every operation on these keys gets a handler that does nothing.
var blacklist = 'users users/passwords logged_in_clients'.split(' ')
blacklist.forEach(function (sensitive_key) {
    client(sensitive_key).to_fetch  = function () {}
    client(sensitive_key).to_save   = function () {}
    client(sensitive_key).to_delete = function () {}
    client(sensitive_key).to_forget = function () {}
})
1870 },
1871
1872 persist: function (prefix_to_sync, validate) {
1873 var client = this
1874 var was_logged_in = undefined
1875
// The prefix on master under which this client's state is persisted:
// 'client/<user name>/' when logged in (the user's key without its
// 'user/' part), otherwise 'client/<client id>/'.
function client_prefix (current_user) {
    var owner
    if (current_user.logged_in)
        owner = current_user.user.key.substr('user/'.length)
    else
        owner = client.client_id
    return 'client/' + owner + '/'
}
1881
// Moves everything persisted for an anonymous client over to a user.
// Each key listed in master's 'persisted_keys/<client id>' is deleted from
// master, and its state is re-saved under the user's prefix unless the user
// already has something there.  The list of persisted keys is then emptied.
function copy_client_to_user(client, user) {
    var master = client.master
    var old_prefix = 'client/' + client.client_id
    var new_prefix = 'client/' + user.key.substr('user/'.length)

    var keys = master.fetch('persisted_keys/' + client.client_id)
    if (!keys.val) return

    for (var old_key in keys.val) {
        var new_key = new_prefix + old_key.substr(old_prefix.length)
        var moved = client.clone(master.fetch(old_key))

        // Delete the old
        master.del(old_key)

        // Save the new, if it doesn't clobber any existing data on the
        // user (an object with only one field counts as empty)
        var existing = master.fetch(new_key)
        var is_free = Object.keys(existing).length === 1
        if (is_free) {
            moved.key = new_key
            master.save(moved)
        }
    }

    keys.val = {}
    master.save(keys)
}
1925
// Copy client to user if we log in
// This function reads current_user, and remembers in was_logged_in what it
// saw on its previous run.
// NOTE(review): presumably the bus re-runs this function whenever
// current_user changes -- confirm against the bus's reactive-function docs.
client(_=>{
    var c = client.fetch('current_user')
    if (client.loading()) return
    // was_logged_in starts out undefined, and `undefined == false` is
    // false, so nothing is copied on the first run
    if (was_logged_in == false && c.logged_in)
        // User just logged in!  Let's copy his stuff over
        copy_client_to_user(client, c.user)
    was_logged_in = c.logged_in
})
1935
// Fetches persisted state from master, where it is stored under this
// client's prefix, and strips that prefix from every key inside it.
client(prefix_to_sync).to_fetch = function (key) {
    var c = client.fetch('current_user')
    if (client.loading()) return
    var prefix = client_prefix(c)

    // Get the state from master
    var obj = client.clone(client.master.fetch(prefix + key))

    // Translate it back to client
    obj = client.deep_map(obj, function (o) {
        // null is an object too, but has no key to translate
        if (o !== null && typeof o === 'object'
            && 'key' in o && typeof o.key === 'string')
            o.key = o.key.substr(prefix.length)
        return o
    })
    return obj
}
1952
// Saves persisted state to master under this client's prefix.  The save is
// aborted when the optional validate(obj) callback rejects the object.
// While nobody is logged in, every key that gets saved is also recorded in
// the client's list of persisted keys.
client(prefix_to_sync).to_save = function (obj) {
    if (validate && !validate(obj)) {
        console.warn('Validation failed on', obj)
        client.save.abort(obj)
        return
    }

    var c = client.fetch('current_user')
    if (client.loading()) return
    var prefix = client_prefix(c)

    // Make it safe: every key inside the object gets the prefix
    var p_keys = client_persisted_keys()
    obj = client.clone(obj)
    obj = client.deep_map(obj, function (o) {
        // null is an object too, but has no key to translate
        if (o !== null && typeof o === 'object'
            && 'key' in o && typeof o.key === 'string') {
            o.key = prefix + o.key
            if (p_keys)
                p_keys.val[o.key] = true
        }
        return o
    })

    // Save to master
    client.master.save(obj)
    p_keys && client.master.save(p_keys)
}
1980
// Deletes persisted state from master, and from the client's list of
// persisted keys when there is one.
client(prefix_to_sync).to_delete = function (k) {
    k = client_prefix(client.fetch('current_user')) + k
    client.master.delete(k)

    // There is no list of persisted keys while a user is logged in
    var p_keys = client_persisted_keys()
    if (p_keys) {
        delete p_keys.val[k]
        client.master.save(p_keys)
    }
}
1989
// Returns master's 'persisted_keys/<client id>' state, making sure it has a
// `val` object.  Returns undefined while a user is logged in.
function client_persisted_keys () {
    var current = client.fetch('current_user')
    if (current.logged_in) return

    var persisted = client.master.fetch('persisted_keys/' + client.client_id)
    if (persisted && !persisted.val)
        persisted.val = {}
    return persisted
}
1996 },
1997
1998 shadows: function shadows (master_bus) {
1999 // Custom route
2000 var OG_route = bus.route
2001 bus.route = function(key, method, arg, t) {
2002 var count = OG_route(key, method, arg, t)
2003 // This forwards anything we don't have a specific handler for
2004 // to the global cache
2005 if (count === 0) {
2006 count++
2007 if (method === 'to_fetch')
2008 bus.run_handler(function get_from_master (k) {
2009 // console.log('DEFAULT FETCHing', k)
2010 var r = master_bus.fetch(k)
2011 // console.log('DEFAULT FETCHed', r)
2012 bus.save.fire(r, {version: master_bus.versions[r.key]})
2013 }, method, arg)
2014 else if (method === 'to_save')
2015 bus.run_handler(function save_to_master (o, t) {
2016 // console.log('DEFAULT ROUTE', t)
2017 master_bus.save(bus.clone(o), t)
2018 }, method, arg, {t: t})
2019 else if (method == 'to_delete')
2020 bus.run_handler(function delete_from_master (k, t) {
2021 master_bus.delete(k)
2022 return 'done'
2023 }, method, arg, {t: t})
2024 }
2025 return count
2026 }
2027 },
2028
// Reads a file, and watches it for changes while it is in use.
//
// This property starts out as an initializer: the first call loads the
// libraries, replaces bus.read_file with the real implementation, and then
// forwards its own arguments to it.
// NOTE(review): bus.uncallback presumably wraps the callback-style
// readFile below into a function usable from reactive code -- confirm
// against its definition.
read_file: function init () {
    // The first time this is run, we initialize it by loading some
    // libraries
    var chokidar = require('chokidar')
    var watchers = {}          // filename -> chokidar watcher
    var fs = require('fs')

    // Now we redefine the function
    bus.read_file = bus.uncallback(
        function readFile (filename, encoding, cb) {
            fs.readFile(filename, (err, result) => {
                if (err) console.error('Error from read_file:', err)
                // Errors are never passed on: a failed read yields the
                // string '*error*' (converted with the same encoding)
                cb(null, ((result || '*error*').toString(encoding || undefined)))
            })
        },
        {
            // Position of the callback in readFile's arguments
            callback_at: 2,
            // Changes and additions dirty the result; removal deletes it
            start_watching: (args, dirty, del) => {
                var filename = args[0]
                console.log('## starting to watch', filename)
                watchers[filename] = chokidar.watch(filename, {
                    atomic: true,
                    disableGlobbing: true
                })
                watchers[filename].on('change', () => { dirty() })
                watchers[filename].on('add', () => { dirty() })
                watchers[filename].on('unlink', () => { del() })
            },
            stop_watching: (json) => {
                var filename = json[0]
                console.log('## stopping to watch', filename)
                // log('unwatching', filename)
                // To do: this should probably use.unwatch() instead.
                watchers[filename].close()
                delete watchers[filename]
            }
        })
    return bus.read_file.apply(bus, [].slice.call(arguments))
},
2068
2069 // Synchronizes the recursive path starting with <state_path> to the
2070 // file or recursive directory structure at fs_path
2071 sync_files: function sync_files (state_path, file_path) {
2072 // To do:
2073 // - Hook up a to_delete handler
2074 // - recursively remove directories if all files gone
2075
2076 console.assert(state_path.substr(-1) !== '*'
2077 && (!file_path || file_path.substr(-1) !== '*'),
2078 'The sync_files paths should not end with *')
2079
2080 file_path = file_path || state_path
2081 var buffer = {}
2082 var full_file_path = require('path').join(__dirname, file_path)
2083
2084 bus(state_path + '*').to_fetch = (rest) => {
2085 // We DO want to handle:
2086 // - "foo"
2087 // - "foo/*"
2088 // But not:
2089 // - "foobar"
2090 if (rest.length>0 && rest[0] !== '/') return // Bail on e.g. "foobar"
2091
2092 var f = bus.read_file(file_path + rest, 'base64')
2093
2094 // Clear buffer of items after 1 second. If fs results are delayed
2095 // longer, we'll just deal with those flashbacks.
2096 for (k in buffer)
2097 if (new Date().getTime() - buffer[k] > 1 * 1000)
2098 delete buffer[k]
2099
2100 // If we are expecting this, skip the read
2101 // console.log('read file', typeof f == 'string' ? f.substr(0,40) + '..': f)
2102 if (buffer[f]) {
2103 console.log('skipping cause its in buffer')
2104 return
2105 }
2106
2107 return {_:f}
2108 }
2109
2110 bus(state_path + '/*').to_save = (o, rest, t) => {
2111 if (rest.length>0 && rest[0] !== '/') return
2112 var f = Buffer.from(o._, 'base64')
2113 require('fs').writeFile(file_path + rest, f)
2114 buffer[f] = new Date().getTime()
2115 t.done()
2116 }
2117
2118 bus.http.use('/'+state_path, require('express').static(full_file_path))
2119 },
2120
2121 // Installs a GET handler at route that gets state from a fetcher function
2122 // Note: Makes too many textbusses. Should re-use one.
2123 http_serve: function http_serve (route, fetcher) {
2124 var textbus = require('./statebus')()
2125 textbus.label = 'textbus'
2126 var watched = new Set()
2127 textbus('*').to_fetch = (filename, old) => {
2128 return {etag: Math.random() + '',
2129 _: fetcher(filename)}
2130 }
2131 bus.http.get(route, (req, res) => {
2132 var path = req.path
2133 var etag = textbus.cache[path] && textbus.cache[path].etag
2134 if (etag && req.get('If-None-Match') === etag) {
2135 res.status(304).end()
2136 return
2137 }
2138
2139 textbus.fetch(req.path) // So that textbus never clears the cache
2140 textbus.fetch(req.path, function cb (o) {
2141 res.setHeader('Cache-Control', 'public')
2142 // res.setHeader('Cache-Control', 'public, max-age='
2143 // + (60 * 60 * 24 * 30)) // 1 month
2144 res.setHeader('ETag', o.etag)
2145 res.setHeader('content-type', 'application/javascript')
2146 res.send(o._)
2147 textbus.forget(o.key, cb) // But we do want to forget the cb
2148 })
2149 })
2150 },
2151
2152 serve_client_coffee: function serve_client_coffee () {
2153 bus.http_serve('/client/:filename', (filename) => {
2154 filename = /\/client\/(.*)/.exec(filename)[0]
2155 var source_filename = filename.substr(1)
2156 var source = bus.read_file(source_filename)
2157 if (filename.match(/\.coffee$/)) {
2158
2159 try {
2160 var compiled = require('coffeescript').compile(source, {filename,
2161 bare: true,
2162 sourceMap: true})
2163 } catch (e) {
2164 if (!bus.loading())
2165 console.error('Could not compile ' + e.toString())
2166 return 'console.error(' + JSON.stringify(e.toString()) + ')'
2167 }
2168
2169 var source_map = JSON.parse(compiled.v3SourceMap)
2170 source_map.sourcesContent = source
2171 compiled = 'window.dom = window.dom || {}\n' + compiled.js
2172 compiled = 'window.ui = window.ui || {}\n' + compiled
2173
2174 function btoa(s) { return new Buffer(s.toString(),'binary').toString('base64') }
2175
2176 // Base64 encode it
2177 compiled += '\n'
2178 compiled += '//# sourceMappingURL=data:application/json;base64,'
2179 compiled += btoa(JSON.stringify(source_map)) + '\n'
2180 compiled += '//# sourceURL=' + source_filename
2181 return compiled
2182 }
2183 else return source
2184 })
2185 },
2186
2187 serve_clientjs: function serve_clientjs (path) {
2188 path = path || 'client.js'
2189 bus(path).to_fetch = () =>
2190 ({_:
2191 ['extras/coffee.js', 'extras/sockjs.js', 'extras/react.js',
2192 'statebus.js', 'client.js']
2193 .map((f) => bus.read_file('node_modules/statebus/' + f))
2194 .join(';\n')})
2195 },
2196
2197 serve_wiki: () => {
2198 bus('edit/*').to_fetch = () => ({_: require('./extras/wiki.coffee').code})
2199 },
2200
2201 unix_socket_repl: function (filename) {
2202 var repl = require('repl')
2203 var net = require('net')
2204 var fs = require('fs')
2205 if (fs.existsSync && fs.existsSync(filename))
2206 fs.unlinkSync(filename)
2207 net.createServer(function (socket) {
2208 var r = repl.start({
2209 //prompt: '> '
2210 input: socket
2211 , output: socket
2212 , terminal: true
2213 //, useGlobal: false
2214 })
2215 r.on('exit', function () {
2216 socket.end()
2217 })
2218 r.context.socket = socket
2219 }).listen(filename)
2220 },
2221
2222 schema: function schema () {
2223 function url_tree (cache) {
2224 // The key tree looks like:
2225 //
2226 // {server: {thing: [obj1, obj2], shing: [obj1, obj2], ...}
2227 // client: {dong: [obj1, ...]}}
2228 //
2229 // And objects without a number, like 'shong' will go on:
2230 // key_tree.server.shong[null]
2231 var tree = {server: {}, client: {}}
2232 for (var key in cache) {
2233 var p = parse_key(key)
2234 if (!p) {
2235 console.log('The state dash can\'t deal with key', key)
2236 return null
2237 }
2238 tree[p.owner][p.name] || (tree[p.owner][p.name] = {})
2239 tree[p.owner][p.name][p.number || null] = cache[key]
2240 }
2241 return tree
2242 }
2243
2244 function parse_key (key) {
2245 var word = "([^/]+)"
2246 // Matching things like: "/new/name/number"
2247 // or: "/name/number"
2248 // or: "/name"
2249 // or: "name/number"
2250 // or: "name"
2251 // ... and you can optionally include a final slash.
2252 var regexp = new RegExp("(/)?(new/)?" +word+ "(/" +word+ ")?(/)?")
2253 var m = key.match(regexp)
2254 if (!m) return null
2255 // indices = [0: has_match, 1: server_owned, 2: is_new, 3: name, 5: number]
2256 var owner = m[1] ? 'server' : 'client'
2257 return m[0] && {owner:owner, 'new': m[2], name: m[3], number: m[5]}
2258 }
2259 schema.parse_key = parse_key
2260 schema.url_tree = url_tree
2261 return url_tree(bus.cache)
2262 }
2263}
2264 // Add methods to bus object
2265 for (var m in extra_methods)
2266 bus[m] = extra_methods[m]
2267
2268 bus.options = default_options(bus)
2269 set_options(bus, options)
2270
2271 // Automatically make state:// fetch over a websocket
2272 bus.net_automount()
2273 return bus
2274}
2275
2276
// Handy functions for writing tests on nodejs

// Registered test functions, in registration order
var tests = []

// Registers test function f to be run later
function test (f) {
    tests[tests.length] = f
}
// Runs the registered tests, then exits the process.
//
// If a test name is given on the command line (process.argv[2]), only the
// test function with that name runs.  An unknown name is reported on
// stderr and the process exits with status 1.  Otherwise every registered
// test runs in order, each starting after the previous one calls its
// `done` callback.
function run_tests () {
    var requested = process.argv[2]

    // Either run the test specified at command line
    if (requested) {
        var chosen = tests.find((f) => f.name === requested)
        if (!chosen) {
            console.error('No test named ' + JSON.stringify(requested)
                          + '. Available tests: '
                          + tests.map((f) => f.name).join(', '))
            process.exit(1)
            return
        }
        chosen(() => process.exit())
        return
    }

    // Or run all tests
    function run_next () {
        if (tests.length === 0) {
            console.log('\nDone with all tests.')
            process.exit()
            return
        }
        var f = tests.shift()
        delay_so_far = 0    // each test starts with a fresh delay() clock
        console.log('\nTesting:', f.name)
        f(function () {setTimeout(run_next)})
    }
    run_next()
}
// Prints its arguments, formatted with util.format, with every output
// line prefixed by the indent `pre` (not only the line after the first
// newline).
function log () {
    var pre = ' '
    var text = util.format.apply(null, arguments)
    console.log(pre + text.replace(/\n/g, '\n' + pre))
}
// Forwards all of its arguments to console.assert
function assert (...args) {
    console.assert(...args)
}
// Running total (in ms) of all delays requested since the last reset
var delay_so_far = 0

// Schedules f to run `time` ms after the previously scheduled delay, so
// successive calls run one after another.  Returns the setTimeout handle.
function delay (time, f) {
    delay_so_far += time
    return setTimeout(f, delay_so_far)
}

// Resets the running total to zero, and returns that zero
delay.init = function (_) {
    delay_so_far = 0
    return delay_so_far
}
2312
2313
// Now export everything
module.exports.import_server = import_server
module.exports.run_server = function (bus, options) { bus.serve(options) }

// Adds the server-side extras to the statebus module object:
//  - statebus.testing: the test helpers defined in this file
//  - statebus.serve(options): makes a new bus, serves it, and returns it
//  - statebus.repl(filename): terminal client for a unix-socket repl
module.exports.import_module = function (statebus) {
    statebus.testing = {test, run_tests, log, assert, delay}

    statebus.serve = function serve (options) {
        var bus = statebus()
        require('./server').run_server(bus, options)
        return bus
    }

    // Handy repl.  Invoke with node -e 'require("statebus").repl("/tmp/foo")'
    statebus.repl = function (filename) {
        var net = require('net')
        var sock = net.connect(filename)

        // setRawMode only exists when stdin is a terminal, so skip it
        // when input is piped or redirected instead of throwing.
        function set_raw_mode (enabled) {
            if (process.stdin.isTTY)
                process.stdin.setRawMode(enabled)
        }

        process.stdin.pipe(sock)
        sock.pipe(process.stdout)

        sock.on('connect', function () {
            process.stdin.resume();
            set_raw_mode(true)
        })

        sock.on('close', function done () {
            set_raw_mode(false)
            process.stdin.pause()
            sock.removeListener('close', done)
        })

        process.stdin.on('end', function () {
            sock.destroy()
            console.log()
        })

        // A lone byte 4 is Ctrl-D: treat it as end of input
        process.stdin.on('data', function (b) {
            if (b.length === 1 && b[0] === 4) {
                process.stdin.emit('end')
            }
        })
    }
}