UNPKG

7.95 kBJavaScriptView Raw
1var Rx = require('./dist/rx');
2require('./dist/rx.aggregates');
3require('./dist/rx.async');
4require('./dist/rx.backpressure');
5require('./dist/rx.binding');
6require('./dist/rx.coincidence');
7require('./dist/rx.experimental');
8require('./dist/rx.joinpatterns');
9require('./dist/rx.sorting');
10require('./dist/rx.virtualtime');
11require('./dist/rx.testing');
12require('./dist/rx.time');
13
14
15// Add specific Node functions
16var EventEmitter = require('events').EventEmitter,
17 Observable = Rx.Observable;
18
19Rx.Node = {
20 /**
21 * @deprecated Use Rx.Observable.fromCallback from rx.async.js instead.
22 *
23 * Converts a callback function to an observable sequence.
24 *
25 * @param {Function} func Function to convert to an asynchronous function.
26 * @param {Mixed} [context] The context for the func parameter to be executed. If not specified, defaults to undefined.
27 * @param {Function} [selector] A selector which takes the arguments from the event handler to produce a single item to yield on next.
28 * @returns {Function} Asynchronous function.
29 */
30 fromCallback: function (func, context, selector) {
31 return Observable.fromCallback(func, context, selector);
32 },
33
34 /**
35 * @deprecated Use Rx.Observable.fromNodeCallback from rx.async.js instead.
36 *
37 * Converts a Node.js callback style function to an observable sequence. This must be in function (err, ...) format.
38 *
39 * @param {Function} func The function to call
40 * @param {Mixed} [context] The context for the func parameter to be executed. If not specified, defaults to undefined.
41 * @param {Function} [selector] A selector which takes the arguments from the event handler to produce a single item to yield on next.
42 * @returns {Function} An async function which when applied, returns an observable sequence with the callback arguments as an array.
43 */
44 fromNodeCallback: function (func, context, selector) {
45 return Observable.fromNodeCallback(func, context, selector);
46 },
47
48 /**
49 * @deprecated Use Rx.Observable.fromNodeCallback from rx.async.js instead.
50 *
51 * Handles an event from the given EventEmitter as an observable sequence.
52 *
53 * @param {EventEmitter} eventEmitter The EventEmitter to subscribe to the given event.
54 * @param {String} eventName The event name to subscribe
55 * @param {Function} [selector] A selector which takes the arguments from the event handler to produce a single item to yield on next.
56 * @returns {Observable} An observable sequence generated from the named event from the given EventEmitter. The data will be returned as an array of arguments to the handler.
57 */
58 fromEvent: function (eventEmitter, eventName, selector) {
59 return Observable.fromEvent(eventEmitter, eventName, selector);
60 },
61
62 /**
63 * Converts the given observable sequence to an event emitter with the given event name.
64 * The errors are handled on the 'error' event and completion on the 'end' event.
65 * @param {Observable} observable The observable sequence to convert to an EventEmitter.
66 * @param {String} eventName The event name to emit onNext calls.
67 * @returns {EventEmitter} An EventEmitter which emits the given eventName for each onNext call in addition to 'error' and 'end' events.
68 * You must call publish in order to invoke the subscription on the Observable sequuence.
69 */
70 toEventEmitter: function (observable, eventName, selector) {
71 var e = new EventEmitter();
72
73 // Used to publish the events from the observable
74 e.publish = function () {
75 e.subscription = observable.subscribe(
76 function (x) {
77 var result = x;
78 if (selector) {
79 try {
80 result = selector(x);
81 } catch (e) {
82 e.emit('error', e);
83 return;
84 }
85 }
86
87 e.emit(eventName, result);
88 },
89 function (err) {
90 e.emit('error', err);
91 },
92 function () {
93 e.emit('end');
94 });
95 };
96
97 return e;
98 },
99
100 /**
101 * Converts a flowing stream to an Observable sequence.
102 * @param {Stream} stream A stream to convert to a observable sequence.
103 * @param {String} [finishEventName] Event that notifies about closed stream. ("end" by default)
104 * @returns {Observable} An observable sequence which fires on each 'data' event as well as handling 'error' and finish events like `end` or `finish`.
105 */
106 fromStream: function (stream, finishEventName) {
107 stream.pause();
108
109 finishEventName || (finishEventName = 'end');
110
111 return Observable.create(function (observer) {
112 function dataHandler (data) {
113 observer.onNext(data);
114 }
115
116 function errorHandler (err) {
117 observer.onError(err);
118 }
119
120 function endHandler () {
121 observer.onCompleted();
122 }
123
124 stream.addListener('data', dataHandler);
125 stream.addListener('error', errorHandler);
126 stream.addListener(finishEventName, endHandler);
127
128 stream.resume();
129
130 return function () {
131 stream.removeListener('data', dataHandler);
132 stream.removeListener('error', errorHandler);
133 stream.removeListener(finishEventName, endHandler);
134 };
135 }).publish().refCount();
136 },
137
138 /**
139 * Converts a flowing readable stream to an Observable sequence.
140 * @param {Stream} stream A stream to convert to a observable sequence.
141 * @returns {Observable} An observable sequence which fires on each 'data' event as well as handling 'error' and 'end' events.
142 */
143 fromReadableStream: function (stream) {
144 return this.fromStream(stream, 'end');
145 },
146
147 /**
148 * Converts a flowing writeable stream to an Observable sequence.
149 * @param {Stream} stream A stream to convert to a observable sequence.
150 * @returns {Observable} An observable sequence which fires on each 'data' event as well as handling 'error' and 'finish' events.
151 */
152 fromWritableStream: function (stream) {
153 return this.fromStream(stream, 'finish');
154 },
155
156 /**
157 * Converts a flowing transform stream to an Observable sequence.
158 * @param {Stream} stream A stream to convert to a observable sequence.
159 * @returns {Observable} An observable sequence which fires on each 'data' event as well as handling 'error' and 'finish' events.
160 */
161 fromTransformStream: function (stream) {
162 return this.fromStream(stream, 'finish');
163 },
164
165 /**
166 * Writes an observable sequence to a stream
167 * @param {Observable} observable Observable sequence to write to a stream.
168 * @param {Stream} stream The stream to write to.
169 * @param {String} [encoding] The encoding of the item to write.
170 * @returns {Disposable} The subscription handle.
171 */
172 writeToStream: function (observable, stream, encoding) {
173 var source = observable.pausableBuffered();
174
175 function onDrain() {
176 source.resume();
177 }
178
179 stream.addListener('drain', onDrain);
180
181 return source.subscribe(
182 function (x) {
183 !stream.write(String(x), encoding) && source.pause();
184 },
185 function (err) {
186 stream.emit('error', err);
187 },
188 function () {
189 // Hack check because STDIO is not closable
190 !stream._isStdio && stream.end();
191 stream.removeListener('drain', onDrain);
192 });
193
194 source.resume();
195 }
196};
197
198/**
199 * Pipes the existing Observable sequence into a Node.js Stream.
200 * @param {Stream} dest The destination Node.js stream.
201 * @returns {Stream} The destination stream.
202 */
203Rx.Observable.prototype.pipe = function (dest) {
204 var source = this.pausableBuffered();
205
206 function onDrain() {
207 source.resume();
208 }
209
210 dest.addListener('drain', onDrain);
211
212 source.subscribe(
213 function (x) {
214 !dest.write(String(x)) && source.pause();
215 },
216 function (err) {
217 dest.emit('error', err);
218 },
219 function () {
220 // Hack check because STDIO is not closable
221 !dest._isStdio && dest.end();
222 dest.removeListener('drain', onDrain);
223 });
224
225 source.resume();
226
227 return dest;
228};
229
230module.exports = Rx;