UNPKG

27.2 kBtext/x-cView Raw
1//2011-11 Proyectos Equis Ka, s.l., jorge@jorgechamorro.com
2//WebWorkerThreads.cc
3
4
5#include <v8.h>
6#include <node.h>
7#include <uv.h>
8#include <string.h>
9#include <stdio.h>
10#include <stdlib.h>
11#include <string>
12#include "nan.h"
13#include "nan_isolate_data_accessor.h"
14
15
16#include "queues_a_gogo.cc"
17#include "bson.cc"
18#include "jslib.cc"
19
20#if NODE_MODULE_VERSION > 45
21#include "ArrayBufferAllocator.h"
22#endif
23
24using namespace v8;
25
26static Nan::Persistent<ObjectTemplate> threadTemplate;
27static bool useLocker; /* True if the initial V8 instance had a Locker. We'll follow suit. */
28
29static typeQueue* freeJobsQueue= NULL;
30static typeQueue* freeThreadsQueue= NULL;
31
32#define kThreadMagicCookie 0x99c0ffee
33typedef struct {
34 uv_async_t async_watcher; //MUST be the first one
35
36 long int id;
37 uv_thread_t thread;
38 volatile int sigkill;
39
40 typeQueue inQueue; //Jobs to run
41 typeQueue outQueue; //Jobs done
42
43 volatile int IDLE;
44 uv_cond_t IDLE_cv;
45 uv_mutex_t IDLE_mutex;
46
47 Isolate* isolate;
48 Nan::Persistent<Context> context;
49 Nan::Persistent<Object> JSObject;
50 Nan::Persistent<Object> threadJSObject;
51 Nan::Persistent<Object> dispatchEvents;
52
53 unsigned long threadMagicCookie;
54} typeThread;
55
// Discriminator stored in typeJob::jobType; selects which union member is live.
enum jobTypes {
  kJobTypeEval = 0,            // evaluate a piece of source text
  kJobTypeEvent = 1,           // event whose arguments travel as UTF-8 strings
  kJobTypeEventSerialized = 2  // event whose arguments travel as one BSON buffer
};
61
62typedef struct {
63 int jobType;
64 Nan::Persistent<Object> cb;
65 union {
66 struct {
67 int length;
68 String::Utf8Value* eventName;
69 String::Utf8Value** argumentos;
70 } typeEvent;
71 struct {
72 int length;
73 String::Utf8Value* eventName;
74 char* buffer;
75 size_t bufferSize;
76 } typeEventSerialized;
77 struct {
78 int error;
79 int tiene_callBack;
80 int useStringObject;
81 String::Utf8Value* resultado;
82 union {
83 char* scriptText_CharPtr;
84 String::Utf8Value* scriptText_StringObject;
85 };
86 } typeEval;
87 };
88} typeJob;
89
90/*
91
92cd deps/minifier/src
93gcc minify.c -o minify
94cat ../../../src/events.js | ./minify kEvents_js > ../../../src/kEvents_js
95cat ../../../src/load.js | ./minify kLoad_js > ../../../src/kLoad_js
96cat ../../../src/createPool.js | ./minify kCreatePool_js > ../../../src/kCreatePool_js
97cat ../../../src/worker.js | ./minify kWorker_js > ../../../src/kWorker_js
98cat ../../../src/thread_nextTick.js | ./minify kThread_nextTick_js > ../../../src/kThread_nextTick_js
99
100*/
101
102#include "events.js.c"
103#include "load.js.c"
104#include "createPool.js.c"
105#include "worker.js.c"
106#include "thread_nextTick.js.c"
107
108//node-waf configure uninstall distclean configure build install
109
110
111
112
113
114
115
116
117static typeQueueItem* nuJobQueueItem (void) {
118 typeQueueItem* qitem= queue_pull(freeJobsQueue);
119 if (!qitem) {
120 qitem= nuItem(kItemTypePointer, calloc(1, sizeof(typeJob)));
121 }
122 return qitem;
123}
124
125
126
127
128
129
130static typeThread* isAThread (Local<Object> receiver) {
131 typeThread* thread;
132
133 if (receiver->IsObject()) {
134 if (receiver->InternalFieldCount() == 1) {
135 thread= (typeThread*) Nan::GetInternalFieldPointer(receiver, 0);
136 if (thread && (thread->threadMagicCookie == kThreadMagicCookie)) {
137 return thread;
138 }
139 }
140 }
141
142 return NULL;
143}
144
145
146
147
148
149
150static void pushToInQueue (typeQueueItem* qitem, typeThread* thread) {
151 uv_mutex_lock(&thread->IDLE_mutex);
152 queue_push(qitem, &thread->inQueue);
153 if (thread->IDLE) {
154 uv_cond_signal(&thread->IDLE_cv);
155 }
156 uv_mutex_unlock(&thread->IDLE_mutex);
157}
158
159
160
161
162
163
164NAN_METHOD(Puts) {
165 Nan::HandleScope scope;
166 int i= 0;
167 while (i < info.Length()) {
168 String::Utf8Value c_str(info[i]);
169 fputs(*c_str, stdout);
170 i++;
171 }
172 fflush(stdout);
173
174 //fprintf(stdout, "*** Puts END\n");
175 info.GetReturnValue().SetUndefined();
176}
177
178NAN_METHOD(Print) {
179 Nan::HandleScope scope;
180 int i= 0;
181 while (i < info.Length()) {
182 String::Utf8Value c_str(info[i]);
183 fputs(*c_str, stdout);
184 i++;
185 }
186 static char end = '\n';
187 fputs(&end, stdout);
188 fflush(stdout);
189
190 //fprintf(stdout, "*** Puts END\n");
191 info.GetReturnValue().SetUndefined();
192}
193
194
195
196
197static void eventLoop (typeThread* thread);
198
199// A background thread
200static void aThread (void* arg) {
201 typeThread* thread= (typeThread*) arg;
202
203#if NODE_MODULE_VERSION > 45
204 // ref: https://developers.google.com/v8/get_started
205 WebWorkerThreads::ArrayBufferAllocator a;
206 v8::Isolate::CreateParams cp;
207 cp.array_buffer_allocator = &a;
208 thread->isolate= Isolate::New(cp);
209#else
210 thread->isolate= Isolate::New();
211#endif
212
213 NanSetIsolateData(thread->isolate, thread);
214
215 if (useLocker) {
216 v8::Locker myLocker(thread->isolate);
217 // I think it's not ok to create a isolate scope here,
218 // because it will call Isolate::Exit automatically.
219 //v8::Isolate::Scope isolate_scope(thread->isolate);
220 eventLoop(thread);
221 }
222 else {
223 eventLoop(thread);
224 }
225 thread->isolate->Dispose();
226
227 // wake up callback
228 if (!(thread->inQueue.length)) uv_async_send(&thread->async_watcher);
229}
230
231
232
233NAN_METHOD(threadEmit);
234NAN_METHOD(postMessage);
235NAN_METHOD(postError);
236
237
238
239static void eventLoop (typeThread* thread) {
240 Isolate::Scope isolate_scope(thread->isolate);
241
242 {
243 Nan::HandleScope scope;
244 ExtensionConfiguration extensions(0, NULL);
245
246 Local<FunctionTemplate> ftmpl = Nan::New<FunctionTemplate>();
247 Local<ObjectTemplate> otmpl = ftmpl->InstanceTemplate();
248 Local<Context> ctx = Nan::New<Context>(&extensions, otmpl);
249
250 //thread->context= Context::New();
251 thread->context.Reset(ctx);
252 ctx->Enter();
253
254
255 Local<Object> global= Nan::New(thread->context)->Global();
256
257 Local<Object> fs_obj = Nan::New<Object>();
258 JSObjFn(fs_obj, "readFileSync", readFileSync_);
259 Nan::DefineOwnProperty(global, Nan::New<String>("native_fs_").ToLocalChecked(), fs_obj, attribute_ro_dd);
260
261 Local<Object> console_obj = Nan::New<Object>();
262 JSObjFn(console_obj, "log", console_log);
263 JSObjFn(console_obj, "error", console_error);
264 Nan::DefineOwnProperty(global, Nan::New<String>("console").ToLocalChecked(), console_obj, attribute_ro_dd);
265
266 Nan::DefineOwnProperty(global, Nan::New<String>("self").ToLocalChecked(), global, v8::None);
267 Nan::DefineOwnProperty(global, Nan::New<String>("global").ToLocalChecked(), global, v8::None);
268
269 Nan::DefineOwnProperty(global, Nan::New<String>("puts").ToLocalChecked(), Nan::New<FunctionTemplate>(Puts)->GetFunction(), v8::None);
270 Nan::DefineOwnProperty(global, Nan::New<String>("print").ToLocalChecked(), Nan::New<FunctionTemplate>(Print)->GetFunction(), v8::None);
271
272 Nan::DefineOwnProperty(global, Nan::New<String>("postMessage").ToLocalChecked(), Nan::New<FunctionTemplate>(postMessage)->GetFunction(), v8::None);
273 Nan::DefineOwnProperty(global, Nan::New<String>("__postError").ToLocalChecked(), Nan::New<FunctionTemplate>(postError)->GetFunction(), v8::None);
274
275 Local<Object> threadObject= Nan::New<Object>();
276 Nan::DefineOwnProperty(global, Nan::New<String>("thread").ToLocalChecked(), threadObject, v8::None);
277
278 threadObject->Set(Nan::New<String>("id").ToLocalChecked(), Nan::New<Number>(thread->id));
279 threadObject->Set(Nan::New<String>("emit").ToLocalChecked(), Nan::New<FunctionTemplate>(threadEmit)->GetFunction());
280 Local<Object> dispatchEvents= Nan::CallAsFunction(Script::Compile(Nan::New<String>(kEvents_js).ToLocalChecked())->Run()->ToObject(), threadObject, 0, NULL).ToLocalChecked()->ToObject();
281 Local<Object> dispatchNextTicks= Script::Compile(Nan::New<String>(kThread_nextTick_js).ToLocalChecked())->Run()->ToObject();
282
283 Array* _ntq = Array::Cast(*threadObject->Get(Nan::New<String>("_ntq").ToLocalChecked()));
284
285 Script::Compile(Nan::New<String>(kLoad_js).ToLocalChecked())->Run();
286
287 double nextTickQueueLength= 0;
288 long int ctr= 0;
289
290 while (!thread->sigkill) {
291 typeJob* job;
292 typeQueueItem* qitem;
293
294 {
295 Nan::HandleScope scope;
296 Nan::TryCatch onError;
297 String::Utf8Value* str;
298 Local<String> source;
299 Local<Value> resultado;
300
301
302 while ((qitem= queue_pull(&thread->inQueue))) {
303
304 job= (typeJob*) qitem->asPtr;
305
306 if ((++ctr) > 2e3) {
307 ctr= 0;
308 Nan::IdleNotification(1000);
309
310 }
311
312 if (job->jobType == kJobTypeEval) {
313 //Ejecutar un texto
314
315 if (job->typeEval.useStringObject) {
316 str= job->typeEval.scriptText_StringObject;
317 source= Nan::New<String>(**str, (*str).length()).ToLocalChecked();
318 delete str;
319 }
320 else {
321 source= Nan::New<String>(job->typeEval.scriptText_CharPtr).ToLocalChecked();
322 free(job->typeEval.scriptText_CharPtr);
323 }
324
325 Nan::MaybeLocal<Script> script = Nan::CompileScript(source);
326
327 if (!onError.HasCaught()) {
328 Nan::MaybeLocal<Value> result = Nan::RunScript(script.ToLocalChecked());
329 if (!onError.HasCaught()) resultado = result.ToLocalChecked();
330 }
331
332 if (job->typeEval.tiene_callBack) {
333 job->typeEval.error= onError.HasCaught() ? 1 : 0;
334 job->typeEval.resultado= new String::Utf8Value(job->typeEval.error ? onError.Exception() : resultado);
335 queue_push(qitem, &thread->outQueue);
336 // wake up callback
337 if (!(thread->inQueue.length)) uv_async_send(&thread->async_watcher);
338 }
339 else {
340 queue_push(qitem, freeJobsQueue);
341 }
342
343 if (onError.HasCaught()) onError.Reset();
344 }
345 else if (job->jobType == kJobTypeEvent) {
346 //Emitir evento.
347
348 Local<Value> info[2];
349 str= job->typeEvent.eventName;
350 info[0]= Nan::New<String>(**str, (*str).length()).ToLocalChecked();
351 delete str;
352
353 Local<Array> array= Nan::New<Array>(job->typeEvent.length);
354 info[1]= array;
355
356 int i= 0;
357 while (i < job->typeEvent.length) {
358 str= job->typeEvent.argumentos[i];
359 array->Set(i, Nan::New<String>(**str, (*str).length()).ToLocalChecked());
360 delete str;
361 i++;
362 }
363
364 free(job->typeEvent.argumentos);
365 queue_push(qitem, freeJobsQueue);
366 Nan::CallAsFunction(dispatchEvents, global, 2, info);
367 }
368 else if (job->jobType == kJobTypeEventSerialized) {
369 Local<Value> info[2];
370 str= job->typeEventSerialized.eventName;
371 info[0]= Nan::New<String>(**str, (*str).length()).ToLocalChecked();
372 delete str;
373
374 int len = job->typeEventSerialized.length;
375 Local<Array> array= Nan::New<Array>(len);
376 info[1]= array;
377
378 {
379 BSON *bson = new BSON();
380 char* data = job->typeEventSerialized.buffer;
381 size_t size = job->typeEventSerialized.bufferSize;
382 BSONDeserializer deserializer(bson, data, size);
383 Local<Object> result = deserializer.DeserializeDocument(true)->ToObject();
384 int i = 0; do { array->Set(i, result->Get(i)); } while (++i < len);
385 free(data);
386 delete bson;
387 }
388
389 queue_push(qitem, freeJobsQueue);
390 Nan::CallAsFunction(dispatchEvents, global, 2, info);
391 }
392 }
393
394 if (_ntq->Length()) {
395
396 if ((++ctr) > 2e3) {
397 ctr= 0;
398 Nan::IdleNotification(1000);
399 }
400
401 resultado= Nan::CallAsFunction(dispatchNextTicks, global, 0, NULL).ToLocalChecked();
402 if (onError.HasCaught()) {
403 nextTickQueueLength= 1;
404 onError.Reset();
405 }
406 else {
407 nextTickQueueLength= resultado->NumberValue();
408 }
409 }
410 }
411
412 if (nextTickQueueLength || thread->inQueue.length) continue;
413 if (thread->sigkill) break;
414
415 uv_mutex_lock(&thread->IDLE_mutex);
416 if (!thread->inQueue.length) {
417 thread->IDLE= 1;
418 uv_cond_wait(&thread->IDLE_cv, &thread->IDLE_mutex);
419 thread->IDLE= 0;
420 }
421 uv_mutex_unlock(&thread->IDLE_mutex);
422 }
423
424 }
425
426 thread->context.Reset();
427}
428
429
430
431
432
433
434static void destroyaThread (typeThread* thread) {
435 Nan::HandleScope scope;
436
437 thread->sigkill= 0;
438 //TODO: hay que vaciar las colas y destruir los trabajos antes de ponerlas a NULL
439 thread->inQueue.first= thread->inQueue.last= NULL;
440 thread->outQueue.first= thread->outQueue.last= NULL;
441 Nan::SetInternalFieldPointer(Nan::New(thread->JSObject), 0, NULL);
442 thread->JSObject.Reset();
443
444 uv_unref((uv_handle_t*)&thread->async_watcher);
445
446#ifdef WIN32
447 TerminateThread(thread->thread, 1);
448#else
449 pthread_cancel(thread->thread);
450#endif
451}
452
453
454
455
456
457
458// C callback that runs in the main nodejs thread. This is the one responsible for
459// calling the thread's JS callback.
460static void Callback (uv_async_t *watcher, int revents) {
461 typeThread* thread= (typeThread*) watcher;
462
463 if (thread->sigkill) {
464 destroyaThread(thread);
465 return;
466 }
467
468 Nan::HandleScope scope;
469 typeJob* job;
470 Local<Value> argv[2];
471 Local<Value> null = Nan::Null();
472 typeQueueItem* qitem;
473 String::Utf8Value* str;
474
475 Nan::TryCatch onError;
476 while ((qitem= queue_pull(&thread->outQueue))) {
477 job= (typeJob*) qitem->asPtr;
478
479 if (job->jobType == kJobTypeEval) {
480
481 if (job->typeEval.tiene_callBack) {
482 str= job->typeEval.resultado;
483
484 if (job->typeEval.error) {
485 argv[0]= Exception::Error(Nan::New<String>(**str, (*str).length()).ToLocalChecked());
486 argv[1]= null;
487 } else {
488 argv[0]= null;
489 argv[1]= Nan::New<String>(**str, (*str).length()).ToLocalChecked();
490 }
491 Nan::CallAsFunction(Nan::New(job->cb), Nan::New(thread->JSObject), 2, argv);
492 job->cb.Reset();
493 job->typeEval.tiene_callBack= 0;
494
495 delete str;
496 job->typeEval.resultado= NULL;
497 }
498
499 queue_push(qitem, freeJobsQueue);
500
501 if (onError.HasCaught()) {
502 if (thread->outQueue.first) {
503 uv_async_send(&thread->async_watcher); // wake up callback again
504 }
505#if NODE_MODULE_VERSION >= 0x000E
506 if (useLocker) {
507 v8::Locker myLocker(thread->isolate);
508 }
509#endif
510 Nan::FatalException(onError);
511 return;
512 }
513 }
514 else if (job->jobType == kJobTypeEvent) {
515
516 //fprintf(stdout, "*** Callback\n");
517
518 Local<Value> info[2];
519
520 str= job->typeEvent.eventName;
521 info[0]= Nan::New<String>(**str, (*str).length()).ToLocalChecked();
522 delete str;
523
524 Local<Array> array= Nan::New<Array>(job->typeEvent.length);
525 info[1]= array;
526
527 int i= 0;
528 while (i < job->typeEvent.length) {
529 str= job->typeEvent.argumentos[i];
530 array->Set(i, Nan::New<String>(**str, (*str).length()).ToLocalChecked());
531 delete str;
532 i++;
533 }
534
535 free(job->typeEvent.argumentos);
536 queue_push(qitem, freeJobsQueue);
537 Nan::CallAsFunction(Nan::New(thread->dispatchEvents), Nan::New(thread->JSObject), 2, info);
538 }
539 else if (job->jobType == kJobTypeEventSerialized) {
540 Local<Value> info[2];
541
542 str= job->typeEventSerialized.eventName;
543 info[0]= Nan::New<String>(**str, (*str).length()).ToLocalChecked();
544 delete str;
545
546 int len = job->typeEventSerialized.length;
547 Local<Array> array= Nan::New<Array>(len);
548 info[1]= array;
549
550 {
551 BSON *bson = new BSON();
552 char* data = job->typeEventSerialized.buffer;
553 size_t size = job->typeEventSerialized.bufferSize;
554 BSONDeserializer deserializer(bson, data, size);
555 Local<Object> result = deserializer.DeserializeDocument(true)->ToObject();
556 int i = 0; do { array->Set(i, result->Get(i)); } while (++i < len);
557 free(data);
558 delete bson;
559 }
560
561 queue_push(qitem, freeJobsQueue);
562 Nan::CallAsFunction(Nan::New(thread->dispatchEvents), Nan::New(thread->JSObject), 2, info);
563 }
564 }
565}
566
567
568
569
570
571
572// unconditionally destroys a thread by brute force.
573NAN_METHOD(Destroy) {
574 Nan::HandleScope scope;
575 //TODO: Hay que comprobar que this en un objeto y que tiene hiddenRefTotypeThread_symbol y que no es nil
576 //TODO: Aquí habría que usar static void TerminateExecution(int thread_id);
577 //TODO: static void v8::V8::TerminateExecution ( Isolate * isolate= NULL ) [static]
578
579 typeThread* thread= isAThread(info.This());
580 if (!thread) {
581 return Nan::ThrowTypeError("thread.destroy(): the receiver must be a thread object");
582 }
583
584 if (!thread->sigkill) {
585 thread->sigkill= 1;
586 destroyaThread(thread);
587 }
588
589 info.GetReturnValue().SetUndefined();
590}
591
592
593
594
595
596
597// Eval: Pushes a job into the thread's ->inQueue.
598NAN_METHOD(Eval){
599 Nan::HandleScope scope;
600
601 if (!info.Length()) {
602 return Nan::ThrowTypeError("thread.eval(program [,callback]): missing arguments");
603 }
604
605 typeThread* thread= isAThread(info.This());
606 if (!thread) {
607 return Nan::ThrowTypeError("thread.eval(): the receiver must be a thread object");
608 }
609
610 typeQueueItem* qitem= nuJobQueueItem();
611 typeJob* job= (typeJob*) qitem->asPtr;
612
613 job->typeEval.tiene_callBack= ((info.Length() > 1) && (info[1]->IsFunction()));
614 if (job->typeEval.tiene_callBack) {
615 Local<Object> local_cb = info[1]->ToObject();
616 job->cb.Reset(local_cb);
617 }
618 job->typeEval.scriptText_StringObject= new String::Utf8Value(info[0]);
619 job->typeEval.useStringObject= 1;
620 job->jobType= kJobTypeEval;
621
622 pushToInQueue(qitem, thread);
623 info.GetReturnValue().Set(info.This());
624}
625
626
627
628
629
630static char* readFile (Local<String> path) {
631 v8::String::Utf8Value c_str(path);
632 FILE* fp= fopen(*c_str, "rb");
633 if (!fp) {
634 fprintf(stderr, "Error opening the file %s\n", *c_str);
635 //@bruno: Shouldn't we throw, here ?
636 return NULL;
637 }
638 fseek(fp, 0, SEEK_END);
639 size_t len= ftell(fp);
640 rewind(fp); //fseek(fp, 0, SEEK_SET);
641 char *buf= (char*)malloc((len+1) * sizeof(char)); // +1 to get null terminated string
642 if (fread(buf, sizeof(char), len, fp) < len) {
643 fprintf(stderr, "Error reading the file %s\n", *c_str);
644 return NULL;
645 }
646 buf[len] = 0;
647 fclose(fp);
648 /*
649 printf("SOURCE:\n%s\n", buf);
650 fflush(stdout);
651 */
652 return buf;
653}
654
655
656
657
658
659
660// Load: Loads from file and passes to Eval
661NAN_METHOD(Load) {
662 Nan::HandleScope scope;
663
664 if (!info.Length()) {
665 return Nan::ThrowTypeError("thread.load(filename [,callback]): missing arguments");
666 }
667
668 typeThread* thread= isAThread(info.This());
669 if (!thread) {
670 return Nan::ThrowTypeError("thread.load(): the receiver must be a thread object");
671 }
672
673 char* source= readFile(info[0]->ToString()); //@Bruno: here we don't know if the file was not found or if it was an empty file
674 if (!source) info.GetReturnValue().Set(info.This()); //@Bruno: even if source is empty, we should call the callback ?
675
676 typeQueueItem* qitem= nuJobQueueItem();
677 typeJob* job= (typeJob*) qitem->asPtr;
678
679 job->typeEval.tiene_callBack= ((info.Length() > 1) && (info[1]->IsFunction()));
680 if (job->typeEval.tiene_callBack) {
681 Local<Object> local_cb = info[1]->ToObject();
682 job->cb.Reset(local_cb);
683 }
684 job->typeEval.scriptText_CharPtr= source;
685 job->typeEval.useStringObject= 0;
686 job->jobType= kJobTypeEval;
687
688 pushToInQueue(qitem, thread);
689
690 info.GetReturnValue().Set(info.This());
691}
692
693
694
695
696
697
698NAN_METHOD(processEmit) {
699 Nan::HandleScope scope;
700
701 if (!info.Length()) info.GetReturnValue().Set(info.This());
702
703 typeThread* thread= isAThread(info.This());
704 if (!thread) {
705 Nan::ThrowTypeError("thread.emit(): the receiver must be a thread object");
706 }
707
708 typeQueueItem* qitem= nuJobQueueItem();
709 typeJob* job= (typeJob*) qitem->asPtr;
710
711 job->jobType= kJobTypeEvent;
712 job->typeEvent.length= info.Length()- 1;
713 job->typeEvent.eventName= new String::Utf8Value(info[0]);
714 job->typeEvent.argumentos= (v8::String::Utf8Value**) malloc(job->typeEvent.length* sizeof(void*));
715
716 int i= 1;
717 do {
718 job->typeEvent.argumentos[i-1]= new String::Utf8Value(info[i]);
719 } while (++i <= job->typeEvent.length);
720
721 pushToInQueue(qitem, thread);
722
723 info.GetReturnValue().Set(info.This());
724}
725
726NAN_METHOD(processEmitSerialized) {
727 Nan::HandleScope scope;
728
729 int len = info.Length();
730
731 if (!len) info.GetReturnValue().Set(info.This());
732
733 typeThread* thread= isAThread(info.This());
734 if (!thread) {
735 return Nan::ThrowTypeError("thread.emit(): the receiver must be a thread object");
736 }
737
738 typeQueueItem* qitem= nuJobQueueItem();
739 typeJob* job= (typeJob*) qitem->asPtr;
740
741 job->jobType= kJobTypeEventSerialized;
742 job->typeEventSerialized.length= len-1;
743 job->typeEventSerialized.eventName= new String::Utf8Value(info[0]);
744 Local<Array> array= Nan::New<Array>(len-1);
745 int i = 1; do { array->Set(i-1, info[i]); } while (++i < len);
746
747 {
748 char* buffer;
749 BSON *bson = new BSON();
750 size_t object_size;
751 Local<Object> object = bson->GetSerializeObject(array);
752 BSONSerializer<CountStream> counter(bson, false, false);
753 counter.SerializeDocument(object);
754 object_size = counter.GetSerializeSize();
755 buffer = (char *)malloc(object_size);
756 BSONSerializer<DataStream> data(bson, false, false, buffer);
757 data.SerializeDocument(object);
758 job->typeEventSerialized.buffer= buffer;
759 job->typeEventSerialized.bufferSize= object_size;
760 delete bson;
761 }
762
763 pushToInQueue(qitem, thread);
764
765 info.GetReturnValue().Set(info.This());
766}
767
768#define POST_EVENT(eventname) { \
769 Nan::HandleScope scope; \
770 int len = info.Length(); \
771 \
772 if (!len) info.GetReturnValue().Set(info.This()); \
773 \
774 typeThread* thread= (typeThread*) NanGetIsolateData(Isolate::GetCurrent()); \
775 \
776 typeQueueItem* qitem= nuJobQueueItem(); \
777 typeJob* job= (typeJob*) qitem->asPtr; \
778 \
779 job->jobType= kJobTypeEventSerialized; \
780 job->typeEventSerialized.eventName= new String::Utf8Value(Nan::New<String>(eventname).ToLocalChecked()); \
781 job->typeEventSerialized.length= len; \
782 \
783 Local<Array> array= Nan::New<Array>(len); \
784 int i = 0; do { array->Set(i, info[i]); } while (++i < len); \
785 \
786 { \
787 char* buffer; \
788 BSON *bson = new BSON(); \
789 size_t object_size; \
790 Local<Object> object = bson->GetSerializeObject(array); \
791 BSONSerializer<CountStream> counter(bson, false, false); \
792 counter.SerializeDocument(object); \
793 object_size = counter.GetSerializeSize(); \
794 buffer = (char *)malloc(object_size); \
795 BSONSerializer<DataStream> data(bson, false, false, buffer); \
796 data.SerializeDocument(object); \
797 job->typeEventSerialized.buffer= buffer; \
798 job->typeEventSerialized.bufferSize= object_size; \
799 delete bson; \
800 } \
801 \
802 queue_push(qitem, &thread->outQueue); \
803 if (!(thread->inQueue.length)) uv_async_send(&thread->async_watcher); \
804 \
805 info.GetReturnValue().Set(info.This()); \
806}
807
808NAN_METHOD(postMessage) {
809 POST_EVENT("message");
810}
811
812NAN_METHOD(postError) {
813 POST_EVENT("error");
814}
815
816NAN_METHOD(threadEmit) {
817 Nan::HandleScope scope;
818
819 if (!info.Length()) info.GetReturnValue().Set(info.This());
820
821 int i;
822 typeThread* thread= (typeThread*) NanGetIsolateData(Isolate::GetCurrent());
823
824 typeQueueItem* qitem= nuJobQueueItem();
825 typeJob* job= (typeJob*) qitem->asPtr;
826
827 job->jobType= kJobTypeEvent;
828 job->typeEvent.length= info.Length()- 1;
829 job->typeEvent.eventName= new String::Utf8Value(info[0]);
830 job->typeEvent.argumentos= (v8::String::Utf8Value**) malloc(job->typeEvent.length* sizeof(void*));
831
832 i= 1;
833 do {
834 job->typeEvent.argumentos[i-1]= new String::Utf8Value(info[i]);
835 } while (++i <= job->typeEvent.length);
836
837 queue_push(qitem, &thread->outQueue);
838 if (!(thread->inQueue.length)) uv_async_send(&thread->async_watcher); // wake up callback
839
840 info.GetReturnValue().Set(info.This());
841}
842
843
844
845
846
847
848
849
850// Creates and launches a new isolate in a new background thread.
851NAN_METHOD(Create) {
852 Nan::HandleScope scope;
853
854 typeThread* thread;
855 typeQueueItem* qitem= NULL;
856 qitem= queue_pull(freeThreadsQueue);
857 if (qitem) {
858 thread= (typeThread*) qitem->asPtr;
859 destroyItem(qitem);
860 }
861 else {
862 thread= (typeThread*) calloc(1, sizeof(typeThread));
863 thread->threadMagicCookie= kThreadMagicCookie;
864 }
865
866 static long int threadsCtr= 0;
867 thread->id= threadsCtr++;
868
869 Local<Object> local_JSObject = Nan::New(threadTemplate)->NewInstance();
870 local_JSObject->Set(Nan::New<String>("id").ToLocalChecked(), Nan::New<Integer>((int32_t)thread->id));
871
872 Nan::SetInternalFieldPointer(local_JSObject, 0, thread);
873 thread->JSObject.Reset(local_JSObject);
874
875 Local<Value> dispatchEvents= Nan::CallAsFunction(Script::Compile(Nan::New<String>(kEvents_js).ToLocalChecked())->Run()->ToObject(), local_JSObject, 0, NULL).ToLocalChecked();
876 Local<Object> local_dispatchEvents = dispatchEvents->ToObject();
877 thread->dispatchEvents.Reset(local_dispatchEvents);
878
879 uv_async_init(uv_default_loop(), &thread->async_watcher, reinterpret_cast<uv_async_cb>(Callback));
880 uv_ref((uv_handle_t*)&thread->async_watcher);
881
882 uv_cond_init(&thread->IDLE_cv);
883 uv_mutex_init(&thread->IDLE_mutex);
884 uv_mutex_init(&thread->inQueue.queueLock);
885 uv_mutex_init(&thread->outQueue.queueLock);
886
887 int err= uv_thread_create(&thread->thread, aThread, thread);
888 if (err) {
889 //Ha habido un error no se ha arrancado esta thread
890 destroyaThread(thread);
891 return Nan::ThrowTypeError("create(): error in pthread_create()");
892 }
893
894 Nan::AdjustExternalMemory(sizeof(typeThread));
895 info.GetReturnValue().Set(Nan::New(thread->JSObject));
896}
897
898
899#if NODE_MODULE_VERSION >= 0x000E
900void Init (Handle<Object> target, Handle<Value> module, void *) {
901#elif NODE_MODULE_VERSION >= 0x000B
902void Init (Handle<Object> target, Handle<Value> module) {
903#else
904void Init (Handle<Object> target) {
905#endif
906
907 initQueues();
908 freeThreadsQueue= nuQueue(-3);
909 freeJobsQueue= nuQueue(-4);
910
911 Nan::HandleScope scope;
912
913 useLocker= v8::Locker::IsActive();
914
915 target->Set(Nan::New<String>("create").ToLocalChecked(), Nan::New<FunctionTemplate>(Create)->GetFunction());
916 target->Set(Nan::New<String>("createPool").ToLocalChecked(),
917 Script::Compile(Nan::New<String>(kCreatePool_js).ToLocalChecked())->Run()->ToObject());
918 target->Set(Nan::New<String>("Worker").ToLocalChecked(),
919 Nan::CallAsFunction(Script::Compile(Nan::New<String>(kWorker_js).ToLocalChecked())->Run()->ToObject(), target, 0, NULL).ToLocalChecked()->ToObject());
920
921 Local<ObjectTemplate> local_threadTemplate = Nan::New<v8::ObjectTemplate>();
922 local_threadTemplate->SetInternalFieldCount(1);
923 local_threadTemplate->Set(Nan::New<String>("id").ToLocalChecked(), Nan::New<Integer>(0));
924 local_threadTemplate->Set(Nan::New<String>("eval").ToLocalChecked(), Nan::New<FunctionTemplate>(Eval));
925 local_threadTemplate->Set(Nan::New<String>("load").ToLocalChecked(), Nan::New<FunctionTemplate>(Load));
926 local_threadTemplate->Set(Nan::New<String>("emit").ToLocalChecked(), Nan::New<FunctionTemplate>(processEmit));
927 local_threadTemplate->Set(Nan::New<String>("emitSerialized").ToLocalChecked(), Nan::New<FunctionTemplate>(processEmitSerialized));
928 local_threadTemplate->Set(Nan::New<String>("destroy").ToLocalChecked(), Nan::New<FunctionTemplate>(Destroy));
929 threadTemplate.Reset(local_threadTemplate);
930}
931
932
933
934
935NODE_MODULE(WebWorkerThreads, Init)
936
937/*
938gcc -E -I /Users/jorge/JAVASCRIPT/binarios/include/node -o /o.c /Users/jorge/JAVASCRIPT/threads_a_gogo/src/threads_a_gogo.cc && mate /o.c
939*/