uristream.js 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. "use strict";
  2. var util = require('util'),
  3. url = require('url'),
  4. zlib = require('zlib'),
  5. assert = require('assert');
  6. var request = require('request'),
  7. extend = require('xtend'),
  8. equal = require('deep-equal'),
  9. oncemore = require('oncemore'),
  10. debug = require('debug')('hls:uristream');
  // Resolve the streams2 Readable class: use the one from the core 'stream'
  // module when it exists, otherwise fall back to the 'readable-stream' package.
  var Readable;
  try {
    Readable = require('stream').Readable;
    assert(Readable);
  } catch (e) {
    Readable = require('readable-stream');
  }
  /**
   * Do-nothing function, used as an inert 'error' listener and as a
   * placeholder `_read` implementation.
   */
  function noop() {
    // intentionally empty
  }
  var pkg = require('../package');
  var requestVersion = require('request/package').version;

  // User-Agent header sent by default; options.headers['user-agent'] overrides it.
  var DEFAULT_AGENT = util.format(
    '%s/v%s (http://github.com/kanongil/node-hls-tools) request/v%s node.js/%s',
    pkg.name,
    pkg.version,
    requestVersion,
    process.version
  );

  // Public API: the factory function, with the stream and error
  // constructors exposed as properties on it.
  module.exports = uristream;
  uristream.UriFetchStream = UriFetchStream;
  uristream.PartialError = PartialError;
  /**
   * Make `stream` re-emit every 'error' event raised by a stream piped into it.
   *
   * The forwarding listener is created once per `stream`, so the very same
   * function object can be removed from the source again on 'unpipe'.
   * (Calling `.bind()` separately in each handler produced two different
   * functions, and `removeListener` silently removed nothing.)
   *
   * @param {EventEmitter} stream - pipe destination (e.g. a zlib stream)
   * @returns {EventEmitter} the same `stream`, for chaining
   */
  function inheritErrors(stream) {
    var forwardError = stream.emit.bind(stream, 'error');

    stream.on('pipe', function(source) {
      source.on('error', forwardError);
    });
    stream.on('unpipe', function(source) {
      source.removeListener('error', forwardError);
    });
    return stream;
  }
  /**
   * Manually 'pipe' any stream `src` into the Readable `dst`.
   *
   * Every 'data' chunk is pushed into `dst`. When `dst.push()` returns false,
   * `src` is paused, and `dst._read()` resumes it (backpressure).
   *
   * `done(err)` is called once, on the first 'end' (err undefined) or 'error'
   * from `src`. At that point every listener installed here is removed, so
   * chunks `src` still emits afterwards (e.g. an unzip stream whose input
   * failed) are no longer pushed into `dst`. `dst._read` also becomes a no-op.
   *
   * @param {Stream} src - source stream; must support pause()/resume()
   * @param {Readable} dst - destination Readable
   * @param {function(Error=)} done - completion callback
   */
  function pump(src, dst, done) {
    var finished = false;

    function ondata(chunk) {
      if (!dst.push(chunk))
        src.pause();
    }
    function onend() {
      finish();
    }
    function onerror(err) {
      finish(err);
    }

    function finish(err) {
      if (finished) return;
      finished = true;

      src.removeListener('data', ondata);
      src.removeListener('end', onend);
      src.removeListener('error', onerror);
      // no more data will come from this src; a later pump() may install a new _read
      dst._read = function() {};
      done(err);
    }

    src.on('data', ondata);
    src.on('end', onend);
    src.on('error', onerror);

    dst._read = function(n) {
      src.resume();
    };
  }
  /**
   * Fetch an http: or https: `uri` with the `request` library and stream the
   * body into the Readable `dst`, retrying failed attempts.
   *
   * Fields read from `options`:
   *   timeout - request timeout in ms (default 10*1000)
   *   probe   - truthy: issue a HEAD request and do not ask for compression
   *   start   - initial byte offset, coerced with `~~` (default 0)
   *   agent   - agent forwarded to `request` (default null)
   *   headers - extra request headers, merged over the default user-agent
   *
   * Side effects on `dst`:
   *   - a no-op 'error' listener is added;
   *   - `dst.meta` is set and 'meta' is emitted on the first good response;
   *   - `dst.abort()` is installed;
   *   - on success it gets `push(null)`, `closed = true` and a 'close' event;
   *   - it emits 'error' once the retry budget (10 attempts) is used up.
   *     The budget is consumed immediately when the resource changed between
   *     attempts. The error is a PartialError if any bytes were already
   *     accounted for.
   *
   * @throws {Error} if the merged headers contain a 'range' key.
   */
  function setupHttp(uri, options, dst) {
    var defaults = {
      'user-agent': DEFAULT_AGENT
    };

    // TODO: handle case in header names
    var timeout = options.timeout || 10*1000;
    var probe = !!options.probe;
    var offset = ~~options.start;
    var agent = options.agent || null;
    var tries = 10;

    if (!probe) defaults['accept-encoding'] = ['gzip','deflate'];
    var fetch = probe ? request.head : request.get;
    var headers = extend(defaults, options.headers);
    if ('range' in headers)
      throw new Error('range header is not allowed');

    // attach empty 'error' listener to keep dst from ever throwing
    dst.on('error', noop);

    // Run one fetch attempt beginning at byte offset `start`.
    function fetchHttp(start) {
      // continue an interrupted transfer where it left off
      if (start > 0)
        headers['range'] = 'bytes=' + start + '-';

      // accum: body bytes received in this attempt (only counted when a
      // content-length is known and not probing); size: content-length or -1
      var accum = 0, size = -1;
      var req = fetch({uri:uri, headers:headers, agent:agent, timeout:timeout});
      req.on('error', onreqerror);
      req.on('response', onresponse);

      var failed = false;

      // Abort this attempt. Then either start a new one at start + accum or,
      // when no tries remain, emit the final error on dst.
      // NOTE(review): `temporary` is passed by callers but never consulted, so
      // permanent failures (e.g. 404) are retried as well.
      function failOrRetry(err, temporary) {
        if (failed) return;
        failed = true;

        req.abort();
        if (--tries <= 0) {
          // remap error to partial error if we have received any data
          if (start - offset + accum !== 0)
            err = new PartialError(err, start - offset + accum, (size !== -1) ? start - offset + size : size);
          return dst.emit('error', err);
        }
        debug('retrying at ' + (start + accum));

        // TODO: delay retry?
        fetchHttp(start + accum);
      }

      // Detach the initial handlers, but keep a no-op 'error' listener so req
      // never throws an unhandled error.
      function reqcleanup() {
        req.removeListener('error', onreqerror);
        req.removeListener('response', onresponse);
        req.on('error', noop);
      }

      function onreqerror(err) {
        reqcleanup();
        failOrRetry(err);
      }

      function onresponse(res) {
        reqcleanup();

        if (res.statusCode !== 200 && res.statusCode !== 206)
          return failOrRetry(new Error('Bad server response code: '+res.statusCode), res.statusCode >= 500 && res.statusCode !== 501);

        if (res.headers['content-length']) size = parseInt(res.headers['content-length'], 10);
        var filesize = (size >= 0) ? start + size : -1;

        // transparently handle gzip responses
        var stream = res;
        if (res.headers['content-encoding'] === 'gzip' || res.headers['content-encoding'] === 'deflate') {
          // Must be declared: assigning to an undeclared name throws a
          // ReferenceError under "use strict", breaking every compressed response.
          var unzip = zlib.createUnzip();
          stream = stream.pipe(inheritErrors(unzip));
          filesize = -1;
        }

        // pipe it to self
        pump(stream, dst, function(err) {
          if (err || failed) return failOrRetry(err);

          debug('done fetching uri', uri);
          dst.push(null);

          dst.closed = true;
          dst.emit('close');
        });

        // allow aborting the request
        dst.abort = function(reason) {
          if (!dst.closed) {
            tries = 0;
            req.abort();
          }
        };

        // forward all future errors to response stream
        req.on('error', function(err) {
          if (dst.listeners('error').length !== 0)
            dst.emit('error', err);
        });

        // turn bad content-length into actual errors
        if (size >= 0 && !probe) {
          res.on('data', function(chunk) {
            accum += chunk.length;
            if (accum > size)
              req.abort();
          });
          oncemore(res).once('end', 'error', function(err) {
            if (!err && accum !== size)
              failOrRetry(new Error('Stream length did not match header'), accum && accum < size);
          });
        }

        // extract meta information from header
        var typeparts = /^(.+?\/.+?)(?:;\w*.*)?$/.exec(res.headers['content-type']) || [null, 'application/octet-stream'],
            mimetype = typeparts[1].toLowerCase(),
            modified = res.headers['last-modified'] ? new Date(res.headers['last-modified']) : null;

        var meta = { url:url.format(req.uri), mime:mimetype, size:filesize, modified:modified };
        if (dst.meta) {
          // a retry must see the same resource; otherwise give up without further tries
          if (!equal(dst.meta, meta)) {
            tries = 0;
            failOrRetry(new Error('File has changed'));
          }
        } else {
          dst.meta = meta;
          dst.emit('meta', dst.meta);
        }
      }
    }

    fetchHttp(offset);
  }
  /**
   * Readable stream delivering the content found at `uri`.
   *
   * Only http: and https: URIs are supported; the transfer itself is set up
   * by setupHttp(). `options` is handed to the Readable constructor and to
   * setupHttp(). May be called with or without `new`.
   *
   * @param {string} uri
   * @param {Object} [options]
   * @throws {Error} 'Unsupported protocol: ...' for any other protocol.
   */
  function UriFetchStream(uri, options) {
    // allow plain function calls: UriFetchStream(uri) behaves like `new`
    if (!(this instanceof UriFetchStream))
      return new UriFetchStream(uri, options);

    Readable.call(this, options);

    options = options || {};
    this.url = url.parse(uri);
    this.meta = null;

    if (this.url.protocol !== 'http:' && this.url.protocol !== 'https:')
      throw new Error('Unsupported protocol: ' + this.url.protocol);

    setupHttp(uri, options, this);
  }
  /**
   * Convenience factory, equivalent to `new UriFetchStream(uri, options)`.
   *
   * @param {string} uri
   * @param {Object} [options]
   * @returns {UriFetchStream}
   */
  function uristream(uri, options) {
    var stream = new UriFetchStream(uri, options);
    return stream;
  }

  // UriFetchStream is a Readable. Its prototype _read() does nothing;
  // pump() installs a working _read on each instance once a response arrives.
  util.inherits(UriFetchStream, Readable);
  UriFetchStream.prototype._read = noop;
  /**
   * Error reported when a transfer fails after some data was already received.
   *
   * Wraps `err`: it reuses its message and, when available, its stack.
   *
   * @param {Error|*} err - the underlying failure
   * @param {number} processed - bytes handled before the failure (a falsy
   *   value is stored as -1)
   * @param {number} expected - expected total bytes (setupHttp passes -1
   *   when the size is unknown)
   */
  function PartialError(err, processed, expected) {
    Error.call(this);

    if (err.stack) {
      // report the underlying error's stack rather than one pointing here
      Object.defineProperty(this, 'stack', {
        enumerable: false,
        configurable: false,
        get: function() { return err.stack; }
      });
    }
    else {
      // `arguments.callee` throws a TypeError in strict mode; name the
      // constructor explicitly so it is omitted from the captured trace.
      Error.captureStackTrace(this, PartialError);
    }

    this.message = err.message || err.toString();
    this.processed = processed || -1;
    this.expected = expected;
  }
  util.inherits(PartialError, Error);
  PartialError.prototype.name = 'Partial Error';