tslimit.js 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. var util = require('util'),
  2. assert = require('assert');
  3. var async = require('async'),
  4. Transform = require('readable-stream/transform');
  5. // TODO: ratelimit the individual bytes
  // The module itself is the factory function; the constructor is exposed as
  // a property of it. The property must be set through `module.exports`:
  // once `module.exports` has been reassigned, the `exports` shortcut still
  // points at the original (now discarded) object, so `exports.TsLimit = ...`
  // would never be visible to callers of require().
  module.exports = tslimit;
  module.exports.TsLimit = TsLimit;
  /**
   * Extract the Program Clock Reference (PCR) from one MPEG transport stream
   * packet.
   *
   * Layout read by this function (byte offsets from the start of the packet):
   *   0-3   packet header; bit 5 of byte 3 signals an adaptation field
   *   4     adaptation field length (must be >= 7 to hold a PCR)
   *   5     adaptation field flags; bit 4 is the PCR flag
   *   6-11  PCR: 33-bit base, 6 reserved bits, 9-bit extension
   *
   * @param {Buffer} packet - a TS packet starting at its first header byte.
   * @returns {number} the PCR converted to microseconds, or -1 when the
   *   packet is too short or carries no PCR.
   */
  function parsePCR(packet) {
    // 4 header bytes + length byte + flags byte + 6 PCR bytes.
    if (packet.length < 12) return -1;

    var head = packet.readUInt32BE(0);
    if (((head >> 5) & 1) !== 1) return -1; // no adaptation field

    var adaptationLength = packet.readUInt8(4);
    if (adaptationLength < 7) return -1; // too short to contain a PCR

    var flags = packet.readUInt8(5);
    if (((flags >> 4) & 1) !== 1) return -1; // PCR flag not set

    // Upper 32 bits of the 33-bit base; multiply instead of shifting so the
    // value is not truncated to a 32-bit integer.
    var base = packet.readUInt32BE(6) * 2;

    // Bytes 10-11: lowest base bit (bit 15), reserved bits, extension (bits 8-0).
    var tail = packet.readUInt16BE(10);
    base += tail >>> 15;
    var ext = tail & 0x1ff;

    // base counts 90 kHz ticks, ext counts 27 MHz ticks.
    return base / 0.09 + ext / 27; // return usecs
  }
  /**
   * Transform stream that holds back transport stream data so that it is
   * emitted according to the PCR timestamps found in the stream, falling back
   * to a fixed nominal bitrate between PCRs.
   *
   * Instance state:
   *   buffer   data received but not yet emitted
   *   pcr      reference PCR in microseconds, -1 while no reference is set
   *   last     local time (microseconds, see utime) matching the reference PCR
   *   bitrate  nominal bitrate in bits per second used between PCRs
   *   time     scheduled local time of the stream position, null until the
   *            first chunk has been seen
   *
   * @constructor
   */
  function TsLimit() {
    var self = this;
    Transform.call(this);

    // Holds whatever has not been emitted yet, e.g. a partial TS packet
    // (< 188 bytes). Buffer.alloc replaces the deprecated `new Buffer(0)`.
    this.buffer = Buffer.alloc(0);
    this.pcr = -1;
    this.last = null;
    this.bitrate = 10E06;
    this.time = null;

    /**
     * Map a PCR value (microseconds) to local time (microseconds).
     * The first PCR seen after `self.pcr` is -1 becomes the reference point.
     */
    this.pcr2time = function(pcr) {
      if (self.pcr === -1) {
        self.pcr = pcr;
        self.last = utime();
      }
      var pcr_delta = pcr - self.pcr;
      // A negative delta is treated as a wrap of the 33-bit PCR base:
      // 2^33 base ticks * 300 (27 MHz ticks per base tick) / 27 = microseconds.
      if (pcr_delta < 0) pcr_delta += (0x200000000 * 300) / 27;
      var ret = self.last + pcr_delta;
      if (pcr_delta > 3600E6) {
        // update pcr reference every hour to handle wrap-around
        self.pcr = pcr;
        self.last = ret;
      }
      return ret;
    };

    /** Duration of `bytes` bytes at the nominal bitrate, in microseconds. */
    this.bytes2delta = function(bytes) {
      return bytes*8*1E6 / self.bitrate; /* useconds */
    };

    return this;
  }
  util.inherits(TsLimit, Transform);
  /**
   * Current reading of the high-resolution timer, in microseconds.
   *
   * process.hrtime() returns [seconds, nanoseconds] relative to an arbitrary
   * point in the past, so the result is only meaningful when compared with
   * other values returned by this function.
   *
   * @returns {number} microseconds (fractional part carries the nanoseconds).
   */
  function utime() {
    var stamp = process.hrtime();
    var seconds = stamp[0];
    var nanoseconds = stamp[1];
    return seconds * 1E6 + nanoseconds / 1E3;
  }
  /**
   * Transform hook: append `chunk` to the pending buffer and emit whole
   * 188-byte packets whose scheduled time has been reached. When a packet is
   * not due yet, processing is resumed from a timer and `cb` is only called
   * once every complete packet of the pending buffer has been emitted.
   *
   * @param {Buffer} chunk - incoming transport stream data.
   * @param {Function|string} output - function used to emit data. If the
   *   stream implementation passes something else here (the current Node.js
   *   Transform API passes the chunk encoding), `this.push` is used instead.
   * @param {Function} cb - called when the chunk has been fully handled.
   */
  TsLimit.prototype._transform = function(chunk, output, cb) {
    var self = this;
    var PACKET_SIZE = 188;
    var emit = (typeof output === 'function') ? output : function(data) {
      self.push(data);
    };

    this.buffer = Buffer.concat([this.buffer, chunk]);

    // Named `pump` rather than `process` so the Node.js global is not shadowed.
    function pump() {
      var buf = self.buffer;
      var now = utime();
      var packetTime = self.bytes2delta(PACKET_SIZE);
      var offset = 0;
      var due = 0;
      var throttled = false;

      if (!self.time) self.time = now;

      // Walk every complete packet in the buffer.
      while (offset + PACKET_SIZE <= buf.length) {
        // Scheduled time of this packet at the nominal bitrate. It is only
        // committed to self.time once the packet is emitted, so a packet
        // that is retried after a timer is not counted twice.
        due = self.time + packetTime;

        var pcr = parsePCR(buf.slice(offset, offset + PACKET_SIZE));
        if (pcr !== -1) {
          var pcrtime = self.pcr2time(pcr);
          var delta = pcrtime - due;
          if (delta > 100E3 || delta < -500E3) {
            console.error('PCR_error: '+(delta/1E6).toFixed(2)+'s missing');
            if (now < due || now > pcrtime) {
              console.error('PCR sync reset');
              self.pcr = -1;
              pcrtime = self.pcr2time(pcr);
            }
          }
          // A PCR re-anchors the schedule. Committing it right away is safe
          // even if the packet is held back: re-reading the same PCR yields
          // the same time, and it keeps a sync reset from being re-triggered.
          due = pcrtime;
          self.time = pcrtime;
        }

        if (due > now) {
          throttled = true;
          break;
        }
        self.time = due;
        offset += PACKET_SIZE;
      }

      if (offset) emit(buf.slice(0, offset));
      self.buffer = buf.slice(offset);

      if (throttled) {
        // Delay is in milliseconds (microseconds / 1000).
        // NOTE(review): the cap of 50000 ms is kept from the original code;
        // confirm whether 50 ms was intended.
        return setTimeout(pump, Math.min(50000, (due - now) / 1000));
      }
      cb();
    }

    pump();
  };
  /**
   * Flush hook: emit whatever is still pending in the buffer (for example a
   * trailing partial packet) and signal completion.
   *
   * Accepts both call shapes: `(output, cb)`, where `output` is a function
   * used to emit data, and `(cb)` as used by the current Node.js Transform
   * API, in which case data is emitted through `this.push`.
   *
   * @param {Function} output - emit function, or the completion callback when
   *   it is the only argument.
   * @param {Function} [cb] - completion callback.
   */
  TsLimit.prototype._flush = function(output, cb) {
    var self = this;
    var emit = output;
    var done = cb;

    if (typeof done !== 'function') {
      // Single-argument form: the only argument is the completion callback.
      done = output;
      emit = function(data) {
        self.push(data);
      };
    }

    if (this.buffer.length) emit(this.buffer);
    done();
  };
  /**
   * Factory: create a fresh TsLimit stream.
   *
   * @returns {TsLimit} a new, unconfigured instance.
   */
  function tslimit() {
    var limiter = new TsLimit();
    return limiter;
  }