Selaa lähdekoodia

use newly refactored hls-segment-reader module

Gil Pedersen 11 vuotta sitten
vanhempi
commit
fa467de051
6 muutettua tiedostoa jossa 25 lisäystä ja 289 poistoa
  1. 4 5
      bin/hlsdump
  2. 4 5
      bin/hlsmon
  3. 6 17
      bin/hlsrecord
  4. 0 260
      lib/reader.js
  5. 9 1
      lib/recorder.js
  6. 2 1
      package.json

+ 4 - 5
bin/hlsdump

@@ -33,8 +33,7 @@ hlsdump.version('0.0.0')
    .option('--key <hex>', 'use oob key for decrypting segments', function(opt) {return new Buffer(opt, 'hex');})
    .parse(process.argv);
 
-var util = require('util'),
-    url = require('url'),
+var url = require('url'),
     fs = require('fs'),
     http = require('http'),
     crypto = require('crypto');
@@ -42,9 +41,9 @@ var util = require('util'),
 var streamprocess = require('streamprocess'),
     oncemore = require('oncemore'),
     uristream = require('uristream'),
+    HlsSegmentReader = require('hls-segment-reader'),
     UdpBlast = require('udp-blast');
-var reader = require('../lib/reader'),
-    tssmooth = require('../lib/tssmooth');
+var tssmooth = require('../lib/tssmooth');
 
 var Passthrough = require('readable-stream/passthrough');
 
@@ -58,7 +57,7 @@ if (!src) {
 
 if (hlsdump.bufferSize) hlsdump.sync = true;
 
-var r = reader(src, {highWaterMark:(hlsdump.concurrent || 1) - 1, fullStream:hlsdump.fullStream});
+var r = new HlsSegmentReader(src, {highWaterMark:(hlsdump.concurrent || 1) - 1, fullStream:hlsdump.fullStream});
 
 var totalDuration = 0, currentSegment = -1;
 var reading = false, hooked = false;

+ 4 - 5
bin/hlsmon

@@ -8,16 +8,15 @@ hlsmon.version('0.0.0')
    .option('-a', '--user-agent <string>', 'User-Agent')
    .parse(process.argv);
 
-var util = require('util'),
-    url = require('url');
+var url = require('url');
 
-var reader = require('../lib/reader');
+var HlsSegmentReader = require('hls-segment-reader');
 
 var src = process.argv[2];
 var sep = ';';
 
 function monitor(srcUrl) {
-  var r = reader(srcUrl, {noData:true});
+  var r = new HlsSegmentReader(srcUrl, { noData:true });
 
   var time = 0;
   r.on('readable', function() {
@@ -50,4 +49,4 @@ function monitor(srcUrl) {
   r.resume();
 }
 
-monitor(src);
+monitor(src);

+ 6 - 17
bin/hlsrecord

@@ -33,21 +33,10 @@ function dateValue(val) {
   return new Date(val);
 }
 
-var fs = require('fs'),
-    path = require('path'),
-    util = require('util');
+var fs = require('fs');
 
-var mime = require('mime');
-var reader = require('../lib/reader'),
-    recorder = require('../lib/recorder');
-
-mime.define({
-  'application/vnd.apple.mpegURL': ['m3u8'],
-  'video/mp2t': ['ts'],
-  'audio/x-aac': ['aac'],
-  'audio/aac': ['aac'],
-  'audio/ac3': ['ac3'],
-});
+var HlsSegmentReader = require('hls-segment-reader');
+var recorder = require('../lib/recorder');
 
 var src = hlsrecord.args[0];
 if (!src) {
@@ -76,13 +65,13 @@ var readerOptions = {
   startDate: hlsrecord.beginDate,
   stopDate: hlsrecord.endDate,
   maxStallTime: 5 * 60 * 1000,
-  fullStream:true,
+  fullStream: true,
   extensions: extensions,
-  highWaterMark:0,
+  highWaterMark: 0,
 };
 
 function createReader(src) {
-  var r = reader(src, readerOptions);
+  var r = new HlsSegmentReader(src, readerOptions);
   r.on('error', function(err) {
     console.error('reader error', err);
   });

+ 0 - 260
lib/reader.js

@@ -1,260 +0,0 @@
-"use strict";
-
-// stream from hls source
-
-var util = require('util'),
-    url = require('url'),
-    zlib = require('zlib'),
-    assert = require('assert');
-
-var extend = require('xtend'),
-    oncemore = require('oncemore'),
-    m3u8parse = require('m3u8parse'),
-    uristream = require('uristream'),
-    debug = require('debug')('hls:reader');
-
-var Readable = require('readable-stream');
-
-var noop = function noop() {};
-
-function HlsSegmentObject(seq, segment, meta, stream) {
-  this.seq = seq;
-  this.segment = segment;
-  this.meta = meta;
-  this.stream = stream;
-}
-
-function fetchfrom(reader, seqNo, segment, cb) {
-  var segmentUrl = url.resolve(reader.baseUrl, segment.uri);
-  var probe = !!reader.noData;
-
-  debug('fetching segment', segmentUrl);
-  var stream = uristream(segmentUrl, { probe:probe, highWaterMark:100 * 1000 * 1000 });
-
-  function finish(err, res) {
-    stream.removeListener('meta', onmeta);
-    stream.removeListener('end', onfail);
-    stream.removeListener('error', onfail);
-    cb(err, res);
-  }
-
-  function onmeta(meta) {
-    debug('got segment meta', meta);
-
-    if (reader.segmentMimeTypes.indexOf(meta.mime.toLowerCase()) === -1) {
-      if (stream.abort) stream.abort();
-      return stream.emit(new Error('Unsupported segment MIME type: ' + meta.mime));
-    }
-
-    finish(null, new HlsSegmentObject(seqNo, segment, meta, stream));
-  }
-
-  function onfail(err) {
-    if (!err) err = new Error('No metadata');
-    finish(err);
-  }
-
-  stream.on('meta', onmeta);
-  stream.on('end', onfail);
-  stream.on('error', onfail);
-
-  return stream;
-}
-
-function checknext(reader) {
-  var state = reader.readState;
-  var index = reader.index;
-  if (!state.active || state.fetching || state.nextSeq === -1 || !index)
-    return null;
-
-  var seq = state.nextSeq;
-  var segment = index.getSegment(seq, true);
-
-  if (segment) {
-    // check if we need to stop
-    if (reader.stopDate && segment.program_time > reader.stopDate)
-      return reader.push(null);
-
-    state.fetching = fetchfrom(reader, seq, segment, function(err, object) {
-      state.fetching = null;
-      if (err) reader.emit('error', err);
-
-      if (seq === state.nextSeq)
-        state.nextSeq++;
-
-      if (object) {
-        reader.watch[object.seq] = object.stream;
-        oncemore(object.stream).once('end', 'error', function() {
-          delete reader.watch[object.seq];
-        });
-
-        state.active = reader.push(object);
-      }
-
-      checknext(reader);
-    });
-  } else if (index.ended) {
-    reader.push(null);
-  } else if (!index.type && (index.lastSeqNo() < state.nextSeq - 1)) {
-    // handle live stream restart
-    state.nextSeq = index.startSeqNo(true);
-    checknext(reader);
-  }
-}
-
-function HlsStreamReader(src, options) {
-  var self = this;
-
-  options = options || {};
-  if (typeof src === 'string')
-    src = url.parse(src);
-
-  this.url = src;
-  this.baseUrl = src;
-
-  this.fullStream = !!options.fullStream;
-  this.noData = !!options.noData;
-
-  // dates are inclusive
-  this.startDate = options.startDate ? new Date(options.startDate) : null;
-  this.stopDate = options.stopDate ? new Date(options.stopDate) : null;
-
-  this.maxStallTime = options.maxStallTime || Infinity;
-
-  this.extensions = options.extensions || {};
-
-  this.index = null;
-  this.readState = {
-    nextSeq:-1,
-    active:false,
-    fetching:null
-  };
-  this.watch = {}; // used to stop buffering on expired segments
-
-  this.indexStallSince = null;
-
-  function getUpdateInterval(updated) {
-    if (updated && self.index.segments.length) {
-      self.indexStallSince = null;
-      return Math.min(self.index.target_duration, self.index.segments[self.index.segments.length - 1].duration);
-    } else {
-      if (self.indexStallSince) {
-        if ((Date.now() - +self.indexStallSince) > self.maxStallTime)
-          return -1;
-      } else {
-        self.indexStallSince = new Date();
-      }
-      return self.index.target_duration / 2;
-    }
-  }
-
-  function initialSeqNo() {
-    var index = self.index;
-
-    if (self.startDate)
-      return index.seqNoForDate(self.startDate, true);
-    else
-      return index.startSeqNo(self.fullStream);
-  }
-
-  function updatecheck(updated) {
-    if (updated) {
-      if (self.readState.nextSeq === -1)
-        self.readState.nextSeq = initialSeqNo();
-      else if (self.readState.nextSeq < self.index.startSeqNo(true)) {
-        debug('skipping ' + (self.index.startSeqNo(true) - self.readState.nextSeq) + ' invalidated segments');
-        self.readState.nextSeq = self.index.startSeqNo(true);
-      }
-
-      // check watched segments
-      for (var seq in self.watch) {
-        if (!self.index.isValidSeqNo(seq)) {
-          var stream = self.watch[seq];
-          delete self.watch[seq];
-
-          setTimeout(function () {
-            debug('aborting discontinued segment download');
-            if (!stream.ended && stream.abort) stream.abort();
-          }, self.index.target_duration * 1000);
-        }
-      }
-
-      self.emit('index', self.index);
-
-      if (self.index.variant)
-        return self.push(null);
-    }
-    checknext(self);
-
-    if (self.index && !self.index.ended && self.readable) {
-      var updateInterval = getUpdateInterval(updated);
-      if (updateInterval <= 0)
-        return self.emit('error', new Error('index stall'));
-      debug('scheduling index refresh', updateInterval);
-      setTimeout(updateindex, Math.max(1, updateInterval) * 1000);
-    }
-  }
-
-  function updateindex() {
-    if (!self.readable) return;
-    var stream = uristream(url.format(self.url), { timeout:30 * 1000 });
-    stream.on('meta', function(meta) {
-      debug('got index meta', meta);
-
-      if (self.indexMimeTypes.indexOf(meta.mime.toLowerCase()) === -1) {
-        // FIXME: correctly handle .m3u us-ascii encoding
-        if (stream.abort) stream.abort();
-        return stream.emit('error', new Error('Invalid MIME type: ' + meta.mime));
-      }
-
-      self.baseUrl = meta.url;
-    });
-
-    m3u8parse(stream, { extensions:self.extensions }, function(err, index) {
-      if (err) {
-        self.emit('error', err);
-        updatecheck(false);
-      } else {
-        var updated = true;
-        if (self.index && self.index.lastSeqNo() === index.lastSeqNo()) {
-          debug('index was not updated');
-          updated = false;
-        }
-
-        self.index = index;
-        updatecheck(updated);
-      }
-    });
-  }
-
-  Readable.call(this, extend(options, {objectMode:true, highWaterMark:options.highWaterMark || 0}));
-
-  updateindex();
-}
-util.inherits(HlsStreamReader, Readable);
-
-HlsStreamReader.prototype.indexMimeTypes = [
-  'application/vnd.apple.mpegurl',
-  'application/x-mpegurl',
-  'audio/mpegurl',
-];
-
-HlsStreamReader.prototype.segmentMimeTypes = [
-  'video/mp2t',
-  'audio/aac',
-  'audio/x-aac',
-  'audio/ac3',
-];
-
-HlsStreamReader.prototype._read = function(/*n*/) {
-  this.readState.active = true;
-  checknext(this);
-};
-
-
-var hlsreader = module.exports = function hlsreader(url, options) {
-  return new HlsStreamReader(url, options);
-};
-
-hlsreader.HlsSegmentObject = HlsSegmentObject;
-hlsreader.HlsStreamReader = HlsStreamReader;

+ 9 - 1
lib/recorder.js

@@ -1,3 +1,5 @@
+/*jslint node: true */
+
 "use strict";
 
 var fs = require('fs'),
@@ -5,13 +7,19 @@ var fs = require('fs'),
     url = require('url'),
     util = require('util');
 
-var mime = require('mime'),
+var mime = require('mime-types'),
     streamprocess = require('streamprocess'),
     oncemore = require('oncemore'),
     m3u8parse = require('m3u8parse'),
     mkdirp = require('mkdirp'),
     debug = require('debug')('hls:recorder');
 
+// add custom types for ext mapping
+mime.define({
+  'audio/aac': ['aac'],
+  'audio/ac3': ['ac3'],
+});
+
 function HlsStreamRecorder(reader, dst, options) {
   options = options || {};
 

+ 2 - 1
package.json

@@ -30,9 +30,10 @@
     "commander": "^2.3.0",
     "debug": "^1.0.4",
     "deep-equal": "^0.2.1",
+    "hls-segment-reader": "^1.0.0",
     "m3u8parse": "^0.1.13",
     "measured": "~0.1.3",
-    "mime": "^1.2.11",
+    "mime-types": "^1.0.2",
     "mkdirp": "^0.5.0",
     "oncemore": "^1.0.0",
     "readable-stream": "~1.0.0",