1 |
|
2 | ;(function() {
|
3 | "use strict"
|
4 |
|
// Static API attached to the Stream factory. The helper functions referenced
// here are function declarations further down, so they are already hoisted.
Stream.SKIP = {} // unique sentinel object; only ever compared by identity
Stream.lift = lift
Stream.scan = scan
Stream.merge = merge
Stream.combine = combine
Stream.scanMerge = scanMerge
Stream["fantasy-land/of"] = Stream

// `Stream.HALT` is a deprecated alias of `Stream.SKIP`. Reading it logs a
// deprecation notice the first time only and then yields the SKIP sentinel.
var warnedHalt = false
Object.defineProperty(Stream, "HALT", {
	get: function() {
		if (!warnedHalt) {
			console.log("HALT is deprecated and has been renamed to SKIP")
			warnedHalt = true
		}
		return Stream.SKIP
	}
})
|
21 |
|
/**
 * Creates a stream: a getter-setter function that holds a value and pushes
 * every newly set value through the streams derived from it.
 *
 * Calling the returned function without arguments reads the current value.
 * Calling it with an argument other than `Stream.SKIP` stores that value and,
 * while the stream is open ("pending", "active" or "changing"), recomputes
 * every dependent stream.
 *
 * @param {*} [value] Initial value. The stream starts in the "active" state
 *   when an argument other than `Stream.SKIP` is supplied, otherwise "pending".
 * @returns {Function} The stream function.
 */
function Stream(value) {
	// Parallel arrays: dependentFns[i] maps this stream's value to the value
	// that gets pushed into dependentStreams[i].
	var dependentStreams = []
	var dependentFns = []

	function stream(v) {
		if (arguments.length && v !== Stream.SKIP) {
			value = v
			if (open(stream)) {
				stream._changing()
				stream._state = "active"
				// Propagate over snapshots of both lists. A dependent may end
				// itself (or a sibling) from inside its mapping function, which
				// splices the live arrays; iterating the live arrays would then
				// skip the next dependent. Dependents that were ended earlier in
				// this same pass are no longer open and are left untouched.
				var targets = dependentStreams.slice()
				var fns = dependentFns.slice()
				for (var i = 0; i < targets.length; i++) {
					if (open(targets[i])) targets[i](fns[i](value))
				}
			}
		}

		return value
	}

	stream.constructor = Stream
	stream._state = arguments.length && value !== Stream.SKIP ? "active" : "pending"
	stream._parents = []

	// Marks this stream (if still open) and, recursively, all of its
	// dependents as "changing".
	stream._changing = function() {
		if (open(stream)) stream._state = "changing"
		dependentStreams.forEach(function(s) {
			s._changing()
		})
	}

	// Registers a dependent stream fed by `fn`. With `ignoreInitial` the
	// dependent is created without a value; otherwise it is seeded with
	// `fn(value)` right away.
	stream._map = function(fn, ignoreInitial) {
		var target = ignoreInitial ? Stream() : Stream(fn(value))
		target._parents.push(stream)
		dependentStreams.push(target)
		dependentFns.push(fn)
		return target
	}

	// Public map: the initial computation is skipped unless this stream is
	// currently "active".
	stream.map = function(fn) {
		return stream._map(fn, stream._state !== "active")
	}

	// The end stream is created lazily on first access of `stream.end`.
	var end
	function createEnd() {
		end = Stream()
		end.map(function(value) {
			if (value === true) {
				// Detach from every parent, mark as ended and drop all links.
				stream._parents.forEach(function (p) {p._unregisterChild(stream)})
				stream._state = "ended"
				stream._parents.length = dependentStreams.length = dependentFns.length = 0
			}
			return value
		})
		return end
	}

	// Serializes as the held value, delegating to the value's own `toJSON`
	// when it has one.
	stream.toJSON = function() { return value != null && typeof value.toJSON === "function" ? value.toJSON() : value }

	stream["fantasy-land/map"] = stream.map
	stream["fantasy-land/ap"] = function(x) { return combine(function(s1, s2) { return s1()(s2()) }, [x, stream]) }

	// Removes `child` and its mapping function from the dependents lists.
	stream._unregisterChild = function(child) {
		var childIndex = dependentStreams.indexOf(child)
		if (childIndex !== -1) {
			dependentStreams.splice(childIndex, 1)
			dependentFns.splice(childIndex, 1)
		}
	}

	Object.defineProperty(stream, "end", {
		get: function() { return end || createEnd() }
	})

	return stream
}
|
95 |
|
/**
 * Creates a stream whose value is computed by `fn` from several source streams.
 *
 * `fn` is invoked with every source stream as a positional argument followed
 * by one extra array argument: all sources for the initial computation, and
 * afterwards the sources that were updated since the previous invocation.
 *
 * @param {Function} fn Computes the combined value.
 * @param {Array} streams Source streams; every item must have been created by `Stream`.
 * @returns {Function} The combined stream. It is seeded immediately only when
 *   every source is already "active".
 * @throws {Error} If any item of `streams` is not a stream.
 */
function combine(fn, streams) {
	// Validate every item before doing anything else. The null check makes
	// null/undefined items report the message below instead of a
	// property-access TypeError, and a separate pass ensures items after the
	// first non-active stream are checked as well.
	streams.forEach(function(s) {
		if (s == null || s.constructor !== Stream)
			throw new Error("Ensure that each item passed to stream.combine/stream.merge/lift is a stream")
	})

	var ready = streams.every(function(s) {
		return s._state === "active"
	})
	var stream = ready
		? Stream(fn.apply(null, streams.concat([streams])))
		: Stream()

	// Sources updated since `fn` last ran.
	var changed = []

	var mappers = streams.map(function(s) {
		return s._map(function(value) {
			changed.push(s)
			// Recompute once no source is "pending" any more; after the first
			// recomputation every later source update recomputes.
			if (ready || streams.every(function(s) { return s._state !== "pending" })) {
				ready = true
				stream(fn.apply(null, streams.concat([changed])))
				changed = []
			}
			return value
		}, true)
	})

	// Ending the combined stream also ends the internal mapper streams and
	// this end-listener itself.
	var endStream = stream.end.map(function(value) {
		if (value === true) {
			mappers.forEach(function(mapper) { mapper.end(true) })
			endStream.end(true)
		}
		return undefined
	})

	return stream
}
|
130 |
|
/**
 * Combines `streams` into a single stream whose value is an array holding the
 * current value of each source, in the same order.
 *
 * @param {Array} streams Source streams.
 * @returns {Function} The stream returned by `combine`.
 */
function merge(streams) {
	var readAll = function() {
		return streams.map(function(source) { return source() })
	}
	return combine(readAll, streams)
}
|
134 |
|
/**
 * Derives a stream from `origin` by folding each of its values into an
 * accumulator.
 *
 * @param {Function} fn Reducer called as `fn(acc, value)`. Returning
 *   `Stream.SKIP` leaves the accumulator unchanged.
 * @param {*} acc Initial accumulator.
 * @param {Function} origin Stream to derive from (via its `map` method).
 * @returns {Function} The derived stream, set to the accumulator before it is returned.
 */
function scan(fn, acc, origin) {
	var reduced = origin.map(function(incoming) {
		var result = fn(acc, incoming)
		if (result === Stream.SKIP) return result
		acc = result
		return result
	})
	// Push the accumulator as it stands after the mapping above was set up.
	reduced(acc)
	return reduced
}
|
144 |
|
/**
 * Folds several streams into one accumulated value.
 *
 * @param {Array} tuples Pairs of `[stream, reducer]`; each reducer is called
 *   as `reducer(seed, streamValue)` when its stream is among the changed ones.
 * @param {*} seed Initial accumulated value.
 * @returns {Function} The combined stream, set to `seed` before it is returned.
 */
function scanMerge(tuples, seed) {
	var sources = tuples.map(function(pair) { return pair[0] })

	var merged = combine(function() {
		// `combine` passes the list of changed streams as the last argument.
		var changed = arguments[arguments.length - 1]
		for (var i = 0; i < sources.length; i++) {
			if (changed.indexOf(sources[i]) !== -1) {
				seed = tuples[i][1](seed, sources[i]())
			}
		}
		return seed
	}, sources)

	merged(seed)

	return merged
}
|
162 |
|
/**
 * Applies a plain function to the values of several streams.
 *
 * The first argument is the function; every remaining argument is a stream.
 * The streams are merged and the function is called with their values spread
 * as positional arguments.
 *
 * @returns {Function} The stream produced by mapping over the merged streams.
 */
function lift() {
	var args = Array.prototype.slice.call(arguments)
	var fn = args[0]
	var sources = args.slice(1)
	return merge(sources).map(function(values) {
		return fn.apply(undefined, values)
	})
}
|
170 |
|
/**
 * Tells whether a stream is in one of the open states.
 *
 * @param {Object} s Object exposing a `_state` property.
 * @returns {boolean} `true` for "pending", "active" or "changing"; `false` otherwise.
 */
function open(s) {
	switch (s._state) {
		case "pending":
		case "active":
		case "changing":
			return true
		default:
			return false
	}
}
|
174 |
|
// Publish the factory: through `module` when that identifier exists,
// otherwise as `window.m.stream`. An existing function `window.m` that has no
// `stream` property is extended in place; in every other case `window.m` is
// replaced by a fresh object.
if (typeof module !== "undefined") {
	module["exports"] = Stream
} else if (typeof window.m === "function" && !("stream" in window.m)) {
	window.m.stream = Stream
} else {
	window.m = {stream : Stream}
}
|
178 |
|
179 | }());
|