Parcourir la source

further refactoring, fixing misc issues

Gil Pedersen il y a 11 ans
Parent
commit
2df62d59c1
3 fichiers modifiés avec 71 ajouts et 44 suppressions
  1. 59 30
      bin/hlsdump
  2. 11 13
      lib/hls-reader.js
  3. 1 1
      package.json

+ 59 - 30
bin/hlsdump

@@ -41,8 +41,6 @@ var oncemore = require('oncemore'),
     UdpBlast = require('udp-blast');
 var HlsReader = require('../lib/hls-reader');
 
-var stats = require('measured').createCollection();
-
 var src = hlsdump.args[0];
 if (!src) {
   hlsdump.help();
@@ -51,12 +49,44 @@ if (!src) {
 
 if (hlsdump.bufferSize) hlsdump.sync = true;
 
-var r = new HlsReader(new HlsSegmentReader(src, { highWaterMark: (hlsdump.concurrent || 1) - 1, fullStream: hlsdump.fullStream }), hlsdump);
+var segmentReader = new HlsSegmentReader(src, { highWaterMark: (hlsdump.concurrent || 1) - 1, fullStream: hlsdump.fullStream });
+var r = new HlsReader(segmentReader, hlsdump);
+
+segmentReader.once('index', function() {
+  // wait until first index is returned before attaching error listener.
+  // this will enable initials errors to throw
+  segmentReader.on('error', function(err) {
+    console.error('reader error', err.stack || err);
+  });
+});
+
+if (hlsdump.udp) {
+  var dst = (hlsdump.udp === true) ? null : hlsdump.udp;
+  r.pipe(new UdpBlast(dst, { packetSize: 7 * 188 }));
+}
+
+if (hlsdump.output) {
+  if (hlsdump.output === '-')
+    r.pipe(process.stdout);
+  else
+    r.pipe(fs.createWriteStream(hlsdump.output));
+}
+
+var startTime = process.hrtime();
+r.on('ready', function() {
+  var delay = process.hrtime(startTime);
+  console.error('"ready" after delay of ' + (delay[0] * 1e3 + delay[1] / 1e6).toFixed(2) + 'ms');
+});
+
+r.on('end', function() {
+  console.error('stream complete');
+})
 
-var totalDuration = 0, currentSegment = -1;
+var totalDuration = 0;
 r.on('segment', function(data) {
   var downloadSize = data.meta.size;
   var duration = data.segment.duration;
+  totalDuration += duration;
 
   // calculate size when missing
   if (downloadSize === -1) {
@@ -66,40 +96,35 @@ r.on('segment', function(data) {
     });
   }
 
-  var stopwatch = stats.timer('fetchTime').start();
-  oncemore(data.stream).once('close', 'end', 'error', function(err) {
-    stopwatch.end();
-
+  oncemore(data.stream).once('close', 'end', 'error', function(/*err*/) {
     console.error('segment done at ' + totalDuration.toFixed(0) + ' seconds, avg bitrate (kbps):', (downloadSize / (duration * 1024 / 8)).toFixed(1));
-    if (err) {
-      stats.meter('streamErrors').mark();
-    }
   });
 });
 
-if (hlsdump.udp) {
-  var dst = (hlsdump.udp === true) ? null : hlsdump.udp;
-  r.pipe(new UdpBlast(dst, { packetSize: 7 * 188 }));
-}
+if (hlsdump.infoPort) {
+  var stats = require('measured').createCollection();
+  var currentSegment = -1;
 
-if (hlsdump.output) {
-  if (hlsdump.output === '-')
-    r.pipe(process.stdout);
-  else
-    r.pipe(fs.createWriteStream(hlsdump.output));
-}
+  // setup stat tracking
+  stats.gauge('bufferBytes', function() { return r.buffer._readableState.length/* + buffer._writableState.length*/; });
+  stats.gauge('currentSegment', function() { return currentSegment; });
+  stats.gauge('index.first', function() { return segmentReader.index ? segmentReader.index.first_seq_no : -1; });
+  stats.gauge('index.last', function() { return segmentReader.index ? segmentReader.index.lastSeqNo() : -1; });
+  stats.gauge('totalDuration', function() { return totalDuration; });
 
-// setup stat tracking
-stats.gauge('bufferBytes', function() { return r.buffer._readableState.length/* + buffer._writableState.length*/; });
-stats.gauge('currentSegment', function() { return currentSegment; });
-stats.gauge('index.first', function() { return r.reader.index ? r.reader.index.first_seq_no : -1; });
-stats.gauge('index.last', function() { return r.reader.index ? r.reader.index.lastSeqNo() : -1; });
-stats.gauge('totalDuration', function() { return totalDuration; });
+  stats.meter('streamErrors');
 
-stats.meter('streamErrors');
+  r.on('segment', function(data) {
+    currentSegment = data.seq;
 
-if (hlsdump.infoPort) {
-  http.createServer(function (req, res) {
+    var stopwatch = stats.timer('fetchTime').start();
+    oncemore(data.stream).once('close', 'end', 'error', function(err) {
+      stopwatch.end();
+      if (err) stats.meter('streamErrors').mark();
+    });
+  });
+
+  var server = http.createServer(function (req, res) {
     if (req.method === 'GET') {
       var data = JSON.stringify(stats, null, ' ');
       res.writeHead(200, {
@@ -110,4 +135,8 @@ if (hlsdump.infoPort) {
     }
     res.end();
   }).listen(hlsdump.infoPort);
+
+  oncemore(r).once('end', 'error', function(/*err*/) {
+    server.close();
+  });
 }

+ 11 - 13
lib/hls-reader.js

@@ -36,6 +36,8 @@ function pump(src, dst, done) {
   };
 }
 
+// TODO: use pipe as interface to segment-reader?
+
 function HlsReader(segmentReader, options) {
   if (!(this instanceof HlsReader))
     return new HlsReader(segmentReader, options);
@@ -95,32 +97,26 @@ function HlsReader(segmentReader, options) {
     });
   });
 
-  this.reader.once('index', function() {
-    // wait until first index is returned before attaching error listener.
-    // this will enable initials errors to throw
-    self.reader.on('error', function(err) {
-      console.error('reader error', err.stack || err);
-    });
-  });
-
   this.reader.on('end', function() {
-    console.error('done');
     self.buffer.end();
   });
 
   // start output if needed
-  if (!this.sync || !(this.bufferSize > 0))
-    this.hook();
+  if (!this.sync) {
+    process.nextTick(function() {
+      self.hook();
+    });
+  }
 }
 Util.inherits(HlsReader, Readable);
 
+HlsReader.prototype._read = NOOP;
+
 // the hook is used to prebuffer
 HlsReader.prototype.hook = function hook() {
   var self = this;
-
   if (this.isHooked) return;
 
-  console.error('hooking output', this.sync);
   self.isHooked = true;
 
   var s = this.buffer;
@@ -141,6 +137,8 @@ HlsReader.prototype.hook = function hook() {
     }
     self.push(null);
   });
+
+  this.emit('ready');
 };
 
 HlsReader.prototype.decrypt = function (stream, keyAttrs, next) {

+ 1 - 1
package.json

@@ -36,7 +36,7 @@
     "mkdirp": "^0.5.0",
     "oncemore": "^1.0.0",
     "readable-stream": "~1.0.0",
-    "streamprocess": "0.0.1",
+    "streamprocess": "^1.0.0",
     "udp-blast": "^1.0.0",
     "uristream": "^1.1.0",
     "write-file-atomic": "^1.1.0"