瀏覽代碼

complete rework of reader interface to support parallel segment downloads

Gil Pedersen 13 年之前
父節點
當前提交
69d9c5c99a
共有 3 個文件被更改，包括 126 次插入和 124 次删除
  1. 36 8
      bin/hlsdump
  2. 88 114
      lib/reader.js
  3. 2 2
      lib/uristream.js

+ 36 - 8
bin/hlsdump

@@ -24,6 +24,7 @@ hlsdump.version('0.0.0')
    .option('-s, --sync', 'clock sync using stream PCR')
    .option('-k, --keep-connection', 'don\'t give up once connected')
    .option('-f, --full-stream', 'fetch all stream data')
+   .option('-c, --concurrent <count>', 'fetch using concurrent connections', parseInt)
    .option('-a, --user-agent <string>', 'HTTP User-Agent')
    .parse(process.argv);
 
@@ -33,31 +34,58 @@ var util = require('util'),
 
 var reader = require('../lib/reader'),
     tssmooth = require('../lib/tssmooth'),
-    tsblast = require('../lib/tsblast');
+    tsblast = require('../lib/tsblast'),
+    oncemore = require('../lib/oncemore');
+
+// Node core exports the class as `PassThrough`; use it when available and
+// fall back to the readable-stream module on runtimes whose core lacks it.
+var Passthrough = require('stream').PassThrough || require('readable-stream/passthrough');
 
 var src = process.argv[2];
 if (!src) return hlsdump.help();
 
 if (hlsdump.bufferSize) hlsdump.sync = true;
 
-var r = reader(src, {highWaterMark:hlsdump.bufferSize, keepConnection:hlsdump.keepConnection, fullStream:hlsdump.fullStream});
+var r = reader(src, {highWaterMark:(hlsdump.concurrent || 1) - 1, fullStream:hlsdump.fullStream});
 
 var time = 0;
-r.on('segment', function(seqNo, duration, meta) {
-//  console.error(new Date().toJSON() + sep + meta.size + sep + duration.toFixed(3) + sep + (meta.size / (duration * 1024/8)).toFixed(3));
-  console.error('new segment at '+time.toFixed(0)+' seconds, avg bitrate (kbps):', (meta.size / (duration * 1024/8)).toFixed(1));
-  time += duration;
+// True while a segment stream is being piped into `buffer`; 'readable'
+// events that arrive in the meantime are ignored.
+var reading = false;
+r.on('readable', function() {
+  if (reading) return;// console.error('readable call error');
+
+  // Read one segment object from the reader, log its stats, pipe its stream
+  // into `buffer`, and call itself again once that stream has ended or errored.
+  function grabnext() {
+    var obj = r.read();
+    if (obj) {
+      var meta = obj.meta;
+      var duration = obj.segment.duration;
+      console.error('new segment at '+time.toFixed(0)+' seconds, avg bitrate (kbps):', (meta.size / (duration * 1024/8)).toFixed(1));
+      time += duration;
+
+      reading = true;
+      // end:false keeps `buffer` open so that following segments can be piped into it too
+      obj.stream.pipe(buffer, { end: false });
+      // NOTE(review): oncemore presumably fires the callback once for whichever of 'end'/'error' comes first - confirm
+      oncemore(obj.stream).once('end', 'error', function(err) {
+        reading = false;
+        if (err) console.error('stream error', err.stack || err);
+        grabnext();
+      });
+    }
+  }
+  grabnext();
 });
 
 r.on('error', function(err) {
-  console.error('error', err.stack || err);
+  console.error('reader error', err.stack || err);
 });
 
 r.on('end', function() {
   console.error('done');
 });
 
-var stream = r;
+var buffer = new Passthrough({highWaterMark:hlsdump.bufferSize});
+var stream = buffer;
 if (hlsdump.sync)
   stream = stream.pipe(tssmooth());
 

+ 88 - 114
lib/reader.js

@@ -6,6 +6,7 @@ var util = require('util'),
     assert = require('assert');
 
 var request = require('request'),
+    extend = require('xtend'),
     oncemore = require('./oncemore'),
     uristream = require('./uristream'),
     debug = require('debug')('hls:reader');
@@ -22,21 +23,84 @@ var m3u8 = require('./m3u8');
 function noop() {};
 
 module.exports = hlsreader;
+hlsreader.HlsSegmentObject = HlsSegmentObject;
 hlsreader.HlsStreamReader = HlsStreamReader;
 
-/*
-options:
-  startSeq*
-  noData // don't emit any data - useful for analyzing the stream structure
+// Plain record describing one fetched segment.
+//   seq     - sequence number of the segment
+//   segment - the playlist entry the segment was fetched from
+//   meta    - metadata object delivered by the stream's 'meta' event
+//   stream  - the stream carrying the segment data
+function HlsSegmentObject(seq, segment, meta, stream) {
+  this.seq = seq;
+  this.segment = segment;
+  this.meta = meta;
+  this.stream = stream;
+}
+
+/**
+ * Start fetching the next segment when the reader wants data and is idle.
+ *
+ * Does nothing unless `readState.active` is set, no fetch is in flight and an
+ * index has been loaded. When the segment for `readState.nextSeq` is known it
+ * is fetched and pushed; when the index has ended the stream is terminated;
+ * when a non-typed (live) index has fallen behind `nextSeq`, reading restarts
+ * from the index's start sequence number.
+ *
+ * @param {HlsStreamReader} reader - reader whose `readState` and `index` are used
+ */
+function checknext(reader) {
+  var state = reader.readState;
+  var index = reader.index;
+  if (!state.active || state.fetching || !index)
+    return;
+
+  var seq = state.nextSeq;
+  var segment = index.getSegment(seq);
+
+  if (segment) {
+    state.fetching = fetchfrom(reader, seq, segment, function(err, object) {
+      state.fetching = null;
+      if (err) reader.emit('error', err);
+
+      // only advance if nextSeq was not moved elsewhere while fetching
+      if (seq === state.nextSeq)
+        state.nextSeq++;
+
+      // a failed fetch yields no object, so there is nothing to push;
+      // just move on to the next segment
+      if (object)
+        state.active = reader.push(object);
+      checknext(reader);
+    });
+  } else if (index.ended) {
+    reader.push(null);
+  } else if (!index.type && (index.lastSeqNo() < state.nextSeq-1)) {
+    // handle live stream restart
+    state.nextSeq = index.startSeqNo(true);
+    checknext(reader);
+  }
+}
+
+/**
+ * Open a stream for one segment and report back once its metadata is known.
+ *
+ * @param {HlsStreamReader} reader - supplies `baseUrl` (to resolve the segment URI) and `noData`
+ * @param seqNo - sequence number stored on the resulting HlsSegmentObject
+ * @param {Object} segment - playlist entry; its `uri` is resolved and fetched
+ * @param {Function} cb - called as cb(err) when the stream ends or errors before
+ *   metadata arrives (or the MIME type is unsupported), otherwise as
+ *   cb(null, HlsSegmentObject)
+ * @returns the segment stream, returned before cb has been invoked
+ */
+function fetchfrom(reader, seqNo, segment, cb) {
+  var segmentUrl = url.resolve(reader.baseUrl, segment.uri);
+  var probe = !!reader.noData;
+
+  debug('fetching segment', segmentUrl);
+  var stream = uristream(segmentUrl, { probe:probe, highWaterMark:100*1000*1000 });
+  stream.on('meta', onmeta);
+  stream.on('end', onfail);
+  stream.on('error', onfail);
+
+  // detach all listeners so the consumer of the stream owns its events
+  function cleanup() {
+    stream.removeListener('meta', onmeta);
+    stream.removeListener('end', onfail);
+    stream.removeListener('error', onfail);
+  }
+
+  function onmeta(meta) {
+    debug('got segment info', meta);
+
+    if (meta.mime !== 'video/mp2t'/* && 
+        meta.mime !== 'audio/aac' && meta.mime !== 'audio/x-aac' &&
+        meta.mime !== 'audio/ac3'*/) {
+      if (stream.abort) stream.abort();
+      // the event name must be 'error'; emitting only the Error object would
+      // never reach onfail and cb would never be called
+      return stream.emit('error', new Error('Unsupported segment MIME type: '+meta.mime));
+    }
+
+    cleanup();
+    cb(null, new HlsSegmentObject(seqNo, segment, meta, stream));
+  }
 
-  maxRedirects*
-  cacheDir*
-  headers* // allows for custom user-agent, cookies, auth, etc
-  
-emits:
-  index (m3u8)
-  segment (seqNo, duration, datetime, size?, )
-*/
+  // stream ended or failed before any metadata was seen
+  function onfail(err) {
+    if (!err) err = new Error('No metadata');
+
+    cleanup();
+    cb(err);
+  }
+
+  return stream;
+}
 
 function HlsStreamReader(src, options) {
   var self = this;
@@ -49,15 +113,12 @@ function HlsStreamReader(src, options) {
   this.baseUrl = src;
 
   this.fullStream = !!options.fullStream;
-  this.keepConnection = !!options.keepConnection;
   this.noData = !!options.noData;
 
-  this.indexStream = null;
   this.index = null;
   this.readState = {
-    currentSeq:-1,
+    nextSeq:-1,
     currentSegment:null,
-    stream:null
   }
 
   function getUpdateInterval(updated) {
@@ -69,17 +130,19 @@ function HlsStreamReader(src, options) {
 
   function updatecheck(updated) {
     if (updated) {
-      if (self.readState.currentSeq===-1)
-        self.readState.currentSeq = self.index.startSeqNo(self.fullStream);
-      else if (self.readState.currentSeq < self.index.startSeqNo(true))
-        self.readState.currentSeq = self.index.startSeqNo(true);
+      if (self.readState.nextSeq===-1)
+        self.readState.nextSeq = self.index.startSeqNo(self.fullStream);
+      else if (self.readState.nextSeq < self.index.startSeqNo(true)) {
+        debug('skipping '+(self.index.startSeqNo(true)-self.readState.nextSeq)+' invalidated segments');
+        self.readState.nextSeq = self.index.startSeqNo(true);
+      }
 
       self.emit('index', self.index);
 
       if (self.index.variant)
         return self.end();
     }
-    checkcurrent();
+    checknext(self);
 
     if (!self.index.ended) {
       var updateInterval = getUpdateInterval(updated);
@@ -120,102 +183,13 @@ function HlsStreamReader(src, options) {
   }
   updateindex();
 
-  function checkcurrent() {
-    if (self.readState.currentSegment) return; // already processing
-
-    self.readState.currentSegment = self.index.getSegment(self.readState.currentSeq);
-    if (self.readState.currentSegment) {
-      var url = self.readState.currentSegment.uri;
-
-      function tryfetch(start) {
-        var seq = self.readState.currentSeq;
-        fetchfrom(seq, self.readState.currentSegment, start, function(err) {
-          if (err) {
-            if (!self.keepConnection) return self.emit('error', err);
-            console.error('While fetching '+url+':', err.stack || err);
-
-            // retry with missing range if it is still relevant
-            if (err instanceof uristream.PartialError && err.processed > 0 &&
-                self.index.getSegment(seq))
-                return tryfetch(start + err.processed);
-          }
-
-          self.readState.currentSegment = null;
-          if (seq === self.readState.currentSeq)
-            self.readState.currentSeq++;
-
-          checkcurrent();
-        });
-      }
-      tryfetch(0);
-    } else if (self.index.ended)
-      self.end();
-    else if (!self.index.type && (self.index.lastSeqNo() < self.readState.currentSeq-1)) {
-      // handle live stream restart
-      self.readState.currentSeq = self.index.startSeqNo(true);
-      checkcurrent();
-    }
-  }
-
-  function fetchfrom(seqNo, segment, start, cb) {
-    var segmentUrl = url.resolve(self.baseUrl, segment.uri)
-    var probe = !!self.noData;
-
-    debug('fetching segment', segmentUrl);
-    var stream = uristream(segmentUrl, { probe:probe, start:start, highWaterMark:100*1000*1000 });
-    stream.on('meta', function(meta) {
-      debug('got segment info', meta);
-      if (meta.mime !== 'video/mp2t'/* && 
-          meta.mime !== 'audio/aac' && meta.mime !== 'audio/x-aac' &&
-          meta.mime !== 'audio/ac3'*/) {
-        if (stream.abort) stream.abort();
-        return stream.emit(new Error('Unsupported segment MIME type: '+meta.mime));
-      }
-
-      // abort() indicates a temporal stream. Ie. ensure it is completed in a timely fashion
-      if (self.index.isLive() && typeof stream.abort == 'function') {
-        var duration = segment.duration || self.index.target_duration || 10;
-        duration = Math.min(duration, self.index.target_duration || 10);
-        self.readState.timer = setTimeout(function() {
-          if (self.readState.stream) {
-            debug('timed out waiting for data');
-            self.readState.stream.abort();
-          }
-          // TODO: ensure Done() is always called
-          self.readState.timer = null;
-        }, 1.5*duration*1000);
-      }
-
-      if (start === 0)
-        self.emit('segment', seqNo, segment.duration, meta);
-    });
-
-    stream.pipe(self);
-    oncemore(stream).once('end', 'error', function(err) {
-      clearTimeout(self.readState.timer);
-
-      stream.unpipe(self);
-      self.readState.stream = null;
-
-      if (err) debug('stream error', err);
-      else debug('finished with input stream', stream.meta.url);
-
-      cb(err);
-    });
-
-    self.readState.stream = stream;
-  }
-
-  // allow piping content to self
-  this.write = this.push.bind(this);
-  this.end = function() {};
-
-  Readable.call(this, options);
+  Readable.call(this, extend(options, {objectMode:true}));
 }
 util.inherits(HlsStreamReader, Readable);
 
-HlsStreamReader.prototype._read = function(n, cb) {
-  this.emit('drain');
+// Readable hook for "more data wanted": mark the reader as active and try to
+// start fetching the next segment. The size hint `n` is not used.
+HlsStreamReader.prototype._read = function(n) {
+  this.readState.active = true;
+  checknext(this);
 };
 
 function hlsreader(url, options) {

+ 2 - 2
lib/uristream.js

@@ -43,7 +43,7 @@ function setupHttp(uri, options, dst) {
 
   // TODO: handle case in header names
   var headers = extend(defaults, options.headers);
-  var timeout = options.timeout || 60*1000;
+  var timeout = options.timeout || 10*1000;
   var probe = !!options.probe;
   var offset = ~~options.start;
 
@@ -144,7 +144,7 @@ function setupHttp(uri, options, dst) {
           if (accum > size)
             req.abort();
         });
-      
+
         oncemore(res).once('end', 'error', function(err) {
           if (!err && accum !== size)
             failOrRetry(new Error('Stream length did not match header'), accum && accum < size);