recorder.js 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. "use strict";
  2. var fs = require('fs'),
  3. path = require('path'),
  4. url = require('url'),
  5. util = require('util');
  6. var mime = require('mime'),
  7. streamprocess = require('streamprocess'),
  8. oncemore = require('oncemore'),
  9. m3u8parse = require('m3u8parse'),
  10. debug = require('debug')('hls:recorder');
  11. function HlsStreamRecorder(reader, dst, options) {
  12. options = options || {};
  13. this.reader = reader;
  14. this.dst = dst; // target directory
  15. this.nextSegmentSeq = -1;
  16. this.seq = 0;
  17. this.index = null;
  18. this.subreader = options.subreader;
  19. }
  20. HlsStreamRecorder.prototype.start = function() {
  21. // TODO: make async?
  22. if (!fs.existsSync(this.dst))
  23. fs.mkdirSync(this.dst);
  24. streamprocess(this.reader, this.process.bind(this));
  25. this.updateIndex(this.reader.index);
  26. this.reader.on('index', this.updateIndex.bind(this));
  27. };
  28. HlsStreamRecorder.prototype.updateIndex = function(update) {
  29. var self = this;
  30. if (!update) return;
  31. if (!this.index) {
  32. this.index = new m3u8parse.M3U8Playlist(update);
  33. this.index.segments = [];
  34. this.index.first_seq_no = self.seq;
  35. this.index.type = 'EVENT';
  36. this.index.ended = false;
  37. debug('programs', this.index.programs);
  38. if (this.subreader) {
  39. for (var programNo in this.index.programs) {
  40. var programs = this.index.programs[programNo];
  41. // remove backup sources
  42. var used = {};
  43. programs = programs.filter(function(program) {
  44. var bw = parseInt(program.info.bandwidth, 10);
  45. var res = !(bw in used);
  46. used[bw] = true;
  47. return res;
  48. });
  49. this.index.programs[programNo] = programs;
  50. programs.forEach(function(program, index) {
  51. var programUrl = url.resolve(self.reader.baseUrl, program.uri);
  52. debug('url', programUrl);
  53. var dir = self.variantName(program.info, index);
  54. program.uri = path.join(dir, 'index.m3u8');
  55. program.recorder = new HlsStreamRecorder(self.subreader(programUrl), path.join(self.dst, dir)).start();
  56. });
  57. }
  58. // TODO: handle groups!!
  59. this.index.groups = {};
  60. this.index.iframes = {};
  61. } else {
  62. this.index.programs = {};
  63. this.index.groups = {};
  64. this.index.iframes = {};
  65. }
  66. // hook end listener
  67. this.reader.on('end', function() {
  68. self.index.ended = true;
  69. self.flushIndex(function(/*err*/) {
  70. debug('done');
  71. });
  72. });
  73. }
  74. // validate update
  75. if (this.index.target_duration > update.target_duration)
  76. throw new Error('Invalid index');
  77. };
  78. HlsStreamRecorder.prototype.process = function(obj, next) {
  79. var self = this;
  80. var segment = new m3u8parse.M3U8Segment(obj.segment);
  81. var meta = obj.meta;
  82. // mark discontinuities
  83. if (this.nextSegmentSeq !== -1 &&
  84. this.nextSegmentSeq !== obj.seq)
  85. segment.discontinuity = true;
  86. this.nextSegmentSeq = obj.seq + 1;
  87. // create our own uri
  88. segment.uri = util.format('%s.%s', this.segmentName(this.seq), mime.extension(meta.mime));
  89. // manually set iv if sequence based, since we generate our own sequence numbering
  90. if (segment.key && !segment.key.iv) {
  91. var seqStr = obj.seq.toString();
  92. segment.key.iv = '0x00000000000000000000000000000000'.slice(-seqStr.length) + seqStr;
  93. if (this.index.version > 2) {
  94. this.index.version = 2;
  95. debug('changed index version to:', this.index.version);
  96. }
  97. }
  98. // save the stream segment
  99. var stream = oncemore(obj.stream);
  100. stream.pipe(fs.createWriteStream(path.join(this.dst, segment.uri)));
  101. stream.once('end', 'error', function(err) {
  102. // only to report errors
  103. if (err) debug('stream error', err.stack || err);
  104. // update index
  105. self.index.segments.push(segment);
  106. self.flushIndex(next);
  107. });
  108. this.seq++;
  109. };
  110. HlsStreamRecorder.prototype.variantName = function(info, index) {
  111. return util.format('v%d', index);
  112. };
  113. HlsStreamRecorder.prototype.segmentName = function(seqNo) {
  114. function name(n) {
  115. var next = ~~(n / 26);
  116. var chr = String.fromCharCode(97 + n % 26); // 'a' + n
  117. if (next) return name(next - 1) + chr;
  118. return chr;
  119. }
  120. return name(seqNo);
  121. };
  122. HlsStreamRecorder.prototype.flushIndex = function(cb) {
  123. // TODO: make atomic by writing to temp file & renaming
  124. fs.writeFile(path.join(this.dst, 'index.m3u8'), this.index, cb);
  125. };
  126. var hlsrecorder = module.exports = function hlsrecorder(reader, dst, options) {
  127. return new HlsStreamRecorder(reader, dst, options);
  128. };
  129. hlsrecorder.HlsStreamRecorder = HlsStreamRecorder;