1 | var Rx = require('./dist/rx');
|
2 | require('./dist/rx.aggregates');
|
3 | require('./dist/rx.async');
|
4 | require('./dist/rx.backpressure');
|
5 | require('./dist/rx.binding');
|
6 | require('./dist/rx.coincidence');
|
7 | require('./dist/rx.experimental');
|
8 | require('./dist/rx.joinpatterns');
|
9 | require('./dist/rx.sorting');
|
10 | require('./dist/rx.virtualtime');
|
11 | require('./dist/rx.testing');
|
12 | require('./dist/rx.time');
|
13 |
|
14 |
|
15 | // Add specific Node functions
|
16 | var EventEmitter = require('events').EventEmitter,
|
17 | Observable = Rx.Observable;
|
18 |
|
Rx.Node = {
  /**
   * @deprecated Use Rx.Observable.fromCallback from rx.async.js instead.
   *
   * Converts a callback function to an observable sequence.
   * Thin wrapper that forwards all arguments to Observable.fromCallback.
   *
   * @param {Function} func Function to convert to an asynchronous function.
   * @param {Mixed} [context] The context for the func parameter to be executed. If not specified, defaults to undefined.
   * @param {Function} [selector] A selector which takes the arguments from the event handler to produce a single item to yield on next.
   * @returns {Function} Asynchronous function.
   */
  fromCallback: function (func, context, selector) {
    return Observable.fromCallback(func, context, selector);
  },

  /**
   * @deprecated Use Rx.Observable.fromNodeCallback from rx.async.js instead.
   *
   * Converts a Node.js callback style function to an observable sequence. This must be in function (err, ...) format.
   * Thin wrapper that forwards all arguments to Observable.fromNodeCallback.
   *
   * @param {Function} func The function to call
   * @param {Mixed} [context] The context for the func parameter to be executed. If not specified, defaults to undefined.
   * @param {Function} [selector] A selector which takes the arguments from the event handler to produce a single item to yield on next.
   * @returns {Function} An async function which when applied, returns an observable sequence with the callback arguments as an array.
   */
  fromNodeCallback: function (func, context, selector) {
    return Observable.fromNodeCallback(func, context, selector);
  },

  /**
   * @deprecated Use Rx.Observable.fromEvent instead.
   *
   * Handles an event from the given EventEmitter as an observable sequence.
   * Thin wrapper that forwards all arguments to Observable.fromEvent.
   *
   * @param {EventEmitter} eventEmitter The EventEmitter to subscribe to the given event.
   * @param {String} eventName The event name to subscribe
   * @param {Function} [selector] A selector which takes the arguments from the event handler to produce a single item to yield on next.
   * @returns {Observable} An observable sequence generated from the named event from the given EventEmitter. The data will be returned as an array of arguments to the handler.
   */
  fromEvent: function (eventEmitter, eventName, selector) {
    return Observable.fromEvent(eventEmitter, eventName, selector);
  },

  /**
   * Converts the given observable sequence to an event emitter with the given event name.
   * The errors are handled on the 'error' event and completion on the 'end' event.
   *
   * Nothing is emitted until `publish()` is called on the returned emitter; the
   * subscription created by `publish()` is stored on the emitter's `subscription` property.
   *
   * @param {Observable} observable The observable sequence to convert to an EventEmitter.
   * @param {String} eventName The event name to emit onNext calls.
   * @param {Function} [selector] Optional function applied to each value before it is emitted.
   *   If it throws, the thrown value is emitted on the 'error' event and the value is skipped.
   * @returns {EventEmitter} An EventEmitter which emits the given eventName for each onNext call in addition to 'error' and 'end' events.
   *   You must call publish in order to invoke the subscription on the Observable sequence.
   */
  toEventEmitter: function (observable, eventName, selector) {
    var emitter = new EventEmitter();

    // Used to publish the events from the observable
    emitter.publish = function () {
      emitter.subscription = observable.subscribe(
        function (x) {
          var result = x;
          if (selector) {
            try {
              result = selector(x);
            } catch (err) {
              // The catch binding must not reuse the emitter's name, otherwise
              // `emit` would be looked up on the thrown value instead of the emitter.
              emitter.emit('error', err);
              return;
            }
          }

          emitter.emit(eventName, result);
        },
        function (err) {
          emitter.emit('error', err);
        },
        function () {
          emitter.emit('end');
        });
    };

    return emitter;
  },

  /**
   * Converts a flowing stream to an Observable sequence.
   *
   * The stream is paused as soon as this function is called and resumed when the
   * underlying subscription is made. The result is shared through publish().refCount().
   * NOTE(review): this calls stream.pause() / stream.resume() unconditionally, so the
   * stream passed in is assumed to provide both methods.
   *
   * @param {Stream} stream A stream to convert to a observable sequence.
   * @param {String} [finishEventName] Event that notifies about closed stream. ("end" by default)
   * @returns {Observable} An observable sequence which fires on each 'data' event as well as handling 'error' and finish events like `end` or `finish`.
   */
  fromStream: function (stream, finishEventName) {
    stream.pause();

    finishEventName = finishEventName || 'end';

    return Observable.create(function (observer) {
      function dataHandler(data) {
        observer.onNext(data);
      }

      function errorHandler(err) {
        observer.onError(err);
      }

      function endHandler() {
        observer.onCompleted();
      }

      stream.addListener('data', dataHandler);
      stream.addListener('error', errorHandler);
      stream.addListener(finishEventName, endHandler);

      stream.resume();

      // Disposal detaches exactly the listeners registered above.
      return function () {
        stream.removeListener('data', dataHandler);
        stream.removeListener('error', errorHandler);
        stream.removeListener(finishEventName, endHandler);
      };
    }).publish().refCount();
  },

  /**
   * Converts a flowing readable stream to an Observable sequence.
   * Delegates to fromStream with 'end' as the finish event.
   * @param {Stream} stream A stream to convert to a observable sequence.
   * @returns {Observable} An observable sequence which fires on each 'data' event as well as handling 'error' and 'end' events.
   */
  fromReadableStream: function (stream) {
    return this.fromStream(stream, 'end');
  },

  /**
   * Converts a flowing writeable stream to an Observable sequence.
   * Delegates to fromStream with 'finish' as the finish event.
   * @param {Stream} stream A stream to convert to a observable sequence.
   * @returns {Observable} An observable sequence which fires on each 'data' event as well as handling 'error' and 'finish' events.
   */
  fromWritableStream: function (stream) {
    return this.fromStream(stream, 'finish');
  },

  /**
   * Converts a flowing transform stream to an Observable sequence.
   * Delegates to fromStream with 'finish' as the finish event.
   * @param {Stream} stream A stream to convert to a observable sequence.
   * @returns {Observable} An observable sequence which fires on each 'data' event as well as handling 'error' and 'finish' events.
   */
  fromTransformStream: function (stream) {
    return this.fromStream(stream, 'finish');
  },

  /**
   * Writes an observable sequence to a stream.
   *
   * Each value is converted with String() before being written. When
   * stream.write() returns false the buffered source is paused, and it is
   * resumed again on the stream's 'drain' event. Errors from the observable
   * are re-emitted on the stream as 'error'; on completion the stream is ended
   * (unless it is flagged as stdio).
   *
   * @param {Observable} observable Observable sequence to write to a stream.
   * @param {Stream} stream The stream to write to.
   * @param {String} [encoding] The encoding of the item to write.
   * @returns {Disposable} The subscription handle.
   */
  writeToStream: function (observable, stream, encoding) {
    var source = observable.pausableBuffered();

    function onDrain() {
      source.resume();
    }

    stream.addListener('drain', onDrain);

    var subscription = source.subscribe(
      function (x) {
        if (!stream.write(String(x), encoding)) {
          source.pause();
        }
      },
      function (err) {
        // Detach first: emit('error') may throw when nobody listens for it,
        // and the listener must not outlive the terminated sequence.
        stream.removeListener('drain', onDrain);
        stream.emit('error', err);
      },
      function () {
        // Hack check because STDIO is not closable
        if (!stream._isStdio) {
          stream.end();
        }
        stream.removeListener('drain', onDrain);
      });

    // Must run before returning, the same way Observable.prototype.pipe starts
    // its buffered source; placed after the return it would be unreachable.
    source.resume();

    return subscription;
  }
};
|
197 |
|
/**
 * Pipes the existing Observable sequence into a Node.js Stream.
 *
 * Each value is converted with String() before being written. When
 * dest.write() returns false the buffered source is paused, and it is resumed
 * on the destination's 'drain' event. Errors from the sequence are re-emitted
 * on the destination as 'error'; on completion the destination is ended
 * (unless it is flagged as stdio). The 'drain' listener is detached on both
 * termination paths.
 *
 * @param {Stream} dest The destination Node.js stream.
 * @returns {Stream} The destination stream.
 */
Rx.Observable.prototype.pipe = function (dest) {
  var source = this.pausableBuffered();

  function onDrain() {
    source.resume();
  }

  dest.addListener('drain', onDrain);

  source.subscribe(
    function (x) {
      if (!dest.write(String(x))) {
        source.pause();
      }
    },
    function (err) {
      // Detach first: emit('error') may throw when nobody listens for it,
      // and the listener must not outlive the terminated sequence.
      dest.removeListener('drain', onDrain);
      dest.emit('error', err);
    },
    function () {
      // Hack check because STDIO is not closable
      if (!dest._isStdio) {
        dest.end();
      }
      dest.removeListener('drain', onDrain);
    });

  // Kick off delivery from the buffered source once the handlers are attached.
  source.resume();

  return dest;
};
|
229 |
|
230 | module.exports = Rx;
|