// hls-uploader.js
  1. 'use strict';
  2. const Assert = require('assert');
  3. const Fs = require('fs');
  4. const Path = require('path');
  5. const Url = require('url');
  6. const Util = require('util');
  7. const debug = require('debug')('hls:uploader');
  8. const Aws = require('aws-sdk');
  9. const Mkdirp = require('mkdirp');
  10. const Pati = require('pati');
  11. const WriteFileAtomic = require('write-file-atomic');
  // Module-private namespace. `fs` groups promise-returning wrappers around the
  // callback-style file writers: Node's `Fs.appendFile` and the `write-file-atomic` export.
  const internals = {
      fs: {
          appendFile: Util.promisify(Fs.appendFile),
          writeFile: Util.promisify(WriteFileAtomic)
      }
  };
  /**
   * Publishes HLS segments and index playlists to either a local directory or an S3 bucket.
   *
   * The destination is chosen from the protocol of `targetUri`: a value without a protocol
   * is treated as a local directory path (created synchronously when missing), while an
   * `s3://bucket/prefix` URI sends everything through an `Aws.S3` client.
   */
  class HlsUploader {

      /**
       * @param {string} targetUri - Local directory path, or an `s3://bucket/prefix` URI.
       * @param {object} [options]
       * @param {string} [options.indexName='index.m3u8'] - File name (or key suffix) of the index.
       * @param {boolean} [options.collect=false] - Local targets only. When truthy, segment files
       *   are opened in append mode once at least one byte has been pushed.
       * @param {number} [options.cacheDuration] - Milliseconds used to derive `Cache-Control`
       *   max-age on S3 uploads. Falsy values fall back to 7 days.
       * @throws {AssertionError} When the protocol is neither absent nor `s3:`, or when
       *   `collect` is requested together with an `s3:` target.
       */
      constructor(targetUri, options = {}) {

          const url = Url.parse(targetUri);
          Assert.ok(url.protocol === null || url.protocol === 's3:', 'targetUri must be a local path or an s3: URI');

          this.targetUri = targetUri;
          this.indexName = options.indexName || 'index.m3u8';
          this.collect = !!options.collect;
          this.cacheDuration = options.cacheDuration || 7 * 24 * 3600 * 1000;

          // State
          this.lastIndexString = '';
          this.segmentBytes = 0;

          if (url.protocol === 's3:') {
              // Check the normalised flag: a loose `equal(options.collect, false)` rejects
              // an omitted option because `undefined == false` is false.
              Assert.ok(!this.collect, 'Collect not supported with s3:');

              const params = {
                  params: {
                      Bucket: url.host,
                      ACL: 'public-read',
                      StorageClass: 'REDUCED_REDUNDANCY'
                  }
              };

              this.s3 = new Aws.S3(params);
              // Drop the leading "/" so the remaining pathname can be used as a key prefix.
              this.baseKey = (url.pathname || '/').slice(1);
          }
          else if (!Fs.existsSync(this.targetUri)) {
              // TODO: make async?
              Mkdirp.sync(this.targetUri);
          }
      }

      /**
       * Stores one segment.
       *
       * @param {stream.Readable} stream - Segment data.
       * @param {string} name - File name (local) or key suffix (S3) of the segment.
       * @param {object} [meta] - Only read for S3 uploads.
       * @param {string} [meta.mime='video/MP2T'] - Sent as `ContentType`.
       * @param {number} [meta.size] - Sent as `ContentLength`.
       * @returns {Promise<*>} For S3 targets, the `data` value passed to the upload callback.
       *   For local targets, the number of bytes read from `stream`, resolved once the
       *   destination file has emitted `'finish'`.
       */
      async pushSegment(stream, name, meta = {}) {

          if (this.s3) {
              // S3 keys always use "/" separators, regardless of the host platform.
              const key = Path.posix.join(this.baseKey, name);
              const params = {
                  Body: stream,
                  Key: key,
                  ContentType: meta.mime || 'video/MP2T',
                  CacheControl: `max-age=${Math.floor(this.cacheDuration / 1000)}, public`,
                  ContentLength: meta.size
              };

              return new Promise((resolve, reject) => {

                  this.s3.upload(params, (err, data) => {

                      debug('upload finished for', key, err);
                      return err ? reject(err) : resolve(data);
                  });
              });
          }

          const append = this.collect && this.segmentBytes !== 0;
          const target = Fs.createWriteStream(Path.join(this.targetUri, name), { flags: append ? 'a' : 'w' });

          // Settles when the file is fully flushed, or fails to open / write. Registering the
          // error listener also keeps a write failure from becoming an uncaught 'error' event.
          const flushed = new Promise((resolve, reject) => {

              target.once('finish', resolve);
              target.once('error', reject);
          });

          stream.pipe(target);

          const dispatcher = new Pati.EventDispatcher(stream);
          dispatcher.on('end', Pati.EventDispatcher.end);

          let bytesWritten = 0;
          dispatcher.on('data', (chunk) => {

              bytesWritten += +chunk.length;
          });

          try {
              await Promise.all([dispatcher.finish(), flushed]);
              return bytesWritten;
          }
          catch (err) {
              // Release the file descriptor instead of leaving a half-open destination behind.
              stream.unpipe(target);
              target.destroy();
              throw err;
          }
          finally {
              // Counted even on failure, matching the bytes already handed to the destination.
              this.segmentBytes += bytesWritten;
          }
      }

      /**
       * Publishes the current index.
       *
       * Local targets append only the new tail when the trimmed index text starts with the
       * previously flushed text; otherwise the whole file is rewritten through
       * `internals.fs.writeFile`.
       *
       * @param {object} index - Converted with `toString()`. `ended` and `target_duration`
       *   are read for S3 targets to pick the cache lifetime.
       * @returns {Promise<*>} Result of the S3 `putObject` callback, or of the file write.
       */
      async flushIndex(index) {

          const indexString = index.toString().trim();

          if (this.s3) {
              // A finished index is cached for the full duration, a live one for half a target duration.
              const cacheTime = index.ended ? this.cacheDuration : index.target_duration * 1000 / 2;
              const params = {
                  Body: indexString,
                  Key: Path.posix.join(this.baseKey, this.indexName),
                  ContentType: 'application/vnd.apple.mpegURL',
                  CacheControl: `max-age=${Math.floor(cacheTime / 1000)}, public`
              };

              return new Promise((resolve, reject) => {

                  this.s3.putObject(params, (err, data) => {

                      return err ? reject(err) : resolve(data);
                  });
              });
          }

          const indexPath = Path.join(this.targetUri, this.indexName);
          const previous = this.lastIndexString;
          const appendString = previous && indexString.startsWith(previous) ? indexString.slice(previous.length) : '';

          // Updated before the write so that back-to-back calls diff against the latest text.
          this.lastIndexString = indexString;

          try {
              if (appendString) {
                  return await internals.fs.appendFile(indexPath, appendString);
              }

              return await internals.fs.writeFile(indexPath, indexString);
          }
          catch (err) {
              // The file no longer matches the remembered text; force a full rewrite next time.
              this.lastIndexString = '';
              throw err;
          }
      }
  }
  110. module.exports = HlsUploader;