"use strict";

// stream from hls source

var util = require('util'),
    url = require('url'),
    zlib = require('zlib'),
    assert = require('assert');

var extend = require('xtend'),
    oncemore = require('oncemore'),
    m3u8parse = require('m3u8parse'),
    uristream = require('uristream'),
    debug = require('debug')('hls:reader');

// Prefer the built-in Readable; fall back to the readable-stream package
// when the core stream module does not export one.
try {
  var Readable = require('stream').Readable;
  assert(Readable);
} catch (e) {
  var Readable = require('readable-stream');
}

function noop() {};

module.exports = hlsreader;

hlsreader.HlsSegmentObject = HlsSegmentObject;
hlsreader.HlsStreamReader = HlsStreamReader;

/**
 * Record pushed by HlsStreamReader for every segment it starts fetching.
 *
 * @param {number} seq - sequence number the segment was looked up with
 * @param {Object} segment - segment entry returned by index.getSegment(seq)
 * @param {Object} meta - object delivered by the segment stream's 'meta' event
 * @param {Object} stream - the uristream instance fetching the segment
 */
function HlsSegmentObject(seq, segment, meta, stream) {
  this.seq = seq;
  this.segment = segment;
  this.meta = meta;
  this.stream = stream;
}

/**
 * Start fetching the next segment if the reader is active, idle, and has an
 * index; otherwise do nothing. Ends the reader when the stop date has been
 * passed or the index reports it has ended.
 *
 * @param {HlsStreamReader} reader
 */
function checknext(reader) {
  var state = reader.readState;
  var index = reader.index;

  if (!state.active || state.fetching || state.nextSeq === -1 || !index)
    return;

  var seq = state.nextSeq;
  var segment = index.getSegment(seq);

  // check if we need to stop
  if (reader.stopDate) {
    var seqDate = index.dateForSeqNo(seq);
    if (seqDate > reader.stopDate)
      return reader.push(null);
  }

  if (segment) {
    state.fetching = fetchfrom(reader, seq, segment, function(err, object) {
      state.fetching = null;
      if (err) reader.emit('error', err);

      // only advance when nobody moved nextSeq while the fetch was pending
      if (seq === state.nextSeq)
        state.nextSeq++;

      if (object) {
        // remember the stream so it can be aborted if the segment expires
        reader.watch[object.seq] = object.stream;
        oncemore(object.stream).once('end', 'error', function() {
          delete reader.watch[object.seq];
        });

        // push() returning false pauses fetching until _read() is called
        state.active = reader.push(object);
      }

      checknext(reader);
    });
  } else if (index.ended) {
    reader.push(null);
  } else if (!index.type && (index.lastSeqNo() < state.nextSeq-1)) {
    // handle live stream restart
    state.nextSeq = index.startSeqNo(true);
    checknext(reader);
  }
}

/**
 * Open a stream for one segment and report once its metadata is known.
 *
 * The callback is invoked with (null, HlsSegmentObject) when a 'meta' event
 * with a supported MIME type arrives, or with (err) when the stream ends or
 * errors first, or when the MIME type is not in reader.segmentMimeTypes.
 *
 * @param {HlsStreamReader} reader
 * @param {number} seqNo - sequence number stored on the resulting object
 * @param {Object} segment - segment entry; its uri is resolved against reader.baseUrl
 * @param {function(?Error, HlsSegmentObject=)} cb
 * @returns {Object} the uristream instance
 */
function fetchfrom(reader, seqNo, segment, cb) {
  var segmentUrl = url.resolve(reader.baseUrl, segment.uri);
  var probe = !!reader.noData;

  debug('fetching segment', segmentUrl);
  var stream = uristream(segmentUrl, { probe:probe, highWaterMark:100*1000*1000 });

  stream.on('meta', onmeta);
  stream.on('end', onfail);
  stream.on('error', onfail);

  // detach the listeners before reporting, so cb is called for the first outcome only
  function finish(err, res) {
    stream.removeListener('meta', onmeta);
    stream.removeListener('end', onfail);
    stream.removeListener('error', onfail);
    cb(err, res);
  }

  function onmeta(meta) {
    debug('got segment meta', meta);

    if (reader.segmentMimeTypes.indexOf(meta.mime.toLowerCase()) === -1) {
      if (stream.abort) stream.abort();
      // The event name is required: without it the Error object is used as
      // the event name, onfail never runs and cb is never called.
      return stream.emit('error', new Error('Unsupported segment MIME type: '+meta.mime));
    }

    finish(null, new HlsSegmentObject(seqNo, segment, meta, stream));
  }

  function onfail(err) {
    // 'end' arrives without an argument: the stream finished before any meta
    if (!err) err = new Error('No metadata');
    finish(err);
  }

  return stream;
}

/**
 * Object-mode Readable that polls an HLS index and pushes one
 * HlsSegmentObject per segment. Emits 'index' whenever an updated index has
 * been parsed, and 'error' on index or segment failures.
 *
 * @param {string|Object} src - index URL; strings are parsed with url.parse
 * @param {Object} [options] - also passed on to the Readable constructor
 * @param {boolean} [options.fullStream] - passed to index.startSeqNo() to pick the first segment
 * @param {boolean} [options.noData] - passed as the `probe` option to segment streams
 * @param {Date} [options.startDate] - first date to include
 * @param {Date} [options.stopDate] - last date to include
 * @param {number} [options.highWaterMark] - defaults to 0
 */
function HlsStreamReader(src, options) {
  var self = this;

  options = options || {};
  if (typeof src === 'string')
    src = url.parse(src);

  this.url = src;
  this.baseUrl = src;

  this.fullStream = !!options.fullStream;
  this.noData = !!options.noData;

  // dates are inclusive
  this.startDate = options.startDate;
  this.stopDate = options.stopDate;

  this.index = null;
  this.readState = {
    nextSeq:-1,
    active:false,
    fetching:null
  };
  this.watch = {}; // used to stop buffering on expired segments

  // Seconds to wait before the next index refresh.
  function getUpdateInterval(updated) {
    if (updated && self.index.segments.length)
      return Math.min(self.index.target_duration, self.index.segments[self.index.segments.length-1].duration);
    else
      return self.index.target_duration / 2;
  }

  // Sequence number to start reading from, or -1 when no segment qualifies yet.
  function initialSeqNo() {
    var index = self.index;

    if (self.startDate) {
      // NOTE(review): the body of this loop was lost in the source (the text
      // between a '<' and a '>' was stripped). It is reconstructed from the
      // surviving tokens; confirm against the upstream implementation.
      // Assumes segments expose `program_time` (a Date) when the playlist
      // carries program dates - TODO confirm.
      var count = index.segments.length;
      var time = 0, startTime = self.startDate.getTime();
      for (var i=0; i<count; i++) {
        var segment = index.segments[i];
        if (segment.program_time)
          time = segment.program_time.getTime();
        time += segment.duration * 1000;
        if (time >= startTime)
          return index.first_seq_no + i;
      }
      return -1;
    }

    return index.startSeqNo(self.fullStream);
  }

  // Abort a stream after `delay` ms unless it has ended by then. Lives in its
  // own function so every timer keeps a reference to its own stream.
  function abortLater(stream, delay) {
    setTimeout(function () {
      debug('aborting discontinued segment download');
      if (!stream.ended && stream.abort) stream.abort();
    }, delay);
  }

  // Runs after every index fetch attempt; `updated` tells whether a new
  // index was stored in self.index.
  function updatecheck(updated) {
    if (updated) {
      if (self.readState.nextSeq===-1)
        self.readState.nextSeq = initialSeqNo();
      else if (self.readState.nextSeq < self.index.startSeqNo(true)) {
        debug('skipping '+(self.index.startSeqNo(true)-self.readState.nextSeq)+' invalidated segments');
        self.readState.nextSeq = self.index.startSeqNo(true);
      }

      // check watched segments
      Object.keys(self.watch).forEach(function(seq) {
        if (!self.index.isValidSeqNo(seq)) {
          var stream = self.watch[seq];
          delete self.watch[seq];
          abortLater(stream, self.index.target_duration * 1000);
        }
      });

      self.emit('index', self.index);

      // a variant index has no segments to read; end right away
      if (self.index.variant)
        return self.push(null);
    }
    checknext(self);

    if (self.index && !self.index.ended && self.readable) {
      var updateInterval = getUpdateInterval(updated);
      debug('scheduling index refresh', updateInterval);
      setTimeout(updateindex, Math.max(1, updateInterval)*1000);
    }
  }

  // Fetch and parse the index, then hand over to updatecheck().
  function updateindex() {
    if (!self.readable) return;

    var stream = uristream(url.format(self.url), { timeout:30*1000 });
    stream.on('meta', function(meta) {
      debug('got index meta', meta);

      if (self.indexMimeTypes.indexOf(meta.mime.toLowerCase()) === -1) {
        // FIXME: correctly handle .m3u us-ascii encoding
        if (stream.abort) stream.abort();
        return stream.emit('error', new Error('Invalid MIME type: '+meta.mime));
      }

      // segment URIs are resolved against the URL reported by the index stream
      self.baseUrl = meta.url;
    });

    m3u8parse(stream, function(err, index) {
      if (err) {
        self.emit('error', err);
        updatecheck(false);
      } else {
        var updated = true;
        if (self.index && self.index.lastSeqNo() === index.lastSeqNo()) {
          debug('index was not updated');
          updated = false;
        }

        self.index = index;
        updatecheck(updated);
      }
    });
  }

  Readable.call(this, extend(options, {objectMode:true, highWaterMark:options.highWaterMark || 0}));

  updateindex();
}
util.inherits(HlsStreamReader, Readable);

// MIME types accepted for the index (compared lower-cased)
HlsStreamReader.prototype.indexMimeTypes = [
  'application/vnd.apple.mpegurl',
  'application/x-mpegurl',
  'audio/mpegurl',
];

// MIME types accepted for segments (compared lower-cased)
HlsStreamReader.prototype.segmentMimeTypes = [
  'video/mp2t',
  'audio/aac',
  'audio/x-aac',
  'audio/ac3',
];

// Readable hook: mark the reader as active and try to fetch the next segment.
HlsStreamReader.prototype._read = function(n) {
  this.readState.active = true;
  checknext(this);
};

/**
 * Create an HlsStreamReader.
 *
 * @param {string|Object} url - index URL
 * @param {Object} [options] - see HlsStreamReader
 * @returns {HlsStreamReader}
 */
function hlsreader(url, options) {
  return new HlsStreamReader(url, options);
}