UNPKG

2.73 kBJavaScriptView Raw
// ref + channel = bus
2import Cancelable from './cancelable.js'
3import _observable from 'symbol-observable'
4
// Symbol key under which a channel exposes itself (`channel[_bus]()`).
// `Symbol.for` uses the global symbol registry, so the same key string always
// resolves to the same symbol.
export const _bus = Symbol.for('@@spect.bus')
6
// `get: () => value` is called to obtain the current state, like `channel()`. It is also called automatically when async iteration starts, to yield the initial value.
// Its main purpose is to provide a getter for the user.
// If null, the channel is considered stateless: it doesn't emit an initial event and `channel()` always returns null.
//
// `set: (value, prev) => changed?` is intended to set the value by the user (note: it is currently called with `value` only).
// Unless it returns false, the new state is emitted: the pending promise resolves and the new value is pushed to subscribers.
// If null, the channel only relays externally driven [stateful] notifications, like `input`, `on`, `from`, etc.
/**
 * Create a bus: a callable channel combining a value reference (getter/setter)
 * with change notifications.
 *
 * The returned function reads the state when called without arguments and
 * writes it when called with one. It is also thenable (resolves with the next
 * emitted value), async-iterable, observable-compatible and cancelable.
 *
 * @param {?function(): *} get - Returns the current state. When falsy the
 *   channel is stateless: `channel()` returns the falsy `get` itself and async
 *   iterators receive no initial value.
 * @param {?function(*): *} set - Called with the value passed to
 *   `channel(value)`. Returning exactly `false` suppresses the notification.
 *   When falsy the passed value is emitted as is.
 * @param {?function(): void} teardown - Optional hook invoked by `cancel()`.
 * @returns {function} The channel function.
 */
export default function bus(get, set, teardown) {
  let resolve
  let promise = new Cancelable(r => resolve = r)
  // callbacks registered through the observable interface
  const subs = []
  // one single-slot buffer per live async iterator; slot 0 holds the latest
  // value that iterator has not consumed yet
  const ibufs = []

  const channel = function (value) {
    // `arguments` (rather than rest params) keeps `channel.length === 1`
    if (!arguments.length) return get && get()

    if (promise.canceled) throw new Error('Channel is canceled')

    const notify = set ? set(value) : null
    if (notify === false) return notify

    // a stateful channel emits whatever the getter reports after the write
    const state = get ? get() : value
    resolve(state)
    try {
      // iterate a snapshot so a subscriber that unsubscribes itself does not
      // make its neighbour get skipped; entries removed meanwhile are not called
      for (const sub of [...subs]) {
        if (subs.includes(sub)) sub(state)
      }
    } finally {
      // must run even when a subscriber throws: otherwise `promise` stays
      // resolved and every async iterator would spin on `await promise`
      for (const buf of ibufs) buf[0] = state
      promise = new Cancelable(r => resolve = r)
    }

    return notify
  }

  Object.assign(channel, {
    [_bus]() { return channel },

    // Yields the current state first (stateful channels only), then every
    // emitted value. Values emitted while the consumer is busy are coalesced:
    // only the latest one is delivered.
    async *[Symbol.asyncIterator]() {
      const buf = []
      // read the initial state before registering, so a throwing getter
      // propagates to the consumer and leaves no buffer behind
      const initial = get ? get() : undefined
      // register before the first yield: values set while the consumer is
      // still handling the initial state must not be lost
      ibufs.push(buf)
      try {
        if (get) yield initial
        while (true) {
          // drain first: the slot may have been filled while we were suspended
          while (buf.length) yield buf.pop()
          await promise
        }
      } catch (e) {
        // NOTE(review): deliberately swallowed, as in the original code -
        // presumably a rejected (canceled) promise just ends the iteration;
        // confirm against Cancelable
      } finally {
        // release the buffer when the consumer stops iterating
        const at = ibufs.indexOf(buf)
        if (at >= 0) ibufs.splice(at, 1)
      }
    },

    [_observable]() {
      return {
        // Accepts a callback or an observer object with a `next` method.
        // The current state is not pushed on subscription.
        subscribe(next) {
          const observer = next
          // call `next` as a method so observers relying on `this` keep working
          const push = observer.next ? v => observer.next(v) : observer
          subs.push(push)
          return {
            unsubscribe() {
              const at = subs.indexOf(push)
              if (at >= 0) subs.splice(at, 1)
            }
          }
        },
        [_observable]() { return this }
      }
    },

    // Promise
    // Drops all subscribers and iterator buffers, runs `teardown` and cancels
    // the pending promise; later writes throw.
    cancel() {
      subs.length = 0
      ibufs.length = 0
      if (teardown) teardown()
      channel.canceled = true
      return promise.cancel()
    },
    // Thenable: settles with the next emitted value.
    then(y, n) {
      return promise.then(y, n)
    }
  })

  // value[0]
  if (get) {
    // primitive coercion, `channel[0]` and `[value] = channel` all read the state
    channel.valueOf = channel.toString = channel[Symbol.toPrimitive] = get
    Object.defineProperties(channel, { [0]: { enumerable: false, get } })
    channel[Symbol.iterator] = function* () { yield get() }
  }

  return channel
}