1 |
|
2 | debug = require('debug')('msgflo:routing')
|
3 |
|
4 | # Used to bind one queue/exchange to another when the Broker
|
5 | # of the transport cannot provide this functionality, like on MQTT
|
6 | #
|
7 | # TODO: split into two pieces
|
8 | # a) a Router, which implements message routing
|
9 | # with a message-queue based protocol for listing and manipulating bindings.
|
10 | # b) a Binder mixin for MessageBroker inteface,
|
11 | # which sends messsages on this protocol for add/removeBinding() and listBindings()
|
12 | #
|
13 | # This allows a single Router to exist in the network. It holds the canonical state of which
|
14 | # queues/topics are bound to eachother, and multiple processes can query and manipulate these.
|
15 | # Typically this would be hosted on the same machine as the Broker itself, and would have same lifetime.
|
16 | #
|
17 | # Protocol:
|
18 | # (in) /msgrouter/$instance/addbinding Add a new binding between a source and target topic/queue.
|
19 | # (in) /msgrouter/$instance/removebinding Remove an existing binding between a source and target topic/queue.
|
20 | # (out) /msgrouter/$instance/bindings Full list of current bindings. Emitted on changes, or when requested.
|
21 | # (in) /msgrouter/$instance/listbindings Explicitly request current bindings.
|
22 | #
|
23 | # The default $instance is 'default'
|
24 | # The Router implementation should persist the bindings whenever they change.
|
25 | # Upon restarting it should restore the persisted bindings (and emit a signal).
|
26 | #
|
27 | bindingId = (f, t) ->
|
28 | return "#{f}-#{t}"
|
29 |
|
30 | class Binder
|
31 | constructor: (@transport) ->
|
32 | @bindings = {}
|
33 |
|
34 | addBinding: (binding, callback) ->
|
35 | from = binding.src
|
36 | to = binding.tgt
|
37 | # TODO: handle non-pubsub types
|
38 | id = bindingId from, to
|
39 | debug 'Binder.addBinding', binding.type, id
|
40 | return callback null if @bindings[id] or from == to
|
41 |
|
42 | handler = (msg) =>
|
43 | binding = @bindings[id]
|
44 | return if not binding?.enabled
|
45 | debug 'edge message', from, to, msg
|
46 | @transport.sendTo 'outqueue', to, msg.data, (err) ->
|
47 | throw err if err
|
48 | @transport.subscribeToQueue from, handler, (err) =>
|
49 | return callback err if err
|
50 | @bindings[id] =
|
51 | handler: handler
|
52 | enabled: true
|
53 | return callback null
|
54 |
|
55 | removeBinding: (binding, callback) ->
|
56 | from = binding.src
|
57 | to = binding.tgt
|
58 | id = bindingId from, to
|
59 | debug 'Binder.removeBinding', binding, id
|
60 | binding = @bindings[id]
|
61 | return callback new Error "Binding does not exist" if not binding
|
62 | binding.enabled = false
|
63 | #FIXME: add an unsubscribeQueue to Client/transport, and use that
|
64 | return callback null
|
65 |
|
66 | listBindings: (callback) -> # FIXME: implement
|
67 | debug 'Binder.listBindings'
|
68 | return callback null, []
|
69 |
|
70 |
|
71 | exports.Binder = Binder
|
72 | exports.binderMixin = (transport) ->
|
73 | b = new Binder transport
|
74 | transport._binder = b
|
75 | transport.addBinding = b.addBinding.bind b
|
76 | transport.removeBinding = b.removeBinding.bind b
|
77 | transport.listBindings = b.listBindings.bind b
|
78 |
|