Explorar el Código

refactor reader from hlsdump to new HlsReader class

Gil Pedersen hace 11 años
padre
commit
99971b4767
Se han modificado 3 ficheros con 225 adiciones y 150 borrados
  1. 22 147
      bin/hlsdump
  2. 202 0
      lib/hls-reader.js
  3. 1 3
      package.json

+ 22 - 147
bin/hlsdump

@@ -33,19 +33,13 @@ hlsdump.version('0.0.0')
    .option('--key <hex>', 'use oob key for decrypting segments', function(opt) {return new Buffer(opt, 'hex');})
    .parse(process.argv);
 
-var url = require('url'),
-    fs = require('fs'),
-    http = require('http'),
-    crypto = require('crypto');
-
-var streamprocess = require('streamprocess'),
-    oncemore = require('oncemore'),
-    uristream = require('uristream'),
+var fs = require('fs'),
+    http = require('http');
+
+var oncemore = require('oncemore'),
     HlsSegmentReader = require('hls-segment-reader'),
     UdpBlast = require('udp-blast');
-var tssmooth = require('../lib/tssmooth');
-
-var Passthrough = require('readable-stream/passthrough');
+var HlsReader = require('../lib/hls-reader');
 
 var stats = require('measured').createCollection();
 
@@ -57,168 +51,49 @@ if (!src) {
 
 if (hlsdump.bufferSize) hlsdump.sync = true;
 
-var r = new HlsSegmentReader(src, {highWaterMark:(hlsdump.concurrent || 1) - 1, fullStream:hlsdump.fullStream});
+var r = new HlsReader(new HlsSegmentReader(src, { highWaterMark: (hlsdump.concurrent || 1) - 1, fullStream: hlsdump.fullStream }), hlsdump);
 
 var totalDuration = 0, currentSegment = -1;
-var reading = false, hooked = false;
-var keyCache = {};
-
-streamprocess(r, function (obj, done) {
-  var meta = obj.meta;
-  var duration = obj.segment.duration;
-  var downloadSize = meta.size;
-  var stream = oncemore(obj.stream);
-  totalDuration += duration;
-
-  console.error('piping segment', meta.url);
-
-  var stopwatch = stats.timer('fetchTime').start();
-  stream.once('close', 'end', 'error', function() {
-    stopwatch.end();
-  });
-
-  reading = true;
-  currentSegment = obj.seq;
r.on('segment', function(data) {
  var downloadSize = data.meta.size;
  var duration = data.segment.duration;

  // Restore per-segment bookkeeping lost in the refactor: the
  // 'totalDuration' / 'currentSegment' gauges below and the bitrate log
  // line still read these, but nothing updated them any more.
  totalDuration += duration;
  currentSegment = data.seq;

  // calculate size when missing
  if (downloadSize === -1) {
    downloadSize = 0;
    data.stream.on('data', function(chunk) {
      downloadSize += chunk.length;
    });
  }

  // time the full segment fetch, whichever way the stream terminates
  var stopwatch = stats.timer('fetchTime').start();
  oncemore(data.stream).once('close', 'end', 'error', function(err) {
    stopwatch.end();

    console.error('segment done at ' + totalDuration.toFixed(0) + ' seconds, avg bitrate (kbps):', (downloadSize / (duration * 1024 / 8)).toFixed(1));
    if (err) {
      stats.meter('streamErrors').mark();
    }
  });
});
 
-r.on('end', function() {
-  console.error('done');
-});
-
-var buffer = new Passthrough({highWaterMark:hlsdump.bufferSize});
-var outputs = [];
-
 if (hlsdump.udp) {
   var dst = (hlsdump.udp === true) ? null : hlsdump.udp;
-  outputs.push(new UdpBlast(dst, { packetSize: 7 * 188 }));
+  r.pipe(new UdpBlast(dst, { packetSize: 7 * 188 }));
 }
 
 if (hlsdump.output) {
   if (hlsdump.output === '-')
-    outputs.push(process.stdout);
+    r.pipe(process.stdout);
   else
-    outputs.push(fs.createWriteStream(hlsdump.output));
-}
-
-// the hook is used to prebuffer
-function hook(stream) {
-  if (hooked) return;
-
-  console.error('hooking output');
-
-  var s = stream;
-  if (hlsdump.sync) {
-    var smooth = tssmooth();
-    smooth.on('unpipe', function() {
-      this.unpipe();
-    });
-    smooth.on('warning', function(err) {
-      console.error('smoothing error', err);
-    });
-    s = s.pipe(smooth);
-  }
-
-  outputs.forEach(function (o) {
-    s.pipe(o);
-  });
-
-  hooked = true;
+    r.pipe(fs.createWriteStream(hlsdump.output));
 }
 
-if (!hlsdump.sync || !(hlsdump.bufferSize > 0))
-  hook(buffer);
-
 // setup stat tracking
-stats.gauge('bufferBytes', function() { return buffer._readableState.length/* + buffer._writableState.length*/; });
+stats.gauge('bufferBytes', function() { return r.buffer._readableState.length/* + buffer._writableState.length*/; });
 stats.gauge('currentSegment', function() { return currentSegment; });
-stats.gauge('index.first', function() { return r.index ? r.index.first_seq_no : -1; });
-stats.gauge('index.last', function() { return r.index ? r.index.lastSeqNo() : -1; });
+stats.gauge('index.first', function() { return r.reader.index ? r.reader.index.first_seq_no : -1; });
+stats.gauge('index.last', function() { return r.reader.index ? r.reader.index.lastSeqNo() : -1; });
 stats.gauge('totalDuration', function() { return totalDuration; });
 
 stats.meter('streamErrors');

+ 202 - 0
lib/hls-reader.js

@@ -0,0 +1,202 @@
+"use strict";
+
+var Url = require('url'),
+    Util = require('util'),
+    Crypto = require('crypto');
+
+var StreamProcess = require('streamprocess'),
+    oncemore = require('oncemore'),
+    UriStream = require('uristream');
+
+var Readable = require('readable-stream/readable'),
+    Passthrough = require('readable-stream/passthrough');
+
+var tssmooth = require('./tssmooth');
+
+// Module-private shared state: decryption keys cached by absolute uri,
+// shared across all HlsReader instances in this process.
+var internals = {
+  keyCache: {},
+};
+
+// placeholder used to disable Readable._read once the source has ended
+var NOOP = function(){};
+
+// 'pipe' stream to a Readable
// 'pipe' stream to a Readable: forward every chunk via dst.push() and
// wire backpressure through dst._read() -> src.resume().
function pump(src, dst, done) {
  var onData = function(chunk) {
    // a false return from push() means dst's buffer is full;
    // pause the source until the consumer drains it
    if (!dst.push(chunk)) src.pause();
  };
  src.on('data', onData);

  // the destination asking for more data restarts the source
  dst._read = function() {
    src.resume();
  };

  oncemore(src).once('end', 'error', function(err) {
    // TODO: flush source buffer on error?
    dst._read = NOOP;  // no further data will arrive
    done(err);
  });
}
+
/**
 * Readable byte stream that concatenates the (optionally AES-128
 * decrypted) segments produced by an HlsSegmentReader.
 *
 * options:
 *   sync       - output in real-time, smoothed to the TS timestamps
 *   bufferSize - prebuffer this many bytes before hooking up the output
 *   cookie     - Cookie header value sent when fetching key files
 *   key        - out-of-band 16 byte AES-128 key Buffer
 *
 * Emits 'segment' with the reader's segment object before its data is piped.
 */
function HlsReader(segmentReader, options) {
  if (!(this instanceof HlsReader))
    return new HlsReader(segmentReader, options);

  Readable.call(this, { lowWaterMark: options.lowWaterMark, highWaterMark: options.highWaterMark });

  var self = this;

  this.reader = segmentReader;

  this.sync = !!options.sync; // output in real-time
  this.bufferSize = ~~options.bufferSize;

  this.cookie = options.cookie;
  this.key = options.key;

  // aes-128-cbc (see decrypt()) requires a 16 byte key. The previous
  // check used `&&` against length 32, so it never rejected a Buffer of
  // the wrong size and accepted any length-32 non-Buffer.
  if (options.key && (!Buffer.isBuffer(options.key) || options.key.length !== 16)) {
    throw new TypeError('key must be a 16 byte Buffer');
  }

  this.isReading = false;
  this.isHooked = false;
  this.buffer = new Passthrough({ highWaterMark: this.bufferSize });

  // process one segment at a time, appending each to the prebuffer
  StreamProcess(this.reader, function (obj, done) {
    self.isReading = true;

    return self.decrypt(obj.stream, obj.segment.key, function (err, stream) {
      if (err) {
        // decryption is best-effort: fall back to the raw segment bytes
        console.error('decrypt failed', err.stack);
        stream = obj.stream;
      }

      self.emit('segment', obj);

      stream = oncemore(stream);

      if (!self.isHooked) {
        // pull data and detect if we need to hook before end
        var buffered = 0;
        stream.on('data', function(chunk) {
          buffered += chunk.length;
          if (!self.isHooked && buffered >= self.bufferSize)
            self.hook();
        });
      }

      stream.pipe(self.buffer, { end: false });
      stream.once('end', 'error', function(err) {
        self.isReading = false;
        if (err) {
          console.error('stream error', err.stack || err);
        }
        self.hook();
        done();
      });
    });
  });

  this.reader.once('index', function() {
    // wait until first index is returned before attaching error listener.
    // this will enable initials errors to throw
    self.reader.on('error', function(err) {
      console.error('reader error', err.stack || err);
    });
  });

  this.reader.on('end', function() {
    console.error('done');
    self.buffer.end();
  });

  // start output if needed
  if (!this.sync || !(this.bufferSize > 0))
    this.hook();
}
Util.inherits(HlsReader, Readable);
+
+// the hook is used to prebuffer
// the hook is used to prebuffer: once invoked, the accumulated buffer
// (optionally timestamp-smoothed) starts flowing into this Readable
HlsReader.prototype.hook = function hook() {
  if (this.isHooked) return;  // only ever hook once

  var self = this;

  console.error('hooking output', this.sync);
  this.isHooked = true;

  var source = this.buffer;
  if (this.sync) {
    // real-time mode: pace output to the embedded TS timestamps
    var smoother = tssmooth();
    smoother.on('unpipe', function() {
      this.unpipe();
    });
    smoother.on('warning', function(err) {
      console.error('smoothing error', err);
    });
    source = source.pipe(smoother);
  }

  pump(source, this, function(err) {
    if (err) return self.emit('error', err);
    self.push(null);  // signal EOF to consumers
  });
};
+
/**
 * Decrypts a segment stream when the playlist's key attributes specify
 * AES-128; passes the stream through untouched when keyAttrs is falsy.
 *
 * @param stream   raw segment byte stream
 * @param keyAttrs playlist key attribute list (enumeratedString /
 *                 quotedString / hexadecimalInteger accessors), or falsy
 * @param next     callback(err, stream)
 */
HlsReader.prototype.decrypt = function (stream, keyAttrs, next) {
  if (!keyAttrs) return next(null, stream);

  if (keyAttrs.enumeratedString('method') !== 'AES-128' ||
      !keyAttrs.quotedString('uri') || !keyAttrs.hexadecimalInteger('iv')) {

    // TODO: hard error when key is not recognized?
    return next(new Error('unknown encryption parameters'));
  }

  return this.fetchKey(keyAttrs.quotedString('uri'), function(err, key) {
    if (err)
      return next(new Error('key fetch failed: ' + (err.stack || err)));

    var iv = keyAttrs.hexadecimalInteger('iv');
    try {
      var decrypt = Crypto.createDecipheriv('aes-128-cbc', key, iv);
    } catch (ex) {
      // fix: the '+' was missing here, so this line itself threw a
      // TypeError (calling the string) instead of reporting the failure
      return next(new Error('crypto setup failed: ' + (ex.stack || ex)));
    }

    // forward stream errors
    stream.on('error', function(err) {
      decrypt.emit('error', err);
    });

    return next(null, stream.pipe(decrypt));
  });
};
+
// Resolves and downloads the AES key for a segment, caching results per
// absolute uri. An out-of-band key (options.key) short-circuits the fetch.
HlsReader.prototype.fetchKey = function (keyUri, next) {
  if (this.key) return next(null, this.key);

  var uri = Url.resolve(this.reader.url, keyUri);
  var cached = internals.keyCache[uri];
  if (cached && cached.length) return next(null, internals.keyCache[uri]);

  var headers = {};
  if (this.cookie)
    headers.Cookie = this.cookie;

  var key = new Buffer(0);
  var fetch = UriStream(uri, { headers: headers, whitelist: ['http', 'https', 'data'], timeout: 10 * 1000 });
  oncemore(fetch)
    .on('data', function(chunk) {
      key = Buffer.concat([key, chunk]);
    })
    .once('error', 'end', function(err) {
      // NOTE(review): the (possibly empty/partial) key is cached even on
      // error; only non-empty entries are served from the cache above
      internals.keyCache[uri] = key;
      return next(err, key);
    });
};
+
// Export a factory so callers can write `hlsreader(reader, opts)` as well
// as `new hlsreader.HlsReader(reader, opts)`.
var hlsreader = module.exports = function hlsreader(segmentReader, options) {
  return new HlsReader(segmentReader, options);
};
hlsreader.HlsReader = HlsReader;

+ 1 - 3
package.json

@@ -26,7 +26,6 @@
   "author": "Gil Pedersen <gpdev@gpost.dk>",
   "license": "BSD",
   "dependencies": {
-    "carrier": "~0.1.8",
     "commander": "^2.3.0",
     "debug": "^2.0.0",
     "deep-equal": "^1.0.0",
@@ -40,8 +39,7 @@
     "streamprocess": "0.0.1",
     "udp-blast": "^1.0.0",
     "uristream": "^1.1.0",
-    "write-file-atomic": "^1.1.0",
-    "xtend": "^4.0.0"
+    "write-file-atomic": "^1.1.0"
   },
   "devDependencies": {},
   "engines": {