uristream.js 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. var util = require('util'),
  2. url = require('url'),
  3. zlib = require('zlib'),
  4. assert = require('assert');
  5. var request = require('request'),
  6. extend = require('xtend'),
  7. oncemore = require('./oncemore'),
  8. debug = require('debug')('hls:uristream');
  9. try {
  10. var Readable = require('stream').Readable;
  11. assert(Readable);
  12. } catch (e) {
  13. var Readable = require('readable-stream');
  14. }
  15. function noop() {};
  16. var pkg = require('../package');
  17. var DEFAULT_AGENT = util.format('%s/v%s (http://github.com/kanongil/node-hls-tools) node.js/%s', pkg.name, pkg.version, process.version);
  18. module.exports = uristream;
  19. uristream.UriFetchStream = UriFetchStream;
  20. uristream.PartialError = PartialError;
  21. function inheritErrors(stream) {
  22. stream.on('pipe', function(source) {
  23. source.on('error', stream.emit.bind(stream, 'error'));
  24. });
  25. stream.on('unpipe', function(source) {
  26. source.removeListener('error', stream.emit.bind(stream, 'error'));
  27. });
  28. return stream;
  29. }
  30. function setupHttp(uri, options, dst) {
  31. var defaults = {
  32. 'user-agent': DEFAULT_AGENT,
  33. 'accept-encoding': ['gzip','deflate']
  34. };
  35. // TODO: handle case in header names
  36. var headers = extend(defaults, options.headers);
  37. var timeout = options.timeout || 60*1000;
  38. var probe = !!options.probe;
  39. var offset = ~~options.start;
  40. var tries = 10;
  41. var fetch = probe ? request.head : request.get;
  42. function fetchHttp(start) {
  43. if (start > 0)
  44. headers['range'] = 'bytes=' + start + '-';
  45. else
  46. delete headers['range'];
  47. var req = fetch({uri:uri, pool:false, headers:headers, timeout:timeout});
  48. req.on('error', onreqerror);
  49. req.on('error', noop);
  50. req.on('response', onresponse);
  51. function failOrRetry(err, temporary) {
  52. req.abort();
  53. return dst.emit('error', err);
  54. }
  55. function reqcleanup() {
  56. req.removeListener('error', onreqerror);
  57. req.removeListener('response', onresponse);
  58. }
  59. function onreqerror(err) {
  60. reqcleanup();
  61. failOrRetry(err);
  62. }
  63. function onresponse(res) {
  64. reqcleanup();
  65. if (res.statusCode !== 200 && res.statusCode !== 206)
  66. return failOrRetry(new Error('Bad server response code: '+res.statusCode), res.statusCode >= 500 && res.statusCode !== 501);
  67. var size = res.headers['content-length'] ? parseInt(res.headers['content-length'], 10) : -1;
  68. var filesize = (size >= 0) ? start + size - offset : -1;
  69. // transparently handle gzip responses
  70. var stream = res;
  71. if (res.headers['content-encoding'] === 'gzip' || res.headers['content-encoding'] === 'deflate') {
  72. unzip = zlib.createUnzip();
  73. stream = stream.pipe(inheritErrors(unzip));
  74. filesize = -1;
  75. }
  76. // pipe it to self
  77. stream.on('data', function(chunk) {
  78. if (!dst.push(chunk))
  79. stream.pause();
  80. });
  81. oncemore(stream).once('end', 'error', function(err) {
  82. dst._read = noop;
  83. if (err) return failOrRetry(err);
  84. dst.push(null);
  85. });
  86. dst._read = function(n) {
  87. stream.resume();
  88. };
  89. // allow aborting the request
  90. dst.abort = req.abort.bind(req);
  91. // forward all future errors to response stream
  92. req.on('error', function(err) {
  93. if (dst.listeners('error').length !== 0)
  94. dst.emit('error', err);
  95. });
  96. // turn bad content-length into actual errors
  97. if (size >= 0 && !probe) {
  98. var accum = 0;
  99. res.on('data', function(chunk) {
  100. accum += chunk.length;
  101. if (accum > size)
  102. req.abort();
  103. });
  104. oncemore(res).once('end', 'error', function(err) {
  105. if (!err && accum !== size)
  106. failOrRetry(new PartialError('Stream length did not match header', accum, size), accum && accum < size);
  107. });
  108. }
  109. // attach empty 'error' listener to keep it from ever throwing
  110. dst.on('error', noop);
  111. if (!dst.meta) {
  112. // extract meta information from header
  113. var typeparts = /^(.+?\/.+?)(?:;\w*.*)?$/.exec(res.headers['content-type']) || [null, 'application/octet-stream'],
  114. mimetype = typeparts[1].toLowerCase(),
  115. modified = res.headers['last-modified'] ? new Date(res.headers['last-modified']) : null;
  116. dst.meta = {url:url.format(req.uri), mime:mimetype, size:filesize, modified:modified};
  117. dst.emit('meta', dst.meta);
  118. }
  119. }
  120. }
  121. fetchHttp(offset);
  122. }
  123. function UriFetchStream(uri, options) {
  124. var self = this;
  125. Readable.call(this, options);
  126. options = options || {};
  127. this.url = url.parse(uri);
  128. this.meta = null;
  129. if (this.url.protocol === 'http:' || this.url.protocol === 'https:') {
  130. setupHttp(uri, options, this);
  131. } else {
  132. throw new Error('Unsupported protocol: '+this.url.protocol);
  133. }
  134. }
  135. util.inherits(UriFetchStream, Readable);
  136. UriFetchStream.prototype._read = noop;
  137. function uristream(uri, options) {
  138. return new UriFetchStream(uri, options);
  139. }
  140. function PartialError(msg, processed, expected) {
  141. Error.captureStackTrace(this, this);
  142. this.message = msg || 'PartialError';
  143. this.processed = processed || -1;
  144. this.expected = expected;
  145. }
  146. util.inherits(PartialError, Error);
  147. PartialError.prototype.name = 'Partial Error';