1 | const path = require('path')
|
2 | const pull = require('pull-stream')
|
3 | const toPull = require('push-stream-to-pull-stream')
|
4 | const EBT = require('epidemic-broadcast-trees')
|
5 | const Store = require('key-value-file-store')
|
6 | const toUrlFriendly = require('base64-url').escape
|
7 | const getSeverity = require('ssb-network-errors')
|
8 | const pullDefer = require('pull-defer')
|
9 | const classicMethods = require('./formats/classic')
|
10 |
|
/**
 * Register `fn` on `hookable` through its own `.hook()` method.
 *
 * Nothing happens unless `hookable` is a function that carries a truthy
 * `hook` property. The return value of `.hook()` is discarded.
 */
function hook(hookable, fn) {
  const canHook = typeof hookable === 'function' && hookable.hook
  if (!canHook) return
  hookable.hook(fn)
}
|
16 |
|
// Plugin name.
// NOTE(review): presumably also the namespace under which the methods are
// exposed to peers (init calls remote peers through `rpc.ebt[...]`) — confirm.
exports.name = 'ebt'

exports.version = '1.0.0'

// Call type ('duplex', 'sync' or 'async') declared for each method name.
// Every key listed here is also a key of the object returned by `init`.
exports.manifest = {
  replicate: 'duplex',
  replicateFormat: 'duplex',
  request: 'sync',
  block: 'sync',
  forget: 'sync',
  peerStatus: 'sync',
  clock: 'async',
}

// Method names allowed for the `anonymous` role.
// NOTE(review): how this list is enforced is decided by the host framework,
// which is not visible in this file — confirm.
exports.permissions = {
  anonymous: {
    allow: ['replicate', 'replicateFormat', 'clock'],
  },
}
|
36 |
|
37 |
|
38 |
|
39 |
|
/**
 * Remove, in place, every enumerable key of `clock` that `isFeed` rejects.
 *
 * @param {Object} clock - map keyed by feed id; mutated by this call.
 * @param {Function} isFeed - predicate called with each key.
 */
function cleanClock(clock, isFeed) {
  for (const feedId in clock) {
    if (isFeed(feedId)) continue
    delete clock[feedId]
  }
}
|
47 |
|
/**
 * Tell whether `err` carries one of the two known "method not available"
 * messages for `ebt.<methodName>`.
 *
 * @param {{message: string}} err - error returned by the remote call.
 * @param {string} methodName - name of the `ebt` method that was called.
 * @returns {boolean} true when `err.message` equals either known message.
 */
function isMuxrpcMissingError(err, methodName) {
  const { message } = err

  // Wording matched for the JS case (named after the original local variable)
  const fromJs =
    'method:ebt,' + methodName + ' is not in list of allowed methods'
  if (message === fromJs) return true

  // Wording matched for the Go case (named after the original local variable)
  return message === 'muxrpc: no such command: ebt.' + methodName
}
|
54 |
|
/**
 * Tell whether `err` is the 'reconnected to peer' error, identified by an
 * exact match on its message.
 */
function isReconnectedError(err) {
  const { message } = err
  return message === 'reconnected to peer'
}
|
58 |
|
59 | exports.init = function (sbot, config) {
|
// One EBT instance per registered format. `registerFormat` replaces the entry
// in place when a format with the same name is registered again.
const ebts = []
// The classic format is always registered first. This call appears before the
// `registerFormat` declaration and relies on function hoisting; `ebts` must
// already be initialized because `registerFormat` writes to it.
registerFormat(classicMethods)
|
62 |
|
/**
 * Create an EBT instance for `format` and store it in `ebts`, replacing any
 * instance already registered under the same name.
 *
 * @param {Object} format - must have a truthy `name`, plus the functions
 *   used below (`isFeed`, `isMsg`, `getMsgAuthor`, `getMsgSequence`,
 *   `getAtSequence`, `appendMsg`, `convertMsg`, `isReady`,
 *   `prepareForIsFeed`).
 * @throws {Error} when `format.name` is falsy.
 */
function registerFormat(format) {
  if (!format.name) throw new Error('format must have a name')

  // The classic format uses the bare 'ebt' directory; any other format gets
  // its name appended to it.
  let dirName = 'ebt'
  if (format.name !== 'classic') dirName += format.name
  const dir = config.path ? path.join(config.path, dirName) : null
  const clockStore = Store(dir, null, toUrlFriendly)

  // Pre-bind sbot so the predicate can be called with a feed id alone.
  const isFeed = format.isFeed.bind(format, sbot)
  const { isMsg, getMsgAuthor, getMsgSequence } = format

  const instance = EBT({
    logging: config.ebt && config.ebt.logging,
    id: sbot.id,
    getClock(id, cb) {
      clockStore.ensure(id, () => {
        // Stored clocks are filtered so only keys accepted by isFeed remain.
        const saved = clockStore.get(id) || {}
        cleanClock(saved, isFeed)
        cb(null, saved)
      })
    },
    setClock(id, clock) {
      cleanClock(clock, isFeed)
      clockStore.set(id, clock)
    },
    getAt(pair, cb) {
      format.getAtSequence(sbot, pair, cb)
    },
    append(msgVal, cb) {
      format.appendMsg(sbot, msgVal, cb)
    },

    isFeed,
    isMsg,
    getMsgAuthor,
    getMsgSequence,
  })

  // Format helpers attached to the instance for use by the rest of init.
  instance.convertMsg = format.convertMsg.bind(format, sbot)
  instance.isReady = format.isReady.bind(format, sbot)
  instance.isFeed = isFeed
  instance.name = format.name
  instance.prepareForIsFeed = format.prepareForIsFeed.bind(format, sbot)
  instance.clearClock = clockStore.delete.bind(clockStore)

  const slot = ebts.findIndex((other) => other.name === format.name)
  if (slot === -1) ebts.push(instance)
  else ebts[slot] = instance
}
|
113 |
|
/**
 * Look up the EBT instance registered under `formatName`.
 *
 * @param {string} formatName - name of a registered format.
 * @returns {Object} the first instance in `ebts` whose `name` matches.
 * @throws {Error} 'Unknown format: <name>' when nothing matches.
 */
function getEBT(formatName) {
  for (const candidate of ebts) {
    if (candidate.name === formatName) return candidate
  }
  throw new Error('Unknown format: ' + formatName)
}
|
120 |
|
// Readiness gate: `isReady` starts false and `waiting` collects callbacks
// that were queued while it is false.
let isReady = false
let waiting = []

/**
 * Run `fn` immediately when `isReady` is true; otherwise append it to
 * `waiting`.
 */
function onReady(fn) {
  if (!isReady) {
    waiting.push(fn)
    return
  }
  fn()
}
|
127 |
|
// Boot sequence: read the local vector clock, wait for every registered
// format to report ready, seed each EBT instance with the part of the clock
// it accepts, then open the readiness gate and run the queued callbacks.
sbot.getVectorClock((err, clock) => {
  // A failure here is only logged; `clock` may then be undefined, in which
  // case the for...in loops below iterate nothing and every instance starts
  // with an empty clock.
  if (err) console.warn('Failed to getVectorClock in ssb-ebt because:', err)

  const readies = ebts.map((ebt) => ebt.isReady())
  Promise.all(readies).then(
    () => {
      ebts.forEach((ebt) => {
        // Keep only the entries this format recognises as its own feeds.
        const validClock = {}
        for (const k in clock) {
          if (ebt.isFeed(k)) {
            validClock[k] = clock[k]
          }
        }

        ebt.state.clock = validClock
        ebt.update()
      })

      // Flip the flag before flushing so that an onReady() call made by a
      // queued callback runs immediately instead of being queued again.
      isReady = true
      for (let i = 0; i < waiting.length; ++i) waiting[i]()
      waiting = []
    },
    (readyErr) => {
      // A rejecting isReady() previously surfaced only as an unhandled
      // promise rejection. It is passed as the second argument of then(), so
      // errors thrown by the success handler above are not intercepted.
      console.warn(
        'Failed to wait for formats to be ready in ssb-ebt because:',
        readyErr
      )
    }
  )
})
|
150 |
|
// For every message passed to the `sbot.post` listener: once ready, offer it
// to each EBT instance whose format accepts the author, converting it first.
sbot.post((msg) => {
  const offerTo = (ebt) => {
    if (!ebt.isFeed(msg.value.author)) return

    ebt.convertMsg(msg.value, (err, converted) => {
      if (err) {
        console.warn('Failed to convert msg in ssb-ebt because:', err)
        return
      }
      ebt.onAppend(converted)
    })
  }

  onReady(() => {
    ebts.forEach((ebt) => {
      offerTo(ebt)
    })
  })
})
|
164 |
|
165 |
|
166 |
|
// When sbot exposes `progress`, wrap it so the report also carries the
// classic EBT instance's progress under `ebt`, but only while that progress
// has a truthy `target`.
if (sbot.progress) {
  hook(sbot.progress, function (fn) {
    const report = fn()
    const classicEbt = ebts.find((ebt) => ebt.name === 'classic')
    const classicProgress = classicEbt.progress()
    if (classicProgress.target) {
      report.ebt = classicProgress
    }
    return report
  })
}
|
176 |
|
// On every new RPC connection where `isClient` is truthy, open one EBT duplex
// stream per registered format (once `onReady` fires) and pipe it to the
// matching remote method.
sbot.on('rpc:connect', function (rpc, isClient) {
  if (rpc.id === sbot.id) return // connection to ourselves: nothing to replicate
  if (!isClient) return

  onReady(() => {
    ebts.forEach((ebt) => {
      const format = ebt.name
      const opts = { version: 3, format }
      const local = toPull.duplex(
        ebt.createStream(rpc.id, opts.version, true)
      )

      // The classic format goes through `replicate`; every other format goes
      // through `replicateFormat`.
      const methodName =
        format === 'classic' ? 'replicate' : 'replicateFormat'

      const remote = rpc.ebt[methodName](opts, (networkError) => {
        // Only errors with severity 3 or higher are reported at all.
        if (networkError && getSeverity(networkError) >= 3) {
          if (isMuxrpcMissingError(networkError, methodName)) {
            console.warn(
              `peer ${rpc.id} does not support RPC ebt.${methodName}`
            )
          } else if (isReconnectedError(networkError)) {
            // Deliberately silent: the original has an empty branch here.
          } else {
            // Name the method that actually failed. This used to say
            // 'rpc.ebt.replicate' even for replicateFormat calls; the text is
            // unchanged for the classic format.
            console.error(`rpc.ebt.${methodName} exception:`, networkError)
          }
        }
      })
      pull(local, remote, local)
    })
  })
})
|
211 |
|
/**
 * Pick the EBT instance to use for `feedId`.
 *
 * With a truthy `formatName` the instance is matched by name; otherwise the
 * first instance whose `isFeed(feedId)` is truthy is used. When neither
 * search finds anything, the 'classic' instance is returned.
 *
 * @param {string} feedId
 * @param {string} [formatName]
 * @returns {Object|undefined} undefined only if no 'classic' instance exists.
 */
function findEBTForFeed(feedId, formatName) {
  const matches = formatName
    ? (ebt) => ebt.name === formatName
    : (ebt) => ebt.isFeed(feedId)

  return ebts.find(matches) || ebts.find((ebt) => ebt.name === 'classic')
}
|
226 |
|
/**
 * Start or stop requesting `destFeedId`.
 *
 * Starting uses the single EBT instance chosen by `findEBTForFeed`, and only
 * if that instance accepts the feed id after `prepareForIsFeed` has called
 * back. Stopping is sent to every registered instance.
 *
 * @param {string} destFeedId
 * @param {boolean} requesting - truthy to start, falsy to stop.
 * @param {string} [formatName] - passed through to `findEBTForFeed`.
 */
function request(destFeedId, requesting, formatName) {
  onReady(() => {
    if (!requesting) {
      ebts.forEach((ebt) => {
        ebt.request(destFeedId, false)
      })
      return
    }

    const ebt = findEBTForFeed(destFeedId, formatName)
    ebt.prepareForIsFeed(destFeedId, () => {
      if (ebt.isFeed(destFeedId)) ebt.request(destFeedId, true)
    })
  })
}
|
243 |
|
/**
 * Drop `destFeedId` from every registered EBT instance: stop requesting it,
 * clear the entry stored under that id, and delete it from the in-memory
 * clock.
 *
 * @param {string} destFeedId
 */
function forget(destFeedId) {
  const forgetIn = (ebt) => {
    ebt.request(destFeedId, false)
    ebt.clearClock(destFeedId)
    delete ebt.state.clock[destFeedId]
  }

  onReady(() => {
    for (const ebt of ebts) forgetIn(ebt)
  })
}
|
253 |
|
/**
 * Record that `origFeedId` blocks (or no longer blocks) `destFeedId`.
 *
 * Nothing happens unless the chosen EBT instance accepts both feed ids.
 * Unblocking is only forwarded when a block is currently recorded in
 * `ebt.state.blocks`.
 *
 * @param {string} origFeedId
 * @param {string} destFeedId
 * @param {boolean} blocking - truthy to block, falsy to unblock.
 * @param {string} [formatName] - passed through to `findEBTForFeed`.
 */
function block(origFeedId, destFeedId, blocking, formatName) {
  onReady(() => {
    const ebt = findEBTForFeed(origFeedId, formatName)

    ebt.prepareForIsFeed(destFeedId, () => {
      if (!ebt.isFeed(origFeedId) || !ebt.isFeed(destFeedId)) return

      if (blocking) {
        ebt.block(origFeedId, destFeedId, true)
        return
      }

      const blockedByOrig = ebt.state.blocks[origFeedId]
      if (blockedByOrig && blockedByOrig[destFeedId]) {
        ebt.block(origFeedId, destFeedId, false)
      }
    })
  })
}
|
273 |
|
/**
 * Handler behind both `replicate` and `replicateFormat`: returns a deferred
 * duplex stream that is resolved, once ready, with the EBT stream for the
 * caller.
 *
 * Reads the peer id from `this.id`, so it must be invoked with the calling
 * context set accordingly.
 *
 * @param {{version: number, format?: string}} opts - `version` must be 3;
 *   `format` falls back to 'classic' when falsy.
 * @returns {Object} the deferred duplex created by `pullDefer.duplex()`.
 * @throws {Error} when `opts.version` is not 3, or (from `getEBT`) when the
 *   format is unknown.
 */
function replicateFormat(opts) {
  if (opts.version !== 3) {
    throw new Error('expected ebt.replicate({version: 3})')
  }

  const ebt = getEBT(opts.format || 'classic')
  const stream = pullDefer.duplex()

  // Arrow function on purpose: `this` must stay the context of the call to
  // replicateFormat, and it is read only when the callback runs.
  onReady(() => {
    const source = ebt.createStream(this.id, opts.version, false)
    stream.resolve(toPull.duplex(source))
  })

  return stream
}
|
291 |
|
292 |
|
/**
 * Summarise what is known about feed `id`: the local sequence plus, for each
 * peer that has a clock entry or a replication entry for it, that peer's
 * sequence and replication state.
 *
 * @param {string} [id] - feed id; falls back to `sbot.id` when falsy.
 * @returns {{id: string, seq: *, peers: Object}}
 */
function peerStatus(id) {
  id = id || sbot.id

  const { state } = findEBTForFeed(id)
  const status = {
    id,
    seq: state.clock[id],
    peers: {},
  }

  for (const peerId in state.peers) {
    const peer = state.peers[peerId]
    const rep = peer.replicating && peer.replicating[id]
    const known =
      peer.clock[id] != null || (peer.replicating && rep != null)
    if (!known) continue

    status.peers[peerId] = { seq: peer.clock[id], replicating: rep }
  }

  return status
}
|
320 |
|
/**
 * Report the in-memory vector clock of one format's EBT instance.
 *
 * Can be called as `clock(cb)` or `clock(opts, cb)`.
 *
 * @param {Object|Function} [opts] - options, or the callback when `cb` is
 *   omitted.
 * @param {string} [opts.format='classic'] - name of a registered format. A
 *   missing or falsy value falls back to 'classic', the same default
 *   `replicateFormat` applies.
 * @param {Function} cb - called as `cb(null, clock)` once ready, or as
 *   `cb(err)` when the format is not registered.
 */
function clock(opts, cb) {
  if (!cb) {
    cb = opts
    opts = { format: 'classic' }
  }

  onReady(() => {
    // Tolerate `clock({}, cb)` and `clock(null, cb)` instead of failing with
    // 'Unknown format: undefined' or a TypeError.
    const formatName = (opts && opts.format) || 'classic'

    // This callback may run asynchronously, where a throw cannot be caught by
    // the caller, so an unknown format is reported through `cb`.
    let ebt
    try {
      ebt = getEBT(formatName)
    } catch (err) {
      cb(err)
      return
    }
    cb(null, ebt.state.clock)
  })
}
|
332 |
|
/**
 * Once ready, overwrite the in-memory clock entry for `feedId` with
 * `sequence` on the EBT instance chosen by `findEBTForFeed`.
 *
 * @param {string} feedId
 * @param {number} sequence - NOTE(review): presumably a sequence number;
 *   the value is stored as given — confirm.
 * @param {string} [formatName] - passed through to `findEBTForFeed`.
 */
function setClockForSlicedReplication(feedId, sequence, formatName) {
  onReady(() => {
    const { state } = findEBTForFeed(feedId, formatName)
    state.clock[feedId] = sequence
  })
}
|
340 |
|
341 | return {
|
342 | request,
|
343 | block,
|
344 | forget,
|
345 | replicate: replicateFormat,
|
346 | replicateFormat,
|
347 | peerStatus,
|
348 | clock,
|
349 | setClockForSlicedReplication,
|
350 | registerFormat,
|
351 | }
|
352 | }
|