Browse Source

reworked stream handling

Gil Pedersen 13 years ago
parent
commit
b3628f6099
2 changed files with 133 additions and 63 deletions
  1. 130 61
      lib/reader.js
  2. 3 2
      package.json

+ 130 - 61
lib/reader.js

@@ -11,12 +11,16 @@ var request = require('request'),
 try {
   var Readable = require('stream').Readable;
   assert(Readable);
+  var Passthrough = null;
 } catch (e) {
   var Readable = require('readable-stream');
+  var Passthrough = require('readable-stream/passthrough');
 }
 
 var m3u8 = require('./m3u8');
 
+// Shared do-nothing callback, for places that need a listener that ignores its arguments.
+function noop() {};
+
 var DEFAULT_AGENT = util.format('hls-tools/v%s (http://github.com/kanongil/node-hls-tools) node.js/%s', require('../package').version, process.version);
 
 module.exports = hlsreader;
@@ -36,6 +40,16 @@ emits:
   segment (seqNo, duration, datetime, size?, )
 */
 
+/**
+ * Makes `stream` re-emit the 'error' events of every source that is piped into it.
+ *
+ * Forwarding starts when the source is piped ('pipe' event) and stops again when
+ * that same source is unpiped ('unpipe' event).
+ *
+ * @param {EventEmitter} stream  destination stream that should inherit errors
+ * @returns {EventEmitter} the same `stream`, to allow inline use in pipe() calls
+ */
+function inheritErrors(stream) {
+  stream.on('pipe', function(source) {
+    // keep a reference to the forwarder: bind() returns a new function on every
+    // call, so only this exact instance can be handed to removeListener() later
+    var forwardError = stream.emit.bind(stream, 'error');
+    source.on('error', forwardError);
+
+    stream.on('unpipe', function onUnpipe(unpiped) {
+      if (unpiped !== source) return;
+      source.removeListener('error', forwardError);
+      // this handler only serves one pipe() call; drop it so handlers don't pile up
+      stream.removeListener('unpipe', onUnpipe);
+    });
+  });
+  return stream;
+}
+
 function getFileStream(srcUrl, options, cb) {
   assert(srcUrl.protocol);
 
@@ -49,32 +63,67 @@ function getFileStream(srcUrl, options, cb) {
     if (!headers['user-agent']) headers['user-agent'] = DEFAULT_AGENT;
     if (!headers['accept-encoding']) headers['accept-encoding'] = ['gzip','deflate'];
 
-    var req = (options.probe ? request.head : request.get)({url:url.format(srcUrl), pool:false, headers:headers});
+    var req = (options.probe ? request.head : request.get)({uri:url.format(srcUrl), pool:false, headers:headers, timeout:60*1000});
     req.on('error', cb);
     req.on('response', function (res) {
+      req.removeListener('error', cb);
+
       if (res.statusCode !== 200) {
         req.abort();
-        return cb(new Error('Bad server response code: '+statusCode));
+        if (res.statusCode >= 500 && res.statusCode !== 501)
+          return cb(new TempError('HTTP Server returned: '+res.statusCode));
+        else
+          return cb(new Error('Bad server response code: '+res.statusCode));
       }
 
-      res.abort = req.abort.bind(req);
-      // forward all errors to result
-      req.on('error', function(err) {
-        res.emit('error', err);
-      });
+      var size = res.headers['content-length'] ? parseInt(res.headers['content-length'], 10) : -1;
+
+      // turn bad content-length into actual errors
+      if (size >= 0) {
+        var accum = 0;
+        res.on('data', function(chunk) {
+          accum += chunk.length;
+          if (accum > size)
+            req.abort();
+        });
+        res.on('end', function() {
+          // TODO: make this a custom error?
+          if (accum !== size)
+            stream.emit('error', new Error('Invalid returned stream length (req='+size+', ret='+accum+')'));
+        });
+      }
 
+      // transparently handle gzip responses
       var stream = res;
       if (res.headers['content-encoding'] === 'gzip' || res.headers['content-encoding'] === 'deflate') {
         unzip = zlib.createUnzip();
-        stream = res.pipe(unzip);
+        stream = stream.pipe(inheritErrors(unzip));
+        size = -1;
       }
 
+      // adapt old style streams for pre-streams2 node versions
+      if (Passthrough && !(stream instanceof Readable))
+        stream = stream.pipe(inheritErrors(new Passthrough()));
+
+      // allow aborting the request
+      stream.abort = req.abort.bind(req);
+
+      // forward all future errors to response stream
+      req.on('error', function(err) {
+        console.error('error', err);
+        if (stream.listeners('error').length !== 0)
+          stream.emit('error', err);
+      });
+      
+      // attach empty 'error' listener to keep it from ever throwing
+      stream.on('error', noop);
+
+      // extract meta information from header
       var typeparts = /^(.+?\/.+?)(?:;\w*.*)?$/.exec(res.headers['content-type']) || [null, 'application/octet-stream'],
           mimetype = typeparts[1].toLowerCase(),
-          size = res.headers['content-length'] ? parseInt(res.headers['content-length'], 10) : -1,
           modified = res.headers['last-modified'] ? new Date(res.headers['last-modified']) : null;
 
-      cb(null, stream, {url:res.url || url.format(srcUrl), mime:mimetype, size:size, modified:modified});
+      cb(null, stream, {url:url.format(req.uri), mime:mimetype, size:size, modified:modified});
     });
   } else {
     process.nextTick(function() {
@@ -114,6 +163,13 @@ function HlsStreamReader(src, options) {
     stream:null
   }
 
+  // Picks the delay before the next index refresh, in the same unit as the
+  // playlist durations.
+  //   updated: truthy when the last refresh changed the index
+  function getUpdateInterval(updated) {
+    var index = self.index;
+
+    // nothing new (or no segments at all): poll again after half a target duration
+    if (!updated || !index.segments.length)
+      return index.target_duration / 2;
+
+    // fresh index: wait for the newest segment's duration, capped at the target duration
+    var newest = index.segments[index.segments.length - 1];
+    return Math.min(index.target_duration, newest.duration);
+  }
+
   function updatecheck(updated) {
     if (updated) {
       if (self.readState.currentSeq===-1)
@@ -129,7 +185,7 @@ function HlsStreamReader(src, options) {
     checkcurrent();
 
     if (!self.index.ended) {
-      var updateInterval = updated ? self.index.segments[self.index.segments.length-1].duration : self.index.target_duration / 2;
+      var updateInterval = getUpdateInterval(updated);
       debug('scheduling index refresh', updateInterval);
       setTimeout(updateindex, Math.max(1, updateInterval)*1000);
     }
@@ -174,12 +230,12 @@ function HlsStreamReader(src, options) {
     self.readState.currentSegment = self.index.getSegment(self.readState.currentSeq);
     if (self.readState.currentSegment) {
       var url = self.readState.currentSegment.uri;
-      fetchfrom(self.readState.currentSeq, self.readState.currentSegment, function(err, transferred) {
+      fetchfrom(self.readState.currentSeq, self.readState.currentSegment, function(err) {
         self.readState.currentSegment = null;
         if (err) {
           if (!self.keepConnection) return self.emit('error', err);
           console.error('While fetching '+url+':', err.stack || err);
-          if (!transferred) return; // TODO: retry with a range header
+          //if (!transferred && err instanceof TempError) return; // TODO: retry with a range header
         }
         self.readState.currentSeq++;
         checkcurrent();
@@ -209,68 +265,74 @@ function HlsStreamReader(src, options) {
       self.emit('segment', seqNo, segment.duration, meta);
 
       if (stream) {
-        debug('pushing input stream to reader');
-
-        var totalBytes = 0;
-        stream.on('data', function(chunk) {
-          totalBytes += chunk.length;
-          self.push(chunk); // intentionally ignore the result to buffer input as fast as possible
+        debug('preparing to push stream to reader', meta.url);
+        stream.on('error', function (err) {
+          debug('stream error', err);
         });
-        stream.on('error', Done);
-        stream.on('end', Done);
-        stream.on('close', Done);
 
         self.readState.stream = stream;
-        self.stream_start(true, !self.push(''));
-
-        function Done(err) {
-          debug('finished with input stream');
-
-          stream.removeListener('error', Done);
-          stream.removeListener('end', Done);
-          stream.removeListener('close', Done);
-
-          self.readState.stream = null;
-
-          if (!err && (totalBytes !== meta.size))
-            err = new Error('Invalid returned stream length (req='+meta.size+', ret='+totalBytes+')');
-
-          cb(err, totalBytes);
-        }
+        self.readState.stream_started = false;
+        self.readState.doneCb = function(err) {
+          debug('finished with input stream', meta.url);
+          cb(err);
+        };
+
+        // force a new _read() in the future
+        if (self.push(''))
+          self.stream_start();
       } else {
         process.nextTick(cb);
       }
     });
   }
 
-  this.stream_start = function(fresh, blocked) {
-    if (fresh) {
-      self.readState.stream_started = false;
-      if (self.readState.timer) {
+  // allow piping content to self: minimal write()/end() methods so that a
+  // source stream can pipe() into this reader
+  // write(): hands each chunk to push(); the push() result is ignored and true is
+  // always returned, so the piping source is never told to pause
+  this.write = function(chunk) {
+    self.push(chunk);
+    return true;
+  };
+  // end(): intentionally does nothing when called
+  this.end = function() {};
+
+  this.stream_start = function() {
+    var stream = self.readState.stream;
+    if (stream && !self.readState.stream_started) {
+      debug('pushing input stream to reader');
+
+      stream.pipe(self);
+
+      stream.on('error', Done);
+      stream.on('end', Done);
+
+      function Done(err) {
         clearTimeout(self.readState.timer);
-        self.readState.timer = null;
-      }
 
-      if (blocked) return self.readState.stream.pause();
-    }
+        stream.removeListener('error', Done);
+        stream.removeListener('end', Done);
 
-    if (self.readState.stream_started) return;
+        stream.unpipe(self);
 
-    var stream = self.readState.stream;
-    if (!stream) return;
-
-    if (typeof stream.abort == 'function') {
-      var duration = self.readState.currentSegment.duration || self.index.target_duration || 10;
-      self.readState.timer = setTimeout(function() {
-        if (self.readState.stream) {
-          debug('timed out waiting for data');
-          self.readState.stream.abort();
-        }
-        self.readState.timer = null;
-      }, 1.5*duration*1000);
+        self.readState.stream = null;
+        
+        self.readState.doneCb(err);
+      }
+
+      clearTimeout(self.readState.timer);
+
+      // abort() indicates a temporal stream, i.e. one that must be completed in a timely fashion
+      if (self.index.isLive() && typeof stream.abort == 'function') {
+        var duration = self.readState.currentSegment.duration || self.index.target_duration || 10;
+        duration = Math.min(duration, self.index.target_duration || 10);
+        self.readState.timer = setTimeout(function() {
+          if (self.readState.stream) {
+            debug('timed out waiting for data');
+            self.readState.stream.abort();
+          }
+          // TODO: ensure Done() is always called
+          self.readState.timer = null;
+        }, 1.5*duration*1000);
+      }
+      self.readState.stream_started = true;
     }
-    self.readState.stream_started = true;
-    stream.resume();
   }
 
   Readable.call(this, options);
@@ -284,3 +346,10 @@ HlsStreamReader.prototype._read = function(n, cb) {
 function hlsreader(url, options) {
   return new HlsStreamReader(url, options);
 }
+
+/**
+ * Error type for failures that are expected to be temporary.
+ *
+ * @param {string} [msg]  error message; defaults to 'TempError'
+ */
+function TempError(msg) {
+  this.message = msg || 'TempError';
+  // the second argument must be the constructor function (not the instance), so
+  // that the captured trace starts at the caller of `new TempError()`
+  Error.captureStackTrace(this, TempError);
+}
+util.inherits(TempError, Error);
+TempError.prototype.constructor = TempError;
+TempError.prototype.name = 'Temporary Error';

+ 3 - 2
package.json

@@ -28,13 +28,14 @@
     "debug": "~0.7.0",
     "carrier": "~0.1.8",
     "commander": "~1.1.1",
-    "request": "~2.14.0"
+    "readable-stream": "~1.0.0",
+    "request": "~2.16.0"
   },
   "devDependencies": {
     "mocha": "~1.7.4",
     "should": "~1.2.1"
   },
   "engines" : {
-    "node" : ">=0.9.12"
+    "node" : "~0.8.0 || >=0.9.12"
   }
 }