UNPKG

17.9 kBJavaScriptView Raw
1/*jslint node: true, plusplus: true, nomen: true, vars: true, esversion: 6 */
2"use strict";
3
4const assert = require('assert');
5const debug = require('debug')('upnpserver:db:nedb');
6const os = require('os');
7const Path = require('path');
8const Util = require('util');
9const mkdirp = require('mkdirp');
10const Async = require('async');
11const Datastore = require('nedb');
12
13const logger = require('../logger');
14const CachedRegistry = require('./cachedRegistry');
15const Node = require('../node');
16
17const ASYNC_SAVE = false;
18const SAVE_QUEUE_CONCURRENCY = 2;
19
20class NeDbRegistry extends CachedRegistry {
21 constructor() {
22 super();
23
24 if (ASYNC_SAVE) {
25 this._saveQueue = Async.queue(this._saveWorker.bind(this),
26 SAVE_QUEUE_CONCURRENCY);
27 this._saveKeys = {};
28 this._garbageNode=this._garbageNodeAsync;
29 }
30 }
31
32 /**
33 *
34 */
35 initialize(service, callback) {
36 this._service = service;
37
38 var now=Date.now();
39 this.initializeDb((error) => {
40 if (error) {
41 return callback(error);
42 }
43
44 var dt=Date.now()-now;
45
46 if (dt>1500) {
47 dt=Math.floor(dt/1000);
48 logger.info("Database loaded in "+dt+" second"+((dt>1)?"s":""));
49
50 } else {
51 logger.info("Database loaded in "+dt+" ms");
52 }
53
54 super.initialize(service, callback);
55 });
56 }
57
58 /**
59 *
60 */
61 initializeDb(callback) {
62 var path = Path.join(os.homedir(), "upnpserver-nodes.nedb");
63 path = Path.normalize(path);
64
65 var path2 = Path.join(os.homedir(), "upnpserver-metas.nedb");
66 path2 = Path.normalize(path2);
67
68 var path3 = Path.join(os.homedir(), "upnpserver-repositories.nedb");
69 path3 = Path.normalize(path3);
70
71
72 debug("initializeDb", "NodesDb path=", path);
73
74 var db = new Datastore({
75 filename : path
76 });
77
78 db.loadDatabase((error) => {
79 if (error) {
80 logger.error("Can not load nodesDb ",path2, error);
81 return callback(error);
82 }
83
84 this._configureNodesDb(db, (error) => {
85 if (error) {
86 return callback(error);
87 }
88 this._nodesCollection = db;
89
90 debug("initializeDb", "MetasDb path=", path2);
91
92 var db2 = new Datastore({
93 filename : path2
94 });
95
96 db2.loadDatabase((error) => {
97 if (error) {
98 return callback(error);
99 }
100
101 this._configureMetasDb(db2, (error) => {
102 if (error) {
103 return callback(error);
104 }
105
106 this._metasCollection=db2;
107
108
109 debug("initializeDb", "RepositoriesDb path=", path3);
110
111 var db3 = new Datastore({
112 filename : path3
113 });
114
115 db3.loadDatabase((error) => {
116 if (error) {
117 return callback(error);
118 }
119
120 this._configureRepositoriesDb(db3, (error) => {
121 if (error) {
122 return callback(error);
123 }
124
125 this._repositoriesCollection=db3;
126
127 callback();
128 });
129 });
130 });
131 });
132 });
133 });
134 }
135
136 /**
137 *
138 */
139 _ensureIndexes(collection, fields, callback) {
140 Async.eachSeries(fields, (f, callback) => collection.ensureIndex(f, callback), callback);
141 }
142
143 /**
144 *
145 */
146 _configureNodesDb(collection, callback) {
147
148 this._ensureIndexes(collection, [{
149 fieldName : "id",
150 unique : true,
151 sparse: true
152
153 }, {
154 fieldName : "parentId",
155 unique: false,
156 sparse: true
157
158 }, {
159 fieldName : "refId",
160 unique: false,
161 sparse: true
162
163 }], callback);
164 }
165
166 /**
167 *
168 */
169 _configureMetasDb(collection, callback) {
170
171 this._ensureIndexes(collection, [{
172 fieldName : "path",
173 unique : true,
174 sparse: false
175
176 }], callback);
177 }
178
179 /**
180 *
181 */
182 _configureRepositoriesDb(collection, callback) {
183
184 this._ensureIndexes(collection, [{
185 fieldName : "hashKey",
186 unique : true,
187 sparse: false
188
189 }], callback);
190 }
191
192 /**
193 *
194 */
195 saveNode(node, modifiedProperties, callback) {
196 assert(node instanceof Node, "Invalid node parameter");
197 assert(typeof(callback)==="function", "Invalid function parameter");
198
199 node.takeLock("db", () => {
200
201 super.saveNode(node, modifiedProperties,
202 (error) => {
203 if (error) {
204 logger.error("Can not save node", error);
205
206 node.leaveLock("db");
207 return callback(error);
208 }
209
210 if (ASYNC_SAVE) {
211 node.leaveLock("db");
212
213 debug("saveNode", "Async save node #", node.id, "already=", this._saveKeys[node.id]);
214 this._saveKeys[node.id] = true;
215 return callback(null, node);
216 }
217
218 this._dbStore(node, modifiedProperties, (error, storedNode) => {
219 node.leaveLock("db");
220
221 callback(error, storedNode);
222 });
223 });
224 });
225 }
226
227 /**
228 *
229 */
230 _dbStore(node, modifiedProperties, callback) {
231 var json = node.toJSONObject();
232 if (debug.enabled) {
233 debug("_dbStore", "SaveNode id=#", node.id, "storageId=", node.$id, "data=", Util
234 .inspect(json, {
235 depth : null
236 }), "modifiedProperties=", modifiedProperties);
237 }
238
239 if (!this.$rootId && node.id===0 && node.$id) {
240 this.$rootId=node.$id;
241 }
242
243 delete json.childrenIds;
244 delete json.linkedIds;
245 if (modifiedProperties) {
246 delete modifiedProperties.childrenIds;
247 delete modifiedProperties.linkedIds;
248
249 let $push=modifiedProperties.$push;
250 if ($push) {
251 delete $push.childrenIds;
252 delete $push.linkedIds;
253
254 if (!Object.keys(modifiedProperties.$push).length) {
255 delete modifiedProperties.$push;
256 }
257 }
258
259 let $pull=modifiedProperties.$pull;
260 if ($pull) {
261 delete $pull.childrenIds;
262 delete $pull.linkedIds;
263
264 if (!Object.keys(modifiedProperties.$pull).length) {
265 delete modifiedProperties.$pull;
266 }
267 }
268
269 if (modifiedProperties.parentId===-1) {
270 // Never happen (the root node which would change its parent !)
271 delete modifiedProperties.parentId;
272 }
273 }
274
275 if (modifiedProperties) {
276
277 if (!Object.keys(modifiedProperties).length) {
278 debug("_dbStore", "nothing to modify !");
279
280 return callback(null, node);
281 }
282
283 if (modifiedProperties.parentId!==undefined) {
284 modifiedProperties.parentId=this._convertIdToObjectID(modifiedProperties.parentId);
285 }
286 if (modifiedProperties.refId!==undefined) {
287 modifiedProperties.refId=this._convertIdToObjectID(modifiedProperties.refId);
288 }
289 let $push=modifiedProperties.$push;
290 if ($push && $push.repositories) {
291 $push.repositories=$push.repositories.map((id) => this._convertIdToObjectID(id));
292 }
293 let $pull=modifiedProperties.$pull;
294 if ($pull && $pull.repositories) {
295 $pull.repositories=$pull.repositories.map((id) => this._convertIdToObjectID(id));
296 }
297 if (modifiedProperties.repositories) {
298 modifiedProperties.repositories=modifiedProperties.repositories.map((id) => this._convertIdToObjectID(id));
299 }
300
301 var ms = {};
302 for ( var k in modifiedProperties) {
303 if (k.charAt(0) === '$') {
304 ms[k] = modifiedProperties[k];
305 continue;
306 }
307 if (!ms.$set) {
308 ms.$set = {};
309 }
310 ms.$set[k] = modifiedProperties[k]; // Use json{} not modifiedProperties{}
311 }
312
313 debug("_dbStore", "node #", node.id, "storageId=", node.$id, "modifiedProperties=", modifiedProperties,
314 "ms=", ms);
315
316 this._nodesCollection.update({
317 _id : node.$id
318
319 }, ms, {
320 upsert : false,
321 multi : false
322
323 }, (error) => {
324 if (error) {
325 logger.error("dbStore: Can not update node #", json._id, error,
326 error.stack);
327 return callback(error);
328 }
329
330 debug("_dbStore", "modified properties node #", node.id, "storageId=", node.$id);
331
332 callback(null, node);
333 });
334 return;
335 }
336
337 if (json.id!==0) {
338 delete json.id;
339 }
340
341 if (json.parentId===-1) {
342 delete json.parentId;
343
344 } else if (json.parentId!==undefined) {
345 json.parentId=this._convertIdToObjectID(json.parentId);
346 }
347 if (json.refId!==undefined) {
348 json.refId=this._convertIdToObjectID(json.refId);
349 }
350 if (json.repositories) {
351 json.repositories=json.repositories.map((id) => this._convertIdToObjectID(id, true));
352 }
353
354 this._nodesCollection.update({
355 _id : node.$id
356
357 }, {
358 $set : json
359
360 }, {
361 upsert : true,
362 multi : false
363
364 }, (error) => {
365 if (error) {
366 logger.error("dbStore: Can not update node #", node.id, "storage #",node.$id,"error=",error);
367 return callback(error);
368 }
369
370 debug("_dbStore", "stored node #", node.id, "storageId=", node.$id);
371
372 callback(null, node);
373 });
374 }
375
376 /**
377 *
378 */
379 keyFromString(key) {
380 if (key==="0") {
381 return 0;
382 }
383 return key;
384 }
385
386 /**
387 *
388 */
389 _convertIdToObjectID(id, cache) {
390 if (id===0) {
391 return this.$rootId;
392 }
393 return id;
394 }
395
396 /**
397 *
398 */
399 _convertObjectIDToId(id, cache) {
400 if (this.$rootId===id) {
401 return 0;
402 }
403 return id;
404 }
405
406 /**
407 *
408 */
409 getNodeById(id, callback) {
410 assert(id!==undefined && id!==null, "Invalid id parameter");
411 assert(typeof(callback)==="function", "Invalid function parameter");
412
413 if (id === 0 && this._service.root) {
414 return callback(null, this._service.root);
415 }
416
417 super.getNodeById(id, (error, node) => {
418 if (error) {
419 logger.error(error);
420 return callback(error);
421 }
422 if (node) {
423 setImmediate(callback.bind(this, null, node));
424 return;
425 }
426
427 debug("getNodeById", "search #", id);
428
429 var criteria;
430 if (id===0) {
431 criteria={id: id};
432
433 } else {
434 criteria={_id: this._convertIdToObjectID(id)};
435 }
436
437 this._nodesCollection.findOne(criteria, (error, document) => {
438 if (error) {
439 logger.error("Can not get document #",id,error);
440 return callback(error);
441 }
442
443 debug("getNodeById", "Find by id #", id, "=>", document);
444
445 if (!document) {
446 return callback();
447 }
448
449 if (document.id===0) {
450 document.parentId=-1;
451
452 } else if (document.parentId===undefined) {
453 logger.error("Document",document,"has not parent !");
454 callback(new Error("Node #"+id+" has no parent !"));
455 return;
456 }
457
458 var objectID=document._id;
459 if (document.id===undefined) {
460 document.id=this._convertObjectIDToId(objectID);
461 }
462
463 if (!this.$rootId && id===0) {
464 this.$rootId=objectID;
465 }
466 if (document.repositories) {
467 document.repositories=document.repositories.map((id) => this._convertObjectIDToId(id, true));
468 }
469
470 node = Node.fromJSONObject(this._service, document);
471 node.$id = objectID;
472// debug(" node=>", node);
473
474 this._fillChildrenAndLinkIds(node, objectID, callback);
475 });
476 });
477 }
478
479 /**
480 * Internal USE: Needed by mongodb
481 */
482 _saveNode(node, modifiedProperties, callback) {
483 super.saveNode(node, modifiedProperties, callback);
484 }
485
486 /**
487 *
488 */
489 _fillChildrenAndLinkIds(node, objectID, callback) {
490
491 // Bug in nedb, we must specify another field _id in the projection
492 this._nodesCollection.find( { parentId: objectID }, { _id: 1, x: 1 }, (error, docs) => {
493 debug("_fillChildrenAndLinkIds", "Find children by parentId #",objectID,"=>", docs, "error=",error);
494 if (error) {
495 logger.error(error);
496 return callback(error);
497 }
498
499 if (docs.length) {
500 node.childrenIds=docs.map((doc) => this._convertObjectIDToId(doc._id));
501 }
502
503 this._nodesCollection.find( { refId: objectID }, { _id: 1, x: 1 }, (error, docs) => {
504 debug("_fillChildrenAndLinkIds", "Find linked by node #",objectID,"=>", docs, "error=",error);
505 if (error) {
506 logger.error(error);
507 return callback(error);
508 }
509
510 if (docs.length) {
511 node.linkedIds=docs.map((doc) => this._convertObjectIDToId(doc._id));
512 }
513
514 this._saveNode(node, null, callback);
515 });
516 });
517 }
518
519
520 /**
521 *
522 */
523 allocateNodeId(node, callback) {
524 assert(node instanceof Node, "Invalid node parameter");
525 assert(typeof(callback)==="function", "Invalid function parameter");
526
527 var id = this._nodesCollection.createNewId();
528
529 node.$id=id;
530 node._id=this._convertObjectIDToId(id);
531
532 debug("allocateNodeId", "Allocated id=", id);
533
534 callback();
535 }
536
537 /**
538 *
539 */
540 unregisterNode(node, callback) {
541 assert(node instanceof Node, "Invalid node parameter");
542 assert(typeof(callback)==="function", "Invalid function parameter");
543
544 debug("unregisterNode", "Unregister node #",node);
545 super.unregisterNode(node, (error) => {
546 if (error) {
547 return callback(error);
548 }
549
550 if (!node.$id) {
551 logger.error("Can not unregister node #"+node.id);
552 return callback();
553 }
554
555 this._nodesCollection.remove({
556 _id : node.$id
557
558 }, (error, removedNode) => {
559 if (error) {
560 logger.error("Can not unregister node", error);
561 return callback(error);
562 }
563
564 callback();
565 });
566 });
567 }
568
569 _garbageNodeAsync(node) {
570 debug("_garbageNodeAsync", "garbageNode #", node.id, "state=", this._saveKeys[node.id]);
571 if (!this._saveKeys[node.id]) {
572 return;
573 }
574 this._saveQueue.push(node);
575 }
576
577 _saveWorker(node, callback) {
578 if (debug.enabled) {
579 debug("_saveWorker", "saveWorker #", node.id, "state=", this._saveKeys[node.id]);
580 }
581 if (!this._saveKeys[node.id]) {
582 return;
583 }
584 delete this._saveKeys[node.id];
585
586 node.takeLock("db", () => {
587
588 this._dbStore(node, null, (error) => {
589 node.leaveLock("db");
590
591 if (error) {
592 logger.error(error);
593 }
594
595 debug("_saveWorker", "STORED #", node.id);
596
597 callback(error);
598 });
599 });
600 }
601
602 /**
603 *
604 */
605 getMetas(path, mtime, callback) {
606 assert(typeof(path)==="string", "Invalid path parameter");
607 assert(typeof(callback)==="function", "Invalid function parameter");
608 assert(typeof(mtime)==="number", "Invalid metas parameter");
609
610 debug("getMetas", "Get metas for path=",path,"mtime=",mtime);
611
612 this._metasCollection.findOne({
613 path : path
614
615 }, (error, document) => {
616 if (error) {
617 logger.error(error);
618 return callback(error);
619 }
620
621 if (!document) {
622 debug("getMetas", "No metas for path=",path);
623 return callback();
624 }
625
626 if (document.mtime && document.mtime<mtime) {
627 // TODO Remove old ?
628
629 debug("getMetas", "Metas outdated for path=",path);
630
631 return callback();
632 }
633
634 debug("getMetas", "Metas for path=",path,"metas=",document.metas);
635
636 callback(null, document.metas || {});
637 });
638 }
639
640 /**
641 *
642 */
643 putMetas(path, mtime, metas, callback) {
644 assert(typeof(path)==="string", "Invalid path parameter");
645 assert(typeof(mtime)==="number", "Invalid metas parameter");
646 assert(typeof(metas)==="object", "Invalid metas parameter");
647 assert(typeof(callback)==="function", "Invalid function parameter");
648
649 debug("putMetas", "Put metas of path=",path,"mtimer=",mtime,"metas=",metas);
650
651 this._metasCollection.update({
652 path : path
653
654 }, {
655 path: path,
656 mtime: mtime,
657 metas: metas
658
659 }, {
660 upsert : true,
661 multi : false
662
663 }, (error) => {
664 if (error) {
665 logger.error("putMetas: Can not save metas path="+path, error,
666 error.stack);
667 return callback(error);
668 }
669
670 debug("putMetas", "Metas saved");
671
672 callback(null);
673 });
674 }
675
676 registerRepository(repository, repositoryHashKey, callback) {
677 var repo={
678 hashKey: repositoryHashKey
679 };
680
681 this._repositoriesCollection.findOne(repo, (error, document) => {
682 if (error) {
683 logger.error("Can not get repository repo=", repo, error);
684 return callback(error);
685 }
686
687 if (document) {
688 repository.$id=document._id;
689 repository._id=this._convertObjectIDToId(document._id);
690
691 debug("Found repository #", repository._id, "hashKey=",repo);
692
693 return callback(null, repository);
694 }
695
696 this._repositoriesCollection.insert(repo, (error, document) => {
697 if (error) {
698 logger.error("Can not insert new repository repo=", repo, error);
699 return callback(error);
700 }
701
702 repository.$id=document._id;
703 repository._id=this._convertObjectIDToId(document._id);
704
705 debug("Register repository #", repository.id, "hashKey=",repo);
706
707 callback(null, repository);
708 });
709 });
710 }
711}
712
713module.exports = NeDbRegistry;