hls-uploader.js 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. 'use strict';
  2. const Assert = require('assert');
  3. const Fs = require('fs');
  4. const Path = require('path');
  5. const Url = require('url');
  6. const Util = require('util');
  7. const Aws = require('aws-sdk');
  8. const Mkdirp = require('mkdirp');
  9. const Pati = require('pati');
  10. const WriteFileAtomic = require('write-file-atomic');
  11. const internals = {};
  12. internals.fs = {
  13. appendFile: Util.promisify(Fs.appendFile),
  14. writeFile: Util.promisify(WriteFileAtomic)
  15. };
  16. class HlsUploader {
  17. constructor(targetUri, options) {
  18. const url = Url.parse(targetUri);
  19. Assert.ok(url.protocol === null || url.protocol === 's3:');
  20. this.targetUri = targetUri;
  21. this.indexName = options.indexName || 'index.m3u8';
  22. this.collect = !!options.collect;
  23. this.cacheDuration = options.cacheDuration || 7 * 24 * 3600 * 1000;
  24. // State
  25. this.lastIndexString = '';
  26. this.segmentBytes = 0;
  27. if (url.protocol === 's3:') {
  28. Assert.equal(options.collect, false, 'Collect not supported with s3:');
  29. const params = {
  30. params: {
  31. Bucket: url.host,
  32. ACL: 'public-read',
  33. StorageClass: 'REDUCED_REDUNDANCY'
  34. }
  35. };
  36. this.s3 = new Aws.S3(params);
  37. this.baseKey = (url.pathname || '/').slice(1);
  38. } else {
  39. // TODO: make async?
  40. if (!Fs.existsSync(this.targetUri)) {
  41. Mkdirp.sync(this.targetUri);
  42. }
  43. }
  44. }
  45. async pushSegment(stream, name, meta) {
  46. const append = this.collect && this.segmentBytes !== 0;
  47. if (this.s3) {
  48. const params = {
  49. Body: stream,
  50. Key: Path.join(this.baseKey, name),
  51. ContentType: meta.mime || 'video/MP2T',
  52. CacheControl: `max-age=${Math.floor(this.cacheDuration / 1000)}, public`,
  53. ContentLength: meta.size
  54. };
  55. return new Promise((resolve, reject) => {
  56. this.s3.upload(params, (err, data) => {
  57. return err ? reject(err) : resolve(data);
  58. });
  59. });
  60. }
  61. const target = Fs.createWriteStream(Path.join(this.targetUri, name), { flags: append ? 'a' : 'w' });
  62. stream.pipe(target);
  63. const dispatcher = new Pati.EventDispatcher(stream);
  64. dispatcher.on('end', Pati.EventDispatcher.end);
  65. let bytesWritten = 0;
  66. dispatcher.on('data', (chunk) => {
  67. bytesWritten += +chunk.length;
  68. });
  69. try {
  70. // TODO: handle target errors & wait for end?
  71. await dispatcher.finish();
  72. return bytesWritten;
  73. }
  74. finally {
  75. this.segmentBytes += bytesWritten;
  76. }
  77. }
  78. async flushIndex(index) {
  79. const indexString = index.toString().trim();
  80. if (this.s3) {
  81. const cacheTime = index.ended ? this.cacheDuration : index.target_duration * 1000 / 2;
  82. const params = {
  83. Body: indexString,
  84. Key: Path.join(this.baseKey, this.indexName),
  85. ContentType: 'application/vnd.apple.mpegURL',
  86. CacheControl: `max-age=${Math.floor(cacheTime / 1000)}, public`
  87. };
  88. return new Promise((resolve, reject) => {
  89. this.s3.putObject(params, (err, data) => {
  90. return err ? reject(err) : resolve(data);
  91. });
  92. });
  93. }
  94. let appendString;
  95. if (this.lastIndexString && indexString.startsWith(this.lastIndexString)) {
  96. const lastLength = this.lastIndexString.length;
  97. appendString = indexString.substr(lastLength);
  98. }
  99. this.lastIndexString = indexString;
  100. if (appendString) {
  101. return internals.fs.appendFile(Path.join(this.targetUri, this.indexName), appendString);
  102. }
  103. else {
  104. return internals.fs.writeFile(Path.join(this.targetUri, this.indexName), indexString);
  105. }
  106. }
  107. };
  108. module.exports = HlsUploader;