Browse Source

refactor stream fetching code

Gil Pedersen 13 years ago
parent
commit
8ccb3e7242
3 changed files with 235 additions and 231 deletions
  1. 50 228
      lib/reader.js
  2. 181 0
      lib/uristream.js
  3. 4 3
      package.json

+ 50 - 228
lib/reader.js

@@ -7,23 +7,20 @@ var util = require('util'),
 
 var request = require('request'),
     oncemore = require('./oncemore'),
+    uristream = require('./uristream'),
     debug = require('debug')('hls:reader');
 
 try {
   var Readable = require('stream').Readable;
   assert(Readable);
-  var Passthrough = null;
 } catch (e) {
   var Readable = require('readable-stream');
-  var Passthrough = require('readable-stream/passthrough');
 }
 
 var m3u8 = require('./m3u8');
 
 function noop() {};
 
-var DEFAULT_AGENT = util.format('hls-tools/v%s (http://github.com/kanongil/node-hls-tools) node.js/%s', require('../package').version, process.version);
-
 module.exports = hlsreader;
 hlsreader.HlsStreamReader = HlsStreamReader;
 
@@ -41,121 +38,6 @@ emits:
   segment (seqNo, duration, datetime, size?, )
 */
 
-function inheritErrors(stream) {
-  stream.on('pipe', function(source) {
-    source.on('error', stream.emit.bind(stream, 'error'));
-  });
-  stream.on('unpipe', function(source) {
-    source.removeListener('error', stream.emit.bind(stream, 'error'));
-  });
-  return stream;
-}
-
-function getFileStream(srcUrl, options, cb) {
-  assert(srcUrl.protocol);
-
-  if (typeof options === 'function') {
-    cb = options;
-    options = {};
-  }
-  options = options || {};
-
-  if (srcUrl.protocol === 'http:' || srcUrl.protocol === 'https:') {
-    var headers = options.headers || {};
-    if (!headers['user-agent']) headers['user-agent'] = DEFAULT_AGENT;
-    if (!headers['accept-encoding']) headers['accept-encoding'] = ['gzip','deflate'];
-    var start = options.start || 0;
-    if (start > 0)
-      headers['range'] = 'bytes=' + options.start + '-';
-
-    var req = (options.probe ? request.head : request.get)({uri:url.format(srcUrl), pool:false, headers:headers, timeout:60*1000});
-    req.on('error', onreqerror);
-    req.on('error', noop);
-    req.on('response', onresponse);
-
-    function onreqerror(err) {
-      req.removeListener('error', onreqerror);
-      req.removeListener('response', onresponse);
-      cb(err);
-    }
-
-    function onresponse(res) {
-      req.removeListener('error', onreqerror);
-      req.removeListener('response', onresponse);
-
-      if (res.statusCode !== 200 && res.statusCode !== 206) {
-        req.abort();
-        if (res.statusCode >= 500 && res.statusCode !== 501)
-          return cb(new TempError('HTTP Server returned: '+res.statusCode));
-        else
-          return cb(new Error('Bad server response code: '+res.statusCode));
-      }
-
-      var size = res.headers['content-length'] ? parseInt(res.headers['content-length'], 10) : -1;
-
-      // turn bad content-length into actual errors
-      if (size >= 0 && !options.probe) {
-        var accum = 0;
-        res.on('data', function(chunk) {
-          accum += chunk.length;
-          if (accum > size)
-            req.abort();
-        });
-        
-        oncemore(res).once('end', 'error', function(err) {
-          // TODO: make this a custom error?
-          if (!err && accum !== size)
-            stream.emit('error', new PartialError('Stream length did not match header', accum, size));
-        });
-      }
-
-      // transparently handle gzip responses
-      var stream = res;
-      if (res.headers['content-encoding'] === 'gzip' || res.headers['content-encoding'] === 'deflate') {
-        unzip = zlib.createUnzip();
-        stream = stream.pipe(inheritErrors(unzip));
-        size = -1;
-      }
-
-      // adapt old style streams for pre-streams2 node versions
-      if (Passthrough && !(stream instanceof Readable))
-        stream = stream.pipe(inheritErrors(new Passthrough()));
-
-      // allow aborting the request
-      stream.abort = req.abort.bind(req);
-
-      // forward all future errors to response stream
-      req.on('error', function(err) {
-        if (stream.listeners('error').length !== 0)
-          stream.emit('error', err);
-      });
-
-      // attach empty 'error' listener to keep it from ever throwing
-      stream.on('error', noop);
-
-      // extract meta information from header
-      var typeparts = /^(.+?\/.+?)(?:;\w*.*)?$/.exec(res.headers['content-type']) || [null, 'application/octet-stream'],
-          mimetype = typeparts[1].toLowerCase(),
-          modified = res.headers['last-modified'] ? new Date(res.headers['last-modified']) : null;
-
-      cb(null, stream, {url:url.format(req.uri), mime:mimetype, size:start+size, modified:modified});
-    }
-  } else {
-    process.nextTick(function() {
-      cb(new Error('Unsupported protocol: '+srcUrl.protocol));
-    });
-  }
-
-/*    if (srcUrl.protocol === 'file:') {
-      
-  } else if (srcUrl.protocol === 'data:') {
-    //var regex = /^data:(.+\/.+);base64,(.*)$/;
-    // add content-type && content-length headers
-  } else {
-      
-  }*/
-}
-
 function HlsStreamReader(src, options) {
   var self = this;
 
@@ -207,40 +89,33 @@ function HlsStreamReader(src, options) {
   }
 
   function updateindex() {
-    getFileStream(self.url, function(err, stream, meta) {
-      if (err) {
-        if (self.index && self.keepConnection) {
-          console.error('Failed to update index at '+url.format(self.url)+':', err.stack || err);
-          return updatecheck(false);
-        }
-        return self.emit('error', err);
-      }
-
+    var stream = uristream(url.format(self.url), { timeout:30*1000 });
+    stream.on('meta', function(meta) {
       if (meta.mime !== 'application/vnd.apple.mpegurl' &&
           meta.mime !== 'application/x-mpegurl' && meta.mime !== 'audio/mpegurl')
-        return self.emit('error', new Error('Invalid MIME type: '+meta.mime));
+        return stream.emit('error', new Error('Invalid MIME type: '+meta.mime));
       // FIXME: correctly handle .m3u us-ascii encoding
 
       self.baseUrl = meta.url;
-      m3u8.parse(stream, function(err, index) {
-        if (err) {
-          if (self.index && self.keepConnection) {
-            console.error('Failed to parse index at '+url.format(self.url)+':', err.stack || err);
-            return updatecheck(false);
-          }
-          return self.emit('error', err);
-        }
+    });
 
-        var updated = true;
-        if (self.index && self.index.lastSeqNo() === index.lastSeqNo()) {
-          debug('index was not updated');
-          updated = false;
+    m3u8.parse(stream, function(err, index) {
+      if (err) {
+        if (self.index && self.keepConnection) {
+          console.error('Failed to parse index at '+url.format(self.url)+':', err.stack || err);
+          return updatecheck(false);
         }
+        return self.emit('error', err);
+      }
 
-        self.index = index;
+      var updated = true;
+      if (self.index && self.index.lastSeqNo() === index.lastSeqNo()) {
+        debug('index was not updated');
+        updated = false;
+      }
 
-        updatecheck(updated);
-      });
+      self.index = index;
+      updatecheck(updated);
     });
   }
   updateindex();
@@ -260,7 +135,7 @@ function HlsStreamReader(src, options) {
             console.error('While fetching '+url+':', err.stack || err);
 
             // retry with missing range if it is still relevant
-            if (err instanceof PartialError && err.processed > 0 &&
+            if (err instanceof uristream.PartialError && err.processed > 0 &&
                 self.index.getSegment(seq))
                 return tryfetch(start + err.processed);
           }
@@ -284,80 +159,22 @@ function HlsStreamReader(src, options) {
 
   function fetchfrom(seqNo, segment, start, cb) {
     var segmentUrl = url.resolve(self.baseUrl, segment.uri)
+    var probe = !!self.noData;
 
     debug('fetching segment', segmentUrl);
-    getFileStream(url.parse(segmentUrl), {probe:!!self.noData, start:start}, function(err, stream, meta) {
-      if (err) return cb(err);
-
+    var stream = uristream(segmentUrl, { probe:probe, start:start, highWaterMark:100*1000*1000 });
+    stream.on('meta', function(meta) {
       debug('got segment info', meta);
       if (meta.mime !== 'video/mp2t'/* && 
           meta.mime !== 'audio/aac' && meta.mime !== 'audio/x-aac' &&
           meta.mime !== 'audio/ac3'*/) {
-        if (stream && stream.abort)
-          stream.abort();
-        return cb(new Error('Unsupported segment MIME type: '+meta.mime));
-      }
-
-      if (!start)
-        self.emit('segment', seqNo, segment.duration, meta);
-
-      if (stream) {
-        debug('preparing to push stream to reader', meta.url);
-
-        stream.on('error', onerror);
-
-        function onerror(err) {
-          debug('stream error', err);
-          stream.removeListener('error', onerror);
-          cb(err)
-        }
-
-        self.readState.stream = stream;
-        self.readState.stream_started = false;
-        self.readState.doneCb = function() {
-          debug('finished with input stream', meta.url);
-          stream.removeListener('error', onerror);
-          cb(null);
-        };
-
-        // force a new _read in the future()
-        if (self.push(''))
-          self.stream_start();
-      } else {
-        process.nextTick(cb);
+        if (stream.abort) stream.abort();
+        return stream.emit(new Error('Unsupported segment MIME type: '+meta.mime));
       }
-    });
-  }
-
-  // allow piping content to self
-  this.write = function(chunk) {
-    self.push(chunk);
-    return true;
-  };
-  this.end = function() {};
-
-  this.stream_start = function() {
-    var stream = self.readState.stream;
-    if (stream && !self.readState.stream_started) {
-      debug('pushing input stream to reader');
-
-      stream.pipe(self);
-
-      oncemore(stream).once('end', 'error', function(err) {
-        clearTimeout(self.readState.timer);
-
-        stream.unpipe(self);
-        self.readState.stream = null;
-
-        if (!err)
-          self.readState.doneCb();
-      });
-
-      clearTimeout(self.readState.timer);
 
       // abort() indicates a temporal stream. Ie. ensure it is completed in a timely fashion
       if (self.index.isLive() && typeof stream.abort == 'function') {
-        var duration = self.readState.currentSegment.duration || self.index.target_duration || 10;
+        var duration = segment.duration || self.index.target_duration || 10;
         duration = Math.min(duration, self.index.target_duration || 10);
         self.readState.timer = setTimeout(function() {
           if (self.readState.stream) {
@@ -368,34 +185,39 @@ function HlsStreamReader(src, options) {
           self.readState.timer = null;
         }, 1.5*duration*1000);
       }
-      self.readState.stream_started = true;
-    }
+
+      if (start === 0)
+        self.emit('segment', seqNo, segment.duration, meta);
+    });
+
+    stream.pipe(self);
+    oncemore(stream).once('end', 'error', function(err) {
+      clearTimeout(self.readState.timer);
+
+      stream.unpipe(self);
+      self.readState.stream = null;
+
+      if (err) debug('stream error', err);
+      else debug('finished with input stream', stream.meta.url);
+
+      cb(err);
+    });
+
+    self.readState.stream = stream;
   }
 
+  // allow piping content to self
+  this.write = this.push.bind(this);
+  this.end = function() {};
+
   Readable.call(this, options);
 }
 util.inherits(HlsStreamReader, Readable);
 
 HlsStreamReader.prototype._read = function(n, cb) {
-  this.stream_start();
+  this.emit('drain');
 };
 
 function hlsreader(url, options) {
   return new HlsStreamReader(url, options);
 }
-
-function TempError(msg) {
-  Error.captureStackTrace(this, this);
-  this.message = msg || 'TempError';
-}
-util.inherits(TempError, Error);
-TempError.prototype.name = 'Temporary Error';
-
-function PartialError(msg, processed, expected) {
-  Error.captureStackTrace(this, this);
-  this.message = msg || 'TempError';
-  this.processed = processed || -1;
-  this.expected = expected;
-}
-util.inherits(PartialError, Error);
-PartialError.prototype.name = 'Partial Error';

+ 181 - 0
lib/uristream.js

@@ -0,0 +1,181 @@
+var util = require('util'),
+    url = require('url'),
+    zlib = require('zlib'),
+    assert = require('assert');
+
+var request = require('request'),
+    extend = require('xtend'),
+    oncemore = require('./oncemore'),
+    debug = require('debug')('hls:uristream');
+
+try { // prefer the Readable class shipped with node core
+  var Readable = require('stream').Readable;
+  assert(Readable); // throws when core has no Readable, which triggers the fallback below
+} catch (e) {
+  var Readable = require('readable-stream'); // fallback: use the readable-stream package instead
+}
+
+function noop() {}; // shared do-nothing function; used as a placeholder _read and as a silent 'error' listener
+
+var pkg = require('../package'); // package manifest, read for its name and version
+var DEFAULT_AGENT = util.format('%s/v%s (http://github.com/kanongil/node-hls-tools) node.js/%s', pkg.name, pkg.version, process.version); // default 'user-agent' header value for HTTP requests
+
+module.exports = uristream; // the factory function is the module's main export
+uristream.UriFetchStream = UriFetchStream; // expose the stream constructor on the factory
+uristream.PartialError = PartialError; // expose the error type so callers can use instanceof checks
+
+function inheritErrors(stream) {
+  stream.on('pipe', function(source) {
+    source.on('error', stream.emit.bind(stream, 'error'));
+  });
+  stream.on('unpipe', function(source) {
+    source.removeListener('error', stream.emit.bind(stream, 'error'));
+  });
+  return stream;
+}
+
+function setupHttp(uri, options, dst) {
+  var defaults = {
+    'user-agent': DEFAULT_AGENT,
+    'accept-encoding': ['gzip','deflate']
+  };
+
+  // TODO: handle case in header names
+  var headers = extend(defaults, options.headers);
+  var timeout = options.timeout || 60*1000;
+  var probe = !!options.probe;
+
+  var retries = 10;
+  var metaEmitted = false;
+  var fetch = probe ? request.head : request.get;
+
+  function fetchHttp(start) {
+    if (start > 0)
+      headers['range'] = 'bytes=' + start + '-';
+    else
+      delete headers['range'];
+
+    var req = fetch({uri:uri, pool:false, headers:headers, timeout:timeout});
+    req.on('error', onreqerror);
+    req.on('error', noop);
+    req.on('response', onresponse);
+
+    function failOrRetry(err, temporary) {
+      req.abort();
+      dst.emit('error', err);
+    }
+
+    function reqcleanup() {
+      req.removeListener('error', onreqerror);
+      req.removeListener('response', onresponse);
+    }
+
+    function onreqerror(err) {
+      reqcleanup();
+      failOrRetry(err);
+    }
+
+    function onresponse(res) {
+      reqcleanup();
+
+      if (res.statusCode !== 200 && res.statusCode !== 206)
+        return failOrRetry(new Error('Bad server response code: '+res.statusCode), res.statusCode >= 500 && res.statusCode !== 501);
+
+      var size = res.headers['content-length'] ? parseInt(res.headers['content-length'], 10) : -1;
+
+      // transparently handle gzip responses
+      var stream = res;
+      if (res.headers['content-encoding'] === 'gzip' || res.headers['content-encoding'] === 'deflate') {
+        unzip = zlib.createUnzip();
+        stream = stream.pipe(inheritErrors(unzip));
+        size = -1;
+      }
+
+      // pipe it to self
+      stream.on('data', function(chunk) {
+        if (!dst.push(chunk))
+          stream.pause();
+      });
+      oncemore(stream).once('end', 'error', function(err) {
+        dst._read = noop;
+        if (err) return failOrRetry(err);
+        dst.push(null);
+      });
+      dst._read = function(n) {
+        stream.resume();
+      };
+
+      // allow aborting the request
+      dst.abort = req.abort.bind(req);
+
+      // forward all future errors to response stream
+      req.on('error', function(err) {
+        if (dst.listeners('error').length !== 0)
+          dst.emit('error', err);
+      });
+
+      // turn bad content-length into actual errors
+      if (size >= 0 && !probe) {
+        var accum = 0;
+        res.on('data', function(chunk) {
+          accum += chunk.length;
+          if (accum > size)
+            req.abort();
+        });
+      
+        oncemore(res).once('end', 'error', function(err) {
+          if (!err && accum !== size)
+            failOrRetry(new PartialError('Stream length did not match header', accum, size), accum && accum < size);
+        });
+      }
+
+      // attach empty 'error' listener to keep it from ever throwing
+      dst.on('error', noop);
+
+      if (!dst.meta) {
+        // extract meta information from header
+        var typeparts = /^(.+?\/.+?)(?:;\w*.*)?$/.exec(res.headers['content-type']) || [null, 'application/octet-stream'],
+            mimetype = typeparts[1].toLowerCase(),
+            modified = res.headers['last-modified'] ? new Date(res.headers['last-modified']) : null;
+
+        dst.meta = {url:url.format(req.uri), mime:mimetype, size:start+size, modified:modified};
+        dst.emit('meta', dst.meta);
+      }
+    }
+  }
+
+  fetchHttp(options.start || 0);
+}
+
+function UriFetchStream(uri, options) {
+  var self = this;
+  Readable.call(this, options);
+
+  options = options || {};
+
+  this.url = url.parse(uri);
+  this.meta = null;
+
+  if (this.url.protocol === 'http:' || this.url.protocol === 'https:') {
+    setupHttp(uri, options, this);
+  } else {
+    throw new Error('Unsupported protocol: '+this.url.protocol);
+  }
+}
+util.inherits(UriFetchStream, Readable);
+
+UriFetchStream.prototype._read = noop;
+
+
+function uristream(uri, options) {
+  return new UriFetchStream(uri, options);
+}
+
+function PartialError(msg, processed, expected) {
+  Error.captureStackTrace(this, this);
+  this.message = msg || 'PartialError';
+  this.processed = processed || -1;
+  this.expected = expected;
+}
+util.inherits(PartialError, Error);
+PartialError.prototype.name = 'Partial Error';

+ 4 - 3
package.json

@@ -29,13 +29,14 @@
     "carrier": "~0.1.8",
     "commander": "~1.1.1",
     "readable-stream": "~1.0.0",
-    "request": "~2.16.0"
+    "request": "~2.16.0",
+    "xtend": "~2.0.3"
   },
   "devDependencies": {
     "mocha": "~1.7.4",
     "should": "~1.2.1"
   },
-  "engines" : {
-    "node" : "~0.8.0 || >=0.9.12"
+  "engines": {
+    "node": "~0.8.0 || >=0.9.12"
   }
 }