reader.js 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. "use strict";
  2. // stream from hls source
  3. var util = require('util'),
  4. url = require('url'),
  5. zlib = require('zlib'),
  6. assert = require('assert');
  7. var extend = require('xtend'),
  8. oncemore = require('oncemore'),
  9. m3u8parse = require('m3u8parse'),
  10. uristream = require('uristream'),
  11. debug = require('debug')('hls:reader');
  12. var Readable = require('readable-stream');
  // Function that accepts nothing, does nothing and returns undefined.
  var noop = function noop() {
    return undefined;
  };
  /**
   * Record describing one fetched segment; instances are what the reader
   * pushes to its consumers.
   *
   * @param {number} seq - Sequence number the segment was requested under.
   * @param {Object} segment - Segment entry obtained from the parsed index.
   * @param {Object} meta - Payload of the segment stream's 'meta' event.
   * @param {Object} stream - Stream created for the segment download.
   */
  function HlsSegmentObject(seq, segment, meta, stream) {
    var record = this;

    record.seq = seq;
    record.segment = segment;
    record.meta = meta;
    record.stream = stream;
  }
  /**
   * Starts the download of a single segment and reports the outcome exactly
   * once through `cb`, as soon as the stream's metadata is known (or the
   * stream fails / ends before delivering any).
   *
   * @param {Object} reader - Reader supplying `baseUrl`, `noData` and
   *   `segmentMimeTypes`.
   * @param {number} seqNo - Sequence number stored on the resulting object.
   * @param {Object} segment - Index entry; its `uri` is resolved against
   *   `reader.baseUrl`.
   * @param {function(?Error, HlsSegmentObject=)} cb - Receives an error, or
   *   `null` and a HlsSegmentObject wrapping the still-open stream.
   * @returns {Object} The stream created by `uristream` for the segment.
   */
  function fetchfrom(reader, seqNo, segment, cb) {
    var segmentUrl = url.resolve(reader.baseUrl, segment.uri);
    var probe = !!reader.noData;

    debug('fetching segment', segmentUrl);
    var stream = uristream(segmentUrl, { probe:probe, highWaterMark:100 * 1000 * 1000 });

    // Detach our listeners before handing control to the caller, so that the
    // callback fires only once and later stream events belong to the consumer.
    function finish(err, res) {
      stream.removeListener('meta', onmeta);
      stream.removeListener('end', onfail);
      stream.removeListener('error', onfail);
      cb(err, res);
    }

    function onmeta(meta) {
      debug('got segment meta', meta);

      // A missing MIME type is treated like an unsupported one instead of
      // throwing a TypeError from inside the event handler.
      var mime = meta && meta.mime;
      var supported = typeof mime === 'string' &&
        reader.segmentMimeTypes.indexOf(mime.toLowerCase()) !== -1;

      if (!supported) {
        if (stream.abort) stream.abort();
        // The event name is required: without it the Error object itself is
        // used as the event name, no listener runs and `cb` is never called.
        return stream.emit('error', new Error('Unsupported segment MIME type: ' + mime));
      }

      finish(null, new HlsSegmentObject(seqNo, segment, meta, stream));
    }

    // Handles both 'error' (with an error) and a premature 'end' (without).
    function onfail(err) {
      if (!err) err = new Error('No metadata');
      finish(err);
    }

    stream.on('meta', onmeta);
    stream.on('end', onfail);
    stream.on('error', onfail);

    return stream;
  }
  /**
   * Tries to make progress on the reader: fetches the segment for
   * `readState.nextSeq` when the reader is active, idle and has an index;
   * otherwise ends the stream or resynchronises the sequence number.
   *
   * @param {Object} reader - The HlsStreamReader to advance.
   * @returns {*} `null` when nothing can be done right now, the result of
   *   `reader.push(null)` when the stop date has been passed, otherwise
   *   `undefined`.
   */
  function checknext(reader) {
    var state = reader.readState;
    var index = reader.index;

    var ready = state.active && !state.fetching && state.nextSeq !== -1 && index;
    if (!ready)
      return null;

    var seq = state.nextSeq;
    var segment = index.getSegment(seq, true);

    if (!segment) {
      if (index.ended) {
        reader.push(null);
      } else if (!index.type && (index.lastSeqNo() < state.nextSeq - 1)) {
        // handle live stream restart
        state.nextSeq = index.startSeqNo(true);
        checknext(reader);
      }
      return;
    }

    // check if we need to stop
    if (reader.stopDate && segment.program_time > reader.stopDate)
      return reader.push(null);

    // Invoked by fetchfrom once the segment's metadata arrived or it failed.
    var onfetched = function(err, object) {
      state.fetching = null;
      if (err) reader.emit('error', err);

      // advance only if nobody moved the cursor while we were fetching
      if (seq === state.nextSeq)
        state.nextSeq++;

      if (object) {
        // remember the stream while it is open so it can be found later
        reader.watch[object.seq] = object.stream;
        oncemore(object.stream).once('end', 'error', function() {
          delete reader.watch[object.seq];
        });

        // push() result tells us whether the consumer wants more
        state.active = reader.push(object);
      }

      checknext(reader);
    };

    state.fetching = fetchfrom(reader, seq, segment, onfetched);
  }
  /**
   * Object-mode Readable that follows an HLS index and pushes one
   * HlsSegmentObject per segment. The index is (re)loaded with `uristream` and
   * `m3u8parse`; segment fetching itself is driven by `checknext`.
   *
   * Emits 'index' with the parsed index after every changed refresh, and
   * 'error' on parse failures or when the index stalls for too long.
   *
   * @param {string|Object} src - Index location; strings are run through
   *   `url.parse`.
   * @param {Object} [options] - Also forwarded to the Readable constructor.
   * @param {boolean} [options.fullStream] - Passed to `index.startSeqNo()`
   *   when picking the first segment.
   * @param {boolean} [options.noData] - Request segments in probe mode.
   * @param {*} [options.startDate] - Start position (inclusive), any value
   *   accepted by `new Date()`.
   * @param {*} [options.stopDate] - Stop position (inclusive), any value
   *   accepted by `new Date()`.
   * @param {number} [options.maxStallTime=Infinity] - Milliseconds an index
   *   may stay unchanged before an 'index stall' error is emitted.
   * @param {number} [options.highWaterMark=0] - Readable buffering limit.
   */
  function HlsStreamReader(src, options) {
    var self = this;

    options = options || {};
    if (typeof src === 'string')
      src = url.parse(src);

    this.url = src;
    this.baseUrl = src;

    this.fullStream = !!options.fullStream;
    this.noData = !!options.noData;

    // dates are inclusive
    this.startDate = options.startDate ? new Date(options.startDate) : null;
    this.stopDate = options.stopDate ? new Date(options.stopDate) : null;

    this.maxStallTime = options.maxStallTime || Infinity;

    this.index = null;
    this.readState = {
      nextSeq:-1,
      active:false,
      fetching:null
    };
    this.watch = {}; // used to stop buffering on expired segments

    this.indexStallSince = null;

    // Returns the delay until the next index refresh (same unit as
    // target_duration; the caller multiplies by 1000 for setTimeout), or -1
    // once the index has been stalled for longer than maxStallTime.
    function getUpdateInterval(updated) {
      if (updated && self.index.segments.length) {
        self.indexStallSince = null;
        return Math.min(self.index.target_duration, self.index.segments[self.index.segments.length - 1].duration);
      } else {
        if (self.indexStallSince) {
          if ((Date.now() - +self.indexStallSince) > self.maxStallTime)
            return -1;
        } else {
          self.indexStallSince = new Date();
        }
        // retry faster while waiting for the index to change
        return self.index.target_duration / 2;
      }
    }

    // Picks the first sequence number to read once an index is available.
    function initialSeqNo() {
      var index = self.index;

      if (self.startDate)
        return index.seqNoForDate(self.startDate, true);
      else
        return index.startSeqNo(self.fullStream);
    }

    // Aborts one segment download after `delay` ms unless it finished first.
    // Taking the stream as a parameter gives every timer its own binding; a
    // `var` declared inside the loop would be shared by all timers, so only
    // the last invalidated stream would ever be aborted.
    function scheduleAbort(stream, delay) {
      setTimeout(function () {
        debug('aborting discontinued segment download');
        if (!stream.ended && stream.abort) stream.abort();
      }, delay);
    }

    // Runs after every index load attempt; `updated` tells whether the index
    // actually changed.
    function updatecheck(updated) {
      if (updated) {
        if (self.readState.nextSeq === -1)
          self.readState.nextSeq = initialSeqNo();
        else if (self.readState.nextSeq < self.index.startSeqNo(true)) {
          debug('skipping ' + (self.index.startSeqNo(true) - self.readState.nextSeq) + ' invalidated segments');
          self.readState.nextSeq = self.index.startSeqNo(true);
        }

        // check watched segments
        var abortDelay = self.index.target_duration * 1000;
        // iterate over a snapshot of the keys since entries are deleted
        Object.keys(self.watch).forEach(function (seq) {
          if (self.index.isValidSeqNo(seq)) return;

          var stream = self.watch[seq];
          delete self.watch[seq];
          scheduleAbort(stream, abortDelay);
        });

        self.emit('index', self.index);

        // a variant index carries no segments to read, so end right away
        if (self.index.variant)
          return self.push(null);
      }
      checknext(self);

      if (self.index && !self.index.ended && self.readable) {
        var updateInterval = getUpdateInterval(updated);
        if (updateInterval <= 0)
          return self.emit('error', new Error('index stall'));
        debug('scheduling index refresh', updateInterval);
        setTimeout(updateindex, Math.max(1, updateInterval) * 1000);
      }
    }

    // Downloads and parses the index, then hands over to updatecheck().
    function updateindex() {
      if (!self.readable) return;

      var stream = uristream(url.format(self.url), { timeout:30 * 1000 });
      stream.on('meta', function(meta) {
        debug('got index meta', meta);

        if (self.indexMimeTypes.indexOf(meta.mime.toLowerCase()) === -1) {
          // FIXME: correctly handle .m3u us-ascii encoding
          if (stream.abort) stream.abort();
          return stream.emit('error', new Error('Invalid MIME type: ' + meta.mime));
        }

        // segment URIs are resolved against the URL reported by the stream
        self.baseUrl = meta.url;
      });

      m3u8parse(stream, function(err, index) {
        if (err) {
          self.emit('error', err);
          updatecheck(false);
        } else {
          var updated = true;
          if (self.index && self.index.lastSeqNo() === index.lastSeqNo()) {
            debug('index was not updated');
            updated = false;
          }

          self.index = index;
          updatecheck(updated);
        }
      });
    }

    Readable.call(this, extend(options, {objectMode:true, highWaterMark:options.highWaterMark || 0}));

    updateindex();
  }
  // HlsStreamReader instances inherit from Readable.
  util.inherits(HlsStreamReader, Readable);

  // Lower-case MIME types accepted for the index response.
  HlsStreamReader.prototype.indexMimeTypes = ['application/vnd.apple.mpegurl', 'application/x-mpegurl', 'audio/mpegurl'];

  // Lower-case MIME types accepted for segment responses.
  HlsStreamReader.prototype.segmentMimeTypes = ['video/mp2t', 'audio/aac', 'audio/x-aac', 'audio/ac3'];
  /**
   * Readable hook invoked when the consumer wants data: marks the reader as
   * active and lets checknext() start the next segment fetch if possible.
   */
  HlsStreamReader.prototype._read = function(/*n*/) {
    var state = this.readState;

    state.active = true;
    checknext(this);
  };
  /**
   * Module entry point: creates a new HlsStreamReader.
   *
   * @param {string|Object} url - Index location, handed to the constructor.
   * @param {Object} [options] - Reader options, handed to the constructor.
   * @returns {HlsStreamReader} The newly created reader.
   */
  function hlsreader(url, options) {
    return new HlsStreamReader(url, options);
  }

  module.exports = hlsreader;

  // expose the constructors alongside the factory
  hlsreader.HlsSegmentObject = HlsSegmentObject;
  hlsreader.HlsStreamReader = HlsStreamReader;