UNPKG

10.2 kBJavaScriptView Raw
1
2const assert = require('assert')
3const sinon = require('sinon')
4
5const {
6 cancel,
7 merge,
8 sleep,
9 chan,
10 go,
11 put,
12 take,
13 repeat,
14 repeatTake,
15 clone,
16 close,
17 CLOSED,
18 any
19} = require('../lib/index.js')
20
21describe('channels', () => {
22
describe('take()', () => {

  // Tests for take(): what it returns, the order in which queued puts are
  // delivered, and that it stays pending while the channel has no value.
  // Asynchronous tests hand their promise back to the runner so a failed
  // assertion is reported as a failure rather than as a timeout.

  it ('should return a promise', () => {
    // Exercise take() itself; asserting on put() here would leave take()
    // untested.
    assert(take(chan()) instanceof Promise)
  })

  it ('should be implied by simply awaiting a channel', () => {
    return go(async () => {
      const ch = chan()
      put(ch, 1)
      const val = await ch
      assert.equal(val, 1)
    })
  })

  it ('should deliver oldest put value', () => {
    const ch = chan()
    put(ch, 1)
    put(ch, 2)

    // Returning the chained promise lets the assertion reject the test
    // directly instead of throwing inside a detached timer callback.
    return take(ch).then((val) => assert.equal(val, 1))
  })

  it ('should work in async function', async () => {
    const ch = chan()
    put(ch, 1)
    put(ch, 2)

    const val = await take(ch)
    assert(val === 1)
  })

  it ('should work in a go-block', () => {
    const ch = chan()
    put(ch, 1)
    put(ch, 2)

    return go(async function() {
      const val = await take(ch)
      assert(val === 1)
    })
  })

  it ('should park and wait if no pending put value', (cb) => {
    const ch = chan()
    const spy = sinon.spy()

    go(async function() {
      // The taken value is irrelevant here; only whether we get past the
      // await matters.
      await take(ch)
      spy()
    })

    // setImmediate fires after the pending promise continuations have run,
    // so a take() that wrongly resolved would already have called the spy.
    // process.nextTick can fire before those continuations, which would make
    // the check pass regardless.
    setImmediate(() => {
      let failure
      try {
        assert(!spy.called)
      } catch (err) {
        failure = err
      }
      cb(failure)
    })
  })
})
102
describe('put()', () => {

  // Tests for put(): what it returns and when it goes through the channel's
  // buffer.

  it ('should return a promise', () => {
    const result = put(chan())
    assert(result instanceof Promise)
  })

  it ('should delegate to buffer', () => {
    const channel = chan()
    const pushSpy = sinon.spy(channel.buffer, 'push')

    put(channel, 1)

    assert(pushSpy.calledOnce)
  })

  it ('should resolve put immediately if there is a pending take', () => {
    const channel = chan()
    const pushSpy = sinon.spy(channel.buffer, 'push')

    // Register a taker first so the put below has someone waiting for it.
    take(channel)

    // With a taker waiting, the value is expected to skip the buffer.
    put(channel, 1)

    assert(!pushSpy.called)
  })
})
130
describe('sleep()', () => {

  // Checks that sleep(100) holds a go-block for the requested time by
  // sampling a shared variable before (60ms) and after (120ms) the sleep
  // should have elapsed.
  it ('should sleep for given ms', (cb) => {
    let subject = 1

    go(async function() {
      await sleep(100)
      subject = 2
    })

    setTimeout(() => {
      // Still sleeping: the go-block must not have written yet.
      try {
        assert(subject === 1)
      } catch (err) {
        cb(err)
        return
      }

      setTimeout(() => {
        // Sleep is over: the write must have happened. Failures are passed
        // to cb so they are attributed to this test.
        let failure
        try {
          assert(subject === 2)
        } catch (err) {
          failure = err
        }
        cb(failure)
      }, 60)
    }, 60)
  })
})
152
describe('clone()', () => {

  // Compares the JSON serialisation of a channel with that of its clone.
  it ('should create a new chan with the same properties', () => {
    const original = chan(1, { foo: 'bar' })
    const copy = clone(original, null, { debug: true })

    const serializedOriginal = JSON.stringify(original)
    const serializedCopy = JSON.stringify(copy)

    assert.equal(serializedOriginal, serializedCopy)
  })
})
163
describe('close()', () => {

  // Tests for close(): the isClosed flag, and how puts and takes behave
  // when issued before or after the channel is closed. Promise-based tests
  // return their promise so assertion failures reject the test instead of
  // leaving it to time out.

  it ('should set channel closed property to true', () => {
    const ch = chan()
    assert(!ch.isClosed)
    close(ch)
    assert(ch.isClosed)
  })

  it ('should cause all puts to resolve to false immediately', () => {
    const ch = chan()
    close(ch)
    return put(ch, 2).then((val) => assert(val === false))
  })

  it ('should cause all takes to resolve with CLOSED constant value immediately', () => {
    const ch = chan()
    close(ch)
    // take() is called with the channel only, matching every other call in
    // this suite.
    return take(ch).then((val) => assert(val === CLOSED))
  })

  it ('should cause all pending takes to resolve with CLOSED constant immediately', () => {
    const ch = chan()
    const takenA = take(ch)
    const takenB = take(ch)
    close(ch)

    // Assert on the combined result so a wrong value fails this test rather
    // than rejecting an unobserved side chain.
    return Promise.all([ takenA, takenB ]).then((vals) => {
      vals.forEach((val) => assert(val === CLOSED))
    })
  })

  it ('should NOT cause all pending puts in buffer to resolve with false immediately', (cb) => {
    const ch = chan()
    const putted = put(ch, 2)
    close(ch)

    const spy = sinon.spy()
    putted.then(spy)

    // Promise callbacks never run synchronously, so checking the spy on the
    // very next line could not fail. Checking after setImmediate gives an
    // already-resolved put the chance to call the spy first.
    setImmediate(() => {
      let failure
      try {
        assert.equal(spy.called, false)
      } catch (err) {
        failure = err
      }
      cb(failure)
    })
  })
})
208
describe('alts()', () => {

  // Tests for any(), called with channels (takes), [ channel, value ] pairs
  // (puts) and plain promises; the tests destructure its result as
  // [ value, source ]. Every test returns the go-block's promise so that a
  // failed assertion is reported as a failure and not as a timeout.

  it ('should resolve with first action that resolves', () => {
    return go(async () => {
      const ch1 = chan()
      const p = sleep(100)
      const [ , ch ] = await any(ch1, p)
      assert.equal(ch, p)
    })
  })

  it ('should resolve immediately if a take has a pending put', () => {
    return go(async () => {
      const ch1 = chan()
      const ch2 = chan()
      const p = sleep(1000)
      put(ch2, 1)
      const [ v, ch ] = await any(ch1, ch2, p)
      assert.equal(ch, ch2)
      assert.equal(v, 1)
    })
  })

  it ('should resolve non-deterministically when more than one has a pending put', () => {
    return go(async () => {
      const resolved = []
      for (let i = 0; i < 100; i++) {
        const chans = [ chan(), chan(), chan() ]
        put(chans[1], 1)
        put(chans[2], 2)
        const [ , ch ] = await any(...chans)
        resolved.push(chans.indexOf(ch))
      }
      // Over 100 rounds index 1 must win at least once, but not every time.
      assert.ok(resolved.some((r) => r === 1))
      assert.ok(!resolved.every((r) => r === 1))
    })
  })

  it ('should cancel all other channel actions after one is resolved', () => {
    return go(async () => {
      const ch1 = chan()
      const ch2 = chan()
      const p = sleep(1000)

      put(ch1, 1)
      await any(ch1, ch2, p)

      // If the take registered on ch2 by any() were still pending, it would
      // receive this value instead of the explicit take below.
      put(ch2, 2)
      const v = await take(ch2)

      assert.equal(v, 2)
    })
  })

  it ('should work with takes and puts - put winning', () => {
    return go(async () => {
      const ch1 = chan()
      const ch2 = chan()

      setTimeout(() => take(ch2), 50)

      const [ v, ch ] = await any(ch1, [ ch2, 1 ])

      assert.equal(v, 1)
      assert.equal(ch, ch2)
    })
  })

  it ('should work with takes and puts - take winning', () => {
    return go(async () => {
      const ch1 = chan()
      const ch2 = chan()
      const ch3 = chan()
      const p = sleep(100)

      setTimeout(() => put(ch2, 2), 50)

      const [ v, ch ] = await any(ch1, ch2, p, [ ch3, 1 ])

      assert.equal(v, 2)
      assert.equal(ch, ch2)
    })
  })

  it ('should work with raw promises - promise winning', () => {
    return go(async () => {
      const ch1 = chan()
      const ch2 = chan()
      const p = Promise.resolve(3)

      const [ v, ch ] = await any(ch1, [ ch2, 1 ], p)

      assert.equal(v, 3)
      assert.equal(ch, p)
    })
  })

  it ('should work for typical "timeout" effect', () => {
    return go(async () => {
      const input = chan()
      const timeout = sleep(100)

      const [ , ch ] = await any(input, timeout)

      assert.equal(ch, timeout)
    })
  })

  it ('should work for typical "timeout" effect - in time', () => {
    return go(async () => {
      const input = chan()
      const timeout = sleep(100)

      setTimeout(() => put(input, 1), 50)

      const [ , ch ] = await any(input, timeout)

      assert.equal(ch, input)
    })
  })
})
345
describe('cancel()', () => {

  // Tests for cancel(ch, promise) on a pending put and on a pending take.
  // Each test returns the go-block's promise so assertion failures reject
  // the test instead of causing a timeout.

  describe('canceling a put', () => {

    it ('should remove it from the buffer\'s unreleased queue', () => {
      const ch = chan()
      return go(async () => {
        const put1Promise = put(ch, 1)
        put(ch, 2)
        cancel(ch, put1Promise)

        // With the first put cancelled, the second is next in line.
        const val = await take(ch)
        assert.equal(val, 2)
      })
    })
  })

  describe('canceling a take', () => {

    it ('should remove it from the channel\'s takes', () => {
      const ch = chan()
      return go(async () => {
        const take1Promise = take(ch)
        const take2Promise = take(ch)
        cancel(ch, take1Promise)
        put(ch, 2)

        // With the first take cancelled, the put goes to the second.
        const val = await take2Promise
        assert.equal(val, 2)
      })
    })
  })
})
377
describe('go()', () => {

  // go() must have called its argument by the time it returns.
  it ('should immediately invoke given function', () => {
    const invoked = sinon.spy()

    go(invoked)

    assert(invoked.called)
  })
})
385
describe('repeat()', () => {

  // Tests for repeat(): the callback is re-run until it returns false, and
  // an optional seed is passed to the first run.

  it ('should recur until false is returned', async () => {
    const tick = sinon.spy()
    let runs = 0

    await repeat(async () => {
      tick()
      runs += 1
      // Returning undefined lets the loop continue; false ends it.
      return runs === 3 ? false : undefined
    })

    assert(tick.callCount === 3)
  })

  it ('should take a seed value', async () => {
    const tick = sinon.spy()

    // Seeded with 1; each run returns the next value until 3 is reached.
    const step = async (n) => {
      tick()
      return n === 3 ? false : n + 1
    }

    await repeat(step, 1)

    assert(tick.callCount === 3)
  })
})
407
describe('repeatTake()', () => {

  // Queues three values and expects the callback to run once per value,
  // stopping when it returns false on the last one.
  it ('should take() until false is returned', async () => {
    const tick = sinon.spy()
    const source = chan()

    put(source, 'a')
    put(source, 'b')
    put(source, 'c')

    await repeatTake(source, async (letter) => {
      tick()
      // Returning undefined keeps taking; false stops the loop.
      return letter === 'c' ? false : undefined
    })

    assert(tick.callCount === 3)
  })
})
422
describe('merge()', () => {

  // Tests for merge(): values put on the inputs come out of the merged
  // channel, and the merged channel closes once every input is closed.
  // Each test returns its promise so assertion failures reject the test
  // instead of causing a timeout.

  it ('should take from all inputs and put onto single output channel', () => {
    const a = chan()
    const b = chan()
    const c = chan()
    const merged = merge(a, b, c)

    return go(async () => {
      await put(a, 1)
      await put(b, 2)
      await put(c, 3)

      const fromA = await take(merged)
      const fromB = await take(merged)
      const fromC = await take(merged)

      assert.equal(fromA, 1)
      assert.equal(fromB, 2)
      assert.equal(fromC, 3)
    })
  })

  it ('should close output channel when all inputs are closed', () => {
    const a = chan()
    const b = chan()
    const c = chan()
    const merged = merge(a, b, c)

    return go(async () => {
      assert.equal(merged.isClosed, false)
      close(a)
      close(b)
      close(c)
    })
    // Checked only after the go-block's promise resolves, as in the
    // original ordering.
    .then(() => assert.equal(merged.isClosed, true))
  })
})
466})