1 | 'use strict'
|
2 | function isEmpty (o) {
|
3 | for (var k in o) return false
|
4 | return true
|
5 | }
|
6 |
|
7 | var Notify = require('pull-notify')
|
8 | var pull = require('pull-stream')
|
9 | var isBlobId = require('ssb-ref').isBlob
|
10 |
|
11 |
|
12 |
|
13 |
|
14 | function clone (obj) {
|
15 | var o = {}
|
16 | for (var k in obj) { o[k] = obj[k] }
|
17 | return o
|
18 | }
|
19 |
|
20 | function onAbort (abortCb) {
|
21 | return function (read) {
|
22 | return function (abort, cb) {
|
23 | if (abort) abortCb(abort, cb)
|
24 | else read(null, cb)
|
25 | }
|
26 | }
|
27 | }
|
28 |
|
29 | function toBlobId (id) {
|
30 | if (Array.isArray(id)) return id
|
31 | return isBlobId(id) ? id : isBlobId(id && id.id) ? id.id : null
|
32 | }
|
33 |
|
34 | function wrap (fn) {
|
35 | return function (id, cb) {
|
36 | if (!toBlobId(id)) {
|
37 | cb = id
|
38 | return fn.call(this, cb)
|
39 | }
|
40 | return fn.call(this, toBlobId(id), cb)
|
41 | }
|
42 | }
|
43 |
|
44 | module.exports = function inject (blobs, set, name, opts) {
|
45 | opts = opts || {}
|
46 |
|
47 | var sympathy = opts.sympathy == null ? 3 : opts.sympathy | 0
|
48 | var stingy = opts.stingy === true
|
49 | var legacy = opts.legacy !== false
|
50 | var pushy = opts.pushy || 3
|
51 | var max = opts.max || 5 * 1024 * 1024
|
52 |
|
53 | var notify = Notify()
|
54 | var pushed = Notify()
|
55 |
|
56 | var peers = {}
|
57 | var want = {}; var push = {}; var waiting = {}; var getting = {}
|
58 | var available = {}; var streams = {}
|
59 | var send = {}; var timer
|
60 |
|
61 | function queue (id, hops) {
|
62 | if (hops < 0) { want[id] = hops } else { delete want[id] }
|
63 |
|
64 | send[id] = hops
|
65 | var _send = send
|
66 | send = {}
|
67 | notify(_send)
|
68 | }
|
69 |
|
70 | function isAvailable (id) {
|
71 | for (var peer in peers) {
|
72 | if (available[peer] && available[peer][id] < max && peers[peer]) { return peer }
|
73 | }
|
74 | }
|
75 |
|
76 | function get (peer, id) {
|
77 | if (getting[id] || !peers[peer]) return
|
78 |
|
79 | getting[id] = peer
|
80 | var source = peers[peer].blobs.get({ key: id, max: max })
|
81 | pull(source, blobs.add(id, function (err, _id) {
|
82 | delete getting[id]
|
83 | if (err) {
|
84 | if (available[peer]) delete available[peer][id]
|
85 |
|
86 |
|
87 | if (peer = isAvailable(id)) get(peer, id)
|
88 | }
|
89 | }))
|
90 | }
|
91 |
|
92 | function wants (peer, id, hops) {
|
93 | if (Math.abs(hops) > sympathy) return
|
94 | if (!want[id] || want[id] < hops) {
|
95 | want[id] = hops
|
96 | queue(id, hops)
|
97 | if (peer = isAvailable(id)) {
|
98 | get(peer, id)
|
99 | }
|
100 | }
|
101 | }
|
102 |
|
103 | pull(
|
104 | blobs.ls({ old: false, meta: true }),
|
105 | pull.drain(function (data) {
|
106 | queue(data.id, data.size)
|
107 | delete want[data.id]
|
108 | if (waiting[data.id]) {
|
109 | while (waiting[data.id].length) { waiting[data.id].shift()(null, true) }
|
110 | }
|
111 | })
|
112 | )
|
113 |
|
114 | function has (peer_id, id, size) {
|
115 | if (typeof peer_id !== 'string') throw new Error('peer must be string id')
|
116 | available[peer_id] = available[peer_id] || {}
|
117 | available[peer_id][id] = size
|
118 |
|
119 |
|
120 |
|
121 | if (push[id]) {
|
122 | push[id][peer_id] = size
|
123 | if (Object.keys(push[id]).length >= pushy) {
|
124 | var data = { key: id, peers: push[id] }
|
125 | set.remove(id)
|
126 | delete push[id]; pushed(data)
|
127 | }
|
128 | }
|
129 | if (want[id] && !getting[id] && size < max) get(peer_id, id)
|
130 | }
|
131 |
|
132 | function process (data, peer, cb) {
|
133 | var n = 0; var res = {}
|
134 | for (var id in data) {
|
135 | (function (id) {
|
136 | if (isBlobId(id) && Number.isInteger(data[id])) {
|
137 | if (data[id] < 0 && (opts.stingy !== true || push[id])) {
|
138 | n++
|
139 |
|
140 |
|
141 | blobs.size(id, function (err, size) {
|
142 | if (size) res[id] = size
|
143 | else wants(peer, id, data[id] - 1)
|
144 | next()
|
145 | })
|
146 | } else if (data[id] > 0) {
|
147 | has(peer, id, data[id])
|
148 | }
|
149 | }
|
150 | }(id))
|
151 | }
|
152 |
|
153 | function next () {
|
154 | if (--n) return
|
155 | cb(null, res)
|
156 | }
|
157 | }
|
158 |
|
159 | function dead (peer_id) {
|
160 | delete peers[peer_id]
|
161 | delete available[peer_id]
|
162 | delete streams[peer_id]
|
163 | }
|
164 |
|
165 |
|
166 | function legacySync (peer) {
|
167 | if (!legacy) return
|
168 |
|
169 | var drain
|
170 |
|
171 | function hasLegacy (hashes) {
|
172 | var ary = Object.keys(hashes).filter(function (k) {
|
173 | return hashes[k] < 0
|
174 | })
|
175 | if (ary.length) {
|
176 | peer.blobs.has(ary, function (err, haves) {
|
177 | if (err) drain.abort(err)
|
178 | else {
|
179 | haves.forEach(function (have, i) {
|
180 | if (have) has(peer.id, ary[i], have)
|
181 | })
|
182 | }
|
183 | })
|
184 | }
|
185 | }
|
186 |
|
187 | function notPeer (err) {
|
188 | if (err) dead(peer.id)
|
189 | }
|
190 |
|
191 | drain = pull.drain(function (hash) {
|
192 | has(peer.id, hash, true)
|
193 | }, notPeer)
|
194 |
|
195 | pull(peer.blobs.changes(), drain)
|
196 |
|
197 | hasLegacy(want)
|
198 |
|
199 |
|
200 | pull(notify.listen(), pull.drain(hasLegacy, notPeer))
|
201 | }
|
202 |
|
203 |
|
204 | function createWantStream (id) {
|
205 | if (!streams[id]) {
|
206 | streams[id] = notify.listen()
|
207 |
|
208 |
|
209 | var w = clone(want)
|
210 | for (var k in push) w[k] = -1
|
211 | streams[id].push(w)
|
212 | }
|
213 | return pull(streams[id], onAbort(function (err, cb) {
|
214 | streams[id] = false
|
215 | cb(err)
|
216 | }))
|
217 | }
|
218 |
|
219 | function wantSink (peer) {
|
220 | createWantStream(peer.id)
|
221 |
|
222 | var modern = false
|
223 | return pull.drain(
|
224 | function (data) {
|
225 | modern = true
|
226 |
|
227 | process(data, peer.id, function (err, has_data) {
|
228 |
|
229 | if (!isEmpty(has_data) && streams[peer.id]) streams[peer.id].push(has_data)
|
230 | })
|
231 | },
|
232 | function (err) {
|
233 | if (err && !modern) {
|
234 | streams[peer.id] = false
|
235 | if (legacy) legacySync(peer)
|
236 | }
|
237 |
|
238 |
|
239 |
|
240 |
|
241 | else if (peers[peer.id] == peer) { dead(peer.id) }
|
242 | }
|
243 | )
|
244 | }
|
245 |
|
246 | var self
|
247 | return self = {
|
248 |
|
249 | has: function (id, cb) {
|
250 | id = toBlobId(id)
|
251 |
|
252 | if (Array.isArray(id)) {
|
253 | for (var i = 0; i < id.length; i++) {
|
254 | if (!isBlobId(id[i])) { return cb(new Error('invalid id:' + id[i])) }
|
255 | }
|
256 | } else if (!isBlobId(id)) { return cb(new Error('invalid id:' + id)) }
|
257 |
|
258 | if (!legacy) {
|
259 | blobs.has.call(this, id, cb)
|
260 | } else {
|
261 |
|
262 | if (this === self || !this || this === global) {
|
263 | return blobs.has.call(this, id, cb)
|
264 | }
|
265 |
|
266 |
|
267 |
|
268 |
|
269 | var a = Array.isArray(id) ? id : [id]
|
270 | var o = {}
|
271 | a.forEach(function (h) { o[h] = -1 })
|
272 |
|
273 | process(o, null, function (err, res) {
|
274 | var a = []; for (var k in o) a.push(res[k] > 0)
|
275 | cb(null, Array.isArray(id) ? a : a[0])
|
276 | })
|
277 |
|
278 | }
|
279 | },
|
280 | size: wrap(blobs.size),
|
281 | get: blobs.get,
|
282 | getSlice: blobs.getSlice,
|
283 | add: wrap(blobs.add),
|
284 | rm: wrap(blobs.rm),
|
285 | ls: blobs.ls,
|
286 | changes: function () {
|
287 |
|
288 | return pull(
|
289 | blobs.ls({ old: false, meta: false }),
|
290 | pull.filter(function (id) {
|
291 | return !stingy || push[id]
|
292 | })
|
293 | )
|
294 | },
|
295 | want: function (id, cb) {
|
296 | id = toBlobId(id)
|
297 | if (!isBlobId(id)) { return cb(new Error('invalid id:' + id)) }
|
298 |
|
299 |
|
300 | if (blobs.isEmptyHash(id)) return cb(null, true)
|
301 |
|
302 | var peerId = isAvailable(id)
|
303 |
|
304 | if (waiting[id]) { waiting[id].push(cb) } else {
|
305 | waiting[id] = [cb]
|
306 | blobs.size(id, function (err, size) {
|
307 | if (size != null) {
|
308 | while (waiting[id].length) { waiting[id].shift()(null, true) }
|
309 | delete waiting[id]
|
310 | }
|
311 | })
|
312 | }
|
313 |
|
314 | if (!peerId && waiting[id]) queue(id, -1)
|
315 |
|
316 | if (peerId) return get(peerId, id)
|
317 | },
|
318 | push: function (id, cb) {
|
319 | id = toBlobId(id)
|
320 |
|
321 | if (!isBlobId(id)) { return cb(new Error('invalid hash:' + id)) }
|
322 |
|
323 | push[id] = push[id] || {}
|
324 | queue(id, -1)
|
325 | set.add(id, cb)
|
326 | },
|
327 | pushed: function () {
|
328 | return pushed.listen()
|
329 | },
|
330 | createWants: function () {
|
331 | return createWantStream(this.id)
|
332 | },
|
333 |
|
334 | _wantSink: wantSink,
|
335 | _onConnect: function (other, name) {
|
336 | peers[other.id] = other
|
337 |
|
338 |
|
339 |
|
340 |
|
341 | pull(other.blobs.createWants(), wantSink(other))
|
342 | },
|
343 | help: function () {
|
344 | return require('./help')
|
345 | }
|
346 | }
|
347 | }
|