1 |
|
2 | import 'babel-polyfill'
|
3 |
|
4 | import assert from 'assert'
|
5 | import sinon from 'sinon'
|
6 | import t from 'transducers-js'
|
7 |
|
8 | import { cancel, merge, sleep, chan, go, put, take, clone, close, CLOSED, any } from '../lib/index'
|
9 |
|
10 | describe('channels', () => {
|
11 |
|
// Specs for take(): reading values off a channel.
describe('take()', () => {

  it ('should return a promise', () => {
    // Exercise take() itself here; the put() suite has its own promise check.
    assert(take(chan()) instanceof Promise)
  })

  it ('should by implied by simply awaiting a channel', (cb) => {
    go(async () => {
      const ch = chan()
      put(ch, 1)
      // Awaiting the channel object directly is expected to behave like take(ch).
      const val = await ch
      assert.equal(val, 1)
    }).then(() => cb(), cb) // hand a failed assertion to mocha instead of timing out
  })

  it ('should deliver oldest put value', (cb) => {
    const ch = chan()
    put(ch, 1)
    put(ch, 2)

    // Assert inside the promise chain rather than from process.nextTick():
    // Node drains the nextTick queue before promise callbacks, so a nextTick
    // probe can run before the take's `then` handler has fired.
    take(ch)
      .then((val) => assert(val === 1))
      .then(() => cb(), cb)
  })

  it ('should work in async function', (cb) => {
    const ch = chan()
    put(ch, 1)
    put(ch, 2)

    const test = async function() {
      const val = await take(ch)
      assert(val === 1)
    }

    // Completion and failure are both routed through the returned promise.
    test().then(() => cb(), cb)
  })

  it ('should work in a go-block', (cb) => {
    const ch = chan()
    put(ch, 1)
    put(ch, 2)

    go(async function() {
      const val = await take(ch)
      assert(val === 1)
    }).then(() => cb(), cb)
  })

  it ('should park and wait if no pending put value', (cb) => {
    const ch = chan()
    const spy = sinon.spy()

    go(async function() {
      // Nothing is ever put on ch, so this take is expected to stay pending.
      await take(ch)
      spy()
    })

    // setImmediate runs only after queued nextTick/promise callbacks have
    // drained, so a take that wrongly resolved would already have hit spy().
    setImmediate(() => {
      assert(!spy.called)
      cb()
    })
  })
})
|
87 |
|
// Specs for put(): writing values onto a channel.
describe('put()', () => {

  it ('should return a promise', () => {
    assert(put(chan()) instanceof Promise)
  })

  it ('should allow transducer to modify content', (cb) => {
    // The second chan() argument is a transducers-js transform applied to puts.
    const ch = chan(null, t.map((n) => n + 1))
    take(ch)
      .then((val) => assert(val === 2))
      .then(() => cb(), cb) // report a failed assertion instead of timing out
    put(ch, 1)
  })

  it ('should drop the put if transducer filters it out', (cb) => {
    const ch = chan(null, t.filter((n) => n > 1))
    take(ch)
      .then((val) => assert(val === 2))
      .then(() => cb(), cb)
    // 1 is rejected by the filter, so the take should only ever see 2.
    put(ch, 1)
    put(ch, 2)
  })

  it ('should delegate to buffer', () => {
    const ch = chan()
    const spy = sinon.spy(ch.buffer, 'push')
    put(ch, 1)
    assert(spy.calledOnce)
  })

  it ('should resolve put immediately if there is a pending take', () => {
    const ch = chan()
    const spy = sinon.spy(ch.buffer, 'push')

    // A take is already waiting, so the put must not go through buffer.push.
    take(ch)
    put(ch, 1)

    assert(!spy.called)
  })
})
|
132 |
|
// Specs for sleep(): a timed pause inside a go-block.
describe('sleep()', () => {
  it ('should sleep for given ms', (cb) => {
    let subject = 1

    go(async function() {
      await sleep(1000)
      subject = 2
    })

    // Probe once before the 1000ms sleep should have elapsed (~600ms) and
    // once after (~1200ms).
    setTimeout(() => {
      assert(subject === 1)
      setTimeout(() => {
        assert(subject === 2)
        cb()
      }, 600)
    }, 600)
  })
})
|
154 |
|
// Specs for clone(): duplicating a channel's configuration.
describe('clone()', () => {

  it ('should create a new chan with the same properties', () => {
    // Build a channel with a buffer size, a transducer and an options object.
    const original = chan(1, t.map((n) => n + 1), { foo: 'bar' })
    const copy = clone(original)

    // Equality is judged on the JSON serialisation of both channels.
    const serialize = (ch) => JSON.stringify(ch)
    assert.equal(serialize(original), serialize(copy))
  })
})
|
166 |
|
// Specs for close(): how closing affects new and pending puts/takes.
describe('close()', () => {

  it ('should set channel closed property to true', () => {
    const ch = chan()
    assert(!ch.closed)
    close(ch)
    assert(ch.closed)
  })

  it ('should cause all puts to resolve to false immediately', (cb) => {
    const ch = chan()
    close(ch)
    put(ch, 2)
      .then((val) => assert(val === false))
      .then(() => cb(), cb) // report a failed assertion instead of timing out
  })

  it ('should cause all takes to resolve with CLOSED constant value immediately', (cb) => {
    const ch = chan()
    close(ch)
    take(ch)
      .then((val) => assert(val === CLOSED))
      .then(() => cb(), cb)
  })

  it ('should cause all pending takes to resolve with CLOSED constant immediately', (cb) => {
    const ch = chan()
    const pending = [ take(ch), take(ch) ]
    close(ch)

    // Wait on the asserting promises themselves so that a failed assertion
    // fails the test rather than becoming an unobserved rejection.
    const checks = pending.map((taken) => taken.then((val) => assert(val === CLOSED)))
    Promise.all(checks).then(() => cb(), cb)
  })

  it ('should cause all pending puts in buffer to resolve with false immediately', (cb) => {
    const ch = chan()
    const putted = put(ch, 2)
    close(ch)
    putted
      .then((val) => assert(val === false))
      .then(() => cb(), cb)
  })
})
|
211 |
|
// Specs for any(): racing several channel operations (and promises) and
// resolving with a [ value, source ] pair for whichever completes first.
describe('alts()', () => {

  // Returns a fresh channel onto which `val` is put after `ms` milliseconds.
  const sleepyChan = (ms, val) => {
    const ch = chan()
    setTimeout(() => put(ch, val), ms)
    return ch
  }

  it ('should resolve with first channel that receives a value', (cb) => {
    go(async () => {
      const foo = chan()
      // Named `slow` rather than `t` to avoid shadowing the transducers import.
      const slow = sleepyChan(1000, 1)
      const [ , ch ] = await any(foo, slow)
      assert(ch === slow)
    }).then(() => cb(), cb) // report a failed assertion instead of logging it and timing out
  })

  it ('should resolve immediately if one channel has a pending put', (cb) => {
    go(async () => {
      const foo = chan()
      const bar = chan()
      const slow = sleepyChan(1000, 1)
      put(bar, 1)
      const [ , ch ] = await any(foo, bar, slow)
      assert(ch === bar)
    }).then(() => cb(), cb)
  })

  it ('should resolve non-deterministically when more than one has a pending put', (cb) => {
    go(async () => {
      const resolved = []
      for (let i = 0; i < 100; i++) {
        const chans = [ chan(), chan(), chan() ]
        put(chans[1], 1)
        put(chans[2], 2)
        const [ , ch ] = await any(...chans)
        resolved.push(chans.indexOf(ch))
      }
      // Over 100 rounds index 1 must win at least once, but not every time.
      assert.ok(resolved.some((r) => r === 1))
      assert.ok(!resolved.every((r) => r === 1))
    }).then(() => cb(), cb)
  })

  it ('should cancel all other channel actions after a resolved', (cb) => {
    go(async () => {
      const foo = chan()
      const bar = chan()
      const slow = sleepyChan(1000, 1)
      put(bar, 1)
      await any(foo, bar, slow)
      // The later put on foo must still be available to a fresh take,
      // i.e. any() must not have left a competing take behind on foo.
      put(foo, 2)
      const val = await take(foo)
      assert.equal(val, 2)
    }).then(() => cb(), cb)
  })

  it ('should work with takes and puts - put winning', (cb) => {
    go(async () => {
      const forTakes = chan()
      const forPuts = chan()

      // The [ channel, value ] form appears to request a put of `value`
      // onto `channel` (per the spec title) -- confirm against the library.
      const gettingFirst = any(forTakes, [ forPuts, 1 ])

      await take(forPuts)

      const [ v, c ] = await gettingFirst

      assert.equal(v, 1)
      assert.equal(c, forPuts)
    }).then(() => cb(), cb)
  })

  it ('should work with takes and puts - take winning', (cb) => {
    go(async () => {
      const forTakes = chan()
      const forPuts = chan()

      const gettingFirst = any(forTakes, [ forPuts, 1 ])

      await put(forTakes, 2)

      const [ v, c ] = await gettingFirst

      assert.equal(v, 2)
      assert.equal(c, forTakes)
    }).then(() => cb(), cb)
  })

  it ('should work with raw promises - promise winning', (cb) => {
    go(async () => {
      const forTakes = chan()
      const forPuts = chan()
      const forPromise = Promise.resolve(3)

      const gettingFirst = any(forTakes, [ forPuts, 1 ], forPromise)

      const [ v, c ] = await gettingFirst

      assert.equal(v, 3)
      assert.equal(c, forPromise)
    }).then(() => cb(), cb)
  })
})
|
323 |
|
// Specs for cancel(): withdrawing a pending put or take from a channel.
describe('cancel()', () => {

  describe('canceling a put', () => {

    it ('should remove it from the buffer\'s unreleased queue', (cb) => {
      const ch = chan()
      go(async () => {
        const put1Promise = put(ch, 1)
        put(ch, 2)
        // cancel() identifies the operation by the promise put() returned.
        cancel(ch, put1Promise)
        // With the first put withdrawn, the next take should see the second.
        const val = await take(ch)
        assert.equal(val, 2)
      }).then(() => cb(), cb) // report a failed assertion instead of timing out
    })
  })

  describe('canceling a take', () => {

    it ('should remove it from the channel\'s takes', (cb) => {
      const ch = chan()
      go(async () => {
        const take1Promise = take(ch)
        const take2Promise = take(ch)
        cancel(ch, take1Promise)
        // With the first take withdrawn, the put should go to the second.
        put(ch, 2)
        const val = await take2Promise
        assert.equal(val, 2)
      }).then(() => cb(), cb)
    })
  })
})
|
355 |
|
// Specs for go(): launching a function as a go-block.
describe('go()', () => {
  it ('should immediately invoke given function', () => {
    const body = sinon.spy()

    go(body)

    // The spy must already have been called by the time go() returns.
    assert(body.called)
  })
})
|
363 |
|
// Specs for merge(): fanning several input channels into one output channel.
describe('merge()', () => {

  it ('should take from all inputs and put onto single output channel', (cb) => {
    const a = chan()
    const b = chan()
    const c = chan()
    const merged = merge(a, b, c)

    go(async () => {
      await put(a, 1)
      await put(b, 2)
      await put(c, 3)

      const fromA = await take(merged)
      const fromB = await take(merged)
      const fromC = await take(merged)

      // Values are expected on the output in the order they were put.
      assert.equal(fromA, 1)
      assert.equal(fromB, 2)
      assert.equal(fromC, 3)
    }).then(() => cb(), cb) // report a failed assertion instead of timing out
  })

  it ('should close output channel when all inputs are closed', (cb) => {
    const a = chan()
    const b = chan()
    const c = chan()
    const merged = merge(a, b, c)

    go(async () => {
      assert.equal(merged.closed, false)
      close(a)
      close(b)
      close(c)
    })
      // Checked only after the go-block's promise has settled.
      .then(() => assert.equal(merged.closed, true))
      .then(() => cb(), cb)
  })
})
|
407 | })
|
408 |
|