tslimit.js 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. var util = require('util'),
  2. assert = require('assert');
  3. var async = require('async'),
  4. Transform = require('readable-stream/transform');
  5. // TODO: ratelimit the individual bytes
  // Public API: the module itself is the `tslimit()` factory, and the
  // constructor is exposed as a property of that factory.
  // The property must be set on `module.exports`: once `module.exports` has
  // been reassigned, the `exports` shorthand still points at the old, now
  // discarded object, so `exports.TsLimit = ...` would never reach callers.
  module.exports = tslimit;
  module.exports.TsLimit = TsLimit;
  /**
   * Extract the Program Clock Reference (PCR) from one MPEG-TS packet.
   *
   * Layout relied upon (byte offsets into `packet`):
   *   0-3   TS header; bit 5 of byte 3 signals that an adaptation field exists
   *   4     adaptation field length (must be >= 7 to hold flags + PCR)
   *   5     adaptation field flags; bit 4 is the PCR flag
   *   6-11  PCR: 33-bit base, 6 reserved bits, 9-bit extension
   *
   * @param {Buffer} packet - a single transport stream packet
   * @returns {number} the PCR in microseconds, or -1 when the packet is too
   *   short or carries no PCR
   */
  function parsePCR(packet) {
    // 4 header bytes + length byte + flags byte + 6 PCR bytes must be present,
    // which also keeps every read below inside the buffer.
    if (packet.length < 12) return -1;

    var head = packet.readUInt32BE(0);
    if (((head >> 5) & 1) !== 1) return -1; // no adaptation field

    var adaptationLength = packet.readUInt8(4);
    if (adaptationLength < 7) return -1; // too small to contain a PCR

    var flags = packet.readUInt8(5);
    if (((flags >> 4) & 1) !== 1) return -1; // PCR flag not set

    // Bytes 10-11 hold: base LSB (bit 15), reserved (bits 14-9), ext (bits 8-0).
    var tail = packet.readUInt16BE(10);
    // Bytes 6-9 are the upper 32 bits of the 33-bit base; multiply instead of
    // shifting so the value does not get truncated to a signed 32-bit integer.
    var base = packet.readUInt32BE(6) * 2 + (tail >>> 15);
    var ext = tail & 0x1ff;

    // base ticks at 90 kHz, ext at 27 MHz; convert both to microseconds.
    return base / 0.09 + ext / 27;
  }
  /**
   * Transform stream constructor for the TS limiter.
   *
   * Instance state:
   *   buffer   - bytes received but not yet emitted (originally described as
   *              holding only partial TS packets, i.e. < 188 bytes)
   *   pcr      - first PCR seen, in microseconds; -1 until one is seen
   *   last     - utime() reading taken when `pcr` was recorded; null before
   *   pcr2time - maps a PCR (microseconds) onto the utime() time scale
   */
  function TsLimit() {
    Transform.call(this);

    var stream = this;
    stream.buffer = new Buffer(0);
    stream.pcr = -1;
    stream.last = null;

    // Defined per instance as a closure so it can be called detached.
    stream.pcr2time = function(pcr) {
      var needsAnchor = stream.pcr === -1;
      if (needsAnchor) {
        // The first PCR becomes the reference point for all later ones.
        stream.pcr = pcr;
        stream.last = utime();
      }

      var elapsed = pcr - stream.pcr;
      if (elapsed < 0) {
        // PCR behind the reference: add one full 33-bit base period
        // (2^33 * 300 ticks at 27 MHz) expressed in microseconds.
        var wrapPeriod = (0x200000000 * 300) / 27;
        elapsed += wrapPeriod;
      }
      return stream.last + elapsed;
    };

    return this;
  }
  util.inherits(TsLimit, Transform);
  /**
   * Current reading of the high-resolution timer, in microseconds.
   *
   * Uses process.hrtime(), which returns [seconds, nanoseconds]. The original
   * author notes it is based on CLOCK_MONOTONIC and therefore accommodates
   * local drift (but apparently not suspend).
   *
   * @returns {number} microseconds, possibly fractional
   */
  function utime() {
    var reading = process.hrtime();
    var wholeSeconds = reading[0];
    var nanoRemainder = reading[1];
    return wholeSeconds * 1E6 + nanoRemainder / 1E3;
  }
  /**
   * Emit buffered TS packets no earlier than their PCR-derived due time.
   *
   * The incoming chunk is appended to `this.buffer`. Complete 188-byte
   * packets are then scanned in order; scanning stops at the first packet
   * whose PCR maps (via pcr2time) to a time later than now. Everything before
   * that point is emitted, and the remainder is retried after a short delay.
   * Once no complete packet is being held back, `cb` is invoked; at most a
   * partial packet (< 188 bytes) then remains buffered.
   *
   * NOTE(review): this uses the (chunk, outputFn, callback) signature; confirm
   * it matches the readable-stream version the project depends on.
   *
   * @param {Buffer} chunk - newly received bytes
   * @param {function(Buffer)} output - called with data to pass downstream
   * @param {function()} cb - called when this chunk has been fully handled
   */
  TsLimit.prototype._transform = function(chunk, output, cb) {
    var self = this;
    var PACKET_SIZE = 188;
    this.buffer = Buffer.concat([this.buffer, chunk]);

    // Named `drain` rather than `process` to avoid shadowing the Node global.
    function drain() {
      var buf = self.buffer;
      var now = utime();
      var heldBack = false;
      var i = 0;

      // Visit every COMPLETE packet, including one that ends exactly at the
      // end of the buffer (the previous bound left up to 189 bytes behind).
      while (i + PACKET_SIZE <= buf.length) {
        var pcr = parsePCR(buf.slice(i, i + PACKET_SIZE));
        if (pcr !== -1 && self.pcr2time(pcr) > now) {
          // This packet is not due yet; keep it and everything after it.
          heldBack = true;
          break;
        }
        i += PACKET_SIZE;
      }

      // TODO: limit output speed
      if (i) output(buf.slice(0, i));
      self.buffer = buf.slice(i);

      if (heldBack) {
        // TODO: calculate timeout?
        return setTimeout(drain, 5);
      }
      cb();
    }

    drain();
  };
  /**
   * End-of-stream hook: pass along whatever is still buffered, then signal
   * completion.
   *
   * @param {function(Buffer)} output - called with the remaining bytes, if any
   * @param {function()} cb - called once flushing is done
   */
  TsLimit.prototype._flush = function(output, cb) {
    var hasLeftover = this.buffer.length > 0;
    if (hasLeftover) {
      output(this.buffer);
    }
    cb();
  };
  /**
   * Factory: create a new TsLimit stream without requiring `new` at the
   * call site.
   *
   * @returns {TsLimit} a fresh stream instance
   */
  function tslimit() {
    var stream = new TsLimit();
    return stream;
  }