UNPKG

96.2 kBJavaScriptView Raw
1var fs = require('fs'),
2 util = require('util')
3var unique_sockjs_string = '_connect_to_statebus_'
4
5function default_options (bus) { return {
6 port: 'auto',
7 backdoor: null,
8 client: (c) => {c.shadows(bus)},
9 file_store: {save_delay: 250, filename: 'db', backup_dir: 'backups', prefix: '*'},
10 sync_files: [{state_path: 'files', fs_path: null}],
11 serve: true,
12 certs: {private_key: 'certs/private-key',
13 certificate: 'certs/certificate',
14 certificate_bundle: 'certs/certificate-bundle'},
15 connections: {include_users: true, edit_others: true},
16 __secure: false
17}}
18
19function set_options (bus, options) {
20 var defaults = bus.clone(bus.options)
21 options = options || {}
22 for (var k in (options || {}))
23 bus.options[k] = options[k]
24
25 // Fill in defaults of nested options too
26 for (var k in {file_store:1, certs:1})
27 if (bus.options[k]) {
28 if (typeof bus.options[k] !== 'object' || bus.options[k] === null)
29 bus.options[k] = {}
30
31 for (var k2 in defaults[k])
32 if (!bus.options[k].hasOwnProperty(k2))
33 bus.options[k][k2] = defaults[k][k2]
34 }
35}
36
37function import_server (bus, options)
38{ var extra_methods = {
39
40 serve: function serve (options) {
41 var master = bus
42 bus.honk = 'statelog'
43 master.label = 'master'
44
45 // Initialize Options
46 set_options(bus, options)
47
48 var use_ssl = bus.options.certs && (
49 require('fs').existsSync(bus.options.certs.private_key)
50 || require('fs').existsSync(bus.options.certs.certificate)
51 || require('fs').existsSync(bus.options.certs.certificate_bundle))
52
53 function c (client, conn) {
54 client.honk = bus.honk
55 client.serves_auth(conn, master)
56 bus.options.client && bus.options.client(client, conn)
57 }
58 if (!bus.options.client) c = undefined // no client bus when programmer explicitly says so
59
60 if (bus.options.file_store)
61 bus.file_store()
62
63 // ******************************************
64 // ***** Create our own http server *********
65 bus.make_http_server({port: bus.options.port, use_ssl})
66 bus.sockjs_server(this.http_server, c) // Serve via sockjs on it
67 var express = require('express')
68 bus.express = express()
69 bus.http = express.Router()
70 bus.install_express(bus.express)
71
72 // use gzip compression if available
73 try { bus.http.use(require('compression')())
74 console.log('Enabled http compression!') } catch (e) {}
75
76
77 // Initialize new clients with an id. We put the client id on
78 // req.client, and also in a cookie for the browser to see.
79 if (bus.options.client)
80 bus.express.use(function (req, res, next) {
81 req.client = require('cookie').parse(req.headers.cookie || '').client
82 if (!req.client) {
83 req.client = (Math.random().toString(36).substring(2)
84 + Math.random().toString(36).substring(2)
85 + Math.random().toString(36).substring(2))
86
87 res.setHeader('Set-Cookie', 'client=' + req.client
88 + '; Expires=21 Oct 2025 00:0:00 GMT;')
89 }
90 next()
91 })
92
93 // Initialize file sync
94 ; (bus.options.sync_files || []).forEach( x => {
95 if (require('fs').existsSync(x.fs_path || x.state_path))
96 bus.sync_files(x.state_path, x.fs_path)
97 })
98
99 // User will put their routes in here
100 bus.express.use('/', bus.http)
101
102 // Add a fallback that goes to state
103 bus.express.get('*', function (req, res) {
104 // Make a temporary client bus
105 var cbus = bus.bus_for_http_client(req, res)
106
107 // Do the fetch
108 cbus.honk = 'statelog'
109 var singleton = req.path.match(/^\/code\//)
110 cbus.fetch_once(req.path.substr(1), (o) => {
111 var unwrap = (Object.keys(o).length === 2
112 && '_' in o
113 && typeof o._ === 'string')
114 // To do: translate pointers as keys
115 res.send(unwrap ? o._ : JSON.stringify(o))
116 cbus.delete_bus()
117 })
118 })
119
120 // Serve Client Coffee
121 bus.serve_client_coffee()
122
123 // Custom router
124 bus.route = function server_route (key, method, arg, t) {
125 var handlers = bus.bindings(key, method)
126 var count = handlers.length
127 if (count)
128 bus.log('route:', bus+'("'+key+'").'+method+'['+handlers.length+'](key:"'+(arg.key||arg)+'")')
129
130 // Call all handlers!
131 for (var i=0; i<handlers.length; i++)
132 bus.run_handler(handlers[i].func, method, arg, {t: t, binding: handlers[i].key})
133
134 // Now handle backup handlers
135 if (count === 0) {
136
137 // A save to master without a to_save defined should be
138 // automatically reflected to all clients.
139 if (method === 'to_save') {
140 bus.save.fire(arg, t)
141 bus.route(arg.key, 'on_set_sync', arg, t)
142 count++
143 }
144
145 // A fetch to master without a to_fetch defined should go to
146 // the database, if we have one.
147 if (method === 'to_fetch') {
148 bus.db_fetch(key)
149 // This basically renders the count useless-- you probably
150 // don't want to wrap this route() with another route().
151 count++
152 }
153 }
154 return handlers.length
155 }
156
157 // Back door to the control room
158 if (bus.options.backdoor) {
159 bus.make_http_server({
160 port: bus.options.backdoor,
161 name: 'backdoor_http_server',
162 use_ssl: use_ssl
163 })
164 bus.sockjs_server(this.backdoor_http_server)
165 }
166 },
167
168 bus_for_http_client: function (req, res) {
169 var bus = this
170 if (!bus.bus_for_http_client.counter)
171 bus.bus_for_http_client.counter = 0
172 var cbus = require('./statebus')()
173 cbus.label = 'client_http' + bus.bus_for_http_client.counter++
174 cbus.master = bus
175
176 // Log in as the client
177 cbus.serves_auth({remoteAddress: req.connection.remoteAddress}, bus)
178 bus.options.client(cbus)
179 cbus.save({key: 'current_user', client: req.client})
180 return cbus
181 },
182
183 make_http_server: function make_http_server (options) {
184 options = options || {}
185 var fs = require('fs')
186
187 if (options.use_ssl) {
188 // Load with TLS/SSL
189 console.log('Encryption ON')
190
191 // use http2 compatible library if available
192 try {
193 var http = require('spdy')
194 console.log('Found spdy library. HTTP/2 enabled!')
195 } catch (e) {
196 var http = require('https')
197 }
198
199 var protocol = 'https'
200 var ssl_options = {
201 ca: (fs.existsSync(this.options.certs.certificate_bundle)
202 && require('split-ca')(this.options.certs.certificate_bundle)),
203 key: fs.readFileSync(this.options.certs.private_key),
204 cert: fs.readFileSync(this.options.certs.certificate),
205 ciphers: "ECDHE-RSA-AES256-SHA384:DHE-RSA-AES256-SHA384"
206 + ":ECDHE-RSA-AES256-SHA256:DHE-RSA-AES256-SHA256"
207 + ":ECDHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA256"
208 + ":HIGH:!aNULL:!eNULL:!EXPORT:!DES:!RC4:!MD5:!PSK:!SRP:!CAMELLIA",
209 honorCipherOrder: true}
210 }
211 else {
212 // Load unencrypted server
213 console.log('Encryption OFF')
214 var http = require('http')
215 var protocol = 'http'
216 var ssl_options = undefined
217 }
218
219 if (options.port === 'auto') {
220 var bind = require('./extras/tcp-bind')
221 function find_a_port () {
222 var next_port_attempt = 80
223 while (true)
224 try {
225 var result = bind(next_port_attempt)
226 bus.port = next_port_attempt
227 return result
228 } catch (e) {
229 if (next_port_attempt < 3006) next_port_attempt = 3006
230 else next_port_attempt++
231 }
232 }
233
234 var fd
235 if (options.use_ssl)
236 try {
237 fd = bind(443)
238 bus.port = 443
239 bus.redirect_port_80()
240 } catch (e) {fd = find_a_port()}
241 else fd = find_a_port()
242 var http_server = http.createServer(ssl_options)
243 http_server.listen({fd: fd}, () => {
244 console.log('Listening on '+protocol+'://<host>:'+bus.port)
245 })
246 }
247 else {
248 bus.port = bus.options.port
249 var http_server = http.createServer(ssl_options)
250 http_server.listen(bus.options.port, () => {
251 console.log('Listening on '+protocol+'://<host>:'+bus.port)
252 })
253 }
254
255 bus[options.name || 'http_server'] = http_server
256 },
257
258 redirect_port_80: function redirect_port_80 () {
259 var redirector = require('http')
260 redirector.createServer(function (req, res) {
261 res.writeHead(301, {"Location": "https://"+req.headers['host']+req.url})
262 res.end()
263 }).listen(80)
264 },
265
266 install_express: function install_express (express_app) {
267 this.http_server.on('request', // Install express
268 function (request, response) {
269 // But express should ignore all sockjs requests
270 if (!request.url.startsWith('/'+unique_sockjs_string+'/'))
271 express_app(request, response)
272 })
273
274 },
275 sockjs_server: function sockjs_server(httpserver, client_bus_func) {
276 var master = this
277 var client_num = 0
278 // var client_busses = {} // XXX work in progress
279 var log = master.log
280 if (client_bus_func) {
281 master.save({key: 'connections'}) // Clean out old sessions
282 var connections = master.fetch('connections')
283 }
284 var s = require('sockjs').createServer({
285 sockjs_url: 'https://cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js',
286 disconnect_delay: 600 * 1000,
287 heartbeat_delay: 6000 * 1000
288 })
289 s.on('connection', function(conn) {
290 if (client_bus_func) {
291 // To do for pooling client busses:
292 // - What do I do with connections? Do they pool at all?
293 // - Before creating a new bus here, check to see if there's
294 // an existing one in the pool, and re-use it if so.
295 // - Count the number of connections using a client.
296 // - When disconnecting, decrement the number, and if it gets
297 // to zero, delete the client bus.
298
299 connections[conn.id] = {client: conn.id, // client is deprecated
300 id: conn.id}
301 master.save(connections)
302
303 var client = require('./statebus')()
304 client.label = 'client' + client_num++
305 master.label = master.label || 'master'
306 client.master = master
307 client_bus_func(client, conn)
308 } else
309 var client = master
310
311 var our_fetches_in = {} // Every key that this socket has fetched
312 log('sockjs_s: New connection from', conn.remoteAddress)
313 function sockjs_pubber (obj, t) {
314 // log('sockjs_pubber:', obj, t)
315 var msg = {save: obj}
316 if (t.version) msg.version = t.version
317 if (t.parents) msg.parents = t.parents
318 if (t.patch) msg.patch = t.patch
319 if (t.patch) msg.save = msg.save.key
320 msg = JSON.stringify(msg)
321
322 if (master.simulate_network_delay) {
323 console.log('>>>> DELAYING!!!', master.simulate_network_delay)
324 obj = bus.clone(obj)
325 setTimeout(() => {conn.write(msg)}, master.simulate_network_delay)
326 } else
327 conn.write(msg)
328
329 log('sockjs_s: SENT a', msg, 'to client')
330 }
331 conn.on('data', function(message) {
332 log('sockjs_s:', message)
333 try {
334 message = JSON.parse(message)
335 var method = bus.message_method(message)
336
337 // Validate the message
338 if (!((method === 'fetch'
339 && master.validate(message, {fetch: 'string',
340 '?parent': 'string', '?version': 'string'}))
341 ||
342 (method === 'save'
343 && master.validate(message, {save: '*',
344 '?parents': 'array', '?version': 'string', '?patch': 'array'})
345 && (typeof(message.save) === 'string'
346 || (typeof(message.save) === 'object'
347 && typeof(message.save.key === 'string'))))
348 ||
349 (method === 'forget'
350 && master.validate(message, {forget: 'string'}))
351 ||
352 (method === 'delete'
353 && master.validate(message, {'delete': 'string'}))))
354 throw 'validation error'
355
356 } catch (e) {
357 for (var i=0; i<4; i++) console.error('#######')
358 console.error('Received bad sockjs message from '
359 + conn.remoteAddress +': ', message, e)
360 return
361 }
362
363 switch (method) {
364 case 'fetch':
365 our_fetches_in[message.fetch] = true
366 client.fetch(message.fetch, sockjs_pubber)
367 break
368 case 'forget':
369 delete our_fetches_in[message.forget]
370 client.forget(message.forget, sockjs_pubber)
371 break
372 case 'delete':
373 client.delete(message['delete'])
374 break
375 case 'save':
376 message.version = message.version || client.new_version()
377 if (message.patch) {
378 var o = bus.cache[message.save] || {key: message.save}
379 try {
380 message.save = bus.apply_patch(o, message.patch[0])
381 } catch (e) {
382 console.error('Received bad sockjs message from '
383 + conn.remoteAddress +': ', message, e)
384 return
385 }
386 }
387 client.save(message.save,
388 {version: message.version,
389 parents: message.parents,
390 patch: message.patch})
391 if (our_fetches_in[message.save.key]) { // Store what we've seen if we
392 // might have to publish it later
393 client.log('Adding', message.save.key+'#'+message.version,
394 'to pubber!')
395 sockjs_pubber.has_seen(client, message.save.key, message.version)
396 }
397 break
398 }
399
400 // validate that our fetches_in are all in the bus
401 for (var key in our_fetches_in)
402 if (!client.fetches_in.has(key, master.funk_key(sockjs_pubber)))
403 console.trace("***\n****\nFound errant key", key,
404 'when receiving a sockjs', method, 'of', message)
405 //log('sockjs_s: done with message')
406 })
407 conn.on('close', function() {
408 log('sockjs_s: disconnected from', conn.remoteAddress, conn.id, client.id)
409 for (var key in our_fetches_in)
410 client.forget(key, sockjs_pubber)
411 if (client_bus_func) {
412 delete connections[conn.id]; master.save(connections)
413 master.delete('connection/' + conn.id)
414 client.delete_bus()
415 }
416 })
417
418 // Define the /connection* state!
419 if (client_bus_func && !master.options.__secure) {
420
421 // A connection
422 client('connection/*').to_fetch = function (key, star) {
423 var result = bus.clone(master.fetch(key))
424 if (master.options.connections.include_users && result.user)
425 result.user = client.fetch(result.user.key)
426 result.id = star
427 result.client = star // Deprecated. Delete this line in v7.
428 return result
429 }
430 client('connection/*').to_save = function (o, star, t) {
431 // Check permissions before editing
432 if (star !== conn.id && !master.options.connections.edit_others) {
433 t.abort()
434 return
435 }
436 o.id = star
437 o.client = star // Deprecated. Delete this line in v7.
438 master.save(client.clone(o))
439 }
440
441 // Your connection
442 client('connection').to_fetch = function () {
443 // subscribe to changes in authentication
444 client.fetch('current_user')
445
446 var result = client.clone(client.fetch('connection/' + conn.id))
447 delete result.key
448 return result
449 }
450 client('connection').to_save = function (o) {
451 o = client.clone(o)
452 o.key = 'connection/' + conn.id
453 client.save(o)
454 }
455
456 // All connections
457 client('connections').to_save = function noop (t) {t.abort()}
458 client('connections').to_fetch = function () {
459 var result = []
460 var conns = master.fetch('connections')
461 for (var connid in conns)
462 if (connid !== 'key')
463 result.push(client.fetch('connection/' + connid))
464
465 return {all: result}
466 }
467 }
468 })
469
470 s.installHandlers(httpserver, {prefix:'/' + unique_sockjs_string})
471 },
472
473 make_websocket: function make_websocket (url) {
474 url = url.replace(/^state:\/\//, 'wss://')
475 url = url.replace(/^istate:\/\//, 'ws://')
476 url = url.replace(/^statei:\/\//, 'ws://')
477 WebSocket = require('websocket').w3cwebsocket
478 return new WebSocket(url+'/'+unique_sockjs_string+'/websocket')
479 },
480 client_creds: function client_creds (server_url) {
481 // Right now the server just creates a different random id each time
482 // it connects.
483 return {clientid: (Math.random().toString(36).substring(2)
484 + Math.random().toString(36).substring(2)
485 + Math.random().toString(36).substring(2))}
486 },
487
488 // Deprecated
489 ws_client: function (prefix, url, account) {
490 console.error('ws_client() is deprecated; use net_mount() instead')
491 bus.net_mount(prefix, url, account) },
492 // Deprecated
493 universal_ws_client: function () {
494 console.error('calling universal_ws_client is deprecated and no longer necessary') },
495
496 file_store: (function () {
497 // Make a database
498 var fs = require('fs')
499 var db = {}
500 var db_is_ok = false
501 var pending_save = null
502 var active
503 function file_store (prefix, delay_activate) {
504 prefix = prefix || bus.options.file_store.prefix
505 var filename = bus.options.file_store.filename,
506 backup_dir = bus.options.file_store.backup_dir
507
508 // Loading db
509 try {
510 if (fs.existsSync && !fs.existsSync(filename))
511 (fs.writeFileSync(filename, '{}'), bus.log('Made a new db file'))
512 db = JSON.parse(fs.readFileSync(filename))
513 db_is_ok = true
514 // If we save before anything else is connected, we'll get this
515 // into the cache but not affect anything else
516 bus.save.fire(global.pointerify ? inline_pointers(db) : db)
517 bus.log('Read db')
518 } catch (e) {
519 console.error(e)
520 console.error('bad db file')
521 }
522
523 // Saving db
524 function save_db() {
525 if (!db_is_ok) return
526
527 console.time('saved db')
528
529 fs.writeFile(filename+'.tmp', JSON.stringify(db, null, 1), function(err) {
530 if (err) {
531 console.error('Crap! DB IS DYING!!!!', err)
532 db_is_ok = false
533 } else
534 fs.rename(filename+'.tmp', filename, function (err) {
535 if (err) {
536 console.error('Crap !! DB IS DYING !!!!', err)
537 db_is_ok = false
538 } else {
539 console.timeEnd('saved db')
540 pending_save = null
541 }
542 })
543 })
544 }
545
546 function save_later() {
547 pending_save = pending_save || setTimeout(save_db, bus.options.file_store.save_delay)
548 }
549 active = !delay_activate
550
551 // Replaces every nested keyed object with {_key: <key>}
552 function abstract_pointers (o) {
553 o = bus.clone(o)
554 var result = {}
555 for (var k in o)
556 result[k] = bus.deep_map(o[k], (o) => {
557 if (o && o.key) return {_key: o.key}
558 else return o
559 })
560 return result
561 }
562 // ...and the inverse
563 function inline_pointers (db) {
564 return bus.deep_map(db, (o) => {
565 if (o && o._key)
566 return db[o._key]
567 else return o
568 })
569 }
570 function on_save (obj) {
571 db[obj.key] = global.pointerify ? abstract_pointers(obj) : obj
572 if (active) save_later()
573 }
574 on_save.priority = true
575 bus(prefix).on_save = on_save
576 bus(prefix).to_delete = function (key) {
577 delete db[key]
578 if (active) save_later()
579 }
580 file_store.activate = function () {
581 active = true
582 save_later()
583 }
584
585 // Handling errors
586 function recover (e) {
587 if (e) {
588 process.stderr.write("Uncaught Exception:\n");
589 process.stderr.write(e.stack + "\n");
590 }
591 if (pending_save) {
592 console.log('Saving db after crash')
593 console.time()
594 fs.writeFileSync(filename, JSON.stringify(db, null, 1))
595 console.log('Saved db after crash')
596 }
597 process.exit(1)
598 }
599 process.on('SIGINT', recover)
600 process.on('SIGTERM', recover)
601 process.on('uncaughtException', recover)
602
603 // Rotating backups
604 setInterval(
605 // This copies the current db over backups/db.<curr_date> every minute
606 function backup_db() {
607 if (!db_is_ok || !backup_dir) return
608 if (fs.existsSync && !fs.existsSync(backup_dir))
609 fs.mkdirSync(backup_dir)
610
611 var d = new Date()
612 var y = d.getYear() + 1900
613 var m = d.getMonth() + 1
614 if (m < 10) m = '0' + m
615 var day = d.getDate()
616 if (day < 10) day = '0' + day
617 var date = y + '-' + m + '-' + day
618
619 //bus.log('Backing up db on', date)
620
621 require('child_process').execFile(
622 '/bin/cp', [filename, backup_dir+'/'+filename+'.'+date])
623 },
624 1000 * 60 // Every minute
625 )
626 }
627
628 return file_store
629 })(),
630
631 firebase_store: function (prefix, firebase_ref) {
632 prefix = prefix || '*'
633
634 function encode_firebase_key(k) {
635 return encodeURIComponent(k).replace(/\./g, '%2E')
636 }
637
638 function decode_firebase_key(k) {
639 return decodeURIComponent(k.replace('%2E', '.'))
640 }
641
642 bus(prefix).to_fetch = function (key, t) {
643 firebase_ref.child(encode_firebase_key(key)).on('value', function (x) {
644 t.done(x.val() || {})
645 }, function (err) { t.abort() })
646 }
647
648 bus(prefix).on_save = function (o) {
649 firebase_ref.child(encode_firebase_key(o.key)).set(o)
650 }
651
652 // bus(prefix).to_save = function (o, t) {
653 // firebase_ref.child(encode_firebase_key(o.key)).set(o, (err) => {
654 // err ? t.abort() : t.done()
655 // })
656 // }
657
658 bus(prefix).to_delete = function (key, t) {
659 firebase_ref.child(encode_firebase_key(key)).set(null, (err) => {
660 err ? t.abort() : t.done()
661 })
662 }
663
664 bus(prefix).to_forget = function (key, t) {
665 firebase_ref.child(encode_firebase_key(key)).off()
666 }
667 },
668
669 db_fetch: function db_fetch (key) {/* does nothing unless overridden */},
670 lazy_sqlite_store: function lazy_sqlite_store (opts) {
671 if (!opts) opts = {}
672 opts.lazy = true
673 bus.sqlite_store(opts)
674 },
675 fast_load_sqlite_store: function sqlite_store (opts) {
676 if (!opts) opts = {}
677 opts.dont_fire = true
678 bus.sqlite_store(opts)
679 },
680 sqlite_store: function sqlite_store (opts) {
681 var prefix = '*'
682 var open_transaction = null
683
684 if (!opts) opts = {}
685 if (!opts.filename) opts.filename = 'db.sqlite'
686 if (!opts.hasOwnProperty('inline_pointers'))
687 opts.inline_pointers = global.pointerify
688
689 // Load the db on startup
690 try {
691 var db = bus.sqlite_store_db || new (require('better-sqlite3'))(opts.filename)
692 bus.sqlite_store_db = db
693 bus.sqlite_store.load_all = load_all
694 bus.sqlite_store.all_keys = all_keys
695
696 db.pragma('journal_mode = WAL')
697 db.prepare('create table if not exists cache (key text primary key, obj text)').run()
698
699 function all_keys () {
700 var result = []
701 for (var row of db.prepare('select key from cache').iterate())
702 result.push(row.key)
703 return result
704 }
705 function load_all (options) {
706 var temp_db = {}
707
708 for (var row of db.prepare('select * from cache').iterate()) {
709 var obj = JSON.parse(row.obj)
710 temp_db[obj.key] = obj
711 }
712
713 if (opts.inline_pointers)
714 temp_db = inline_pointers(temp_db)
715
716 for (var key in temp_db)
717 if (temp_db.hasOwnProperty(key)) {
718 if (options.dont_fire)
719 bus.cache[key] = temp_db[key]
720 else
721 bus.save.fire(temp_db[key])
722 temp_db[key] = undefined
723 }
724 }
725 if (!opts.lazy) load_all(opts)
726
727 bus.log('Read ' + opts.filename)
728 } catch (e) {
729 console.error(e)
730 console.error('Bad sqlite db')
731 }
732
733 function sqlite_get (key) {
734 // console.log('sqlite_get:', key)
735 var x = db.prepare('select * from cache where key = ?').get([key])
736 return x ? JSON.parse(x.obj) : {}
737 }
738 if (opts.lazy) {
739 var db_fetched_keys = {}
740 bus.db_fetch = bus.reactive(function (key) {
741 if (db_fetched_keys[key]) return
742 db_fetched_keys[key] = true
743
744 // Get it from the database
745 var obj = sqlite_get(key)
746
747 // Inline pointers if enabled (does this still work?)
748 if (opts.inline_pointers) obj = inline_pointers_singleobj(obj)
749
750 // Ensure we have the latest version of everything nested
751 obj = bus.deep_map(obj, (o) => (o && o.key
752 ? sqlite_get(o.key)
753 : o))
754
755 // Publish this to the bus
756 bus.save.fire(obj, {to_fetch: true})
757 })
758 }
759
760 // Add save handlers
761 function on_save (obj) {
762 if (opts.inline_pointers)
763 obj = abstract_pointers(obj)
764
765 if (opts.use_transactions && !open_transaction){
766 console.time('save db')
767 db.prepare('BEGIN TRANSACTION').run()
768 }
769
770 db.prepare('replace into cache (key, obj) values (?, ?)').run(
771 [obj.key, JSON.stringify(obj)])
772
773 if (opts.use_transactions && !open_transaction) {
774 open_transaction = setTimeout(function(){
775 console.log('Committing transaction to database')
776 db.prepare('COMMIT').run()
777 open_transaction = false
778 console.timeEnd('save db')
779 })
780 }
781 }
782 if (opts.save_sync) {
783 var old_route = bus.route
784 bus.route = function (key, method, arg, t) {
785 if (method === 'to_save') on_save(arg)
786 return old_route(key, method, arg, t)
787 }
788 } else
789 bus(prefix).on_set_sync = on_save
790
791 bus(prefix).to_delete = function (key) {
792 if (opts.use_transactions && !open_transaction){
793 console.time('save db')
794 db.prepare('BEGIN TRANSACTION').run()
795 }
796 db.prepare('delete from cache where key = ?').run([key])
797 if (opts.use_transactions && !open_transaction)
798 open_transaction = setTimeout(function(){
799 console.log('committing')
800 db.prepare('COMMIT').run()
801 open_transaction = false
802 console.timeEnd('save db')
803 })
804 }
805
806 // Replaces every nested keyed object with {_key: <key>}
807 function abstract_pointers (o) {
808 o = bus.clone(o)
809 var result = {}
810 for (var k in o)
811 result[k] = bus.deep_map(o[k], (o) => {
812 if (o && o.key) return {_key: o.key}
813 else return o
814 })
815 return result
816 }
817 // ...and the inverse
818 function inline_pointers (db) {
819 return bus.deep_map(db, (o) => {
820 if (o && o._key)
821 return db[o._key]
822 else return o
823 })
824 }
825 function inline_pointers_singleobj (obj) {
826 return bus.deep_map(obj, (o) => (o && o._key)
827 ? bus.cache[o._key] : o)
828 }
829
830 // Rotating backups
831 setInterval(
832 // Copy the current db over backups/db.<curr_date> every minute
833 //
834 // Note: in future we might want to use db.backup():
835 // https://github.com/JoshuaWise/better-sqlite3/blob/master/docs/api.md#backupdestination-options---promise
836 function backup_db() {
837 if (opts.backups === false) return
838 var backup_dir = opts.backup_dir || 'backups'
839 if (fs.existsSync && !fs.existsSync(backup_dir))
840 fs.mkdirSync(backup_dir)
841
842 var d = new Date()
843 var y = d.getYear() + 1900
844 var m = d.getMonth() + 1
845 if (m < 10) m = '0' + m
846 var day = d.getDate()
847 if (day < 10) day = '0' + day
848 var date = y + '-' + m + '-' + day
849
850 require('child_process').execFile(
851 'sqlite3',
852 [opts.filename, '.backup '+"'"+backup_dir+'/'+opts.filename+'.'+date+"'"])
853 },
854 1000 * 60 // Every minute
855 )
856 },
857
858 pg_store: function pg_store (opts) {
859 opts = opts || {}
860 opts.prefix = opts.prefix || '*'
861
862 // Load the db on startup
863 try {
864 var db = new require('pg-native')()
865 bus.pg_db = db
866 bus.pg_save = pg_save
867 db.connectSync(opts.url)
868 db.querySync('create table if not exists store (key text primary key, value jsonb)')
869
870 var rows = db.querySync('select * from store')
871 rows.forEach(r => bus.save(inline_pointers(r.value, bus)))
872
873 bus.log('Read ' + opts.url)
874 } catch (e) {
875 console.error(e)
876 console.error('Bad pg db')
877 }
878
879 // Add save handlers
880 function pg_save (obj) {
881 obj = abstract_pointers(obj)
882
883 db.querySync('insert into store (key, value) values ($1, $2) '
884 + 'on conflict (key) do update set value = $2',
885 [obj.key, JSON.stringify(obj)])
886 }
887 pg_save.priority = true
888 bus(opts.prefix).on_save = pg_save
889 bus(opts.prefix).to_delete = function (key) {
890 db.query('delete from store where key = $1', [key])
891 }
892
893 // Replaces every nested keyed object with {_key: <key>}
894 function abstract_pointers (o) {
895 o = bus.clone(o)
896 var result = {}
897 for (var k in o)
898 result[k] = bus.deep_map(o[k], (x) => {
899 if (x && x.key) return {_key: x.key}
900 else return x
901 })
902 return result
903 }
904 // ...and the inverse
905 function inline_pointers (obj, bus) {
906 return bus.deep_map(obj, (o) => {
907 if (o && o._key) {
908 if (!bus.cache[o._key])
909 bus.cache[o._key] = {key: o._key}
910 return bus.cache[o._key]
911 } else return o
912 })
913 }
914 },
915
916 setup_usage_log (opts) {
917 bus.serve_time()
918 opts = opts || {}
919 opts.filename = opts.filename || 'db.sqlite'
920 var db = new (require('better-sqlite3'))(opts.filename)
921 bus.usage_log_db = db
922 //db.pragma('journal_mode = WAL')
923 db.prepare('create table if not exists usage (date integer, event text, details text)').run()
924 db.prepare('create index if not exists date_index on usage (date)').run()
925 var refresh_interval = 1000*60
926
927 var nots = ["details not like '%facebookexternalhit%'",
928 "details not like '%/apple-touch-icon%'",
929 "details not like '%Googlebot%'",
930 "details not like '%AdsBot-Google%'",
931 "details not like '%Google-Adwords-Instant%'",
932 "details not like '%Apache-HttpClient%'",
933 "details not like '%SafeDNSBot%'",
934 "details not like '%RevueBot%'",
935 "details not like '%MetaURI API%'",
936 "details not like '%redback/v%'",
937 "details not like '%Slackbot%'",
938 "details not like '%HTTP_Request2/%'",
939 "details not like '%python-requests/%'",
940 "details not like '%LightspeedSystemsCrawler/%'",
941 "details not like '%CipaCrawler/%'",
942 "details not like '%Twitterbot/%'",
943 "details not like '%Go-http-client/%'",
944 "details not like '%/cheese_service%'"
945 ].join(' and ')
946 bus.usage_log_nots = nots
947
948 // Aggregate all accesses by day, to get daily active users
949 bus('usage').to_fetch = () => {
950 bus.fetch('time/' + refresh_interval)
951 var days = []
952 var last_day
953 for (var row of db.prepare('select * from usage where '
954 + nots + ' order by date').iterate()) {
955 row.details = JSON.parse(row.details)
956 if (row.details.agent && row.details.agent.match(/bot/)) continue
957
958 var d = new Date(row.date * 1000)
959 var day = d.getFullYear() + '-' + (d.getMonth()+1) + '-' + d.getDate()
960 if (last_day !== day)
961 days.push({day: day, // Init
962 clients: new Set(),
963 ips: new Set(),
964 client_socket_opens: new Set(),
965 ip_socket_opens: new Set()
966 })
967 last_day = day
968
969 if (row.event === 'socket open') {
970 days[days.length-1].client_socket_opens.add(row.details.client)
971 days[days.length-1].ip_socket_opens.add(row.details.ip)
972 }
973 days[days.length-1].clients.add(row.details.client)
974 days[days.length-1].ips.add(row.details.ip)
975 }
976
977 for (var i=0; i<days.length; i++)
978 days[i] = {day: days[i].day,
979 ip_hits: days[i].ips.size,
980 client_hits: days[i].clients.size,
981 client_socket_opens: days[i].client_socket_opens.size,
982 ip_socket_opens: days[i].ip_socket_opens.size
983 }
984
985 return {_: days}
986 }
987
988 bus('recent_hits/*').to_fetch = (rest) => {
989 bus.fetch('time/' + refresh_interval)
990 var result = []
991 for (var row of db.prepare('select * from usage where '
992 + nots + ' order by date desc limit ?').iterate(
993 [parseInt(rest)])) {
994
995 row.details = JSON.parse(row.details)
996 if (row.details.agent && row.details.agent.match(/bot/)) continue
997
998 result.push({url: row.details.url, ip: row.details.ip, date: row.date})
999 }
1000
1001 return {_: result}
1002 }
1003
1004 bus('recent_referers/*').to_fetch = (rest) => {
1005 bus.fetch('time/' + refresh_interval)
1006 var result = []
1007 for (var row of db.prepare('select * from usage where '
1008 + nots + ' order by date desc limit ?').iterate(
1009 [parseInt(rest)])) {
1010
1011 row.details = JSON.parse(row.details)
1012 if (row.details.agent && row.details.agent.match(/bot/)) continue
1013
1014 if (row.details.referer && !row.details.referer.match(/^https:\/\/cheeseburgertherapy.com/))
1015 result.push({url: row.details.url, referer: row.details.referer,
1016 date: row.date})
1017 }
1018
1019 return {_: result}
1020 }
1021
1022
1023 function sock_open_time (sock_event) {
1024 var client = JSON.parse(sock_event.details).client
1025 if (!client) return null
1026
1027 var http_req = db.prepare('select * from usage where event = "http request" and date < ? and '
1028 + ' details like ? and '
1029 + nots + ' order by date desc limit 1').get([sock_event.date,
1030 '%'+client+'%'])
1031 if (!http_req) return null
1032
1033 var delay = sock_event.date - http_req.date
1034 var res = !delay || delay > 300 ? 'fail' : delay
1035 if (delay && delay < 300
1036 && JSON.parse(sock_event.details).ip
1037 !== JSON.parse(http_req.details).ip)
1038 console.error('Yuck!', delay, sock_event, http_req)
1039 return [res, JSON.parse(http_req.details).url]
1040 }
1041 function sock_open_times () {
1042 var opens = db.prepare('select * from usage where event = "socket open" and '
1043 + nots + ' order by date desc limit ?').all(500)
1044 var times = []
1045 for (var i=0; i<opens.length; i++) {
1046 times.push(sock_open_time(opens[i]))
1047 // Get the most recent http hit before this open, from the same client id
1048 // subract the times
1049 }
1050 return times
1051 }
1052
1053 bus('socket_load_times').to_fetch = () => {
1054 return {_: sock_open_times()}
1055 }
1056 },
1057 log_usage(event, details) {
1058 bus.usage_log_db.prepare('insert into usage (date, event, details) values (?, ?, ?)')
1059 .run([new Date().getTime()/1000,
1060 event,
1061 JSON.stringify(details)])
1062 },
1063
1064 serve_time () {
1065 if (bus('time*').to_fetch.length > 0)
1066 // Then it's already installed
1067 return
1068
1069 // Time ticker
1070 var timeouts = {}
1071 bus('time*').to_fetch =
1072 function (key, rest, t) {
1073 if (key === 'time') rest = '/1000'
1074 timeout = parseInt(rest.substr(1))
1075 function f () { bus.save.fire({key: key, time: Date.now()}) }
1076 timeouts[key] = setInterval(f, timeout)
1077 f()
1078 }
1079 bus('time*').to_forget =
1080 function (key, rest) {
1081 if (key === 'time') key = 'time/1000'
1082 clearTimeout(timeouts[key])
1083 }
1084 },
1085
1086 smtp (opts) {
1087 var bus = this
1088 // Listen for SMTP messages on port 25
1089 if (opts.domain) {
1090 var mailin = require('mailin')
1091 mailin.start({
1092 port: opts.port || 25,
1093 disableWebhook: true,
1094 host: opts.domain,
1095 smtpOptions: { SMTPBanner: opts.domain }
1096 })
1097 // Event emitted after a message was received and parsed.
1098 mailin.on('message', (connection, msg, raw) => {
1099 if (!msg.messageId) {
1100 console.log('Aborting message without id!', msg.subject, new Date().toLocaleString())
1101 return
1102 }
1103
1104 console.log(msg)
1105 console.log('Refs is', msg.references)
1106 console.log('Raw is', raw)
1107 var parent = msg.references
1108 if (parent) {
1109 if (Array.isArray(parent))
1110 parent = parent[0]
1111 var m = parent.match(/\<(.*)\>/)
1112 if (m && m[1])
1113 parent = m[1]
1114 }
1115 var from = msg.from
1116
1117 console.log('date is', msg.date, typeof(msg.date), msg.date.getTime())
1118 email = {
1119 key: "email/" + msg.messageId,
1120 _: {
1121 title: msg.subject,
1122 parent: parent && ("email/" + parent) || undefined,
1123 from: msg.from.address,
1124 to: msg.to.map(x=>x.address),
1125 cc: msg.cc.map(x=>x.address),
1126 date: msg.date.getTime() / 1000,
1127 text: msg.text,
1128 body: msg.text,
1129 html: msg.html
1130 }
1131 }
1132
1133 bus.save(email)
1134 })
1135 }
1136 },
1137
1138 serve_email (master, opts) {
1139 opts = opts || {}
1140 var client = this
1141 var email_regex = /^(([^<>()\[\]\\.,;:\s@"]+(\.[^<>()\[\]\\.,;:\s@"]+)*)|(".+"))@((\[[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}])|(([a-zA-Z\-0-9]+\.)+[a-zA-Z]{2,}))$/
1142 var peemail_regex = /^public$|^(([^<>()\[\]\\.,;:\s@"]+(\.[^<>()\[\]\\.,;:\s@"]+)*)|(".+"))@((\[[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}])|(([a-zA-Z\-0-9]+\.)+[a-zA-Z]{2,}))$/
1143 // var local_addie_regex = new RegExp('state://' + opts.domain + '/user/([^\/]+)')
1144 // Todo on Server:
1145 // - Connect with mailin, so we can receive SMTP shit
1146 //
1147 // - Is there security hole if users have a ? or a / in their name?
1148 // - Make the master.posts/ state not require /
1149 // - Make standard url tools for optional slashes, and ? query params
1150 // - Should a e.g. client to_save abort if it calls save that aborts?
1151 // - e.g. if master('posts*').to_save aborts
1152
1153 // Helpers
1154 function get_posts (args) {
1155 console.log('getting posts with', args)
1156 var can_see = [{cc: ['public']}]
1157 if (args.for)
1158 can_see.push({to: [args.for]},
1159 {cc: [args.for]},
1160 {from: [args.for]})
1161
1162 terms = '(' + can_see
1163 .map(x => "value @> '"+JSON.stringify({_:x})+"'")
1164 .join(' or ')
1165 + ')'
1166
1167 if (args.about) {
1168 // console.log('getting one about', args.about)
1169 var interested_in = [{to: [args.about]},
1170 {cc: [args.about]},
1171 {from: [args.about]}]
1172 terms += ' and (' + interested_in
1173 .map(x => "value @> '"+JSON.stringify({_:x})+"'")
1174 .join(' or ')
1175 + ')'
1176 }
1177
1178 var q = "select value from store where key like 'post/%' and " + terms
1179 q += " order by value #>'{_,date}' desc"
1180 if (args.to) q += ' limit ' + to
1181 console.log('with query', q)
1182 return master.pg_db.querySync(q).map(x=>x.value)
1183 }
1184 function post_children (post) {
1185 return master.pg_db.querySync(
1186 "select value from store where value #>'{_,parent}' = '"
1187 + JSON.stringify(post.key)
1188 + "' order by value #>'{_,date}' asc").map(x=>x.value)
1189 }
1190
1191 // function canonicalize_address (addie) {
1192 // if (opts.domain) {
1193 // // Try foo@gar.boo
1194 // var m = addie.match(email_regex)
1195 // if (m && m[5].toLowerCase() === opts.domain)
1196 // return 'user/' + m[1]
1197
1198 // // Try state://gar.boo/user/foo
1199 // m = addie.match(local_addie_regex)
1200 // if (m) return 'user/' + m[1]
1201 // }
1202 // return addie
1203 // }
1204
1205 // Define state on master
1206 if (master('posts_for/*').to_fetch.length === 0) {
1207 // Get posts for each user
1208 master('posts_for/*').to_fetch = (json) => {
1209 watch_for_dirt('posts-for/' + json.for)
1210 return {_: get_posts(json)}
1211 }
1212 // Saving any post will dirty the list for all users mentioned in
1213 // the post
1214 master('post/*').to_save = (old, New, t) => {
1215 // To do: diff the cc, to, and from lists, and only dirty
1216 // posts_for people who have been changed
1217
1218 if (!old._) old = {_:{to: [], from: [], cc: []}}
1219
1220 // old._.to = old._.to.map(a=>canonicalize_address(a))
1221 // old._.cc = old._.cc.map(a=>canonicalize_address(a))
1222 // old._.from = old._.from.map(a=>canonicalize_address(a))
1223 // New._.to = New._.to.map(a=>canonicalize_address(a))
1224 // New._.cc = New._.cc.map(a=>canonicalize_address(a))
1225 // New._.from = New._.from.map(a=>canonicalize_address(a))
1226
1227 var dirtied = old._.to.concat(New._.to)
1228 .concat(old._.cc).concat(New._.cc)
1229 .concat(old._.from).concat(New._.from)
1230 dirtied.forEach(u=>dirty('posts-for/' + u))
1231
1232 if (old._.parent) dirty(old._.parent)
1233 if (New._.parent) dirty(New._.parent)
1234
1235 t.done(New)
1236 }
1237 }
1238
1239 function user_addy (u) {
1240 return u.name + '@' + opts.domain
1241 }
1242 function current_addy () {
1243 var c = client.fetch('current_user')
1244 return c.logged_in ? user_addy(c.user) : 'public'
1245 }
1246 function is_author (post) {
1247 var from = post._.from
1248 var c = client.fetch('current_user')
1249 return from.includes(current_addy())
1250 }
1251 function can_see (post) {
1252 var c = client.fetch('current_user')
1253 var allowed = post._.to.concat(post._.cc).concat(post._.from)
1254 return allowed.includes(current_addy()) || allowed.includes('public')
1255 }
1256 var drtbus = master//require('./statebus')()
1257 function dirty (key) { drtbus.save({key: 'dirty-'+key, n: Math.random()}) }
1258 function watch_for_dirt (key) { drtbus.fetch('dirty-'+key) }
1259
1260 client('current_email').to_fetch = () => {
1261 return {_: current_addy()}
1262 }
1263
1264 // Define state on client
1265 client('posts*').to_fetch = (k, rest) => {
1266 var args = bus.parse(rest.substr(1))
1267 var c = client.fetch('current_user')
1268 args.for = current_addy()
1269 var e = master.fetch('posts_for/' + JSON.stringify(args))
1270 return {_: master.clone(e._).map(x=>client.fetch(x.key))}
1271 }
1272
1273 client('post/*').to_fetch = (k) => {
1274 var e = master.clone(master.fetch(k))
1275 if (!e._) return {}
1276 if (!can_see(e))
1277 return {}
1278
1279 e._.children = post_children(e).map(e=>e.key)
1280 watch_for_dirt(k)
1281
1282 return e
1283 }
1284
1285 client('post/*').to_save = (o, t) => {
1286 if (!(client.validate(o, {key: 'string',
1287 _: {to: 'array',
1288 cc: 'array',
1289 from: 'array',
1290 date: 'number',
1291 '?parent': 'string',
1292 body: 'string',
1293 '?title': 'string',
1294 '*': '*'}})
1295 && o._.to.every(a=>a.match(peemail_regex))
1296 && o._.cc.every(a=>a.match(peemail_regex))
1297 && o._.from.every(a=>a.match(peemail_regex)))) {
1298 console.error('post no be valid', o)
1299 t.abort()
1300 return
1301 }
1302
1303 var c = client.fetch('current_user')
1304
1305 // Make sure this user is an author
1306 var from = o._.from
1307 if (!is_author(o)) {
1308 console.error('User', current_addy(),
1309 'is not author', o._.from)
1310 t.abort()
1311 return
1312 }
1313 o = client.clone(o)
1314 delete o.children
1315
1316 master.save(o)
1317 t.done()
1318 }
1319
1320 client('post/*').to_delete = (key, o, t) => {
1321 // Validate
1322 if (!is_author(o)) {
1323 console.error('User', current_addy(),
1324 'is not author', o._.from)
1325 t.abort()
1326 return
1327 }
1328
1329 // master.delete(key)
1330 master.pg_db.query('delete from store where key = $1', [key])
1331
1332 // Dirty everybody
1333 var dirtied = o._.to.concat(o._.cc).concat(o._.from)
1334 dirtied.forEach(u => dirty('posts-for/' + u))
1335 if (o._.parent) dirty(o._.parent)
1336
1337 // To do: handle nested threads. Either detach them, or insert a
1338 // 'deleted' stub, or splice together
1339
1340 // Complete
1341 t.done()
1342 }
1343
1344 client('friends').to_fetch = t => {
1345 return {_: (master.fetch('users').all||[])
1346 .map(u=>client.fetch('email/' + user_addy(u)))
1347 .concat([client.fetch('email/public')])
1348 }
1349 }
1350
1351 client('email/*').to_fetch = (rest) => {
1352 var m = rest.match(email_regex)
1353 var result = {address: rest}
1354 if (rest === 'public') {
1355 result.name = 'public'
1356 }
1357 else if (m && m[5].toLowerCase() === opts.domain) {
1358 result.user = client.fetch('user/' + m[1])
1359 result.name = result.user.name
1360 result.pic = result.user.pic
1361 result.upgraded = true
1362 }
1363 else if (m) {
1364 result.name = m[1]
1365 result.upgraded = false
1366 }
1367
1368 return result
1369 }
1370 },
1371
1372 sqlite_query_server: function sqlite_query_server (db) {
1373 var fetch = bus.fetch
1374 bus('table_columns/*').to_fetch =
1375 function fetch_table_columns (key, rest) {
1376 if (typeof key !== 'string')
1377 console.log(handlers.hash)
1378 var table_name = rest
1379 var columns = fetch('sql/PRAGMA table_info(' + table_name + ')').rows.slice()
1380 var foreign_keys = fetch('table_foreign_keys/' + table_name)
1381 var column_info = {}
1382 for (var i=0;i< columns .length;i++) {
1383 var col = columns[i].name
1384 column_info[col] = columns[i]
1385 // if (col === 'customer' || col === 'customers')
1386 // console.log('FOR CUSTOMER, got', col, foreign_keys[col])
1387 column_info[col].foreign_key = foreign_keys[col] && foreign_keys[col].table
1388 columns[i] = col
1389 }
1390 columns.splice(columns[columns.indexOf('id')], 1)
1391 column_info['key'] = column_info['id']
1392 delete column_info['id']
1393 return {columns:columns, column_info:column_info}
1394 }
1395
1396
1397 bus('table_foreign_keys/*').to_fetch =
1398 function table_foreign_keys (key, rest) {
1399 var table_name = rest
1400 var foreign_keys = fetch('sql/PRAGMA foreign_key_list(' + table_name + ')').rows
1401 var result = {}
1402 for (var i=0;i< foreign_keys .length;i++)
1403 result[foreign_keys[i].from] = foreign_keys[i]
1404 delete result.id
1405 result.key = key
1406 return result
1407 }
1408
1409 bus('sql/*').to_fetch =
1410 function sql (key, rest) {
1411 fetch('time/60000')
1412 var query = rest
1413 try { query = JSON.parse(query) }
1414 catch (e) { query = {stmt: query, args: []} }
1415
1416 db.all(query.stmt, query.args,
1417 function (err, rows) {
1418 if (rows) bus.save.fire({key:key, rows: rows})
1419 else console.error('Bad sqlite query', key, err)
1420 }.bind(this))
1421 }
1422 },
1423
1424 sqlite_table_server: function sqlite_table_server(db, table_name) {
1425 var save = bus.save, fetch = bus.fetch
1426 var table_columns = fetch('table_columns/'+table_name) // This will fail if used too soon
1427 var foreign_keys = fetch('table_foreign_keys/'+table_name)
1428 var remapped_keys = fetch('remapped_keys')
1429 remapped_keys.keys = remapped_keys.keys || {}
1430 remapped_keys.revs = remapped_keys.revs || {}
1431 function row_json (row) {
1432 var result = {key: table_name + '/' + row.id}
1433 for (var k in row)
1434 if (row.hasOwnProperty(k))
1435 result[k] = (foreign_keys[k] && row[k]
1436 ? foreign_keys[k].table + '/' + row[k]
1437 : result[k] = row[k])
1438 if (result.hasOwnProperty('other')) result.other = JSON.parse(result.other || '{}')
1439 delete result.id
1440 return result
1441 }
1442 function json_values (json) {
1443 var columns = table_columns.columns
1444 var result = []
1445 for (var i=0; i<columns.length; i++) {
1446 var col = columns[i]
1447 var val = json[col]
1448
1449 // JSONify the `other' column
1450 if (col === 'other')
1451 val = JSON.stringify(val || {})
1452
1453 // Convert foreign keys from /customer/3 to 3
1454 else if (foreign_keys[col] && typeof val === 'string') {
1455 val = remapped_keys.keys[val] || val
1456 val = json[columns[i]].split('/')[1]
1457 }
1458
1459 result.push(val)
1460 }
1461 return result
1462 }
1463 function render_table () {
1464 var result = []
1465 db.all('select * from ' + table_name, function (err, rows) {
1466 if (err) console.error('Problem with table!', table_name, err)
1467 for (var i=0; i<rows.length; i++)
1468 result[i] = row_json(rows[i])
1469 bus.save.fire({key: table_name, rows: result})
1470 })
1471 }
1472 function render_row(obj) {
1473 bus.save.fire(row_json(obj))
1474 if (remapped_keys.revs[obj.key]) {
1475 var alias = bus.clone(obj)
1476 alias.key = remapped_keys.revs[obj.key]
1477 bus.save.fire(row_json(alias))
1478 }
1479 }
1480
1481 // ************************
1482 // Handlers!
1483 // ************************
1484
1485 // Fetching the whole table, or a single row
1486 bus(table_name + '*').to_fetch = function (key, rest) {
1487 if (rest === '')
1488 // Return the whole table
1489 return render_table()
1490
1491 if (rest[0] !== '/') return {error: 'bad key: ' + key}
1492 key = remapped_keys.keys[key] || key
1493
1494 var id = key.split('/')[1]
1495 db.get('select * from '+table_name+' where rowid = ?',
1496 [id],
1497 function (err, row) {
1498 if (!row)
1499 { console.log('Row', id, "don't exist.", err); return }
1500
1501 render_row(row)
1502 }.bind(this))
1503 }
1504
1505 // Saving a row
1506 bus(table_name + '/*').to_save = function (obj, rest) {
1507 var columns = table_columns.columns
1508 var key = remapped_keys.keys[obj.key] || obj.key
1509
1510 // Compose the query statement
1511 var stmt = 'update ' + table_name + ' set '
1512 var rowid = rest
1513 var vals = json_values(obj)
1514 for (var i=0; i<columns.length; i++) {
1515 stmt += columns[i] + ' = ?'
1516 //vals.push(obj[columns[i]])
1517 if (i < columns.length - 1)
1518 stmt += ', '
1519 }
1520 stmt += ' where rowid = ?'
1521 vals.push(rowid)
1522
1523 // Run the query
1524 db.run(stmt, vals,
1525 function (e,r) {
1526 console.log('updated',e,r,key)
1527 bus.dirty(key)
1528 })
1529 }
1530
1531 // Inserting a new row
1532 bus('new/' + table_name + '/*').to_save = function (obj) {
1533 var columns = table_columns.columns
1534 var stmt = ('insert into ' + table_name + ' (' + columns.join(',')
1535 + ') values (' + new Array(columns.length).join('?,') + '?)')
1536 var values = json_values(obj)
1537
1538 console.log('Sqlite:' + stmt)
1539
1540 db.run(stmt, values, function (error) {
1541 if (error) console.log('INSERT error!', error)
1542 console.log('insert complete, got id', this.lastID)
1543 remapped_keys.keys[obj.key] = table_name + '/' + this.lastID
1544 remapped_keys.revs[remapped_keys.keys[obj.key]] = obj.key
1545 save(remapped_keys)
1546 render_table()
1547 })
1548 }
1549
1550 // Deleting a row
1551 bus(table_name + '/*').to_delete = function (key, rest) {
1552 if (remapped_keys.keys[key]) {
1553 var old_key = key
1554 var new_key = remapped_keys.keys[key]
1555 delete remapped_keys.keys[old_key]
1556 delete remapped_keys.revs[new_key]
1557 key = new_key
1558 }
1559
1560 var stmt = 'delete from ' + table_name + ' where rowid = ?'
1561 var rowid = rest
1562 console.log('DELETE', stmt)
1563 db.run(stmt, [rowid],
1564 function (err) {
1565 if (err) console.log('DELETE error', err)
1566 else console.log('delete complete')
1567 render_table() })
1568 }
1569 },
1570
1571 serves_auth: function serves_auth (conn, master) {
1572 var client = this // to keep me straight while programming
1573
1574 function logout (key) {
1575 var clients = master.fetch('logged_in_clients')
1576 for (var k in clients)
1577 if (k !== 'key')
1578 if (Object.keys(clients[k]).length === 0) {
1579 client.log('Found a deleted user. Removing.', k, clients[k])
1580 delete clients[k]
1581 master.save(clients)
1582 } else if (clients[k].key === key) {
1583 client.log('Logging out!', k, clients[k])
1584 delete clients[k]
1585 master.save(clients)
1586 }
1587 }
1588
1589 // Initialize master
1590 if (!master.auth_initialized) {
1591 master('users/passwords').to_fetch = function (k) {
1592 master.log('users/passwords.to_fetch: Computing!')
1593 var result = {key: 'users/passwords'}
1594 var users = master.fetch('users')
1595 users.all = users.all || []
1596 for (var i=0; i<users.all.length; i++) {
1597 var u = master.fetch(users.all[i])
1598 if (!(u.login || u.name)) {
1599 console.error('upass: this user has bogus name/login', u.key, u.name, u.login)
1600 continue
1601 }
1602 var login = (u.login || u.name).toLowerCase()
1603 console.assert(login, 'Missing login for user', u)
1604 if (result.hasOwnProperty(login)) {
1605 console.error("upass: this user's name is bogus, dude.", u.key)
1606 continue
1607 }
1608 result[login] = {user: u.key, pass: u.pass}
1609 }
1610 return result
1611 }
1612 master.auth_initialized = true
1613 master.fetch('users/passwords')
1614
1615 master('user/*').to_delete = (key, t) => {
1616 master.log('Deleteinggg!!!', key)
1617 // Remove from users.all
1618 var users = master.fetch('users')
1619 users.all = users.all.filter(u => u.key && u.key !== key)
1620 master.save(users)
1621
1622 // Log out
1623 master.log('Logging out', key)
1624 logout(key)
1625
1626 // Remove connection
1627 master.log('Removing connections for', key)
1628 var conns = master.fetch('connections')
1629 for (var k in conns) {
1630 console.log('Trying key', k)
1631 if (k !== 'key')
1632 if (conns[k].user && !conns[k].user.key) {
1633 console.log('Cleaning keyless user', conss[k].user)
1634 delete conns[k].user
1635 master.save(conns)
1636 continue
1637 }
1638 }
1639
1640 master.log('Dirtying users/passwords for', key)
1641 master.dirty('users/passwords')
1642
1643 // Done.
1644 t.done()
1645 }
1646 }
1647
1648 // Authentication functions
1649 function authenticate (login, pass) {
1650 var userpass = master.fetch('users/passwords')[login.toLowerCase()]
1651 master.log('authenticate: we see', {
1652 passwords: master.fetch('users/passwords'),
1653 hash_to_match: userpass && userpass.pass,
1654 password_guess: pass
1655 })
1656
1657 if (!(typeof login === 'string' && typeof pass === 'string')) return false
1658 if (login === 'key') return false
1659 if (!userpass || !userpass.pass) return null
1660
1661 // console.log('comparing passwords', pass, userpass.pass)
1662 if (require('bcrypt-nodejs').compareSync(pass, userpass.pass))
1663 return master.fetch(userpass.user)
1664 }
1665 function create_account (params) {
1666 if (typeof (params.login || params.name) !== 'string')
1667 throw 'no login or name'
1668 var login = (params.login || params.name).toLowerCase()
1669 if (!login ||
1670 !master.validate(params, {'?name': 'string', '?login': 'string',
1671 pass: 'string', '?email': 'string',
1672 '?key': undefined, '*': '*'}))
1673 throw 'invalid name, login, pass, or email'
1674
1675 var passes = master.fetch('users/passwords')
1676 if (passes.hasOwnProperty(login))
1677 throw 'there is already a user with that login or name'
1678
1679 // Hash password
1680 params.pass = require('bcrypt-nodejs').hashSync(params.pass)
1681
1682 // Choose account key
1683 //
1684 // NOTE: Checking if master.cache has the key probably sucks,
1685 // because if something fetches that key (hoping it exists), it
1686 // might create it, even though the account hasn't been created
1687 // yet, and then this will rename it to some random ID.
1688 //
1689 var key = 'user/' + params.name
1690 if (!params.name)
1691 key = 'user/' + Math.random().toString(36).substring(7,13)
1692 while (master.cache.hasOwnProperty(key))
1693 key = 'user/' + Math.random().toString(36).substring(7,13)
1694
1695 // Make account object
1696 var new_account = {key: key,
1697 name: params.name,
1698 login: params.login,
1699 pass: params.pass,
1700 email: params.email }
1701 for (var k in new_account) if (!new_account[k]) delete new_account[k]
1702 master.save(new_account)
1703
1704 var users = master.fetch('users')
1705 users.all = users.all || []
1706 users.all.push(new_account)
1707 passes[login] = {user: new_account.key, pass: new_account.pass}
1708 master.save(users)
1709 master.save(passes)
1710 }
1711
1712 // Current User
1713 client('current_user').to_fetch = function (k) {
1714 client.log('* fetching: current_user')
1715 if (!conn.client) return
1716 var u = master.fetch('logged_in_clients')[conn.client]
1717 u = u && user_obj(u.key, true)
1718 return {user: u || null, logged_in: !!u}
1719 }
1720
1721 client('current_user').to_save = function (o, t) {
1722 function error (msg) {
1723 client.save.abort(o)
1724 var c = client.fetch('current_user')
1725 c.error = msg
1726 client.save.fire(c)
1727 }
1728
1729 client.log('* saving: current_user!')
1730 if (o.client && !conn.client) {
1731 // Set the client
1732 conn.client = o.client
1733 client.client_id = o.client
1734 client.client_ip = conn.remoteAddress
1735
1736 if (conn.id) {
1737 var connections = master.fetch('connections')
1738 connections[conn.id].user = master.fetch('logged_in_clients')[conn.client]
1739 master.save(connections)
1740 }
1741 }
1742 else {
1743 if (o.create_account) {
1744 client.log('current_user: creating account')
1745 try {
1746 create_account(o.create_account)
1747 client.log('Success creating account!')
1748 var cu = client.fetch('current_user')
1749 cu.create_account = null
1750 client.save.fire(cu)
1751 } catch (e) {
1752 error('Cannot create that account because ' + e)
1753 return
1754 }
1755 }
1756
1757 if (o.login_as && conn.id) {
1758 // Then client is trying to log in
1759 client.log('current_user: trying to log in')
1760 var creds = o.login_as
1761 var login = creds.login || creds.name
1762 if (login && creds.pass) {
1763 // With a username and password
1764 var u = authenticate(login, creds.pass)
1765
1766 client.log('auth said:', u)
1767 if (u) {
1768 // Success!
1769 // Associate this user with this session
1770 // user.log('Logging the user in!', u)
1771
1772 var clients = master.fetch('logged_in_clients')
1773 var connections = master.fetch('connections')
1774
1775 clients[conn.client] = u
1776 connections[conn.id].user = u
1777
1778 master.save(clients)
1779 master.save(connections)
1780
1781 client.log('current_user: success logging in!')
1782 }
1783 else {
1784 error('Cannot log in with that information')
1785 return
1786 }
1787 }
1788 else {
1789 error('Cannot log in with that information')
1790 return
1791 }
1792 }
1793
1794 else if (o.logout && conn.id) {
1795 client.log('current_user: logging out')
1796 var clients = master.fetch('logged_in_clients')
1797 var connections = master.fetch('connections')
1798
1799 delete clients[conn.client]
1800 connections[conn.id].user = null
1801
1802 master.save(clients)
1803 master.save(connections)
1804 }
1805 }
1806
1807 t.refetch()
1808 }
1809 client('current_user').to_delete = function () {}
1810
1811 // Users have closet space at /user/<name>/*
1812 var closet_space_key = /^(user\/[^\/]+)\/.*/
1813 var private_closet_space_key = /^user\/[^\/]+\/private.*/
1814
1815 // User
1816 client('user/*').to_save = function (o, t) {
1817 var c = client.fetch('current_user')
1818 var user_key = o.key.match(/^user\/([^\/]+)/)
1819 user_key = user_key && ('user/' + user_key[1])
1820
1821 // Only the current user can touch themself.
1822 if (!c.logged_in || c.user.key !== user_key) {
1823 client.log('Only the current user can touch themself.',
1824 {logged_in: c.logged_in, as: c.user && c.user.key,
1825 touching: user_key})
1826 client.save.abort(o)
1827 return
1828 }
1829
1830 // Users have closet space at /user/<name>/*
1831 if (o.key.match(closet_space_key)) {
1832 client.log('saving closet data')
1833 master.save(o, t)
1834 return
1835 }
1836
1837 // Ok, then it must be a plain user
1838 console.assert(o.key.match(/^user\/[^\/]+$/))
1839
1840 // Validate types
1841 if (!client.validate(o, {key: 'string', '?login': 'string', '?name': 'string',
1842 '?pass': 'string', '?email': 'string', /*'?pic': 'string',*/
1843 '*':'*'})) {
1844 client.log('This user change fails validation.')
1845 client.save.abort(o)
1846 return
1847 }
1848
1849 // Rules for updating "login" and "name" attributes:
1850 // • If "login" isn't specified, then we use "name" as login
1851 // • That resulting login must be unique across all users
1852
1853 // There must be at least a login or a name
1854 var login = o.login || o.name
1855 if (!login) {
1856 client.log('User must have a login or a name')
1857 client.save.abort(o)
1858 return
1859 }
1860
1861 var u = master.fetch(o.key)
1862 var userpass = master.fetch('users/passwords')
1863
1864 // Validate that the login/name is not changed to something clobberish
1865 var old_login = u.login || u.name
1866 if (old_login.toLowerCase() !== login.toLowerCase()
1867 && userpass.hasOwnProperty(login)) {
1868 client.log('The login', login, 'is already taken. Aborting.')
1869 client.save.abort(o) // Abort
1870
1871 o = client.fetch(o.key) // Add error message
1872 o.error = 'The login "' + login + '" is already taken'
1873 client.save.fire(o)
1874
1875 return // And exit
1876 }
1877
1878 // Now we can update login and name
1879 u.login = o.login
1880 u.name = o.name
1881
1882 // Hash password
1883 o.pass = o.pass && require('bcrypt-nodejs').hashSync(o.pass)
1884 u.pass = o.pass || u.pass
1885
1886 // // Users can have pictures (remove this soon)
1887 // // Bug: if user changes name, this picture's url doesn't change.
1888 // if (o.pic && o.pic.indexOf('data:image') > -1) {
1889 // var img_type = o.pic.match(/^data:image\/(\w+);base64,/)[1]
1890 // var b64 = o.pic.replace(/^data:image\/\w+;base64,/, '')
1891 // var upload_dir = global.upload_dir
1892 // // ensure that the uploads directory exists
1893 // if (!fs.existsSync(upload_dir))
1894 // fs.mkdirSync(upload_dir)
1895
1896 // // bug: users with the same name can overwrite each other's files
1897 // u.pic = u.name + '.' + img_type
1898 // fs.writeFile(upload_dir + u.pic, b64, {encoding: 'base64'})
1899 // }
1900
1901 // For anything else, go ahead and add it to the user object
1902 var protected = {key:1, name:1, /*pic:1,*/ pass:1}
1903 for (var k in o)
1904 if (!protected.hasOwnProperty(k))
1905 u[k] = o[k]
1906 for (var k in u)
1907 if (!protected.hasOwnProperty(k) && !o.hasOwnProperty(k))
1908 delete u[k]
1909
1910 master.save(u)
1911 }
1912 client('user/*').to_fetch = function user_fetcher (k) {
1913 var c = client.fetch('current_user')
1914 client.log('* fetching:', k, 'as', c.user)
1915
1916 // Users have closet space at /user/<name>/*
1917 if (k.match(closet_space_key)) {
1918 var obj_user = k.match(closet_space_key)[1]
1919 if (k.match(private_closet_space_key)
1920 && (!c.user || obj_user !== c.user.key)) {
1921 client.log('hiding private closet data')
1922 return {}
1923 }
1924 client.log('fetching closet data')
1925 return client.clone(master.fetch(k))
1926 }
1927
1928 // Otherwise return the actual user
1929 return user_obj(k, c.logged_in && c.user.key === k)
1930 }
1931 client('user/*').to_delete = function () {}
1932 function user_obj (k, logged_in) {
1933 var o = master.clone(master.fetch(k))
1934 if (k.match(/^user\/([^\/]+)\/private\/(.*)$/))
1935 return logged_in ? o : {key: k}
1936
1937 delete o.pass
1938 if (!logged_in) {delete o.email; delete o.login}
1939 return o
1940 }
1941
1942 // Blacklist sensitive stuff on master, in case we have a shadow set up
1943 var blacklist = 'users users/passwords logged_in_clients'.split(' ')
1944 for (var i=0; i<blacklist.length; i++) {
1945 client(blacklist[i]).to_fetch = function () {}
1946 client(blacklist[i]).to_save = function () {}
1947 client(blacklist[i]).to_delete = function () {}
1948 client(blacklist[i]).to_forget = function () {}
1949 }
1950 },
1951
1952 persist: function (prefix_to_sync, validate) {
1953 var client = this
1954 var was_logged_in = undefined
1955
1956 function client_prefix (current_user) {
1957 return 'client/' + (current_user.logged_in
1958 ? current_user.user.key.substr('user/'.length)
1959 : client.client_id) + '/'
1960 }
1961
1962 function copy_client_to_user(client, user) {
1963 var old_prefix = 'client/' + client.client_id
1964 var new_prefix = 'client/' + user.key.substr('user/'.length)
1965
1966 var keys = client.master.fetch('persisted_keys/' + client.client_id)
1967 if (!keys.val) return
1968 for (var old_key in keys.val) {
1969 var new_key = new_prefix + old_key.substr(old_prefix.length)
1970 var o = client.clone(client.master.fetch(old_key))
1971 // Delete the old
1972 client.master.del(old_key)
1973
1974 var new_o = client.master.fetch(new_key)
1975 // If the new key doesn't clobber any existing data on the user...
1976 if (Object.keys(new_o).length === 1) {
1977 // Save the new
1978 o.key = new_key
1979 client.master.save(o)
1980 }
1981 }
1982 keys.val = {}
1983 client.master.save(keys)
1984
1985 // var cache = client.master.cache
1986
1987 // var keys = Object.keys(cache) // Make a copy
1988 // for (var i=0; i<keys.length; i++) { // Because we'll mutate
1989 // var old_key = keys[i] // As we iterate
1990
1991 // if (old_key.startsWith(old_prefix)) {
1992 // var new_key = new_prefix + old_key.substr(old_prefix.length)
1993 // var o = client.clone(cache[old_key])
1994 // // Delete the old
1995 // client.master.del(old_key)
1996
1997 // if (!(cache.hasOwnProperty(new_key))) {
1998 // // Save the new
1999 // o.key = new_key
2000 // client.master.save(o)
2001 // }
2002 // }
2003 // }
2004 }
2005
2006 // Copy client to user if we log in
2007 client(_=>{
2008 var c = client.fetch('current_user')
2009 if (client.loading()) return
2010 if (was_logged_in == false && c.logged_in)
2011 // User just logged in! Let's copy his stuff over
2012 copy_client_to_user(client, c.user)
2013 was_logged_in = c.logged_in
2014 })
2015
2016 client(prefix_to_sync).to_fetch = function (key) {
2017 var c = client.fetch('current_user')
2018 if (client.loading()) return
2019 var prefix = client_prefix(c)
2020
2021 // Get the state from master
2022 var obj = client.clone(client.master.fetch(prefix + key))
2023
2024 // Translate it back to client
2025 obj = client.deep_map(obj, function (o) {
2026 if (typeof o === 'object' && 'key' in o && typeof o.key === 'string')
2027 o.key = o.key.substr(prefix.length)
2028 return o
2029 })
2030 return obj
2031 }
2032
2033 client(prefix_to_sync).to_save = function (obj) {
2034 if (validate && !validate(obj)) {
2035 console.warn('Validation failed on', obj)
2036 client.save.abort(obj)
2037 return
2038 }
2039
2040 var c = client.fetch('current_user')
2041 if (client.loading()) return
2042 var prefix = client_prefix(c)
2043
2044 // Make it safe
2045 var p_keys = client_persisted_keys()
2046 obj = client.clone(obj)
2047 obj = client.deep_map(obj, function (o) {
2048 if (typeof o === 'object' && 'key' in o && typeof o.key === 'string') {
2049 o.key = prefix + o.key
2050 if (p_keys)
2051 p_keys.val[o.key] = true
2052 }
2053 return o
2054 })
2055
2056 // Save to master
2057 client.master.save(obj)
2058 p_keys && client.master.save(p_keys)
2059 }
2060
2061 client(prefix_to_sync).to_delete = function (k) {
2062 k = client_prefix(client.fetch('current_user')) + k
2063 client.master.delete(k)
2064
2065 var p_keys = client_persisted_keys()
2066 delete p_keys.val[k]
2067 client.master.save(p_keys)
2068 }
2069
2070 function client_persisted_keys () {
2071 if (client.fetch('current_user').logged_in) return
2072 var result = client.master.fetch('persisted_keys/' + client.client_id)
2073 if (result && !result.val) result.val = {}
2074 return result
2075 }
2076 },
2077
2078 shadows: function shadows (master_bus) {
2079 // Custom route
2080 var OG_route = bus.route
2081 bus.route = function(key, method, arg, t) {
2082 var count = OG_route(key, method, arg, t)
2083 // This forwards anything we don't have a specific handler for
2084 // to the global cache
2085 if (count === 0) {
2086 count++
2087 if (method === 'to_fetch')
2088 bus.run_handler(function get_from_master (k) {
2089 // console.log('DEFAULT FETCHing', k)
2090 var r = master_bus.fetch(k)
2091 // console.log('DEFAULT FETCHed', r)
2092 bus.save.fire(r, {version: master_bus.versions[r.key]})
2093 }, method, arg)
2094 else if (method === 'to_save')
2095 bus.run_handler(function save_to_master (o, t) {
2096 // console.log('DEFAULT ROUTE', t)
2097 master_bus.save(bus.clone(o), t)
2098 }, method, arg, {t: t})
2099 else if (method == 'to_delete')
2100 bus.run_handler(function delete_from_master (k, t) {
2101 master_bus.delete(k)
2102 return 'done'
2103 }, method, arg, {t: t})
2104 }
2105 return count
2106 }
2107 },
2108
2109 read_file: function init () {
2110 // The first time this is run, we initialize it by loading some
2111 // libraries
2112 var chokidar = require('chokidar')
2113 var watchers = {}
2114 var fs = require('fs')
2115
2116 // Now we redefine the function
2117 bus.read_file = bus.uncallback(
2118 function readFile (filename, encoding, cb) {
2119 fs.readFile(filename, (err, result) => {
2120 if (err) console.error('Error from read_file:', err)
2121 cb(null, ((result || '*error*').toString(encoding || undefined)))
2122 })
2123 },
2124 {
2125 callback_at: 2,
2126 start_watching: (args, dirty, del) => {
2127 var filename = args[0]
2128 console.log('## starting to watch', filename)
2129 watchers[filename] = chokidar.watch(filename, {
2130 atomic: true,
2131 disableGlobbing: true
2132 })
2133 watchers[filename].on('change', () => { dirty() })
2134 watchers[filename].on('add', () => { dirty() })
2135 watchers[filename].on('unlink', () => { del() })
2136 },
2137 stop_watching: (json) => {
2138 var filename = json[0]
2139 console.log('## stopping to watch', filename)
2140 // log('unwatching', filename)
2141 // To do: this should probably use.unwatch() instead.
2142 watchers[filename].close()
2143 delete watchers[filename]
2144 }
2145 })
2146 return bus.read_file.apply(bus, [].slice.call(arguments))
2147 },
2148
2149 // Synchronizes the recursive path starting with <state_path> to the
2150 // file or recursive directory structure at fs_path
2151 sync_files: function sync_files (state_path, file_path) {
2152 // To do:
2153 // - Hook up a to_delete handler
2154 // - recursively remove directories if all files gone
2155
2156 console.assert(state_path.substr(-1) !== '*'
2157 && (!file_path || file_path.substr(-1) !== '*'),
2158 'The sync_files paths should not end with *')
2159
2160 file_path = file_path || state_path
2161 var buffer = {}
2162 var full_file_path = require('path').join(__dirname, file_path)
2163
2164 bus(state_path + '*').to_fetch = (rest) => {
2165 // We DO want to handle:
2166 // - "foo"
2167 // - "foo/*"
2168 // But not:
2169 // - "foobar"
2170 if (rest.length>0 && rest[0] !== '/') return // Bail on e.g. "foobar"
2171
2172 var f = bus.read_file(file_path + rest, 'base64')
2173
2174 // Clear buffer of items after 1 second. If fs results are delayed
2175 // longer, we'll just deal with those flashbacks.
2176 for (k in buffer)
2177 if (new Date().getTime() - buffer[k] > 1 * 1000)
2178 delete buffer[k]
2179
2180 // If we are expecting this, skip the read
2181 // console.log('read file', typeof f == 'string' ? f.substr(0,40) + '..': f)
2182 if (buffer[f]) {
2183 console.log('skipping cause its in buffer')
2184 return
2185 }
2186
2187 return {_:f}
2188 }
2189
2190 bus(state_path + '/*').to_save = (o, rest, t) => {
2191 if (rest.length>0 && rest[0] !== '/') return
2192 var f = Buffer.from(o._, 'base64')
2193 require('fs').writeFile(file_path + rest, f,
2194 (e) => {if (!e) t.done()})
2195 buffer[f] = new Date().getTime()
2196 }
2197
2198 bus.http.use('/'+state_path, require('express').static(full_file_path))
2199 },
2200
2201 // Installs a GET handler at route that gets state from a fetcher function
2202 // Note: Makes too many textbusses. Should re-use one.
2203 http_serve: function http_serve (route, fetcher) {
2204 var textbus = require('./statebus')()
2205 textbus.label = 'textbus'
2206 var watched = new Set()
2207 textbus('*').to_fetch = (filename, old) => {
2208 return {etag: Math.random() + '',
2209 _: fetcher(filename)}
2210 }
2211 bus.http.get(route, (req, res) => {
2212 var path = req.path
2213 var etag = textbus.cache[path] && textbus.cache[path].etag
2214 if (etag && req.get('If-None-Match') === etag) {
2215 res.status(304).end()
2216 return
2217 }
2218
2219 textbus.fetch(req.path) // So that textbus never clears the cache
2220 textbus.fetch(req.path, function cb (o) {
2221 res.setHeader('Cache-Control', 'public')
2222 // res.setHeader('Cache-Control', 'public, max-age='
2223 // + (60 * 60 * 24 * 30)) // 1 month
2224 res.setHeader('ETag', o.etag)
2225 res.setHeader('Access-Control-Allow-Origin', '*')
2226 res.setHeader('Content-Type', 'application/javascript')
2227 res.send(o._)
2228 textbus.forget(o.key, cb) // But we do want to forget the cb
2229 })
2230 })
2231 },
2232
2233 serve_client_coffee: function serve_client_coffee () {
2234 bus.http_serve('/client/:filename', (filename) => {
2235 filename = /\/client\/(.*)/.exec(filename)[0]
2236 var source_filename = filename.substr(1)
2237 var source = bus.read_file(source_filename)
2238 if (bus.loading()) throw 'loading'
2239 if (filename.match(/\.coffee$/))
2240 return bus.compile_coffee(source, source_filename)
2241 else
2242 return source
2243 })
2244 },
2245
2246 compile_coffee: function compile_coffee (source, filename) {
2247 try {
2248 var compiled = require('coffeescript').compile(source, {filename,
2249 bare: true,
2250 sourceMap: true})
2251 } catch (e) {
2252 console.error('Could not compile ' + e.toString())
2253 return 'console.error(' + JSON.stringify(e.toString()) + ')'
2254 }
2255
2256 var source_map = JSON.parse(compiled.v3SourceMap)
2257 source_map.sourcesContent = source
2258 function btoa(s) { return new Buffer(s.toString(),'binary').toString('base64') }
2259
2260 // Base64 encode it
2261 compiled = compiled.js
2262 compiled += '\n'
2263 compiled += '//# sourceMappingURL=data:application/json;base64,'
2264 compiled += btoa(unescape(encodeURIComponent(JSON.stringify(source_map)))) + '\n'
2265 compiled += '//# sourceURL=' + filename
2266 return compiled
2267 },
2268
2269 serve_clientjs: function serve_clientjs (path) {
2270 path = path || 'client.js'
2271 bus.http.get('/' + path, (req, res) => {
2272 res.send(
2273 ['extras/coffee.js', 'extras/sockjs.js', 'extras/react.js',
2274 'statebus.js', 'client.js']
2275 .map((f) => fs.readFileSync('node_modules/statebus/' + f))
2276 .join(';\n'))
2277 })
2278 },
2279
2280 serve_wiki: () => {
2281 bus('edit/*').to_fetch = () => ({_: require('./extras/wiki.coffee').code})
2282 },
2283
2284 unix_socket_repl: function (filename) {
2285 var repl = require('repl')
2286 var net = require('net')
2287 var fs = require('fs')
2288 if (fs.existsSync && fs.existsSync(filename))
2289 fs.unlinkSync(filename)
2290 net.createServer(function (socket) {
2291 var r = repl.start({
2292 //prompt: '> '
2293 input: socket
2294 , output: socket
2295 , terminal: true
2296 //, useGlobal: false
2297 })
2298 r.on('exit', function () {
2299 socket.end()
2300 })
2301 r.context.socket = socket
2302 }).listen(filename)
2303 },
2304
2305 schema: function schema () {
2306 function url_tree (cache) {
2307 // The key tree looks like:
2308 //
2309 // {server: {thing: [obj1, obj2], shing: [obj1, obj2], ...}
2310 // client: {dong: [obj1, ...]}}
2311 //
2312 // And objects without a number, like 'shong' will go on:
2313 // key_tree.server.shong[null]
2314 var tree = {server: {}, client: {}}
2315 for (var key in cache) {
2316 var p = parse_key(key)
2317 if (!p) {
2318 console.log('The state dash can\'t deal with key', key)
2319 return null
2320 }
2321 tree[p.owner][p.name] || (tree[p.owner][p.name] = {})
2322 tree[p.owner][p.name][p.number || null] = cache[key]
2323 }
2324 return tree
2325 }
2326
2327 function parse_key (key) {
2328 var word = "([^/]+)"
2329 // Matching things like: "/new/name/number"
2330 // or: "/name/number"
2331 // or: "/name"
2332 // or: "name/number"
2333 // or: "name"
2334 // ... and you can optionally include a final slash.
2335 var regexp = new RegExp("(/)?(new/)?" +word+ "(/" +word+ ")?(/)?")
2336 var m = key.match(regexp)
2337 if (!m) return null
2338 // indices = [0: has_match, 1: server_owned, 2: is_new, 3: name, 5: number]
2339 var owner = m[1] ? 'server' : 'client'
2340 return m[0] && {owner:owner, 'new': m[2], name: m[3], number: m[5]}
2341 }
2342 schema.parse_key = parse_key
2343 schema.url_tree = url_tree
2344 return url_tree(bus.cache)
2345 }
2346}
2347 // Add methods to bus object
2348 for (var m in extra_methods)
2349 bus[m] = extra_methods[m]
2350
2351 bus.options = default_options(bus)
2352 set_options(bus, options)
2353
2354 // Automatically make state:// fetch over a websocket
2355 bus.net_automount()
2356 return bus
2357}
2358
2359
2360// Handy functions for writing tests on nodejs
2361var tests = []
2362function test (f) {tests.push(f)}
2363function run_tests () {
2364 // Either run the test specified at command line
2365 if (process.argv[2])
2366 tests.find((f) => f.name == process.argv[2])(
2367 ()=>process.exit()
2368 )
2369
2370 // Or run all tests
2371 else {
2372 function run_next () {
2373 if (tests.length > 0) {
2374 var f = tests.shift()
2375 delay_so_far = 0
2376 console.log('\nTesting:', f.name)
2377 f(function () {setTimeout(run_next)})
2378 } else
2379 (console.log('\nDone with all tests.'), process.exit())
2380 }
2381 run_next()
2382 }
2383}
2384function log () {
2385 var pre = ' '
2386 console.log(pre+util.format.apply(null,arguments).replace('\n','\n'+pre))
2387}
2388var assert = require('assert')
2389function delay (time, f) {
2390 delay_so_far = delay_so_far + time
2391 return setTimeout(f, delay_so_far)
2392}
2393delay.init = _=> delay_so_far = 0
2394var delay_so_far = 0
2395
2396
2397// Now export everything
2398module.exports.import_server = import_server
2399module.exports.run_server = function (bus, options) { bus.serve(options) }
2400module.exports.import_module = function (statebus) {
2401 statebus.testing = {test, run_tests, log, assert, delay}
2402
2403 statebus.serve = function serve (options) {
2404 var bus = statebus()
2405 require('./server').run_server(bus, options)
2406 return bus
2407 }
2408
2409 // Handy repl. Invoke with node -e 'require("statebus").repl("/tmp/foo")'
2410 statebus.repl = function (filename) {
2411 var net = require('net')
2412 var sock = net.connect(filename)
2413
2414 process.stdin.pipe(sock)
2415 sock.pipe(process.stdout)
2416
2417 sock.on('connect', function () {
2418 process.stdin.resume();
2419 process.stdin.setRawMode(true)
2420 })
2421
2422 sock.on('close', function done () {
2423 process.stdin.setRawMode(false)
2424 process.stdin.pause()
2425 sock.removeListener('close', done)
2426 })
2427
2428 process.stdin.on('end', function () {
2429 sock.destroy()
2430 console.log()
2431 })
2432
2433 process.stdin.on('data', function (b) {
2434 if (b.length === 1 && b[0] === 4) {
2435 process.stdin.emit('end')
2436 }
2437 })
2438 }
2439}