ソースを参照

improved stream error handling in corner cases

Gil Pedersen 13 年 前
コミット
9e3f7cc3c7
1 ファイル変更58 行追加19 行削除
  1. 58 19
      lib/reader.js

+ 58 - 19
lib/reader.js

@@ -26,6 +26,42 @@ var DEFAULT_AGENT = util.format('hls-tools/v%s (http://github.com/kanongil/node-
 module.exports = hlsreader;
 hlsreader.HlsStreamReader = HlsStreamReader;
 
+// apply oncemore() to an emitter, and enable it to accept multiple events as input 
+function oncemore(emitter) {
+  if (!emitter) return emitter;
+
+  var once = emitter.once;
+  if (once && !once._old) {
+    emitter.once = function(type, listener) {
+      if (arguments.length <= 2)
+        return once.apply(this, arguments);
+
+      var types = Array.prototype.slice.call(arguments, 0, -1);
+      var listener = arguments.length ? arguments[arguments.length-1] : undefined;
+      if (typeof listener !== 'function')
+        throw TypeError('listener must be a function');
+
+      function g() {
+        types.forEach(function(type) {
+          this.removeListener(type, g);
+        }, this);
+
+        listener.apply(this, arguments);
+      }
+      g.listener = listener;
+
+      types.forEach(function(type) {
+        this.on(type, g);
+      }, this);
+
+      return this;
+    };
+    emitter.once._old = once;
+  }
+
+  return emitter;
+}
+
 /*
 options:
   startSeq*
@@ -86,9 +122,10 @@ function getFileStream(srcUrl, options, cb) {
           if (accum > size)
             req.abort();
         });
-        res.on('end', function() {
+        
+        oncemore(res).once('end', 'error', function(err) {
           // TODO: make this a custom error?
-          if (accum !== size)
+          if (!err && accum !== size)
             stream.emit('error', new Error('Invalid returned stream length (req='+size+', ret='+accum+')'));
         });
       }
@@ -110,7 +147,6 @@ function getFileStream(srcUrl, options, cb) {
 
       // forward all future errors to response stream
       req.on('error', function(err) {
-        console.error('error', err);
         if (stream.listeners('error').length !== 0)
           stream.emit('error', err);
       });
@@ -259,22 +295,31 @@ function HlsStreamReader(src, options) {
       debug('got segment info', meta);
       if (meta.mime !== 'video/mp2t'/* && 
           meta.mime !== 'audio/aac' && meta.mime !== 'audio/x-aac' &&
-          meta.mime !== 'audio/ac3'*/)
+          meta.mime !== 'audio/ac3'*/) {
+        if (stream && stream.abort)
+          stream.abort();
         return cb(new Error('Unsupported segment MIME type: '+meta.mime));
+      }
 
       self.emit('segment', seqNo, segment.duration, meta);
 
       if (stream) {
         debug('preparing to push stream to reader', meta.url);
-        stream.on('error', function (err) {
+
+        stream.on('error', onerror);
+
+        function onerror(err) {
           debug('stream error', err);
-        });
+          stream.removeListener('error', onerror);
+          cb(err)
+        }
 
         self.readState.stream = stream;
         self.readState.stream_started = false;
-        self.readState.doneCb = function(err) {
+        self.readState.doneCb = function() {
           debug('finished with input stream', meta.url);
-          cb(err);
+          stream.removeListener('error', onerror);
+          cb(null);
         };
 
         // force a new _read in the future()
@@ -300,21 +345,15 @@ function HlsStreamReader(src, options) {
 
       stream.pipe(self);
 
-      stream.on('error', Done);
-      stream.on('end', Done);
-
-      function Done(err) {
+      oncemore(stream).once('end', 'error', function(err) {
         clearTimeout(self.readState.timer);
 
-        stream.removeListener('error', Done);
-        stream.removeListener('end', Done);
-
         stream.unpipe(self);
-
         self.readState.stream = null;
-        
-        self.readState.doneCb(err);
-      }
+
+        if (!err)
+          self.readState.doneCb();
+      });
 
       clearTimeout(self.readState.timer);