UNPKG

18.1 kBJavaScriptView Raw
/*
This Transport layer builds on WebTorrent.

Y Lists have listeners and generate events - see docs at ...
*/
6
7// WebTorrent components
8
9//Require in consumer;
10//const WebTorrent = require('webtorrent');
11const stream = require('readable-stream');
12const Url = require('url');
13const path = require('path');
14const debug = require('debug')('dweb-transports:webtorrent');
15
16// Other Dweb modules
17const errors = require('./Errors'); // Standard Dweb Errors
18const Transport = require('./Transport.js'); // Base class for TransportXyz
19const Transports = require('./Transports'); // Manage all Transports that are loaded
20
// Default options for the WebTorrent client.
// setup0 merges these with the caller supplied `options.webtorrent` (caller's values are passed second).
const defaultoptions = {};
23
class TransportWEBTORRENT extends Transport {
    /*
    WebTorrent specific transport

    Fields:
    webtorrent  object returned when starting webtorrent (undefined until p_webtorrentstart has run)
    options     dictionary of options, handed unchanged to the WebTorrent constructor
    wrtc        the "wrtc" module, only set if loadIntoNode managed to require it
    */

    constructor(options) {
        /*
        options: dictionary of options for the WebTorrent client (see setup0 for how it is built)
        */
        super(options);
        this.webtorrent = undefined; // Undefined till start WebTorrent
        this.options = options; // Dictionary of options
        this.name = "WEBTORRENT"; // For console log etc
        this.supportURLs = ['magnet'];
        this.supportFunctions = ['fetch', 'createReadStream', "seed"];
        this.supportFeatures = ['noCache']; // Note doesnt actually support noCache, but immutable is same
        this.status = Transport.STATUS_LOADED;
    }

    loadIntoNode() {
        /*
        Node specific loading: after the superclass has done its part, try to load the optional "wrtc" module.
        A missing wrtc is not an error, the transport just runs without it.
        */
        super.loadIntoNode(); // should be globally accessible at "WebTorrent", if not then assign to WebTorrent
        //Dont have opts to check
        //if (connectOpts.transports.includes('WEBTORRENT') && connectOpts.webtorrent && (connectOpts.webtorrent.tracker === "wrtc")) {
        try {
            const wrtc = "wrtc"; // Define string to avoid error in webpack when wrtc not installed
            this.wrtc = require(wrtc); // Throws if not installed, used by setup0
            // setup0 is static, so its `this.wrtc` reads a property of the class rather than of an instance.
            // Publish the module on the class as well so that lookup can succeed.
            // NOTE(review): confirm how the base class invokes loadIntoNode (on an instance or on the class).
            TransportWEBTORRENT.wrtc = this.wrtc;
        } catch (err) {
            debug("wrtc requested but not present"); // Allow to continue without wrtc
        }
    }

    p_webtorrentstart() {
        /*
        Start WebTorrent and wait until it is ready.

        resolves: (nothing) once the client emits "ready"
        rejects:  with the error if the client emits "error"
        */
        return new Promise((resolve, reject) => {
            // WebTorrent is not required by this file, it has to be in scope already (required by the consumer).
            this.webtorrent = new WebTorrent(this.options);
            this.webtorrent.once("ready", () => {
                debug("ready");
                resolve();
            });
            this.webtorrent.once("error", (err) => reject(err));
            this.webtorrent.on("warning", (err) => {
                console.warn("WebTorrent Torrent WARNING: " + err.message);
            });
        });
    }

    //TODO-SPLIT define load()

    static setup0(options) {
        /*
        First part of setup, create obj, add to Transports but dont attempt to connect, typically called instead of p_setup if want to parallelize connections.

        options: { webtorrent: {...} } - the webtorrent member is merged over defaultoptions
        returns: the new TransportWEBTORRENT (already registered via Transports.addtransport)
        */
        const combinedoptions = Transport.mergeoptions(defaultoptions, options.webtorrent);
        if (combinedoptions.tracker === "wrtc") { // We want wrtc
            if (this.wrtc) { // Do we have it (loaded in loadIntoNode, currently no way in browser)
                combinedoptions.tracker = this.wrtc; // replace string "wrtc" with the code
            } else {
                delete combinedoptions.tracker; // Not available
            }
        }
        debug("setup0: options=%o", combinedoptions);
        const t = new TransportWEBTORRENT(combinedoptions);
        Transports.addtransport(t);
        return t;
    }

    async p_setup1(cb) {
        /*
        Second part of setup: start the WebTorrent client and refresh the status.
        Never rejects - a failure is logged and leaves status as STATUS_FAILED.

        cb(transport): optional, called once when starting and once more when finished (connected or failed)
        resolves: this
        */
        try {
            this.status = Transport.STATUS_STARTING;
            if (cb) cb(this);
            await this.p_webtorrentstart();
            await this.p_status();
        } catch (err) {
            console.error(this.name, "failed to connect", err.message);
            this.status = Transport.STATUS_FAILED;
        }
        if (cb) cb(this);
        return this;
    }

    stop(refreshstatus, cb) {
        /*
        Destroy the WebTorrent client and mark the transport as failed.

        refreshstatus(transport): optional, called after the status has changed
        cb(err, transport):       optional, called last; err is whatever destroy reported
        */
        const finish = (err) => {
            this.status = Transport.STATUS_FAILED;
            if (refreshstatus) refreshstatus(this);
            if (err) {
                debug("Webtorrent error during stopping %o", err);
            } else {
                debug("Webtorrent stopped");
            }
            if (cb) cb(err, this);
        };
        if (this.webtorrent) {
            this.webtorrent.destroy(finish);
        } else {
            finish(null); // Client was never started, so there is nothing to destroy
        }
    }

    async p_status() {
        /*
        Derive this.status from the state of the client, then return whatever the superclass p_status returns.
        Client ready -> STATUS_CONNECTED, client present but not ready -> STATUS_STARTING, no client -> STATUS_FAILED
        */
        if (this.webtorrent && this.webtorrent.ready) {
            this.status = Transport.STATUS_CONNECTED;
        } else if (this.webtorrent) {
            this.status = Transport.STATUS_STARTING;
        } else {
            this.status = Transport.STATUS_FAILED;
        }
        return super.p_status();
    }

    webtorrentparseurl(url) {
        /* Parse a URL
        url:     URL as string or already parsed into Url - should start magnet: or in future might support dweb:/magnet/; some other formats might be supported
        returns: { torrentId, path } - split at the FIRST "/" of the url, so the magnet part must not contain an unencoded "/"
        throws:  CodingError if url is missing or contains no "/"
        */
        if (!url) {
            throw new errors.CodingError("TransportWEBTORRENT.p_rawfetch: requires url");
        }

        const urlstring = (typeof url === "string" ? url : url.href);
        const slashAt = urlstring.indexOf('/');

        if (slashAt === -1) {
            throw new errors.CodingError("TransportWEBTORRENT.p_rawfetch: invalid url - missing path component. Should look like magnet:xyzabc/path/to/file");
        }

        return {
            torrentId: urlstring.slice(0, slashAt),
            path: urlstring.slice(slashAt + 1),
        };
    }

    p_webtorrentadd(torrentId, opts) {
        /*
        Get the torrent for torrentId from the client, adding it first if the client does not know it yet.

        torrentId: anything the client's get/add accept
        opts:      passed to the client's add, only used when the torrent is new
        resolves:  the torrent, once it is ready
        rejects:   TransportError if the torrent emits "error" while we are waiting for it
        */
        return new Promise((resolve, reject) => {
            // Check if this torrentId is already added to the webtorrent client
            let torrent = this.webtorrent.get(torrentId);
            const isNew = !torrent;

            // If not, then add the torrentId to the torrent client
            if (isNew) {
                // (To test a different tracker, torrentId can be rewritten here before it is added.)
                torrent = this.webtorrent.add(torrentId, opts);

                torrent.on("warning", (err) => {
                    console.warn("WebTorrent Torrent WARNING: " + err.message + " (" + torrent.name + ")");
                });
            }

            // A torrent that was added earlier but is not ready yet can still fail, without a listener
            // here this promise would then stay pending forever.
            if (isNew || !torrent.ready) {
                torrent.once("error", (err) => {
                    reject(new errors.TransportError("Torrent encountered a fatal error " + err.message));
                });
            }

            if (torrent.ready) {
                resolve(torrent);
            } else {
                torrent.once("ready", () => {
                    resolve(torrent);
                });
            }
        });
    }

    webtorrentfindfile(torrent, path) {
        /*
        Given a torrent object and a path to a file within the torrent, find the given file.

        returns: the entry of torrent.files whose path equals torrent.name + "/" + path
        throws:  TransportError if there is no such file
        */
        const wanted = torrent.name + '/' + path;
        const file = torrent.files.find((candidate) => candidate.path === wanted);
        if (!file) {
            throw new errors.TransportError("Requested file (" + path + ") not found within torrent ");
        }
        return file;
    }

    async p_rawfetch(url) {
        /*
        Fetch some bytes based on a url of the form:

            magnet:xyzabc/path/to/file

        (Where xyzabc is the typical magnet uri contents)

        No assumption is made about the data in terms of size or structure. Returns a new Promise that resolves to a buffer.

        :param string url: URL of object being retrieved
        :resolve buffer: Return the object being fetched.
        :rejects: CodingError if url invalid, TransportError if the torrent or the file cannot be read
                  (all failures arrive as a rejection of the returned promise, nothing is thrown synchronously)
        */
        // Logged by Transports
        const { torrentId, path: filePath } = this.webtorrentparseurl(url);
        const torrent = await this.p_webtorrentadd(torrentId);
        torrent.deselect(0, torrent.pieces.length - 1, false); // Dont download entire torrent as will pull just the file we want (warning - may give problems if multiple reads from same webtorrent)
        const file = this.webtorrentfindfile(torrent, filePath);
        // getBuffer is callback based, so this is the one place that needs an explicit Promise
        return new Promise((resolve, reject) => {
            file.getBuffer((err, buffer) => {
                if (err) {
                    reject(new errors.TransportError("Torrent encountered a fatal error " + err.message + " (" + torrent.name + ")"));
                } else {
                    resolve(buffer);
                }
            });
        });
    }

    seed({fileRelativePath, directoryPath, torrentRelativePath }, cb) {
        /*
        Add a file to webTorrent - this will be called each time a file is cached and adds the torrent to WT handling so its seeding this (and other) files in directory

        fileRelativePath:    only used for logging
        directoryPath:       directory holding the files, also the base for torrentRelativePath
        torrentRelativePath: torrent file relative to directoryPath, if missing nothing is seeded
        cb(err):             called exactly once; err is null on success, when skipped, and for a duplicate torrent
        */
        if (!torrentRelativePath) { // If no torrentfile available then just skip WebTorrent, MirrorFS will often seed the file (eg to IPFS) while its fetching the torrent and then seed that.
            cb(null);
            return;
        }
        const torrentfile = path.join(directoryPath, torrentRelativePath);
        // Two-argument then() rather than then().catch(): an exception thrown by cb itself must not
        // land in the failure handler and trigger a second call of cb.
        this.p_addTorrentFromTorrentFile(torrentfile, directoryPath).then(
            () => {
                debug("Added %s/%s to webtorrent", directoryPath, fileRelativePath);
                cb(null);
            },
            (err) => {
                const isDuplicate = Boolean(err) && typeof err.message === "string"
                    && err.message.includes("Cannot add duplicate torrent");
                if (isDuplicate) { // Ignore silently if already added
                    cb(null);
                } else {
                    debug("addWebTorrent failed %s/%s", directoryPath, fileRelativePath);
                    cb(err);
                }
            }
        );
    }

    async _p_fileTorrentFromUrl(url) {
        /*
        Open a webtorrent for the file specified in the path part of the url
        url:    of form magnet:... or magnet/:...
        return: Web Torrent file

        In a browser the torrent and file are also stored in window.WEBTORRENT_TORRENT / window.WEBTORRENT_FILE
        and reset to null when the torrent emits "close".
        */
        // Errors are not caught here, they are logged by Transports
        const { torrentId, path: filePath } = this.webtorrentparseurl(url);
        const torrent = await this.p_webtorrentadd(torrentId);
        torrent.deselect(0, torrent.pieces.length - 1, false); // Dont download entire torrent as will pull just the file we want (warning - may give problems if multiple reads from same webtorrent)
        const file = this.webtorrentfindfile(torrent, filePath);
        if (typeof window !== "undefined") { // Check running in browser
            window.WEBTORRENT_TORRENT = torrent;
            window.WEBTORRENT_FILE = file;
            torrent.once('close', () => {
                window.WEBTORRENT_TORRENT = null;
                window.WEBTORRENT_FILE = null;
            });
        }
        return file;
    }

    async p_addTorrentFromTorrentFile(torrentFilePath, filesPath) {
        /*
        Make sure the torrent described by a torrent file is known to the client.

        torrentFilePath: the torrent file, used as the id for the client's get / add
        filesPath:       passed to the client as the `path` option when the torrent is added
        resolves:        (nothing)

        If the client already has the torrent its files are rescanned, otherwise it is added and deselected.
        */
        // TODO-API: doc
        // Errors are not caught here, they are logged by Transports
        const oldTorrent = this.webtorrent.get(torrentFilePath);
        if (oldTorrent) {
            oldTorrent.rescanFiles();
            return;
        }
        const torrent = await this.p_webtorrentadd(torrentFilePath, { path: filesPath });
        torrent.deselect(0, torrent.pieces.length - 1, false); // Dont download entire torrent as will pull just the file we want (warning - may give problems if multiple reads from same webtorrent)
    }

    async p_f_createReadStream(url, {wanturl=false}={}) {
        /*
        Fetch bytes progressively, using a node.js readable stream, based on a url of the form:
        No assumption is made about the data in terms of size or structure.

        This is the initialisation step, which returns a function suitable for <VIDEO>

        Returns a new Promise that resolves to function for a node.js readable stream.

        Node.js readable stream docs: https://nodejs.org/api/stream.html#stream_readable_streams

        :param string url: URL of object being retrieved of form magnet:xyzabc/path/to/file (Where xyzabc is the typical magnet uri contents)
        :param boolean wanturl True if want the URL of the stream (for service workers)
        :resolves to: f({start, end}) => stream (The readable stream.), or to url itself if wanturl
        :rejects: if the url is invalid or the file cannot be found in the torrent
        */
        // Logged by Transports
        const filet = await this._p_fileTorrentFromUrl(url); // Done even for wanturl, so the torrent is opened either way
        if (wanturl) {
            return url;
        }
        return (opts) => this.createReadStream(filet, opts);
    }

    createReadStream(file, opts) {
        /*
        The function, encapsulated and inside another function by p_f_createReadStream (see docs)

        :param file: Webtorrent "file" as returned by webtorrentfindfile
        :param opts: { start: byte to start from; end: optional end byte }
        :returns stream: The readable stream. It is returned on failure as well, the failure is then reported on the stream.
        */
        debug("reading from stream %s %o", file.name, opts);
        const through = new stream.PassThrough();
        const fail = (err) => {
            if (typeof through.destroy === 'function') {
                through.destroy(err);
            } else {
                through.emit('error', err);
            }
        };
        try {
            const fileStream = file.createReadStream(opts);
            fileStream.on('error', fail); // pipe() does not pass errors on to its destination
            fileStream.pipe(through);
        } catch (err) {
            debug("createReadStream error %s", err);
            fail(err);
        }
        return through;
    }

    async p_createReadableStream(url, opts) {
        /*
        Return a WHATWG ReadableStream (suitable for a HTTP response) fed from a node type stream from webtorrent.
        This is used by dweb-serviceworker for WebTorrent only

        :param url:  magnet:xyzabc/path/to/file
        :param opts: passed to the file's createReadStream
        :resolves:   ReadableStream; cancelling it destroys the underlying file stream
        */
        const filet = await this._p_fileTorrentFromUrl(url);
        let filestream; // Created in start(), needed again in cancel()
        let finished = false; // Once closed, errored or cancelled the controller must not be used again
        return new ReadableStream({
            start(controller) {
                debug("start %s %o", url, opts);
                // Create a webtorrent file stream
                filestream = filet.createReadStream(opts);
                // When data comes out of webtorrent node.js style stream, put it into the WHATWG stream
                filestream.on('data', (value) => {
                    if (!finished) controller.enqueue(value);
                });
                filestream.on('end', () => {
                    if (!finished) {
                        finished = true;
                        controller.close();
                    }
                });
                filestream.on('error', (err) => {
                    if (!finished) {
                        finished = true;
                        controller.error(err);
                    }
                });
            },
            cancel(reason) {
                // The consumer lost interest: stop reading from the torrent rather than treating this as a failure
                debug("cancelled %s %o %s", url, opts, reason);
                finished = true;
                if (filestream && typeof filestream.destroy === 'function') {
                    filestream.destroy();
                }
            }
        });
    }

    static async p_test(opts) {
        /*
        Manual test against a public torrent, needs network access. Results are reported with console.assert.

        opts: passed to this.p_setup - NOTE(review): p_setup is not defined in this class, presumably inherited
        */
        let transport; // Declared out here so that the catch below can still report the name
        try {
            transport = await this.p_setup(opts);
            console.log(transport.name, "p_test setup", opts, "complete");
            const res = await transport.p_status();
            console.assert(res === Transport.STATUS_CONNECTED);

            // Creative commons torrent, copied from https://webtorrent.io/free-torrents
            const bigBuckBunny = 'magnet:?xt=urn:btih:dd8255ecdc7ca55fb0bbf81323d87062db1f6d1c&dn=Big+Buck+Bunny&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Fbig-buck-bunny.torrent/Big Buck Bunny.en.srt';

            const assertData = (data) => {
                // Test for a string that is contained within the file
                const expectedWithinData = "00:00:02,000 --> 00:00:05,000";
                console.assert(data.indexOf(expectedWithinData) !== -1, "Should fetch 'Big Buck Bunny.en.srt' from the torrent");

                // Test that the length is what we expect
                // NOTE(review): 129 is the value from the original check, which never actually compared it - confirm
                console.assert(data.length === 129, "'Big Buck Bunny.en.srt' was " + data.length);
            };

            const data1 = await transport.p_rawfetch(bigBuckBunny);
            assertData(data1.toString());

            // createReadStream wants a webtorrent file, so go through p_f_createReadStream to get from url to stream
            const openStream = await transport.p_f_createReadStream(bigBuckBunny);
            const readStream = openStream();

            const chunks = [];
            readStream.on("data", (chunk) => {
                chunks.push(chunk);
            });
            readStream.on("end", () => {
                assertData(Buffer.concat(chunks).toString());
            });
        } catch (err) {
            console.log("Exception thrown in", transport ? transport.name : "WEBTORRENT", "p_test():", err.message);
            throw err;
        }
    }

}
// Register the class with Transports under its short name.
Transports._transportclasses["WEBTORRENT"] = TransportWEBTORRENT;
// What has to be loaded for this transport: script path(s) and module name(s).
// NOTE(review): presumably scripts is for the browser and requires for node - confirm against the code that reads them.
TransportWEBTORRENT.scripts = ['webtorrent@latest/webtorrent.min.js'];
TransportWEBTORRENT.requires = ['webtorrent']; // Note wrtc loaded in loadIntoNode above

exports = module.exports = TransportWEBTORRENT;