UNPKG

5.45 kBJavaScriptView Raw
1const PassThrough = require('stream').PassThrough;
2const urlResolve = require('url').resolve;
3const miniget = require('miniget');
4const m3u8Parser = require('./m3u8-parser');
5const DashMPDParser = require('./dash-mpd-parser');
6const Queue = require('./queue');
7const parseTime = require('./parse-time');
8
9
10/**
11 * @param {string} playlistURL
12 * @param {Object} options
13 * @return {stream.Readable}
14 */
15module.exports = (playlistURL, options) => {
16 const stream = new PassThrough();
17 options = options || {};
18 const chunkReadahead = options.chunkReadahead || 3;
19 const liveBuffer = options.liveBuffer || 20000; // 20 seconds
20 const requestOptions = options.requestOptions;
21 const Parser = {
22 'm3u8': m3u8Parser,
23 'dash-mpd': DashMPDParser,
24 }[options.parser || (/\.mpd$/.test(playlistURL) ? 'dash-mpd' : 'm3u8')];
25 if (!Parser) {
26 throw TypeError(`parser '${options.parser}' not supported`);
27 }
28 let relativeBegin = typeof options.begin === 'string';
29 let begin = relativeBegin ?
30 parseTime(options.begin) :
31 Math.max(options.begin - liveBuffer, 0) || 0;
32 let liveBegin = Date.now() - liveBuffer;
33
34 let currSegment;
35 const streamQueue = new Queue((req, callback) => {
36 currSegment = req;
37 // Count the size manually, since the `content-length` header is not
38 // always there.
39 let size = 0;
40 req.on('data', (chunk) => size += chunk.length);
41 req.pipe(stream, { end: false });
42 req.on('end', () => callback(size));
43 }, { concurrency: 1 });
44
45 let segmentNumber = 0;
46 let downloaded = 0;
47 const requestQueue = new Queue((segment, callback) => {
48 let req = miniget(urlResolve(playlistURL, segment.url), requestOptions);
49 req.on('error', callback);
50 streamQueue.push(req, (size) => {
51 downloaded += size;
52 stream.emit('progress', {
53 num: ++segmentNumber,
54 size: size,
55 url: segment.url,
56 duration: segment.duration,
57 }, requestQueue.total, downloaded);
58 callback();
59 });
60 }, { concurrency: chunkReadahead });
61
62 const onError = (err) => {
63 if (ended) { return; }
64 stream.emit('error', err);
65 // Stop on any error.
66 stream.end();
67 };
68
69 // When to look for items again.
70 let refreshThreshold;
71 let minRefreshTime;
72 let refreshTimeout;
73 let fetchingPlaylist = false;
74 let ended = false;
75 let isStatic = false;
76 let lastRefresh;
77
78 const onQueuedEnd = (err) => {
79 currSegment = null;
80 if (err) {
81 onError(err);
82 } else if (!fetchingPlaylist && !ended && !isStatic &&
83 requestQueue.tasks.length + requestQueue.active === refreshThreshold) {
84 let ms = Math.max(0, minRefreshTime - (Date.now() - lastRefresh));
85 refreshTimeout = setTimeout(refreshPlaylist, ms);
86 } else if ((ended || isStatic) &&
87 !requestQueue.tasks.length && !requestQueue.active) {
88 stream.end();
89 }
90 };
91
92 let currPlaylist;
93 let lastSeq;
94 const refreshPlaylist = () => {
95 fetchingPlaylist = true;
96 lastRefresh = Date.now();
97 currPlaylist = miniget(playlistURL, requestOptions);
98 currPlaylist.on('error', onError);
99 const parser = currPlaylist.pipe(new Parser(options.id));
100 let starttime = null;
101 parser.on('starttime', (a) => {
102 starttime = a;
103 if (relativeBegin && begin >= 0) {
104 begin += starttime;
105 }
106 });
107 parser.on('endlist', () => { isStatic = true; });
108 parser.on('endearly', () => { currPlaylist.unpipe(parser); });
109
110 let addedItems = [];
111 let liveAddedItems = [];
112 const addItem = (item, isLive) => {
113 if (item.seq <= lastSeq) { return; }
114 lastSeq = item.seq;
115 begin = item.time;
116 requestQueue.push(item, onQueuedEnd);
117 addedItems.push(item);
118 if (isLive) {
119 liveAddedItems.push(item);
120 }
121 };
122
123 let tailedItems = [], tailedItemsDuration = 0;
124 parser.on('item', (item) => {
125 item.time = starttime;
126 if (!starttime || begin <= item.time) {
127 addItem(item, liveBegin <= item.time);
128 } else {
129 tailedItems.push(item);
130 tailedItemsDuration += item.duration;
131 // Only keep the last `liveBuffer` of items.
132 while (tailedItems.length > 1 &&
133 tailedItemsDuration - tailedItems[0].duration > liveBuffer) {
134 tailedItemsDuration -= tailedItems.shift().duration;
135 }
136 }
137 starttime += item.duration;
138 });
139
140 parser.on('end', () => {
141 currPlaylist = null;
142 // If we are too ahead of the stream, make sure to get the
143 // latest available items with a small buffer.
144 if (!addedItems.length && tailedItems.length) {
145 tailedItems.forEach((item) => { addItem(item, true); });
146 }
147
148 // Refresh the playlist when remaining segments get low.
149 refreshThreshold = Math.max(1, Math.ceil(addedItems.length * 0.01));
150
151 // Throttle refreshing the playlist by looking at the duration
152 // of live items added on this refresh.
153 minRefreshTime =
154 addedItems.reduce(((total, item) => item.duration + total), 0);
155
156 fetchingPlaylist = false;
157 });
158 };
159 refreshPlaylist();
160
161 stream.end = () => {
162 ended = true;
163 streamQueue.die();
164 requestQueue.die();
165 clearTimeout(refreshTimeout);
166 if (currPlaylist) {
167 currPlaylist.unpipe();
168 currPlaylist.abort();
169 }
170 if (currSegment) {
171 currSegment.unpipe();
172 currSegment.abort();
173 }
174 PassThrough.prototype.end.call(stream);
175 };
176
177 return stream;
178};