UNPKG

2.89 kBtext/coffeescriptView Raw
1
2debug = require('debug')('msgflo:routing')
3
4# Used to bind one queue/exchange to another when the Broker
5# of the transport cannot provide this functionality, like on MQTT
6#
7# TODO: split into two pieces
8# a) a Router, which implements message routing
9# with a message-queue based protocol for listing and manipulating bindings.
10# b) a Binder mixin for MessageBroker interface,
11# which sends messages on this protocol for add/removeBinding() and listBindings()
12#
13# This allows a single Router to exist in the network. It holds the canonical state of which
14# queues/topics are bound to each other, and multiple processes can query and manipulate these.
15# Typically this would be hosted on the same machine as the Broker itself, and would have same lifetime.
16#
17# Protocol:
18# (in) /msgrouter/$instance/addbinding Add a new binding between a source and target topic/queue.
19# (in) /msgrouter/$instance/removebinding Remove an existing binding between a source and target topic/queue.
20# (out) /msgrouter/$instance/bindings Full list of current bindings. Emitted on changes, or when requested.
21# (in) /msgrouter/$instance/listbindings Explicitly request current bindings.
22#
23# The default $instance is 'default'
24# The Router implementation should persist the bindings whenever they change.
25# Upon restarting it should restore the persisted bindings (and emit a signal).
26#
# Build the key under which a binding is stored: the source and target
# names joined by a single dash.
bindingId = (src, tgt) -> "#{src}-#{tgt}"
29
# Emulates bindings between queues/topics on top of a transport that cannot
# bind them itself: it subscribes to the source and re-sends every received
# message to the target.
class Binder
  # transport: object providing subscribeToQueue(name, handler, callback)
  # and sendTo(type, name, data, callback), as used below.
  constructor: (@transport) ->
    # Binding records keyed by bindingId(src, tgt); each is { handler, enabled }.
    @bindings = {}

  # Start forwarding messages from binding.src to binding.tgt.
  # Calls callback(null) on success (including when the binding already
  # exists or src equals tgt), or callback(err) if subscribing failed.
  addBinding: (binding, callback) ->
    from = binding.src
    to = binding.tgt
    # TODO: handle non-pubsub types
    id = bindingId from, to
    debug 'Binder.addBinding', binding.type, id
    # Binding a queue to itself would forward its messages back into itself
    return callback null if from == to

    existing = @bindings[id]
    if existing
      # removeBinding() only disables the record and the handler stays
      # subscribed, so a re-added binding must be switched back on here
      # rather than subscribed a second time.
      existing.enabled = true
      return callback null

    handler = (msg) =>
      # Look the record up on every message so that a later removeBinding()
      # takes effect. A distinct local name is used because CoffeeScript has
      # no shadowing: assigning to `binding` would overwrite the parameter.
      record = @bindings[id]
      return if not record?.enabled
      debug 'edge message', from, to, msg
      @transport.sendTo 'outqueue', to, msg.data, (err) ->
        throw err if err
    @transport.subscribeToQueue from, handler, (err) =>
      return callback err if err
      @bindings[id] =
        handler: handler
        enabled: true
      return callback null

  # Stop forwarding messages from binding.src to binding.tgt.
  # Calls callback with an Error if no such binding was ever added,
  # otherwise callback(null).
  removeBinding: (binding, callback) ->
    from = binding.src
    to = binding.tgt
    id = bindingId from, to
    debug 'Binder.removeBinding', binding, id
    record = @bindings[id]
    return callback new Error "Binding does not exist" if not record
    # The subscription itself stays in place; the handler just ignores messages
    record.enabled = false
    #FIXME: add an unsubscribeQueue to Client/transport, and use that
    return callback null

  # Report the current bindings via callback(err, bindings).
  # Not implemented yet: always yields an empty list.
  listBindings: (callback) -> # FIXME: implement
    debug 'Binder.listBindings'
    return callback null, []
69
70
exports.Binder = Binder

# Attach Binder-based binding support to a transport object.
# Creates a Binder for the transport, stores it as transport._binder, and
# installs addBinding/removeBinding/listBindings on the transport as methods
# bound to that Binder.
exports.binderMixin = (transport) ->
  binder = new Binder transport
  bound = (method) -> binder[method].bind binder
  transport._binder = binder
  transport.addBinding = bound 'addBinding'
  transport.removeBinding = bound 'removeBinding'
  # Last expression, so the bound listBindings is also the implicit return value
  transport.listBindings = bound 'listBindings'
78