UNPKG

9.37 kBJavaScriptView Raw
1const path = require('path')
2const pull = require('pull-stream')
3const toPull = require('push-stream-to-pull-stream')
4const EBT = require('epidemic-broadcast-trees')
5const Store = require('key-value-file-store')
6const toUrlFriendly = require('base64-url').escape
7const getSeverity = require('ssb-network-errors')
8const pullDefer = require('pull-defer')
9const classicMethods = require('./formats/classic')
10
/**
 * Registers `fn` on `hookable` through its own `.hook()` method.
 *
 * Nothing happens unless `hookable` is a function that carries a truthy
 * `hook` property; any other value is silently ignored.
 */
function hook(hookable, fn) {
  const canHook = typeof hookable === 'function' && Boolean(hookable.hook)
  if (!canHook) return
  hookable.hook(fn)
}
16
// Plugin identity, the RPC methods it declares, and who may call them.
Object.assign(exports, {
  name: 'ebt',

  version: '1.0.0',

  // Call type declared for each exposed method
  manifest: {
    replicate: 'duplex',
    replicateFormat: 'duplex',
    request: 'sync',
    block: 'sync',
    forget: 'sync',
    peerStatus: 'sync',
    clock: 'async',
  },

  // Methods listed under `allow` for the `anonymous` role
  permissions: {
    anonymous: {
      allow: ['replicate', 'replicateFormat', 'clock'],
    },
  },
})
36
// There was a bug that caused some peers to request things that weren't
// feeds. That bug is fixed, so anything that isn't a feed is simply dropped.
// `clock` is modified in place: every key rejected by `isFeed` is deleted.
function cleanClock(clock, isFeed) {
  for (const feedId in clock) {
    if (isFeed(feedId)) continue
    delete clock[feedId]
  }
}
47
/**
 * Tells whether `err.message` is exactly one of the two "method missing"
 * messages built below for `ebt.<methodName>` (the variable names in the
 * original code label them as the JS and the Go wording respectively).
 *
 * @param {{message: string}} err
 * @param {string} methodName
 * @returns {boolean}
 */
function isMuxrpcMissingError(err, methodName) {
  const missingMethodMessages = [
    `method:ebt,${methodName} is not in list of allowed methods`,
    `muxrpc: no such command: ebt.${methodName}`,
  ]
  return missingMethodMessages.includes(err.message)
}
54
/**
 * Tells whether `err` carries exactly the message 'reconnected to peer'.
 *
 * @param {{message: string}} err
 * @returns {boolean}
 */
function isReconnectedError(err) {
  const { message } = err
  return message === 'reconnected to peer'
}
58
59exports.init = function (sbot, config) {
60 const ebts = []
61 registerFormat(classicMethods)
62
63 function registerFormat(format) {
64 if (!format.name) throw new Error('format must have a name')
65
66 const dirName = 'ebt' + (format.name === 'classic' ? '' : format.name)
67 const dir = config.path ? path.join(config.path, dirName) : null
68 const store = Store(dir, null, toUrlFriendly)
69
70 // EBT expects a function of only feedId so we bind sbot here
71 const isFeed = format.isFeed.bind(format, sbot)
72 const { isMsg, getMsgAuthor, getMsgSequence } = format
73
74 const ebt = EBT({
75 logging: config.ebt && config.ebt.logging,
76 id: sbot.id,
77 getClock(id, cb) {
78 store.ensure(id, function () {
79 const clock = store.get(id) || {}
80 cleanClock(clock, isFeed)
81 cb(null, clock)
82 })
83 },
84 setClock(id, clock) {
85 cleanClock(clock, isFeed)
86 store.set(id, clock)
87 },
88 getAt(pair, cb) {
89 format.getAtSequence(sbot, pair, cb)
90 },
91 append(msgVal, cb) {
92 format.appendMsg(sbot, msgVal, cb)
93 },
94
95 isFeed,
96 isMsg,
97 getMsgAuthor,
98 getMsgSequence,
99 })
100
101 // attach a few methods we need in this module
102 ebt.convertMsg = format.convertMsg.bind(format, sbot)
103 ebt.isReady = format.isReady.bind(format, sbot)
104 ebt.isFeed = isFeed
105 ebt.name = format.name
106 ebt.prepareForIsFeed = format.prepareForIsFeed.bind(format, sbot)
107 ebt.clearClock = store.delete.bind(store)
108
109 const existingId = ebts.findIndex((e) => e.name === format.name)
110 if (existingId !== -1) ebts[existingId] = ebt
111 else ebts.push(ebt)
112 }
113
114 function getEBT(formatName) {
115 const ebt = ebts.find((ebt) => ebt.name === formatName)
116 if (!ebt) throw new Error('Unknown format: ' + formatName)
117
118 return ebt
119 }
120
121 let isReady = false
122 let waiting = []
123 function onReady(fn) {
124 if (isReady) fn()
125 else waiting.push(fn)
126 }
127
128 sbot.getVectorClock((err, clock) => {
129 if (err) console.warn('Failed to getVectorClock in ssb-ebt because:', err)
130
131 const readies = ebts.map((ebt) => ebt.isReady())
132 Promise.all(readies).then(() => {
133 ebts.forEach((ebt) => {
134 const validClock = {}
135 for (const k in clock) {
136 if (ebt.isFeed(k)) {
137 validClock[k] = clock[k]
138 }
139 }
140
141 ebt.state.clock = validClock
142 ebt.update()
143 })
144
145 isReady = true
146 for (let i = 0; i < waiting.length; ++i) waiting[i]()
147 waiting = []
148 })
149 })
150
151 sbot.post((msg) => {
152 onReady(() => {
153 ebts.forEach((ebt) => {
154 if (ebt.isFeed(msg.value.author)) {
155 ebt.convertMsg(msg.value, (err, converted) => {
156 if (err)
157 console.warn('Failed to convert msg in ssb-ebt because:', err)
158 else ebt.onAppend(converted)
159 })
160 }
161 })
162 })
163 })
164
165 // TODO: remove this when no one uses ssb-db anymore, because
166 // sbot.progress is defined in ssb-db but not in ssb-db2
167 if (sbot.progress) {
168 hook(sbot.progress, function (fn) {
169 const _progress = fn()
170 const ebt = ebts.find((ebt) => ebt.name === 'classic')
171 const ebtProg = ebt.progress()
172 if (ebtProg.target) _progress.ebt = ebtProg
173 return _progress
174 })
175 }
176
177 sbot.on('rpc:connect', function (rpc, isClient) {
178 if (rpc.id === sbot.id) return // ssb-client connecting to ssb-server
179 if (isClient) {
180 onReady(() => {
181 ebts.forEach((ebt) => {
182 const format = ebt.name
183 const opts = { version: 3, format }
184 const local = toPull.duplex(
185 ebt.createStream(rpc.id, opts.version, true)
186 )
187
188 // for backwards compatibility we always replicate classic
189 // feeds using existing replicate RPC
190 const methodName =
191 format === 'classic' ? 'replicate' : 'replicateFormat'
192
193 const remote = rpc.ebt[methodName](opts, (networkError) => {
194 if (networkError && getSeverity(networkError) >= 3) {
195 if (isMuxrpcMissingError(networkError, methodName)) {
196 console.warn(
197 `peer ${rpc.id} does not support RPC ebt.${methodName}`
198 )
199 } else if (isReconnectedError(networkError)) {
200 // Do nothing, this is a harmless error
201 } else {
202 console.error('rpc.ebt.replicate exception:', networkError)
203 }
204 }
205 })
206 pull(local, remote, local)
207 })
208 })
209 }
210 })
211
212 function findEBTForFeed(feedId, formatName) {
213 let ebt
214 if (formatName) {
215 ebt = ebts.find((ebt) => ebt.name === formatName)
216 } else {
217 ebt = ebts.find((ebt) => ebt.isFeed(feedId))
218 }
219
220 if (!ebt) {
221 ebt = ebts.find((ebt) => ebt.name === 'classic')
222 }
223
224 return ebt
225 }
226
227 function request(destFeedId, requesting, formatName) {
228 onReady(() => {
229 if (requesting) {
230 const ebt = findEBTForFeed(destFeedId, formatName)
231 ebt.prepareForIsFeed(destFeedId, () => {
232 if (!ebt.isFeed(destFeedId)) return
233 ebt.request(destFeedId, true)
234 })
235 } else {
236 // If we don't want a destFeedId, make sure it's not registered anywhere
237 ebts.forEach((ebt) => {
238 ebt.request(destFeedId, false)
239 })
240 }
241 })
242 }
243
244 function forget(destFeedId) {
245 onReady(() => {
246 for (const ebt of ebts) {
247 ebt.request(destFeedId, false)
248 ebt.clearClock(destFeedId)
249 delete ebt.state.clock[destFeedId]
250 }
251 })
252 }
253
254 function block(origFeedId, destFeedId, blocking, formatName) {
255 onReady(() => {
256 const ebt = findEBTForFeed(origFeedId, formatName)
257 ebt.prepareForIsFeed(destFeedId, () => {
258 if (!ebt.isFeed(origFeedId)) return
259 if (!ebt.isFeed(destFeedId)) return
260
261 if (blocking) {
262 ebt.block(origFeedId, destFeedId, true)
263 } else if (
264 ebt.state.blocks[origFeedId] &&
265 ebt.state.blocks[origFeedId][destFeedId]
266 ) {
267 // only update unblock if they were already blocked
268 ebt.block(origFeedId, destFeedId, false)
269 }
270 })
271 })
272 }
273
274 function replicateFormat(opts) {
275 if (opts.version !== 3) {
276 throw new Error('expected ebt.replicate({version: 3})')
277 }
278
279 const formatName = opts.format || 'classic'
280 const ebt = getEBT(formatName)
281
282 const deferred = pullDefer.duplex()
283 onReady(() => {
284 // `this` refers to the remote peer who called this muxrpc API
285 deferred.resolve(
286 toPull.duplex(ebt.createStream(this.id, opts.version, false))
287 )
288 })
289 return deferred
290 }
291
292 // get replication status for feeds for this id
293 function peerStatus(id) {
294 id = id || sbot.id
295
296 const ebt = findEBTForFeed(id)
297
298 const data = {
299 id: id,
300 seq: ebt.state.clock[id],
301 peers: {},
302 }
303
304 for (const k in ebt.state.peers) {
305 const peer = ebt.state.peers[k]
306 if (
307 peer.clock[id] != null ||
308 (peer.replicating && peer.replicating[id] != null)
309 ) {
310 const rep = peer.replicating && peer.replicating[id]
311 data.peers[k] = {
312 seq: peer.clock[id],
313 replicating: rep,
314 }
315 }
316 }
317
318 return data
319 }
320
321 function clock(opts, cb) {
322 if (!cb) {
323 cb = opts
324 opts = { format: 'classic' }
325 }
326
327 onReady(() => {
328 const ebt = getEBT(opts.format)
329 cb(null, ebt.state.clock)
330 })
331 }
332
333 function setClockForSlicedReplication(feedId, sequence, formatName) {
334 onReady(() => {
335 const ebt = findEBTForFeed(feedId, formatName)
336
337 ebt.state.clock[feedId] = sequence
338 })
339 }
340
341 return {
342 request,
343 block,
344 forget,
345 replicate: replicateFormat,
346 replicateFormat,
347 peerStatus,
348 clock,
349 setClockForSlicedReplication,
350 registerFormat,
351 }
352}