Преглед на файлове

add --keep-connection and --full-stream options

Gil Pedersen преди 13 години
родител
ревизия
265298db40
променени са 5 файла, в които са добавени 70 реда и са изтрити 43 реда
  1. 7 1
      bin/hlsdump
  2. 1 1
      bin/hlsmon
  3. 2 2
      lib/m3u8.js
  4. 52 34
      lib/reader.js
  5. 8 5
      lib/tslimit.js

+ 7 - 1
bin/hlsdump

@@ -18,6 +18,8 @@ hlsdump.version('0.0.0')
      return r;
    })
    .option('-s, --sync', 'clock sync using stream PCR')
+   .option('-k, --keep-connection', 'don\'t give up once connected')
+   .option('-f, --full-stream', 'fetch all stream data')
    .option('-a, --user-agent <string>', 'HTTP User-Agent')
    .parse(process.argv);
 
@@ -32,7 +34,7 @@ var reader = require('../lib/reader'),
 var src = process.argv[2];
 if (!src) return hlsdump.help();
 
-var r = reader(src);
+var r = reader(src, {keepConnection:hlsdump.keepConnection, fullStream:hlsdump.fullStream});
 
 var time = 0;
 r.on('segment', function(seqNo, duration, meta) {
@@ -41,6 +43,10 @@ r.on('segment', function(seqNo, duration, meta) {
   time += duration;
 });
 
+r.on('error', function(err) {
+  console.error('error', err.stack || err);
+});
+
 r.on('end', function() {
   console.error('done');
 });

+ 1 - 1
bin/hlsmon

@@ -15,7 +15,7 @@ var src = process.argv[2];
 var sep = ';';
 
 function monitor(srcUrl) {
-  var r = reader(srcUrl, {noData:true});
+  var r = reader(srcUrl, {noData:true, keepConnection:true});
 
   var time = 0;
   r.on('segment', function(seqNo, duration, file) {

+ 2 - 2
lib/m3u8.js

@@ -44,8 +44,8 @@ M3U8Playlist.prototype.isLive = function() {
   return !(this.ended || this.type === this.PlaylistType.VOD);
 };
 
-M3U8Playlist.prototype.startSeqNo = function() {
-  if (!this.isLive()) return this.first_seq_no;
+M3U8Playlist.prototype.startSeqNo = function(full) {
+  if (!this.isLive() || full) return this.first_seq_no;
 
   var duration = this.target_duration * 3;
   for (var i=this.segments.length-1; i>0; i--) {

+ 52 - 34
lib/reader.js

@@ -98,9 +98,37 @@ function HlsStreamReader(src, options) {
     readable:null
   }
 
+  function updatecheck(updated) {
+    if (updated) {
+      if (self.readState.currentSeq===-1)
+        self.readState.currentSeq = self.index.startSeqNo(self.options.fullStream);
+      else if (self.readState.currentSeq < self.index.startSeqNo(true))
+        self.readState.currentSeq = self.index.startSeqNo(true);
+
+      self.emit('index', self.index);
+
+      if (self.index.variant)
+        return self.end();
+    }
+    if (!self.readState.currentSegment)
+      checkcurrent();
+
+    if (!self.index.ended) {
+      var updateInterval = updated ? self.index.segments[self.index.segments.length-1].duration : self.index.target_duration / 2;
+      debug('scheduling index refresh', updateInterval);
+      setTimeout(updateindex, Math.max(1, updateInterval)*1000);
+    }
+  }
+
   function updateindex() {
     getFileStream(self.url, function(err, stream, meta) {
-      if (err) return self.emit('error', err);
+      if (err) {
+        if (self.index && self.options.keepConnection) {
+          console.error('Failed to update index at '+url.format(self.url)+':', err.stack || err);
+          return updatecheck();
+        }
+        return self.emit('error', err);
+      }
 
       if (meta.mime !== 'application/vnd.apple.mpegurl' &&
           meta.mime !== 'application/x-mpegurl' && meta.mime !== 'audio/mpegurl')
@@ -119,24 +147,7 @@ function HlsStreamReader(src, options) {
 
         self.index = index;
 
-        if (updated) {
-          if (self.readState.currentSeq===-1)
-            self.readState.currentSeq = index.startSeqNo();
-
-          self.emit('index', index);
-
-          if (index.variant)
-            return self.end();
-
-          if (!self.readState.currentSegment)
-            checkcurrent();
-        }
-
-        if (!index.ended) {
-          var updateInterval = updated ? index.segments[index.segments.length-1].duration : self.index.target_duration / 2;
-          debug('scheduling index refresh', updateInterval);
-          setTimeout(updateindex, Math.max(1, updateInterval)*1000);
-        }
+        updatecheck(updated);
       });
     });
   }
@@ -144,36 +155,42 @@ function HlsStreamReader(src, options) {
 
   function checkcurrent() {
     self.readState.currentSegment = self.index.getSegment(self.readState.currentSeq);
-    if (self.readState.currentSegment)
-      fetchfrom(self.readState.currentSegment);
-    else if (self.index.ended)
+    if (self.readState.currentSegment) {
+      fetchfrom(self.readState.currentSeq, self.readState.currentSegment, function(err) {
+        var url = self.readState.currentSegment.uri;
+        self.readState.currentSegment = null;
+        if (err) {
+          if (!self.options.keepConnection) return self.emit('error', err);
+          console.error('While fetching '+url+':', err.stack || err);
+          return;
+        }
+        self.readState.currentSeq++;
+        checkcurrent();
+      });
+    } else if (self.index.ended)
       self.end();
     else if (!self.index.type && (self.index.lastSeqNo() < self.readState.currentSeq-1)) {
       // handle live stream restart
-      self.readState.currentSeq = self.index.first_seq_no;
+      self.readState.currentSeq = self.index.startSeqNo(true);
       checkcurrent();
     }
   }
 
-  function fetchfrom(segment) {
+  function fetchfrom(seqNo, segment, cb) {
     var segmentUrl = url.resolve(self.baseUrl, segment.uri)
 
     debug('fetching segment', segmentUrl);
     getFileStream(url.parse(segmentUrl), {probe:!!self.options.noData}, function(err, stream, meta) {
-      if (err) return self.emit('error', err);
+      if (err) return cb(err);
 
       if (meta.mime !== 'video/mp2t'/* && 
           meta.mime !== 'audio/aac' && meta.mime !== 'audio/x-aac' &&
           meta.mime !== 'audio/ac3'*/)
-        return self.emit('error', new Error('Unsupported segment MIME type: '+meta.mime));
-
-      self.emit('segment', self.readState.currentSeq, segment.duration, meta);
+        return cb(new Error('Unsupported segment MIME type: '+meta.mime));
 
-      function nextstream() {
-        self.readState.currentSeq++;
-        checkcurrent();
-      }
+      self.emit('segment', seqNo, segment.duration, meta);
 
+      // TODO: handle aborted downloads
       if (stream) {
         var r = stream;
         if (!(stream instanceof Readable)) {
@@ -184,11 +201,12 @@ function HlsStreamReader(src, options) {
         r.pipe(self, {end:false});
 
         r.on('end', function() {
+          self.readState.readable = null;
           r.unpipe(self);
-          process.nextTick(nextstream);
+          cb();
         });
       } else {
-        process.nextTick(nextstream);
+        process.nextTick(cb);
       }
     });
   }

+ 8 - 5
lib/tslimit.js

@@ -90,10 +90,14 @@ TsLimit.prototype._transform = function(chunk, output, cb) {
       var pcr = parsePCR(buf.slice(i, i+188));
       if (pcr !== -1) {
         var pcrtime = self.pcr2time(pcr);
-        if (Math.abs(pcrtime - self.time) > 700E3) {
-          console.error('PCR out of sync');
-          self.pcr = -1;
-          pcrtime = self.pcr2time(pcr);
+        if (Math.abs(pcrtime - self.time) > 100E3) {
+          console.error('PCR_error: '+((pcrtime - self.time)/1E6).toFixed(2)+'s missing');
+          var elapsed = now - self.time;
+          if (now < self.time || now > pcrtime) {
+            console.error('PCR sync reset');
+            self.pcr = -1;
+            pcrtime = self.pcr2time(pcr);
+          }
         }
         self.time = pcrtime;
       }
@@ -101,7 +105,6 @@ TsLimit.prototype._transform = function(chunk, output, cb) {
         break;
     }
 
-    // TODO: limit output speed
     if (i) output(buf.slice(0, i));
     self.buffer = buf.slice(i);