UNPKG

1.93 kB · JavaScript · View Raw
1import { kTrue, noop } from './utils'
2
/** Message of the Error thrown when a buffer using the THROW policy is full. */
export const BUFFER_OVERFLOW = "Channel's Buffer overflow!"

// Overflow policies understood by ringBuffer's `overflowAction` argument.
const ON_OVERFLOW_THROW = 1 // throw an Error carrying BUFFER_OVERFLOW
const ON_OVERFLOW_DROP = 2 // discard the incoming item
const ON_OVERFLOW_SLIDE = 3 // overwrite the oldest item with the incoming one
const ON_OVERFLOW_EXPAND = 4 // double the capacity, then store the item

/**
 * Buffer that never stores anything. It exposes the same four methods as the
 * objects returned by ringBuffer (isEmpty / put / take / flush) so callers can
 * treat every buffer uniformly.
 *
 * NOTE(review): `kTrue` and `noop` come from './utils'; presumably `kTrue`
 * always returns true and `noop` does nothing — confirm against that module.
 */
const zeroBuffer = {
  isEmpty: kTrue,
  put: noop,
  take: noop,
  // Nothing is ever buffered, so flushing yields a fresh empty list.
  flush: () => [],
}
11
/**
 * Creates a FIFO buffer backed by a circular array.
 *
 * @param {number} [limit=10] - Number of items the buffer holds before the
 *   overflow policy kicks in.
 * @param {number} overflowAction - One of the ON_OVERFLOW_* codes. Any value
 *   other than THROW, SLIDE or EXPAND (e.g. ON_OVERFLOW_DROP) discards the
 *   incoming item when the buffer is full.
 * @returns {{isEmpty: function(): boolean, put: function(*): void,
 *   take: function(): *, flush: function(): Array}} The buffer object.
 * @throws {Error} From `put`, with message BUFFER_OVERFLOW, when the buffer is
 *   full and the policy is ON_OVERFLOW_THROW.
 */
function ringBuffer(limit = 10, overflowAction) {
  let arr = new Array(limit)
  let length = 0
  let pushIndex = 0
  let popIndex = 0

  // Stores `it` at the write cursor. Performs no capacity check: callers must
  // guarantee there is room.
  const push = it => {
    arr[pushIndex] = it
    pushIndex = (pushIndex + 1) % limit
    length++
  }

  // Removes and returns the oldest item; returns undefined when empty.
  const take = () => {
    if (length !== 0) {
      const it = arr[popIndex]
      arr[popIndex] = null // drop the slot's reference to the item
      length--
      popIndex = (popIndex + 1) % limit
      return it
    }
  }

  // Removes every item and returns them oldest-first.
  const flush = () => {
    const items = []
    while (length) {
      items.push(take())
    }
    return items
  }

  return {
    isEmpty: () => length === 0,
    put: it => {
      if (length < limit) {
        push(it)
      } else {
        let grownLimit
        switch (overflowAction) {
          case ON_OVERFLOW_THROW:
            throw new Error(BUFFER_OVERFLOW)
          case ON_OVERFLOW_SLIDE:
            // Buffer is full, so the write cursor sits on the oldest item:
            // overwrite it and make the next slot the new oldest.
            arr[pushIndex] = it
            pushIndex = (pushIndex + 1) % limit
            popIndex = pushIndex
            break
          case ON_OVERFLOW_EXPAND:
            // Clamp to at least 1: doubling a zero limit stays at zero, which
            // would truncate the flushed items below and turn the `% limit`
            // cursors into NaN.
            grownLimit = Math.max(1, 2 * limit)

            // Linearise the ring so the oldest item sits at index 0.
            arr = flush()

            length = arr.length
            pushIndex = arr.length
            popIndex = 0

            arr.length = grownLimit
            limit = grownLimit

            push(it)
            break
          default:
          // DROP
        }
      }
    },
    take,
    flush,
  }
}
80
/**
 * Factories for the available buffer flavours. Each one differs only in what
 * happens when `put` is called on a full buffer.
 */
export const buffers = {
  /** Returns the shared buffer that stores nothing. */
  none: () => zeroBuffer,
  /** Holds up to `limit` items; a `put` beyond that throws. */
  fixed: limit => ringBuffer(limit, ON_OVERFLOW_THROW),
  /** Holds up to `limit` items; a `put` beyond that discards the new item. */
  dropping: limit => ringBuffer(limit, ON_OVERFLOW_DROP),
  /** Holds up to `limit` items; a `put` beyond that replaces the oldest item. */
  sliding: limit => ringBuffer(limit, ON_OVERFLOW_SLIDE),
  /** Starts at `initialSize` and doubles its capacity whenever it is full. */
  expanding: initialSize => ringBuffer(initialSize, ON_OVERFLOW_EXPAND),
}