tslimit.js 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. var util = require('util'),
  2. assert = require('assert');
  3. var async = require('async'),
  4. Transform = require('readable-stream/transform');
  5. // TODO: ratelimit the individual bytes
  6. module.exports = tslimit;
  7. exports.TsLimit = TsLimit;
  8. function parsePCR(packet) {
  9. var head = packet.readUInt32BE(0, true);
  10. // var b = packet.readUInt8(3, true);
  11. var pid = (head >> 8) & 0x1fff;
  12. if (((head >> 5) & 1) !== 1) return -1;
  13. var s = packet.readUInt8(4, true);
  14. if (s < 7) return -1;
  15. var f = packet.readUInt8(5, true);
  16. if (((f >> 4) & 1) !== 1) return -1;
  17. var base = packet.readUInt32BE(6, true) * 2;
  18. var ext = packet.readUInt32BE(10, true);
  19. base += (ext >> 31);
  20. ext = ext & 0x1ff;
  21. return base / 0.09 + ext / 27; // return usecs
  22. }
  23. function TsLimit() {
  24. var self = this;
  25. Transform.call(this);
  26. // the buffer is only used for partial TS packets ()< 188 bytes)
  27. this.buffer = new Buffer(0);
  28. this.pcr = -1;
  29. this.last = null;
  30. this.bitrate = 10E06;
  31. this.time = null;
  32. this.pcr2time = function(pcr) {
  33. if (self.pcr === -1) {
  34. self.pcr = pcr;
  35. self.last = utime();
  36. }
  37. var pcr_delta = pcr - self.pcr;
  38. if (pcr_delta < 0) pcr_delta += (0x200000000 * 300) / 27;
  39. return self.last + pcr_delta;
  40. }
  41. this.bytes2delta = function(bytes) {
  42. return bytes*8*1E6 / self.bitrate; /* useconds */
  43. }
  44. return this;
  45. }
  46. util.inherits(TsLimit, Transform);
  47. function utime() {
  48. var t = process.hrtime(); // based on CLOCK_MONOTONIC, and thus accommodates local drift (but apparently not suspend)
  49. return t[0] * 1E6 + t[1] / 1E3;
  50. }
  51. TsLimit.prototype._transform = function(chunk, output, cb) {
  52. var self = this;
  53. this.buffer = Buffer.concat([this.buffer, chunk]);
  54. function process() {
  55. var buf = self.buffer;
  56. var end = buf.length-188-1;
  57. var now = utime();
  58. if (!self.time) self.time = now;
  59. var packet_time = self.bytes2delta(188);
  60. for (var i=0; i<end; i+=188) {
  61. self.time += packet_time;
  62. var pcr = parsePCR(buf.slice(i, i+188));
  63. if (pcr !== -1)
  64. self.time = self.pcr2time(pcr);
  65. if (self.time > now)
  66. break;
  67. }
  68. // TODO: limit output speed
  69. if (i) output(buf.slice(0, i));
  70. self.buffer = buf.slice(i);
  71. if (i < end) {
  72. return setTimeout(process, (self.time - now) / 1000);
  73. }
  74. cb();
  75. }
  76. process();
  77. };
  78. TsLimit.prototype._flush = function(output, cb) {
  79. if (this.buffer.length) output(this.buffer);
  80. cb();
  81. };
  82. function tslimit() {
  83. return new TsLimit();
  84. }