UNPKG

3.45 kBtext/coffeescriptView Raw
1dolce = require "dolce"
2RequestMiddleware = require "./middleware"
3crema = require "crema"
4Messenger = require "./messenger"
5
6
7###
8
9Director process:
10
11###
12
13
14module.exports = class
15
16 ###
17 some directors are passive, meaning errors aren't returned if a route does not exist. This goes for collectors,
18 emitters, etc.
19 ###
20
21 passive: false
22
23 ###
24 constructor
25 ###
26
27 constructor: (@name, @router) ->
28 @_collection = dolce.collection()
29
30 ###
31 returns number of listeners based on channel given
32 ###
33
34 numListeners: (channel, ops) -> @_collection.get(channel, ops).chains.length
35
36 ###
37 dispatches a request
38 ###
39
40 dispatch: (messageWriter) ->
41
42 # find the listeners based on the channel given
43 chains = @getListeners messageWriter
44
45 numChains = chains.length
46 numRunning = numChains
47 oldAck = messageWriter.callback
48
49 messageWriter.running = !!numChains
50
51
52 # replace the with an ack so we can return exactly HOW many listeners there are...
53 messageWriter.callback = () ->
54 messageWriter.running = !!(--numRunning)
55 oldAck.apply this, Array.apply(null, arguments).concat([numRunning, numChains]) if oldAck
56
57
58 if not !!chains.length and not @passive
59 messageWriter.callback new Error "Route \"#{crema.stringifyPaths(messageWriter.channel.paths)}\" does not exist"
60 return @
61
62
63 # in pull bases, there will only be one listener. For push, there maybe multiple
64 for chain in chains
65
66
67 # there needs to be a NEW reader for each listener.
68 messageReader = messageWriter.reader()
69
70 # wrap the middleware
71 middleware = RequestMiddleware.wrap chain, messageWriter.pre, messageWriter.next, @
72
73 # pass through the factory class which creates a new request, OR uses a recycled request
74 messanger = @_newMessenger messageReader, middleware
75
76 # send the request
77 messanger.start()
78 @
79
80 ###
81 adds a route listener to the collection tree
82 ###
83
84 addListener: (route, callback) ->
85
86 disposable
87
88 ## one tag present? remove listener on end
89 if route.tags.one
90 oldCallback = callback
91 callback = () ->
92 oldCallback.apply this, arguments
93 disposable.dispose()
94
95 # validate the route incase we're dealing with a director where only ONE listener can be on each
96 # channel
97 @_validateListener route, callback
98
99 # set the disposable incase we're dealing with a route that listens ONCE for a request
100 disposable = @_collection.add route, callback
101
102
103 ###
104 ###
105
106 channels: (ops) ->
107
108 channels = []
109
110 for listener in @_collection.find ops
111 (tags: listener.tags,
112 type: @name,
113 path: listener.path)
114
115
116 ###
117 ###
118
119 listenerQuery: (ops) ->
120
121 tags = []
122 tag = {}
123
124 for tagName of ops.tags
125 ops.tags[tagName] = $exists: true if ops.tags[tagName] is true
126 tag = {}
127 tag[tagName] = ops.tags[tagName]
128 tags.push(tag)
129
130
131 search = $or: [ { $and: tags }, { unfilterable: $exists: true } ]
132
133 search
134
135 ###
136 ###
137
138 getListeners: (route, search) ->
139 @_collection.get(route.channel, siftTags: @listenerQuery(search || route) ).chains
140
141 ###
142 ###
143
144 routeExists: (route) -> @_collection.contains(route.channel, siftTags: @listenerQuery(route) )
145
146
147 ###
148 returns a new request
149 ###
150
151 _newMessenger: (message, middleware) -> new Messenger message, middleware, @
152
153
154 ###
155 ###
156
157 _validateListener: (route) ->
158
159 return if @passive
160
161 listeners = @_collection.get route.channel, route.tags
162
163 throw new Error "Route \"#{route.channel.value}\" already exists" if !!listeners.length
164
165
166
167
168
169
170
171