1 | import { kTrue, noop } from './utils'
|
2 |
|
// Message of the Error thrown when a value is put into a full buffer whose
// overflow policy is ON_OVERFLOW_THROW.
export const BUFFER_OVERFLOW = "Channel's Buffer overflow!"

// Overflow policies understood by ringBuffer's `put` once the buffer is full.
// Throw an Error carrying BUFFER_OVERFLOW.
const ON_OVERFLOW_THROW = 1
// Discard the incoming item (handled by the `default` branch of the switch).
const ON_OVERFLOW_DROP = 2
// Overwrite the oldest buffered item with the incoming one.
const ON_OVERFLOW_SLIDE = 3
// Double the capacity, then store the incoming item.
const ON_OVERFLOW_EXPAND = 4
|
9 |
|
/**
 * Shared buffer object handed out by `buffers.none()`.
 *
 * `isEmpty`, `put` and `take` are delegated to the `kTrue` / `noop` helpers
 * imported from './utils' (presumably "always true" and "do nothing", going by
 * their names; confirm in ./utils).
 *
 * `flush` is provided so this object exposes the same four methods as the
 * buffers created by `ringBuffer`; it returns a fresh empty array on every
 * call so callers can never mutate shared state through it.
 */
const zeroBuffer = { isEmpty: kTrue, put: noop, take: noop, flush: () => [] }
|
11 |
|
/**
 * Creates a FIFO buffer backed by a circular array.
 *
 * @param {number} [limit=10] Initial capacity (number of items held before the
 *   overflow policy kicks in).
 * @param {number} overflowAction One of the ON_OVERFLOW_* constants. It decides
 *   what `put` does when the buffer is already full:
 *   - ON_OVERFLOW_THROW: throw `new Error(BUFFER_OVERFLOW)`;
 *   - ON_OVERFLOW_SLIDE: overwrite the oldest item with the new one;
 *   - ON_OVERFLOW_EXPAND: double the capacity (growing to at least 1) and store
 *     the item;
 *   - anything else (including ON_OVERFLOW_DROP): discard the new item.
 * @returns {{isEmpty: function(): boolean, put: function(*): void,
 *   take: function(): *, flush: function(): Array}} The buffer API.
 *   `take` returns `undefined` when the buffer is empty; `flush` removes and
 *   returns every buffered item, oldest first.
 * @throws {Error} From `put`, with message BUFFER_OVERFLOW, when the buffer is
 *   full and the policy is ON_OVERFLOW_THROW.
 */
function ringBuffer(limit = 10, overflowAction) {
  // Kept apart from the `limit` parameter because the expanding policy grows it.
  let capacity = limit
  let arr = new Array(capacity)
  let length = 0
  let pushIndex = 0
  let popIndex = 0

  // Stores `it` in the next free slot. Callers must ensure there is room.
  const push = it => {
    arr[pushIndex] = it
    pushIndex = (pushIndex + 1) % capacity
    length++
  }

  // Removes and returns the oldest item, or undefined when nothing is buffered.
  const take = () => {
    if (length === 0) {
      return undefined
    }
    const it = arr[popIndex]
    // Clear the slot so the ring does not keep the item reachable.
    arr[popIndex] = null
    length--
    popIndex = (popIndex + 1) % capacity
    return it
  }

  // Drains the buffer and returns the drained items in FIFO order.
  const flush = () => {
    const items = []
    while (length > 0) {
      items.push(take())
    }
    return items
  }

  const put = it => {
    if (length < capacity) {
      push(it)
      return
    }

    switch (overflowAction) {
      case ON_OVERFLOW_THROW:
        throw new Error(BUFFER_OVERFLOW)
      case ON_OVERFLOW_SLIDE:
        // With zero capacity there is no slot to overwrite, and `% 0` would
        // turn both indices into NaN; the item is simply discarded.
        if (capacity > 0) {
          // When full, pushIndex points at the oldest item: overwrite it and
          // make the following slot the new oldest.
          arr[pushIndex] = it
          pushIndex = (pushIndex + 1) % capacity
          popIndex = pushIndex
        }
        break
      case ON_OVERFLOW_EXPAND: {
        // Doubling a capacity of 0 would stay at 0 and never make room, so
        // grow to at least one slot.
        const expandedCapacity = Math.max(1, 2 * capacity)

        // flush() drains oldest-first, which re-linearises the ring so the
        // oldest item sits at index 0 of the new backing array.
        arr = flush()

        length = arr.length
        pushIndex = arr.length
        popIndex = 0

        arr.length = expandedCapacity
        capacity = expandedCapacity

        push(it)
        break
      }
      default:
      // ON_OVERFLOW_DROP (or an unrecognised action): discard the item.
    }
  }

  return {
    isEmpty: () => length === 0,
    put,
    take,
    flush,
  }
}
|
80 |
|
/**
 * Factories for the available buffering strategies.
 *
 * Each `ringBuffer`-based factory forwards its argument as the buffer's
 * limit; when it is omitted, `ringBuffer` falls back to its default of 10.
 *
 * - `none()`: returns the shared `zeroBuffer` object (same instance every call).
 * - `fixed(limit)`: `put` on a full buffer throws an Error with BUFFER_OVERFLOW.
 * - `dropping(limit)`: `put` on a full buffer discards the new item.
 * - `sliding(limit)`: `put` on a full buffer replaces the oldest item.
 * - `expanding(initialSize)`: `put` on a full buffer grows the buffer.
 */
export const buffers = {
  none: () => zeroBuffer,
  fixed: limit => ringBuffer(limit, ON_OVERFLOW_THROW),
  dropping: limit => ringBuffer(limit, ON_OVERFLOW_DROP),
  sliding: limit => ringBuffer(limit, ON_OVERFLOW_SLIDE),
  expanding: initialSize => ringBuffer(initialSize, ON_OVERFLOW_EXPAND),
}
|