UNPKG

9.34 kBJavaScriptView Raw
1
2import 'babel-polyfill'
3
4import assert from 'assert'
5import sinon from 'sinon'
6import t from 'transducers-js'
7
8import { cancel, merge, sleep, chan, go, put, take, clone, close, CLOSED, any } from '../lib/index'
9
10describe('channels', () => {
11
12 describe('take()', () => {
13
14 it ('should return a promise', () => {
15 assert(put(chan()) instanceof Promise)
16 })
17
18 it ('should by implied by simply awaiting a channel', (cb) => {
19 go(async () => {
20 let ch = chan()
21 put(ch, 1)
22 let val = await ch
23 assert.equal(val, 1)
24 }).then(cb)
25 })
26
27 it ('should deliver oldest put value', (cb) => {
28
29 var ch = chan()
30 put(ch, 1)
31 put(ch, 2)
32
33 var expected;
34
35 take(ch).then((val) => expected = val)
36
37 process.nextTick(() => {
38 assert(expected === 1)
39 cb()
40 })
41 })
42
43 it ('should work in async function', (cb) => {
44
45 var ch = chan()
46 put(ch, 1)
47 put(ch, 2)
48
49 var test = async function() {
50 var val = await take(ch)
51 assert(val === 1)
52 cb()
53 }
54
55 test()
56 })
57
58 it ('should work in a go-block', (cb) => {
59
60 var ch = chan()
61 put(ch, 1)
62 put(ch, 2)
63
64 go(async function() {
65 var val = await take(ch)
66 assert(val === 1)
67 cb()
68 })
69 })
70
71 it ('should park and wait if no pending put value', (cb) => {
72
73 var ch = chan()
74 var spy = sinon.spy()
75
76 go(async function() {
77 val = await take(ch)
78 spy()
79 })
80
81 process.nextTick(() => {
82 assert(!spy.called)
83 cb()
84 })
85 })
86 })
87
88 describe('put()', () => {
89
90 it ('should return a promise', () => {
91 assert(put(chan()) instanceof Promise)
92 })
93
94 it ('should allow transducer to modify content', (cb) => {
95 var ch = chan(null, t.map((n) => n + 1))
96 take(ch)
97 .then((val) => assert(val === 2))
98 .then(cb)
99 put(ch, 1)
100 })
101
102 it ('should drop the put if transducer filters it out', (cb) => {
103 var ch = chan(null, t.filter((n) => n > 1))
104 take(ch)
105 .then((val) => assert(val === 2))
106 .then(cb)
107 put(ch, 1) // dropped
108 put(ch, 2)
109 })
110
111 it ('should delegate to buffer', () => {
112 var ch = chan()
113 var spy = sinon.spy(ch.buffer, 'push')
114 put(ch, 1)
115 assert(spy.calledOnce)
116 })
117
118 it ('should resolve put immediately if there is a pending take', () => {
119
120 var ch = chan()
121 var spy = sinon.spy(ch.buffer, 'push')
122
123 // pending take
124 take(ch)
125
126 // put will be executed, not queued
127 put(ch, 1)
128
129 assert(!spy.called)
130 })
131 })
132
133 describe('sleep()', () => {
134 it ('should sleep for given ms', (cb) => {
135
136 var ch = chan()
137 var subject = 1
138
139 go(async function() {
140 await sleep(1000)
141 subject = 2
142 })
143
144 setTimeout(() => {
145 assert(subject === 1)
146 setTimeout(() => {
147 assert(subject === 2)
148 cb()
149 }, 600)
150 }, 600)
151
152 })
153 })
154
155 describe('clone()', () => {
156
157 it ('should create a new chan with the same properties', () => {
158 var bufferOrN = 1
159 var xduce = t.map((n) => n + 1)
160 var opts = { foo: 'bar' }
161 var a = chan(bufferOrN, xduce, opts)
162 var b = clone(a)
163 assert.equal(JSON.stringify(a), JSON.stringify(b))
164 })
165 })
166
167 describe('close()', () => {
168
169 it ('should set channel closed property to true', () => {
170 var ch = chan()
171 assert(!ch.closed)
172 close(ch)
173 assert(ch.closed)
174 })
175
176 it ('should cause all puts to resolve to false immediately', (cb) => {
177 var ch = chan()
178 close(ch)
179 put(ch, 2)
180 .then((val) => assert(val === false))
181 .then(cb)
182 })
183
184 it ('should cause all takes to resolve with CLOSED constant value immediately', (cb) => {
185 var ch = chan()
186 close(ch)
187 take(ch, 2)
188 .then((val) => assert(val === CLOSED))
189 .then(cb)
190 })
191
192 it ('should cause all pending takes to resolve with CLOSED constant immediately', (cb) => {
193 var ch = chan()
194 var takenA = take(ch)
195 var takenB = take(ch)
196 close(ch)
197 takenA.then((val) => assert(val === CLOSED))
198 takenB.then((val) => assert(val === CLOSED))
199 Promise.all([ takenA, takenB ]).then(() => cb())
200 })
201
202 it ('should cause all pending puts in buffer to resolve with false immediately', (cb) => {
203 var ch = chan()
204 var putted = put(ch, 2)
205 close(ch)
206 putted
207 .then((val) => assert(val === false))
208 .then(cb)
209 })
210 })
211
212 describe('alts()', () => {
213
214 let sleepyChan = (ms, val) => {
215 let ch = chan()
216 setTimeout(() => put(ch, val), ms)
217 return ch
218 }
219
220 it ('should resolve with first channel that receives a value', (cb) => {
221 go(async () => {
222 var foo = chan()
223 var t = sleepyChan(1000, 1)
224 var [ val, ch ] = await any(foo, t)
225 assert(ch === t)
226 }).then(cb).catch(console.log.bind(console))
227 })
228
229 it ('should resolve immediately if one channel has a pending put', (cb) => {
230 go(async () => {
231 var foo = chan()
232 var bar = chan()
233 var t = sleepyChan(1000, 1)
234 put(bar, 1)
235 var [ val, ch ] = await any(foo, bar, t)
236 assert(ch === bar)
237 }).then(cb)
238 })
239
240 it ('should resolve non-deterministically when more than one has a pending put', (cb) => {
241 go(async () => {
242 var resolved = []
243 for (var i = 0; i < 100; i++) {
244 var chans = [ chan(), chan(), chan() ]
245 put(chans[1], 1)
246 put(chans[2], 2)
247 var [ val, ch ] = await any(...chans)
248 resolved.push(chans.indexOf(ch))
249 }
250 assert.ok(resolved.some((r) => r === 1))
251 assert.ok(!resolved.every((r) => r === 1))
252 }).then(cb)
253 })
254
255 it ('should cancel all other channel actions after a resolved', (cb) => {
256 go(async () => {
257 var foo = chan()
258 var bar = chan()
259 var t = sleepyChan(1000, 1)
260 put(bar, 1)
261 await any(foo, bar, t)
262 put(foo, 2)
263 let val = await take(foo)
264 assert.equal(val, 2)
265 }).then(cb)
266 })
267
268 it ('should work with takes and puts - put winning', (cb) => {
269 go(async () => {
270
271 var forTakes = chan()
272 var forPuts = chan()
273
274 let gettingFirst = any(forTakes, [ forPuts, 1 ])
275
276 await take(forPuts)
277
278 let [ v, c ] = await gettingFirst
279
280 assert.equal(v, 1)
281 assert.equal(c, forPuts)
282
283 }).then(cb)
284 })
285
286 it ('should work with takes and puts - take winning', (cb) => {
287
288 go(async () => {
289
290 let forTakes = chan()
291 let forPuts = chan()
292
293 let gettingFirst = any(forTakes, [ forPuts, 1 ])
294
295 await put(forTakes, 2)
296
297 let [ v, c ] = await gettingFirst
298
299 assert.equal(v, 2)
300 assert.equal(c, forTakes)
301
302 }).then(cb)
303 })
304
305 it ('should work with raw promises - promise winning', (cb) => {
306
307 go(async () => {
308
309 let forTakes = chan()
310 let forPuts = chan()
311 let forPromise = Promise.resolve(3)
312
313 let gettingFirst = any(forTakes, [ forPuts, 1 ], forPromise)
314
315 let [ v, c ] = await gettingFirst
316
317 assert.equal(v, 3)
318 assert.equal(c, forPromise)
319
320 }).then(cb)
321 })
322 })
323
324 describe('cancel()', () => {
325
326 describe('canceling a put', () => {
327
328 it ('should remove it from the buffer\'s unreleased queue', (cb) => {
329 let ch = chan()
330 go(async () => {
331 let put1Promise = put(ch, 1)
332 let put2Promise = put(ch, 2)
333 cancel(ch, put1Promise)
334 let val = await take(ch)
335 assert.equal(val, 2)
336 }).then(cb)
337 })
338 })
339
340 describe('canceling a take', () => {
341
342 it ('should remove it from the channel\'s takes', (cb) => {
343 let ch = chan()
344 go(async () => {
345 let take1Promise = take(ch)
346 let take2Promise = take(ch)
347 cancel(ch, take1Promise)
348 put(ch, 2)
349 let val = await take2Promise
350 assert.equal(val, 2)
351 }).then(cb)
352 })
353 })
354 })
355
356 describe('go()', () => {
357 it ('should immediately invoke given function', () => {
358 var spy = sinon.spy()
359 go(spy)
360 assert(spy.called)
361 })
362 })
363
364 describe('merge()', () => {
365
366 it ('should take from all inputs and put onto single output channel', (cb) => {
367
368 var a = chan()
369 var b = chan()
370 var c = chan()
371 var merged = merge(a, b, c)
372
373 go(async () => {
374
375 await put(a, 1)
376 await put(b, 2)
377 await put(c, 3)
378
379 let fromA = await take(merged)
380 let fromB = await take(merged)
381 let fromC = await take(merged)
382
383 assert.equal(fromA, 1)
384 assert.equal(fromB, 2)
385 assert.equal(fromC, 3)
386
387 }).then(cb)
388 })
389
390 it ('should close output channel when all inputs are closed', (cb) => {
391
392 var a = chan()
393 var b = chan()
394 var c = chan()
395 var merged = merge(a, b, c)
396
397 go(async () => {
398 assert.equal(merged.closed, false)
399 close(a)
400 close(b)
401 close(c)
402 })
403 .then(() => assert.equal(merged.closed, true))
404 .then(cb)
405 })
406 })
407})
408