1 | dolce = require "dolce"
|
2 | RequestMiddleware = require "./middleware"
|
3 | crema = require "crema"
|
4 | Messenger = require "./messenger"
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
# Director: owns a dolce collection of route listeners and dispatches
# messages to the listener chains registered for a message's channel and tags.
module.exports = class

  # When true, dispatching to a channel with no listeners is not reported as
  # an error, and addListener skips the "already exists" validation.
  passive: false

  # name   - stored as @name; reported as `type` by channels().
  # router - stored as @router.
  constructor: (@name, @router) ->
    @_collection = dolce.collection()

  # Returns the number of listener chains the collection reports for the
  # given channel and lookup options.
  numListeners: (channel, ops) -> @_collection.get(channel, ops).chains.length

  # Dispatches a message to every listener chain matching it.
  #
  # messageWriter - object exposing `callback`, `channel`, `pre`, `next` and
  #                 `reader()`; its `running` flag and `callback` are
  #                 replaced here.
  #
  # Returns this director. When no chain matches and the director is not
  # passive, the (wrapped) callback is invoked with an Error instead.
  dispatch: (messageWriter) ->

    # listener chains registered for the message's channel / tags
    chains = @getListeners messageWriter

    numChains = chains.length
    numRunning = numChains
    oldAck = messageWriter.callback

    messageWriter.running = numChains > 0

    # Wrap the original callback so every acknowledgement updates the running
    # state and is forwarded with [numRunning, numChains] appended.
    # A splat is used instead of Array.apply(null, arguments): the latter
    # builds Array(n) (n empty slots) when called with one numeric argument.
    messageWriter.callback = (args...) ->
      # Clamp at zero so the "route does not exist" acknowledgement (which
      # starts from zero chains) or a surplus acknowledgement cannot drive
      # the counter negative and flip `running` back to true.
      numRunning = Math.max(numRunning - 1, 0)
      messageWriter.running = numRunning > 0
      oldAck.apply this, args.concat([numRunning, numChains]) if oldAck

    if numChains is 0 and not @passive
      messageWriter.callback new Error "Route \"#{crema.stringifyPaths(messageWriter.channel.paths)}\" does not exist"
      return @

    for chain in chains
      # a fresh reader is requested from the writer for each chain
      messageReader = messageWriter.reader()

      # wrap the chain together with the writer's pre / next hooks
      middleware = RequestMiddleware.wrap chain, messageWriter.pre, messageWriter.next, @

      messenger = @_newMessenger messageReader, middleware
      messenger.start()

    @

  # Registers `callback` for `route` in the collection.
  #
  # When route.tags.one is set, the callback is wrapped so the listener
  # disposes itself after its first invocation.
  #
  # Returns whatever @_collection.add returns (used here as a handle exposing
  # dispose()). Errors thrown by _validateListener propagate to the caller.
  addListener: (route, callback) ->

    # Declared up front so the one-shot wrapper below closes over the handle
    # that is only assigned at the end of this method.
    disposable = null

    if route.tags.one
      oldCallback = callback
      callback = () ->
        oldCallback.apply this, arguments
        disposable.dispose()

    @_validateListener route, callback

    disposable = @_collection.add route, callback

  # Returns one { tags, type, path } descriptor per listener that
  # @_collection.find yields for `ops`; `type` is this director's name.
  channels: (ops) ->
    for listener in @_collection.find ops
      { tags: listener.tags, type: @name, path: listener.path }

  # Builds the tag query used for listener lookups.
  #
  # ops - object whose `tags` map is converted; a tag whose value is exactly
  #       `true` becomes `{ $exists: true }`.
  #
  # Returns { $or: [ { $and: [one entry per tag] }, { unfilterable: { $exists: true } } ] }.
  listenerQuery: (ops) ->
    tags = []

    for tagName of ops.tags
      # NOTE: the normalised value is written back onto the caller's tags
      # object; this side effect is kept so existing behaviour is unchanged.
      ops.tags[tagName] = { $exists: true } if ops.tags[tagName] is true
      tag = {}
      tag[tagName] = ops.tags[tagName]
      tags.push tag

    { $or: [ { $and: tags }, { unfilterable: { $exists: true } } ] }

  # Returns the listener chains for route.channel, filtered by the tag query
  # built from `search` (or from `route` itself when `search` is omitted).
  getListeners: (route, search) ->
    @_collection.get(route.channel, siftTags: @listenerQuery(search || route) ).chains

  # Returns the collection's `contains` result for route.channel filtered by
  # the route's tag query.
  routeExists: (route) -> @_collection.contains(route.channel, siftTags: @listenerQuery(route) )

  # Factory for the Messenger that runs one message through one middleware
  # chain.
  _newMessenger: (message, middleware) -> new Messenger message, middleware, @

  # Throws when a listener is already registered for the route; skipped for
  # passive directors.
  #
  # NOTE(review): other methods in this class read `.chains` off the result of
  # @_collection.get, while `.length` is read directly here - confirm against
  # dolce whether this check can ever fire.
  _validateListener: (route) ->

    return if @passive

    listeners = @_collection.get route.channel, route.tags

    throw new Error "Route \"#{route.channel.value}\" already exists" if !!listeners.length
|
164 |
|
165 |
|
166 |
|
167 |
|
168 |
|
169 |
|
170 |
|
171 |
|