// UNPKG listing: 13.2 kB, JavaScript (raw view)
/*
  Copyright 2011 Yuriy Bogdanov <chinsay@gmail.com>

  Permission is hereby granted, free of charge, to any person obtaining a copy
  of this software and associated documentation files (the "Software"), to
  deal in the Software without restriction, including without limitation the
  rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  sell copies of the Software, and to permit persons to whom the Software is
  furnished to do so, subject to the following conditions:

  The above copyright notice and this permission notice shall be included in
  all copies or substantial portions of the Software.

  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  IN THE SOFTWARE.
*/
22
// use node-fibers module
// NOTE(review): the rest of this file uses `Fiber` as a free global and never
// assigns it; presumably this require is what defines it - confirm.
require('fibers');
25
/**
 * sync() turns a callback-style asynchronous function into a synchronous call.
 *
 * The first parameter is the context object (like Function.prototype.call);
 * the remaining parameters are passed to the function, followed by a generated
 * node-style callback (err, result, ...). If that callback has not been invoked
 * by the time the function returns, the current fiber is yielded until it is.
 *
 * Returns the callback's result. When the callback receives a truthy third
 * argument, all values after the error are returned as an array instead.
 * (A falsy third argument is ignored: only the first result is returned.)
 *
 * Throws the callback's error if it is truthy, or an Error when the function
 * needs to wait but no fiber is running.
 */
Function.prototype.sync = function(obj /* arguments */) {

    var fiber = Fiber.current,
        err, result,
        yielded = false;

    // Create virtual callback
    var syncCallback = function (callbackError, callbackResult, otherArgs) {
        // forbid to call twice
        if (syncCallback.called) return;
        syncCallback.called = true;

        if (callbackError) {
            err = callbackError;
        }
        else if (otherArgs) {
            // Support multiple callback result values
            result = [];
            for (var i = 1, l = arguments.length; i < l; i++) {
                result.push(arguments[i]);
            }
        }
        else {
            result = callbackResult;
        }

        // Resume fiber only if we actually yielded (callback fired asynchronously)
        if (yielded) fiber.run();
    };

    // Prepare args (remove first arg and add callback to the end)
    // The cycle is used because of slow v8 arguments materialization
    for (var i = 1, args = [], l = arguments.length; i < l; i++) {
        args.push(arguments[i]);
    }
    args.push(syncCallback);

    // call async function
    this.apply(obj, args);

    // wait for result, unless the callback was already invoked synchronously
    if (!syncCallback.called) {
        // Waiting is impossible without a fiber; fail with a clear message
        // instead of attempting to yield with nothing running.
        if (!fiber) {
            throw new Error('Function.prototype.sync() can be called only inside of fiber');
        }
        yielded = true;
        Fiber.yield();
    }

    // Throw if err
    if (err) throw err;

    return result;
};
82
/**
 * Sync module itself.
 *
 * When `fn` is a function, runs it in a new fiber via Sync.Fiber(fn, callback)
 * and returns whatever Sync.Fiber returns. Any other `fn` is ignored and
 * undefined is returned.
 */
var Sync = function Sync(fn, callback)
{
    if (fn instanceof Function) {
        return Sync.Fiber(fn, callback);
    }

    // TODO: we can also wrap any object with Sync, in future..
};
94
// Runtime counters: the total* values only grow, the active* values are
// incremented on start and decremented on completion.
Sync.stat = {
    totalFibers : 0,
    activeFibers : 0,
    totalFutures : 0,
    activeFutures : 0
};
101
/**
 * Use this function to turn a piece of code into a fiber.
 * It wraps `fn` with Fiber() logic plus exception handling and futures support.
 *
 * The fiber body calls fn(fiber) and then waits for every future still
 * registered on the fiber. Afterwards:
 *   - if `callback` is a function, it is called as callback(error, result);
 *   - otherwise an error is handed to the parent fiber's callback, if any;
 *   - otherwise the error is rethrown from inside the fiber.
 *
 * The fiber is started immediately; nothing is returned.
 */
Sync.Fiber = function SyncFiber(fn, callback)
{
    var parent = Fiber.current;
    Sync.stat.totalFibers++;

    // Error created here (outside the new fiber) and chained to the
    // parent fiber's trace error through __previous
    var traceError = new Error();
    if (parent) {
        traceError.__previous = parent.traceError;
    }

    var fiber = Fiber(function(){

        Sync.stat.activeFibers++;

        var fiber = Fiber.current,
            result,
            error;

        // Set id to fiber
        fiber.id = Sync.stat.totalFibers;

        // Save the callback to fiber
        fiber.callback = callback;

        // Register trace error to the fiber
        fiber.traceError = traceError;

        // Initialize scope
        fiber.scope = {};

        // Assign parent fiber
        fiber.parent = parent;

        // Fiber string representation
        fiber.toString = function() {
            return 'Fiber#' + fiber.id;
        };

        // Fiber path representation (ancestors first, joined with ' > ')
        fiber.getPath = function() {
            return (fiber.parent ? fiber.parent.getPath() + ' > ' : '' )
                + fiber.toString();
        };

        // Inherit scope from parent fiber
        if (parent) {
            fiber.scope.__proto__ = parent.scope;
        }

        // Add futures support to a fiber
        fiber.futures = [];

        // Reading `.result` on each pending future collects its value
        fiber.waitFutures = function() {
            var results = [];
            while (fiber.futures.length)
                results.push(fiber.futures.shift().result);
            return results;
        };

        fiber.removeFuture = function(ticket) {
            var index = fiber.futures.indexOf(ticket);
            if (~index)
                fiber.futures.splice(index, 1);
        };

        fiber.addFuture = function(ticket) {
            fiber.futures.push(ticket);
        };

        // Run body
        try {
            // call fn and wait for result
            result = fn(Fiber.current);
            // if there are some futures, wait for results
            fiber.waitFutures();
        }
        catch (e) {
            error = e;
        }

        Sync.stat.activeFibers--;

        // return result to the callback
        if (callback instanceof Function) {
            callback(error, result);
        }
        else if (error && parent && parent.callback) {
            parent.callback(error);
        }
        else if (error) {
            throw error;
        }

    });

    fiber.run();
};
203
/**
 * Future object itself.
 *
 * `new SyncFuture(timeout)` returns a callable "ticket" function whose
 * prototype is the SyncFuture instance. Pass the ticket as a node-style
 * callback (err, result); reading `ticket.result` (or calling ticket.yield())
 * yields the owning fiber until the ticket has been called.
 *
 * `timeout` (ms, optional) makes a waiting future fail with a timeout error.
 *
 * Throws an Error when created outside of a fiber.
 */
function SyncFuture(timeout)
{
    var self = this;

    this.resolved = false;
    this.fiber = Fiber.current;
    this.yielding = false;
    this.timeout = timeout;
    this.time = null;

    this._timeoutId = null;
    this._result = undefined;
    this._error = null;
    this._start = +new Date;

    // A future registers itself on the current fiber; without one it cannot
    // work, so fail early and before touching the counters.
    if (!this.fiber) {
        throw new Error('Sync.Future can be created only inside of fiber');
    }

    Sync.stat.totalFutures++;
    Sync.stat.activeFutures++;

    // Create timeout error to capture stack trace correctly
    // (SyncFuture is passed explicitly; arguments.callee throws in strict mode)
    self.timeoutError = new Error();
    Error.captureStackTrace(self.timeoutError, SyncFuture);

    this.ticket = function Future()
    {
        // forbid to call twice (a late duplicate must not touch the state)
        if (self.resolved) return;
        self.resolved = true;

        // clear timeout if present
        if (self._timeoutId) {
            clearTimeout(self._timeoutId);
            self._timeoutId = null;
        }
        // measure time
        self.time = new Date - self._start;

        // err returned as first argument
        var err = arguments[0];
        if (err) {
            self._error = err;
        }
        else {
            self._result = arguments[1];
        }

        // remove self from current fiber
        self.fiber.removeFuture(self.ticket);
        Sync.stat.activeFutures--;

        // Resume the owning fiber if it is waiting on this future and we are
        // not already running inside it
        if (self.yielding && Fiber.current !== self.fiber) {
            self.yielding = false;
            self.fiber.run();
        }
        else if (self._error) {
            throw self._error;
        }
    };

    this.ticket.__proto__ = this;

    // Blocks the current fiber until resolved; returns the result or throws the error
    this.ticket.yield = function() {
        while (!self.resolved) {
            self.yielding = true;
            if (self.timeout) {
                // never keep more than one pending timer
                if (self._timeoutId) clearTimeout(self._timeoutId);
                self._timeoutId = setTimeout(function(){
                    self.timeoutError.message = 'Future function timed out at ' + self.timeout + ' ms';
                    self.ticket(self.timeoutError);
                }, self.timeout);
            }
            Fiber.yield();
        }
        if (self._error) throw self._error;
        return self._result;
    };

    // `result`: waits for the future and returns its value (or throws)
    Object.defineProperty(this.ticket, 'result', {
        get: function() {
            return this.yield();
        },
        enumerable: true,
        configurable: true
    });

    // `error`: waits for the future and returns its error, or null
    Object.defineProperty(this.ticket, 'error', {
        get: function() {
            if (self._error) {
                return self._error;
            }
            try {
                this.result;
            }
            catch (e) {
                return e;
            }
            return null;
        },
        enumerable: true,
        configurable: true
    });

    // `timeout`: proxies the timeout stored on the SyncFuture instance
    Object.defineProperty(this.ticket, 'timeout', {
        get: function() {
            return self.timeout;
        },
        set: function(value) {
            self.timeout = value;
        },
        enumerable: true,
        configurable: true
    });

    // append self to current fiber
    this.fiber.addFuture(this.ticket);

    return this.ticket;
}
309
// Put the Function constructor object on SyncFuture's prototype chain
// (note: `Function` itself, not `Function.prototype`), then expose the
// constructor as Sync.Future.
SyncFuture.prototype.__proto__ = Function;
Sync.Future = SyncFuture;
312
/**
 * Calls the function asynchronously without waiting for it.
 *
 * The first parameter is the context object (like Function.prototype.call);
 * the remaining parameters are passed to the function, followed by a new
 * Future which acts as its callback.
 *
 * Returns the Future function/object (promise).
 */
Function.prototype.future = function(obj /* arguments */) {

    var fn = this,
        future = new SyncFuture();

    // Prepare args (remove first arg and add callback to the end)
    // The cycle is used because of slow v8 arguments materialization
    for (var i = 1, args = [], l = arguments.length; i < l; i++) {
        args.push(arguments[i]);
    }
    // virtual future callback, push it as last argument
    args.push(future);

    // call async function
    fn.apply(obj, args);

    return future;
};
336
/**
 * Use this method to make an asynchronous function from a synchronous one.
 * This is the opposite of .sync().
 *
 * The returned function treats a trailing function argument as a node-style
 * callback (err, result) and runs the original function in a new fiber via
 * Sync(). Without a trailing callback it runs the original function directly
 * when called inside a fiber, and through Sync() with no callback otherwise.
 *
 * `context`, when given, is used as `this` for the original function.
 *
 * The returned function also carries:
 *   .async()   returns itself
 *   .sync(obj, ...args)  calls the original function directly
 *   .toString()  the original source followed by '.async()'
 */
Function.prototype.async = function(context)
{
    var fn = this, fiber = Fiber.current;

    function asyncFunction() {

        // Copy arguments
        // The cycle is used because of slow v8 arguments materialization
        for (var i = 0, args = [], l = arguments.length; i < l; i++) {
            args.push(arguments[i]);
        }

        var obj = context || this,
            cb,
            runAsync = true;

        // Only a trailing function is a callback; anything else stays in the
        // argument list untouched (and nothing is added for an empty list)
        if (args.length && typeof(args[args.length - 1]) === 'function') {
            cb = args.pop();
        }
        else if (Fiber.current) {
            runAsync = false;
        }

        Fiber.current = Fiber.current || fiber;

        // Call asynchronously
        if (runAsync) {
            Sync(function(){
                return fn.apply(obj, args);
            }, cb);
        }
        // Call synchronously in same fiber
        else {
            return fn.apply(obj, args);
        }
    }

    // Do nothing on async again
    asyncFunction.async = function() {
        return asyncFunction;
    };
    // Override sync call
    asyncFunction.sync = function(obj) {
        for (var i = 1, args = [], l = arguments.length; i < l; i++) {
            args.push(arguments[i]);
        }
        return fn.apply(obj || context || this, args);
    };
    // Override toString behavior
    asyncFunction.toString = function() {
        return fn + '.async()';
    };

    return asyncFunction;
};
394
/**
 * Used for writing synchronous middleware-style functions
 *
 * throw "something" --> next('something')
 * return --> next()
 * return null --> next()
 * return undefined --> next()
 * return true --> void
 *
 * A function declared with 2 parameters (req, res) becomes a
 * (req, res, next) middleware; one declared with 3 parameters
 * (err, req, res) becomes an (err, req, res, next) error handler.
 * For any other declared parameter count undefined is returned.
 */
Function.prototype.asyncMiddleware = function(obj){
    var fn = this.async(obj);

    // Builds the callback that maps the (err, result) outcome to next()
    function complete(next) {
        return function(err, result){
            if (err) return next(err);
            if (result !== true) next();
        };
    }

    // normal (req, res) middleware
    if (this.length === 2) {
        return function(req, res, next) {
            return fn.call(this, req, res, complete(next));
        };
    }
    // error handling (err, req, res) middleware
    else if (this.length === 3) {
        return function(err, req, res, next) {
            return fn.call(this, err, req, res, complete(next));
        };
    }
};
425
/**
 * Suspends the current fiber for the given number of milliseconds.
 * A timer resumes the fiber after `ms`; meanwhile the fiber is yielded.
 *
 * Throws an Error when called outside of a fiber.
 */
Sync.sleep = function(ms)
{
    var fiber = Fiber.current;
    if (!fiber) {
        throw new Error('Sync.sleep() can be called only inside of fiber');
    }

    setTimeout(function(){
        fiber.run();
    }, ms);

    Fiber.yield();
};
442
/**
 * Logs sync result. Intended to be used as a node-style callback.
 *
 *   - with an error: prints err.stack (or the error itself) to stderr;
 *   - with exactly (err, result): prints result unless it is undefined;
 *   - with any other argument count: prints every argument after `err`
 *     as an array.
 */
Sync.log = function(err, result)
{
    if (err) return console.error(err.stack || err);
    if (arguments.length === 2) {
        if (result === undefined) return;
        return console.log(result);
    }
    console.log(Array.prototype.slice.call(arguments, 1));
};
455
/**
 * Synchronous repl implementation: each line = new fiber
 *
 * Starts a regular repl (arguments are forwarded to repl.start), then
 * replaces every 'line' listener with a wrapper that runs the original
 * listener inside Sync(), reporting the outcome through Sync.log.
 * `Sync` is exposed in the repl context. Returns the repl server.
 */
Sync.repl = function() {

    var repl = require('repl');

    // Start original repl
    var r = repl.start.apply(repl, arguments);

    // The readline interface is `r.rli` where that property exists; otherwise
    // fall back to the repl server object itself
    var rli = r.rli || r;

    // Wrap line watchers with Fiber
    var wrapped = rli.listeners('line').map(function(listener){
        return function(line){
            Sync(function(){
                // NOTE(review): presumably keeps this module instance in the
                // require cache while the line is evaluated - confirm
                require.cache[__filename] = module;
                listener(line);
            }, Sync.log);
        };
    });
    rli.removeAllListeners('line');
    wrapped.forEach(function(listener){
        rli.on('line', listener);
    });

    // Assign Sync to repl context
    r.context.Sync = Sync;

    return r;
};
486
// Sync.scope: read-only accessor for the `scope` object of the fiber that is
// currently running; yields a falsy value when no fiber is running.
Object.defineProperty(Sync, 'scope', {
    get: function() {
        return Fiber.current && Fiber.current.scope;
    },
    enumerable: true,
    configurable: true
});
491
// Waits for all futures registered on the current fiber by calling its
// waitFutures(); does nothing outside of a fiber. Returns nothing.
Sync.waitFutures = function() {
    if (Fiber.current) {
        Fiber.current.waitFutures();
    }
};
498
// The Sync function is the module's export
module.exports = exports = Sync;