| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- 'use strict';
- const util = require('util');
- const debug = require('debug')('hls:tssmooth');
- const Transform = require('readable-stream/transform');
- const internals = {};
- // In Transport Streams the intended rate is determined by the values of the PCR fields and the number of Transport Stream bytes between them. (ISO-13818-1 D.0.9)
- function RateError(msg) {
- Error.call(this);
- this.message = msg;
- }
- util.inherits(RateError, Error);
- RateError.prototype.name = 'Rate Error';
- internals.parsePCR = function(buffer, index, pcr_pid) {
- let head = buffer.readUInt32BE(index, true);
- let pid = (head >> 8) & 0x1fff;
- if (((head >> 5) & 1) !== 1) return -1;
- if (pcr_pid && pcr_pid != pid) return -1;
- let s = buffer.readUInt8(index + 4, true);
- if (s < 7) return -1;
- let f = buffer.readUInt8(index + 5, true);
- if (((f >> 4) & 1) !== 1) return -1;
- let base = buffer.readUInt32BE(index + 6, true) * 2;
- let ext = buffer.readUInt32BE(index + 10, true);
- base += (ext >> 31);
- ext = ext & 0x1ff;
- return base / 0.09 + ext / 27; // return usecs
- };
- internals.utime = function() {
- let t = process.hrtime(); // based on CLOCK_MONOTONIC, and thus accommodates local drift (but apparently not suspend)
- // console.error(t);
- return t[0] * 1E6 + t[1] / 1E3;
- };
- internals.wait = function(waitMs, fn) {
- if (waitMs > 0)
- setTimeout(fn, Math.round(waitMs));
- else
- fn();
- }
- function TsSmooth(options) {
- options = options || {};
- this.packetSize = options.packetSize || 7 * 188; // size of output packets
- this.buffer = new Buffer(0);
- this.pcr = -1;
- this.last = null;
- this.errorLimit = 80000; /* 80 ms */
- this.bitrate = 10E06;
- this.pcrTime = -1;
- this.pcrDelta = (pcr, lastPcr) => {
- let pcrDelta = pcr - lastPcr;
- if (pcrDelta < 0) pcrDelta += (0x200000000 * 300) / 27;
- return pcrDelta;
- };
- this.pcr2time = (pcr) => {
- if (this.pcr === -1) {
- this.pcr = pcr;
- this.last = internals.utime();
- }
- let pcrDelta = this.pcrDelta(pcr, this.pcr);
- let ret = this.last + pcrDelta;
- if (pcrDelta > 3600E6) {
- // update pcr reference every hour to handle wrap-around
- this.pcr = pcr;
- this.last = ret;
- }
- return ret;
- };
- this.outputTime = (newPCR) => {
- // when this is called normally, now ~= this.pcrtime
- if (newPCR === -1) return undefined;
- let pcrtime = this.pcr2time(newPCR);
- if (this.pcrTime === -1) {
- this.pcrTime = pcrtime;
- return undefined;
- }
- let delta = pcrtime - this.pcrTime;
- this.pcrTime = pcrtime;
- return { time:pcrtime, delta: delta };
- };
- Transform.call(this, {highWaterMark:this.packetSize});
- }
- util.inherits(TsSmooth, Transform);
- TsSmooth.prototype.reset = function(currentPCR) {
- this.pcr = -1;
- if (typeof currentPCR !== 'undefined')
- this.pcrTime = this.pcr2time(currentPCR);
- };
- // smoothly outputs given buffer before endTime
- function outputBefore(stream, buffer, endTime, packetSize, cb) {
- let index = 0;
- const outputPacket = () => {
- let now = internals.utime();
- let packetTime = (endTime - now) * (packetSize / (buffer.length - index));
- stream.push(buffer.slice(index, Math.min(buffer.length, index + packetSize)));
- index += packetSize;
- let done = (index < buffer.length) ? outputPacket : cb;
- let delay = Math.round(Math.min(Math.max((0.8 * packetTime / 1000) - 1, 1), 50));
- if (delay === 1)
- process.nextTick(done);
- else
- setTimeout(done, delay);
- }
- outputPacket();
- }
- TsSmooth.prototype._transform = function(chunk, encoding, cb) {
- let index = Math.floor(this.buffer.length / 188) * 188;
- this.buffer = Buffer.concat([this.buffer, chunk]);
- let buf = this.buffer;
- let end = buf.length - 188;
- let startIndex = 0;
- const processNext = () => {
- while (index < end) {
- // check sync
- if (buf.readUInt8(index + 188, true) !== 0x47) {
- // find next potential sync point
- debug('ts sync lost');
- let sync = index + 1;
- for (; sync < end; sync++) {
- if (buf.readUInt8(sync, true) === 0x47)
- break;
- }
- // remove bad data
- debug('slice', sync, end);
- buf = Buffer.concat([buf.slice(0, index), buf.slice(sync)]);
- end -= sync - index;
- continue;
- }
- let pcr = internals.parsePCR(buf, index);
- let out = this.outputTime(pcr);
- if (out !== undefined && index !== startIndex) {
- if (out.delta > 100E3 || out.delta < 0)
- this.emit('warning', new Error('PCR_error: ' + (out.delta / 1E6).toFixed(2) + 's missing'));
- let now = internals.utime();
- let error = (out.time - now) - out.delta;
- let waittime = (error > this.errorLimit) ? (error / 1000 - 5) : 0;
- if (error < -2 * 1E6 || error > 300 * 1E6) {
- // negative == buffer too late
- // positive == buffer too early
- this.emit('warning', new RateError('PCR sync offset ' + (error / 1E6).toFixed(2) + 's error'));
- this.reset(pcr);
- waittime = 0;
- } else if (error < -this.errorLimit) {
- // ignore the data since it is too late
- return setImmediate(processNext);
- }
- let slice = buf.slice(startIndex, index);
- startIndex = index;
- /* eslint-disable no-loop-func */
- return internals.wait(waittime, () => {
- return outputBefore(this, slice, out.time, this.packetSize, processNext);
- });
- /* eslint-enable */
- }
- index += 188;
- }
- if (startIndex !== 0) this.buffer = buf.slice(startIndex);
- cb();
- }
- processNext();
- };
- TsSmooth.prototype._flush = function(cb) {
- if (this.buffer.length) this.push(this.buffer); // TODO: use outputBefore() based on current stream speed?
- cb();
- };
- const tssmooth = module.exports = function tssmooth(options) {
- return new TsSmooth(options);
- };
- tssmooth.TsSmooth = TsSmooth;
|