tssmooth.js 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. var util = require('util'),
  2. assert = require('assert'),
  3. debug = require('debug')('hls:tssmooth');
  4. var async = require('async');
  5. try {
  6. var Transform = require('stream').Transform;
  7. assert(Transform);
  8. } catch (e) {
  9. var Transform = require('readable-stream/transform');
  10. }
  11. // In Transport Streams the intended rate is determined by the values of the PCR fields and the number of Transport Stream bytes between them. (ISO-13818-1 D.0.9)
  12. module.exports = tssmooth;
  13. exports.TsSmooth = TsSmooth;
  14. function parsePCR(buffer, index, pcr_pid) {
  15. var head = buffer.readUInt32BE(index, true);
  16. var pid = (head >> 8) & 0x1fff;
  17. if (((head >> 5) & 1) !== 1) return -1;
  18. if (pcr_pid && pcr_pid != pid) return -1;
  19. var s = buffer.readUInt8(index+4, true);
  20. if (s < 7) return -1;
  21. var f = buffer.readUInt8(index+5, true);
  22. if (((f >> 4) & 1) !== 1) return -1;
  23. var base = buffer.readUInt32BE(index+6, true) * 2;
  24. var ext = buffer.readUInt32BE(index+10, true);
  25. base += (ext >> 31);
  26. ext = ext & 0x1ff;
  27. return base / 0.09 + ext / 27; // return usecs
  28. }
  29. function TsSmooth(options) {
  30. var self = this;
  31. options = options || {};
  32. this.packetSize = options.packetSize || 7*188; // size of output packets
  33. this.buffer = new Buffer(0);
  34. this.pcr = -1;
  35. this.last = null;
  36. this.bitrate = 10E06;
  37. this.pcrtime = -1;
  38. this.pcrdelta = function(pcr, pcr_old) {
  39. var pcr_delta = pcr - pcr_old;
  40. if (pcr_delta < 0) pcr_delta += (0x200000000 * 300) / 27;
  41. return pcr_delta;
  42. }
  43. this.pcr2time = function(pcr) {
  44. if (self.pcr === -1) {
  45. self.pcr = pcr;
  46. self.last = utime();
  47. }
  48. var pcr_delta = self.pcrdelta(pcr, self.pcr);
  49. var ret = self.last + pcr_delta;
  50. if (pcr_delta > 3600E6) {
  51. // update pcr reference every hour to handle wrap-around
  52. self.pcr = pcr;
  53. self.last = ret;
  54. }
  55. return ret;
  56. }
  57. this.output_time = function(newPCR) {
  58. if (newPCR === -1) return -1;
  59. var pcrtime = self.pcr2time(newPCR);
  60. if (self.pcrtime === -1) {
  61. self.pcrtime = pcrtime;
  62. return -1;
  63. }
  64. var delta = pcrtime - self.pcrtime;
  65. if (delta > 100E3 || delta < 0) {
  66. console.error('PCR_error: '+(delta/1E6).toFixed(2)+'s missing');
  67. /* var now = utime();
  68. var error = now - pcrtime;
  69. if (Math.abs(error) > 2*1E6) {
  70. console.error('PCR sync reset');
  71. self.pcr = -1;
  72. pcrtime = self.pcr2time(newPCR);
  73. }*/
  74. }
  75. self.pcrtime = pcrtime;
  76. return pcrtime;
  77. }
  78. Transform.call(this);
  79. // Transform.call(this, {bufferSize:5*this.packetSize, highWaterMark:5*this.packetSize});
  80. // Transform.call(this, {bufferSize:188*7, highWaterMark:64*1024});
  81. }
  82. util.inherits(TsSmooth, Transform);
  83. function utime() {
  84. var t = process.hrtime(); // based on CLOCK_MONOTONIC, and thus accommodates local drift (but apparently not suspend)
  85. // console.error(t);
  86. return t[0] * 1E6 + t[1] / 1E3;
  87. }
  88. // smoothly outputs given buffer before endTime
  89. function outputBefore(buffer, endTime, packetSize, outputFn, cb) {
  90. var index = 0;
  91. function outputPacket() {
  92. var now = utime();
  93. var packetTime = (endTime - now) * (packetSize / (buffer.length - index));
  94. outputFn(buffer.slice(index, Math.min(buffer.length, index+packetSize)));
  95. index += packetSize;
  96. if (index < buffer.length) {
  97. //debug('packetTime', (packetTime/1000).toFixed(2));
  98. return setTimeout(outputPacket, Math.max(0.95*packetTime/1000, 0));
  99. }
  100. cb();
  101. }
  102. process.nextTick(outputPacket);
  103. }
  104. TsSmooth.prototype._transform = function(chunk, output, cb) {
  105. var self = this;
  106. var index = Math.floor(this.buffer.length/188)*188;
  107. this.buffer = Buffer.concat([this.buffer, chunk]);
  108. var buf = self.buffer;
  109. var end = buf.length-188;
  110. var startIndex = 0;
  111. function processNext() {
  112. while (index < end) {
  113. // check sync
  114. if (buf.readUInt8(index+188, true) !== 0x47) {
  115. // find next potential sync point
  116. debug('ts sync lost');
  117. var sync = index+1;
  118. for (; sync < end; sync++) {
  119. if (buf.readUInt8(sync, true) === 0x47)
  120. break;
  121. }
  122. // remove bad data
  123. console.error('slice', sync, end);
  124. buf = Buffer.concat([buf.slice(0, index), buf.slice(sync)]);
  125. end -= sync-index;
  126. continue;
  127. }
  128. var pcr = parsePCR(buf, index);
  129. var outtime = self.output_time(pcr);
  130. if (outtime !== -1 && index !== startIndex) {
  131. var slice = buf.slice(startIndex, index);
  132. startIndex = index;
  133. return outputBefore(slice, outtime, self.packetSize, output, processNext);
  134. }
  135. index += 188;
  136. }
  137. if (startIndex !== 0) self.buffer = buf.slice(startIndex);
  138. cb();
  139. }
  140. processNext();
  141. };
  142. TsSmooth.prototype._flush = function(output, cb) {
  143. if (this.buffer.length) output(this.buffer); // TODO: use outputBefore() based on current stream speed?
  144. cb();
  145. };
  146. function tssmooth(options) {
  147. return new TsSmooth(options);
  148. }