Browse Source

Update for node@8

Gil Pedersen 8 years ago
parent
commit
ceb20d3384
5 changed files with 220 additions and 162 deletions
  1. 66 24
      bin/hlsdump
  2. 31 17
      lib/hls-reader.js
  3. 77 61
      lib/recorder.js
  4. 44 58
      lib/segment-decrypt.js
  5. 2 2
      package.json

+ 66 - 24
bin/hlsdump

@@ -36,8 +36,9 @@ hlsdump.version('0.0.0')
 var fs = require('fs'),
     http = require('http');
 
-var oncemore = require('oncemore'),
-    HlsSegmentReader = require('hls-segment-reader'),
+const Bounce = require('bounce');
+var HlsSegmentReader = require('hls-segment-reader'),
+    Pati = require('pati'),
     UdpBlast = require('udp-blast');
 var HlsReader = require('../lib/hls-reader');
 
@@ -49,8 +50,9 @@ if (!src) {
 
 if (hlsdump.bufferSize) hlsdump.sync = true;
 
-var segmentReader = new HlsSegmentReader(src, { withData: true, highWaterMark: (hlsdump.concurrent || 1) - 1, fullStream: hlsdump.fullStream });
-var r = new HlsReader(segmentReader, hlsdump);
+const segmentReader = new HlsSegmentReader(src, { withData: true, highWaterMark: (hlsdump.concurrent || 1) - 1, fullStream: hlsdump.fullStream });
+const reader = new HlsReader(segmentReader, hlsdump);
+const r = new Pati.EventDispatcher(reader);
 
 segmentReader.once('index', function() {
   // wait until first index is returned before attaching error listener.
@@ -62,28 +64,31 @@ segmentReader.once('index', function() {
 
 if (hlsdump.udp) {
   var dst = (hlsdump.udp === true) ? null : hlsdump.udp;
-  r.pipe(new UdpBlast(dst, { packetSize: 7 * 188 }));
+  reader.pipe(new UdpBlast(dst, { packetSize: 7 * 188 }));
 }
 
 if (hlsdump.output) {
   if (hlsdump.output === '-')
-    r.pipe(process.stdout);
+    reader.pipe(process.stdout);
   else
-    r.pipe(fs.createWriteStream(hlsdump.output));
+    reader.pipe(fs.createWriteStream(hlsdump.output));
 }
 
 var startTime = process.hrtime();
-r.on('ready', function() {
-  var delay = process.hrtime(startTime);
+r.on('ready', () => {
+
+  const delay = process.hrtime(startTime);
   console.error('"ready" after delay of ' + (delay[0] * 1e3 + delay[1] / 1e6).toFixed(2) + 'ms');
 });
 
-r.on('end', function() {
-  console.error('stream complete');
-})
+r.on('end', () => {
+
+  r.end();
+});
+
+let totalDuration = 0;
+r.on('segment', (segmentInfo) => {
 
-var totalDuration = 0;
-r.on('segment', function(segmentInfo) {
   var downloadSize = segmentInfo.file.size;
   var duration = segmentInfo.segment ? segmentInfo.segment.details.duration : 0;
   totalDuration += duration;
@@ -96,7 +101,17 @@ r.on('segment', function(segmentInfo) {
     });
   }
 
-  oncemore(segmentInfo.stream).once('close', 'end', 'error', function(/*err*/) {
+  const dispatcher = new Pati.EventDispatcher(segmentInfo.stream);
+  dispatcher.on('end', Pati.EventDispatcher.end);
+  dispatcher.on('close', Pati.EventDispatcher.end);
+
+  Bounce.background(async () => {
+
+    try {
+      await dispatcher.finish();
+    }
+    catch (err) { /* ignore */ }
+
     console.error('segment done at ' + totalDuration.toFixed(0) + ' seconds, avg bitrate (kbps):', (downloadSize / (duration * 1024 / 8)).toFixed(1));
   });
 });
@@ -106,21 +121,36 @@ if (hlsdump.infoPort) {
   var currentSegment = -1;
 
   // setup stat tracking
-  stats.gauge('bufferBytes', function() { return r.buffer._readableState.length/* + buffer._writableState.length*/; });
+  stats.gauge('bufferBytes', function() { return reader.buffer._readableState.length/* + buffer._writableState.length*/; });
   stats.gauge('currentSegment', function() { return currentSegment; });
   stats.gauge('index.first', function() { return segmentReader.index ? segmentReader.index.first_seq_no : -1; });
   stats.gauge('index.last', function() { return segmentReader.index ? segmentReader.index.lastSeqNo() : -1; });
   stats.gauge('totalDuration', function() { return totalDuration; });
 
-  stats.meter('streamErrors');
+  stats.timer('fetchTime').unref();
+  stats.meter('streamErrors').unref();
+
+  r.on('segment', (segmentInfo) => {
 
-  r.on('segment', function(segmentInfo) {
     currentSegment = segmentInfo.segment && segmentInfo.segment.seq;
 
-    var stopwatch = stats.timer('fetchTime').start();
-    oncemore(segmentInfo.stream).once('close', 'end', 'error', function(err) {
-      stopwatch.end();
-      if (err) stats.meter('streamErrors').mark();
+    const stopwatch = stats.timer('fetchTime').start();
+
+    const dispatcher = new Pati.EventDispatcher(segmentInfo.stream);
+    dispatcher.on('end', Pati.EventDispatcher.end);
+    dispatcher.on('close', Pati.EventDispatcher.end);
+
+    Bounce.background(async () => {
+
+      try {
+        await dispatcher.finish();
+      }
+      catch (err) {
+        stats.meter('streamErrors').mark();
+      }
+      finally {
+        stopwatch.end();
+      }
     });
   });
 
@@ -136,7 +166,19 @@ if (hlsdump.infoPort) {
     res.end();
   }).listen(hlsdump.infoPort);
 
-  oncemore(r).once('end', 'error', function(/*err*/) {
+  const cleanup = () => {
+
     server.close();
-  });
+
+  };
+
+  r.finish().then(() => server.close(), () => server.close());
 }
+
+r.finish().then(() => {
+
+  console.error('stream complete');
+}, (err) => {
+
+  console.error('error', err);
+});

+ 31 - 17
lib/hls-reader.js

@@ -6,6 +6,7 @@ const Passthrough = require('readable-stream/passthrough');
 
 const StreamEach = require('stream-each');
 const Oncemore = require('oncemore');
+const Pati = require('pati');
 
 const TsSmooth = require('./tssmooth');
 const SegmentDecrypt = require('./segment-decrypt');
@@ -87,43 +88,56 @@ HlsReader.prototype.destroy = function () {
   
 };
 
-HlsReader.prototype.process = function(segmentInfo, done)  {
+HlsReader.prototype.process = async function(segmentInfo, done)  {
 
-  this.isReading = true;
+  let result;
+  try {
+    this.isReading = true;
 
-  return this.decrypt(segmentInfo.stream, segmentInfo.segment && segmentInfo.segment.details.keys, (err, stream) => {
-
-    if (err) {
+    let stream;
+    try {
+      stream = await this.decrypt(segmentInfo.stream, segmentInfo.segment && segmentInfo.segment.details.keys);
+    }
+    catch (err) {
       console.error('decrypt failed', err.stack);
       stream = segmentInfo.stream;
     }
 
     this.emit('segment', segmentInfo);
 
-    stream = Oncemore(stream);
+    const dispatcher = new Pati.EventDispatcher(stream);
+    dispatcher.on('end', Pati.EventDispatcher.end);
 
     if (!this.isHooked) {
       // pull data and detect if we need to hook before end
       let buffered = 0;
-      stream.on('data', (chunk) => {
+      dispatcher.on('data', (chunk) => {
 
         buffered += chunk.length;
-        if (!this.isHooked && buffered >= this.bufferSize)
+        if (!this.isHooked && buffered >= this.bufferSize) {
           this.hook();
+        }
       });
     }
 
     stream.pipe(this.buffer, { end: false });
-    stream.once('end', 'error', (err) => {
 
-      this.isReading = false;
-      if (err) {
-        console.error('stream error', err.stack || err);
-      }
-      this.hook();
-      done();
-    });
-  });
+    try {
+      await dispatcher.finish();
+    }
+    catch (err) {
+      console.error('stream error', err.stack || err);
+    }
+
+    this.isReading = false;
+    this.hook();
+  }
+  catch (err) {
+    result = err;
+  }
+  finally {
+    done(result);
+  }
 };
 
 // the hook is used to prebuffer

+ 77 - 61
lib/recorder.js

@@ -3,6 +3,7 @@
 const Path = require('path');
 const Url = require('url');
 
+const Bounce = require('bounce');
 const Mime = require('mime-types');
 const StreamEach = require('stream-each');
 const M3U8Parse = require('m3u8parse');
@@ -151,13 +152,11 @@ HlsStreamRecorder.prototype.updateIndex = function(update) {
     }
 
     // hook end listener
-    this.reader.on('end', () => {
+    this.reader.on('end', async () => {
 
       this.index.ended = true;
-      this.flushIndex((/*err*/) => {
-
-        debug('done');
-      });
+      await this.flushIndex();
+      debug('done');
     });
 
     if (this.decrypt) {
@@ -171,48 +170,58 @@ HlsStreamRecorder.prototype.updateIndex = function(update) {
   }
 };
 
-HlsStreamRecorder.prototype.process = function (segmentInfo, next) {
-
-  if (segmentInfo.type === 'segment') {
-    return this.processSegment(segmentInfo, next);
-  }
+HlsStreamRecorder.prototype.process = async function (segmentInfo, done) {
 
-  if (segmentInfo.type === 'init') {
-    return this.processInfo(segmentInfo, next);
-  }
+  let result;
+  try {
+    if (segmentInfo.type === 'segment') {
+      return await this.processSegment(segmentInfo);
+    }
 
-  debug('unknown segment type: ' + segmentInfo.type);
+    if (segmentInfo.type === 'init') {
+      return await this.processInfo(segmentInfo);
+    }
 
-  return next();
+    debug('unknown segment type: ' + segmentInfo.type);
+  }
+  catch (err) {
+    result = err;
+  }
+  finally {
+    done(result);
+  }
 };
 
-HlsStreamRecorder.prototype.processInfo = function (segmentInfo, callback) {
+HlsStreamRecorder.prototype.processInfo = async function (segmentInfo) {
 
   const meta = segmentInfo.file;
   const uri = `${this.segmentName(this.mapSeq, true)}.${Mime.extension(meta.mime)}`;
 
-  this.writeStream(segmentInfo.stream, uri, meta, (err, bytesWritten) => {
+  this.mapSeq++;
 
+  let bytesWritten = 0;
+  try {
+    bytesWritten = await this.writeStream(segmentInfo.stream, uri, meta);
+  }
+  catch (err) {
+    Bounce.rethrow(err, 'system');
     // only to report errors
-    if (err) debug('stream error', err.stack || err);
-
-    const map = new M3U8Parse.AttrList();
+    debug('stream error', err.stack || err);
+  }
 
-    map.quotedString('uri', uri);
+  const map = new M3U8Parse.AttrList();
 
-    // handle byterange
-    if (this.collect) {
-      map.quotedString('byterange', `${bytesWritten}@${this.uploader.segmentBytes - bytesWritten}`);
-    }
+  map.quotedString('uri', uri);
 
-    this.nextMap = map;
-    return callback();
-  });
+  // handle byterange
+  if (this.collect) {
+    map.quotedString('byterange', `${bytesWritten}@${this.uploader.segmentBytes - bytesWritten}`);
+  }
 
-  this.mapSeq++;
+  this.nextMap = map;
 };
 
-HlsStreamRecorder.prototype.processSegment = function (segmentInfo, callback) {
+HlsStreamRecorder.prototype.processSegment = async function (segmentInfo) {
 
   let segment = new M3U8Parse.M3U8Segment(segmentInfo.segment.details, true);
   let meta = segmentInfo.file;
@@ -236,45 +245,52 @@ HlsStreamRecorder.prototype.processSegment = function (segmentInfo, callback) {
   delete segment.byterange;
 
   // save the stream segment
-  SegmentDecrypt.decrypt(segmentInfo.stream, segmentInfo.segment.details.keys, this.decrypt, (err, stream, decrypted) => {
+  let stream;
+  try {
+    stream = await SegmentDecrypt.decrypt(segmentInfo.stream, segmentInfo.segment.details.keys, this.decrypt);
+  }
+  catch (err) {
+    console.error('decrypt failed', err.stack);
+    stream = segmentInfo.stream;
+  }
  
-    if (err) {
-      console.error('decrypt failed', err.stack);
-      stream = segmentInfo.stream;
-    }
-    else if (decrypted) {
-      segment.keys = null;
-      meta = { mime: meta.mime, modified: meta.modified }; // size is no longer valid
-    }
+  if (stream !== segmentInfo.stream) {
+    segment.keys = null;
+    meta = { mime: meta.mime, modified: meta.modified }; // size is no longer valid
+  }
 
-    this.writeStream(stream, segment.uri, meta, (err, bytesWritten) => {
+  this.seq++;
 
-      // only to report errors
-      if (err) debug('stream error', err.stack || err);
+  let bytesWritten = 0;
+  try {
+    bytesWritten = await this.writeStream(stream, segment.uri, meta);
+  }
+  catch (err) {
+    Bounce.rethrow(err, 'system');
 
-      // handle byterange
-      if (this.collect) {
-        const isContigious = this.segmentHead > 0 && ((this.segmentHead + bytesWritten) === this.uploader.segmentBytes);
-        segment.byterange = {
-          length: bytesWritten,
-          offset: isContigious ? null : this.uploader.segmentBytes - bytesWritten
-        }
+      // only report errors
+    debug('stream error', err.stack || err);
+  }
 
-        this.segmentHead = this.uploader.segmentBytes;
-      }
+  // handle byterange
+  if (this.collect) {
+    const isContigious = this.segmentHead > 0 && ((this.segmentHead + bytesWritten) === this.uploader.segmentBytes);
+    segment.byterange = {
+      length: bytesWritten,
+      offset: isContigious ? null : this.uploader.segmentBytes - bytesWritten
+    }
 
-      // update index
-      this.index.segments.push(segment);
-      this.flushIndex(callback);
-    });
+    this.segmentHead = this.uploader.segmentBytes;
+  }
 
-    this.seq++;
-  });
+  // update index
+  this.index.segments.push(segment);
+  return this.flushIndex();
 };
 
-HlsStreamRecorder.prototype.writeStream = function (stream, name, meta, callback) {
+HlsStreamRecorder.prototype.writeStream = function (stream, name, meta) {
 
-  this.uploader.pushSegment(stream, name, meta).then((written) => callback(null, written), callback);
+  return this.uploader.pushSegment(stream, name, meta);
 };
 
 HlsStreamRecorder.prototype.variantName = function(info, index) {
@@ -302,9 +318,9 @@ HlsStreamRecorder.prototype.segmentName = function(seqNo, isInit) {
   return this.collect ? 'stream' : (isInit ? 'init-' : '') + name(seqNo);
 };
 
-HlsStreamRecorder.prototype.flushIndex = function(cb) {
+HlsStreamRecorder.prototype.flushIndex = function() {
 
-  this.uploader.flushIndex(this.index).then(() => cb(), cb);
+  return this.uploader.flushIndex(this.index);
 };
 
 HlsStreamRecorder.prototype.recorderForUrl = function(remoteUrl) {

+ 44 - 58
lib/segment-decrypt.js

@@ -3,7 +3,7 @@
 const Url = require('url');
 const Crypto = require('crypto');
 
-const Oncemore = require('oncemore');
+const Pati = require('pati');
 const UriStream = require('uristream');
 
 
@@ -20,11 +20,9 @@ internals.KeyFetcher = function (uri, cookie) {
   this.uri = uri;
   this.cookie = cookie;
   this.key = null;
-
-  this._gets = [];
 };
 
-internals.KeyFetcher.prototype.fetch = function (next) {
+internals.KeyFetcher.prototype.fetch = function () {
 
   let key = new Buffer(0);
   let headers = {};
@@ -32,56 +30,43 @@ internals.KeyFetcher.prototype.fetch = function (next) {
     headers.Cookie = this.cookie;
   }
 
-  Oncemore(UriStream(this.uri, { headers: headers, whitelist: internals.allowedProtocols, timeout: internals.fetchTimeout }))
-
-    .on('data', (chunk) => {
-
-      key = Buffer.concat([key, chunk]);
-    })
-    .once('error', 'end', (err) => {
+  const dispatcher = new Pati.EventDispatcher(UriStream(this.uri, { headers: headers, whitelist: internals.allowedProtocols, timeout: internals.fetchTimeout }));
 
-      return next(err, key);
-    });
-};
-
-internals.KeyFetcher.prototype.get = function (next) {
+  dispatcher.on('data', (chunk) => {
 
-  if (this.key && this.key.length) {
-    return next(null, this.key);
-  }
-
-  const complete = (err, key) => {
+    key = Buffer.concat([key, chunk]);
+  });
+  dispatcher.on('end', () => {
 
-    if (!err) {
-      this.key = key;
-    }
+    dispatcher.end(key);
+  });
 
-    let gets = this._gets;
-    this._gets = [];
+  return dispatcher.finish();
+};
 
-    for (let idx = 0; idx < gets.length; idx++) {
-      process.nextTick(gets[idx], err, key);
-    }
-  };
+internals.KeyFetcher.prototype.get = function () {
 
-  if (this._gets.length === 0) {
-    this.fetch(complete);
+  if (!this.key) {
+    this.key = this.fetch();
   }
 
-  return this._gets.push(next);
+  return this.key;
 };
 
 
-internals.fetchKey = function (keyUri, options, next) {
+internals.fetchKey = function (keyUri, options) {
 
-  if (options.key) return next(null, options.key);
+  if (options.key) {
+    return options.key;
+  }
 
-  let uri = Url.resolve(options.base, keyUri);
+  const uri = Url.resolve(options.base, keyUri);
   let fetcher = internals.keyCache[uri];
   if (!fetcher) {
     fetcher = internals.keyCache[uri] = new internals.KeyFetcher(uri, options.cookie);
   }
-  return fetcher.get(next);
+
+  return fetcher.get();
 };
 
 
@@ -103,42 +88,43 @@ internals.getIdentityKey = function (keyAttrs) {
 };
 
 
-exports.decrypt = function (stream, keyAttrs, options, next) {
+exports.decrypt = async function (stream, keyAttrs, options, next) {
 
   if (!keyAttrs || !options) {
-    return next(null, stream, false);
+    return stream;
   }
 
   let key = internals.getIdentityKey(keyAttrs);
   if (!key || key.method === 'NONE') {
-    return next(null, stream, false);
+    return stream;
   }
 
   if (key.method !== 'AES-128' || !key.uri || !key.iv) {
 
     // TODO: hard error when key is not recognized?
-    return next(new Error('unknown encryption parameters'), stream);
+    throw new Error('unknown encryption parameters');
   }
 
-  return internals.fetchKey(key.uri, options, (err, keyData) => {
-
-    if (err) {
-      return next(new Error('key fetch failed: ' + (err.stack || err)));
-    }
-
-    let decrypt;
-    try {
-      decrypt = Crypto.createDecipheriv('aes-128-cbc', keyData, key.iv);
-    } catch (ex) {
-      return next(new Error('crypto setup failed: ' + (ex.stack || ex)));
-    }
+  let keyData;
+  try {
+    keyData = await internals.fetchKey(key.uri, options);
+  }
+  catch (err) {
+    throw new Error('key fetch failed: ' + (err.stack || err));
+  }
 
-    // forward stream errors
-    stream.on('error', (err) => {
+  let decrypt;
+  try {
+    decrypt = Crypto.createDecipheriv('aes-128-cbc', keyData, key.iv);
+  } catch (ex) {
+    throw new Error('crypto setup failed: ' + (ex.stack || ex));
+  }
 
-      decrypt.emit('error', err);
-    });
+  // forward stream errors
+  stream.on('error', (err) => {
 
-    return next(null, stream.pipe(decrypt), true);
+    decrypt.emit('error', err);
   });
+
+  return stream.pipe(decrypt);
 };

+ 2 - 2
package.json

@@ -27,6 +27,7 @@
   "license": "BSD-2-Clause",
   "dependencies": {
     "aws-sdk": "^2.179.0",
+    "bounce": "^1.2.0",
     "commander": "^2.3.0",
     "debug": "^2.0.0",
     "hls-segment-reader": "^4.0.1",
@@ -35,7 +36,6 @@
     "mime-types": "^2.0.1",
     "mkdirp": "^0.5.0",
     "noptify": "0.0.3",
-    "oncemore": "^1.0.0",
     "pati": "^1.1.0",
     "readable-stream": "^2.0.2",
     "stream-each": "^1.1.2",
@@ -45,6 +45,6 @@
   },
   "devDependencies": {},
   "engines": {
-    "node": ">=4.0.0"
+    "node": ">=8.9.0"
   }
 }