Browse Source

change tsreader to pure readable & handle partial data downloads

Gil Pedersen 13 năm trước
mục cha
commit
7f4e424a66
4 tập tin đã thay đổi với 140 bổ sung và 52 xóa
  1. 4 1
      bin/hlsdump
  2. 4 1
      lib/m3u8.js
  3. 103 39
      lib/reader.js
  4. 29 11
      lib/tssmooth.js

+ 4 - 1
bin/hlsdump

@@ -17,6 +17,7 @@ hlsdump.version('0.0.0')
      }
      return r;
    })
+   .option('-b, --prebuffer-size <bytes>', 'prebuffer <bytes> input data before output (implies -s)')
    .option('-s, --sync', 'clock sync using stream PCR')
    .option('-k, --keep-connection', 'don\'t give up once connected')
    .option('-f, --full-stream', 'fetch all stream data')
@@ -34,7 +35,9 @@ var reader = require('../lib/reader'),
 var src = process.argv[2];
 if (!src) return hlsdump.help();
 
-var r = reader(src, {keepConnection:hlsdump.keepConnection, fullStream:hlsdump.fullStream});
+if (hlsdump.prebufferSize) hlsdump.sync = true;
+
+var r = reader(src, {prebufferSize:hlsdump.prebufferSize/*, bufferSize:10000*/, keepConnection:hlsdump.keepConnection, fullStream:hlsdump.fullStream});
 
 var time = 0;
 r.on('segment', function(seqNo, duration, meta) {

+ 4 - 1
lib/m3u8.js

@@ -99,18 +99,21 @@ function parse(stream, cb) {
       line_no = 0,
       meta = {};
 
+  stream.on('error', ReportError);
+
   var cr = carrier.carry(stream);
   cr.on('line', ParseLine);
   cr.on('end', Complete);
 
   function ReportError(err) {
+    stream.removeListener('error', ReportError);
     cr.removeListener('line', ParseLine);
     cr.removeListener('end', Complete);
     cb(err);
   }
 
   function Complete() {
-    debug('result', m3u8);
+    stream.removeListener('error', ReportError);
     cb(null, m3u8);
   }
 

+ 103 - 39
lib/reader.js

@@ -9,13 +9,10 @@ var async = require('async'),
     debug = require('debug')('hls:reader');
 
 try {
-  var Readable = require('stream').Readable,
-      Transform = require('stream').Transform;
+  var Readable = require('stream').Readable;
   assert(Readable);
-  assert(Transform);
 } catch (e) {
-  var Readable = require('readable-stream'),
-      Transform = require('readable-stream/transform');
+  var Readable = require('readable-stream');
 }
 
 var m3u8 = require('./m3u8');
@@ -39,6 +36,16 @@ emits:
   segment (seqNo, duration, datetime, size?, )
 */
 
+// ensure function is never run more than once
+function once(fn) {
+  var called = false;
+  return function() {   
+    var call = !called;
+    called = true;
+    if(call) fn.apply(this, arguments);
+  };
+}
+
 function getFileStream(srcUrl, options, cb) {
   assert(srcUrl.protocol);
 
@@ -51,7 +58,8 @@ function getFileStream(srcUrl, options, cb) {
     var headers = options.headers || {};
     if (!headers['user-agent']) headers['user-agent'] = DEFAULT_AGENT;
 
-    (options.probe ? http.head : http.get)({url:url.format(srcUrl), stream:true, headers:headers}, function(err, res) {
+    // http-get will occasionally call the callback multiple times... :–(
+    (options.probe ? http.head : http.get)({url:url.format(srcUrl), stream:true, headers:headers}, once(function(err, res) {
     	if (err) return cb(err);
 
       var statusCode = res.code || res.stream.statusCode;
@@ -66,10 +74,9 @@ function getFileStream(srcUrl, options, cb) {
           size = res.headers['content-length'] ? parseInt(res.headers['content-length'], 10) : -1,
           modified = res.headers['last-modified'] ? new Date(res.headers['last-modified']) : null;
 
-      if (res.stream)
-        res.stream.resume(); // for some reason http-get pauses the stream for the callback
+      res.stream.resume(); // for some reason http-get pauses the stream for the callback
       cb(null, res.stream, {url:res.url || srcUrl, mime:mimetype, size:size, modified:modified});
-    });
+    }));
   } else {
     process.nextTick(function() {
       cb(new Error('Unsupported protocol: '+srcUrl.protocol));
@@ -88,28 +95,41 @@ function getFileStream(srcUrl, options, cb) {
 
 function HlsStreamReader(src, options) {
   var self = this;
-  Transform.call(this, options);
 
+  options = options || {};
   if (typeof src === 'string')
     src = url.parse(src);
 
   this.url = src;
   this.baseUrl = src;
-  this.options = options || {};
+
+  this.prebufferSize = options.prebufferSize || 0;
+  this.fullStream = !!options.fullStream;
+  this.keepConnection = !!options.keepConnection;
+  this.noData = !!options.noData;
 
   this.indexStream = null;
   this.index = null;
-
   this.readState = {
     currentSeq:-1,
     currentSegment:null,
-    readable:null
+    stream:null
+  }
+
+  if (this.prebufferSize) {
+    var lwm = options.lowWaterMark || 0;
+    var hwm = options.highWaterMark || this.prebufferSize;
+    options.lowWaterMark = Math.max(this.prebufferSize, lwm);
+    options.highWaterMark = Math.max(hwm, lwm);
+    this.once('readable', function() {
+      self._readableState.lowWaterMark = ~~lwm;
+    });
   }
 
   function updatecheck(updated) {
     if (updated) {
       if (self.readState.currentSeq===-1)
-        self.readState.currentSeq = self.index.startSeqNo(self.options.fullStream);
+        self.readState.currentSeq = self.index.startSeqNo(self.fullStream);
       else if (self.readState.currentSeq < self.index.startSeqNo(true))
         self.readState.currentSeq = self.index.startSeqNo(true);
 
@@ -118,8 +138,7 @@ function HlsStreamReader(src, options) {
       if (self.index.variant)
         return self.end();
     }
-    if (!self.readState.currentSegment)
-      checkcurrent();
+    checkcurrent();
 
     if (!self.index.ended) {
       var updateInterval = updated ? self.index.segments[self.index.segments.length-1].duration : self.index.target_duration / 2;
@@ -131,7 +150,7 @@ function HlsStreamReader(src, options) {
   function updateindex() {
     getFileStream(self.url, function(err, stream, meta) {
       if (err) {
-        if (self.index && self.options.keepConnection) {
+        if (self.index && self.keepConnection) {
           console.error('Failed to update index at '+url.format(self.url)+':', err.stack || err);
           return updatecheck();
         }
@@ -162,15 +181,17 @@ function HlsStreamReader(src, options) {
   updateindex();
 
   function checkcurrent() {
+    if (self.readState.currentSegment) return; // already processing
+
     self.readState.currentSegment = self.index.getSegment(self.readState.currentSeq);
     if (self.readState.currentSegment) {
-      fetchfrom(self.readState.currentSeq, self.readState.currentSegment, function(err) {
-        var url = self.readState.currentSegment.uri;
+      var url = self.readState.currentSegment.uri;
+      fetchfrom(self.readState.currentSeq, self.readState.currentSegment, function(err, transferred) {
         self.readState.currentSegment = null;
         if (err) {
-          if (!self.options.keepConnection) return self.emit('error', err);
+          if (!self.keepConnection) return self.emit('error', err);
           console.error('While fetching '+url+':', err.stack || err);
-          return;
+          if (!transferred) return; // TODO: retry with a range header
         }
         self.readState.currentSeq++;
         checkcurrent();
@@ -188,9 +209,10 @@ function HlsStreamReader(src, options) {
     var segmentUrl = url.resolve(self.baseUrl, segment.uri)
 
     debug('fetching segment', segmentUrl);
-    getFileStream(url.parse(segmentUrl), {probe:!!self.options.noData}, function(err, stream, meta) {
+    getFileStream(url.parse(segmentUrl), {probe:!!self.noData}, function(err, stream, meta) {
       if (err) return cb(err);
 
+      debug('got segment info', meta);
       if (meta.mime !== 'video/mp2t'/* && 
           meta.mime !== 'audio/aac' && meta.mime !== 'audio/x-aac' &&
           meta.mime !== 'audio/ac3'*/)
@@ -198,34 +220,76 @@ function HlsStreamReader(src, options) {
 
       self.emit('segment', seqNo, segment.duration, meta);
 
-      // TODO: handle aborted downloads
       if (stream) {
-        var r = stream;
-        if (!(stream instanceof Readable)) {
-          r = new Readable();
-          r.wrap(stream);
-        }
-        self.readState.readable = r;
-        r.pipe(self, {end:false});
+        debug('pushing input stream to reader');
 
-        r.on('end', function() {
-          self.readState.readable = null;
-          r.unpipe(self);
-          cb();
+        var totalBytes = 0;
+        stream.on('data', function(chunk) {
+          totalBytes += chunk.length;
+          self.push(chunk); // intentionally ignore the result to buffer input as fast as possible
         });
+        stream.on('error', Done);
+        stream.on('end', Done);
+        stream.on('close', Done);
+
+        self.readState.stream = stream;
+        self.stream_start(true, !self.push(new Buffer(0)));
+
+        function Done(err) {
+          debug('finished with input stream');
+
+          stream.removeListener('error', Done);
+          stream.removeListener('end', Done);
+          stream.removeListener('close', Done);
+
+          self.readState.stream = null;
+
+          // FIXME: is this required? or already handled by http-get?
+          if (!err && (totalBytes !== meta.size))
+            err = new Error('Invalid returned stream length');
+
+          cb(err, totalBytes);
+        }
       } else {
         process.nextTick(cb);
       }
     });
   }
 
-  return this;
+  this.stream_start = function(fresh, blocked) {
+    if (fresh) {
+      self.readState.stream_started = false;
+      if (self.readState.timer) {
+        clearTimeout(self.readState.timer);
+        self.readState.timer = null;
+      }
+
+      if (blocked) return self.readState.stream.pause();
+    }
+
+    if (self.readState.stream_started) return;
+
+    var stream = self.readState.stream;
+    if (!stream) return;
+
+    if (typeof stream.destroy == 'function') {
+      var duration = self.readState.currentSegment.duration || self.index.target_duration || 10;
+      self.readState.timer = setTimeout(function() {
+        if (self.readState.stream)
+          stream.destroy();
+        self.readState.timer = null;
+      }, 1.5*duration*1000);
+    }
+    self.readState.stream_started = true;
+    stream.resume();
+  }
+
+  Readable.call(this, options);
 }
-util.inherits(HlsStreamReader, Transform);
+util.inherits(HlsStreamReader, Readable);
 
-HlsStreamReader.prototype._transform = function(chunk, output, cb) {
-  // TODO: decrypt here
-  cb(null, chunk);
+HlsStreamReader.prototype._read = function(n, cb) {
+  this.stream_start();
 };
 
 function hlsreader(url, options) {

+ 29 - 11
lib/tssmooth.js

@@ -1,5 +1,6 @@
 var util = require('util'),
-    assert = require('assert');
+    assert = require('assert'),
+    debug = require('debug')('hls:tssmooth');
 
 var async = require('async');
 
@@ -15,11 +16,11 @@ try {
 module.exports = tssmooth;
 exports.TsSmooth = TsSmooth;
 
-function parsePCR(buffer, index) {
+function parsePCR(buffer, index, pcr_pid) {
   var head = buffer.readUInt32BE(index, true);
-//  var b = buffer.readUInt8(3, true);
   var pid = (head >> 8) & 0x1fff;
   if (((head >> 5) & 1) !== 1) return -1;
+  if (pcr_pid && pcr_pid != pid) return -1;
 
   var s = buffer.readUInt8(index+4, true);
   if (s < 7) return -1;
@@ -38,9 +39,10 @@ function parsePCR(buffer, index) {
 
 function TsSmooth(options) {
   var self = this;
-  this.options = options || {};
+  options = options || {};
+
+  this.packetSize = options.packetSize || 7*188; // size of output packets
 
-  // the buffer is only used for partial TS packets ()< 188 bytes)
   this.buffer = new Buffer(0);
 
   this.pcr = -1;
@@ -107,8 +109,7 @@ function utime() {
 }
 
 // smoothly outputs given buffer before endTime
-function outputBefore(buffer, endTime, outputFn, cb) {
-  var packetSize = 14*188;
+function outputBefore(buffer, endTime, packetSize, outputFn, cb) {
   var index = 0;
 
   function outputPacket() {
@@ -118,8 +119,10 @@ function outputBefore(buffer, endTime, outputFn, cb) {
     outputFn(buffer.slice(index, Math.min(buffer.length, index+packetSize)));
     index += packetSize;
 
-    if (index < buffer.length)
+    if (index < buffer.length) {
+      //debug('packetTime', (packetTime/1000).toFixed(2));
       return setTimeout(outputPacket, Math.max(0.95*packetTime/1000, 0));
+    }
     cb();
   }
   process.nextTick(outputPacket);
@@ -132,17 +135,32 @@ TsSmooth.prototype._transform = function(chunk, output, cb) {
   this.buffer = Buffer.concat([this.buffer, chunk]);
 
   var buf = self.buffer;
-  var end = buf.length-187;
+  var end = buf.length-188;
 
   var startIndex = 0;
   function processNext() {
     while (index < end) {
+      // check sync
+      if (buf.readUInt8(index+188, true) !== 0x47) {
+        // find next potential sync point
+        debug('ts sync lost');
+        var sync = index+1;
+        for (; sync < end; sync++) {
+          if (buf.readUInt8(sync, true) === 0x47)
+            break;
+        }
+        // remove bad data
+        console.error('slice', sync, end);
+        buf = Buffer.concat([buf.slice(0, index), buf.slice(sync)]);
+        end -= sync-index;
+      }
+
       var pcr = parsePCR(buf, index);
       var outtime = self.output_time(pcr);
       if (outtime !== -1 && index !== startIndex) {
         var slice = buf.slice(startIndex, index);
         startIndex = index;
-        return outputBefore(slice, outtime, output, processNext);
+        return outputBefore(slice, outtime, self.packetSize, output, processNext);
       }
       index += 188;
     }
@@ -155,7 +173,7 @@ TsSmooth.prototype._transform = function(chunk, output, cb) {
 };
 
 TsSmooth.prototype._flush = function(output, cb) {
-  if (this.buffer.length) output(this.buffer);
+  if (this.buffer.length) output(this.buffer); // TODO: use outputBefore() based on current stream speed?
   cb();
 };