1 | const PassThrough = require('stream').PassThrough;
|
2 | const urlResolve = require('url').resolve;
|
3 | const miniget = require('miniget');
|
4 | const m3u8Parser = require('./m3u8-parser');
|
5 | const DashMPDParser = require('./dash-mpd-parser');
|
6 | const Queue = require('./queue');
|
7 | const parseTime = require('./parse-time');
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 | module.exports = (playlistURL, options) => {
|
16 | const stream = new PassThrough();
|
17 | options = options || {};
|
18 | const chunkReadahead = options.chunkReadahead || 3;
|
19 | const liveBuffer = options.liveBuffer || 20000;
|
20 | const requestOptions = options.requestOptions;
|
21 | const Parser = {
|
22 | 'm3u8': m3u8Parser,
|
23 | 'dash-mpd': DashMPDParser,
|
24 | }[options.parser || (/\.mpd$/.test(playlistURL) ? 'dash-mpd' : 'm3u8')];
|
25 | if (!Parser) {
|
26 | throw TypeError(`parser '${options.parser}' not supported`);
|
27 | }
|
28 | let relativeBegin = typeof options.begin === 'string';
|
29 | let begin = relativeBegin ?
|
30 | parseTime(options.begin) :
|
31 | Math.max(options.begin - liveBuffer, 0) || 0;
|
32 | let liveBegin = Date.now() - liveBuffer;
|
33 |
|
34 | let currSegment;
|
35 | const streamQueue = new Queue((req, callback) => {
|
36 | currSegment = req;
|
37 |
|
38 |
|
39 | let size = 0;
|
40 | req.on('data', (chunk) => size += chunk.length);
|
41 | req.pipe(stream, { end: false });
|
42 | req.on('end', () => callback(size));
|
43 | }, { concurrency: 1 });
|
44 |
|
45 | let segmentNumber = 0;
|
46 | let downloaded = 0;
|
47 | const requestQueue = new Queue((segment, callback) => {
|
48 | let req = miniget(urlResolve(playlistURL, segment.url), requestOptions);
|
49 | req.on('error', callback);
|
50 | streamQueue.push(req, (size) => {
|
51 | downloaded += size;
|
52 | stream.emit('progress', {
|
53 | num: ++segmentNumber,
|
54 | size: size,
|
55 | url: segment.url,
|
56 | duration: segment.duration,
|
57 | }, requestQueue.total, downloaded);
|
58 | callback();
|
59 | });
|
60 | }, { concurrency: chunkReadahead });
|
61 |
|
62 | const onError = (err) => {
|
63 | if (ended) { return; }
|
64 | stream.emit('error', err);
|
65 |
|
66 | stream.end();
|
67 | };
|
68 |
|
69 |
|
70 | let refreshThreshold;
|
71 | let minRefreshTime;
|
72 | let refreshTimeout;
|
73 | let fetchingPlaylist = false;
|
74 | let ended = false;
|
75 | let isStatic = false;
|
76 | let lastRefresh;
|
77 |
|
78 | const onQueuedEnd = (err) => {
|
79 | currSegment = null;
|
80 | if (err) {
|
81 | onError(err);
|
82 | } else if (!fetchingPlaylist && !ended && !isStatic &&
|
83 | requestQueue.tasks.length + requestQueue.active === refreshThreshold) {
|
84 | let ms = Math.max(0, minRefreshTime - (Date.now() - lastRefresh));
|
85 | refreshTimeout = setTimeout(refreshPlaylist, ms);
|
86 | } else if ((ended || isStatic) &&
|
87 | !requestQueue.tasks.length && !requestQueue.active) {
|
88 | stream.end();
|
89 | }
|
90 | };
|
91 |
|
92 | let currPlaylist;
|
93 | let lastSeq;
|
94 | const refreshPlaylist = () => {
|
95 | fetchingPlaylist = true;
|
96 | lastRefresh = Date.now();
|
97 | currPlaylist = miniget(playlistURL, requestOptions);
|
98 | currPlaylist.on('error', onError);
|
99 | const parser = currPlaylist.pipe(new Parser(options.id));
|
100 | let starttime = null;
|
101 | parser.on('starttime', (a) => {
|
102 | starttime = a;
|
103 | if (relativeBegin && begin >= 0) {
|
104 | begin += starttime;
|
105 | }
|
106 | });
|
107 | parser.on('endlist', () => { isStatic = true; });
|
108 | parser.on('endearly', () => { currPlaylist.unpipe(parser); });
|
109 |
|
110 | let addedItems = [];
|
111 | let liveAddedItems = [];
|
112 | const addItem = (item, isLive) => {
|
113 | if (item.seq <= lastSeq) { return; }
|
114 | lastSeq = item.seq;
|
115 | begin = item.time;
|
116 | requestQueue.push(item, onQueuedEnd);
|
117 | addedItems.push(item);
|
118 | if (isLive) {
|
119 | liveAddedItems.push(item);
|
120 | }
|
121 | };
|
122 |
|
123 | let tailedItems = [], tailedItemsDuration = 0;
|
124 | parser.on('item', (item) => {
|
125 | item.time = starttime;
|
126 | if (!starttime || begin <= item.time) {
|
127 | addItem(item, liveBegin <= item.time);
|
128 | } else {
|
129 | tailedItems.push(item);
|
130 | tailedItemsDuration += item.duration;
|
131 |
|
132 | while (tailedItems.length > 1 &&
|
133 | tailedItemsDuration - tailedItems[0].duration > liveBuffer) {
|
134 | tailedItemsDuration -= tailedItems.shift().duration;
|
135 | }
|
136 | }
|
137 | starttime += item.duration;
|
138 | });
|
139 |
|
140 | parser.on('end', () => {
|
141 | currPlaylist = null;
|
142 |
|
143 |
|
144 | if (!addedItems.length && tailedItems.length) {
|
145 | tailedItems.forEach((item) => { addItem(item, true); });
|
146 | }
|
147 |
|
148 |
|
149 | refreshThreshold = Math.max(1, Math.ceil(addedItems.length * 0.01));
|
150 |
|
151 |
|
152 |
|
153 | minRefreshTime =
|
154 | addedItems.reduce(((total, item) => item.duration + total), 0);
|
155 |
|
156 | fetchingPlaylist = false;
|
157 | });
|
158 | };
|
159 | refreshPlaylist();
|
160 |
|
161 | stream.end = () => {
|
162 | ended = true;
|
163 | streamQueue.die();
|
164 | requestQueue.die();
|
165 | clearTimeout(refreshTimeout);
|
166 | if (currPlaylist) {
|
167 | currPlaylist.unpipe();
|
168 | currPlaylist.abort();
|
169 | }
|
170 | if (currSegment) {
|
171 | currSegment.unpipe();
|
172 | currSegment.abort();
|
173 | }
|
174 | PassThrough.prototype.end.call(stream);
|
175 | };
|
176 |
|
177 | return stream;
|
178 | };
|