Gil Pedersen 9 anni fa
parent
commit
47c6b0763d
4 ha cambiato i file con 273 aggiunte e 206 eliminazioni
  1. 58 45
      lib/hls-reader.js
  2. 108 81
      lib/recorder.js
  3. 28 18
      lib/segment-decrypt.js
  4. 79 62
      lib/tssmooth.js

+ 58 - 45
lib/hls-reader.js

@@ -1,35 +1,37 @@
-"use strict";
+'use strict';
 
-var Util = require('util');
+const Util = require('util');
+const Readable = require('readable-stream/readable');
+const Passthrough = require('readable-stream/passthrough');
 
-var StreamEach = require('stream-each'),
-    oncemore = require('oncemore');
+const StreamEach = require('stream-each');
+const Oncemore = require('oncemore');
 
-var Readable = require('readable-stream/readable'),
-    Passthrough = require('readable-stream/passthrough');
+const tssmooth = require('./tssmooth');
+const SegmentDecrypt = require('./segment-decrypt');
 
-var tssmooth = require('./tssmooth');
-var SegmentDecrypt = require('./segment-decrypt');
 
-
-var internals = {
+const internals = {
   NOOP: function(){},
 };
 
 
 // 'pipe' stream to a Readable
-function pump(src, dst, done) {
-  src.on('data', function(chunk) {
+internals.pump = function(src, dst, done) {
+
+  src.on('data', (chunk) => {
+
     if (!dst.push(chunk)) {
       src.pause();
     }
   });
-  oncemore(src).once('end', 'error', function(err) {
+  Oncemore(src).once('end', 'error', (err) => {
+
     // TODO: flush source buffer on error?
     dst._read = internals.NOOP;
     done(err);
   });
-  dst._read = function() {
+  dst._read = () => {
     src.resume();
   };
 }
@@ -37,15 +39,15 @@ function pump(src, dst, done) {
 // TODO: use pipe as interface to segment-reader?
 
 function HlsReader(segmentReader, options) {
-  if (!(this instanceof HlsReader))
+
+  if (!(this instanceof HlsReader)) {
     return new HlsReader(segmentReader, options);
+  }
 
   options = options || {};
 
   Readable.call(this, { lowWaterMark: options.lowWaterMark, highWaterMark: options.highWaterMark });
 
-  var self = this;
-
   this.reader = segmentReader;
 
   this.sync = !!options.sync; // output in real-time
@@ -62,49 +64,55 @@ function HlsReader(segmentReader, options) {
   this.isHooked = false;
   this.buffer = new Passthrough({ highWaterMark: this.bufferSize });
 
-  StreamEach(this.reader, function (segmentInfo, done) {
-    self.isReading = true;
+  StreamEach(this.reader, (segmentInfo, done) => {
+
+    this.isReading = true;
+
+    return this.decrypt(segmentInfo.stream, segmentInfo.details.keys, (err, stream) => {
 
-    return self.decrypt(segmentInfo.stream, segmentInfo.details.keys, function (err, stream) {
       if (err) {
         console.error('decrypt failed', err.stack);
         stream = segmentInfo.stream;
       }
 
-      self.emit('segment', segmentInfo);
+      this.emit('segment', segmentInfo);
 
-      stream = oncemore(stream);
+      stream = Oncemore(stream);
 
-      if (!self.isHooked) {
+      if (!this.isHooked) {
         // pull data and detect if we need to hook before end
-        var buffered = 0;
-        stream.on('data', function(chunk) {
+        let buffered = 0;
+        stream.on('data', (chunk) => {
+
           buffered += chunk.length;
-          if (!self.isHooked && buffered >= self.bufferSize)
-            self.hook();
+          if (!this.isHooked && buffered >= this.bufferSize)
+            this.hook();
         });
       }
 
-      stream.pipe(self.buffer, { end: false });
-      stream.once('end', 'error', function(err) {
-        self.isReading = false;
+      stream.pipe(this.buffer, { end: false });
+      stream.once('end', 'error', (err) => {
+
+        this.isReading = false;
         if (err) {
           console.error('stream error', err.stack || err);
         }
-        self.hook();
+        this.hook();
         done();
       });
     });
-  }, function (err) {
+  }, (err) => {
+
     if (err) throw err;
 
-    self.buffer.end();
+    this.buffer.end();
   });
 
   // start output if needed
   if (!this.sync) {
-    process.nextTick(function() {
-      self.hook();
+    process.nextTick(() => {
+
+      this.hook();
     });
   }
 }
@@ -118,39 +126,44 @@ HlsReader.prototype.destroy = function () {
 
 // the hook is used to prebuffer
 HlsReader.prototype.hook = function hook() {
-  var self = this;
+
   if (this.isHooked) return;
 
-  self.isHooked = true;
+  this.isHooked = true;
 
-  var s = this.buffer;
+  let s = this.buffer;
   if (this.sync) {
-    var smooth = tssmooth();
-    smooth.on('unpipe', function() {
+    let smooth = tssmooth();
+    smooth.on('unpipe', () => {
+
       this.unpipe();
     });
-    smooth.on('warning', function(err) {
+    smooth.on('warning', (err) => {
+
       console.error('smoothing error', err);
     });
     s = s.pipe(smooth);
   }
 
-  pump(s, this, function(err) {
+  internals.pump(s, this, (err) => {
+
     if (err) {
-      return self.emit('error', err);
+      return this.emit('error', err);
     }
-    self.push(null);
+    this.push(null);
   });
 
   this.emit('ready');
 };
 
 HlsReader.prototype.decrypt = function (stream, keyAttrs, next) {
+
   return SegmentDecrypt.decrypt(stream, keyAttrs, { base: this.reader.baseUrl, key: this.key, cookie: this.cookie }, next);
 };
 
 
-var hlsreader = module.exports = function hlsreader(segmentReader, options) {
+const hlsreader = module.exports = function hlsreader(segmentReader, options) {
+
   return new HlsReader(segmentReader, options);
 };
 

+ 108 - 81
lib/recorder.js

@@ -1,28 +1,27 @@
-/*jslint node: true */
+'use strict';
 
-"use strict";
+const Fs = require('fs');
+const Path = require('path');
+const Url = require('url');
 
-var fs = require('fs'),
-    path = require('path'),
-    url = require('url'),
-    util = require('util');
+const Mime = require('mime-types');
+const StreamEach = require('stream-each');
+const Oncemore = require('oncemore');
+const M3U8Parse = require('m3u8parse');
+const Mkdirp = require('mkdirp');
+const writeFileAtomic = require('write-file-atomic');
+const debug = require('debug')('hls:recorder');
 
-var mime = require('mime-types'),
-    StreamEach = require('stream-each'),
-    oncemore = require('oncemore'),
-    M3U8Parse = require('m3u8parse'),
-    mkdirp = require('mkdirp'),
-    writeFileAtomic = require('write-file-atomic'),
-    debug = require('debug')('hls:recorder');
+const SegmentDecrypt = require('./segment-decrypt');
 
-var SegmentDecrypt = require('./segment-decrypt');
 
 // add custom extensions
-mime.extensions['audio/aac'] = ['aac'];
-mime.extensions['audio/ac3'] = ['ac3'];
+Mime.extensions['audio/aac'] = ['aac'];
+Mime.extensions['audio/ac3'] = ['ac3'];
 
 
 function HlsStreamRecorder(reader, dst, options) {
+
   options = options || {};
 
   this.reader = reader;
@@ -41,9 +40,11 @@ function HlsStreamRecorder(reader, dst, options) {
 }
 
 HlsStreamRecorder.prototype.start = function() {
+
   // TODO: make async?
-  if (!fs.existsSync(this.dst))
-    mkdirp.sync(this.dst);
+  if (!Fs.existsSync(this.dst)) {
+    Mkdirp.sync(this.dst);
+  }
 
   StreamEach(this.reader, this.process.bind(this));
 
@@ -52,9 +53,10 @@ HlsStreamRecorder.prototype.start = function() {
 };
 
 HlsStreamRecorder.prototype.updateIndex = function(update) {
-  var self = this;
 
-  if (!update) return;
+  if (!update) {
+    return;
+  }
 
   if (!this.index) {
     this.index = new M3U8Parse.M3U8Playlist(update);
@@ -70,18 +72,20 @@ HlsStreamRecorder.prototype.updateIndex = function(update) {
       this.index.ended = false;
       this.index.discontinuity_sequence = 0; // not allowed in event playlists
       if (!isNaN(this.startOffset)) {
-        var offset = this.startOffset;
+        let offset = this.startOffset;
         if (!update.ended) {
           if (offset < 0) offset = Math.min(offset, -3 * this.index.target_duration);
         }
         this.index.start.decimalInteger('time-offset', offset);
       }
-    } else {
+    }
+    else {
       debug('variants', this.index.variants);
       if (this.subreader) {
         // remove backup sources
         let used = {};
         this.index.variants = this.index.variants.filter((variant) => {
+
           let bw = parseInt(variant.info.bandwidth, 10);
           let res = !(bw in used);
           used[bw] = true;
@@ -89,15 +93,16 @@ HlsStreamRecorder.prototype.updateIndex = function(update) {
         });
 
         this.index.variants.forEach((variant, index) => {
-          var variantUrl = url.resolve(self.reader.baseUrl, variant.uri);
+
+          let variantUrl = Url.resolve(this.reader.baseUrl, variant.uri);
           debug('url', variantUrl);
 
           // check for duplicate source urls
-          var rec = this.recorderForUrl(variantUrl);
+          let rec = this.recorderForUrl(variantUrl);
           if (!rec || !rec.localUrl) {
-            var dir = self.variantName(variant.info, index);
-            rec = new HlsStreamRecorder(self.subreader(variantUrl), path.join(self.dst, dir), { startOffset: self.startOffset, collect: self.collect, decrypt: this.decrypt });
-            rec.localUrl = url.format({pathname: path.join(dir, 'index.m3u8')});
+            let dir = this.variantName(variant.info, index);
+            rec = new HlsStreamRecorder(this.subreader(variantUrl), Path.join(this.dst, dir), { startOffset: this.startOffset, collect: this.collect, decrypt: this.decrypt });
+            rec.localUrl = Url.format({pathname: Path.join(dir, 'index.m3u8')});
             rec.remoteUrl = variantUrl;
 
             this.recorders.push(rec);
@@ -106,21 +111,22 @@ HlsStreamRecorder.prototype.updateIndex = function(update) {
           variant.uri = rec.localUrl;
         });
 
-        var allGroups = [];
-        for (var group in this.index.groups)
-          [].push.apply(allGroups, this.index.groups[group]);
+        let allGroups = [];
+        for (let group in this.index.groups)
+          Array.prototype.push.apply(allGroups, this.index.groups[group]);
+
+        allGroups.forEach((groupItem, index) => {
 
-        allGroups.forEach(function(groupItem, index) {
-          var srcUri = groupItem.quotedString('uri');
+          let srcUri = groupItem.quotedString('uri');
           if (srcUri) {
-            var itemUrl = url.resolve(self.reader.baseUrl, srcUri);
+            let itemUrl = Url.resolve(this.reader.baseUrl, srcUri);
             debug('url', itemUrl);
 
-            var rec = this.recorderForUrl(itemUrl);
+            let rec = this.recorderForUrl(itemUrl);
             if (!rec || !rec.localUrl) {
-              var dir = self.groupSrcName(groupItem, index);
-              rec = new HlsStreamRecorder(self.subreader(itemUrl), path.join(self.dst, dir), { startOffset: self.startOffset, collect: self.collect, decrypt: this.decrypt });
-              rec.localUrl = url.format({pathname: path.join(dir, 'index.m3u8')});
+              let dir = this.groupSrcName(groupItem, index);
+              rec = new HlsStreamRecorder(this.subreader(itemUrl), Path.join(this.dst, dir), { startOffset: this.startOffset, collect: this.collect, decrypt: this.decrypt });
+              rec.localUrl = Url.format({pathname: Path.join(dir, 'index.m3u8')});
               rec.remoteUrl = itemUrl;
 
               this.recorders.push(rec);
@@ -128,15 +134,16 @@ HlsStreamRecorder.prototype.updateIndex = function(update) {
 
             groupItem.quotedString('uri', rec.localUrl);
           }
-        }, this);
+        });
 
         // start all recordings
-        this.recorders.forEach(function(recording) {
+        this.recorders.forEach((recording) => {
           recording.start();
         });
 
         this.index.iframes = [];
-      } else {
+      }
+      else {
         this.index.variants = [];
         this.index.groups = {};
         this.index.iframes = [];
@@ -144,9 +151,11 @@ HlsStreamRecorder.prototype.updateIndex = function(update) {
     }
 
     // hook end listener
-    this.reader.on('end', function() {
-      self.index.ended = true;
-      self.flushIndex(function(/*err*/) {
+    this.reader.on('end', () => {
+
+      this.index.ended = true;
+      this.flushIndex((/*err*/) => {
+
         debug('done');
       });
     });
@@ -157,119 +166,137 @@ HlsStreamRecorder.prototype.updateIndex = function(update) {
   }
 
   // validate update
-  if (this.index.target_duration > update.target_duration)
+  if (this.index.target_duration > update.target_duration) {
     throw new Error('Invalid index');
+  }
 };
 
 HlsStreamRecorder.prototype.process = function(segmentInfo, done) {
-  var self = this;
 
-  var segment = new M3U8Parse.M3U8Segment(segmentInfo.details, true);
-  var meta = segmentInfo.file;
+  let segment = new M3U8Parse.M3U8Segment(segmentInfo.details, true);
+  let meta = segmentInfo.file;
 
   // mark discontinuities
   if (this.nextSegmentSeq !== -1 &&
-      this.nextSegmentSeq !== segmentInfo.seq)
+      this.nextSegmentSeq !== segmentInfo.seq) {
     segment.discontinuity = true;
+  }
   this.nextSegmentSeq = segmentInfo.seq + 1;
 
   // create our own uri
-  segment.uri = util.format('%s.%s', this.segmentName(this.seq), mime.extension(meta.mime));
+  segment.uri = `${this.segmentName(this.seq)}.${Mime.extension(meta.mime)}`;
 
   // handle byterange
-  var first = self.index.segments.length === 0;
-  var newFile = first || self.index.segments[self.index.segments.length - 1].uri !== segment.uri;
+  let first = this.index.segments.length === 0;
+  let newFile = first || this.index.segments[this.index.segments.length - 1].uri !== segment.uri;
   if (this.collect) {
     segment.byterange = {
       length: 0,
       offset: newFile ? 0 : null
     }
-  } else {
+  }
+  else {
     delete segment.byterange;
   }
 
   // save the stream segment
-  SegmentDecrypt.decrypt(segmentInfo.stream, segmentInfo.details.keys, this.decrypt, function (err, stream, decrypted) {
+  SegmentDecrypt.decrypt(segmentInfo.stream, segmentInfo.details.keys, this.decrypt, (err, stream, decrypted) => {
+ 
     if (err) {
       console.error('decrypt failed', err.stack);
       stream = segmentInfo.stream;
-    } else if (decrypted) {
+    }
+    else if (decrypted) {
       segment.keys = null;
     }
 
-    stream = oncemore(stream);
-    stream.pipe(fs.createWriteStream(path.join(self.dst, segment.uri), { flags: newFile ? 'w' : 'a' }));
+    stream = Oncemore(stream);
+    stream.pipe(Fs.createWriteStream(Path.join(this.dst, segment.uri), { flags: newFile ? 'w' : 'a' }));
+
+    let bytesWritten = 0;
+    if (this.collect) {
+      stream.on('data', (chunk) => {
 
-    var bytesWritten = 0;
-    if (self.collect) {
-      stream.on('data', function(chunk) {
         bytesWritten += chunk.length;
       });
     }
 
-    stream.once('end', 'error', function(err) {
+    stream.once('end', 'error', (err) => {
+
       // only to report errors
       if (err) debug('stream error', err.stack || err);
 
-      if (segment.byterange)
+      if (segment.byterange) {
         segment.byterange.length = bytesWritten;
+      }
 
       // update index
-      self.index.segments.push(segment);
-      self.flushIndex(done);
+      this.index.segments.push(segment);
+      this.flushIndex(done);
     });
 
-    self.seq++;
+    this.seq++;
   });
 };
 
 HlsStreamRecorder.prototype.variantName = function(info, index) {
-  return util.format('v%d', index);
+
+  return `v${index}`;
 };
 
 HlsStreamRecorder.prototype.groupSrcName = function(info, index) {
-  var lang = (info.quotedString('language') || '').replace(/\W/g, '').toLowerCase();
-  var id = (info.quotedString('group-id') || 'unk').replace(/\W/g, '').toLowerCase();
-  return util.format('grp/%s/%s%d', id, lang ? lang + '-' : '', index);
+
+  let lang = (info.quotedString('language') || '').replace(/\W/g, '').toLowerCase();
+  let id = (info.quotedString('group-id') || 'unk').replace(/\W/g, '').toLowerCase();
+  return `grp/${id}/${lang ? lang + '-' : ''}${index}`;
 };
 
 HlsStreamRecorder.prototype.segmentName = function(seqNo) {
-  function name(n) {
-    var next = ~~(n / 26);
-    var chr = String.fromCharCode(97 + n % 26); // 'a' + n
+
+  const name = (n) => {
+
+    let next = ~~(n / 26);
+    let chr = String.fromCharCode(97 + n % 26); // 'a' + n
     if (next) return name(next - 1) + chr;
     return chr;
-  }
+  };
+
   return this.collect ? 'stream' : name(seqNo);
 };
 
 HlsStreamRecorder.prototype.flushIndex = function(cb) {
-  var appendString, indexString = this.index.toString().trim();
+
+  let appendString, indexString = this.index.toString().trim();
   if (this.lastIndexString && indexString.lastIndexOf(this.lastIndexString, 0) === 0) {
-    var lastLength = this.lastIndexString.length;
+    let lastLength = this.lastIndexString.length;
     appendString = indexString.substr(lastLength);
   }
   this.lastIndexString = indexString;
 
   if (appendString) {
-    fs.appendFile(path.join(this.dst, 'index.m3u8'), appendString, cb);
-  } else {
-    writeFileAtomic(path.join(this.dst, 'index.m3u8'), indexString, cb);
+    Fs.appendFile(Path.join(this.dst, 'index.m3u8'), appendString, cb);
+  }
+  else {
+    writeFileAtomic(Path.join(this.dst, 'index.m3u8'), indexString, cb);
   }
 };
 
 HlsStreamRecorder.prototype.recorderForUrl = function(remoteUrl) {
-  var idx, len = this.recorders.length;
+
+  let idx, len = this.recorders.length;
   for (idx = 0; idx < len; idx++) {
-    var rec = this.recorders[idx];
-    if (rec.remoteUrl === remoteUrl)
+    let rec = this.recorders[idx];
+    if (rec.remoteUrl === remoteUrl) {
       return rec;
+    }
   }
+
   return null;
 };
 
 
-var hlsrecorder = module.exports = function hlsrecorder(reader, dst, options) {
+const hlsrecorder = module.exports = function hlsrecorder(reader, dst, options) {
+
   return new HlsStreamRecorder(reader, dst, options);
 };
 

+ 28 - 18
lib/segment-decrypt.js

@@ -1,13 +1,13 @@
-"use strict";
+'use strict';
 
-var Url = require('url');
-var Crypto = require('crypto');
+const Url = require('url');
+const Crypto = require('crypto');
 
-var Oncemore = require('oncemore');
-var UriStream = require('uristream');
+const Oncemore = require('oncemore');
+const UriStream = require('uristream');
 
 
-var internals = {
+const internals = {
   allowedProtocols: ['http', 'https', 'data'],
   fetchTimeout: 10 * 1000,
 
@@ -16,6 +16,7 @@ var internals = {
 
 
 internals.KeyFetcher = function (uri, cookie) {
+
   this.uri = uri;
   this.cookie = cookie;
   this.key = null;
@@ -24,37 +25,41 @@ internals.KeyFetcher = function (uri, cookie) {
 };
 
 internals.KeyFetcher.prototype.fetch = function (next) {
-  var key = new Buffer(0);
-  var headers = {};
+
+  let key = new Buffer(0);
+  let headers = {};
   if (this.cookie) {
     headers.Cookie = this.cookie;
   }
 
   Oncemore(UriStream(this.uri, { headers: headers, whitelist: internals.allowedProtocols, timeout: internals.fetchTimeout }))
-    .on('data', function(chunk) {
+
+    .on('data', (chunk) => {
+
       key = Buffer.concat([key, chunk]);
     })
-    .once('error', 'end', function(err) {
+    .once('error', 'end', (err) => {
+
       return next(err, key);
     });
 };
 
 internals.KeyFetcher.prototype.get = function (next) {
-  var self = this;
 
   if (this.key && this.key.length) {
     return next(null, this.key);
   }
 
-  var complete = function (err, key) {
+  const complete = (err, key) => {
+
     if (!err) {
-      self.key = key;
+      this.key = key;
     }
 
-    var gets = self._gets;
-    self._gets = [];
+    let gets = this._gets;
+    this._gets = [];
 
-    for (var idx = 0; idx < gets.length; idx++) {
+    for (let idx = 0; idx < gets.length; idx++) {
       process.nextTick(gets[idx], err, key);
     }
   };
@@ -68,10 +73,11 @@ internals.KeyFetcher.prototype.get = function (next) {
 
 
 internals.fetchKey = function (keyUri, options, next) {
+
   if (options.key) return next(null, options.key);
 
-  var uri = Url.resolve(options.base, keyUri);
-  var fetcher = internals.keyCache[uri];
+  let uri = Url.resolve(options.base, keyUri);
+  let fetcher = internals.keyCache[uri];
   if (!fetcher) {
     fetcher = internals.keyCache[uri] = new internals.KeyFetcher(uri, options.cookie);
   }
@@ -80,6 +86,7 @@ internals.fetchKey = function (keyUri, options, next) {
 
 
 internals.getIdentityKey = function (keyAttrs) {
+
   for (let idx = 0; idx < keyAttrs.length; idx++) {
     let key = keyAttrs[idx];
     let keyformat = key.quotedString('keyformat');
@@ -97,6 +104,7 @@ internals.getIdentityKey = function (keyAttrs) {
 
 
 exports.decrypt = function (stream, keyAttrs, options, next) {
+
   if (!keyAttrs || !options) {
     return next(null, stream, false);
   }
@@ -113,6 +121,7 @@ exports.decrypt = function (stream, keyAttrs, options, next) {
   }
 
   return internals.fetchKey(key.uri, options, (err, keyData) => {
+
     if (err) {
       return next(new Error('key fetch failed: ' + (err.stack || err)));
     }
@@ -126,6 +135,7 @@ exports.decrypt = function (stream, keyAttrs, options, next) {
 
     // forward stream errors
     stream.on('error', (err) => {
+
       decrypt.emit('error', err);
     });
 

+ 79 - 62
lib/tssmooth.js

@@ -1,14 +1,16 @@
-"use strict";
+'use strict';
 
-var util = require('util'),
-    assert = require('assert'),
-    debug = require('debug')('hls:tssmooth');
+const util = require('util');
+const debug = require('debug')('hls:tssmooth');
 
-var Transform = require('readable-stream/transform');
+const Transform = require('readable-stream/transform');
+
+const internals = {};
 
 // In Transport Streams the intended rate is determined by the values of the PCR fields and the number of Transport Stream bytes between them. (ISO-13818-1 D.0.9)
 
 function RateError(msg) {
+
   Error.call(this);
 
   this.message = msg;
@@ -16,34 +18,37 @@ function RateError(msg) {
 util.inherits(RateError, Error);
 RateError.prototype.name = 'Rate Error';
 
-function parsePCR(buffer, index, pcr_pid) {
-  var head = buffer.readUInt32BE(index, true);
-  var pid = (head >> 8) & 0x1fff;
+internals.parsePCR = function(buffer, index, pcr_pid) {
+
+  let head = buffer.readUInt32BE(index, true);
+  let pid = (head >> 8) & 0x1fff;
   if (((head >> 5) & 1) !== 1) return -1;
   if (pcr_pid && pcr_pid != pid) return -1;
 
-  var s = buffer.readUInt8(index + 4, true);
+  let s = buffer.readUInt8(index + 4, true);
   if (s < 7) return -1;
 
-  var f = buffer.readUInt8(index + 5, true);
+  let f = buffer.readUInt8(index + 5, true);
   if (((f >> 4) & 1) !== 1) return -1;
 
-  var base = buffer.readUInt32BE(index + 6, true) * 2;
-  var ext = buffer.readUInt32BE(index + 10, true);
+  let base = buffer.readUInt32BE(index + 6, true) * 2;
+  let ext = buffer.readUInt32BE(index + 10, true);
 
   base += (ext >> 31);
   ext = ext & 0x1ff;
 
   return base / 0.09 + ext / 27; // return usecs
-}
+};
+
+internals.utime = function() {
 
-function utime() {
-  var t = process.hrtime(); // based on CLOCK_MONOTONIC, and thus accommodates local drift (but apparently not suspend)
+  let t = process.hrtime(); // based on CLOCK_MONOTONIC, and thus accommodates local drift (but apparently not suspend)
 //  console.error(t);
   return t[0] * 1E6 + t[1] / 1E3;
-}
+};
+
+internals.wait = function(waitMs, fn) {
 
-function wait(waitMs, fn) {
   if (waitMs > 0)
     setTimeout(fn, Math.round(waitMs));
   else
@@ -51,7 +56,7 @@ function wait(waitMs, fn) {
 }
 
 function TsSmooth(options) {
-  var self = this;
+
   options = options || {};
 
   this.packetSize = options.packetSize || 7 * 188; // size of output packets
@@ -65,40 +70,44 @@ function TsSmooth(options) {
   this.bitrate = 10E06;
   this.pcrTime = -1;
 
-  this.pcrDelta = function(pcr, lastPcr) {
-    var pcrDelta = pcr - lastPcr;
+  this.pcrDelta = (pcr, lastPcr) => {
+
+    let pcrDelta = pcr - lastPcr;
     if (pcrDelta < 0) pcrDelta += (0x200000000 * 300) / 27;
     return pcrDelta;
   };
 
-  this.pcr2time = function(pcr) {
-    if (self.pcr === -1) {
-      self.pcr = pcr;
-      self.last = utime();
+  this.pcr2time = (pcr) => {
+
+    if (this.pcr === -1) {
+      this.pcr = pcr;
+      this.last = internals.utime();
     }
 
-    var pcrDelta = self.pcrDelta(pcr, self.pcr);
-    var ret = self.last + pcrDelta;
+    let pcrDelta = this.pcrDelta(pcr, this.pcr);
+    let ret = this.last + pcrDelta;
     if (pcrDelta > 3600E6) {
       // update pcr reference every hour to handle wrap-around
-      self.pcr = pcr;
-      self.last = ret;
+      this.pcr = pcr;
+      this.last = ret;
     }
+
     return ret;
   };
 
-  this.outputTime = function(newPCR) {
-    // when this is called normally, now ~= self.pcrtime
+  this.outputTime = (newPCR) => {
+
+    // when this is called normally, now ~= this.pcrtime
     if (newPCR === -1) return undefined;
 
-    var pcrtime = self.pcr2time(newPCR);
-    if (self.pcrTime === -1) {
-      self.pcrTime = pcrtime;
+    let pcrtime = this.pcr2time(newPCR);
+    if (this.pcrTime === -1) {
+      this.pcrTime = pcrtime;
       return undefined;
     }
 
-    var delta = pcrtime - self.pcrTime;
-    self.pcrTime = pcrtime;
+    let delta = pcrtime - this.pcrTime;
+    this.pcrTime = pcrtime;
 
     return { time:pcrtime, delta: delta };
   };
@@ -108,6 +117,7 @@ function TsSmooth(options) {
 util.inherits(TsSmooth, Transform);
 
 TsSmooth.prototype.reset = function(currentPCR) {
+
   this.pcr = -1;
   if (typeof currentPCR !== 'undefined')
     this.pcrTime = this.pcr2time(currentPCR);
@@ -115,42 +125,46 @@ TsSmooth.prototype.reset = function(currentPCR) {
 
 // smoothly outputs given buffer before endTime
 function outputBefore(stream, buffer, endTime, packetSize, cb) {
-  var index = 0;
 
-  function outputPacket() {
-    var now = utime();
-    var packetTime = (endTime - now) * (packetSize / (buffer.length - index));
+  let index = 0;
+
+  const outputPacket = () => {
+
+    let now = internals.utime();
+    let packetTime = (endTime - now) * (packetSize / (buffer.length - index));
 
     stream.push(buffer.slice(index, Math.min(buffer.length, index + packetSize)));
     index += packetSize;
 
-    var done = (index < buffer.length) ? outputPacket : cb;
-    var delay = Math.round(Math.min(Math.max((0.8 * packetTime / 1000) - 1, 1), 50));
+    let done = (index < buffer.length) ? outputPacket : cb;
+    let delay = Math.round(Math.min(Math.max((0.8 * packetTime / 1000) - 1, 1), 50));
     if (delay === 1)
       process.nextTick(done);
     else
       setTimeout(done, delay);
   }
+
   outputPacket();
 }
 
 TsSmooth.prototype._transform = function(chunk, encoding, cb) {
-  var self = this;
 
-  var index = Math.floor(this.buffer.length / 188) * 188;
+  let index = Math.floor(this.buffer.length / 188) * 188;
   this.buffer = Buffer.concat([this.buffer, chunk]);
 
-  var buf = self.buffer;
-  var end = buf.length - 188;
+  let buf = this.buffer;
+  let end = buf.length - 188;
+
+  let startIndex = 0;
+
+  const processNext = () => {
 
-  var startIndex = 0;
-  function processNext() {
     while (index < end) {
       // check sync
       if (buf.readUInt8(index + 188, true) !== 0x47) {
         // find next potential sync point
         debug('ts sync lost');
-        var sync = index + 1;
+        let sync = index + 1;
         for (; sync < end; sync++) {
           if (buf.readUInt8(sync, true) === 0x47)
             break;
@@ -162,39 +176,40 @@ TsSmooth.prototype._transform = function(chunk, encoding, cb) {
         continue;
       }
 
-      var pcr = parsePCR(buf, index);
-      var out = self.outputTime(pcr);
+      let pcr = internals.parsePCR(buf, index);
+      let out = this.outputTime(pcr);
       if (out !== undefined && index !== startIndex) {
         if (out.delta > 100E3 || out.delta < 0)
-          self.emit('warning', new Error('PCR_error: ' + (out.delta / 1E6).toFixed(2) + 's missing'));
+          this.emit('warning', new Error('PCR_error: ' + (out.delta / 1E6).toFixed(2) + 's missing'));
 
-        var now = utime();
-        var error = (out.time - now) - out.delta;
-        var waittime = (error > self.errorLimit) ? (error / 1000 - 5) : 0;
+        let now = internals.utime();
+        let error = (out.time - now) - out.delta;
+        let waittime = (error > this.errorLimit) ? (error / 1000 - 5) : 0;
 
         if (error < -2 * 1E6 || error > 300 * 1E6) {
           // negative == buffer too late
           // positive == buffer too early
-          self.emit('warning', new RateError('PCR sync offset ' + (error / 1E6).toFixed(2) + 's error'));
-          self.reset(pcr);
+          this.emit('warning', new RateError('PCR sync offset ' + (error / 1E6).toFixed(2) + 's error'));
+          this.reset(pcr);
           waittime = 0;
-        } else if (error < -self.errorLimit) {
+        } else if (error < -this.errorLimit) {
           // ignore the data since it is too late
           return setImmediate(processNext);
         }
 
-        var slice = buf.slice(startIndex, index);
+        let slice = buf.slice(startIndex, index);
         startIndex = index;
         /* eslint-disable no-loop-func */
-        return wait(waittime, function output() {
-          return outputBefore(self, slice, out.time, self.packetSize, processNext);
+        return internals.wait(waittime, () => {
+
+          return outputBefore(this, slice, out.time, this.packetSize, processNext);
         });
         /* eslint-enable */
       }
       index += 188;
     }
 
-    if (startIndex !== 0) self.buffer = buf.slice(startIndex);
+    if (startIndex !== 0) this.buffer = buf.slice(startIndex);
     cb();
   }
 
@@ -202,12 +217,14 @@ TsSmooth.prototype._transform = function(chunk, encoding, cb) {
 };
 
 TsSmooth.prototype._flush = function(cb) {
+
   if (this.buffer.length) this.push(this.buffer); // TODO: use outputBefore() based on current stream speed?
   cb();
 };
 
 
-var tssmooth = module.exports = function tssmooth(options) {
+const tssmooth = module.exports = function tssmooth(options) {
+
   return new TsSmooth(options);
 };