UNPKG

47.4 kBJavaScriptView Raw
1const Url = require('url');
2const errors = require('./Errors');
3const utils = require('./utils');
4const debug = require('debug')('dweb-transports');
5const httptools = require('./httptools');
6const each = require('async/each');
7const map = require('async/map');
8const {p_namingcb, naming} = require('./Naming.js')
9
10class Transports {
11 /*
12 Handles multiple transports, API should be (almost) the same as for an individual transport)
13
14 Fields:
15 _transports List of transports loaded (internal)
16 namingcb If set will be called cb(urls) => urls to convert to urls from names.
17 _transportclasses All classes whose code is loaded e.g. {HTTP: TransportHTTP, IPFS: TransportIPFS}
18 _optionspaused Saves paused option for setup
19 */
20
21 //TODO a few of these things could be much better as events that are listened for, especially p_statuses
22
23 constructor(options) {
24 // THIS IS UNUSED - ALL METHODS ARE STATIC, THERE IS NO Transports INSTANCE
25 }
26
27 static _connected() {
28 /*
29 Get an array of transports that are connected, i.e. currently usable
30 */
31 return this._transports.filter((t) => (!t.status));
32 }
33 static p_connectedNames(cb) { //TODO rename to connectedNames
34 /*
35 resolves to: an array of the names of connected transports
36 Note this is async only because the TransportsProxy version of this has to be - that isn't currently used, so this could be made sync
37 */
38 const res = this._connected().map(t => t.name);
39 if (cb) { cb(null, res)} else { return new Promise((resolve, reject) => resolve(res))}
40 }
41 static async p_connectedNamesParm() { // Doesnt strictly need to be async, but for consistency with Proxy it has to be.
42 return (await this.p_connectedNames()).map(n => "transport="+n).join('&')
43 }
44 static statuses({connected=undefined}={}) { //TODO-API (especially add info:)
45 /*
46 Return array of statuses,
47 connected: If true then only connected transports
48 */
49 const ss = Transports._transports.map((t) => { return {name: t.name, status: t.status, info: t.info}});
50 return connected ? ss.filter(s => !s.status) : ss;
51 }
52 static p_statuses(cb) {
53 /*
54 resolves to: a dictionary of statuses of transports, e.g. { TransportHTTP: STATUS_CONNECTED }
55 */
56 const res = this.statuses({connected: false}); // No errors possible
57 if (cb) { cb(null, res)} else { return new Promise((resolve, reject) => resolve(res))}
58 }
59 static validFor(urls, func, opts) { //TODO-RELOAD check for noCache support
60 /*
61 Finds an array or Transports that can support this URL.
62
63 Excludes any transports whose status != 0 as they aren't connected
64
65 urls: Array of urls
66 func: Function to check support for: fetch, store, add, list, listmonitor, reverse - see supportFunctions on each Transport class
67 opts: Passed to each Transport, esp for supportFeatures
68 returns: Array of pairs of Url & transport instance [ [ u1, t1], [u1, t2], [u2, t1]]
69 throws: CodingError if urls empty or [undefined...]
70 */
71 if (typeof urls === "string") urls = [urls];
72 if (!((urls && urls[0]) || ["store", "newlisturls", "newdatabase", "newtable", "seed"].includes(func))) {
73 console.error("Transports.validFor called with invalid arguments: urls=", urls, "func=", func); // FOr debugging old calling patterns with [ undefined ]
74 return [];
75 }
76 if (!(urls && urls.length > 0)) { // No url supplied we are just checking which transports support this function on no url.
77 return this._transports.filter((t) => (t.validFor(undefined, func, opts)))
78 .map((t) => [undefined, t]);
79 } else {
80 return [].concat(
81 ...urls.map((url) => typeof url === 'string' ? Url.parse(url) : url) // parse URLs once
82 .map((url) =>
83 this._transports.filter((t) => (t.validFor(url, func, opts))) // [ t1, t2 ]
84 .map((t) => [url, t]))); // [[ u, t1], [u, t2]]
85 }
86 }
87 static async p_urlsValidFor(urls, func, opts) {
88 // Need a async version of this for serviceworker and TransportsProxy
89 return this.validFor(urls, func, opts).map((ut) => ut[0]);
90 }
91
92 // SEE-OTHER-ADDTRANSPORT
93
94 static http() {
95 // Find an http transport if it exists.
96 return Transports._connected().find((t) => t.name === "HTTP")
97 }
98
99 static wolk() {
100 // Find a Wolk transport if it exists.
101 return Transports._connected().find((t) => t.name === "WOLK")
102 }
103
104 static ipfs() {
105 // Find an ipfs transport if it exists, in particular, so YJS can use it.
106 return Transports._connected().find((t) => t.name === "IPFS")
107 }
108
109 static webtorrent() {
110 // Find an ipfs transport if it exists, so for example ServiceWorker.p_respondWebTorrent can use it.
111 return Transports._connected().find((t) => t.name === "WEBTORRENT")
112 }
113
114 static gun() {
115 // Find a GUN transport if it exists
116 return Transports._connected().find((t) => t.name === "GUN")
117 }
118
119 static fluence() {
120 // Find a FLUENCE transport if it exists
121 return Transports._connected().find((t) => t.name === "FLUENCE")
122 }
123
124 static async p_resolveNames(urls) {
125 /* Resolve urls that might be names, returning a modified array.
126 */
127 if (this.mirror) { // Dont do using dweb-mirror as our gateway, as always want to send URLs there.
128 return Array.isArray(urls) ? this.gatewayUrls(urls) : this.gatewayUrl(url);
129 } else if (this.namingcb) {
130 return await this.namingcb(urls); // Array of resolved urls
131 } else {
132 return urls;
133 }
134 }
135
136 static resolveNamesWith(cb) {
137 // Set a callback for p_resolveNames
138 this.namingcb = cb;
139 }
140
141 static togglePaused(name, cb) {
142 /*
143 Toggle a transport by name,
144 name e.g. "HTTP"
145 cb(err, status)
146 */
147 const transport = this._transports.find((t) => t.name === name);
148 if (!transport) {
149 cb(undefined);
150 } else {
151 transport.togglePaused(t => cb(null, t.status));
152 }
153 }
154 // Storage of data
155
156 static async _p_rawstore(tt, data) {
157 // Internal method to store at known transports
158 let errs = [];
159 let rr = await Promise.all(tt.map(async function(t) {
160 try {
161 debug("Storing %d bytes to %s", data.length, t.name);
162 let url = await t.p_rawstore(data);
163 debug("Storing %d bytes to %s succeeded: %s", data.length, t.name, url);
164 return url; //url
165 } catch(err) {
166 debug("Storing %d bytes to %s failed: %s", data.length, t.name, err.message);
167 errs.push(err);
168 return undefined;
169 }
170 }));
171 rr = rr.filter((r) => !!r); // Trim any that had errors
172 if (!rr.length) {
173 debug("Storing %d bytes failed on all transports", data.length);
174 throw new errors.TransportError(errs.map((err)=>err.message).join(', ')); // New error with concatenated messages
175 }
176 return rr;
177
178 }
179 static async p_rawstore(data) {
180 /*
181 data: Raw data to store - typically a string, but its passed on unmodified here
182 returns: Array of urls of where stored
183 throws: TransportError with message being concatenated messages of transports if NONE of them succeed.
184 */
185 let tt = this.validFor(undefined, "store").map(([u, t]) => t); // Valid connected transports that support "store"
186 if (!tt.length) {
187 debug("Storing %d bytes failed: no transports available", data.length);
188 throw new errors.TransportError('Transports.p_rawstore: Cant find transport for store');
189 }
190 return this._p_rawstore(tt, data);
191 }
192 static async p_rawfetch(urls, opts={}) {
193 /*
194 Fetch the data for a url, transports act on the data, typically storing it.
195 urls: array of urls to retrieve (any are valid)
196 opts {
197 start, integer - first byte wanted
198 end integer - last byte wanted (note this is inclusive start=0,end=1023 is 1024 bytes
199 timeoutMS integer - max time to wait on transports (IPFS) that support it
200 noCache bool - Skip caching (passed to Transports)
201 }
202 returns: string - arbitrary bytes retrieved.
203 throws: TransportError with concatenated error messages if none succeed.
204 throws: CodingError if urls empty or [undefined ... ]
205 */
206 if (!urls.length) throw new errors.TransportError("Transports.p_rawfetch given an empty list of urls");
207 let resolvedurls = await this.p_resolveNames(urls); // If naming is loaded then convert name to [urls]
208 if (!resolvedurls.length) throw new errors.TransportError("Transports.p_rawfetch none of the urls resolved: " + urls);
209 let tt = this.validFor(resolvedurls, "fetch", {noCache: opts.noCache}); //[ [Url,t],[Url,t]] throws CodingError on empty /undefined urls
210 if (!tt.length) {
211 throw new errors.TransportError("Transports.p_rawfetch cant find any transport for urls: " + resolvedurls);
212 }
213 //With multiple transports, it should return when the first one returns something.
214 let errs = [];
215 let failedtransports = []; // Will accumulate any transports fail on before the success
216 for (const [url, t] of tt) {
217 try {
218 debug("Fetching %s via %s", url.href, t.name);
219 let data = await t.p_rawfetch(url, opts); // throws errors if fails or timesout
220 debug("Fetching %s via %s succeeded %d bytes", url.href, t.name, data.length);
221 if (opts.relay && failedtransports.length) {
222 debug("Fetching attempting relay of %d bytes from %s to %o", data.length, url.href, failedtransports.map(t=>t.name));
223 this._p_rawstore(failedtransports, data)
224 .then(uu => debug(`Fetching relayed %d bytes to %o`, data.length, uu)); // Happening async, not waiting and dont care if fails
225 }
226 //END TODO-MULTI-GATEWAY
227 return data;
228 } catch (err) {
229 failedtransports.push(t);
230 errs.push(err);
231 debug("Fetching %s via %s failed: %s", url.href, t.name, err.message);
232 // Don't throw anything here, loop round for next, only throw if drop out bottom
233 //TODO-MULTI-GATEWAY potentially copy from success to failed URLs.
234 }
235 }
236 debug("Fetching %o failed on all transports", urls);
237 throw new errors.TransportError(errs.map((err)=>err.message).join(', ')); //Throw err with combined messages if none succeed
238 }
239 static fetch(urls, opts={}, cb) {
240 if (typeof opts === "function") { cb = opts; opts={}; }
241 const prom = this.p_rawfetch(urls, opts);
242 if (cb) { prom.then((res)=>{ try { cb(null,res)} catch(err) { debug("Uncaught error in fetch %O",err)}}).catch((err) => cb(err)); } else { return prom; } // Unpromisify pattern v5
243 }
244
245// Seeding =====
246 // Similar to storing.
247 static seed({directoryPath=undefined, fileRelativePath=undefined, ipfsHash=undefined, urlToFile=undefined, torrentRelativePath=undefined}, cb) {
248 /*
249 ipfsHash: When passed as a parameter, its checked against whatever IPFS calculates.
250 Its reported, but not an error if it doesn't match. (the cases are complex, for example the file might have been updated).
251 urlToFile: The URL where that file is available, this is to enable transports (e.g. IPFS) that just map an internal id to a URL.
252 directoryPath: Absolute path to the directory, for transports that think in terms of directories (e.g. WebTorrent)
253 this is the unit corresponding to a torrent, and should be where the torrent file will be found or should be built
254 fileRelativePath: Path (relative to directoryPath) to the file to be seeded.
255 torrentRelativePath: Path within directory to torrent file if present.
256 */
257 if (cb) { try { f.call(this, cb) } catch(err) { cb(err)}} else { return new Promise((resolve, reject) => { try { f.call(this, (err, res) => { if (err) {reject(err)} else {resolve(res)} })} catch(err) {reject(err)}})} // Promisify pattern v2
258 function f(cb1) {
259 let tt = this.validFor(undefined, "seed").map(([u, t]) => t); // Valid connected transports that support "seed"
260 if (!tt.length) {
261 debug("Seeding: no transports available");
262 cb1(null); // Its not (currently) an error to be unable to seed
263 } else {
264 const res = {};
265 each(tt, // [ Transport]
266 (t, cb2) => t.seed({directoryPath, torrentRelativePath, fileRelativePath, ipfsHash, urlToFile},
267 (err, oneres) => { res[t.name] = err ? { err: err.message } : oneres; cb2(null)}), // Its not an error for t.seed to fail - errors should have been logged by transports
268 (unusederr) => cb1(null, res)); // Return result of any seeds that succeeded as e.g. { HTTP: {}, IPFS: {ipfsHash:} }
269 }
270 }
271 }
272
273 // List handling ===========================================
274
275 static async p_rawlist(urls) {
276 urls = await this.p_resolveNames(urls); // If naming is loaded then convert to a name
277 let tt = this.validFor(urls, "list"); // Valid connected transports that support "store"
278 if (!tt.length) {
279 throw new errors.TransportError('Transports.p_rawlist: Cant find transport to "list" urls:'+urls.join(','));
280 }
281 let errs = [];
282 let ttlines = await Promise.all(tt.map(async function([url, t]) {
283 try {
284 debug("Listing %s via %s", url, t.name);
285 let res = await t.p_rawlist(url); // [sig]
286 debug("Listing %s via %s retrieved %d items", url, t.name, res.length);
287 return res;
288 } catch(err) {
289 debug("Listing %s via %s failed: %s", url, t.name, err.message);
290 errs.push(err);
291 return [];
292 }
293 })); // [[sig,sig],[sig,sig]]
294 if (errs.length >= tt.length) {
295 // All Transports failed (maybe only 1)
296 debug("Listing %o failed on all transports", urls);
297 throw new errors.TransportError(errs.map((err)=>err.message).join(', ')); // New error with concatenated messages
298 }
299 let uniques = {}; // Used to filter duplicates
300 return [].concat(...ttlines)
301 .filter((x) => (!uniques[x.signature] && (uniques[x.signature] = true)));
302 }
303
304 static async p_rawadd(urls, sig) {
305 /*
306 urls: of lists to add to
307 sig: Sig to add
308 returns: undefined
309 throws: TransportError with message being concatenated messages of transports if NONE of them succeed.
310 */
311 //TODO-MULTI-GATEWAY might be smarter about not waiting but Promise.race is inappropriate as returns after a failure as well.
312 urls = await this.p_resolveNames(urls); // If naming is loaded then convert to a name
313 let tt = this.validFor(urls, "add"); // Valid connected transports that support "store"
314 if (!tt.length) {
315 debug("Adding to %o failed: no transports available", urls);
316 throw new errors.TransportError('Transports.p_rawstore: Cant find transport for urls:'+urls.join(','));
317 }
318 let errs = [];
319 await Promise.all(tt.map(async function([u, t]) {
320 try {
321 debug("Adding to %s via %s", u, t.name);
322 await t.p_rawadd(u, sig); //undefined
323 debug("Adding to %s via %s succeeded", u, t.name);
324 return undefined;
325 } catch(err) {
326 debug("Adding to %s via %s failed: %s", u, t.name, err.message);
327 errs.push(err);
328 return undefined;
329 }
330 }));
331 if (errs.length >= tt.length) {
332 debug("Adding to %o failed on all transports", urls);
333 // All Transports failed (maybe only 1)
334 throw new errors.TransportError(errs.map((err)=>err.message).join(', ')); // New error with concatenated messages
335 }
336 return undefined;
337
338 }
339
340 static listmonitor(urls, cb, opts={}) {
341 /*
342 Add a listmonitor for each transport - note this means if multiple transports support it, then will get duplicate events back if everyone else is notifying all of them.
343 */
344 // Note cant do p_resolveNames since sync but should know real urls of resource by here.
345 this.validFor(urls, "listmonitor")
346 .map(([u, t]) => {
347 t.listmonitor(u, cb, opts);
348 debug("Monitoring list %s via %s", u, t.name);
349 });
350 }
351
352 static async p_newlisturls(cl) {
353 // Create a new list in any transport layer that supports lists.
354 // cl is a CommonList or subclass and can be used by the Transport to get info for choosing the list URL (normally it won't use it)
355 // Note that normally the CL will not have been stored yet, so you can't use its urls.
356 let uuu = await Promise.all(this.validFor(undefined, "newlisturls")
357 .map(([u, t]) => t.p_newlisturls(cl)) ); // [ [ priv, pub] [ priv, pub] [priv pub] ]
358 return [uuu.map(uu=>uu[0]), uuu.map(uu=>uu[1])]; // [[ priv priv priv ] [ pub pub pub ] ]
359 }
360
361 // Stream handling ===========================================
362 //myArray[Math.floor(Math.random() * myArray.length)];
363
364 static async p_f_createReadStream(urls, {wanturl=false, preferredTransports=[]}={}) { // Note options is options for selecting a stream, not the start/end in a createReadStream call
365 /*
366 urls: Url or [urls] of the stream
367 wanturl True if want the URL of the stream (for service workers)
368 returns: f(opts) => stream returning bytes from opts.start || start of file to opts.end-1 || end of file
369 */
370 // Find all the transports that CAN support this request
371 let tt = this.validFor(urls, "createReadStream", {}); //[ [Url,t],[Url,t]] // Can pass options TODO-STREAM support options in validFor
372 if (!tt.length) {
373 debug("Opening stream from %o failed: no transports available", urls);
374 throw new errors.TransportError("Transports.p_createReadStream cant find any transport for urls: " + urls);
375 }
376 //With multiple transports, it should return when the first one returns something.
377 let errs = [];
378
379 // Select first from preferredTransports in the order presented, then the rest at random
380 tt.sort((a,b) =>
381 ((preferredTransports.indexOf(a[1].name)+1) || 999+Math.random()) - ((preferredTransports.indexOf(b[1].name)+1) || 999+Math.random())
382 );
383
384 for (const [url, t] of tt) {
385 try {
386 debug("Opening stream from %s via %s", url.href, t.name);
387 let res = await t.p_f_createReadStream(url, {wanturl} );
388 debug("Opening stream from %s via %s succeeded", url.href, t.name);
389 return res;
390 } catch (err) {
391 errs.push(err);
392 debug("Opening stream from %s via %s failed: %s", url.href, t.name, err.message);
393 // Don't throw anything here, loop round for next, only throw if drop out bottom
394 //TODO-MULTI-GATEWAY potentially copy from success to failed URLs.
395 }
396 }
397 debug("Opening stream from %o failed on all transports", urls);
398 throw new errors.TransportError(errs.map((err)=>err.message).join(', ')); //Throw err with combined messages if none succeed
399 }
400 static createReadStream(urls, opts, cb) {
401 /*
402 Different interface, more suitable when just want a stream, now.
403 urls: Url or [urls] of the stream
404 opts{
405 start, end: First and last byte wanted (default to 0...last)
406 preferredTransports: preferred order to select stream transports (usually determined by application)
407 }
408 cb(err, stream): Called with open readable stream from the net.
409 Returns promise if no cb
410 */
411 if (typeof opts === "function") { cb = opts; opts = {start: 0}; } // Allow skipping opts
412 DwebTransports.p_f_createReadStream(urls, {preferredTransports: (opts.preferredTransports || [])})
413 .then(f => {
414 let s = f(opts);
415 if (cb) { cb(null, s); } else { return(s); }; // Callback or resolve stream
416 })
417 .catch(err => {
418 if (err instanceof errors.TransportError) {
419 console.warn("Transports.createReadStream caught", err.message);
420 } else {
421 console.error("Transports.createReadStream caught", err);
422 }
423 if (cb) { cb(err); } else { reject(err)}
424 });
425 };
426
427
428// KeyValue support ===========================================
429
430 static async p_get(urls, keys) {
431 /*
432 Fetch the values for a url and one or more keys, transports act on the data, typically storing it.
433 urls: array of urls to retrieve (any are valid)
434 keys: array of keys wanted or single key
435 returns: string - arbitrary bytes retrieved or dict of key: value
436 throws: TransportError with concatenated error messages if none succeed.
437 */
438 let tt = this.validFor(urls, "get"); //[ [Url,t],[Url,t]]
439 let debug1 = Array.isArray(keys) ? `${keys.length} keys` : keys; // "1 keys" or "foo"
440 if (!tt.length) {
441 debug("Getting %s from %o failed: no transports available", debug1, urls);
442 throw new errors.TransportError("Transports.p_get cant find any transport to get keys from urls: " + urls);
443 }
444 //With multiple transports, it should return when the first one returns something.
445 let errs = [];
446 for (const [url, t] of tt) {
447 try {
448 debug("Getting %s from %s via %s", debug1, url.href, t.name);
449 let res = await t.p_get(url, keys); //TODO-MULTI-GATEWAY potentially copy from success to failed URLs.
450 debug("Getting %s from %s via %s succeeded length=%d", debug1, url.href, t.name, res.length);
451 return res;
452 } catch (err) {
453 errs.push(err);
454 debug("Getting %s from %s via %s failed: %s", debug1, url.href, t.name, err.message);
455 // Don't throw anything here, loop round for next, only throw if drop out bottom
456 }
457 }
458 debug("Getting %s from %o failed on all transports", debug1, urls);
459 throw new errors.TransportError(errs.map((err)=>err.message).join(', ')); //Throw err with combined messages if none succeed
460 }
461 static async p_set(urls, keyvalues, value) {
462 /* Set a series of key/values or a single value
463 keyvalues: Either dict or a string
464 value: if kv is a string, this is the value to set
465 throws: TransportError with message being concatenated messages of transports if NONE of them succeed.
466 */
467 urls = await this.p_resolveNames(urls); // If naming is loaded then convert to a name
468 let debug1 = typeof keyvalues === "object" ? `${keyvalues.length} keys` : keyvalues; // "1 keys" or "foo"
469 let tt = this.validFor(urls, "set"); //[ [Url,t],[Url,t]]
470 if (!tt.length) {
471 debug("Setting %s on %o failed: no transports available", debug1, urls);
472 throw new errors.TransportError("Transports.p_set cant find any transport for urls: " + urls);
473 }
474 let errs = [];
475 let success = false;
476 await Promise.all(tt.map(async function([url, t]) {
477 try {
478 debug("Setting %s on %s via %s", debug1, url.href, t.name);
479 await t.p_set(url, keyvalues, value);
480 debug("Setting %s on %s via %s succeeded", debug1, url.href, t.name);
481 success = true; // Any one success will return true
482 } catch(err) {
483 debug("Setting %s on %s via %s failed: %s", debug1, url.href, t.name, err.message);
484 errs.push(err);
485 }
486 }));
487 if (!success) {
488 debug("Setting %s on %o failed on all transports", debug1, urls);
489 throw new errors.TransportError(errs.map((err)=>err.message).join(', ')); // New error with concatenated messages
490 }
491 }
492
493 static async p_delete(urls, keys) {
494 /* Delete a key or a list of keys
495 kv: Either dict or a string
496 value: if kv is a string, this is the value to set
497 throws: TransportError with message being concatenated messages of transports if NONE of them succeed.
498 */
499 urls = await this.p_resolveNames(urls); // If naming is loaded then convert to a name
500 let debug1 = Array.isArray(keys) ? `${keys.length} keys` : keys; // "1 keys" or "foo"
501 let tt = this.validFor(urls, "set"); //[ [Url,t],[Url,t]]
502 if (!tt.length) {
503 debug("Deleting %s on %o failed: no transports available", debug1, urls);
504 throw new errors.TransportError("Transports.p_set cant find any transport for urls: " + urls);
505 }
506 let errs = [];
507 let success = false;
508 await Promise.all(tt.map(async function([url, t]) {
509 try {
510 debug("Deleting %s on %s via %s", debug1, url.href, t.name);
511 await t.p_delete(url, keys);
512 debug("Deleting %s on %s via %s succeeded", debug1, url.href, t.name);
513 success = true; // Any one success will return true
514 } catch(err) {
515 debug("Deleting %s on %s via %s failed: %s", debug1, url.href, t.name, err.message);
516 errs.push(err);
517 }
518 }));
519 if (!success) {
520 debug("Deleting %s on %o failed on all transports", debug1, urls);
521 throw new errors.TransportError(errs.map((err)=>err.message).join(', ')); // New error with concatenated messages
522 }
523 }
524 static async p_keys(urls) {
525 /*
526 Fetch the values for a url and one or more keys, transports act on the data, typically storing it.
527 urls: array of urls to retrieve (any are valid)
528 keys: array of keys wanted
529 returns: string - arbitrary bytes retrieved or dict of key: value
530 throws: TransportError with concatenated error messages if none succeed.
531 */
532 urls = await this.p_resolveNames(urls); // If naming is loaded then convert to a name
533 let tt = this.validFor(urls, "keys"); //[ [Url,t],[Url,t]]
534 if (!tt.length) {
535 debug("Getting all keys on %o failed: no transports available", urls);
536 throw new errors.TransportError("Transports.p_keys cant find any transport for urls: " + urls);
537 }
538 //With multiple transports, it should return when the first one returns something. TODO make it return the aggregate
539 let errs = [];
540 for (const [url, t] of tt) {
541 try {
542 debug("Getting all keys on %s via %s", url.href, t.name);
543 let res = await t.p_keys(url); //TODO-MULTI-GATEWAY potentially copy from success to failed URLs.
544 debug("Getting all keys on %s via %s succeeded with %d keys", url.href, t.name, res.length);
545 return res;
546 } catch (err) {
547 errs.push(err);
548 debug("Getting all keys on %s via %s failed: %s", url.href, t.name, err.message);
549 // Don't throw anything here, loop round for next, only throw if drop out bottom
550 }
551 }
552 debug("Getting all keys on %o failed on all transports", urls);
553 throw new errors.TransportError(errs.map((err)=>err.message).join(', ')); //Throw err with combined messages if none succeed
554 }
555
556 static async p_getall(urls) {
557 /*
558 Fetch the values for a url and one or more keys, transports act on the data, typically storing it.
559 urls: array of urls to retrieve (any are valid)
560 keys: array of keys wanted
561 returns: array of strings returned for the keys. //TODO consider issues around return a data type rather than array of strings
562 throws: TransportError with concatenated error messages if none succeed.
563 */
564 urls = await this.p_resolveNames(urls); // If naming is loaded then convert to a name
565 let tt = this.validFor(urls, "getall"); //[ [Url,t],[Url,t]]
566 if (!tt.length) {
567 debug("Getting all values on %o failed: no transports available", urls);
568 throw new errors.TransportError("Transports.p_getall cant find any transport for urls: " + urls);
569 }
570 //With multiple transports, it should return when the first one returns something.
571 let errs = [];
572 for (const [url, t] of tt) {
573 try {
574 debug("Getting all values on %s via %s", url.href, t.name);
575 let res = await t.p_getall(url); //TODO-MULTI-GATEWAY potentially copy from success to failed URLs.
576 debug("Getting all values on %s via %s succeeded with %d values", url.href, t.name, res.length);
577 return res;
578 } catch (err) {
579 errs.push(err);
580 debug("Getting all values on %s via %s failed: %s", url.href, t.name, err.message);
581 // Don't throw anything here, loop round for next, only throw if drop out bottom
582 }
583 }
584 debug("Getting all keys on %o failed on all transports", urls);
585 throw new errors.TransportError(errs.map((err)=>err.message).join(', ')); //Throw err with combined messages if none succeed
586 }
587
588 static async p_newdatabase(pubkey) {
589 /*
590 Create a new database in any transport layer that supports databases (key value pairs).
591 pubkey: CommonList, KeyPair, or exported public key
592 resolves to: [ privateurl, publicurl]
593 */
594 let uuu = await Promise.all(this.validFor(undefined, "newdatabase")
595 .map(([u, t]) => t.p_newdatabase(pubkey)) ); // [ { privateurl, publicurl} { privateurl, publicurl} { privateurl, publicurl} ]
596 return { privateurls: uuu.map(uu=>uu.privateurl), publicurls: uuu.map(uu=>uu.publicurl) }; // { privateurls: [], publicurls: [] }
597 }
598
599 static async p_newtable(pubkey, table) {
600 /*
601 Create a new table in any transport layer that supports the function (key value pairs).
602 pubkey: CommonList, KeyPair, or exported public key
603 resolves to: [ privateurl, publicurl]
604 */
605 let uuu = await Promise.all(this.validFor(undefined, "newtable")
606 .map(([u, t]) => t.p_newtable(pubkey, table)) ); // [ [ priv, pub] [ priv, pub] [priv pub] ]
607 return { privateurls: uuu.map(uu=>uu.privateurl), publicurls: uuu.map(uu=>uu.publicurl)}; // {privateurls: [ priv priv priv ], publicurls: [ pub pub pub ] }
608 }
609
610 static async p_connection(urls) {
611 /*
612 Do any asynchronous connection opening work prior to potentially synchronous methods (like monitor)
613 */
614 urls = await this.p_resolveNames(urls); // If naming is loaded then convert to a name
615 await Promise.all(
616 this.validFor(urls, "connection")
617 .map(([u, t]) => t.p_connection(u)));
618 }
619
620 static monitor(urls, cb, {current=false}={}) {
621 /*
622 Add a listmonitor for each transport - note this means if multiple transports support it, then will get duplicate events back if everyone else is notifying all of them.
623 Stack: KVT()|KVT.p_new => KVT.monitor => (a: Transports.monitor => YJS.monitor)(b: dispatchEvent)
624 cb: function({type, key, value})
625 current: If true then then send all current entries as well
626 */
627 //Can't its async. urls = await this.p_resolveNames(urls); // If naming is loaded then convert to a name
628 this.validFor(urls, "monitor")
629 .map(([u, t]) => {
630 debug("Monitoring table %s via %s", u, t.name);
631 t.monitor(u, cb, {current})
632 }
633 );
634 }
635
636 // Setup and connection
637
638 static addtransport(t) {
639 /*
640 Add a transport to _transports,
641 */
642 Transports._transports.push(t);
643 }
644
645 // Setup Transports - setup0 is called once, and should return quickly, p_setup1 and p_setup2 are asynchronous and p_setup2 relies on p_setup1 having resolved.
646
647 static setup0(tabbrevs, options, cb) {
648 /*
649 Setup Transports for a range of classes
650 tabbrevs is abbreviation HTTP, IPFS, LOCAL or list of them e.g. "HTTP,IPFS"
651 cb is callback for when status changes, but there are no status changes here so its not called.
652 Handles "LOCAL" specially, turning into a HTTP to a local server (for debugging)
653
654 returns array of transport instances
655 */
656 // "IPFS" or "IPFS,LOCAL,HTTP"
657 let localoptions = {http: {urlbase: "http://localhost:4244"}}; //TODO-MIRROR "localoptions" may not be needed any more
658 return tabbrevs.map((tabbrev) => {
659 //TODO-SPLIT-UPNEXT remove LOCAL - not used any more
660 let transportclass = this._transportclasses[ (tabbrev === "LOCAL") ? "HTTP" : tabbrev ];
661 if (!transportclass) {
662 debug("Connection to %s unavailable", tabbrev);
663 return undefined;
664 } else {
665 debug("Setting up connection to %s with options %o", tabbrev, options);
666 return transportclass.setup0(tabbrev === "LOCAL" ? localoptions : options);
667 }
668 }).filter(f => !!f); // Trim out any undefined
669 }
670 static p_setup1(refreshstatus, cb) {
671 /* Second stage of setup, connect if possible */
672 // Does all setup1a before setup1b since 1b can rely on ones with 1a, e.g. YJS relies on IPFS
673 const prom = Promise.all(this._transports
674 .filter((t) => (! this._optionspaused.includes(t.name)))
675 .map((t) => {
676 debug("Connection stage 1 to %s", t.name);
677 return t.p_setup1(refreshstatus);
678 }))
679 if (cb) { prom.catch((err) => cb(err)).then((res)=>cb(null,res)); } else { return prom; } // This should be a standard unpromisify pattern
680 }
681 static p_setup2(refreshstatus, cb) {
682 /* Second stage of setup, connect if possible */
683 // Does all setup1a before setup1b since 1b can rely on ones with 1a, e.g. YJS relies on IPFS
684
685 const prom = Promise.all(this._transports
686 .filter((t) => (! this._optionspaused.includes(t.name)))
687 .map((t) => {
688 debug("Connection stage 2 to %s", t.name);
689 return t.p_setup2(refreshstatus);
690 }));
691 if (cb) { prom.catch((err) => cb(err)).then((res)=>cb(null,res)); } else { return prom; } // This should be a standard unpromisify pattern
692 }
693 static p_stop(refreshstatus, cb) { //TODO-API cb
694 if (cb) { try { f.call(this, cb) } catch(err) { cb(err)}} else { return new Promise((resolve, reject) => { try { f.call(this, (err, res) => { if (err) {reject(err)} else {resolve(res)} })} catch(err) {reject(err)}})} // Promisify pattern v2
695 /* Disconnect from all services, may not be able to reconnect */
696 //TODO rewrite with async/map
697 function f(cb) {
698 map(this._connected(),
699 (t, cb2) => {
700 debug("Stopping %s", t.name);
701 t.stop(refreshstatus, cb2);
702 },
703 cb);
704 }
705 }
706
707 static async refreshstatus(t) {
708 //Note "this' undefined as called as callback
709 let statusclasses = ["transportstatus0","transportstatus1","transportstatus2","transportstatus3","transportstatus4"];
710 let el = t.statuselement;
711 if (el) {
712 el.classList.remove(...statusclasses);
713 el.classList.add(statusclasses[t.status]);
714 }
715 if (Transports.statuscb) {
716 Transports.statuscb(t);
717 }
718 }
719
720 static _tabbrevs(options) {
721 // options = {transports, defaulttransports, ... }
722 // returns [ABBREVIATION] e.g. ["IPFS","HTTP"]
723 let tabbrevs = options.transports; // Array of transport abbreviations
724 if (!(tabbrevs && tabbrevs.length)) { tabbrevs = options.defaulttransports || [] }
725 // WOLK is off by default till get working script to include in browsers etc
726 // GUN is turned off by default because it fills up localstorage on browser and stops working, https://github.com/internetarchive/dweb-archive/issues/106
727 // FLUENCE is turned off by default until tested
728 if (! tabbrevs.length) { tabbrevs = ["HTTP", "IPFS", "WEBTORRENT", "HASH"]; } // SEE-OTHER-ADDTRANSPORT
729 tabbrevs = tabbrevs.map(n => n.toUpperCase());
730 return tabbrevs;
731 }
732 /**
733 * Load required javascript into an html page
734 * This is tricky - order is significant, (see dweb-archive/archive.html for a hopefully working example)
735 */
736 static loadIntoHtmlPage(options) {
737 //TODO move the scripts to dweb-gateway and dweb-mirror then point cdn there (depending on options.mirror or even options.cdn)
738 const cdnUrl = "https://cdn.jsdelivr.net/npm";
739 //const cdnUrl = "https://unpkg.com";
740 this._tabbrevs(options).forEach(t => {
741 this._transportclasses[t].scripts.map(s => {
742 debug("Loading %s %s", t, s);
743 document.write('<script src="' + (s.startsWith("http") ? s : [cdnUrl, s].join('/')) + '"><\/script>');
744 });
745 })
746 }
747
748 /**
749 * Load in the required
750 * @param {transports: [TRANSPORTNAME], ...}
751 *
752 * Each transport should have Transport.requires = STRING | [STRING] | { GLOBAL: STRING}
753 */
754 static loadIntoNode(options) {
755 this._tabbrevs(options).forEach(t => {
756 this._transportclasses[t].loadIntoNode();
757 });
758 }
759
760 static connect(options, cb) {
761 const prom = this.p_connect(options);
762 if (cb) { prom.catch((err) => cb(err)).then((res)=>cb(null,res)); } else { return prom; } // This should be a standard unpromisify pattern
763 }
764
765 static async p_connect(options) {
766 /*
767 This is a standardish starting process, feel free to subclass or replace !
768 It will connect to a set of standard transports and is intended to work inside a browser.
769 options = { defaulttransports: ["IPFS"], statuselement: el, http: {}, ipfs: {}; paused: ["IPFS"] }
770 */
771 try {
772 options = options || {};
773 this._optionspaused = (options.paused || []).map(n => n.toUpperCase()); // Array of transports paused - defaults to none, upper cased
774 let transports = this.setup0(this._tabbrevs(options), options); // synchronous
775 ["statuscb", "mirror"].forEach(k => { if (options[k]) this[k] = options[k];} )
776 //TODO move this to function and then call this from consumer
777 if (!!options.statuselement) {
778 let statuselement = options.statuselement;
779 while (statuselement.lastChild) {statuselement.removeChild(statuselement.lastChild); } // Remove any exist status
780 statuselement.appendChild(
781 utils.createElement("UL", {}, transports.map(t => {
782 let el = utils.createElement("LI",
783 {onclick: "this.source.togglePaused(DwebTransports.refreshstatus);", source: t, name: t.name}, //TODO-SW figure out how t osend this back
784 t.name);
785 t.statuselement = el; // Save status element on transport
786 return el;
787 }))
788 );
789 }
790 //TODO-SPLIT-UPNEXT invert this, use a waterfall here, and then wrap in promise for p_setup, then put load's here
791 await this.p_setup1(this.refreshstatus);
792 await this.p_setup2(this.refreshstatus);
793 debug("Connection completed to %o", this._connected().map(t=>t.name))
794 } catch(err) {
795 console.error("ERROR in p_connect:",err.message);
796 throw(err);
797 }
798 }
799
800 static async p_urlsFrom(url) {
801 /* Utility to convert to urls form wanted for Transports functions, e.g. from user input
802 url: Array of urls, or string representing url or representing array of urls
803 return: Array of strings representing url
804 */
805 if (typeof(url) === "string") {
806 if (url[0] === '[')
807 url = JSON.parse(url);
808 else if (url.includes(','))
809 url = url.split(',');
810 else
811 url = [ url ];
812 }
813 if (!Array.isArray(url)) throw new Error(`Unparsable url: ${url}`);
814 return url;
815 }
816
817 static async p_httpfetchurl(urls) {
818 /*
819 Utility to take a array of Transport urls, convert back to a single url that can be used for a fetch, typically
820 this is done when cant handle a stream, so want to give the url to the <VIDEO> tag.
821 */
822 //TODO this could be cleverer, it could ask each Transport for a http url and then use them in order of prefernece?
823 //TODO which would allow IPFS for example to return a gateway URL
824 //return Transports.http()._url(urls.find(u => (u.startsWith("contenthash") || u.startsWith("http") )), "content/rawfetch");
825 return urls.find(u => u.startsWith("http"));
826 }
827
828 static canonicalName(url, options={}) {
829 /*
830 Utility function to convert a variety of missentered, or presumed names into a canonical result that can be resolved or passed to a transport
831 returns [ protocol e.g. arc or ipfs, locally relevant address e.g. archive.org/metadata/foo or Q12345
832 */
833 if (typeof url !== "string") url = Url.parse(url).href;
834 // In patterns below http or https; and :/ or :// are treated the same
835 const gateways = ["dweb.me", "ipfs.io"]; // Known gateways, may dynamically load this at some point
836 // SEE-OTHER-ADDTRANSPORT
837 const protocols = ["ipfs","gun","magnet","yjs","wolk","arc", "contenthash", "http", "https", "fluence"];
838 const protocolsWantingDomains = ["arc", "http", "https"];
839 const gatewaypatts = [ // Must be before patts because gateway names often start with a valid proto
840 /^http[s]?:[/]+([^/]+)[/](\w+)[/](.*)/i, // https://(gateway)/proto/(internal) + gateway in list (IPFS gateways. dweb.me)
841 ]
842 const patts = [ // No overlap between patts & arcpatts, so order unimportant
843 /^dweb:[/]+(\w+)[/]+(.*)/i, // dweb://(proto)/(internal)
844 /^\w+:[/]+(\w+)[/](.*)/i, // proto1://proto2//(internal) - maybe only match if proto1=proto2 (must be before proto:/internal)
845 /^(\w+):[/]*(.*)/i, // (proto)://(internal) # must be after proto1://proto2
846 /^[/]*(\w+)[/](.*)/i, // /(proto)//(internal) - maybe only match if proto1=proto2
847 /^[/]*dweb[/]*(\w+)[/](.*)/i, // /dweb/(proto)//(internal)
848 ]
849 const arcpatts = [ // No overlap between patts & arcpatts, so order unimportant
850 /^http[s]?:[/]+[^/]+[/](archive).(org)[/]*(.*)/i, // https://localhost;123/(archive.org)/(internal)
851 /^http[s]?:[/]+[^/]+[/]arc[/](archive).(org)[/]*(.*)/i, // https://localhost;123/arc/(archive.org)/(internal)
852 /^http[s]?:[/]+dweb.(\w+)[.]([^/]+)[/]*(.*)/i, // https://dweb.(proto).(dom.ain)/(internal) # Before dweb.dom.ain
853 // /^http[s]?:[/]+dweb.([^/]+[.][^/]+[/]*.*)/i, // https://dweb.(dom.ain)/internal) or https://dweb.(domain) Handled by coe on recognizing above
854 /^(http[s])?:[/]+([^/]+)[/]+(.*)/i, // https://dom.ain/pa/th
855 ]
856
857 for (let patt of gatewaypatts) {
858 let rr = url.match(patt);
859 if (rr && gateways.includes(rr[1]) && protocols.includes(rr[2]))
860 return {proto: rr[2], internal: rr[3]};
861 }
862 for (let patt of arcpatts) {
863 let rr = url.match(patt);
864 if (rr) {
865 if (protocols.includes(rr[1])) {
866 // arc (and possibly others) want the domain as part of the internal
867 return {proto: rr[1], internal: (protocolsWantingDomains.includes(rr[1]) ? [rr[2], rr[3]].join('/') : rr[3])};
868 } else {
869 return {proto: "arc", internal: [[rr[1], rr[2]].join('.'), rr[3]].join('/')};
870 }
871 }
872 };
873 for (let patt of patts) {
874 let rr = url.match(patt);
875 if (rr && protocols.includes(rr[1]))
876 return {proto: rr[1], internal: rr[2]};
877 };
878 return undefined;
879 }
880 static canonicalUrl(url, options={}) {
881 let o = this.canonicalName(url, options);
882 return o.protocol + ":/" + o.internal;
883 }
884 static _o2url(o) {
885 return ["http","https"].includes(o.proto) ? [o.proto, o.internal].join('://') // Shouldnt be relative
886 : o.proto ? [this.mirror, o.proto, o.internal].join('/')
887 : o.internal; // Uncanonicalizable
888 }
889 static gatewayUrl(url) {
890 // Convert url to gateway url, if not canonicalizable then just pass the url along
891 let o = Transports.canonicalName(url);
892 return !o ? url : this._o2url(o)
893 }
894 static gatewayUrls(urls) { //TODO-API
895 // Convert urls to gateway urls,
896 // Easier to work on single form [ { proto, internal } ]
897 const oo = urls.map(url => Transports.canonicalName(url) || { proto: undefined, internal: url }); //if not canonicalizable then just pass the url along
898 const oArc = oo.filter(o => ["arc"].includes(o.proto)); // Prefered
899 return (oArc.length ? oArc : oo) // Prefered if have them, else others
900 .map(o=>this._o2url(o))
901 }
902}
903Transports._transports = []; // Array of transport instances connected
904Transports.naming = naming;
905Transports.namingcb = p_namingcb; // Will be defined by the naming component (turns URLs for names into URLs for transport)
906Transports._transportclasses = {}; // Pointers to classes whose code is loaded.
907Transports.httptools = httptools; // Static http tools
908exports = module.exports = Transports;