/*
This Transport layer builds on WebTorrent.

Y Lists have listeners and generate events - see docs at ...
*/

// WebTorrent components

// Require in consumer;
//const WebTorrent = require('webtorrent');
const stream = require('readable-stream');
const Url = require('url');
const path = require('path');
const debug = require('debug')('dweb-transports:webtorrent');

// Other Dweb modules
const errors = require('./Errors'); // Standard Dweb Errors
const Transport = require('./Transport.js'); // Base class for TransportXyz
const Transports = require('./Transports'); // Manage all Transports that are loaded

// Defaults that setup0 merges with the caller supplied `webtorrent` options (currently empty).
let defaultoptions = {
};

24 | class TransportWEBTORRENT extends Transport {
|
25 | /*
|
26 | WebTorrent specific transport
|
27 |
|
28 | Fields:
|
29 | webtorrent: object returned when starting webtorrent
|
30 | */
|
31 |
|
32 | constructor(options) {
|
33 | super(options);
|
34 | this.webtorrent = undefined; // Undefined till start WebTorrent
|
35 | this.options = options; // Dictionary of options
|
36 | this.name = "WEBTORRENT"; // For console log etc
|
37 | this.supportURLs = ['magnet'];
|
38 | this.supportFunctions = ['fetch', 'createReadStream', "seed"];
|
39 | this.supportFeatures = ['noCache']; // Note doesnt actually support noCache, but immutable is same
|
40 | this.status = Transport.STATUS_LOADED;
|
41 | }
|
42 |
|
43 | loadIntoNode() {
|
44 | super.loadIntoNode(); // should be globally accessible at "WebTorrent", if not then assign to WebTorrent
|
45 | //Dont have opts to check
|
46 | //if (connectOpts.transports.includes('WEBTORRENT') && connectOpts.webtorrent && (connectOpts.webtorrent.tracker === "wrtc")) {
|
47 | try {
|
48 | const wrtc = "wrtc"; // Define string to avoid error in webpack when wrtc not installed
|
49 | this.wrtc = require(wrtc); // Will be undefined if not installed, used by setup0
|
50 | } catch (err) {
|
51 | debug("wrtc requested but not present"); // Allow to continue without wrtc
|
52 | }
|
53 | }
|
54 |
|
55 | p_webtorrentstart() {
|
56 | /*
|
57 | Start WebTorrent and wait until for ready.
|
58 | */
|
59 | let self = this;
|
60 | return new Promise((resolve, reject) => {
|
61 | this.webtorrent = new WebTorrent(this.options);
|
62 | this.webtorrent.once("ready", () => {
|
63 | debug("ready");
|
64 | resolve();
|
65 | });
|
66 | this.webtorrent.once("error", (err) => reject(err));
|
67 | this.webtorrent.on("warning", (err) => {
|
68 | console.warn("WebTorrent Torrent WARNING: " + err.message);
|
69 | });
|
70 | })
|
71 | }
|
72 |
|
73 | //TODO-SPLIT define load()
|
74 |
|
75 | static setup0(options) {
|
76 | /*
|
77 | First part of setup, create obj, add to Transports but dont attempt to connect, typically called instead of p_setup if want to parallelize connections.
|
78 | */
|
79 | let combinedoptions = Transport.mergeoptions(defaultoptions, options.webtorrent);
|
80 | if (combinedoptions.tracker === "wrtc") { // We want wrtc
|
81 | if (this.wrtc) { // Do we have it (loaded in loadIntoNode, currently no way in browser)
|
82 | combinedoptions.tracker = this.wrtc; // replace string "wrtc" with the code
|
83 | } else {
|
84 | delete combinedoptions.tracker; // Not available
|
85 | }
|
86 | }
|
87 | debug("setup0: options=%o", combinedoptions);
|
88 | let t = new TransportWEBTORRENT(combinedoptions);
|
89 | Transports.addtransport(t);
|
90 | return t;
|
91 | }
|
92 |
|
93 | async p_setup1(cb) {
|
94 | try {
|
95 | this.status = Transport.STATUS_STARTING;
|
96 | if (cb) cb(this);
|
97 | await this.p_webtorrentstart();
|
98 | await this.p_status();
|
99 | } catch(err) {
|
100 | console.error(this.name, "failed to connect", err.message);
|
101 | this.status = Transport.STATUS_FAILED;
|
102 | }
|
103 | if (cb) cb(this);
|
104 | return this;
|
105 | }
|
106 |
|
107 | stop(refreshstatus, cb) {
|
108 | this.webtorrent.destroy((err) => {
|
109 | this.status = Transport.STATUS_FAILED;
|
110 | if (refreshstatus) refreshstatus(this);
|
111 | if (err) {
|
112 | debug("Webtorrent error during stopping %o", err);
|
113 | } else {
|
114 | debug("Webtorrent stopped");
|
115 | }
|
116 | cb(err, this);
|
117 | });
|
118 | }
|
119 |
|
120 | async p_status() {
|
121 | /*
|
122 | Return a string for the status of a transport. No particular format, but keep it short as it will probably be in a small area of the screen.
|
123 | */
|
124 | if (this.webtorrent && this.webtorrent.ready) {
|
125 | this.status = Transport.STATUS_CONNECTED;
|
126 | } else if (this.webtorrent) {
|
127 | this.status = Transport.STATUS_STARTING;
|
128 | } else {
|
129 | this.status = Transport.STATUS_FAILED;
|
130 | }
|
131 | return super.p_status();
|
132 | }
|
133 |
|
134 | webtorrentparseurl(url) {
|
135 | /* Parse a URL
|
136 | url: URL as string or already parsed into Url - should start magnet: or in future might support dweb:/magnet/; some other formats might be supported
|
137 | returns: torrentid, path
|
138 | */
|
139 | if (!url) {
|
140 | throw new errors.CodingError("TransportWEBTORRENT.p_rawfetch: requires url");
|
141 | }
|
142 |
|
143 | const urlstring = (typeof url === "string" ? url : url.href);
|
144 | const index = urlstring.indexOf('/');
|
145 |
|
146 | if (index === -1) {
|
147 | throw new errors.CodingError("TransportWEBTORRENT.p_rawfetch: invalid url - missing path component. Should look like magnet:xyzabc/path/to/file");
|
148 | }
|
149 |
|
150 | const torrentId = urlstring.slice(0, index);
|
151 | const path = urlstring.slice(index + 1);
|
152 |
|
153 | return { torrentId, path }
|
154 | }
|
155 |
|
156 | async p_webtorrentadd(torrentId, opts) {
|
157 | return new Promise((resolve, reject) => {
|
158 | // Check if this torrentId is already added to the webtorrent client
|
159 | let torrent = this.webtorrent.get(torrentId);
|
160 |
|
161 | // If not, then add the torrentId to the torrent client
|
162 | if (!torrent) {
|
163 | // This can be added in to rewrite a known torrent for example to test a different tracker.
|
164 | //let testid = "magnet:?xt=urn:btih:ELHVM7F4VEOTZQFDHCX7OZXUXKINUIPJ&tr=http%3A%2F%2Fbt1.archive.org%3A6969%2Fannounce&tr=http%3A%2F%2Fbt2.archive.org%3A6969%2Fannounce&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&tr=wss%3A%2F%2Ftracker.fastcast.nz&ws=https%3A%2F%2Fdweb.me%2Farc%2Farchive.org%2Fdownload%2F&xs=https%3A%2F%2Fdweb.me%2Farc%2Farchive.org%2Ftorrent%2Fcommute";
|
165 | //let testidnewtrack = "magnet:?xt=urn:btih:ELHVM7F4VEOTZQFDHCX7OZXUXKINUIPJ&tr=http%3A%2F%2Fbt1.archive.org%3A6969%2Fannounce&tr=http%3A%2F%2Fbt2.archive.org%3A6969%2Fannounce&tr=wss%3A%2F%2Fdweb.archive.org:6969&ws=https%3A%2F%2Fdweb.me%2Farc%2Farchive.org%2Fdownload%2F&xs=https%3A%2F%2Fdweb.me%2Farc%2Farchive.org%2Ftorrent%2Fcommute";
|
166 | //if (torrentId === testid) torrentId = testidnewtrack;
|
167 | torrent = this.webtorrent.add(torrentId, opts);
|
168 |
|
169 | torrent.once("error", (err) => {
|
170 | reject(new errors.TransportError("Torrent encountered a fatal error " + err.message));
|
171 | });
|
172 |
|
173 | torrent.on("warning", (err) => {
|
174 | console.warn("WebTorrent Torrent WARNING: " + err.message + " (" + torrent.name + ")");
|
175 | });
|
176 | }
|
177 |
|
178 | if (torrent.ready) {
|
179 | resolve(torrent);
|
180 | } else {
|
181 | torrent.once("ready", () => {
|
182 | resolve(torrent);
|
183 | });
|
184 | }
|
185 | });
|
186 | }
|
187 |
|
188 | webtorrentfindfile (torrent, path) {
|
189 | /*
|
190 | Given a torrent object and a path to a file within the torrent, find the given file.
|
191 | */
|
192 | const filePath = torrent.name + '/' + path;
|
193 | const file = torrent.files.find(file => {
|
194 | return file.path === filePath;
|
195 | });
|
196 | if (!file) {
|
197 | //debugger;
|
198 | throw new errors.TransportError("Requested file (" + path + ") not found within torrent ");
|
199 | }
|
200 | return file;
|
201 | }
|
202 |
|
203 | p_rawfetch(url) {
|
204 | /*
|
205 | Fetch some bytes based on a url of the form:
|
206 |
|
207 | magnet:xyzabc/path/to/file
|
208 |
|
209 | (Where xyzabc is the typical magnet uri contents)
|
210 |
|
211 | No assumption is made about the data in terms of size or structure. Returns a new Promise that resolves to a buffer.
|
212 |
|
213 | :param string url: URL of object being retrieved
|
214 | :resolve buffer: Return the object being fetched.
|
215 | :throws: TransportError if url invalid - note this happens immediately, not as a catch in the promise
|
216 | */
|
217 | return new Promise((resolve, reject) => {
|
218 | // Logged by Transports
|
219 | const { torrentId, path } = this.webtorrentparseurl(url);
|
220 | this.p_webtorrentadd(torrentId)
|
221 | .then((torrent) => {
|
222 | torrent.deselect(0, torrent.pieces.length - 1, false); // Dont download entire torrent as will pull just the file we want (warning - may give problems if multiple reads from same webtorrent)
|
223 | const file = this.webtorrentfindfile(torrent, path);
|
224 | file.getBuffer((err, buffer) => {
|
225 | if (err) {
|
226 | return reject(new errors.TransportError("Torrent encountered a fatal error " + err.message + " (" + torrent.name + ")"));
|
227 | }
|
228 | resolve(buffer);
|
229 | });
|
230 | })
|
231 | .catch((err) => reject(err));
|
232 | });
|
233 | }
|
234 |
|
235 | seed({fileRelativePath, directoryPath, torrentRelativePath }, cb) {
|
236 | /* Add a file to webTorrent - this will be called each time a file is cached and adds the torrent to WT handling so its seeding this (and other) files in directory */
|
237 | if (!torrentRelativePath) { // If no torrentfile available then just skip WebTorrent, MirrorFS will often seed the file (eg to IPFS) while its fetching the torrent and then seed that.
|
238 | cb(null);
|
239 | } else {
|
240 | const torrentfile = path.join(directoryPath, torrentRelativePath);
|
241 | this.p_addTorrentFromTorrentFile(torrentfile, directoryPath)
|
242 | .then(res => { debug("Added %s/%s to webtorrent", directoryPath, fileRelativePath); cb(null)})
|
243 | .catch(err => {
|
244 | if (err.message.includes("Cannot add duplicate torrent")) { // Ignore silently if already added
|
245 | cb(null);
|
246 | } else {
|
247 | debug("addWebTorrent failed %s/%s", directoryPath, fileRelativePath); cb(err);
|
248 | }
|
249 | });
|
250 | }
|
251 | }
|
252 |
|
253 | async _p_fileTorrentFromUrl(url) {
|
254 | /*
|
255 | Then open a webtorrent for the file specified in the path part of the url
|
256 | url: of form magnet:... or magnet/:...
|
257 | return: Web Torrent file
|
258 | */
|
259 | try {
|
260 | const {torrentId, path} = this.webtorrentparseurl(url);
|
261 | const torrent = await this.p_webtorrentadd(torrentId);
|
262 | torrent.deselect(0, torrent.pieces.length - 1, false); // Dont download entire torrent as will pull just the file we want (warning - may give problems if multiple reads from same webtorrent)
|
263 | const file = this.webtorrentfindfile(torrent, path);
|
264 | if (typeof window !== "undefined") { // Check running in browser
|
265 | window.WEBTORRENT_TORRENT = torrent;
|
266 | window.WEBTORRENT_FILE = file;
|
267 | torrent.once('close', () => {
|
268 | window.WEBTORRENT_TORRENT = null;
|
269 | window.WEBTORRENT_FILE = null;
|
270 | })
|
271 | }
|
272 | return file
|
273 | } catch(err) {
|
274 | // Logged by Transports
|
275 | throw(err);
|
276 | }
|
277 |
|
278 | }
|
279 |
|
280 | async p_addTorrentFromTorrentFile(torrentFilePath, filesPath) {
|
281 | // TODO-API: doc
|
282 | try {
|
283 | const opts = { path: filesPath };
|
284 | const oldTorrent = this.webtorrent.get(torrentFilePath);
|
285 | if (oldTorrent) {
|
286 | oldTorrent.rescanFiles();
|
287 | } else {
|
288 | const torrent = await this.p_webtorrentadd(torrentFilePath, opts);
|
289 | torrent.deselect(0, torrent.pieces.length - 1, false); // Dont download entire torrent as will pull just the file we want (warning - may give problems if multiple reads from same webtorrent)
|
290 | }
|
291 | } catch(err) {
|
292 | // Logged by Transports
|
293 | throw(err);
|
294 | }
|
295 |
|
296 | }
|
297 |
|
298 | async p_f_createReadStream(url, {wanturl=false}={}) {
|
299 | /*
|
300 | Fetch bytes progressively, using a node.js readable stream, based on a url of the form:
|
301 | No assumption is made about the data in terms of size or structure.
|
302 |
|
303 | This is the initialisation step, which returns a function suitable for <VIDEO>
|
304 |
|
305 | Returns a new Promise that resolves to function for a node.js readable stream.
|
306 |
|
307 | Node.js readable stream docs: https://nodejs.org/api/stream.html#stream_readable_streams
|
308 |
|
309 | :param string url: URL of object being retrieved of form magnet:xyzabc/path/to/file (Where xyzabc is the typical magnet uri contents)
|
310 | :param boolean wanturl True if want the URL of the stream (for service workers)
|
311 | :resolves to: f({start, end}) => stream (The readable stream.)
|
312 | :throws: TransportError if url invalid - note this happens immediately, not as a catch in the promise
|
313 | */
|
314 | // Logged by Transports
|
315 | try {
|
316 | let filet = await this._p_fileTorrentFromUrl(url);
|
317 | let self = this;
|
318 | if (wanturl) {
|
319 | return url;
|
320 | } else {
|
321 | return function (opts) { return self.createReadStream(filet, opts); };
|
322 | }
|
323 | } catch(err) {
|
324 | // Logged by Transports
|
325 | throw(err);
|
326 | }
|
327 | }
|
328 |
|
329 | createReadStream(file, opts) {
|
330 | /*
|
331 | The function, encapsulated and inside another function by p_f_createReadStream (see docs)
|
332 |
|
333 | :param file: Webtorrent "file" as returned by webtorrentfindfile
|
334 | :param opts: { start: byte to start from; end: optional end byte }
|
335 | :returns stream: The readable stream.
|
336 | */
|
337 | debug("reading from stream %s %o", file.name, opts);
|
338 | let through;
|
339 | try {
|
340 | through = new stream.PassThrough();
|
341 | const fileStream = file.createReadStream(opts);
|
342 | fileStream.pipe(through);
|
343 | return through;
|
344 | } catch(err) {
|
345 | debug("createReadStream error %s", err);
|
346 | if (typeof through.destroy === 'function')
|
347 | through.destroy(err);
|
348 | else through.emit('error', err)
|
349 | }
|
350 | }
|
351 |
|
352 | async p_createReadableStream(url, opts) {
|
353 | //Return a readable stream (suitable for a HTTP response) from a node type stream from webtorrent.
|
354 | // This is used by dweb-serviceworker for WebTorrent only
|
355 | let filet = await this._p_fileTorrentFromUrl(url);
|
356 | return new ReadableStream({
|
357 | start (controller) {
|
358 | debug("start %s %o", url, opts);
|
359 | // Create a webtorrent file stream
|
360 | const filestream = filet.createReadStream(opts);
|
361 | // When data comes out of webtorrent node.js style stream, put it into the WHATWG stream
|
362 | filestream.on('data', value => {
|
363 | controller.enqueue(value)
|
364 | });
|
365 | filestream.on('end', () => {
|
366 | controller.close()
|
367 | })
|
368 | },
|
369 | cancel (reason) {
|
370 | throw new errors.TransportError(`cancelled ${url}, ${opts} ${reason}`);
|
371 | }
|
372 | });
|
373 | }
|
374 |
|
375 |
|
376 |
|
377 | static async p_test(opts) {
|
378 | try {
|
379 | let transport = await this.p_setup(opts); // Assumes IPFS already setup
|
380 | console.log(transport.name, "p_test setup", opts, "complete");
|
381 | let res = await transport.p_status();
|
382 | console.assert(res === Transport.STATUS_CONNECTED);
|
383 |
|
384 | // Creative commons torrent, copied from https://webtorrent.io/free-torrents
|
385 | let bigBuckBunny = 'magnet:?xt=urn:btih:dd8255ecdc7ca55fb0bbf81323d87062db1f6d1c&dn=Big+Buck+Bunny&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Fbig-buck-bunny.torrent/Big Buck Bunny.en.srt';
|
386 |
|
387 | let data1 = await transport.p_rawfetch(bigBuckBunny);
|
388 | data1 = data1.toString();
|
389 | assertData(data1);
|
390 |
|
391 | const stream = await transport.createReadStream(bigBuckBunny);
|
392 |
|
393 | const chunks = [];
|
394 | stream.on("data", (chunk) => {
|
395 | chunks.push(chunk);
|
396 | });
|
397 | stream.on("end", () => {
|
398 | const data2 = Buffer.concat(chunks).toString();
|
399 | assertData(data2);
|
400 | });
|
401 |
|
402 | function assertData(data) {
|
403 | // Test for a string that is contained within the file
|
404 | let expectedWithinData = "00:00:02,000 --> 00:00:05,000";
|
405 |
|
406 | console.assert(data.indexOf(expectedWithinData) !== -1, "Should fetch 'Big Buck Bunny.en.srt' from the torrent");
|
407 |
|
408 | // Test that the length is what we expect
|
409 | console.assert(data.length, 129, "'Big Buck Bunny.en.srt' was " + data.length);
|
410 | }
|
411 | } catch (err) {
|
412 | console.log("Exception thrown in", transport.name, "p_test():", err.message);
|
413 | throw err;
|
414 | }
|
415 | }
|
416 |
|
417 | }
|
// Register the class in Transports._transportclasses under the key "WEBTORRENT"
Transports._transportclasses["WEBTORRENT"] = TransportWEBTORRENT;
TransportWEBTORRENT.scripts = ['webtorrent@latest/webtorrent.min.js'];
TransportWEBTORRENT.requires = ['webtorrent']; // Note wrtc loaded in loadIntoNode above

exports = module.exports = TransportWEBTORRENT;