UNPKG

17.2 kBJavaScriptView Raw
1'use strict'
2
3var _ = require('lodash')
4var bloom = require('blomma')(1024, 1)
5var async = require('async')
6var pako = require('pako')
7
8module.exports = function (ipfs, BUCKET_SIZE) {
9
var Ref = function (ref, filters, count) {
  // A Ref stands in for a node that is only known by its link (`ref.Hash`).
  // It carries the filters and element count handed to it, and loads the real
  // node through `restore` whenever it has to be appended to or read from.
  var node = {
    type: 'Ref',
    filters: filters,
    ref: ref,
    count: count,
    children: 1
  }

  // Load the referenced node and forward the append to it.
  node.append = function (el, cb) {
    restore(this.ref.Hash, function (loadErr, restored) {
      if (loadErr) return cb(loadErr)
      restored.append(el, function (appendErr, res) {
        if (appendErr) return cb(appendErr)
        cb(null, res)
      })
    })
  }

  // A Ref has a single slot (index 0); the whole offset is handed on to
  // whatever node ends up in that slot.
  node.offset = function (ofs) {
    return [0, ofs]
  }

  node.getOffset = function () {
    return 0
  }

  // Answers with `{ restored: <node> }` so the caller can swap the loaded node
  // in for this Ref. `idx` and `filter` are accepted but not used.
  node.get = function (idx, filter, cb) {
    var hash = this.ref.Hash
    restore(hash, function (err, res) {
      if (err) return cb(err)
      cb(null, { restored: res })
    })
  }

  // Nothing to write: the link this Ref was created from is handed back.
  node.persist = function (cb) {
    return cb(null, this.ref)
  }

  return node
}
44
var Bucket = function (elements) {
  // Leaf node holding the actual elements.
  //
  // `elements` is optional: it is normalised to an array once, so that
  // `count`, `children` and `filters` are all derived from the same value
  // (previously `new Bucket()` threw on `elements.length`).
  var items = elements || []

  return {
    type: 'Bucket',
    elements: items,
    count: items.length,
    children: items.length,
    filters: elementFilters(items),
    // Calls back with `{ value: <new Bucket> }` while there is room, or with
    // `{ split: [<this bucket's elements>, <bucket holding only el>] }` once
    // the bucket is full. The bucket itself is never modified.
    append: function (el, cb) {
      // `>=` rather than `===`: a bucket that already holds more than
      // BUCKET_SIZE elements must split as well instead of growing forever.
      if (this.elements.length >= BUCKET_SIZE) {
        cb(null, { split: [ new Bucket(this.elements),
                            new Bucket([el]) ] })
      } else {
        cb(null, { value: new Bucket(this.elements.concat([el])) })
      }
    },
    // Returns [index inside this bucket, remaining offset (always 0)].
    offset: function (ofs) {
      return [ofs, 0]
    },
    getOffset: function (idx) {
      return idx
    },
    // Calls back with `{ eof: true }` when there is no element at `idx`,
    // `{ element: el }` when it passes `matches(el, filter.words)`, and
    // `{ skip: true }` otherwise.
    get: function (idx, filter, cb) {
      var el = this.elements[idx]
      if (typeof el === 'undefined') return cb(null, { eof: true })

      if (matches(el, filter.words)) {
        return cb(null, { element: el })
      }
      return cb(null, { skip: true })
    },
    // Writes the bucket through `ipfs.object.put` and calls back with
    // `{ Hash, Size }`. The result is cached on `persisted`, so later calls
    // do not write again.
    persist: function (cb) {
      var self = this

      if (self.persisted) return cb(null, self.persisted)

      var buf = new Buffer(JSON.stringify({
        Data: JSON.stringify({
          type: 'Bucket',
          data: self
        }),
        Links: []
      }))

      ipfs.object.put(buf, 'json', function (err, put) {
        if (err) return cb(err)

        ipfs.object.stat(put.Hash, function (err, stat) {
          if (err) return cb(err)
          self.persisted = { Hash: put.Hash,
                             Size: stat.CumulativeSize }
          cb(null, self.persisted)
        })
      })
    }
  }
}
104
var Branch = function (elements) {
  // Inner node holding an ordered list of child nodes.
  //
  // `count` is the sum of the children's counts. `filters` is the combination
  // of the children's filters (via `combineFilters`).
  //
  // The filters used to be computed from `this.elements`, but inside this
  // factory `this` is the fresh object created by `new`, not the literal
  // below, so the result was always `{}`. They are now computed from the
  // `elements` argument.
  var total = 0
  var unknownChildFilters = false
  for (var i = 0; i < elements.length; i++) {
    var child = elements[i]
    total += child.count
    // `subsetMatches` treats an empty/missing filter set as "could contain
    // anything". A non-empty child like that (e.g. a link whose stored filters
    // are empty) must keep that meaning for the whole branch, otherwise the
    // combined filter would claim more than is known and matching elements
    // could be skipped.
    if (child.count !== 0 &&
        (!child.filters || Object.keys(child.filters).length === 0)) {
      unknownChildFilters = true
    }
  }

  return {
    type: 'Branch',
    elements: elements,
    count: total,
    children: elements.length,
    filters: unknownChildFilters ? {} : combineFilters(elements),
    // Calls back with `{ value: <new Branch> }` while there is room, or with
    // `{ split: [<this branch's children>, <branch holding only el>] }` once
    // the branch is full. The branch itself is never modified.
    append: function (el, cb) {
      // `>=` rather than `===` so an over-full branch splits as well.
      if (this.elements.length >= BUCKET_SIZE) {
        cb(null, { split: [ new Branch(this.elements),
                            new Branch([el]) ]})
      } else {
        cb(null, { value: new Branch(this.elements.concat([el])) })
      }
    },
    // Translates an element offset into [child index, offset inside that
    // child]. Stops at the last child even when the offset lies beyond it.
    offset: function (ofs) {
      var idx = 0
      while (this.elements[(idx + 1)] && this.elements[idx].count <= ofs) {
        ofs -= this.elements[idx].count
        idx++
      }
      return [idx, ofs]
    },
    // Number of elements stored in the children before child `idx`.
    getOffset: function (idx) {
      var count = 0
      for (var i = 0; i < idx; i++) {
        count += this.elements[i].count
      }
      return count
    },
    // Calls back with `{ eof: true }` when there is no child at `idx`,
    // `{ skip: true }` when the child's filters rule out `filter.blooms`, and
    // `{ push: child }` otherwise.
    get: function (idx, filter, cb) {
      var element = this.elements[idx]

      if (!element) return cb(null, { eof: true })

      if (!subsetMatches(element.filters, filter.blooms)) {
        cb(null, { skip: true })
      } else {
        cb(null, { push: element })
      }
    },
    // Persists every child in order, then writes this branch with one link per
    // child (named by `zeropad(index)`) plus each child's serialized filters
    // and count. Calls back with `{ Hash, Size }`; the result is cached on
    // `persisted`.
    persist: function (cb) {
      var self = this

      if (self.persisted) return cb(null, self.persisted)

      var filters = {}
      var counts = {}
      var tasks = self.elements.map(function (element, idx) {
        var name = zeropad(idx)
        filters[name] = serializeFilters(element.filters)
        counts[name] = element.count
        return function (done) {
          element.persist(function (err, persisted) {
            if (err) return done(err)
            done(null, {
              Name: name,
              Hash: persisted.Hash,
              Size: persisted.Size
            })
          })
        }
      })

      async.series(tasks, function (err, links) {
        if (err) return cb(err)

        // Key order is kept as-is: it is part of the serialized bytes.
        var obj = {
          Data: JSON.stringify({
            type: self.type,
            counts: counts,
            filters: filters
          }),
          Links: links
        }

        var buf = new Buffer(JSON.stringify(obj))
        ipfs.object.put(buf, 'json', function (err, put) {
          if (err) return cb(err)
          ipfs.object.stat(put.Hash, function (err, stat) {
            if (err) return cb(err)
            self.persisted = { Hash: put.Hash,
                               Size: stat.CumulativeSize }
            cb(null, self.persisted)
          })
        })
      })
    }
  }
}
199
var Finger = function (elements) {
  // Three-part node. The slots are persisted under the link names below, in
  // this order.
  var PART_NAMES = ['head', 'rest', 'tail']
  var MIDDLE = 1
  var TAIL = 2

  var total = 0
  _.forEach(elements, function (part) {
    total += part.count
  })

  return {
    type: 'Finger',
    elements: elements,
    count: total,
    children: 3,
    // Appends to the tail slot. When the tail splits, the first half of the
    // split is appended to the middle slot and the second half becomes the new
    // tail; when the middle splits in turn, it is replaced by a nested Finger
    // with an empty Branch in its own middle. Always calls back with
    // `{ value: <new Finger> }`.
    append: function (el, cb) {
      var next = _.clone(this.elements)
      var done = function () {
        cb(null, { value: new Finger(next) })
      }

      elements[TAIL].append(el, function (err, tailRes) {
        if (err) return cb(err)

        if (!tailRes.split) {
          next[TAIL] = tailRes.value
          return done()
        }

        // push first down the middle
        next[TAIL] = tailRes.split[1]
        elements[MIDDLE].append(tailRes.split[0], function (err, middleRes) {
          if (err) return cb(err)

          if (middleRes.split) {
            next[MIDDLE] = new Finger([ middleRes.split[0],
                                        new Branch([]),
                                        middleRes.split[1] ])
          } else {
            next[MIDDLE] = middleRes.value
          }

          done()
        })
      })
    },
    // Translates an element offset into [slot index, offset inside that slot].
    offset: function (ofs) {
      var slot = 0
      var remaining = ofs
      while (this.elements[slot + 1] && this.elements[slot].count <= remaining) {
        remaining -= this.elements[slot].count
        slot += 1
      }
      return [slot, remaining]
    },
    // Number of elements stored in the slots before slot `idx`.
    getOffset: function (idx) {
      var before = 0
      var slot = 0
      while (slot < idx) {
        before += this.elements[slot].count
        slot += 1
      }
      return before
    },
    // Calls back with `{ eof: true }` when there is no slot at `idx`,
    // `{ skip: true }` when the slot's filters rule out `filter.blooms`, and
    // `{ push: slot }` otherwise.
    get: function (idx, filter, cb) {
      var part = this.elements[idx]
      if (!part) return cb(null, { eof: true })

      var result = subsetMatches(part.filters, filter.blooms)
        ? { push: part }
        : { skip: true }
      cb(null, result)
    },
    // Persists the slots in order, then writes this node with one named link
    // per slot plus each slot's serialized filters and count. Calls back with
    // `{ Hash, Size }`; the result is cached on `persisted`.
    persist: function (cb) {
      var self = this

      if (self.persisted) return cb(null, self.persisted)

      var filters = {}
      var counts = {}
      var tasks = []

      _.forEach(self.elements, function (part, idx) {
        var name = PART_NAMES[idx]
        filters[name] = serializeFilters(part.filters)
        counts[name] = part.count
        tasks.push(function (done) {
          self.elements[idx].persist(function (err, persisted) {
            if (err) return done(err)
            done(null, {
              Name: name,
              Hash: persisted.Hash,
              Size: persisted.Size
            })
          })
        })
      })

      async.series(tasks, function (err, links) {
        if (err) return cb(err)

        var data = JSON.stringify({
          type: 'Finger',
          filters: filters,
          counts: counts
        })
        var buf = new Buffer(JSON.stringify({ Data: data, Links: links }))

        ipfs.object.put(buf, 'json', function (err, put) {
          if (err) return cb(err)
          ipfs.object.stat(put.Hash, function (err, stat) {
            if (err) return cb(err)
            self.persisted = { Hash: put.Hash,
                               Size: stat.CumulativeSize }
            cb(null, self.persisted)
          })
        })
      })
    }
  }
}
312
var Root = function (ref) {
  // Handle on the top node of a log; an empty Bucket is used when no node is
  // given.
  var top = ref || new Bucket([])

  return {
    type: 'Root',
    ref: top,
    count: top.count,
    // Calls back with a *new* Root. When the top node reports a split, the two
    // halves become head and tail of a fresh Finger with an empty Branch in
    // the middle.
    append: function (el, cb) {
      this.ref.append(el, function (err, res) {
        if (err) return cb(err)

        if (!res.split) return cb(null, new Root(res.value))

        var parts = []
        parts[0] = res.split[0]
        parts[1] = new Branch([])
        parts[2] = res.split[1]
        cb(null, new Root(new Finger(parts)))
      })
    },
    // Creates an Iterator over the top node; `opts` is passed through as-is.
    iterator: function (opts) {
      return new Iterator(this.ref, opts)
    },
    // Delegates to the top node's persist.
    persist: function (cb) {
      top.persist(cb)
    }
  }
}
342
var Iterator = function (over, opts) {
  // Walks the elements below `over`, depth first.
  //
  // opts.reverse - iterate backwards when truthy
  // opts.filter  - passed to `makefilter`; the result is handed to every
  //                node's `get`
  // opts.offset  - element offset to start from; defaults to 0, or to
  //                `over.count - 1` when iterating in reverse
  if (!opts) opts = {}
  var reverse = !!opts.reverse
  var fullfilter = makefilter(opts.filter)
  var def = reverse ? over.count - 1 : 0
  var offset = (typeof opts.offset !== 'undefined' ? opts.offset : def)
  // Frames of { obj, idx }, innermost node first. Built on the first `next`.
  var stack = null

  // Move the cursor of the innermost frame one step in iteration order.
  var advance = function () {
    if (reverse) {
      stack[0].idx--
    } else {
      stack[0].idx++
    }
  }

  // Index at which iteration enters a freshly visited node.
  var entryIndex = function (node) {
    return reverse ? node.children - 1 : 0
  }

  return {
    pushcount: 0,
    // Calls back with `{ element, index }` for the next matching element, or
    // with `{ eof: true }` when the iteration is exhausted.
    next: function (cb) {
      var self = this

      // initialize stack
      if (!stack) {
        stackFromOffset(over, offset, function (err, newstack) {
          if (err) return cb(err)
          stack = newstack
          self.next(cb)
        })
        return
      }

      if (!stack[0]) return cb(null, { eof: true })

      stack[0].obj.get(stack[0].idx, fullfilter, function (err, res) {
        if (err) return cb(err)

        if (res.eof) {
          // innermost node is exhausted: pop it and step its parent
          stack.shift()
          if (!stack[0]) return cb(null, { eof: true })
          advance()
          self.next(cb)
        } else if (res.skip) {
          advance()
          self.next(cb)
        } else if (res.push) {
          self.pushcount++
          stack.unshift({ obj: res.push,
                          idx: entryIndex(res.push) })
          self.next(cb)
        } else if (res.restored) {
          // a reference was loaded: the loaded node takes over its frame
          stack[0] = { obj: res.restored,
                       idx: entryIndex(res.restored) }
          self.next(cb)
        } else if (typeof res.element !== 'undefined') {
          var index = offsetFromStack(stack)

          advance()
          cb(null, {
            element: res.element,
            index: index
          })
        } else {
          // Reported through the callback: throwing here would surface inside
          // whatever asynchronous callback happened to deliver `res`, where
          // the caller of `next` cannot catch it.
          cb(new Error('unhandled case, ' + JSON.stringify(res)))
        }
      })
    },
    // Collects up to `nr` results from `next` and calls back with them as an
    // array (shorter when the iteration ends first).
    take: function (nr, cb) {
      var self = this
      var accum = []
      async.forever(function (next) {
        // Check the budget *before* pulling: pulling first and then noticing
        // that the budget is spent would drop an element from the iteration.
        if (!(accum.length < nr)) return cb(null, accum)

        self.next(function (err, res) {
          if (err) return cb(err)
          if (res.eof) return cb(null, accum)
          accum.push(res)
          next()
        })
      })
    },
    // Collects every remaining result.
    all: function (cb) {
      this.take(Infinity, cb)
    }
  }
}
419
var offsetFromStack = function (stack) {
  // Adds up, over every frame, how many elements lie before the frame's
  // current index inside the frame's node.
  var total = 0
  _.forEach(stack, function (frame) {
    total += frame.obj.getOffset(frame.idx)
  })
  return total
}
425
var stackFromOffset = function (over, offset, acc, cb) {
  // Builds the stack of { obj, idx } frames (innermost first) that points at
  // element `offset` below `over`. `acc` is optional and only used by the
  // recursion; callers normally pass (over, offset, cb).
  var callback = cb || acc
  var frames = cb ? acc : []

  var position = over.offset(offset)
  var childIdx = position[0]
  var remainder = position[1]

  frames.unshift({ obj: over,
                   idx: childIdx })

  over.get(childIdx, {}, function (err, res) {
    if (err) return callback(err)

    // A loaded reference replaces its own frame instead of stacking on it.
    if (res.restored) frames.shift()

    var deeper = res.restored || res.push
    if (deeper) {
      stackFromOffset(deeper, remainder, frames, callback)
    } else {
      callback(null, frames)
    }
  })
}
452
var elementFilters = function (elements) {
  // Builds one bloom filter per property name, containing the words of every
  // string value stored under that name in any of the elements. Non-string
  // values are ignored.
  var perKey = {}

  var indexValue = function (value, key) {
    if (typeof value !== 'string') return
    if (!perKey[key]) perKey[key] = bloom.empty()
    _.forEach(splitWords(value), function (word) {
      perKey[key].add(word)
    })
  }

  _.forEach(elements, function (element) {
    _.forEach(element, indexValue)
  })

  return perKey
}
467
var serializeFilters = function (filters) {
  // Turns each filter into a string: its `buffer` is deflated with pako and
  // then base64-encoded. Keys are kept.
  var encoded = {}
  _.forEach(filters, function (filter, name) {
    var deflated = pako.deflate(filter.buffer)
    encoded[name] = new Buffer(deflated).toString('base64')
  })
  return encoded
}
476
var deserializeFilters = function (filters) {
  // Inverse of the serialized form: each value is base64-decoded, inflated
  // with pako and handed to `bloom.fromBuffer`. Keys are kept.
  var decoded = {}
  _.forEach(filters, function (encoded, name) {
    var compressed = new Buffer(encoded, 'base64')
    var raw = new Buffer(pako.inflate(compressed), 'base64')
    decoded[name] = bloom.fromBuffer(raw)
  })
  return decoded
}
487
var makefilter = function (filter) {
  // Prepares a query filter. Returns
  //   words  - the filter as given (or {} when none was given)
  //   blooms - per key, a bloom filter holding the words of that key's value
  var blooms = {}

  if (!filter) {
    return { words: {}, blooms: blooms }
  }

  _.forEach(filter, function (value, key) {
    var perKey = bloom.empty()
    blooms[key] = perKey
    _.forEach(splitWords(value), function (word) {
      perKey.add(word)
    })
  })

  return { words: filter, blooms: blooms }
}
507
var zeropad = function (nr) {
  // Left-pads `nr` with zeros to at least three characters ('7' -> '007').
  //
  // Longer numbers are returned in full. They used to be cut down to their
  // last three characters, which made e.g. 1000 and 0 share the name '000'.
  var str = String(nr)
  while (str.length < 3) {
    str = '0' + str
  }
  return str
}
512
var combineFilters = function (tocombine) {
  // Merges the `filters` of all given parts into one filter per key. The
  // first filter seen for a key is used as-is; later ones are folded in with
  // `bloom.merge`.
  var merged = {}
  _.forEach(tocombine, function (part) {
    _.forEach(part.filters, function (value, key) {
      merged[key] = merged[key] ? bloom.merge(merged[key], value) : value
    })
  })
  return merged
}
526
var splitWords = function (string) {
  // Lower-cases the string and splits it into words; #, @, _ and - are
  // considered part of the words.
  // TODO: support non-latin alphabets
  //
  // Empty pieces are dropped: `split` produces them whenever the string
  // starts or ends with a separator (or is empty), and '' is not a word that
  // should end up in a filter.
  var pieces = string.toLowerCase().split(/[^0-9a-z\u00C0-\u00ff\u00C0-\u024f#@_-]+/)
  return pieces.filter(function (piece) {
    return piece.length > 0
  })
}
533
var matches = function (element, filter) {
  // True when, for every key in `filter`, the element has a string under that
  // key which contains the filter value as a whole word (case-insensitive,
  // using the regular expression `\b` word boundary). A missing or empty
  // filter matches everything.
  //
  // The filter value is matched literally: regular-expression metacharacters
  // in it are escaped, so a value such as 'c++' no longer throws and 'a.b'
  // no longer matches 'axb'.
  // NOTE(review): `\b` only sees [A-Za-z0-9_] as word characters, so values
  // that start with '#' or '@' only match when preceded by such a character -
  // confirm whether that is intended.
  var keys = Object.keys(filter || {})

  for (var i = 0; i < keys.length; i++) {
    var key = keys[i]
    var text = element[key]
    if (typeof text !== 'string') return false

    var literal = String(filter[key]).replace(/[.*+?^${}()|[\]\\]/g, '\\$&')
    var regexp = new RegExp('\\b' + literal + '\\b', 'i')
    if (!regexp.test(text)) return false
  }

  return true
}
545
var subsetMatches = function (superset, subset) {
  // Can a node with filters `superset` contain something matching `subset`?
  // A missing or empty `superset` counts as a match. Otherwise every key of
  // `subset` must exist in `superset`, and that filter's `contains` must
  // accept the subset's value.
  var hasFilters = Boolean(superset) && Object.keys(superset).length > 0
  if (!hasFilters) return true

  var misses = 0
  _.forEach(subset, function (wanted, key) {
    var candidate = superset[key]
    if (candidate && candidate.contains(wanted)) return
    misses += 1
  })
  return misses === 0
}
558
var restore = function (hash, cb) {
  // Loads the object stored under `hash` and rebuilds the node it describes:
  // a Bucket with its elements, or a Branch / Finger whose children are Refs
  // to the object's links.
  //
  // Every failure is reported through `cb`: unparsable data, a Finger without
  // one of its three links and an unknown node type used to either throw
  // inside the ipfs callback or leave `cb` uncalled forever.

  // Ref for one link, with the filters and count stored for it in the parent.
  var refFor = function (link, filters, count) {
    return new Ref({ Hash: link.Hash,
                     Size: link.Size },
                   deserializeFilters(filters),
                   count)
  }

  // Node described by `object`, or null when its type is not known.
  var build = function (object, links) {
    if (object.type === 'Bucket') {
      return new Bucket(object.data.elements)
    }

    if (object.type === 'Branch') {
      return new Branch((links || []).map(function (link, idx) {
        var name = zeropad(idx)
        return refFor(link, object.filters[name], object.counts[name])
      }))
    }

    if (object.type === 'Finger') {
      var linkmap = {}
      ;(links || []).forEach(function (link) {
        linkmap[link.Name] = link
      })

      return new Finger(['head', 'rest', 'tail'].map(function (part) {
        if (!linkmap[part]) {
          throw new Error('Finger ' + hash + ' has no "' + part + '" link')
        }
        return refFor(linkmap[part], object.filters[part], object.counts[part])
      }))
    }

    return null
  }

  ipfs.object.get(hash, function (err, res) {
    if (err) return cb(err)

    var object
    var node
    try {
      object = JSON.parse(res.Data)
      node = build(object, res.Links)
    } catch (e) {
      return cb(e)
    }

    // `cb` is deliberately called outside the try block, so that an exception
    // thrown by the caller's callback is not mistaken for a restore failure.
    if (!node) {
      return cb(new Error('cannot restore ' + hash +
                          ': unknown node type ' + JSON.stringify(object.type)))
    }
    cb(null, node)
  })
}
594
595 return {
596 empty: function () {
597 return new Root()
598 },
599 restore: function (hash, cb) {
600 restore(hash, function (err, res) {
601 if (err) return cb(err)
602 cb(null, new Root(res))
603 })
604 }
605 }
606}