Przeglądaj źródła

retry partial downloads

Gil Pedersen 13 lat temu
rodzic
commit
d726270e3b
2 zmienionych plików z 45 dodań i 18 usunięć
  1. 44 18
      lib/reader.js
  2. 1 0
      lib/tsblast.js

+ 44 - 18
lib/reader.js

@@ -93,18 +93,22 @@ function getFileStream(srcUrl, options, cb) {
     cb = options;
     options = {};
   }
+  options = options || {};
 
   if (srcUrl.protocol === 'http:' || srcUrl.protocol === 'https:') {
     var headers = options.headers || {};
     if (!headers['user-agent']) headers['user-agent'] = DEFAULT_AGENT;
     if (!headers['accept-encoding']) headers['accept-encoding'] = ['gzip','deflate'];
+    var start = options.start || 0;
+    if (start > 0)
+      headers['range'] = 'bytes=' + options.start + '-';
 
     var req = (options.probe ? request.head : request.get)({uri:url.format(srcUrl), pool:false, headers:headers, timeout:60*1000});
     req.on('error', cb);
     req.on('response', function (res) {
       req.removeListener('error', cb);
 
-      if (res.statusCode !== 200) {
+      if (res.statusCode !== 200 && res.statusCode !== 206) {
         req.abort();
         if (res.statusCode >= 500 && res.statusCode !== 501)
           return cb(new TempError('HTTP Server returned: '+res.statusCode));
@@ -126,7 +130,7 @@ function getFileStream(srcUrl, options, cb) {
         oncemore(res).once('end', 'error', function(err) {
           // TODO: make this a custom error?
           if (!err && accum !== size)
-            stream.emit('error', new Error('Invalid returned stream length (req='+size+', ret='+accum+')'));
+            stream.emit('error', new PartialError('Stream length did not match header', accum, size));
         });
       }
 
@@ -150,7 +154,7 @@ function getFileStream(srcUrl, options, cb) {
         if (stream.listeners('error').length !== 0)
           stream.emit('error', err);
       });
-      
+
       // attach empty 'error' listener to keep it from ever throwing
       stream.on('error', noop);
 
@@ -159,7 +163,7 @@ function getFileStream(srcUrl, options, cb) {
           mimetype = typeparts[1].toLowerCase(),
           modified = res.headers['last-modified'] ? new Date(res.headers['last-modified']) : null;
 
-      cb(null, stream, {url:url.format(req.uri), mime:mimetype, size:size, modified:modified});
+      cb(null, stream, {url:url.format(req.uri), mime:mimetype, size:start+size, modified:modified});
     });
   } else {
     process.nextTick(function() {
@@ -266,16 +270,28 @@ function HlsStreamReader(src, options) {
     self.readState.currentSegment = self.index.getSegment(self.readState.currentSeq);
     if (self.readState.currentSegment) {
       var url = self.readState.currentSegment.uri;
-      fetchfrom(self.readState.currentSeq, self.readState.currentSegment, function(err) {
-        self.readState.currentSegment = null;
-        if (err) {
-          if (!self.keepConnection) return self.emit('error', err);
-          console.error('While fetching '+url+':', err.stack || err);
-          //if (!transferred && err instanceof TempError) return; // TODO: retry with a range header
-        }
-        self.readState.currentSeq++;
-        checkcurrent();
-      });
+
+      function tryfetch(start) {
+        var seq = self.readState.currentSeq;
+        fetchfrom(seq, self.readState.currentSegment, start, function(err) {
+          if (err) {
+            if (!self.keepConnection) return self.emit('error', err);
+            console.error('While fetching '+url+':', err.stack || err);
+
+            // retry with missing range if it is still relevant
+            if (err instanceof PartialError && err.processed > 0 &&
+                self.index.getSegment(seq))
+                return tryfetch(start + err.processed);
+          }
+
+          self.readState.currentSegment = null;
+          if (seq === self.readState.currentSeq)
+            self.readState.currentSeq++;
+
+          checkcurrent();
+        });
+      }
+      tryfetch(0);
     } else if (self.index.ended)
       self.end();
     else if (!self.index.type && (self.index.lastSeqNo() < self.readState.currentSeq-1)) {
@@ -285,11 +301,11 @@ function HlsStreamReader(src, options) {
     }
   }
 
-  function fetchfrom(seqNo, segment, cb) {
+  function fetchfrom(seqNo, segment, start, cb) {
     var segmentUrl = url.resolve(self.baseUrl, segment.uri)
 
     debug('fetching segment', segmentUrl);
-    getFileStream(url.parse(segmentUrl), {probe:!!self.noData}, function(err, stream, meta) {
+    getFileStream(url.parse(segmentUrl), {probe:!!self.noData, start:start}, function(err, stream, meta) {
       if (err) return cb(err);
 
       debug('got segment info', meta);
@@ -301,7 +317,8 @@ function HlsStreamReader(src, options) {
         return cb(new Error('Unsupported segment MIME type: '+meta.mime));
       }
 
-      self.emit('segment', seqNo, segment.duration, meta);
+      if (!start)
+        self.emit('segment', seqNo, segment.duration, meta);
 
       if (stream) {
         debug('preparing to push stream to reader', meta.url);
@@ -391,4 +408,13 @@ function TempError(msg) {
   this.message = msg || 'TempError';
 }
 util.inherits(TempError, Error);
-TempError.prototype.name = 'Temporary Error';
+TempError.prototype.name = 'Temporary Error';
+
+function PartialError(msg, processed, expected) {
+  Error.captureStackTrace(this, this);
+  this.message = msg || 'TempError';
+  this.processed = processed || -1;
+  this.expected = expected;
+}
+util.inherits(PartialError, Error);
+PartialError.prototype.name = 'Partial Error';

+ 1 - 0
lib/tsblast.js

@@ -24,6 +24,7 @@ function TsBlast(dst, options) {
 
   this.buffer = new Buffer(0);
   this.client = dgram.createSocket('udp4');
+  this.client.bind();
 
   this.on('finish', function() {
     this.client.close();