1 |
|
2 | const assert = require('assert')
|
3 | const sinon = require('sinon')
|
4 |
|
5 | const {
|
6 | cancel,
|
7 | merge,
|
8 | sleep,
|
9 | chan,
|
10 | go,
|
11 | put,
|
12 | take,
|
13 | repeat,
|
14 | repeatTake,
|
15 | clone,
|
16 | close,
|
17 | CLOSED,
|
18 | any
|
19 | } = require('../lib/index.js')
|
20 |
|
21 | describe('channels', () => {
|
22 |
|
23 | describe('take()', () => {
|
24 |
|
25 | it ('should return a promise', () => {
|
26 | assert(put(chan()) instanceof Promise)
|
27 | })
|
28 |
|
29 | it ('should by implied by simply awaiting a channel', () => {
|
30 | return go(async () => {
|
31 | let ch = chan()
|
32 | put(ch, 1)
|
33 | let val = await ch
|
34 | assert.equal(val, 1)
|
35 | })
|
36 | })
|
37 |
|
38 | it ('should deliver oldest put value', () => {
|
39 | return new Promise(res => {
|
40 |
|
41 | var ch = chan()
|
42 | put(ch, 1)
|
43 | put(ch, 2)
|
44 |
|
45 | var expected;
|
46 |
|
47 | take(ch).then((val) => expected = val)
|
48 |
|
49 | setTimeout(() => {
|
50 | assert.equal(expected, 1)
|
51 | res()
|
52 | }, 100)
|
53 | })
|
54 | })
|
55 |
|
56 | it ('should work in async function', () => {
|
57 | return new Promise(res => {
|
58 |
|
59 | var ch = chan()
|
60 | put(ch, 1)
|
61 | put(ch, 2)
|
62 |
|
63 | var test = async function() {
|
64 | var val = await take(ch)
|
65 | assert(val === 1)
|
66 | res()
|
67 | }
|
68 |
|
69 | test()
|
70 | })
|
71 | })
|
72 |
|
73 | it ('should work in a go-block', (cb) => {
|
74 |
|
75 | var ch = chan()
|
76 | put(ch, 1)
|
77 | put(ch, 2)
|
78 |
|
79 | go(async function() {
|
80 | var val = await take(ch)
|
81 | assert(val === 1)
|
82 | cb()
|
83 | })
|
84 | })
|
85 |
|
86 | it ('should park and wait if no pending put value', (cb) => {
|
87 |
|
88 | var ch = chan()
|
89 | var spy = sinon.spy()
|
90 |
|
91 | go(async function() {
|
92 | val = await take(ch)
|
93 | spy()
|
94 | })
|
95 |
|
96 | process.nextTick(() => {
|
97 | assert(!spy.called)
|
98 | cb()
|
99 | })
|
100 | })
|
101 | })
|
102 |
|
103 | describe('put()', () => {
|
104 |
|
105 | it ('should return a promise', () => {
|
106 | assert(put(chan()) instanceof Promise)
|
107 | })
|
108 |
|
109 | it ('should delegate to buffer', () => {
|
110 | var ch = chan()
|
111 | var spy = sinon.spy(ch.buffer, 'push')
|
112 | put(ch, 1)
|
113 | assert(spy.calledOnce)
|
114 | })
|
115 |
|
116 | it ('should resolve put immediately if there is a pending take', () => {
|
117 |
|
118 | var ch = chan()
|
119 | var spy = sinon.spy(ch.buffer, 'push')
|
120 |
|
121 |
|
122 | take(ch)
|
123 |
|
124 |
|
125 | put(ch, 1)
|
126 |
|
127 | assert(!spy.called)
|
128 | })
|
129 | })
|
130 |
|
131 | describe('sleep()', () => {
|
132 | it ('should sleep for given ms', (cb) => {
|
133 |
|
134 | var ch = chan()
|
135 | var subject = 1
|
136 |
|
137 | go(async function() {
|
138 | await sleep(100)
|
139 | subject = 2
|
140 | })
|
141 |
|
142 | setTimeout(() => {
|
143 | assert(subject === 1)
|
144 | setTimeout(() => {
|
145 | assert(subject === 2)
|
146 | cb()
|
147 | }, 60)
|
148 | }, 60)
|
149 |
|
150 | })
|
151 | })
|
152 |
|
153 | describe('clone()', () => {
|
154 |
|
155 | it ('should create a new chan with the same properties', () => {
|
156 | var bufferOrN = 1
|
157 | var opts = { foo: 'bar' }
|
158 | var a = chan(bufferOrN, opts)
|
159 | var b = clone(a, null, { debug: true })
|
160 | assert.equal(JSON.stringify(a), JSON.stringify(b))
|
161 | })
|
162 | })
|
163 |
|
164 | describe('close()', () => {
|
165 |
|
166 | it ('should set channel closed property to true', () => {
|
167 | var ch = chan()
|
168 | assert(!ch.isClosed)
|
169 | close(ch)
|
170 | assert(ch.isClosed)
|
171 | })
|
172 |
|
173 | it ('should cause all puts to resolve to false immediately', (cb) => {
|
174 | var ch = chan()
|
175 | close(ch)
|
176 | put(ch, 2)
|
177 | .then((val) => assert(val === false))
|
178 | .then(cb)
|
179 | })
|
180 |
|
181 | it ('should cause all takes to resolve with CLOSED constant value immediately', (cb) => {
|
182 | var ch = chan()
|
183 | close(ch)
|
184 | take(ch, 2)
|
185 | .then((val) => assert(val === CLOSED))
|
186 | .then(cb)
|
187 | })
|
188 |
|
189 | it ('should cause all pending takes to resolve with CLOSED constant immediately', (cb) => {
|
190 | var ch = chan()
|
191 | var takenA = take(ch)
|
192 | var takenB = take(ch)
|
193 | close(ch)
|
194 | takenA.then((val) => assert(val === CLOSED))
|
195 | takenB.then((val) => assert(val === CLOSED))
|
196 | Promise.all([ takenA, takenB ]).then(() => cb())
|
197 | })
|
198 |
|
199 | it ('should NOT cause all pending puts in buffer to resolve with false immediately', () => {
|
200 | var ch = chan()
|
201 | var putted = put(ch, 2)
|
202 | close(ch)
|
203 | var spy = sinon.spy()
|
204 | putted.then(spy)
|
205 | assert.equal(spy.called, false)
|
206 | })
|
207 | })
|
208 |
|
209 | describe('alts()', () => {
|
210 |
|
211 | it ('should resolve with first action that resolves', (cb) => {
|
212 | go(async () => {
|
213 | var ch1 = chan()
|
214 | var p = sleep(100)
|
215 | var [ v, ch ] = await any(ch1, p)
|
216 | assert.equal(ch, p)
|
217 | }).then(cb, cb)
|
218 | })
|
219 |
|
220 | it ('should resolve immediately if a take has a pending put', (cb) => {
|
221 | go(async () => {
|
222 | var ch1 = chan()
|
223 | var ch2 = chan()
|
224 | var p = sleep(1000)
|
225 | put(ch2, 1)
|
226 | var [ v, ch ] = await any(ch1, ch2, p)
|
227 | assert.equal(ch, ch2)
|
228 | assert.equal(v, 1)
|
229 | }).then(cb)
|
230 | })
|
231 |
|
232 | it ('should resolve non-deterministically when more than one has a pending put', (cb) => {
|
233 | go(async () => {
|
234 | var resolved = []
|
235 | for (var i = 0; i < 100; i++) {
|
236 | var chans = [ chan(), chan(), chan() ]
|
237 | put(chans[1], 1)
|
238 | put(chans[2], 2)
|
239 | var [ val, ch ] = await any(...chans)
|
240 | resolved.push(chans.indexOf(ch))
|
241 | }
|
242 | assert.ok(resolved.some((r) => r === 1))
|
243 | assert.ok(!resolved.every((r) => r === 1))
|
244 | }).then(cb)
|
245 | })
|
246 |
|
247 | it ('should cancel all other channel actions after one is resolved', (cb) => {
|
248 | go(async () => {
|
249 |
|
250 | var ch1 = chan()
|
251 | var ch2 = chan()
|
252 | var p = sleep(1000)
|
253 |
|
254 | put(ch1, 1)
|
255 | await any(ch1, ch2, p)
|
256 |
|
257 | put(ch2, 2)
|
258 | let v = await take(ch2)
|
259 |
|
260 | assert.equal(v, 2)
|
261 | }).then(cb)
|
262 | })
|
263 |
|
264 | it ('should work with takes and puts - put winning', (cb) => {
|
265 | go(async () => {
|
266 |
|
267 | var ch1 = chan()
|
268 | var ch2 = chan()
|
269 |
|
270 | setTimeout(() => take(ch2), 50)
|
271 |
|
272 | let [ v, ch ] = await any(ch1, [ ch2, 1 ])
|
273 |
|
274 | assert.equal(v, 1)
|
275 | assert.equal(ch, ch2)
|
276 |
|
277 | }).then(cb, cb)
|
278 | })
|
279 |
|
280 | it ('should work with takes and puts - take winning', (cb) => {
|
281 |
|
282 | go(async () => {
|
283 |
|
284 | let ch1 = chan()
|
285 | let ch2 = chan()
|
286 | let ch3 = chan()
|
287 | let p = sleep(100)
|
288 |
|
289 | setTimeout(() => put(ch2, 2), 50)
|
290 |
|
291 | let [ v, ch ] = await any(ch1, ch2, p, [ ch3, 1 ])
|
292 |
|
293 | assert.equal(v, 2)
|
294 | assert.equal(ch, ch2)
|
295 |
|
296 | }).then(cb, cb)
|
297 | })
|
298 |
|
299 | it ('should work with raw promises - promise winning', (cb) => {
|
300 |
|
301 | go(async () => {
|
302 |
|
303 | let ch1 = chan()
|
304 | let ch2 = chan()
|
305 | let p = Promise.resolve(3)
|
306 |
|
307 | let [ v, ch ] = await any(ch1, [ ch2, 1 ], p)
|
308 |
|
309 | assert.equal(v, 3)
|
310 | assert.equal(ch, p)
|
311 |
|
312 | }).then(cb, cb)
|
313 | })
|
314 |
|
315 | it ('should work for typical "timeout" effect', (cb) => {
|
316 |
|
317 | go(async () => {
|
318 |
|
319 | let input = chan()
|
320 | let timeout = sleep(100)
|
321 |
|
322 | let [ v, ch ] = await any(input, timeout)
|
323 |
|
324 | assert.equal(ch, timeout)
|
325 |
|
326 | }).then(cb, cb)
|
327 | })
|
328 |
|
329 | it ('should work for typical "timeout" effect - in time', (cb) => {
|
330 |
|
331 | go(async () => {
|
332 |
|
333 | let input = chan()
|
334 | let timeout = sleep(100)
|
335 |
|
336 | setTimeout(() => put(input, 1), 50)
|
337 |
|
338 | let [ v, ch ] = await any(input, timeout)
|
339 |
|
340 | assert.equal(ch, input)
|
341 |
|
342 | }).then(cb, cb)
|
343 | })
|
344 | })
|
345 |
|
346 | describe('cancel()', () => {
|
347 |
|
348 | describe('canceling a put', () => {
|
349 |
|
350 | it ('should remove it from the buffer\'s unreleased queue', (cb) => {
|
351 | let ch = chan()
|
352 | go(async () => {
|
353 | let put1Promise = put(ch, 1)
|
354 | let put2Promise = put(ch, 2)
|
355 | cancel(ch, put1Promise)
|
356 | let val = await take(ch)
|
357 | assert.equal(val, 2)
|
358 | }).then(cb)
|
359 | })
|
360 | })
|
361 |
|
362 | describe('canceling a take', () => {
|
363 |
|
364 | it ('should remove it from the channel\'s takes', (cb) => {
|
365 | let ch = chan()
|
366 | go(async () => {
|
367 | let take1Promise = take(ch)
|
368 | let take2Promise = take(ch)
|
369 | cancel(ch, take1Promise)
|
370 | put(ch, 2)
|
371 | let val = await take2Promise
|
372 | assert.equal(val, 2)
|
373 | }).then(cb)
|
374 | })
|
375 | })
|
376 | })
|
377 |
|
378 | describe('go()', () => {
|
379 | it ('should immediately invoke given function', () => {
|
380 | var spy = sinon.spy()
|
381 | go(spy)
|
382 | assert(spy.called)
|
383 | })
|
384 | })
|
385 |
|
386 | describe('repeat()', () => {
|
387 | it ('should recur until false is returned', async () => {
|
388 | const spy = sinon.spy()
|
389 | let i = 0
|
390 | await repeat(async () => {
|
391 | spy()
|
392 | i++
|
393 | if (i === 3) return false
|
394 | })
|
395 | assert(spy.callCount === 3)
|
396 | })
|
397 | it ('should take a seed value', async () => {
|
398 | const spy = sinon.spy()
|
399 | await repeat(async (i) => {
|
400 | spy()
|
401 | if (i === 3) return false
|
402 | else return i + 1
|
403 | }, 1)
|
404 | assert(spy.callCount === 3)
|
405 | })
|
406 | })
|
407 |
|
408 | describe('repeatTake()', () => {
|
409 | it ('should take() until false is returned', async () => {
|
410 | const spy = sinon.spy()
|
411 | const ch = chan()
|
412 | put(ch, 'a')
|
413 | put(ch, 'b')
|
414 | put(ch, 'c')
|
415 | await repeatTake(ch, async (chr) => {
|
416 | spy()
|
417 | if (chr === 'c') return false
|
418 | })
|
419 | assert(spy.callCount === 3)
|
420 | })
|
421 | })
|
422 |
|
423 | describe('merge()', () => {
|
424 |
|
425 | it ('should take from all inputs and put onto single output channel', (cb) => {
|
426 |
|
427 | var a = chan()
|
428 | var b = chan()
|
429 | var c = chan()
|
430 | var merged = merge(a, b, c)
|
431 |
|
432 | go(async () => {
|
433 |
|
434 | await put(a, 1)
|
435 | await put(b, 2)
|
436 | await put(c, 3)
|
437 |
|
438 | let fromA = await take(merged)
|
439 | let fromB = await take(merged)
|
440 | let fromC = await take(merged)
|
441 |
|
442 | assert.equal(fromA, 1)
|
443 | assert.equal(fromB, 2)
|
444 | assert.equal(fromC, 3)
|
445 |
|
446 | }).then(cb)
|
447 | })
|
448 |
|
449 | it ('should close output channel when all inputs are closed', (cb) => {
|
450 |
|
451 | var a = chan()
|
452 | var b = chan()
|
453 | var c = chan()
|
454 | var merged = merge(a, b, c)
|
455 |
|
456 | go(async () => {
|
457 | assert.equal(merged.isClosed, false)
|
458 | close(a)
|
459 | close(b)
|
460 | close(c)
|
461 | })
|
462 | .then(() => assert.equal(merged.isClosed, true))
|
463 | .then(cb)
|
464 | })
|
465 | })
|
466 | })
|