Parcourir la source

Support map option in all tools

Gil Pedersen il y a 9 ans
Parent
commit
cf323a0b0c
5 fichiers modifiés avec 119 ajouts et 101 suppressions
  1. 2 2
      bin/hlsdump
  2. 16 2
      bin/hlsmon
  3. 3 60
      lib/hls-reader.js
  4. 96 33
      lib/recorder.js
  5. 2 4
      package.json

+ 2 - 2
bin/hlsdump

@@ -85,7 +85,7 @@ r.on('end', function() {
 var totalDuration = 0;
 r.on('segment', function(segmentInfo) {
   var downloadSize = segmentInfo.file.size;
-  var duration = segmentInfo.details.duration;
+  var duration = segmentInfo.segment ? segmentInfo.segment.details.duration : 0;
   totalDuration += duration;
 
   // calculate size when missing
@@ -115,7 +115,7 @@ if (hlsdump.infoPort) {
   stats.meter('streamErrors');
 
   r.on('segment', function(segmentInfo) {
-    currentSegment = segmentInfo.seq;
+    currentSegment = segmentInfo.segment && segmentInfo.segment.seq;
 
     var stopwatch = stats.timer('fetchTime').start();
     oncemore(segmentInfo.stream).once('close', 'end', 'error', function(err) {

+ 16 - 2
bin/hlsmon

@@ -18,11 +18,25 @@ var sep = ';';
 function monitor(srcUrl) {
   var r = new HlsSegmentReader(srcUrl, { fullStream:true, withData:false });
 
+  var contentBytes = function (segmentInfo) {
+
+    if (segmentInfo.type === 'segment') {
+      return segmentInfo.segment.details.byterange ? +segmentInfo.segment.details.byterange.length : segmentInfo.file.size;
+    }
+
+    if (segmentInfo.type === 'init') {
+      return segmentInfo.init.byterange ? parseInt(segmentInfo.init.quotedString('byterange'), 10): segmentInfo.file.size;
+    }
+
+    return segmentInfo.file.size;
+  };
+
   var time = 0;
   r.on('data', function (segmentInfo) {
     var meta = segmentInfo.file;
-    var duration = segmentInfo.details.duration;
-    console.log(meta.modified.toJSON() + sep + meta.size + sep + duration.toFixed(3) + sep + (meta.size / (duration * 1024 / 8)).toFixed(3));
+    var size = contentBytes(segmentInfo);
+    var duration = +(segmentInfo.segment && segmentInfo.segment.details.duration);
+    console.log(meta.modified.toJSON() + sep + size + sep + duration.toFixed(3) + sep + (size / (duration * 1024 / 8)).toFixed(3));
     time += duration;
   });
 

+ 3 - 60
lib/hls-reader.js

@@ -1,23 +1,17 @@
 'use strict';
 
 const Util = require('util');
-const Url = require('url');
 const Readable = require('readable-stream/readable');
 const Passthrough = require('readable-stream/passthrough');
 
-const Async = require('async');
 const StreamEach = require('stream-each');
 const Oncemore = require('oncemore');
-const UriStream = require('uristream');
-const deepEqual = require('deep-equal');
 
 const TsSmooth = require('./tssmooth');
 const SegmentDecrypt = require('./segment-decrypt');
 
 
 const internals = {
-  mapFetchTimeout: 30 * 1000,
-
   NOOP: function(){},
 };
 
@@ -97,38 +91,13 @@ HlsReader.prototype.process = function(segmentInfo, done)  {
 
   this.isReading = true;
 
-  Async.parallel({
-    map: (next) => {
-
-      if (!deepEqual(segmentInfo.details.map, this.map)) {
-        this.map = segmentInfo.details.map;
-        if (this.map) {
-          return this.appendMap(this.map, next);
-        }
-      }
-
-      return next();
-    },
-    stream: (next) => {
-
-      return this.decrypt(segmentInfo.stream, segmentInfo.details.keys, (err, stream) => {
-
-        if (err) {
-          console.error('decrypt failed', err.stack);
-          stream = segmentInfo.stream;
-        }
-
-        return next(null, stream);
-      });
-    },
-  }, (err, results) => {
+  return this.decrypt(segmentInfo.stream, segmentInfo.segment && segmentInfo.segment.details.keys, (err, stream) => {
 
     if (err) {
-      return done(err);
+      console.error('decrypt failed', err.stack);
+      stream = segmentInfo.stream;
     }
 
-    let stream = results.stream;
-
     this.emit('segment', segmentInfo);
 
     stream = Oncemore(stream);
@@ -189,32 +158,6 @@ HlsReader.prototype.hook = function hook() {
   this.emit('ready');
 };
 
-HlsReader.prototype.appendMap = function(map, next) {
-
-  if (!map.uri) {
-    return next(new Error('missing "uri" attribute from map'));
-  }
-  let mapUri = Url.resolve(this.reader.baseUrl, map.quotedString('uri'));
-
-  let fetchOptions = {
-    timeout: internals.mapFetchTimeout,
-  };
-
-  if (map.byterange) {
-    let n = map.quotedString('byterange').split('@');
-    if (n.length !== 2) {
-      return next(new Error('invalid "byterange" attribute from map'));
-    }
-
-    fetchOptions.start = parseInt(n[1], 10);
-    fetchOptions.end = fetchOptions.start + parseInt(n[0], 10) - 1;
-  }
-
-  Oncemore(UriStream(mapUri, fetchOptions))
-    .once('end', 'error', next)
-    .pipe(this.buffer, { end: false })
-};
-
 HlsReader.prototype.decrypt = function(stream, keyAttrs, next) {
 
   return SegmentDecrypt.decrypt(stream, keyAttrs, { base: this.reader.baseUrl, key: this.key, cookie: this.cookie }, next);

+ 96 - 33
lib/recorder.js

@@ -37,6 +37,11 @@ function HlsStreamRecorder(reader, dst, options) {
   this.decrypt = options.decrypt;
 
   this.recorders = [];
+
+  this.mapSeq = 0;
+  this.nextMap = null;
+
+  this.writing = null; // tracks writing state
 }
 
 HlsStreamRecorder.prototype.start = function() {
@@ -171,36 +176,72 @@ HlsStreamRecorder.prototype.updateIndex = function(update) {
   }
 };
 
-HlsStreamRecorder.prototype.process = function(segmentInfo, done) {
+HlsStreamRecorder.prototype.process = function (segmentInfo, next) {
+
+  if (segmentInfo.type === 'segment') {
+    return this.processSegment(segmentInfo, next);
+  }
+
+  if (segmentInfo.type === 'init') {
+    return this.processInfo(segmentInfo, next);
+  }
+
+  debug('unknown segment type: ' + segmentInfo.type);
+
+  return next();
+};
+
+HlsStreamRecorder.prototype.processInfo = function (segmentInfo, callback) {
+
+  const meta = segmentInfo.file;
+  const uri = `${this.segmentName(this.mapSeq, true)}.${Mime.extension(meta.mime)}`;
+
+  this.writeStream(segmentInfo.stream, uri, (err, bytesWritten) => {
+
+    // only to report errors
+    if (err) debug('stream error', err.stack || err);
 
-  let segment = new M3U8Parse.M3U8Segment(segmentInfo.details, true);
+    const map = new M3U8Parse.AttrList();
+
+    map.quotedString('uri', uri);
+
+    // handle byterange
+    if (this.collect) {
+      map.quotedString('byterange', `${bytesWritten}@${this.writing.bytes - bytesWritten}`);
+    }
+
+    this.nextMap = map;
+    return callback();
+  });
+
+  this.mapSeq++;
+};
+
+HlsStreamRecorder.prototype.processSegment = function (segmentInfo, callback) {
+
+  let segment = new M3U8Parse.M3U8Segment(segmentInfo.segment.details, true);
   let meta = segmentInfo.file;
 
   // mark discontinuities
   if (this.nextSegmentSeq !== -1 &&
-      this.nextSegmentSeq !== segmentInfo.seq) {
+      this.nextSegmentSeq !== segmentInfo.segment.seq) {
     segment.discontinuity = true;
   }
-  this.nextSegmentSeq = segmentInfo.seq + 1;
+  this.nextSegmentSeq = segmentInfo.segment.seq + 1;
 
   // create our own uri
   segment.uri = `${this.segmentName(this.seq)}.${Mime.extension(meta.mime)}`;
 
-  // handle byterange
-  let first = this.index.segments.length === 0;
-  let newFile = first || this.index.segments[this.index.segments.length - 1].uri !== segment.uri;
-  if (this.collect) {
-    segment.byterange = {
-      length: 0,
-      offset: newFile ? 0 : null
-    }
-  }
-  else {
-    delete segment.byterange;
+  // add map info
+  if (this.nextMap) {
+    segment.map = this.nextMap;
+    this.nextMap = null;
   }
 
+  delete segment.byterange;
+
   // save the stream segment
-  SegmentDecrypt.decrypt(segmentInfo.stream, segmentInfo.details.keys, this.decrypt, (err, stream, decrypted) => {
+  SegmentDecrypt.decrypt(segmentInfo.stream, segmentInfo.segment.details.keys, this.decrypt, (err, stream, decrypted) => {
  
     if (err) {
       console.error('decrypt failed', err.stack);
@@ -210,35 +251,57 @@ HlsStreamRecorder.prototype.process = function(segmentInfo, done) {
       segment.keys = null;
     }
 
-    stream = Oncemore(stream);
-    stream.pipe(Fs.createWriteStream(Path.join(this.dst, segment.uri), { flags: newFile ? 'w' : 'a' }));
-
-    let bytesWritten = 0;
-    if (this.collect) {
-      stream.on('data', (chunk) => {
-
-        bytesWritten += chunk.length;
-      });
-    }
-
-    stream.once('end', 'error', (err) => {
+    this.writeStream(stream, segment.uri, (err, bytesWritten) => {
 
       // only to report errors
       if (err) debug('stream error', err.stack || err);
 
-      if (segment.byterange) {
-        segment.byterange.length = bytesWritten;
+      // handle byterange
+      if (this.collect) {
+        const isContigious = this.writing.segmentHead > 0 && ((this.writing.segmentHead + bytesWritten) === this.writing.bytes);
+        segment.byterange = {
+          length: bytesWritten,
+          offset: isContigious ? null : this.writing.bytes - bytesWritten
+        }
+
+        this.writing.segmentHead = this.writing.bytes;
       }
 
       // update index
       this.index.segments.push(segment);
-      this.flushIndex(done);
+      this.flushIndex(callback);
     });
 
     this.seq++;
   });
 };
 
+HlsStreamRecorder.prototype.writeStream = function (stream, name, callback) {
+
+  if (!this.writing || !this.collect) {
+    this.writing = {
+      bytes: 0,
+      segmentHead: 0
+    };
+  }
+
+  stream.pipe(Fs.createWriteStream(Path.join(this.dst, name), { flags: this.writing.bytes === 0 ? 'w' : 'a' }));
+
+  let bytesWritten = 0;
+  if (this.collect) {
+    stream.on('data', (chunk) => {
+
+      bytesWritten += +chunk.length;
+    });
+  }
+
+  Oncemore(stream).once('end', 'error', (err) => {
+
+    this.writing.bytes += bytesWritten;
+    return callback(err, bytesWritten);
+  });
+};
+
 HlsStreamRecorder.prototype.variantName = function(info, index) {
 
   return `v${index}`;
@@ -251,7 +314,7 @@ HlsStreamRecorder.prototype.groupSrcName = function(info, index) {
   return `grp/${id}/${lang ? lang + '-' : ''}${index}`;
 };
 
-HlsStreamRecorder.prototype.segmentName = function(seqNo) {
+HlsStreamRecorder.prototype.segmentName = function(seqNo, isInit) {
 
   const name = (n) => {
 
@@ -261,7 +324,7 @@ HlsStreamRecorder.prototype.segmentName = function(seqNo) {
     return chr;
   };
 
-  return this.collect ? 'stream' : name(seqNo);
+  return this.collect ? 'stream' : (isInit ? 'init-' : '') + name(seqNo);
 };
 
 HlsStreamRecorder.prototype.flushIndex = function(cb) {

+ 2 - 4
package.json

@@ -26,11 +26,9 @@
   "author": "Gil Pedersen <gpdev@gpost.dk>",
   "license": "BSD-2-Clause",
   "dependencies": {
-    "async": "^2.0.0-rc.6",
     "commander": "^2.3.0",
     "debug": "^2.0.0",
-    "deep-equal": "^1.0.0",
-    "hls-segment-reader": "^3.0.0",
+    "hls-segment-reader": "^4.0.1",
     "m3u8parse": "^1.0.0",
     "measured": "^1.0.0",
     "mime-types": "^2.0.1",
@@ -40,7 +38,7 @@
     "readable-stream": "^2.0.2",
     "stream-each": "^1.1.2",
     "udp-blast": "^1.0.0",
-    "uristream": "^1.1.0",
+    "uristream": "^2.0.0",
     "write-file-atomic": "^1.1.0"
   },
   "devDependencies": {},