UNPKG

1.75 kBJavaScriptView Raw
1var WebSocketServer = require('ws').Server
2var debug = require('debug')('solid:subscription')
3var InMemory = require('./in-memory')
4var parallel = require('run-parallel')
5
6module.exports = WsServer
7
/**
 * WebSocket subscription server.
 *
 * Creates a `WebSocketServer` bound to `server` and, for every incoming
 * connection, accepts text messages of the form `sub <uri>`. Each accepted
 * subscription is registered in the store and acknowledged to the client
 * with `ack <uri>`. Any other message is ignored.
 *
 * @param {Object} server - Forwarded to WebSocketServer as its `server`
 *   option (presumably a Node http/https server - confirm with the caller).
 * @param {Object} [opts]
 * @param {string} [opts.suffix='.changes'] - Stored on the instance as
 *   `suffix`; not used anywhere else in this constructor.
 * @param {Object} [opts.store] - Subscription store; this constructor calls
 *   `store.subscribe(uri, client, cb)`. Defaults to `new InMemory(opts)`.
 * @param {string} [opts.path] - Forwarded to WebSocketServer as `path`.
 */
function WsServer (server, opts) {
  var self = this

  opts = opts || {}
  this.suffix = opts.suffix || '.changes'
  this.store = opts.store || new InMemory(opts)

  // Starting WSS server
  var wss = new WebSocketServer({
    server: server,
    clientTracking: false,
    path: opts.path
  })

  // Handling a single connection
  wss.on('connection', function (client) {
    debug('New connection')

    // Handling messages
    client.on('message', function (message) {
      debug('New message: ' + message)

      // Only non-empty text messages are part of the protocol
      if (!message || typeof message !== 'string') {
        return
      }

      var tuple = message.split(' ')

      // Only accept 'sub http://example.tld/hello'
      if (tuple.length < 2 || tuple[0] !== 'sub') {
        return
      }

      var uri = tuple[1]

      self.store.subscribe(uri, client, function (err) {
        if (err) {
          // Do not fail silently: leave a trace of the rejected subscription.
          // TODO Should also return an error to the client
          debug('Subscription to ' + uri + ' failed: ' + (err.message || err))
          return
        }

        // The store may call back after the socket has closed. Passing a
        // callback lets the socket report a failed send here instead of
        // throwing from inside this asynchronous callback.
        client.send('ack ' + uri, function (sendErr) {
          if (sendErr) {
            debug('Could not ack ' + uri + ': ' + (sendErr.message || sendErr))
          }
        })
      })
    })

    // Respond to ping
    client.on('ping', function () {
      client.pong()
    })
  })
}
58
/**
 * Notify every subscriber of `uri` by sending it the message `pub <uri>`.
 *
 * Subscribers are looked up with `this.store.get(uri, cb)`; the result is
 * treated as an object whose values are clients exposing `send(message, cb)`.
 *
 * @param {string} uri - Resource identifier that changed.
 * @param {Function} [callback] - Called with the store error if the lookup
 *   fails; otherwise handed to `parallel`, so it fires once every send has
 *   reported back (or with the first send error).
 */
WsServer.prototype.publish = function (uri, callback) {
  this.store.get(uri, function (err, subscribers) {
    if (err) {
      if (callback) callback(err)
      return
    }

    // A store may report "no subscribers" as null/undefined
    subscribers = subscribers || {}

    var tasks = Object.keys(subscribers).map(function (uuid) {
      return function (cb) {
        // Handing cb to send() is what signals task completion to parallel();
        // without it the final callback would never fire. It also lets the
        // socket report a failed send through cb instead of throwing.
        subscribers[uuid].send('pub ' + uri, cb)
      }
    })

    parallel(tasks, callback)
  })
}