// tssmooth.js
  1. var util = require('util'),
  2. assert = require('assert');
  3. var async = require('async'),
  4. Transform = require('readable-stream/transform');
  5. // In Transport Streams the intended rate is determined by the values of the PCR fields and the number of Transport Stream bytes between them. (ISO-13818-1 D.0.9)
  6. module.exports = tssmooth;
  7. exports.TsSmooth = TsSmooth;
  8. function parsePCR(buffer, index) {
  9. var head = buffer.readUInt32BE(index, true);
  10. // var b = buffer.readUInt8(3, true);
  11. var pid = (head >> 8) & 0x1fff;
  12. if (((head >> 5) & 1) !== 1) return -1;
  13. var s = buffer.readUInt8(index+4, true);
  14. if (s < 7) return -1;
  15. var f = buffer.readUInt8(index+5, true);
  16. if (((f >> 4) & 1) !== 1) return -1;
  17. var base = buffer.readUInt32BE(index+6, true) * 2;
  18. var ext = buffer.readUInt32BE(index+10, true);
  19. base += (ext >> 31);
  20. ext = ext & 0x1ff;
  21. return base / 0.09 + ext / 27; // return usecs
  22. }
  23. function TsSmooth(options) {
  24. var self = this;
  25. // stream.Transform.call(this, {bufferSize:188*7, highWaterMark:64*1024});
  26. this.options = options || {};
  27. // the buffer is only used for partial TS packets ()< 188 bytes)
  28. this.buffer = new Buffer(0);
  29. this.pcr = -1;
  30. this.last = null;
  31. this.bitrate = 10E06;
  32. this.pcrtime = -1;
  33. this.pcrdelta = function(pcr, pcr_old) {
  34. var pcr_delta = pcr - pcr_old;
  35. if (pcr_delta < 0) pcr_delta += (0x200000000 * 300) / 27;
  36. return pcr_delta;
  37. }
  38. this.pcr2time = function(pcr) {
  39. if (self.pcr === -1) {
  40. self.pcr = pcr;
  41. self.last = utime();
  42. }
  43. var pcr_delta = self.pcrdelta(pcr, self.pcr);
  44. var ret = self.last + pcr_delta;
  45. if (pcr_delta > 3600E6) {
  46. // update pcr reference every hour to handle wrap-around
  47. self.pcr = pcr;
  48. self.last = ret;
  49. }
  50. return ret;
  51. }
  52. this.output_time = function(newPCR) {
  53. if (newPCR === -1) return -1;
  54. var pcrtime = self.pcr2time(newPCR);
  55. if (self.pcrtime === -1) {
  56. self.pcrtime = pcrtime;
  57. return -1;
  58. }
  59. var delta = pcrtime - self.pcrtime;
  60. if (delta > 100E3 || delta < -500E3) {
  61. console.error('PCR_error: '+(delta/1E6).toFixed(2)+'s missing');
  62. var now = utime();
  63. var error = now - pcrtime;
  64. if (Math.abs(error) > 2*1E6) {
  65. console.error('PCR sync reset');
  66. self.pcr = -1;
  67. pcrtime = self.pcr2time(newPCR);
  68. }
  69. }
  70. self.pcrtime = pcrtime;
  71. return pcrtime;
  72. }
  73. stream.Transform.call(this);
  74. return this;
  75. }
  76. util.inherits(TsSmooth, stream.Transform);
  77. function utime() {
  78. var t = process.hrtime(); // based on CLOCK_MONOTONIC, and thus accommodates local drift (but apparently not suspend)
  79. return t[0] * 1E6 + t[1] / 1E3;
  80. }
  81. // smoothly outputs given buffer before endTime
  82. function outputBefore(buffer, endTime, outputFn, cb) {
  83. var packetSize = 14*188;
  84. var index = 0;
  85. function outputPacket() {
  86. var now = utime();
  87. var packetTime = (endTime - now) * (packetSize / (buffer.length - index));
  88. outputFn(buffer.slice(index, Math.min(buffer.length, index+packetSize)));
  89. index += packetSize;
  90. if (index < buffer.length)
  91. return setTimeout(outputPacket, Math.max(0.95*packetTime/1000, 0));
  92. cb();
  93. }
  94. process.nextTick(outputPacket);
  95. }
  96. TsSmooth.prototype._transform = function(chunk, output, cb) {
  97. var self = this;
  98. var index = Math.floor(this.buffer.length/188)*188;
  99. this.buffer = Buffer.concat([this.buffer, chunk]);
  100. var buf = self.buffer;
  101. var end = buf.length-187;
  102. var startIndex = 0;
  103. function processNext() {
  104. while (index < end) {
  105. var pcr = parsePCR(buf, index);
  106. var outtime = self.output_time(pcr);
  107. if (outtime !== -1 && index !== startIndex) {
  108. var slice = buf.slice(startIndex, index);
  109. startIndex = index;
  110. return outputBefore(slice, outtime, output, processNext);
  111. }
  112. index += 188;
  113. }
  114. if (startIndex !== 0) self.buffer = buf.slice(startIndex);
  115. cb();
  116. }
  117. processNext();
  118. };
  119. TsSmooth.prototype._flush = function(output, cb) {
  120. if (this.buffer.length) output(this.buffer);
  121. cb();
  122. };
  123. function tssmooth(options) {
  124. return new TsSmooth(options);
  125. }