hls-reader.js 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. "use strict";
  2. var Url = require('url'),
  3. Util = require('util'),
  4. Crypto = require('crypto');
  5. var StreamProcess = require('streamprocess'),
  6. oncemore = require('oncemore'),
  7. UriStream = require('uristream');
  8. var Readable = require('readable-stream/readable'),
  9. Passthrough = require('readable-stream/passthrough');
  10. var tssmooth = require('./tssmooth');
  // Module-private state shared by every reader created from this file.
  var internals = {
    // key bytes that were already fetched, indexed by the resolved key URI
    keyCache: {}
  };

  // Do-nothing function, installed wherever a _read handler has no work to do.
  function NOOP() {}
  /**
   * 'pipe' a stream into a Readable by feeding it through push().
   *
   * Every 'data' chunk of `src` is handed to `dst.push()`; when push() reports
   * that no more data is wanted, `src` is paused until `dst._read` is invoked
   * again, which resumes it.
   *
   * @param {Object} src - flowing source stream ('data', 'end', 'error', pause(), resume())
   * @param {Object} dst - Readable that receives the chunks; its `_read` is replaced
   * @param {Function} done - called once, on the first 'end' or 'error' of `src`,
   *   with the error (if any) as its only argument
   */
  function pump(src, dst, done) {
    var forwardChunk = function (chunk) {
      var wantsMore = dst.push(chunk);
      if (wantsMore) return;

      // destination buffer is full: hold the source until _read is called
      src.pause();
    };
    src.on('data', forwardChunk);

    var finish = function (err) {
      // TODO: flush source buffer on error?
      dst._read = NOOP; // nothing left to resume
      done(err);
    };
    oncemore(src).once('end', 'error', finish);

    dst._read = function () {
      src.resume();
    };
  }
  // TODO: use pipe as interface to segment-reader?
  /**
   * Readable stream that outputs the data of the segments delivered by a
   * segment reader, one segment after another, decrypting each segment when
   * its details carry key attributes.
   *
   * Segment data is first collected in an internal Passthrough (`this.buffer`);
   * the output side is only attached once `hook()` runs (see the end of the
   * constructor and the per-segment logic below).
   *
   * Emits 'segment' with the segment info before that segment's data is piped.
   *
   * @param {Object} segmentReader - source passed to StreamProcess; each item is
   *   read for `.stream` and `.details.key`, and 'end' closes the internal buffer
   * @param {Object} [options]
   * @param {number} [options.lowWaterMark] - forwarded to the Readable constructor
   * @param {number} [options.highWaterMark] - forwarded to the Readable constructor
   * @param {boolean} [options.sync] - output in real-time; when falsy the output
   *   is hooked up on the next tick
   * @param {number} [options.bufferSize] - truncated to an integer; used as the
   *   highWaterMark of the internal buffer and as the prebuffer threshold
   * @param {string} [options.cookie] - stored as `this.cookie`
   * @param {Buffer} [options.key] - fixed decryption key, stored as `this.key`
   * @throws {TypeError} when `options.key` is given but is not a 16 byte Buffer
   */
  function HlsReader(segmentReader, options) {
    if (!(this instanceof HlsReader))
      return new HlsReader(segmentReader, options);

    options = options || {};

    Readable.call(this, { lowWaterMark: options.lowWaterMark, highWaterMark: options.highWaterMark });

    var self = this;

    this.reader = segmentReader;

    this.sync = !!options.sync; // output in real-time
    this.bufferSize = ~~options.bufferSize;

    this.cookie = options.cookie;
    this.key = options.key;

    // The only cipher this reader sets up is aes-128-cbc, which takes exactly
    // 16 key bytes. Reject anything else up front instead of failing on every
    // segment later on.
    if (options.key && (!Buffer.isBuffer(options.key) || options.key.length !== 16)) {
      throw new TypeError('key must be a 16 byte Buffer');
    }

    this.isReading = false;
    this.isHooked = false;
    this.buffer = new Passthrough({ highWaterMark: this.bufferSize });

    StreamProcess(this.reader, function (segmentInfo, done) {
      self.isReading = true;

      return self.decrypt(segmentInfo.stream, segmentInfo.details.key, function (err, stream) {
        if (err) {
          // best effort: keep going with the undecrypted segment data
          console.error('decrypt failed', err.stack);
          stream = segmentInfo.stream;
        }

        self.emit('segment', segmentInfo);

        stream = oncemore(stream);

        if (!self.isHooked) {
          // pull data and detect if we need to hook before end
          var buffered = 0;
          stream.on('data', function (chunk) {
            buffered += chunk.length;
            if (!self.isHooked && buffered >= self.bufferSize)
              self.hook();
          });
        }

        // end: false keeps the shared buffer open for the following segments
        stream.pipe(self.buffer, { end: false });
        stream.once('end', 'error', function (err) {
          self.isReading = false;
          if (err) {
            console.error('stream error', err.stack || err);
          }
          // a finished segment always starts the output, even below bufferSize
          self.hook();
          done();
        });
      });
    });

    this.reader.on('end', function () {
      self.buffer.end();
    });

    // start output if needed
    if (!this.sync) {
      process.nextTick(function () {
        self.hook();
      });
    }
  }
  Util.inherits(HlsReader, Readable);

  // data is pushed from the internal buffer once hooked; nothing to do on demand
  HlsReader.prototype._read = NOOP;
  // the hook is used to prebuffer
  /**
   * Attaches the internal buffer to this stream's output. Only the first call
   * has any effect; later calls return immediately.
   *
   * In sync mode the buffer is first piped through a tssmooth() stream.
   * Emits 'ready' once the output is attached; when the source finishes, either
   * 'error' is emitted or the stream is ended with push(null).
   */
  HlsReader.prototype.hook = function hook() {
    if (this.isHooked) return;
    this.isHooked = true;

    var reader = this;
    var source = this.buffer;

    if (this.sync) {
      var smoother = tssmooth();

      smoother.on('unpipe', function onUnpipe() {
        // `this` is the smoothing stream: drop its own destinations as well
        this.unpipe();
      });
      smoother.on('warning', function onWarning(err) {
        console.error('smoothing error', err);
      });

      source = source.pipe(smoother);
    }

    pump(source, reader, function onPumpDone(err) {
      if (!err) {
        // clean end of input: signal EOF to consumers
        reader.push(null);
        return;
      }
      return reader.emit('error', err);
    });

    reader.emit('ready');
  };
  /**
   * Wraps a segment stream in an AES-128-CBC decipher when key attributes are
   * present.
   *
   * @param {Object} stream - the segment data stream
   * @param {Object} keyAttrs - key attribute accessor (enumeratedString('method'),
   *   quotedString('uri'), hexadecimalInteger('iv')); falsy means "not encrypted"
   * @param {Function} next - called as next(err, stream):
   *   - without key attributes: next(null, stream) with the untouched stream
   *   - unsupported method or missing uri/iv: next(Error('unknown encryption parameters'))
   *   - key download failure: next(Error('key fetch failed: ...'))
   *   - decipher construction failure: next(Error('crypto setup failed: ...'))
   *   - otherwise: next(null, decipher) where the decipher is fed by `stream`
   * @returns whatever `next` (or `this.fetchKey`) returns
   */
  HlsReader.prototype.decrypt = function (stream, keyAttrs, next) {
    if (!keyAttrs) return next(null, stream);

    if (keyAttrs.enumeratedString('method') !== 'AES-128' ||
        !keyAttrs.quotedString('uri') || !keyAttrs.hexadecimalInteger('iv')) {
      // TODO: hard error when key is not recognized?
      return next(new Error('unknown encryption parameters'));
    }

    return this.fetchKey(keyAttrs.quotedString('uri'), function (err, key) {
      if (err)
        return next(new Error('key fetch failed: ' + (err.stack || err)));

      var iv = keyAttrs.hexadecimalInteger('iv');
      var decrypt;
      try {
        decrypt = Crypto.createDecipheriv('aes-128-cbc', key, iv);
      } catch (ex) {
        // report the failure through the callback; a bad key or iv must not
        // escape as an exception from inside the key-fetch callback
        return next(new Error('crypto setup failed: ' + (ex.stack || ex)));
      }

      // forward stream errors
      stream.on('error', function (err) {
        decrypt.emit('error', err);
      });

      return next(null, stream.pipe(decrypt));
    });
  };
  /**
   * Obtains the decryption key for a key URI.
   *
   * Resolution order: the fixed `this.key` if one was configured, then a
   * non-empty entry in the module-wide key cache, and finally a download of the
   * URI (resolved against `this.reader.url`), sending `this.cookie` as the
   * Cookie header when set.
   *
   * @param {string} keyUri - key location, possibly relative to the reader url
   * @param {Function} next - called as next(err, key); on a download error `key`
   *   holds whatever bytes were received before the failure
   */
  HlsReader.prototype.fetchKey = function (keyUri, next) {
    if (this.key) return next(null, this.key);

    var uri = Url.resolve(this.reader.url, keyUri);
    var entry = internals.keyCache[uri];
    if (entry && entry.length) return next(null, entry);

    // collect the chunks and join them once at the end
    var chunks = [];
    var headers = {};
    if (this.cookie)
      headers.Cookie = this.cookie;

    oncemore(UriStream(uri, { headers: headers, whitelist: ['http', 'https', 'data'], timeout: 10 * 1000 }))
      .on('data', function (chunk) {
        chunks.push(chunk);
      })
      .once('error', 'end', function (err) {
        var key = Buffer.concat(chunks);
        // only a completed download may be cached; caching a partial key after
        // an error would break every later segment that uses this uri
        if (!err) internals.keyCache[uri] = key;
        return next(err, key);
      });
  };
  /**
   * Module export: factory that builds an HlsReader for the given segment
   * reader and options. The constructor itself is exposed as a property.
   */
  function hlsreader(segmentReader, options) {
    return new HlsReader(segmentReader, options);
  }

  hlsreader.HlsReader = HlsReader;

  module.exports = hlsreader;