tssmooth.js 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. "use strict";
  2. var util = require('util'),
  3. assert = require('assert'),
  4. debug = require('debug')('hls:tssmooth');
  5. var Transform = require('readable-stream/transform');
  6. // In Transport Streams the intended rate is determined by the values of the PCR fields and the number of Transport Stream bytes between them. (ISO-13818-1 D.0.9)
  /**
   * Error type used for the 'PCR sync offset' warning emitted by the stream.
   *
   * @param {string} msg human readable description, stored as `message`
   */
  function RateError(msg) {
    Error.call(this);
    this.message = msg;
    // Error.call(this) builds a separate Error object and leaves `this`
    // untouched, so the stack has to be captured explicitly.
    if (typeof Error.captureStackTrace === 'function') {
      Error.captureStackTrace(this, RateError);
    }
  }
  util.inherits(RateError, Error);
  RateError.prototype.name = 'Rate Error';
  /**
   * Extracts the Program Clock Reference from the transport stream packet
   * that starts at `index`.
   *
   * @param {Buffer} buffer data holding at least the first 12 bytes of the packet
   * @param {number} index offset of the packet's first (sync) byte
   * @param {number} [pcr_pid] when truthy, only packets with this PID are accepted
   * @returns {number} the PCR in microseconds, or -1 when the packet has no
   *   adaptation field, a different PID, or no PCR
   */
  function parsePCR(buffer, index, pcr_pid) {
    var head = buffer.readUInt32BE(index);
    var pid = (head >> 8) & 0x1fff;

    // bit 5 of the header word is the "adaptation field present" flag
    if (((head >> 5) & 1) !== 1) return -1;
    // Number() keeps the old loose comparison working for numeric strings
    if (pcr_pid && Number(pcr_pid) !== pid) return -1;

    // a PCR needs the flags byte plus 6 PCR bytes
    var adaptationLength = buffer.readUInt8(index + 4);
    if (adaptationLength < 7) return -1;

    var flags = buffer.readUInt8(index + 5);
    if (((flags >> 4) & 1) !== 1) return -1; // PCR_flag not set

    // The PCR occupies bytes 6..11: 33 bit base, 6 reserved bits, 9 bit extension.
    var base = buffer.readUInt32BE(index + 6) * 2; // upper 32 bits of the base
    var tail = buffer.readUInt16BE(index + 10);    // base LSB, reserved, extension
    base += tail >>> 15; // unsigned shift: the base LSB must be added as 0 or 1
    var ext = tail & 0x1ff;

    // base counts 90 kHz ticks, ext counts 27 MHz ticks
    return base / 0.09 + ext / 27; // return usecs
  }
  /**
   * Current value of the high resolution timer, in microseconds.
   *
   * @returns {number} process.hrtime() converted to usecs
   */
  function utime() {
    // process.hrtime() is based on CLOCK_MONOTONIC, and thus accommodates
    // local drift (but apparently not suspend)
    var hr = process.hrtime();
    var seconds = hr[0];
    var nanoseconds = hr[1];
    return seconds * 1E6 + nanoseconds / 1E3;
  }
  /**
   * Runs `fn` after `waitMs` milliseconds, or immediately (synchronously)
   * when `waitMs` is not a positive number.
   *
   * @param {number} waitMs delay in milliseconds; rounded for the timer
   * @param {Function} fn callback to run
   */
  function wait(waitMs, fn) {
    if (!(waitMs > 0)) {
      fn();
      return;
    }
    setTimeout(fn, Math.round(waitMs));
  }
  /**
   * Transform stream that buffers transport stream data and re-emits it
   * paced by the PCR values found in the packets.
   *
   * @param {Object} [options]
   * @param {number} [options.packetSize=1316] size in bytes of the chunks
   *   pushed downstream (also used as the stream's highWaterMark)
   */
  function TsSmooth(options) {
    var self = this;
    options = options || {};

    this.packetSize = options.packetSize || 7 * 188; // size of output packets
    this.buffer = Buffer.alloc(0); // replaces the deprecated `new Buffer(0)`

    this.pcr = -1;      // reference PCR (usecs); -1 until a PCR has been seen
    this.last = null;   // local time (usecs) that corresponds to `this.pcr`

    this.errorLimit = 80000; /* 80 ms */
    this.bitrate = 10E06; // not referenced elsewhere in this file
    this.pcrTime = -1;  // local time of the most recent PCR; -1 until set

    /**
     * Difference between two PCR values in usecs, compensating for a single
     * wrap-around of the 33 bit, 300-step PCR counter (27 MHz ticks).
     */
    this.pcrDelta = function(pcr, lastPcr) {
      var pcrDelta = pcr - lastPcr;
      if (pcrDelta < 0) pcrDelta += (0x200000000 * 300) / 27;
      return pcrDelta;
    };

    /**
     * Maps a PCR value (usecs) to local time (usecs). The first PCR seen
     * after construction/reset is anchored to the current time.
     */
    this.pcr2time = function(pcr) {
      if (self.pcr === -1) {
        self.pcr = pcr;
        self.last = utime();
      }

      var pcrDelta = self.pcrDelta(pcr, self.pcr);
      var ret = self.last + pcrDelta;
      if (pcrDelta > 3600E6) {
        // update pcr reference every hour to handle wrap-around
        self.pcr = pcr;
        self.last = ret;
      }
      return ret;
    };

    /**
     * Returns `{ time, delta }` for a new PCR: the local time it maps to and
     * the distance to the previous PCR's local time. Returns undefined for
     * `-1` (no PCR) and for the very first PCR, which only primes the state.
     */
    this.outputTime = function(newPCR) {
      // when this is called normally, now ~= self.pcrtime
      if (newPCR === -1) return undefined;

      var pcrtime = self.pcr2time(newPCR);
      if (self.pcrTime === -1) {
        self.pcrTime = pcrtime;
        return undefined;
      }

      var delta = pcrtime - self.pcrTime;
      self.pcrTime = pcrtime;
      return { time: pcrtime, delta: delta };
    };

    Transform.call(this, { highWaterMark: this.packetSize });
  }
  util.inherits(TsSmooth, Transform);
  /**
   * Drops the PCR reference so the next PCR re-anchors the clock to the
   * current time. When `currentPCR` is given it is used right away to
   * re-anchor and becomes the new `pcrTime`.
   *
   * @param {number} [currentPCR] PCR value in usecs
   */
  TsSmooth.prototype.reset = function(currentPCR) {
    this.pcr = -1;
    if (currentPCR === undefined) {
      return;
    }
    this.pcrTime = this.pcr2time(currentPCR);
  };
  /**
   * Smoothly outputs the given buffer before `endTime`.
   *
   * The buffer is pushed to `stream` in chunks of `packetSize` bytes. After
   * each chunk the remaining time is spread over the remaining bytes to pick
   * the delay before the next one (clamped to 1..50 ms; a 1 ms delay is
   * replaced by process.nextTick). `cb` runs after the last chunk's delay.
   *
   * @param {Object} stream target; only `push()` is used
   * @param {Buffer} buffer data to output
   * @param {number} endTime deadline in usecs, on the utime() clock
   * @param {number} packetSize chunk size in bytes
   * @param {Function} cb called once everything has been pushed
   */
  function outputBefore(stream, buffer, endTime, packetSize, cb) {
    var offset = 0;

    function pushChunk() {
      var timeLeft = endTime - utime();
      var bytesLeft = buffer.length - offset;
      // usecs available for this chunk if the rest is spread evenly
      var chunkTime = timeLeft * (packetSize / bytesLeft);

      var stop = Math.min(buffer.length, offset + packetSize);
      stream.push(buffer.slice(offset, stop));
      offset += packetSize;

      var next = cb;
      if (offset < buffer.length) {
        next = pushChunk;
      }

      // use 80% of the slot (in ms) minus 1 ms, clamped to [1, 50]
      var target = (0.8 * chunkTime / 1000) - 1;
      var delay = Math.round(Math.min(Math.max(target, 1), 50));
      if (delay !== 1) {
        setTimeout(next, delay);
      } else {
        process.nextTick(next);
      }
    }

    pushChunk();
  }
  /**
   * Transform implementation. Appends `chunk` to the internal buffer, scans
   * it in 188 byte packets, and whenever a packet with a PCR is found outputs
   * the data buffered before that packet, paced to end at the PCR's time.
   *
   * @param {Buffer} chunk incoming data
   * @param {string} encoding unused
   * @param {Function} cb called once all complete packets have been handled
   */
  TsSmooth.prototype._transform = function(chunk, encoding, cb) {
    var self = this;

    // Resume at the first packet that was not examined by the previous call.
    // A packet at offset i is only examined while i < length - 188, so the
    // last complete packet is still pending when the length is an exact
    // multiple of 188; floor(length / 188) would skip it.
    var buffered = this.buffer.length;
    var index = buffered > 0 ? (Math.ceil(buffered / 188) - 1) * 188 : 0;
    this.buffer = Buffer.concat([this.buffer, chunk]);

    var buf = self.buffer;
    // a packet is examined only when the next packet's sync byte is available
    var end = buf.length - 188;
    var startIndex = 0; // start of the data that has not been output yet

    function processNext() {
      while (index < end) {
        // check sync
        if (buf.readUInt8(index + 188) !== 0x47) {
          // find next potential sync point
          debug('ts sync lost');
          var sync = index + 1;
          for (; sync < end; sync++) {
            if (buf.readUInt8(sync) === 0x47)
              break;
          }
          // remove bad data
          debug('slice', sync, end);
          buf = Buffer.concat([buf.slice(0, index), buf.slice(sync)]);
          end -= sync - index;
          continue;
        }

        var pcr = parsePCR(buf, index);
        var out = self.outputTime(pcr);
        if (out !== undefined && index !== startIndex) {
          if (out.delta > 100E3 || out.delta < 0)
            self.emit('warning', new Error('PCR_error: ' + (out.delta / 1E6).toFixed(2) + 's missing'));

          var now = utime();
          var error = (out.time - now) - out.delta;
          var waittime = (error > self.errorLimit) ? (error / 1000 - 5) : 0;

          if (error < -2 * 1E6 || error > 300 * 1E6) {
            // negative == buffer too late
            // positive == buffer too early
            self.emit('warning', new RateError('PCR sync offset ' + (error / 1E6).toFixed(2) + 's error'));
            self.reset(pcr);
            waittime = 0;
          } else if (error < -self.errorLimit) {
            // ignore the data since it is too late
            // NOTE(review): neither index nor startIndex is advanced here, so
            // the same packet is evaluated again on the next tick - confirm
            // whether the pending data was meant to be dropped instead.
            return setImmediate(processNext);
          }

          var slice = buf.slice(startIndex, index);
          startIndex = index;
          /* eslint-disable no-loop-func */
          return wait(waittime, function output() {
            return outputBefore(self, slice, out.time, self.packetSize, processNext);
          });
          /* eslint-enable */
        }
        index += 188;
      }

      // Always store the locally processed buffer: besides dropping the data
      // that was output, this keeps the removal of out-of-sync bytes, which
      // used to be lost whenever nothing had been output (startIndex === 0).
      self.buffer = buf.slice(startIndex);
      cb();
    }

    processNext();
  };
  /**
   * Flush implementation: pushes whatever is still buffered in one piece,
   * without any pacing, then signals completion.
   *
   * @param {Function} cb completion callback
   */
  TsSmooth.prototype._flush = function(cb) {
    var remaining = this.buffer;
    // TODO: use outputBefore() based on current stream speed?
    if (remaining.length > 0) {
      this.push(remaining);
    }
    cb();
  };
  /**
   * Module entry point: factory that creates a TsSmooth stream.
   * The constructor itself is exposed as `tssmooth.TsSmooth`.
   *
   * @param {Object} [options] passed through to the TsSmooth constructor
   * @returns {TsSmooth}
   */
  function tssmooth(options) {
    return new TsSmooth(options);
  }
  tssmooth.TsSmooth = TsSmooth;

  module.exports = tssmooth;