1 | const { pull } = require("pull-stream");
|
2 | const Scan = require('pull-scan');
|
3 | const Value = require('mutant/value')
|
4 | const Abortable = require('pull-abortable');
|
5 | const computed = require('mutant/computed')
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 | module.exports = (dataAccess) => {
|
12 |
|
13 |
|
14 | function getFilteredBackLinks(id, opts) {
|
15 | const obs = Value([]);
|
16 | const sync = Value(false);
|
17 |
|
18 | const abortable = Abortable()
|
19 | const filter = opts.filter;
|
20 |
|
21 | const state = {
|
22 | result: [],
|
23 | live: false
|
24 | }
|
25 |
|
26 | const source = pull(
|
27 | dataAccess.allGameMessages(id, true),
|
28 | abortable,
|
29 | pull.filter(msg => msg.sync || filter(msg)),
|
30 | Scan( (acc, msg) => {
|
31 |
|
32 | if (msg.sync) {
|
33 | acc.live = true;
|
34 | } else {
|
35 | acc.result.push(msg)
|
36 | }
|
37 |
|
38 | return acc;
|
39 | }, state)
|
40 | );
|
41 |
|
42 | pull(source, pull.drain(state => {
|
43 | obs.set(state.result)
|
44 | sync.set(state.live)
|
45 | })
|
46 | );
|
47 |
|
48 | const result = computed([obs], a => a, {
|
49 | onUnlisten: abortable.abort,
|
50 | })
|
51 |
|
52 | result.sync = sync;
|
53 |
|
54 | return result;
|
55 | }
|
56 |
|
57 |
|
58 | return {
|
59 | getFilteredBackLinks: getFilteredBackLinks
|
60 | }
|
61 | };
|