UNPKG

6.66 kBJavaScriptView Raw
1"use strict";
/**
 * Interop helper for default imports of CommonJS modules.
 *
 * Reuses an `__importDefault` already present on `this` when there is one;
 * otherwise installs the local implementation below.
 *
 * @param {*} mod - The value returned by `require`.
 * @returns {*} `mod` itself when it is truthy and flagged `__esModule`,
 *   otherwise a wrapper object `{ default: mod }`.
 */
const __importDefault = (this && this.__importDefault) || function (mod) {
    if (mod && mod.__esModule) {
        return mod;
    }
    return { "default": mod };
};
5const stream_1 = require("stream");
6const url_1 = require("url");
7const miniget_1 = __importDefault(require("miniget"));
8const m3u8_parser_1 = __importDefault(require("./m3u8-parser"));
9const dash_mpd_parser_1 = __importDefault(require("./dash-mpd-parser"));
10const queue_1 = __importDefault(require("./queue"));
11const parse_time_1 = require("./parse-time");
/**
 * Playlist parser constructors, keyed by the name accepted in
 * `options.parser` of `m3u8stream`.
 */
const supportedParsers = {
    'm3u8': m3u8_parser_1.default,
    'dash-mpd': dash_mpd_parser_1.default,
};
/**
 * Streams the segments of an HLS (m3u8) or DASH-MPD playlist as one
 * continuous stream.
 *
 * The playlist at `playlistURL` is fetched and parsed. Every new segment is
 * requested (up to `chunkReadahead` requests at a time) and piped, one
 * segment at a time and in request order, into the returned `PassThrough`.
 * Unless the parser reports `endlist`, the playlist is fetched again whenever
 * the number of pending segments drops to the refresh threshold.
 *
 * After each segment has been piped, the returned stream emits `progress`
 * with `({ num, size, duration, url }, totalQueued, downloadedBytes)`.
 * Request errors are re-emitted as `error` and end the stream. Calling
 * `end()` on the returned stream aborts any in-flight requests.
 *
 * @param {string} playlistURL - Playlist URL; segment URLs are resolved
 *   against it.
 * @param {Object} [options]
 * @param {number} [options.chunkReadahead=3] - Concurrency of segment
 *   requests.
 * @param {number} [options.liveBuffer=20000] - Subtracted from a numeric
 *   `begin`, and the maximum total duration of skipped items kept as a
 *   fallback when a refresh yields nothing at or after `begin`.
 * @param {Object} [options.requestOptions] - Passed to every `miniget` call.
 * @param {string} [options.parser] - `'m3u8'` or `'dash-mpd'`. When omitted,
 *   `'dash-mpd'` is used for URLs ending in `.mpd`, `'m3u8'` otherwise.
 * @param {string|number} [options.begin] - Where to begin. A string is
 *   converted with `humanStr` and offset by the playlist's start time; a
 *   number has `liveBuffer` subtracted (not below 0).
 * @param {*} [options.id] - Passed as the only argument to the parser
 *   constructor.
 * @returns {stream.PassThrough} Stream carrying the concatenated segments.
 * @throws {TypeError} If `options.parser` does not name a supported parser.
 */
const m3u8stream = (playlistURL, options = {}) => {
    const stream = new stream_1.PassThrough();
    const chunkReadahead = options.chunkReadahead || 3;
    const liveBuffer = options.liveBuffer || 20000; // 20 seconds
    const requestOptions = options.requestOptions;

    // Own-property lookup only: names inherited from Object.prototype
    // (e.g. 'constructor') must not be mistaken for a supported parser.
    const parserName = options.parser ||
        (/\.mpd$/.test(playlistURL) ? 'dash-mpd' : 'm3u8');
    const Parser = Object.prototype.hasOwnProperty.call(supportedParsers, parserName) ?
        supportedParsers[parserName] :
        undefined;
    if (!Parser) {
        throw new TypeError(`parser '${options.parser}' not supported`);
    }

    let begin = 0;
    if (typeof options.begin !== 'undefined') {
        begin = typeof options.begin === 'string' ?
            parse_time_1.humanStr(options.begin) :
            Math.max(options.begin - liveBuffer, 0);
    }

    // Shared state, declared ahead of the closures that read and write it.
    let refreshThreshold; // Pending-segment count at which to look for items again.
    let minRefreshTime; // Minimum delay between two playlist fetches.
    let refreshTimeout;
    let fetchingPlaylist = true;
    let ended = false;
    let isStatic = false;
    let lastRefresh;
    let currPlaylist;
    let currSegment;
    let lastSeq;
    let starttime = 0;
    let lastItemDuration = 0;
    let segmentNumber = 0;
    let downloaded = 0;

    // Pipes one segment response at a time into the output stream.
    const streamQueue = new queue_1.default((req, callback) => {
        currSegment = req;
        // Count the size manually, since the `content-length` header is not
        // always there.
        let size = 0;
        req.on('data', (chunk) => {
            size += chunk.length;
        });
        req.pipe(stream, { end: false });
        req.on('end', () => callback(undefined, size));
    }, { concurrency: 1 });

    // Starts segment requests ahead of time and hands each response to
    // `streamQueue` so the output keeps the request order.
    const requestQueue = new queue_1.default((segment, callback) => {
        const req = miniget_1.default(url_1.resolve(playlistURL, segment.url), requestOptions);
        req.on('error', callback);
        streamQueue.push(req, (_err, size) => {
            downloaded += +size;
            stream.emit('progress', {
                num: ++segmentNumber,
                size: size,
                duration: segment.duration,
                url: segment.url,
            }, requestQueue.total, downloaded);
            callback();
        });
    }, { concurrency: chunkReadahead });

    const onError = (err) => {
        if (ended) {
            return;
        }
        stream.emit('error', err);
        // Stop on any error.
        stream.end();
    };

    // Decides what happens next: schedule a playlist refresh when few
    // segments remain, or end the stream once nothing is left to download.
    const checkQueue = () => {
        if (!fetchingPlaylist && !ended && !isStatic &&
            requestQueue.tasks.length + requestQueue.active <= refreshThreshold) {
            const ms = Math.max(0, minRefreshTime - (Date.now() - lastRefresh));
            fetchingPlaylist = true;
            refreshTimeout = setTimeout(refreshPlaylist, ms);
        }
        else if ((ended || isStatic) &&
            !requestQueue.tasks.length && !requestQueue.active) {
            stream.end();
        }
    };

    // Completion callback for every segment pushed onto `requestQueue`.
    const onQueuedEnd = (err) => {
        currSegment = null;
        if (err) {
            onError(err);
        }
        else {
            checkQueue();
        }
    };

    const refreshPlaylist = () => {
        lastRefresh = Date.now();
        currPlaylist = miniget_1.default(playlistURL, requestOptions);
        currPlaylist.on('error', onError);
        const parser = currPlaylist.pipe(new Parser(options.id));

        parser.on('starttime', (a) => {
            // Only the first non-zero start time is taken into account.
            if (starttime) {
                return;
            }
            starttime = a;
            if (typeof options.begin === 'string' && begin >= 0) {
                begin += starttime;
            }
        });
        parser.on('endlist', () => { isStatic = true; });
        parser.on('endearly', currPlaylist.unpipe.bind(currPlaylist, parser));

        const addedItems = [];
        const addItem = (item) => {
            // Skip items already queued by an earlier refresh.
            if (item.seq <= lastSeq) {
                return;
            }
            lastSeq = item.seq;
            begin = item.time;
            requestQueue.push(item, onQueuedEnd);
            addedItems.push(item);
        };

        const tailedItems = [];
        let tailedItemsDuration = 0;
        parser.on('item', (item) => {
            const timedItem = Object.assign({ time: starttime }, item);
            if (timedItem.duration) {
                lastItemDuration = timedItem.duration;
            }
            if (begin <= timedItem.time) {
                addItem(timedItem);
            }
            else {
                tailedItems.push(timedItem);
                tailedItemsDuration += timedItem.duration;
                // Only keep the last `liveBuffer` of items.
                while (tailedItems.length > 1 &&
                    tailedItemsDuration - tailedItems[0].duration > liveBuffer) {
                    tailedItemsDuration -= tailedItems.shift().duration;
                }
            }
            starttime += timedItem.duration;
        });

        parser.on('end', () => {
            currPlaylist = null;
            // If we are too ahead of the stream, make sure to get the
            // latest available items with a small buffer.
            if (!addedItems.length && tailedItems.length) {
                tailedItems.forEach((item) => addItem(item));
            }
            // Refresh the playlist when remaining segments get low.
            refreshThreshold = Math.max(1, Math.ceil(addedItems.length * 0.01));
            // Throttle refreshing the playlist by the total duration of the
            // items added on this refresh. When nothing new was listed,
            // retry after half a segment duration instead of immediately.
            minRefreshTime = addedItems.length ?
                addedItems.reduce((total, item) => item.duration + total, 0) :
                lastItemDuration / 2;
            fetchingPlaylist = false;
            // A refresh that queued nothing triggers no segment completion,
            // so re-evaluate here; otherwise a live stream would never
            // refresh again and an empty static playlist would never end.
            if (!ended) {
                checkQueue();
            }
        });
    };
    refreshPlaylist();

    // Ending the stream also stops both queues, cancels a pending refresh
    // and aborts whatever request is still in flight.
    stream.end = () => {
        ended = true;
        streamQueue.die();
        requestQueue.die();
        clearTimeout(refreshTimeout);
        if (currPlaylist) {
            currPlaylist.unpipe();
            currPlaylist.abort();
        }
        if (currSegment) {
            currSegment.unpipe();
            currSegment.abort();
        }
        stream_1.PassThrough.prototype.end.call(stream, null);
    };
    return stream;
};
// The module's sole export is the `m3u8stream` function itself.
module.exports = m3u8stream;
//# sourceMappingURL=index.js.map
\No newline at end of file