1 | 'use strict'
|
2 |
|
3 | var _ = require('lodash')
|
4 | var bloom = require('blomma')(1024, 1)
|
5 | var async = require('async')
|
6 | var pako = require('pako')
|
7 |
|
8 | module.exports = function (ipfs, BUCKET_SIZE) {
|
9 |
|
// Lazy stand-in for a node stored in ipfs. It carries the filters and element
// count recorded by its parent, so the tree can be navigated and filtered
// without fetching the node; appending to it or descending into it loads the
// real node through restore() first.
//
// ref     - { Hash, Size } link to the persisted node
// filters - deserialized bloom filters keyed by field name
// count   - number of elements stored under this node
var Ref = function (ref, filters, count) {
  // Load the node stored under `hash` and pass it to `use`; a load error is
  // reported straight to `cb`.
  var withRestored = function (hash, cb, use) {
    restore(hash, function (err, node) {
      if (err) return cb(err)
      use(node)
    })
  }

  return {
    type: 'Ref',
    filters: filters,
    ref: ref,
    count: count,
    // A Ref stands in for exactly one node.
    children: 1,
    // Append to the restored node and pass its result ({ value } or
    // { split }) back unchanged.
    append: function (el, cb) {
      withRestored(this.ref.Hash, cb, function (node) {
        node.append(el, function (appendErr, result) {
          if (appendErr) return cb(appendErr)
          cb(null, result)
        })
      })
    },
    // Every offset falls into the single child at index 0.
    offset: function (ofs) {
      return [0, ofs]
    },
    getOffset: function () {
      return 0
    },
    // idx and filter are not consulted: descending into a Ref always means
    // loading the node it points to.
    getChild: function (idx, filter, cb) {
      withRestored(this.ref.Hash, cb, function (node) {
        cb(null, { restored: node })
      })
    },
    // Already persisted: just hand back the stored link.
    persist: function (cb) {
      return cb(null, this.ref)
    }
  }
}
|
44 |
|
// Leaf node: holds raw log elements plus, per string field, a bloom filter
// of the words occurring in that field (see elementFilters).
//
// Nodes are never modified in place: append hands back a new Bucket, or a
// split when this one is full.
var Bucket = function (elements) {
  // Normalize once so elements/count/children/filters all agree. Previously
  // `elements` defaulted to [] but `count: elements.length` still threw when
  // no array was passed.
  var items = elements || []

  return {
    type: 'Bucket',
    elements: items,
    count: items.length,
    children: items.length,
    filters: elementFilters(items),
    // cb(null, { value: Bucket }) while there is room, otherwise
    // cb(null, { split: [thisFullBucket, Bucket([el])] }).
    append: function (el, cb) {
      // `>=` rather than `===`: a bucket holding more than BUCKET_SIZE
      // elements (e.g. restored from data written with a larger BUCKET_SIZE)
      // must still split instead of growing without bound.
      if (this.elements.length >= BUCKET_SIZE) {
        cb(null, { split: [ new Bucket(this.elements),
                            new Bucket([el]) ] })
      } else {
        var newelements = _.clone(this.elements)
        newelements.push(el)
        cb(null, { value: new Bucket(newelements) })
      }
    },
    // In a leaf the offset is directly the element index.
    offset: function (ofs) {
      return [ofs, 0]
    },
    getOffset: function (idx) {
      return idx
    },
    // { eof } past the end, { element } when it matches filter.words,
    // { skip } otherwise.
    getChild: function (idx, filter, cb) {
      var el = this.elements[idx]
      if (typeof el === 'undefined') return cb(null, { eof: true })

      if (matches(el, filter.words)) {
        return cb(null, { element: el })
      } else {
        return cb(null, { skip: true })
      }
    },
    // Store this bucket as an ipfs object (memoized in self.persisted) and
    // call back with { Hash, Size }.
    persist: function (cb) {
      var self = this

      if (self.persisted) return cb(null, self.persisted)

      // NOTE(review): `data: this` serializes the whole node, including the
      // `filters` bloom objects and count/children, although restore() only
      // reads `data.elements`. Kept as is because slimming it down would
      // change the stored hashes.
      var buf = new Buffer(JSON.stringify({
        Data: JSON.stringify({
          type: 'Bucket',
          data: this
        }),
        Links: []
      }))

      ipfs.object.put(buf, 'json', function (err, put) {
        if (err) return cb(err)

        ipfs.object.stat(put.Hash, function (err, stat) {
          if (err) return cb(err)
          self.persisted = { Hash: put.Hash,
                             Size: stat.CumulativeSize }
          cb(null, self.persisted)
        })
      })
    }
  }
}
|
104 |
|
// Interior node: an ordered list of up to BUCKET_SIZE child nodes (Buckets,
// Refs or other interior nodes). Its `filters` are the union of the
// children's filters, so a filtered iteration can skip the whole subtree.
var Branch = function (elements) {
  return {
    type: 'Branch',
    elements: elements,
    count: _.reduce(elements, function (a, b) {
      return a + b.count
    }, 0),
    children: elements.length,
    // Must use the `elements` argument. Inside this object literal `this` is
    // the throw-away object created by `new Branch(...)`, not the node being
    // returned, so the old `combineFilters(this.elements)` always yielded {}
    // and Branches were never pruned by filters.
    filters: combineFilters(elements),
    // cb(null, { value: Branch }) while there is room, otherwise
    // cb(null, { split: [thisFullBranch, Branch([el])] }).
    append: function (el, cb) {
      // `>=` so a Branch restored with more than BUCKET_SIZE children still
      // splits instead of growing without bound.
      if (this.elements.length >= BUCKET_SIZE) {
        cb(null, { split: [ new Branch(this.elements),
                            new Branch([el]) ]})
      } else {
        var newelements = _.clone(this.elements)
        newelements.push(el)
        cb(null, { value: new Branch(newelements) })
      }
    },
    // Map an element offset to [childIndex, offsetWithinChild]. Offsets past
    // the end land in the last child.
    offset: function (ofs) {
      var idx = 0
      while (this.elements[(idx + 1)] && this.elements[idx].count <= ofs) {
        ofs -= this.elements[idx].count
        idx++
      }
      return [idx, ofs]
    },
    // Number of elements stored in the children before child `idx`.
    getOffset: function (idx) {
      var count = 0
      for (var i = 0 ; i < idx ; i++) {
        count += this.elements[i].count
      }
      return count
    },
    // { eof } past the end, { skip } when the child's bloom filters rule out
    // the query, else { push: child } so the iterator descends into it.
    getChild: function (idx, filter, cb) {
      var element = this.elements[idx]

      if (element) {
        if (!subsetMatches(element.filters, filter.blooms)) {
          cb(null, { skip: true })
        } else {
          cb(null, { push: element })
        }
      } else {
        cb(null, { eof: true })
      }
    },
    // Persist all children in order, then store this node with one link per
    // child (named by zeropad(index)) and the children's serialized filters
    // and counts under the same names. Memoized in self.persisted.
    persist: function (cb) {
      var self = this

      if (self.persisted) return cb(null, self.persisted)

      var filters = {}
      var counts = {}
      async.series(_.map(self.elements, function (element, idx) {
        var name = zeropad(idx)
        filters[name] = serializeFilters(self.elements[idx].filters)
        counts[name] = self.elements[idx].count
        return function (done) {
          element.persist(function (err, persisted) {
            if (err) return done(err)
            done(null, {
              Name: name,
              Hash: persisted.Hash,
              Size: persisted.Size
            })
          })
        }
      }), function (err, links) {
        if (err) return cb(err)

        var obj = {
          Data: JSON.stringify({
            type: self.type,
            counts: counts,
            filters: filters
          }),
          Links: links
        }

        var buf = new Buffer(JSON.stringify(obj))
        ipfs.object.put(buf, 'json', function (err, put) {
          if (err) return cb(err)
          ipfs.object.stat(put.Hash, function (err, stat) {
            if (err) return cb(err)
            self.persisted = { Hash: put.Hash,
                               Size: stat.CumulativeSize }
            cb(null, self.persisted)
          })
        })
      })
    }
  }
}
|
199 |
|
// Three-part node [head, rest, tail]. Appends go into the tail; when the
// tail overflows, its full half is pushed into the middle ("rest") node, and
// when that overflows too a new Finger is nested one level deeper.
var Finger = function (elements) {
  return {
    type: 'Finger',
    elements: elements,
    count: _.reduce(elements, function (a, b) {
      return a + b.count
    }, 0),
    children: 3,
    // Union of head/rest/tail filters. Fingers previously had no `filters`
    // at all, so a nested Finger was never pruned during filtered iteration
    // and was persisted with empty filters by its parent.
    filters: combineFilters(elements),
    // cb(null, { value: newFinger }); a Finger never reports a split itself.
    append: function (el, cb) {
      var self = this
      var newelements = _.clone(self.elements)

      self.elements[2].append(el, function (err, res) {
        if (err) return cb(err)

        if (!res.split) {
          newelements[2] = res.value
          return cb(null, { value: new Finger(newelements) })
        }

        // The tail overflowed: its new, small half becomes the tail and the
        // full half is pushed into the middle node.
        newelements[2] = res.split[1]
        self.elements[1].append(res.split[0], function (err, pushres) {
          if (err) return cb(err)
          if (pushres.split) {
            // The middle overflowed too: nest a Finger one level deeper.
            newelements[1] = new Finger([ pushres.split[0],
                                          new Branch([]),
                                          pushres.split[1] ])
          } else {
            newelements[1] = pushres.value
          }

          cb(null, { value: new Finger(newelements) })
        })
      })
    },
    // Map an element offset to [partIndex, offsetWithinPart]. Offsets past
    // the end land in the last part.
    offset: function (ofs) {
      var idx = 0
      while (this.elements[(idx + 1)] && this.elements[idx].count <= ofs) {
        ofs -= this.elements[idx].count
        idx++
      }
      return [idx, ofs]
    },
    // Number of elements stored in the parts before part `idx`.
    getOffset: function (idx) {
      var count = 0
      for (var i = 0 ; i < idx ; i++) {
        count += this.elements[i].count
      }
      return count
    },
    // { eof } past the end, { skip } when the part's bloom filters rule out
    // the query, else { push: part }.
    getChild: function (idx, filter, cb) {
      var element = this.elements[idx]
      if (element) {
        if (!subsetMatches(element.filters, filter.blooms)) {
          cb(null, { skip: true })
        } else {
          cb(null, { push: element })
        }
      } else {
        cb(null, { eof: true })
      }
    },
    // Persist head, rest and tail, then store this node with links named
    // 'head'/'rest'/'tail' and their serialized filters and counts under the
    // same names. Memoized in self.persisted.
    persist: function (cb) {
      var self = this

      if (self.persisted) return cb(null, self.persisted)

      var filters = {}
      var counts = {}
      var parts = ['head', 'rest', 'tail']
      async.series(_.map(self.elements, function (element, idx) {
        var name = parts[idx]
        filters[name] = serializeFilters(self.elements[idx].filters)
        counts[name] = self.elements[idx].count
        return function (done) {
          self.elements[idx].persist(function (err, persisted) {
            if (err) return done(err)
            done(null, {
              Name: name,
              Hash: persisted.Hash,
              Size: persisted.Size
            })
          })
        }
      }), function (err, links) {
        if (err) return cb(err)

        var obj = {
          Data: JSON.stringify({
            type: 'Finger',
            filters: filters,
            counts: counts
          }),
          Links: links
        }

        var buf = new Buffer(JSON.stringify(obj))

        ipfs.object.put(buf, 'json', function (err, put) {
          if (err) return cb(err)
          ipfs.object.stat(put.Hash, function (err, stat) {
            if (err) return cb(err)
            self.persisted = { Hash: put.Hash,
                               Size: stat.CumulativeSize }
            cb(null, self.persisted)
          })
        })
      })
    }
  }
}
|
312 |
|
// Public handle for a log. Wraps the tree's top node (`ref`, an empty Bucket
// when omitted). Operations that add elements call back with a new Root
// instead of modifying this one.
var Root = function (ref) {
  if (!ref) ref = new Bucket([])

  return {
    type: 'Root',
    ref: ref,
    count: ref.count,
    // cb(err, newRoot). When the top node overflows, the new top is a
    // Finger with the two halves as head and tail and an empty middle.
    append: function (el, cb) {
      this.ref.append(el, function (err, res) {
        if (err) return cb(err)
        if (res.split) {
          cb(null, new Root(new Finger([ res.split[0],
                                         new Branch([]),
                                         res.split[1] ])))
        } else {
          cb(null, new Root(res.value))
        }
      })
    },
    // opts: { reverse, offset, filter } - see Iterator.
    iterator: function (opts) {
      return new Iterator(this.ref, opts)
    },
    // cb(err, { Hash, Size }) of the persisted top node.
    persist: function (cb) {
      ref.persist(cb)
    },
    // Append every item of `items` in order; cb(err, newRoot).
    concat: function (items, cb) {
      // Nothing to add: hand back this log unchanged. Without this guard an
      // empty array appended `undefined` and then looped forever, since
      // `idx === items.length` could never become true.
      if (!items || !items.length) return cb(null, this)

      var idx = 0
      var log = this
      async.forever(function (next) {
        log.append(items[idx++], function (err, res) {
          if (err) return cb(err)
          if (idx === items.length) return cb(null, res)
          log = res
          next()
        })
      })
    },
    // cb(err, { element, index }) for position `idx`, or { eof: true }.
    get: function (idx, cb) {
      var self = this
      self.iterator({ offset: idx }).next(function (err, res) {
        if (err) return cb(err)
        cb(null, res)
      })
    }
  }
}
|
361 |
|
// Depth-first walk over the tree rooted at `over`, one element per next().
//
// opts.reverse - iterate from the end towards the start
// opts.offset  - position to start at (default: first, or last if reversed)
// opts.filter  - object mapping field names to search text (see makefilter)
//
// State is a stack of { obj, idx } frames; stack[0] is the innermost node
// and idx the child to visit next in it. It is built lazily, from the start
// offset, on the first call to next().
var Iterator = function (over, opts) {
  if (!opts) opts = {}
  var reverse = !!opts.reverse
  var fullfilter = makefilter(opts.filter)
  var def = reverse ? over.count - 1 : 0
  var offset = (typeof opts.offset !== 'undefined' ? opts.offset : def)
  var stack = null

  // Move the innermost frame one child along in iteration order.
  var advance = function () {
    if (reverse) {
      stack[0].idx--
    } else {
      stack[0].idx++
    }
  }

  return {
    pushcount: 0,
    // cb(err, { element, index }) for the next matching element, or
    // cb(null, { eof: true }) when there are no more.
    next: function (cb) {
      var self = this

      if (!stack) {
        stackFromOffset(over, offset, function (err, newstack) {
          if (err) return cb(err)
          stack = newstack
          self.next(cb)
        })
        return
      }

      if (!stack[0]) return cb(null, { eof: true })

      stack[0].obj.getChild(stack[0].idx, fullfilter, function (err, res) {
        if (err) return cb(err)

        if (res.eof) {
          // Done with this node: pop it and continue in its parent.
          stack.shift()
          if (!stack[0]) return cb(null, { eof: true })
          advance()
          self.next(cb)
        } else if (res.skip) {
          advance()
          self.next(cb)
        } else if (res.push) {
          self.pushcount++
          stack.unshift({ obj: res.push,
                          idx: reverse ? res.push.children - 1 : 0 })
          self.next(cb)
        } else if (res.restored) {
          // A Ref was loaded: its real node replaces the Ref frame in place.
          stack[0] = { obj: res.restored,
                       idx: reverse ? res.restored.children - 1 : 0 }
          self.next(cb)
        } else if (typeof res.element !== 'undefined') {
          // Compute the index while the stack still points at this element.
          var index = offsetFromStack(stack)

          advance()
          cb(null, {
            element: res.element,
            index: index
          })
        } else {
          // Report through the callback: a throw here happens inside an
          // async callback, where the caller of next() cannot catch it.
          cb(new Error('unhandled case, ' + JSON.stringify(res)))
        }
      })
    },
    // cb(err, results) with at most `nr` results (fewer at end of log).
    take: function (nr, cb) {
      var self = this
      var accum = []
      async.forever(function (next) {
        // Check the limit *before* reading. The old code read first and then
        // dropped the element once the limit was hit, so each take() consumed
        // one extra element and a following take() skipped it. Written as
        // !(a < b) so a missing/NaN `nr` still yields [] as before.
        if (!(accum.length < nr)) return cb(null, accum)
        self.next(function (err, res) {
          if (err) return cb(err)
          if (res.eof) return cb(null, accum)
          accum.push(res)
          next()
        })
      })
    },
    // cb(err, results) with every remaining element.
    all: function (cb) {
      this.take(Infinity, cb)
    }
  }
}
|
438 |
|
// Absolute index of the iterator's current position: for every frame on the
// traversal stack, add the number of elements that precede the frame's
// current child in that frame's node.
var offsetFromStack = function (stack) {
  var frames = stack || []
  var total = 0
  for (var i = 0; i < frames.length; i++) {
    total += frames[i].obj.getOffset(frames[i].idx)
  }
  return total
}
|
444 |
|
// Build the iterator's traversal stack for element position `offset` inside
// `over`, innermost frame first. Can be called as (over, offset, cb); the
// four-argument form is used internally to thread the stack being built.
// Each level asks its node which child holds the offset, records
// { obj, idx }, and descends. A Ref frame is dropped in favour of the node
// it loads. Stops at the first node whose child is neither pushed nor
// restored.
var stackFromOffset = function (over, offset, acc, cb) {
  var done = cb
  var stack = acc
  if (!done) {
    done = acc
    stack = []
  }

  var position = over.offset(offset)
  var childIdx = position[0]
  var remaining = position[1]

  stack.unshift({ obj: over, idx: childIdx })

  over.getChild(childIdx, {}, function (err, res) {
    if (err) return done(err)

    if (res.restored) {
      // The Ref was only a placeholder: replace it with the loaded node.
      stack.shift()
      return stackFromOffset(res.restored, remaining, stack, done)
    }
    if (res.push) {
      return stackFromOffset(res.push, remaining, stack, done)
    }
    done(null, stack)
  })
}
|
471 |
|
// One bloom filter per string-valued field found across `elements`, holding
// every word (as produced by splitWords) that occurs in that field.
// Fields whose value is not a string are ignored.
var elementFilters = function (elements) {
  var filters = {}
  _.forEach(elements, function (element) {
    _.forEach(element, function (value, field) {
      if (typeof value !== 'string') return
      var target = filters[field] || (filters[field] = bloom.empty())
      splitWords(value).forEach(function (word) {
        target.add(word)
      })
    })
  })
  return filters
}
|
486 |
|
// Encode each bloom filter as base64 of its deflated `buffer`, keyed by
// field name, so it can be embedded in a node's JSON Data.
var serializeFilters = function (filters) {
  var encoded = {}
  _.forEach(filters, function (filter, field) {
    var deflated = pako.deflate(filter.buffer)
    encoded[field] = new Buffer(deflated).toString('base64')
  })
  return encoded
}
|
495 |
|
// Inverse of serializeFilters: base64-decode and inflate each entry, then
// rebuild the bloom filter with bloom.fromBuffer. Keys are field names.
var deserializeFilters = function (filters) {
  var deserialized = {}
  _.forEach(filters, function (encoded, field) {
    var deflated = new Buffer(encoded, 'base64')
    // pako.inflate yields raw bytes, which are copied into a Buffer as-is.
    // The old code also passed 'base64' here, but Buffer ignores an encoding
    // for non-string input, so that argument only suggested a second
    // decoding step that never happened.
    var raw = new Buffer(pako.inflate(deflated))
    deserialized[field] = bloom.fromBuffer(raw)
  })
  return deserialized
}
|
506 |
|
// Turn a user filter ({ field: 'search text', ... }) into the form used while
// iterating: `words` is the filter itself (checked against elements by
// matches), `blooms` has one bloom filter per field containing that field's
// query words (checked against node filters by subsetMatches).
// A missing filter yields empty `words` and `blooms`.
var makefilter = function (filter) {
  var blooms = {}
  if (!filter) return { words: {}, blooms: blooms }

  _.forEach(filter, function (value, key) {
    var queryBloom = bloom.empty()
    splitWords(value).forEach(function (word) {
      queryBloom.add(word)
    })
    blooms[key] = queryBloom
  })

  return { words: filter, blooms: blooms }
}
|
526 |
|
// Left-pad an index with zeros to at least three digits ('007', '042').
// Used as the link name / filters key / counts key for a Branch's children
// (presumably so names sort in index order below 1000 - confirm).
//
// Indexes of 1000 or more come back unpadded and untruncated. The old code
// kept only the last three characters, so 1000 became '000' and silently
// overwrote index 0's filters and counts.
var zeropad = function (nr) {
  var str = String(nr)
  while (str.length < 3) str = '0' + str
  return str
}
|
531 |
|
// Union, field by field, the bloom filters of several nodes. The first
// filter seen for a field is used as-is; later ones for the same field are
// combined with bloom.merge.
var combineFilters = function (tocombine) {
  var combined = {}
  _.forEach(tocombine, function (part) {
    _.forEach(part.filters, function (filter, field) {
      combined[field] = combined[field]
        ? bloom.merge(combined[field], filter)
        : filter
    })
  })
  return combined
}
|
545 |
|
// Lower-case `string` and split it on every run of characters other than
// 0-9, a-z, U+00C0-U+024F, '#', '@', '_' and '-'. A separator at either end
// produces an empty string at that end of the result.
var splitWords = function (string) {
  var separators = /[^0-9a-z\u00C0-\u00ff\u00C0-\u024f#@_-]+/
  var lowered = string.toLowerCase()
  return lowered.split(separators)
}
|
552 |
|
// True when, for every field in `filter`, `element[field]` is a string that
// contains the filter value as a whole phrase, case-insensitively: preceded
// by start-of-string or a space, and followed by end-of-string, a space or
// one of ? ! , .
// A missing or empty filter matches every element.
var matches = function (element, filter) {
  return Object.keys(filter || {}).every(function (key) {
    var field = element[key]
    if (typeof field !== 'string') return false
    // The value is search text, not a pattern: escape regex metacharacters.
    // Unescaped, 'c++' threw a SyntaxError and '.' matched any character.
    var escaped = String(filter[key]).replace(/[.*+?^${}()|[\]\\]/g, '\\$&')
    var regexp = new RegExp('(?:^| )' + escaped + '(?:$| |[?!,.])', 'i')
    return regexp.test(field)
  })
}
|
566 |
|
// Bloom-filter pre-check deciding whether a node can be skipped.
// `superset` is the node's filters and `subset` the query's blooms, both
// keyed by field. Returns false if some queried field is missing from the
// node's filters or is not contained in them; a node with no filters always
// passes.
var subsetMatches = function (superset, subset) {
  if (!superset || !Object.keys(superset).length) return true

  var ok = true
  _.forEach(subset, function (queryBloom, field) {
    var nodeBloom = superset[field]
    if (!(nodeBloom && nodeBloom.contains(queryBloom))) {
      ok = false
    }
  })
  return ok
}
|
579 |
|
// Build a Ref from a stored link plus the filters and count that the parent
// node recorded for it under `name` in its Data.
var refFromLink = function (link, object, name) {
  return new Ref({ Hash: link.Hash,
                   Size: link.Size },
                 deserializeFilters(object.filters[name]),
                 object.counts[name])
}

// Convert an object returned by ipfs.object.get into a tree node. Children
// of Branch and Finger nodes become lazy Refs. Throws on malformed data or
// an unknown node type.
var nodeFromObject = function (res) {
  var object = JSON.parse(res.Data)

  if (object.type === 'Bucket') {
    return new Bucket(object.data.elements)
  }

  if (object.type === 'Branch') {
    return new Branch(_.map(res.Links, function (link, idx) {
      return refFromLink(link, object, zeropad(idx))
    }))
  }

  if (object.type === 'Finger') {
    var linkmap = {}
    _.forEach(res.Links, function (link) {
      linkmap[link.Name] = link
    })
    return new Finger(['head', 'rest', 'tail'].map(function (name) {
      if (!linkmap[name]) {
        throw new Error('Finger object is missing link: ' + name)
      }
      return refFromLink(linkmap[name], object, name)
    }))
  }

  throw new Error('cannot restore unknown object type: ' + object.type)
}

// Fetch the object stored under `hash` and call back with the rebuilt node.
// Every failure reaches `cb`. Previously an unknown type never called back
// (the caller hung), and parse errors were thrown inside the ipfs callback.
var restore = function (hash, cb) {
  ipfs.object.get(hash, function (err, res) {
    if (err) return cb(err)

    var node
    try {
      node = nodeFromObject(res)
    } catch (e) {
      return cb(e)
    }
    // Called outside the try so an exception thrown by cb is not mistaken
    // for a restore failure (which would invoke cb twice).
    cb(null, node)
  })
}
|
615 |
|
616 | return {
|
617 | empty: function () {
|
618 | return new Root()
|
619 | },
|
620 | restore: function (hash, cb) {
|
621 | restore(hash, function (err, res) {
|
622 | if (err) return cb(err)
|
623 | cb(null, new Root(res))
|
624 | })
|
625 | }
|
626 | }
|
627 | }
|