UNPKG

9.53 kBJavaScriptView Raw
1'use strict'
// Returns true when `o` has no enumerable properties at all.
// Inherited enumerable properties count as content; null and undefined
// are treated as empty because for...in simply does not iterate them.
function isEmpty (o) {
  var empty = true
  for (var key in o) {
    empty = false
    break
  }
  return empty
}
6
7var Notify = require('pull-notify')
8var pull = require('pull-stream')
9var isBlobId = require('ssb-ref').isBlob
10
11// var MB = 1024 * 1024
12// var MAX_SIZE = 5 * MB
13
// Shallow copy: every enumerable property of `obj` (inherited ones
// included) becomes an own property of a fresh plain object.
// Nested values are shared with the original, not duplicated.
function clone (obj) {
  var copy = {}
  for (var key in obj) {
    copy[key] = obj[key]
  }
  return copy
}
19
// Builds a pull-stream through that intercepts aborts.
// Normal reads are forwarded to the upstream `read`. When the downstream
// aborts, `abortCb(abort, cb)` is called instead and the upstream `read`
// is NOT invoked, so it is up to `abortCb` to answer `cb`.
function onAbort (abortCb) {
  return function through (read) {
    return function readable (abort, cb) {
      if (!abort) {
        read(null, cb)
        return
      }
      abortCb(abort, cb)
    }
  }
}
28
// Normalises the accepted ways of naming a blob:
//   - an array is handed back untouched (its members are not checked here)
//   - a valid blob id string is returned as is
//   - an object whose `id` property is a valid blob id yields that id
// Anything else yields null.
function toBlobId (id) {
  if (Array.isArray(id)) return id // .map(toBlobId)
  if (isBlobId(id)) return id
  var nested = id && id.id
  return isBlobId(nested) ? id.id : null
}
33
// Adapts `fn(id, cb)` so callers may pass the id in any form accepted by
// toBlobId. If the first argument does not resolve to a blob id it is
// assumed to be the callback, and `fn` is called with it as sole argument.
// `this` is passed through to `fn` unchanged.
function wrap (fn) {
  return function (id, cb) {
    var blobId = toBlobId(id)
    if (blobId) return fn.call(this, blobId, cb)
    // no usable id: the first argument takes the callback's place
    return fn.call(this, id)
  }
}
43
44module.exports = function inject (blobs, set, name, opts) {
45 opts = opts || {}
46 // sympathy controls whether you'll replicate
47 var sympathy = opts.sympathy == null ? 3 : opts.sympathy | 0
48 var stingy = opts.stingy === true
49 var legacy = opts.legacy !== false
50 var pushy = opts.pushy || 3
51 var max = opts.max || 5 * 1024 * 1024
52
53 var notify = Notify()
54 var pushed = Notify()
55
56 var peers = {}
57 var want = {}; var push = {}; var waiting = {}; var getting = {}
58 var available = {}; var streams = {}
59 var send = {}; var timer
60
// Records a change for blob `id` and announces it to every want-stream.
// Negative `hops` marks the blob as wanted; any other value (a size)
// removes it from the want list. The change is sent as its own one-off
// batch and `send` is reset to a fresh object afterwards.
function queue (id, hops) {
  if (hops < 0) want[id] = hops
  else delete want[id]

  var batch = send
  batch[id] = hops
  send = {}
  notify(batch)
}
69
// Finds a connected peer that has announced blob `id` with a size below
// `max`. Returns that peer's id, or undefined when nobody qualifies.
function isAvailable (id) {
  for (var peerId in peers) {
    var announced = available[peerId]
    if (!announced) continue
    if (announced[id] < max && peers[peerId]) return peerId
  }
}
75
// Starts downloading blob `id` from `peer` into the local store.
// Does nothing if the blob is already being fetched or the peer is not
// connected. If the transfer fails, the peer's announcement for this blob
// is forgotten and the download is retried from another peer that has it.
function get (peer, id) {
  if (getting[id] || !peers[peer]) return

  getting[id] = peer
  var source = peers[peer].blobs.get({ key: id, max: max })
  var sink = blobs.add(id, function (err) {
    delete getting[id]
    if (!err) return

    if (available[peer]) delete available[peer][id]
    // check if another peer has this.
    // if so get it from them.
    var fallback = isAvailable(id)
    if (fallback) get(fallback, id)
  })
  pull(source, sink)
}
91
// Handles a want for blob `id` relayed on behalf of a peer, `hops` away
// (hops is negative). Wants further away than `sympathy` are ignored.
// The want is recorded and re-broadcast only if it is new or closer than
// the one already held; if some peer is known to have the blob it is
// fetched right away. The incoming `peer` value is not used.
function wants (peer, id, hops) {
  if (Math.abs(hops) > sympathy) return // sorry!

  var current = want[id]
  if (current && !(current < hops)) return

  want[id] = hops
  queue(id, hops)
  var provider = isAvailable(id)
  if (provider) get(provider, id)
}
102
// Follow the live feed of blobs newly added to the local store.
// Each arrival is announced to peers with its size, dropped from the want
// list, and reported to every callback waiting on that blob.
pull(
  blobs.ls({ old: false, meta: true }),
  pull.drain(function (blob) {
    var id = blob.id
    queue(id, blob.size)
    delete want[id]
    if (!waiting[id]) return
    while (waiting[id].length) {
      var done = waiting[id].shift()
      done(null, true)
    }
  })
)
113
// Records that peer `peer_id` has blob `id` with the given `size`.
// Throws if `peer_id` is not a string.
function has (peer_id, id, size) {
  if (typeof peer_id !== 'string') throw new Error('peer must be string id')

  if (!available[peer_id]) available[peer_id] = {}
  available[peer_id][id] = size

  // if we are broadcasting this blob, mark that this peer has it.
  // once `pushy` peers have it we stop broadcasting and report the push.
  var holders = push[id]
  if (holders) {
    holders[peer_id] = size
    if (Object.keys(holders).length >= pushy) {
      set.remove(id)
      delete push[id]
      pushed({ key: id, peers: holders })
    }
  }

  // fetch it ourselves if we want it, are not already getting it,
  // and it is below the size limit.
  if (want[id] && !getting[id] && size < max) get(peer_id, id)
}
131
// Handles one want/has message received from `peer`.
//
// `data` maps blob ids to integers. Entries whose key is not a blob id or
// whose value is not an integer are ignored.
//   - negative value: the peer WANTS the blob. If we already have it, its
//     size is added to the reply; otherwise the want is relayed one hop
//     further via wants(). When opts.stingy is true, wants are only
//     handled for blobs we are pushing.
//   - positive value: the peer HAS the blob with that size.
//
// `cb(null, res)` is called exactly once, after every size lookup has
// answered; `res` maps the wanted ids we already hold to their sizes.
// It is also called (with an empty `res`) when the message contains no
// wants to look up, so callers waiting on the reply never hang.
function process (data, peer, cb) {
  // starts at 1: that extra count is released after the loop, which keeps
  // cb from firing early if blobs.size answers synchronously and makes
  // sure it fires even when nothing was looked up.
  var n = 1
  var res = {}

  function next () {
    if (--n) return
    cb(null, res)
  }

  function handle (id) {
    if (!isBlobId(id) || !Number.isInteger(data[id])) return

    if (data[id] < 0 && (opts.stingy !== true || push[id])) { // interpret as "WANT"
      n++
      // check whether we already *HAVE* this file.
      // respond with its size, if we do.
      blobs.size(id, function (err, size) { // XXX
        if (size) res[id] = size
        else wants(peer, id, data[id] - 1)
        next()
      })
    } else if (data[id] > 0) { // interpret as "HAS"
      has(peer, id, data[id])
    }
  }

  for (var id in data) handle(id)
  next()
}
158
// Forgets everything tracked for a peer: its connection, the blobs it
// announced, and its want-stream.
function dead (peer_id) {
  var tables = [peers, available, streams]
  for (var i = 0; i < tables.length; i++) {
    delete tables[i][peer_id]
  }
}
164
165 // LEGACY LEGACY LEGACY
// Fallback replication for peers that only speak the old has/changes api.
// Does nothing when legacy support is disabled. Otherwise it:
//   - follows the peer's `changes` stream, marking each announced hash as
//     available from that peer (size unknown, recorded as `true`)
//   - asks the peer, via `blobs.has`, about everything we currently want
//   - repeats that question for every later batch sent to want-streams
// A stream ending with an error marks the peer as dead.
function legacySync (peer) {
  if (!legacy) return

  // declared before use: askPeer needs the drain so it can abort the
  // changes stream when the peer answers with an error.
  var drain

  function notPeer (err) {
    if (err) dead(peer.id)
  }

  // ask the peer which of the wanted (negative) entries in `hashes` it has
  function askPeer (hashes) {
    var wanted = Object.keys(hashes).filter(function (key) {
      return hashes[key] < 0
    })
    if (wanted.length === 0) return

    peer.blobs.has(wanted, function (err, haves) {
      if (err) {
        drain.abort(err) // ERROR: abort this stream.
        return
      }
      haves.forEach(function (have, i) {
        if (have) has(peer.id, wanted[i], have)
      })
    })
  }

  drain = pull.drain(function (hash) {
    has(peer.id, hash, true)
  }, notPeer)

  pull(peer.blobs.changes(), drain)

  askPeer(want)

  // a stream of hashes
  pull(notify.listen(), pull.drain(askPeer, notPeer))
}
202 // LEGACY LEGACY LEGACY
203
// Returns the outgoing want-stream for peer `id`, creating it on first use.
// A new stream starts with one message holding all our current wants plus
// a -1 entry for every blob we are pushing. When the consumer aborts the
// stream, its slot is set to false so a later call creates a fresh one.
function createWantStream (id) {
  var stream = streams[id]
  if (!stream) {
    stream = streams[id] = notify.listen()

    // merge in ids we are pushing.
    var initial = clone(want)
    for (var blobId in push) initial[blobId] = -1
    stream.push(initial)
  }

  var release = onAbort(function (err, cb) {
    streams[id] = false
    cb(err)
  })
  return pull(stream, release)
}
218
// Builds the sink that consumes a peer's want-stream.
// Every message is run through process(); if that yields blobs we already
// have, their sizes are pushed onto our own want-stream for that peer.
// If the stream errors before delivering any message, the peer is assumed
// to be legacy: its want-stream slot is cleared and, when enabled, legacy
// sync takes over. Any other ending marks the peer as dead, provided it is
// still the registered connection for that id.
function wantSink (peer) {
  createWantStream(peer.id) // set streams[peer.id]

  var modern = false

  function onMessage (data) {
    modern = true
    // respond with list of blobs you already have,
    process(data, peer.id, function (err, has_data) {
      // (if you have any)
      if (isEmpty(has_data) || !streams[peer.id]) return
      streams[peer.id].push(has_data)
    })
  }

  function onEnd (err) {
    // handle error and fallback to legacy mode, if enabled.
    if (err && !modern) {
      streams[peer.id] = false
      if (legacy) legacySync(peer)
      return
    }
    // if can handle unpeer another way,
    // then can refactor legacy handling out of sight.
    if (peers[peer.id] === peer) dead(peer.id)
  }

  return pull.drain(onMessage, onEnd)
}
245
246 var self
247 return self = {
248 // id: name,
249 has: function (id, cb) {
250 id = toBlobId(id)
251
252 if (Array.isArray(id)) {
253 for (var i = 0; i < id.length; i++) {
254 if (!isBlobId(id[i])) { return cb(new Error('invalid id:' + id[i])) }
255 }
256 } else if (!isBlobId(id)) { return cb(new Error('invalid id:' + id)) }
257
258 if (!legacy) {
259 blobs.has.call(this, id, cb)
260 } else {
261 // LEGACY LEGACY LEGACY
262 if (this === self || !this || this === global) { // a local call
263 return blobs.has.call(this, id, cb)
264 }
265 // ELSE, interpret remote calls to has as a WANT request.
266 // handling this by calling process (which calls size())
267 // avoids a race condition in the tests.
268 // (and avoids doubling the number of calls)
269 var a = Array.isArray(id) ? id : [id]
270 var o = {}
271 a.forEach(function (h) { o[h] = -1 })
272 // since this is always "has" process will never use the second arg.
273 process(o, null, function (err, res) {
274 var a = []; for (var k in o) a.push(res[k] > 0)
275 cb(null, Array.isArray(id) ? a : a[0])
276 })
277 // LEGACY LEGACY LEGACY
278 }
279 },
280 size: wrap(blobs.size),
281 get: blobs.get,
282 getSlice: blobs.getSlice,
283 add: wrap(blobs.add),
284 rm: wrap(blobs.rm),
285 ls: blobs.ls,
286 changes: function () {
287 // XXX for bandwidth sensitive peers, don't tell them about blobs we arn't trying to push.
288 return pull(
289 blobs.ls({ old: false, meta: false }),
290 pull.filter(function (id) {
291 return !stingy || push[id]
292 })
293 )
294 },
295 want: function (id, cb) {
296 id = toBlobId(id)
297 if (!isBlobId(id)) { return cb(new Error('invalid id:' + id)) }
298 // always broadcast wants immediately, because of race condition
299 // between has and adding a blob (needed to pass test/async.js)
300 if (blobs.isEmptyHash(id)) return cb(null, true)
301
302 var peerId = isAvailable(id)
303
304 if (waiting[id]) { waiting[id].push(cb) } else {
305 waiting[id] = [cb]
306 blobs.size(id, function (err, size) {
307 if (size != null) {
308 while (waiting[id].length) { waiting[id].shift()(null, true) }
309 delete waiting[id]
310 }
311 })
312 }
313
314 if (!peerId && waiting[id]) queue(id, -1)
315
316 if (peerId) return get(peerId, id)
317 },
318 push: function (id, cb) {
319 id = toBlobId(id)
320 // also store the id to push.
321 if (!isBlobId(id)) { return cb(new Error('invalid hash:' + id)) }
322
323 push[id] = push[id] || {}
324 queue(id, -1)
325 set.add(id, cb)
326 },
327 pushed: function () {
328 return pushed.listen()
329 },
330 createWants: function () {
331 return createWantStream(this.id)
332 },
333 // private api. used for testing. not exposed over rpc.
334 _wantSink: wantSink,
335 _onConnect: function (other, name) {
336 peers[other.id] = other
337 // sending of your wants starts when you we know
338 // that they are not legacy style.
339 // process is called when wantSync
340 // doesn't immediately get an error.
341 pull(other.blobs.createWants(), wantSink(other))
342 },
343 help: function () {
344 return require('./help')
345 }
346 }
347}