UNPKG

4.28 kBJavaScriptView Raw
1/* eslint-disable */
2;(function() {
3"use strict"
4/* eslint-enable */
5Stream.SKIP = {}
6Stream.lift = lift
7Stream.scan = scan
8Stream.merge = merge
9Stream.combine = combine
10Stream.scanMerge = scanMerge
11Stream["fantasy-land/of"] = Stream
12
13var warnedHalt = false
14Object.defineProperty(Stream, "HALT", {
15 get: function() {
16 warnedHalt || console.log("HALT is deprecated and has been renamed to SKIP");
17 warnedHalt = true
18 return Stream.SKIP
19 }
20})
21
22function Stream(value) {
23 var dependentStreams = []
24 var dependentFns = []
25
26 function stream(v) {
27 if (arguments.length && v !== Stream.SKIP) {
28 value = v
29 if (open(stream)) {
30 stream._changing()
31 stream._state = "active"
32 dependentStreams.forEach(function(s, i) { s(dependentFns[i](value)) })
33 }
34 }
35
36 return value
37 }
38
39 stream.constructor = Stream
40 stream._state = arguments.length && value !== Stream.SKIP ? "active" : "pending"
41 stream._parents = []
42
43 stream._changing = function() {
44 if (open(stream)) stream._state = "changing"
45 dependentStreams.forEach(function(s) {
46 s._changing()
47 })
48 }
49
50 stream._map = function(fn, ignoreInitial) {
51 var target = ignoreInitial ? Stream() : Stream(fn(value))
52 target._parents.push(stream)
53 dependentStreams.push(target)
54 dependentFns.push(fn)
55 return target
56 }
57
58 stream.map = function(fn) {
59 return stream._map(fn, stream._state !== "active")
60 }
61
62 var end
63 function createEnd() {
64 end = Stream()
65 end.map(function(value) {
66 if (value === true) {
67 stream._parents.forEach(function (p) {p._unregisterChild(stream)})
68 stream._state = "ended"
69 stream._parents.length = dependentStreams.length = dependentFns.length = 0
70 }
71 return value
72 })
73 return end
74 }
75
76 stream.toJSON = function() { return value != null && typeof value.toJSON === "function" ? value.toJSON() : value }
77
78 stream["fantasy-land/map"] = stream.map
79 stream["fantasy-land/ap"] = function(x) { return combine(function(s1, s2) { return s1()(s2()) }, [x, stream]) }
80
81 stream._unregisterChild = function(child) {
82 var childIndex = dependentStreams.indexOf(child)
83 if (childIndex !== -1) {
84 dependentStreams.splice(childIndex, 1)
85 dependentFns.splice(childIndex, 1)
86 }
87 }
88
89 Object.defineProperty(stream, "end", {
90 get: function() { return end || createEnd() }
91 })
92
93 return stream
94}
95
96function combine(fn, streams) {
97 var ready = streams.every(function(s) {
98 if (s.constructor !== Stream)
99 throw new Error("Ensure that each item passed to stream.combine/stream.merge/lift is a stream")
100 return s._state === "active"
101 })
102 var stream = ready
103 ? Stream(fn.apply(null, streams.concat([streams])))
104 : Stream()
105
106 var changed = []
107
108 var mappers = streams.map(function(s) {
109 return s._map(function(value) {
110 changed.push(s)
111 if (ready || streams.every(function(s) { return s._state !== "pending" })) {
112 ready = true
113 stream(fn.apply(null, streams.concat([changed])))
114 changed = []
115 }
116 return value
117 }, true)
118 })
119
120 var endStream = stream.end.map(function(value) {
121 if (value === true) {
122 mappers.forEach(function(mapper) { mapper.end(true) })
123 endStream.end(true)
124 }
125 return undefined
126 })
127
128 return stream
129}
130
131function merge(streams) {
132 return combine(function() { return streams.map(function(s) { return s() }) }, streams)
133}
134
135function scan(fn, acc, origin) {
136 var stream = origin.map(function(v) {
137 var next = fn(acc, v)
138 if (next !== Stream.SKIP) acc = next
139 return next
140 })
141 stream(acc)
142 return stream
143}
144
145function scanMerge(tuples, seed) {
146 var streams = tuples.map(function(tuple) { return tuple[0] })
147
148 var stream = combine(function() {
149 var changed = arguments[arguments.length - 1]
150 streams.forEach(function(stream, i) {
151 if (changed.indexOf(stream) > -1)
152 seed = tuples[i][1](seed, stream())
153 })
154
155 return seed
156 }, streams)
157
158 stream(seed)
159
160 return stream
161}
162
163function lift() {
164 var fn = arguments[0]
165 var streams = Array.prototype.slice.call(arguments, 1)
166 return merge(streams).map(function(streams) {
167 return fn.apply(undefined, streams)
168 })
169}
170
171function open(s) {
172 return s._state === "pending" || s._state === "active" || s._state === "changing"
173}
174
175if (typeof module !== "undefined") module["exports"] = Stream
176else if (typeof window.m === "function" && !("stream" in window.m)) window.m.stream = Stream
177else window.m = {stream : Stream}
178
179}());