// recorder.js (4.8 KB)

  1. "use strict";
  2. var fs = require('fs'),
  3. path = require('path'),
  4. url = require('url'),
  5. util = require('util');
  6. var mime = require('mime'),
  7. streamprocess = require('streamprocess'),
  8. oncemore = require('oncemore'),
  9. m3u8parse = require('m3u8parse'),
  10. debug = require('debug')('hls:recorder');
  /**
   * Recorder that stores the segments delivered by `reader` in the directory
   * `dst` and maintains an `index.m3u8` playlist next to them.
   *
   * @param {Object} reader - Segment source. Elsewhere in this file it is
   *   handed to `streamprocess`, and its `index`, `baseUrl` and `on()` members
   *   are used.
   * @param {string} dst - Target directory for segments and the index.
   * @param {Object} [options]
   * @param {number|string} [options.startOffset] - Parsed with `parseFloat`;
   *   the stored value is NaN when the option is missing or not numeric.
   * @param {Function} [options.subreader] - Called with a variant playlist url
   *   to obtain a reader for that variant.
   */
  function HlsStreamRecorder(reader, dst, options) {
    options = options || {};

    this.reader = reader;
    this.dst = dst; // target directory

    this.nextSegmentSeq = -1; // source sequence expected next; -1 until a segment has been seen
    this.seq = 0;             // our own running number, used to name output segments
    this.index = null;        // playlist object, created on the first index update

    this.startOffset = parseFloat(options.startOffset);
    this.subreader = options.subreader;
  }
  /**
   * Starts recording: makes sure the target directory exists, begins feeding
   * the reader's segments to `process`, applies the reader's current index
   * (if any) and subscribes to further 'index' events.
   *
   * @returns {HlsStreamRecorder} This recorder, so that
   *   `new HlsStreamRecorder(...).start()` evaluates to the recorder.
   */
  HlsStreamRecorder.prototype.start = function() {
    // TODO: make async?
    if (!fs.existsSync(this.dst))
      fs.mkdirSync(this.dst);

    streamprocess(this.reader, this.process.bind(this));

    this.updateIndex(this.reader.index);
    this.reader.on('index', this.updateIndex.bind(this));

    // Callers store the result of start(); without this they would get undefined.
    return this;
  };
  /**
   * Applies an index (playlist) update coming from the reader.
   *
   * The first non-empty update creates `this.index` as a copy of it:
   *  - for a media playlist the copy is reset to an empty, not yet ended
   *    'EVENT' playlist numbered from our own sequence counter, and the
   *    configured start offset (if any) is written to it;
   *  - for a variant playlist with a `subreader`, backup sources (entries that
   *    repeat an already seen bandwidth) are dropped and a nested recorder is
   *    started for every remaining entry, writing into `<dst>/<variantName>/`;
   *    without a `subreader` all programs are removed.
   * It also hooks the reader's 'end' event to mark the index as ended and
   * write it out.
   *
   * Every update is then checked against the recorded index.
   *
   * @param {Object} update - Playlist data from the reader; falsy values are ignored.
   * @throws {Error} 'Invalid index' when the update's target duration is
   *   smaller than that of the recorded index.
   */
  HlsStreamRecorder.prototype.updateIndex = function(update) {
    var self = this;

    if (!update) return;

    if (!this.index) {
      this.index = new m3u8parse.M3U8Playlist(update);

      if (!this.index.variant) {
        this.index.segments = [];
        this.index.first_seq_no = self.seq;
        this.index.type = 'EVENT';
        this.index.ended = false;
        this.index.discontinuity_sequence = 0; // not allowed in event playlists

        if (!isNaN(this.startOffset)) {
          var offset = this.startOffset;
          // Keep a negative offset at least three target durations away from
          // the end. The target duration is a property of the playlist; the
          // recorder itself has none, so reading it from `this` gave NaN.
          if (offset < 0) offset = Math.min(offset, -3 * this.index.target_duration);
          this.index.start.decimalInteger('offset', offset);
        }
      } else {
        debug('programs', this.index.programs);

        if (this.subreader) {
          for (var programNo in this.index.programs) {
            var programs = this.index.programs[programNo];

            // remove backup sources
            var used = {};
            programs = programs.filter(function(program) {
              var bw = parseInt(program.info.bandwidth, 10);
              var res = !(bw in used);
              used[bw] = true;
              return res;
            });

            this.index.programs[programNo] = programs;

            programs.forEach(function(program, index) {
              var programUrl = url.resolve(self.reader.baseUrl, program.uri);
              debug('url', programUrl);

              // point the entry at the local copy of the variant
              var dir = self.variantName(program.info, index);
              program.uri = path.join(dir, 'index.m3u8');

              // Keep the recorder itself rather than the result of start(),
              // so the reference is valid whatever start() returns.
              var recorder = new HlsStreamRecorder(self.subreader(programUrl), path.join(self.dst, dir), { startOffset: self.startOffset });
              recorder.start();
              program.recorder = recorder;
            });
          }

          // TODO: handle groups!!
          this.index.groups = {};
          this.index.iframes = {};
        } else {
          this.index.programs = {};
          this.index.groups = {};
          this.index.iframes = {};
        }
      }

      // hook end listener
      this.reader.on('end', function() {
        self.index.ended = true;
        self.flushIndex(function(err) {
          if (err) debug('failed to write index', err.stack || err);
          debug('done');
        });
      });
    }

    // validate update
    if (this.index.target_duration > update.target_duration)
      throw new Error('Invalid index');
  };
  /**
   * Handles one segment delivered by the reader: copies its stream to a file
   * in the target directory under our own name, then appends the segment to
   * the index and writes the index out.
   *
   * @param {Object} obj - Segment descriptor. Used here: `obj.seq` (source
   *   sequence number), `obj.segment` (playlist entry data), `obj.meta.mime`
   *   (for the file extension) and `obj.stream` (the segment data).
   * @param {Function} next - Passed to `flushIndex` as its completion callback
   *   once the segment stream has ended or failed.
   */
  HlsStreamRecorder.prototype.process = function(obj, next) {
    var self = this;

    var segment = new m3u8parse.M3U8Segment(obj.segment);
    var meta = obj.meta;

    // mark discontinuities
    if (this.nextSegmentSeq !== -1 &&
        this.nextSegmentSeq !== obj.seq)
      segment.discontinuity = true;
    this.nextSegmentSeq = obj.seq + 1;

    // create our own uri
    segment.uri = util.format('%s.%s', this.segmentName(this.seq), mime.extension(meta.mime));

    // manually set iv if sequence based, since we generate our own sequence numbering
    if (segment.key && !segment.key.iv) {
      segment.key.hexadecimalInteger('iv', obj.seq);

      // A key with an IV needs playlist version 2 or higher, so only ever
      // raise the version; an index that already declares more is left alone.
      if (!(this.index.version >= 2)) {
        this.index.version = 2;
        debug('changed index version to:', this.index.version);
      }
    }

    // save the stream segment
    var stream = oncemore(obj.stream);
    stream.pipe(fs.createWriteStream(path.join(this.dst, segment.uri)));
    stream.once('end', 'error', function(err) {
      // only to report errors
      if (err) debug('stream error', err.stack || err);

      // update index
      self.index.segments.push(segment);
      self.flushIndex(next);
    });

    this.seq++;
  };
  /**
   * Name of the sub-directory used for a variant stream.
   *
   * @param {Object} info - Attributes of the variant; not used by this
   *   implementation.
   * @param {number} index - Position of the variant within its program list.
   * @returns {string} 'v' followed by the index, e.g. 'v0'.
   */
  HlsStreamRecorder.prototype.variantName = function(info, index) {
    return util.format('v%d', index);
  };
  /**
   * Maps a sequence number to a lowercase letter name in spreadsheet-column
   * fashion: 0 -> 'a', 25 -> 'z', 26 -> 'aa', 27 -> 'ab', 702 -> 'aaa'.
   *
   * @param {number} seqNo - Sequence number of the segment.
   * @returns {string} The letter name (without file extension).
   */
  HlsStreamRecorder.prototype.segmentName = function(seqNo) {
    var letters = '';
    var rest = seqNo;

    do {
      letters = String.fromCharCode(97 + rest % 26) + letters; // 'a' + digit
      // Math.floor rather than a bitwise truncation, which would wrap at 32 bits.
      // The -1 is what makes 'aa' follow 'z': there is no zero digit.
      rest = Math.floor(rest / 26) - 1;
    } while (rest >= 0);

    return letters;
  };
  /**
   * Writes the current index to `index.m3u8` in the target directory.
   *
   * @param {Function} cb - Passed to `fs.writeFile` as its completion
   *   callback; receives an error as first argument on failure.
   */
  HlsStreamRecorder.prototype.flushIndex = function(cb) {
    // TODO: make atomic by writing to temp file & renaming
    // Stringify explicitly: fs.writeFile is documented to take strings and
    // buffers, and current Node versions reject arbitrary objects instead of
    // converting them.
    fs.writeFile(path.join(this.dst, 'index.m3u8'), String(this.index), cb);
  };
  /**
   * Module entry point: creates a recorder without requiring `new`.
   * The constructor itself is exposed as `hlsrecorder.HlsStreamRecorder`.
   *
   * @param {Object} reader - Forwarded to the HlsStreamRecorder constructor.
   * @param {string} dst - Forwarded to the HlsStreamRecorder constructor.
   * @param {Object} [options] - Forwarded to the HlsStreamRecorder constructor.
   * @returns {HlsStreamRecorder} A new recorder; it is not started.
   */
  var hlsrecorder = module.exports = function hlsrecorder(reader, dst, options) {
    return new HlsStreamRecorder(reader, dst, options);
  };

  hlsrecorder.HlsStreamRecorder = HlsStreamRecorder;