|
|
@@ -35,6 +35,21 @@ function inheritErrors(stream) {
|
|
|
return stream;
|
|
|
}
|
|
|
|
|
|
+// 'pipe' any stream to a Readable
|
|
|
+function pump(src, dst, done) {
|
|
|
+ src.on('data', function(chunk) {
|
|
|
+ if (!dst.push(chunk))
|
|
|
+ src.pause();
|
|
|
+ });
|
|
|
+ oncemore(src).once('end', 'error', function(err) {
|
|
|
+ dst._read = noop;
|
|
|
+ done(err);
|
|
|
+ });
|
|
|
+ dst._read = function(n) {
|
|
|
+ src.resume();
|
|
|
+ };
|
|
|
+}
|
|
|
+
|
|
|
function setupHttp(uri, options, dst) {
|
|
|
var defaults = {
|
|
|
'user-agent': DEFAULT_AGENT,
|
|
|
@@ -62,7 +77,6 @@ function setupHttp(uri, options, dst) {
|
|
|
var accum = 0, size = -1;
|
|
|
var req = fetch({uri:uri, pool:false, headers:headers, timeout:timeout});
|
|
|
req.on('error', onreqerror);
|
|
|
- req.on('error', noop);
|
|
|
req.on('response', onresponse);
|
|
|
|
|
|
var failed = false;
|
|
|
@@ -86,6 +100,7 @@ function setupHttp(uri, options, dst) {
|
|
|
function reqcleanup() {
|
|
|
req.removeListener('error', onreqerror);
|
|
|
req.removeListener('response', onresponse);
|
|
|
+ req.on('error', noop);
|
|
|
}
|
|
|
|
|
|
function onreqerror(err) {
|
|
|
@@ -111,19 +126,11 @@ function setupHttp(uri, options, dst) {
|
|
|
}
|
|
|
|
|
|
// pipe it to self
|
|
|
- stream.on('data', function(chunk) {
|
|
|
- if (!dst.push(chunk))
|
|
|
- stream.pause();
|
|
|
- });
|
|
|
- oncemore(stream).once('end', 'error', function(err) {
|
|
|
- dst._read = noop;
|
|
|
- if (err) return failOrRetry(err);
|
|
|
- if (!failed)
|
|
|
- dst.push(null);
|
|
|
+ pump(stream, dst, function(err) {
|
|
|
+ if (err || failed) return failOrRetry(err);
|
|
|
+ debug('done fetching uri', uri);
|
|
|
+ dst.push(null);
|
|
|
});
|
|
|
- dst._read = function(n) {
|
|
|
- stream.resume();
|
|
|
- };
|
|
|
|
|
|
// allow aborting the request
|
|
|
dst.abort = function() {
|