Przeglądaj źródła

rewrote rate limiter to time based smoothing buffer

Gil Pedersen 13 lat temu
rodzic
commit
3b1b460f7d
2 zmienionych plików z 88 dodań i 57 usunięć
  1. 2 2
      bin/hlsdump
  2. 86 55
      lib/tslimit.js

+ 2 - 2
bin/hlsdump

@@ -28,7 +28,7 @@ var util = require('util'),
     fs = require('fs');
 
 var reader = require('../lib/reader'),
-    tslimit = require('../lib/tslimit'),
+    tssmooth = require('../lib/tslimit'),
     tsblast = require('../lib/tsblast');
 
 var src = process.argv[2];
@@ -53,7 +53,7 @@ r.on('end', function() {
 
 var stream = r;
 if (hlsdump.sync)
-  stream = stream.pipe(tslimit());
+  stream = stream.pipe(tssmooth());
 
 if (hlsdump.udp)
   stream.pipe(tsblast(hlsdump.udp));

+ 86 - 55
lib/tslimit.js

@@ -4,25 +4,25 @@ var util = require('util'),
 var async = require('async'),
     Transform = require('readable-stream/transform');
 
-// TODO: ratelimit the individual bytes
+// In Transport Streams the intended rate is determined by the values of the PCR fields and the number of Transport Stream bytes between them. (ISO-13818-1 D.0.9)
 
-module.exports = tslimit;
-exports.TsLimit = TsLimit;
+module.exports = tssmooth;
+exports.TsSmooth = TsSmooth;
 
-function parsePCR(packet) {
-  var head = packet.readUInt32BE(0, true);
-//  var b = packet.readUInt8(3, true);
+function parsePCR(buffer, index) {
+  var head = buffer.readUInt32BE(index, true);
+//  var b = buffer.readUInt8(3, true);
   var pid = (head >> 8) & 0x1fff;
   if (((head >> 5) & 1) !== 1) return -1;
 
-  var s = packet.readUInt8(4, true);
+  var s = buffer.readUInt8(index+4, true);
   if (s < 7) return -1;
 
-  var f = packet.readUInt8(5, true);
+  var f = buffer.readUInt8(index+5, true);
   if (((f >> 4) & 1) !== 1) return -1;
 
-  var base = packet.readUInt32BE(6, true) * 2;
-  var ext = packet.readUInt32BE(10, true);
+  var base = buffer.readUInt32BE(index+6, true) * 2;
+  var ext = buffer.readUInt32BE(index+10, true);
 
   base += (ext >> 31);
   ext = ext & 0x1ff;
@@ -30,9 +30,10 @@ function parsePCR(packet) {
   return base / 0.09 + ext / 27; // return usecs
 }
 
-function TsLimit() {
+function TsSmooth(options) {
   var self = this;
-  Transform.call(this);
+//  stream.Transform.call(this, {bufferSize:188*7, highWaterMark:64*1024});
+  this.options = options || {};
 
   // the buffer is only used for partial TS packets ()< 188 bytes)
   this.buffer = new Buffer(0);
@@ -41,7 +42,13 @@ function TsLimit() {
   this.last = null;
 
   this.bitrate = 10E06;
-  this.time = null;
+  this.pcrtime = -1;
+
+  this.pcrdelta = function(pcr, pcr_old) {
+    var pcr_delta = pcr - pcr_old;
+    if (pcr_delta < 0) pcr_delta += (0x200000000 * 300) / 27;
+    return pcr_delta;
+  }
 
   this.pcr2time = function(pcr) {
     if (self.pcr === -1) {
@@ -49,9 +56,7 @@ function TsLimit() {
       self.last = utime();
     }
 
-    var pcr_delta = pcr - self.pcr;
-    if (pcr_delta < 0) pcr_delta += (0x200000000 * 300) / 27;
-
+    var pcr_delta = self.pcrdelta(pcr, self.pcr);
     var ret = self.last + pcr_delta;
     if (pcr_delta > 3600E6) {
       // update pcr reference every hour to handle wrap-around
@@ -61,68 +66,94 @@ function TsLimit() {
     return ret;
   }
 
-  this.bytes2delta = function(bytes) {
-    return bytes*8*1E6 / self.bitrate; /* useconds */
+  this.output_time = function(newPCR) {
+    if (newPCR === -1) return -1;
+
+    var pcrtime = self.pcr2time(newPCR);
+    if (self.pcrtime === -1) {
+      self.pcrtime = pcrtime;
+      return -1;
+    }
+
+    var delta = pcrtime - self.pcrtime;
+    if (delta > 100E3 || delta < -500E3) {
+      console.error('PCR_error: '+(delta/1E6).toFixed(2)+'s missing');
+      var now = utime();
+      var error = now - pcrtime;
+      if (Math.abs(error) > 2*1E6) {
+        console.error('PCR sync reset');
+        self.pcr = -1;
+        pcrtime = self.pcr2time(newPCR);
+      }
+    }
+    self.pcrtime = pcrtime;
+    return pcrtime;
   }
 
+  stream.Transform.call(this);
+
   return this;
 }
-util.inherits(TsLimit, Transform);
+util.inherits(TsSmooth, stream.Transform);
 
 function utime() {
   var t = process.hrtime(); // based on CLOCK_MONOTONIC, and thus accommodates local drift (but apparently not suspend)
   return t[0] * 1E6 + t[1] / 1E3;
 }
 
-TsLimit.prototype._transform = function(chunk, output, cb) {
+// smoothly outputs given buffer before endTime
+function outputBefore(buffer, endTime, outputFn, cb) {
+  var packetSize = 14*188;
+  var index = 0;
+
+  function outputPacket() {
+    var now = utime();
+    var packetTime = (endTime - now) * (packetSize / (buffer.length - index));
+
+    outputFn(buffer.slice(index, Math.min(buffer.length, index+packetSize)));
+    index += packetSize;
+
+    if (index < buffer.length)
+      return setTimeout(outputPacket, Math.max(0.95*packetTime/1000, 0));
+    cb();
+  }
+  process.nextTick(outputPacket);
+}
+
+TsSmooth.prototype._transform = function(chunk, output, cb) {
   var self = this;
+
+  var index = Math.floor(this.buffer.length/188)*188;
   this.buffer = Buffer.concat([this.buffer, chunk]);
 
-  function process() {
-    var buf = self.buffer;
-    var end = buf.length-188-1;
-    var now = utime();
-    if (!self.time) self.time = now;
-
-    var packet_time = self.bytes2delta(188);
-    for (var i=0; i<end; i+=188) {
-      self.time += packet_time;
-      var pcr = parsePCR(buf.slice(i, i+188));
-      if (pcr !== -1) {
-        var pcrtime = self.pcr2time(pcr);
-        var delta = pcrtime - self.time;
-        if (delta > 100E3 || delta < -500E3) {
-          console.error('PCR_error: '+(delta/1E6).toFixed(2)+'s missing');
-          var elapsed = now - self.time;
-          if (now < self.time || now > pcrtime) {
-            console.error('PCR sync reset');
-            self.pcr = -1;
-            pcrtime = self.pcr2time(pcr);
-          }
-        }
-        self.time = pcrtime;
+  var buf = self.buffer;
+  var end = buf.length-187;
+
+  var startIndex = 0;
+  function processNext() {
+    while (index < end) {
+      var pcr = parsePCR(buf, index);
+      var outtime = self.output_time(pcr);
+      if (outtime !== -1 && index !== startIndex) {
+        var slice = buf.slice(startIndex, index);
+        startIndex = index;
+        return outputBefore(slice, outtime, output, processNext);
       }
-      if (self.time > now)
-        break;
+      index += 188;
     }
 
-    if (i) output(buf.slice(0, i));
-    self.buffer = buf.slice(i);
-
-    if (i < end) {
-      return setTimeout(process, Math.min(50000, (self.time - now) / 1000));
-    }
+    if (startIndex !== 0) self.buffer = buf.slice(startIndex);
     cb();
   }
 
-  process();
+  processNext();
 };
 
-TsLimit.prototype._flush = function(output, cb) {
+TsSmooth.prototype._flush = function(output, cb) {
   if (this.buffer.length) output(this.buffer);
   cb();
 };
 
-function tslimit() {
-  return new TsLimit();
+function tssmooth(options) {
+  return new TsSmooth(options);
 }