UNPKG

17.9 kB · JavaScript · View Raw
1'use strict'
2
3var _ = require('lodash')
4var bloom = require('blomma')(1024, 1)
5var async = require('async')
6var pako = require('pako')
7
8module.exports = function (ipfs, BUCKET_SIZE) {
9
var Ref = function (ref, filters, count) {
  // Placeholder for a node that has been persisted to IPFS but not loaded.
  // It keeps the link (`ref`, { Hash, Size }), the node's filters and its
  // element count so parents can count and prune without fetching; append
  // and getChild load the real node through restore() first.
  var node = {
    type: 'Ref',
    filters: filters,
    ref: ref,
    count: count,
    // Exactly one child: the node this reference resolves to.
    children: 1
  }

  // Load the referenced node, then append to it; the result is whatever the
  // loaded node's append yields ({ value } or { split }).
  node.append = function append (el, cb) {
    var hash = this.ref.Hash
    restore(hash, function (loadErr, loaded) {
      if (loadErr) return cb(loadErr)
      loaded.append(el, function (appendErr, result) {
        if (appendErr) return cb(appendErr)
        cb(null, result)
      })
    })
  }

  // Any offset falls into the single (not yet loaded) child.
  node.offset = function offset (ofs) {
    return [0, ofs]
  }

  node.getOffset = function getOffset () {
    return 0
  }

  // Resolve the reference; the iterator swaps its frame for the loaded node.
  node.getChild = function getChild (idx, filter, cb) {
    restore(this.ref.Hash, function (loadErr, loaded) {
      if (loadErr) return cb(loadErr)
      cb(null, { restored: loaded })
    })
  }

  // Already persisted: just hand back the stored link.
  node.persist = function persist (cb) {
    return cb(null, this.ref)
  }

  return node
}
44
var Bucket = function (elements) {
  // Leaf node of the log: an immutable, ordered list of at most BUCKET_SIZE
  // elements plus per-field bloom filters (elementFilters) over their
  // string values.
  // The default must be applied before anything reads `.length`; the old
  // code applied `|| []` too late, so `new Bucket()` threw a TypeError.
  var items = elements || []

  return {
    type: 'Bucket',
    elements: items,
    count: items.length,
    children: items.length,
    filters: elementFilters(items),

    // Append `el`. Yields { value: newBucket }, or, once this bucket is
    // full, { split: [sameElements, bucketWithOnlyEl] } so the parent can
    // restructure. The receiver is never modified.
    append: function (el, cb) {
      // `>=` rather than `===`: a bucket restored from data persisted with a
      // larger BUCKET_SIZE would otherwise never split and grow unbounded.
      if (this.elements.length >= BUCKET_SIZE) {
        return cb(null, { split: [ new Bucket(this.elements), new Bucket([el]) ] })
      }
      cb(null, { value: new Bucket(this.elements.concat([el])) })
    },

    // Offsets inside a bucket address elements directly.
    offset: function (ofs) {
      return [ofs, 0]
    },
    getOffset: function (idx) {
      return idx
    },

    // Return the element at `idx` when it matches the query words, { skip }
    // when it does not, and { eof } past the end.
    getChild: function (idx, filter, cb) {
      var el = this.elements[idx]
      if (typeof el === 'undefined') return cb(null, { eof: true })
      if (matches(el, filter.words)) return cb(null, { element: el })
      return cb(null, { skip: true })
    },

    // Store the bucket as an IPFS object with no links; the result
    // ({ Hash, Size }) is cached on the node.
    persist: function (cb) {
      var self = this
      if (self.persisted) return cb(null, self.persisted)

      // NOTE: the whole node (elements, count, filters, ...) is serialized,
      // while restore() only reads `data.elements`. This is kept unchanged
      // so identical logs keep producing identical hashes.
      // Buffer.from replaces the deprecated `new Buffer()` (DEP0005).
      var buf = Buffer.from(JSON.stringify({
        Data: JSON.stringify({
          type: 'Bucket',
          data: self
        }),
        Links: []
      }))

      ipfs.object.put(buf, 'json', function (err, put) {
        if (err) return cb(err)
        ipfs.object.stat(put.Hash, function (err, stat) {
          if (err) return cb(err)
          self.persisted = { Hash: put.Hash, Size: stat.CumulativeSize }
          cb(null, self.persisted)
        })
      })
    }
  }
}
104
var Branch = function (elements) {
  // Interior node: an ordered list of up to BUCKET_SIZE child nodes.
  // `count` is the total number of elements stored underneath.
  var items = elements || []

  return {
    type: 'Branch',
    elements: items,
    count: items.reduce(function (sum, child) { return sum + child.count }, 0),
    children: items.length,
    // Bug fix: this used to read `combineFilters(this.elements)`. Inside
    // this constructor `this` is the receiver created by `new`, not the
    // object literal, so the filters were always {} and filtered iteration
    // could never prune a Branch.
    filters: combineFilters(items),

    // Append a child node. Yields { value: newBranch }, or, once full,
    // { split: [sameChildren, branchWithOnlyEl] }.
    append: function (el, cb) {
      // `>=` so an over-full branch still splits instead of growing forever.
      if (this.elements.length >= BUCKET_SIZE) {
        return cb(null, { split: [ new Branch(this.elements), new Branch([el]) ] })
      }
      cb(null, { value: new Branch(this.elements.concat([el])) })
    },

    // Map an element offset to [childIndex, offsetWithinChild]. The last
    // child absorbs any remainder, so out-of-range offsets land in it.
    offset: function (ofs) {
      var idx = 0
      var rest = ofs
      while (this.elements[idx + 1] && this.elements[idx].count <= rest) {
        rest -= this.elements[idx].count
        idx++
      }
      return [idx, rest]
    },

    // Number of elements held by the children before child `idx`.
    getOffset: function (idx) {
      var count = 0
      for (var i = 0; i < idx; i++) count += this.elements[i].count
      return count
    },

    // { push: child } to descend, { skip } when the child's filters rule the
    // query out, { eof } past the last child.
    getChild: function (idx, filter, cb) {
      var element = this.elements[idx]
      if (!element) return cb(null, { eof: true })
      if (!subsetMatches(element.filters, filter.blooms)) return cb(null, { skip: true })
      cb(null, { push: element })
    },

    // Persist every child (in order), then store this node with one link per
    // child named zeropad(index). Each child's count and serialized filters
    // go into Data so restore() can build lazy Refs without fetching them.
    persist: function (cb) {
      var self = this
      if (self.persisted) return cb(null, self.persisted)

      var filters = {}
      var counts = {}
      var tasks = self.elements.map(function (element, idx) {
        var name = zeropad(idx)
        filters[name] = serializeFilters(element.filters)
        counts[name] = element.count
        return function (done) {
          element.persist(function (err, persisted) {
            if (err) return done(err)
            done(null, { Name: name, Hash: persisted.Hash, Size: persisted.Size })
          })
        }
      })

      async.series(tasks, function (err, links) {
        if (err) return cb(err)

        // Buffer.from replaces the deprecated `new Buffer()` (DEP0005).
        var buf = Buffer.from(JSON.stringify({
          Data: JSON.stringify({
            type: self.type,
            counts: counts,
            filters: filters
          }),
          Links: links
        }))

        ipfs.object.put(buf, 'json', function (err, put) {
          if (err) return cb(err)
          ipfs.object.stat(put.Hash, function (err, stat) {
            if (err) return cb(err)
            self.persisted = { Hash: put.Hash, Size: stat.CumulativeSize }
            cb(null, self.persisted)
          })
        })
      })
    }
  }
}
199
var Finger = function (elements) {
  // Three-part node: elements[0] is the head, elements[1] the middle
  // ("rest") and elements[2] the tail. Appends go to the tail. When the tail
  // splits, its full half is pushed into the middle, which may itself turn
  // into a nested Finger.
  return {
    type: 'Finger',
    elements: elements,
    count: elements.reduce(function (sum, child) { return sum + child.count }, 0),
    children: 3,
    // Combined filters of the three parts. A Finger previously had no
    // `filters`, so a nested Finger (the middle of another Finger) could not
    // be pruned, and it was persisted with empty filters in its parent
    // (unlike Branch).
    filters: combineFilters(elements),

    // Append `el` to the tail; always yields { value: newFinger }.
    append: function (el, cb) {
      var self = this
      var next = self.elements.slice()

      self.elements[2].append(el, function (err, res) {
        if (err) return cb(err)
        if (!res.split) {
          next[2] = res.value
          return cb(null, { value: new Finger(next) })
        }

        // Tail overflowed: keep its new half as the tail and push the full
        // half down into the middle.
        next[2] = res.split[1]
        self.elements[1].append(res.split[0], function (err, pushed) {
          if (err) return cb(err)
          next[1] = pushed.split
            ? new Finger([ pushed.split[0], new Branch([]), pushed.split[1] ])
            : pushed.value
          cb(null, { value: new Finger(next) })
        })
      })
    },

    // Map an element offset to [partIndex, offsetWithinPart]; the tail
    // absorbs any remainder.
    offset: function (ofs) {
      var idx = 0
      var rest = ofs
      while (this.elements[idx + 1] && this.elements[idx].count <= rest) {
        rest -= this.elements[idx].count
        idx++
      }
      return [idx, rest]
    },

    // Number of elements held by the parts before part `idx`.
    getOffset: function (idx) {
      var count = 0
      for (var i = 0; i < idx; i++) count += this.elements[i].count
      return count
    },

    // { push: part } to descend, { skip } when the part's filters rule the
    // query out, { eof } after the tail.
    getChild: function (idx, filter, cb) {
      var element = this.elements[idx]
      if (!element) return cb(null, { eof: true })
      if (!subsetMatches(element.filters, filter.blooms)) return cb(null, { skip: true })
      cb(null, { push: element })
    },

    // Persist the three parts, then store this node with links named
    // head/rest/tail. Each part's count and serialized filters go into Data.
    persist: function (cb) {
      var self = this
      if (self.persisted) return cb(null, self.persisted)

      var names = ['head', 'rest', 'tail']
      var filters = {}
      var counts = {}
      var tasks = self.elements.map(function (element, idx) {
        var name = names[idx]
        filters[name] = serializeFilters(element.filters)
        counts[name] = element.count
        return function (done) {
          element.persist(function (err, persisted) {
            if (err) return done(err)
            done(null, { Name: name, Hash: persisted.Hash, Size: persisted.Size })
          })
        }
      })

      async.series(tasks, function (err, links) {
        if (err) return cb(err)

        // Buffer.from replaces the deprecated `new Buffer()` (DEP0005).
        var buf = Buffer.from(JSON.stringify({
          Data: JSON.stringify({
            type: 'Finger',
            filters: filters,
            counts: counts
          }),
          Links: links
        }))

        ipfs.object.put(buf, 'json', function (err, put) {
          if (err) return cb(err)
          ipfs.object.stat(put.Hash, function (err, stat) {
            if (err) return cb(err)
            self.persisted = { Hash: put.Hash, Size: stat.CumulativeSize }
            cb(null, self.persisted)
          })
        })
      })
    }
  }
}
312
var Root = function (ref) {
  // Handle on a whole log: wraps the current top node (`ref`, an empty
  // Bucket by default). Logs are immutable; append and concat yield a new
  // Root and leave this one untouched.
  if (!ref) ref = new Bucket([])

  return {
    type: 'Root',
    ref: ref,
    count: ref.count,

    // Append one element; cb(err, newRoot).
    append: function (el, cb) {
      this.ref.append(el, function (err, res) {
        if (err) return cb(err)
        if (res.split) {
          // The top node overflowed: grow one level by making the two halves
          // the head and tail of a new Finger with an empty middle.
          return cb(null, new Root(new Finger([ res.split[0], new Branch([]), res.split[1] ])))
        }
        cb(null, new Root(res.value))
      })
    },

    iterator: function (opts) {
      return new Iterator(this.ref, opts)
    },

    persist: function (cb) {
      ref.persist(cb)
    },

    // Append every item of `items` in order; cb(err, newRoot).
    concat: function (items, cb) {
      // Appending nothing is a no-op. Previously the loop appended
      // `items[0]` (undefined) and, with idx past items.length, never
      // reached its exit condition.
      if (!items || items.length === 0) return cb(null, this)

      var idx = 0
      var log = this
      async.forever(function (next) {
        log.append(items[idx++], function (err, res) {
          if (err) return cb(err)
          if (idx === items.length) return cb(null, res)
          log = res
          next()
        })
      })
    },

    // Fetch the element at absolute index `idx` ({ element, index } or
    // { eof: true }).
    get: function (idx, cb) {
      this.iterator({ offset: idx }).next(function (err, res) {
        if (err) return cb(err)
        cb(null, res)
      })
    }
  }
}
361
var Iterator = function (over, opts) {
  // Cursor over the elements stored below the node `over`.
  //   opts.offset  - element index to start at (default: the first, or the
  //                  last when reversed)
  //   opts.reverse - walk towards lower indices
  //   opts.filter  - { field: 'words' } query, compiled by makefilter
  // next() yields { element, index } per matching element and { eof: true }
  // at the end.
  if (!opts) opts = {}
  var reverse = !!opts.reverse
  var fullfilter = makefilter(opts.filter)
  var def = reverse ? over.count - 1 : 0
  var offset = (typeof opts.offset !== 'undefined' ? opts.offset : def)
  // Frames { obj, idx }, innermost node first; built lazily by next().
  var stack = null

  // Move the innermost frame's cursor one step in the walk direction.
  var advance = function () {
    if (reverse) {
      stack[0].idx--
    } else {
      stack[0].idx++
    }
  }

  return {
    pushcount: 0,

    next: function (cb) {
      var self = this

      // First call: descend from `over` to the starting offset.
      if (!stack) {
        stackFromOffset(over, offset, function (err, newstack) {
          if (err) return cb(err)
          stack = newstack
          self.next(cb)
        })
        return
      }

      if (!stack[0]) return cb(null, { eof: true })

      stack[0].obj.getChild(stack[0].idx, fullfilter, function (err, res) {
        if (err) return cb(err)

        if (res.eof) {
          // This node is exhausted: pop it and step past it in the parent.
          stack.shift()
          if (!stack[0]) return cb(null, { eof: true })
          advance()
          self.next(cb)
        } else if (res.skip) {
          // Filtered out (a single element or a whole subtree).
          advance()
          self.next(cb)
        } else if (res.push) {
          // Descend into a child, starting at its first (or last) child.
          self.pushcount++
          stack.unshift({ obj: res.push,
                          idx: reverse ? res.push.children - 1 : 0 })
          self.next(cb)
        } else if (res.restored) {
          // A Ref was loaded: replace its frame with the real node.
          stack[0] = { obj: res.restored,
                       idx: reverse ? res.restored.children - 1 : 0 }
          self.next(cb)
        } else if (typeof res.element !== 'undefined') {
          // Compute the absolute index before moving past the element.
          var index = offsetFromStack(stack)
          advance()
          cb(null, {
            element: res.element,
            index: index
          })
        } else {
          throw new Error('unhandled case, ' + JSON.stringify(res))
        }
      })
    },

    // Collect up to `nr` results (fewer at eof); cb(err, results).
    take: function (nr, cb) {
      var self = this
      var accum = []
      // Check the limit before reading. The old loop tested it only after
      // next() had already consumed an element, so every take() silently
      // dropped one extra element and take(0) still advanced the cursor.
      // Non-positive or non-numeric limits return [] without reading.
      if (!(nr > 0)) return cb(null, accum)

      async.forever(function (next) {
        self.next(function (err, res) {
          if (err) return cb(err)
          if (res.eof) return cb(null, accum)
          accum.push(res)
          if (accum.length >= nr) return cb(null, accum)
          next()
        })
      })
    },

    all: function (cb) {
      this.take(Infinity, cb)
    }
  }
}
438
var offsetFromStack = function (stack) {
  // Absolute element index at the iterator's position: the sum of each
  // stack level's offset at that level's cursor.
  var levels = stack || []
  var total = 0
  for (var i = 0; i < levels.length; i++) {
    total += levels[i].obj.getOffset(levels[i].idx)
  }
  return total
}
444
var stackFromOffset = function (over, offset, acc, cb) {
  // Descend from `over` to the element at `offset` and return the iterator
  // stack (innermost frame first). Callers use (over, offset, cb); the
  // recursion passes the stack built so far as `acc`.
  var done = cb || acc
  var stack = cb ? acc : []

  var position = over.offset(offset)
  stack.unshift({ obj: over, idx: position[0] })

  // An empty filter: no pruning while positioning.
  over.getChild(position[0], {}, function (err, res) {
    if (err) return done(err)

    if (res.restored) {
      // `over` was only a Ref: drop its frame and continue in the real node.
      stack.shift()
      stackFromOffset(res.restored, position[1], stack, done)
    } else if (res.push) {
      stackFromOffset(res.push, position[1], stack, done)
    } else {
      done(null, stack)
    }
  })
}
471
var elementFilters = function (elements) {
  // Build one bloom filter per string-valued field holding every word
  // (as produced by splitWords) that occurs in that field across `elements`.
  // Fields whose values are never strings get no filter.
  var byField = {}

  var indexField = function (value, field) {
    if (typeof value !== 'string') return
    var target = byField[field] || (byField[field] = bloom.empty())
    splitWords(value).forEach(function (word) {
      target.add(word)
    })
  }

  _.forEach(elements, function (element) {
    _.forEach(element, indexField)
  })

  return byField
}
486
var serializeFilters = function (filters) {
  // Serialize a { field: bloomFilter } map to { field: base64(deflate(bits)) }
  // for storage in a node's Data. A missing map serializes to {}.
  var serialized = {}
  Object.keys(filters || {}).forEach(function (key) {
    // Buffer.from replaces the deprecated `new Buffer()` constructor (DEP0005).
    serialized[key] = Buffer.from(pako.deflate(filters[key].buffer)).toString('base64')
  })
  return serialized
}
495
var deserializeFilters = function (filters) {
  // Inverse of serializeFilters: { field: base64(deflate(bits)) } back to
  // { field: bloomFilter }. A missing map yields {}.
  var deserialized = {}
  Object.keys(filters || {}).forEach(function (key) {
    var deflated = Buffer.from(filters[key], 'base64')
    // pako.inflate returns raw bytes. The old code passed them to the
    // deprecated `new Buffer(bytes, 'base64')`, where the encoding argument
    // is ignored for non-string input and only misleads readers.
    deserialized[key] = bloom.fromBuffer(Buffer.from(pako.inflate(deflated)))
  })
  return deserialized
}
506
var makefilter = function (filter) {
  // Compile a user query ({ field: 'some words' }) into the two forms the
  // iterator needs: `words`, the raw query that buckets match exactly, and
  // `blooms`, one bloom filter per field used to prune whole subtrees.
  // No query produces empty maps (everything matches).
  if (!filter) return { words: {}, blooms: {} }

  var blooms = {}
  _.forEach(filter, function (text, field) {
    var fieldBloom = bloom.empty()
    splitWords(text).forEach(function (word) {
      fieldBloom.add(word)
    })
    blooms[field] = fieldBloom
  })

  return { words: filter, blooms: blooms }
}
526
var zeropad = function (nr) {
  // Left-pad a child index to at least three digits ('7' -> '007') so link
  // names sort in index order. Longer numbers are returned whole: the old
  // substr-based version truncated them ('1000' -> '000'), making child
  // 1000's link name collide with child 0's.
  var str = String(nr)
  while (str.length < 3) str = '0' + str
  return str
}
531
var combineFilters = function (tocombine) {
  // Union the per-field bloom filters of several nodes: one filter per field,
  // folded together with bloom.merge. The first filter seen for a field is
  // used as the starting value as-is (not copied).
  // NOTE(review): if bloom.merge mutates its first argument, that first
  // node's own filter would also absorb its siblings' bits. Its filter would
  // become a looser superset (still safe for pruning). Confirm blomma's merge
  // semantics.
  var combined = {}
  _.forEach(tocombine, function (node) {
    _.forEach(node.filters, function (fieldFilter, field) {
      combined[field] = combined[field]
        ? bloom.merge(combined[field], fieldFilter)
        : fieldFilter
    })
  })
  return combined
}
545
var splitWords = function (string) {
  // Split text into lowercase words for the bloom filters. Word characters
  // are ASCII letters and digits, Latin-1/Latin Extended letters
  // (U+00C0-U+024F; this also covers the redundant U+00C0-U+00FF range the
  // old pattern listed), and '#', '@', '_' and '-', so hashtags, mentions and
  // hyphenated words stay whole.
  // TODO: support non-latin alphabets
  return string.toLowerCase()
    .split(/[^0-9a-z\u00C0-\u024f#@_-]+/)
    // Leading/trailing separators yield '' tokens. In query blooms (e.g.
    // 'hello!' -> ['hello', '']) they required every candidate's text to
    // also start or end with punctuation, wrongly pruning real matches.
    .filter(function (word) { return word.length > 0 })
}
552
var matches = function (element, filter) {
  // True when, for every field in `filter`, element[field] is a string
  // containing the query text as a whole phrase (case-insensitive). The
  // phrase must be preceded by the start of the string or a space, and
  // followed by the end of the string, a space, or one of ? ! , .
  // A missing or empty filter matches everything.
  // TODO use pluggable tokenizer
  return Object.keys(filter || {}).every(function (key) {
    var text = element[key]
    if (typeof text !== 'string') return false
    // Escape the query so it is matched literally. Interpolating it raw made
    // inputs such as 'c++' or '(' throw a SyntaxError, and let callers
    // inject arbitrary (possibly catastrophically backtracking) patterns.
    var literal = String(filter[key]).replace(/[.*+?^${}()|[\]\\]/g, '\\$&')
    return new RegExp('(?:^| )' + literal + '(?:$| |[?!,.])', 'i').test(text)
  })
}
566
var subsetMatches = function (superset, subset) {
  // Bloom-filter pre-check used to prune subtrees: true when every field in
  // `subset` has a filter in `superset` whose contains() accepts the
  // subset's filter. A node with no filters (missing or empty map) is never
  // pruned.
  var hasFilters = superset && Object.keys(superset).length > 0
  if (!hasFilters) return true

  var failures = 0
  _.forEach(subset, function (wanted, field) {
    var available = superset[field]
    var ok = Boolean(available) && available.contains(wanted)
    if (!ok) failures++
  })
  return failures === 0
}
579
var restore = function (hash, cb) {
  // Fetch the node persisted under `hash` from IPFS and rebuild it. Buckets
  // come back with their elements. Branch and Finger children come back as
  // lazy Refs carrying the counts and filters recorded in the parent's Data.
  // Every failure (fetch error, malformed Data, unknown node type) is
  // reported through `cb`.

  // Build a lazy Ref for one child link.
  var toRef = function (link, filters, count) {
    return new Ref({ Hash: link.Hash, Size: link.Size },
                   deserializeFilters(filters),
                   count)
  }

  // Decode one IPFS object into a tree node; throws on malformed input.
  var decode = function (res) {
    var object = JSON.parse(res.Data)
    var links = res.Links || []

    if (object.type === 'Bucket') {
      return new Bucket(object.data.elements)
    }

    if (object.type === 'Branch') {
      return new Branch(links.map(function (link, idx) {
        // Children are persisted with Name = zeropad(index). Look metadata up
        // by the link's own name so it stays paired with the right link even
        // if links come back reordered; fall back to the position.
        var name = link.Name || zeropad(idx)
        return toRef(link, object.filters[name], object.counts[name])
      }))
    }

    if (object.type === 'Finger') {
      var linkmap = {}
      links.forEach(function (link) {
        linkmap[link.Name] = link
      })
      return new Finger(['head', 'rest', 'tail'].map(function (part) {
        if (!linkmap[part]) {
          throw new Error('Finger node ' + hash + ' is missing its "' + part + '" link')
        }
        return toRef(linkmap[part], object.filters[part], object.counts[part])
      }))
    }

    // Previously an unknown type never invoked `cb`, hanging the caller.
    throw new Error('Unknown node type ' + JSON.stringify(object.type) + ' in ' + hash)
  }

  ipfs.object.get(hash, function (err, res) {
    if (err) return cb(err)

    var node
    try {
      node = decode(res)
    } catch (decodeErr) {
      // JSON.parse / deserializeFilters errors used to be thrown out of the
      // IPFS callback instead of being reported through `cb`.
      return cb(decodeErr)
    }
    // Called outside the try so an exception thrown by `cb` is not
    // mistaken for a decode error (and `cb` is never invoked twice).
    cb(null, node)
  })
}
615
616 return {
617 empty: function () {
618 return new Root()
619 },
620 restore: function (hash, cb) {
621 restore(hash, function (err, res) {
622 if (err) return cb(err)
623 cb(null, new Root(res))
624 })
625 }
626 }
627}