// tssmooth.js
  "use strict";

  // Modules used throughout this file.
  var util = require('util');
  var assert = require('assert');
  var debug = require('debug')('hls:tssmooth');

  // Use the built-in Transform stream when the runtime provides one;
  // otherwise fall back to the `readable-stream` implementation.
  var Transform;
  try {
    Transform = require('stream').Transform;
    assert(Transform);
  } catch (e) {
    Transform = require('readable-stream/transform');
  }
  // In a Transport Stream, the intended rate is determined by the values of the
  // PCR fields and the number of Transport Stream bytes between them
  // (ISO/IEC 13818-1, section D.0.9).
  // The factory function replaces the default export object, so the class must
  // be attached to the *new* `module.exports`. The bare `exports` alias still
  // points at the discarded original object and would never reach callers.
  module.exports = tssmooth;
  module.exports.TsSmooth = TsSmooth;
  /**
   * Error type for stream rate / PCR synchronisation problems.
   *
   * @param {string} msg - human readable description, stored as `message`.
   */
  function RateError(msg) {
    Error.call(this);
    this.message = msg;
    // Error.call(this) builds a separate Error object and does not populate
    // `stack` on `this`, so capture the trace explicitly where the engine
    // supports it (V8 / Node.js).
    if (typeof Error.captureStackTrace === 'function') {
      Error.captureStackTrace(this, RateError);
    }
  }
  util.inherits(RateError, Error);
  RateError.prototype.name = 'Rate Error';
  /**
   * Extracts the Program Clock Reference (PCR) from the transport stream
   * packet that starts at `index`.
   *
   * @param {Buffer} buffer - data holding at least one whole packet at `index`.
   * @param {number} index - offset of the packet's first (sync) byte.
   * @param {number} [pcr_pid] - when truthy, only packets with this PID are
   *   accepted.
   * @returns {number} the PCR in microseconds, or -1 when the packet has no
   *   adaptation field, is on another PID, or carries no PCR.
   */
  function parsePCR(buffer, index, pcr_pid) {
    var head = buffer.readUInt32BE(index);
    var pid = (head >> 8) & 0x1fff;

    // Adaptation field must be present.
    if (((head >> 5) & 1) !== 1) return -1;
    // Loose comparison kept on purpose so numeric strings still match.
    if (pcr_pid && pcr_pid != pid) return -1;

    // The adaptation field must be long enough to hold flags + 6 PCR bytes.
    var fieldLength = buffer.readUInt8(index + 4);
    if (fieldLength < 7) return -1;

    // PCR flag inside the adaptation field flags byte.
    var flags = buffer.readUInt8(index + 5);
    if (((flags >> 4) & 1) !== 1) return -1;

    // The PCR occupies bytes 6..11 of the packet: a 33 bit base, 6 reserved
    // bits and a 9 bit extension. Bytes 6..9 hold the upper 32 bits of the base.
    var base = buffer.readUInt32BE(index + 6) * 2;
    // Bytes 10..11 hold the lowest base bit (top bit) and the extension (low
    // 9 bits). Reading exactly these 16 bits keeps the two bytes that follow
    // the PCR out of the extension, and the unsigned shift keeps the base bit
    // from turning into -1.
    var tail = buffer.readUInt16BE(index + 10);
    base += tail >>> 15;
    var ext = tail & 0x1ff;

    // base counts 90 kHz ticks, ext counts 27 MHz ticks; return microseconds.
    return base / 0.09 + ext / 27;
  }
  /**
   * Transform stream that buffers incoming transport stream data and keeps the
   * PCR based timing state used when the data is written out.
   *
   * @param {Object} [options]
   * @param {number} [options.packetSize=7*188] - size of the output packets,
   *   also used as the stream's highWaterMark.
   */
  function TsSmooth(options) {
    var self = this;
    options = options || {};

    this.packetSize = options.packetSize || 7*188; // size of output packets
    // Buffer.alloc() replaces the deprecated `new Buffer(size)` constructor.
    this.buffer = Buffer.alloc(0);

    this.pcr = -1;              // reference PCR (usecs), -1 until the first PCR is seen
    this.last = null;           // utime() value that corresponds to the reference PCR
    this.error_limit = 80000;   /* 80 ms */
    this.bitrate = 10E06;       // not referenced by the visible code
    this.pcrtime = -1;          // time of the most recent PCR, -1 until known

    /**
     * Difference between two PCR values in usecs. A negative difference is
     * treated as a wrap of the 33 bit PCR base and corrected by the full
     * range (2^33 ticks of 90 kHz expressed in usecs).
     */
    this.pcrdelta = function(pcr, pcr_old) {
      var pcr_delta = pcr - pcr_old;
      if (pcr_delta < 0) pcr_delta += (0x200000000 * 300) / 27;
      return pcr_delta;
    };

    /**
     * Maps a PCR value onto the utime() clock, using the first PCR seen
     * (or the first one after reset()) as the reference point.
     */
    this.pcr2time = function(pcr) {
      if (self.pcr === -1) {
        self.pcr = pcr;
        self.last = utime();
      }

      var pcr_delta = self.pcrdelta(pcr, self.pcr);
      var ret = self.last + pcr_delta;
      if (pcr_delta > 3600E6) {
        // update pcr reference every hour to handle wrap-around
        self.pcr = pcr;
        self.last = ret;
      }
      return ret;
    };

    /**
     * Records a newly parsed PCR.
     *
     * @returns {{time: number, delta: number}|undefined} the PCR's time and
     *   the distance to the previous PCR; undefined when `newPCR` is -1 or
     *   when this is the first PCR seen.
     */
    this.output_time = function(newPCR) {
      // when this is called normally, now ~= self.pcrtime
      if (newPCR === -1) return;

      var pcrtime = self.pcr2time(newPCR);
      if (self.pcrtime === -1) {
        self.pcrtime = pcrtime;
        return;
      }

      var delta = pcrtime - self.pcrtime;
      self.pcrtime = pcrtime;
      return { time:pcrtime, delta: delta };
    };

    Transform.call(this, {highWaterMark:this.packetSize});
  }
  util.inherits(TsSmooth, Transform);
  /**
   * Drops the PCR reference so that the next pcr2time() call establishes a
   * new one.
   *
   * @param {number} [currentPCR] - when given, the PCR time is immediately
   *   recomputed from this value against the fresh reference.
   */
  TsSmooth.prototype.reset = function(currentPCR) {
    this.pcr = -1;
    if (currentPCR === undefined) {
      return;
    }
    this.pcrtime = this.pcr2time(currentPCR);
  };
  /**
   * Current value of the high resolution timer, in microseconds.
   *
   * process.hrtime() is based on CLOCK_MONOTONIC, and thus accommodates local
   * drift (but apparently not suspend).
   *
   * @returns {number} microseconds, possibly fractional.
   */
  function utime() {
    var parts = process.hrtime();
    var seconds = parts[0];
    var nanoseconds = parts[1];
    return seconds * 1E6 + nanoseconds / 1E3;
  }
  /**
   * Runs `fn` after `waitMs` milliseconds, or synchronously when `waitMs` is
   * not a positive number.
   *
   * @param {number} waitMs - delay in milliseconds.
   * @param {Function} fn - callback to run.
   */
  function wait(waitMs, fn) {
    if (!(waitMs > 0)) {
      fn();
      return;
    }
    setTimeout(fn, waitMs);
  }
  /**
   * Smoothly outputs the given buffer before endTime.
   *
   * The buffer is pushed to `stream` in pieces of `packetSize` bytes. After
   * each piece the time left until `endTime` is spread over the bytes still
   * to be written to pick the pause before the next piece.
   *
   * @param {Object} stream - target; only its push() method is used.
   * @param {Buffer} buffer - data to write out.
   * @param {number} endTime - deadline on the utime() clock (usecs).
   * @param {number} packetSize - number of bytes per push.
   * @param {Function} cb - called once after the last piece was pushed.
   */
  function outputBefore(stream, buffer, endTime, packetSize, cb) {
    var offset = 0;

    function emitChunk() {
      var timeLeft = endTime - utime();
      var bytesLeft = buffer.length - offset;
      var chunkTime = timeLeft * (packetSize / bytesLeft);

      var chunkEnd = Math.min(buffer.length, offset + packetSize);
      stream.push(buffer.slice(offset, chunkEnd));
      offset += packetSize;

      var next = (offset < buffer.length) ? emitChunk : cb;

      // Use 80% of the available time (in ms), clamped to the 1..50 ms range.
      var delay = Math.min(Math.max((0.8 * chunkTime / 1000) - 1, 1), 50);
      if (delay === 1) {
        // No time to spare: continue as soon as possible.
        process.nextTick(next);
      } else {
        setTimeout(next, delay);
      }
    }

    emitChunk();
  }
  /**
   * Transform implementation: appends `chunk` to the pending data, walks it in
   * 188 byte packets and, whenever a packet carries a PCR, writes out the data
   * collected before it, timed against that PCR.
   *
   * @param {Buffer} chunk - incoming transport stream data.
   * @param {string} encoding - unused.
   * @param {Function} cb - called once every complete packet of the pending
   *   data has been examined.
   */
  TsSmooth.prototype._transform = function(chunk, encoding, cb) {
    var self = this;

    // Packets in front of this offset were examined by an earlier call.
    var index = Math.floor(this.buffer.length/188)*188;
    this.buffer = Buffer.concat([this.buffer, chunk]);

    var buf = self.buffer;
    // A packet is only examined when the sync byte of its successor is available.
    var end = buf.length-188;
    var startIndex = 0; // start of the data that has not been written out yet

    function processNext() {
      while (index < end) {
        // check sync
        if (buf[index+188] !== 0x47) {
          // find next potential sync point
          debug('ts sync lost');
          var sync = index+1;
          for (; sync < end; sync++) {
            if (buf[sync] === 0x47)
              break;
          }
          // remove bad data
          debug('slice', sync, end);
          buf = Buffer.concat([buf.slice(0, index), buf.slice(sync)]);
          end -= sync-index;
          continue;
        }

        var pcr = parsePCR(buf, index);
        var out = self.output_time(pcr);
        if (out !== undefined && index !== startIndex) {
          if (out.delta > 100E3 || out.delta < 0)
            self.emit('warning', new Error('PCR_error: '+(out.delta/1E6).toFixed(2)+'s missing'));

          var now = utime();
          var error = (out.time - now) - out.delta;
          var waittime = (error > self.error_limit) ? (error/1000 - 5) : 0;

          if (error < -2*1E6 || error > 300*1E6) {
            // negative == buffer too late
            // positive == buffer too early
            self.emit('warning', new RateError('PCR sync offset '+(error/1E6).toFixed(2)+'s error'));
            self.reset(pcr);
            waittime = 0;
          } else if (error < -self.error_limit) {
            // ignore the data since it is too late
            // NOTE(review): neither index nor startIndex changes here, so the
            // same packet is evaluated again on the next turn instead of the
            // pending data being discarded - confirm that this is intended.
            return setImmediate(processNext);
          }

          return wait(waittime, function output() {
            var slice = buf.slice(startIndex, index);
            startIndex = index;
            return outputBefore(self, slice, out.time, self.packetSize, processNext);
          });
        }
        index += 188;
      }

      // Always store the working buffer back: besides dropping what has been
      // written out, this keeps bytes removed during a resync from coming back
      // through the old buffer when nothing was written out in this call.
      self.buffer = (startIndex !== 0) ? buf.slice(startIndex) : buf;
      cb();
    }

    processNext();
  };
  /**
   * Flush implementation: pushes whatever is still buffered in one piece and
   * then signals completion.
   *
   * @param {Function} cb - completion callback.
   */
  TsSmooth.prototype._flush = function(cb) {
    // TODO: use outputBefore() based on current stream speed?
    var pending = this.buffer;
    if (pending.length) {
      this.push(pending);
    }
    cb();
  };
  /**
   * Factory for TsSmooth streams.
   *
   * @param {Object} [options] - passed to the TsSmooth constructor unchanged.
   * @returns {TsSmooth} a new stream instance.
   */
  function tssmooth(options) {
    var stream = new TsSmooth(options);
    return stream;
  }