uristream.js 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. var util = require('util'),
  2. url = require('url'),
  3. zlib = require('zlib'),
  4. assert = require('assert');
  5. var request = require('request'),
  6. extend = require('xtend'),
  7. equal = require('deep-equal'),
  8. oncemore = require('./oncemore'),
  9. debug = require('debug')('hls:uristream');
  10. try {
  11. var Readable = require('stream').Readable;
  12. assert(Readable);
  13. } catch (e) {
  14. var Readable = require('readable-stream');
  15. }
  /**
   * Shared do-nothing function. Used as a placeholder 'error' listener and as the
   * default/idle `_read` implementation of the fetch stream.
   */
  function noop() {
    // intentionally empty
  }
  // User-Agent sent with every request:
  // "<package name>/v<package version> (<project url>) request/v<request version> node.js/<node version>"
  var pkg = require('../package');
  var requestVersion = require('request/package').version;
  var DEFAULT_AGENT = util.format('%s/v%s (http://github.com/kanongil/node-hls-tools) request/v%s node.js/%s', pkg.name, pkg.version, requestVersion, process.version);

  // Public API: the factory function itself, with the stream class and the
  // partial-failure error type hung off it.
  uristream.UriFetchStream = UriFetchStream;
  uristream.PartialError = PartialError;
  module.exports = uristream;
  /**
   * Re-emit 'error' events from any stream piped into `stream` on `stream` itself,
   * so consumers only need an 'error' listener on the destination.
   * Forwarding starts on the 'pipe' event and stops on the 'unpipe' event.
   *
   * @param {EventEmitter} stream - destination stream (receives 'pipe'/'unpipe').
   * @returns {EventEmitter} the same `stream`, for chaining.
   */
  function inheritErrors(stream) {
    // Create the bound forwarder once. removeListener() matches by function
    // identity, so binding a fresh function on 'unpipe' would never detach the
    // listener that was attached on 'pipe'.
    var forwardError = stream.emit.bind(stream, 'error');

    stream.on('pipe', function(source) {
      source.on('error', forwardError);
    });
    stream.on('unpipe', function(source) {
      source.removeListener('error', forwardError);
    });

    return stream;
  }
  /**
   * 'Pipe' any data-emitting stream `src` into the Readable `dst` by hand,
   * honouring dst's back-pressure: `src` is paused when `dst.push()` reports a full
   * buffer and resumed from `dst._read`.
   *
   * When `src` ends or errors, forwarding stops, `dst._read` is reset to a no-op and
   * `done(err)` is called (err is undefined on a clean end). EOF is NOT pushed to
   * `dst`; that is left to `done`.
   *
   * @param {EventEmitter} src - source stream emitting 'data', 'end' and 'error'.
   * @param {Readable} dst - destination Readable.
   * @param {function(Error=)} done - completion callback.
   */
  function pump(src, dst, done) {
    function ondata(chunk) {
      if (!dst.push(chunk))
        src.pause();
    }

    src.on('data', ondata);

    oncemore(src).once('end', 'error', function(err) {
      // Detach so nothing from this source reaches dst after completion, e.g. a
      // failed source emitting late data while a retry is already feeding dst.
      src.removeListener('data', ondata);
      dst._read = noop;
      done(err);
    });

    dst._read = function() {
      src.resume();
    };
  }
  /**
   * Fetch `uri` over HTTP(S) and stream the response body into `dst`.
   *
   * Uses request.get, or request.head when `options.probe` is set. A failed
   * attempt is retried, up to 10 attempts in total, resuming from the last received
   * byte with a `Range: bytes=<n>-` header. When the final attempt fails after some
   * progress was recorded, the error is wrapped in a PartialError.
   *
   * The first response's meta information ({url, mime, size, modified}) is stored
   * on `dst.meta` and emitted as 'meta'. A later attempt whose meta differs fails
   * with 'File has changed'.
   *
   * @param {string} uri - http: or https: URL to fetch.
   * @param {object} options - `timeout` (ms, default 10000), `probe` (HEAD only),
   *   `start` (initial byte offset, default 0), `agent`, and extra request `headers`.
   * @param {Readable} dst - destination stream; also gets `meta` and `abort()`.
   * @throws {Error} when `options.headers` contains a (lowercase) `range` header.
   */
  function setupHttp(uri, options, dst) {
    var defaults = {
      'user-agent': DEFAULT_AGENT
    };

    // TODO: handle case in header names
    var timeout = options.timeout || 10*1000;
    var probe = !!options.probe;
    var offset = ~~options.start;
    var agent = options.agent || null;
    var tries = 10;

    // only request compressed content when a body is actually transferred
    if (!probe) defaults['accept-encoding'] = ['gzip','deflate'];
    var fetch = probe ? request.head : request.get;
    var headers = extend(defaults, options.headers);

    if ('range' in headers)
      throw new Error('range header is not allowed');

    // attach empty 'error' listener to keep dst from ever throwing
    dst.on('error', noop);

    // Issue one request attempt starting at byte `start`; calls itself to retry.
    function fetchHttp(start) {
      if (start > 0)
        headers['range'] = 'bytes=' + start + '-';

      // accum: body bytes received in this attempt (only counted when the response
      // has a content-length); size: that content-length, or -1 when absent
      var accum = 0, size = -1;
      var req = fetch({uri:uri, headers:headers, agent:agent, timeout:timeout});
      req.on('error', onreqerror);
      req.on('response', onresponse);

      var failed = false;

      // Abort this attempt and either retry from the last received byte or, when
      // the attempts are used up, emit the final error on dst. Acts at most once.
      // NOTE(review): `temporary` is supplied by callers but currently ignored, so
      // permanent failures (e.g. 4xx responses) are retried as well.
      function failOrRetry(err, temporary) {
        if (failed) return;
        failed = true;
        req.abort();

        if (--tries <= 0) {
          // remap error to partial error if we have received any data
          if (start - offset + accum !== 0)
            err = new PartialError(err, start - offset + accum, (size !== -1) ? start - offset + size : size);
          return dst.emit('error', err);
        }

        debug('retrying at ' + (start + accum));
        // TODO: delay retry?
        fetchHttp(start + accum);
      }

      // Detach the one-shot request handlers, leaving a no-op 'error' listener so
      // later request errors are not thrown.
      function reqcleanup() {
        req.removeListener('error', onreqerror);
        req.removeListener('response', onresponse);
        req.on('error', noop);
      }

      function onreqerror(err) {
        reqcleanup();
        failOrRetry(err);
      }

      function onresponse(res) {
        reqcleanup();

        if (res.statusCode !== 200 && res.statusCode !== 206)
          return failOrRetry(new Error('Bad server response code: '+res.statusCode), res.statusCode >= 500 && res.statusCode !== 501);

        // A 200 reply to a ranged GET means the server ignored the Range header and
        // is resending from byte 0; appending that to dst would corrupt the output.
        if (!probe && start > 0 && res.statusCode !== 206) {
          tries = 0;
          return failOrRetry(new Error('Server ignored range request'));
        }

        if (res.headers['content-length']) size = parseInt(res.headers['content-length'], 10);
        var filesize = (size >= 0) ? start + size : -1;

        // transparently handle gzip responses
        var stream = res;
        if (res.headers['content-encoding'] === 'gzip' || res.headers['content-encoding'] === 'deflate') {
          var unzip = zlib.createUnzip();
          stream = stream.pipe(inheritErrors(unzip));
          filesize = -1;
        }

        // Turn a bad content-length into an actual error. This is registered before
        // pump() on purpose: listeners run in registration order, so on 'end' a
        // short body reaches failOrRetry() (setting `failed`) before pump's
        // completion handler, which would otherwise push EOF to dst and make the
        // retry push after EOF.
        if (size >= 0 && !probe) {
          res.on('data', function(chunk) {
            accum += chunk.length;
            if (accum > size)
              req.abort();
          });
          oncemore(res).once('end', 'error', function(err) {
            if (!err && accum !== size)
              failOrRetry(new Error('Stream length did not match header'), accum && accum < size);
          });
        }

        // pipe it to self
        pump(stream, dst, function(err) {
          if (err || failed) return failOrRetry(err);
          debug('done fetching uri', uri);
          dst.push(null);
        });

        // allow aborting the request
        dst.abort = function() {
          tries = 0;
          req.abort();
        };

        // forward all future errors to response stream
        // (dst always has at least the no-op listener added above)
        req.on('error', function(err) {
          if (dst.listeners('error').length !== 0)
            dst.emit('error', err);
        });

        // extract meta information from header
        var typeparts = /^(.+?\/.+?)(?:;\w*.*)?$/.exec(res.headers['content-type']) || [null, 'application/octet-stream'],
            mimetype = typeparts[1].toLowerCase(),
            modified = res.headers['last-modified'] ? new Date(res.headers['last-modified']) : null;

        var meta = { url:url.format(req.uri), mime:mimetype, size:filesize, modified:modified };
        if (dst.meta) {
          if (!equal(dst.meta, meta)) {
            tries = 0;
            failOrRetry(new Error('File has changed'));
          }
        } else {
          dst.meta = meta;
          dst.emit('meta', dst.meta);
        }
      }
    }

    fetchHttp(offset);
  }
  /**
   * Readable stream delivering the contents of the resource at `uri`.
   *
   * Only http: and https: URLs are supported. `this.url` holds the parsed URL and
   * `this.meta` starts as null until the fetcher fills it in.
   *
   * @param {string} uri - URL to fetch.
   * @param {object} [options] - passed to the Readable constructor and to the
   *   HTTP fetcher (timeout, probe, start, agent, headers).
   * @throws {Error} 'Unsupported protocol: <protocol>' for any other scheme.
   */
  function UriFetchStream(uri, options) {
    Readable.call(this, options);

    options = options || {};

    this.url = url.parse(uri);
    this.meta = null;

    var protocol = this.url.protocol;
    if (protocol !== 'http:' && protocol !== 'https:')
      throw new Error('Unsupported protocol: '+protocol);

    setupHttp(uri, options, this);
  }
  /**
   * Module entry point: create a UriFetchStream for `uri`.
   *
   * @param {string} uri - URL to fetch.
   * @param {object} [options] - forwarded unchanged to the UriFetchStream constructor.
   * @returns {UriFetchStream}
   */
  function uristream(uri, options) {
    var stream = new UriFetchStream(uri, options);
    return stream;
  }

  // Set up the Readable prototype chain before adding methods to the prototype.
  // Data is pushed into the stream by the fetch logic, so the shared _read does nothing.
  util.inherits(UriFetchStream, Readable);
  UriFetchStream.prototype._read = noop;
  /**
   * Error reported when a fetch ultimately fails after some data was already delivered.
   *
   * Wraps `err`: reuses its message and, when it has one, its stack trace.
   *
   * @param {Error|*} err - underlying failure.
   * @param {number} processed - bytes delivered before the failure (falsy values become -1).
   * @param {number} expected - expected total byte count, or -1 when unknown.
   */
  function PartialError(err, processed, expected) {
    if (err.stack) {
      // expose the underlying error's stack so the real failure site stays visible
      Object.defineProperty(this, 'stack', {
        enumerable: false,
        configurable: false,
        get: function() { return err.stack; }
      });
    } else {
      // reference the constructor directly: `arguments.callee` throws in strict mode
      Error.captureStackTrace(this, PartialError);
    }

    this.message = err.message || err.toString();
    this.processed = processed || -1;
    this.expected = expected;
  }
  util.inherits(PartialError, Error);
  PartialError.prototype.name = 'Partial Error';