recorder.js 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. "use strict";
  2. var fs = require('fs'),
  3. path = require('path'),
  4. url = require('url'),
  5. util = require('util');
  6. var mime = require('mime'),
  7. streamprocess = require('streamprocess'),
  8. oncemore = require('oncemore'),
  9. m3u8parse = require('m3u8parse'),
  10. debug = require('debug')('hls:recorder');
  /**
   * Recorder that stores the segments delivered by an HLS reader into a
   * directory and maintains a playlist for them.
   *
   * @param {Object} reader - segment source; later read for `index`, `baseUrl`
   *   and its 'index' / 'end' events
   * @param {string} dst - target directory
   * @param {Object} [options]
   * @param {number|string} [options.startOffset] - parsed with parseFloat;
   *   stays NaN when absent or not numeric
   * @param {Function} [options.subreader] - factory called with a variant url
   *   to obtain a reader for that variant
   */
  function HlsStreamRecorder(reader, dst, options) {
    var settings = options || {};

    // collaborators and configuration
    this.reader = reader;
    this.dst = dst; // target directory

    // recording state
    this.nextSegmentSeq = -1; // -1 means no segment has been processed yet
    this.seq = 0;
    this.index = null;

    this.startOffset = parseFloat(settings.startOffset);
    this.subreader = settings.subreader;
  }
  /**
   * Starts recording: makes sure the target directory exists, hands the reader
   * to `streamprocess` with `process` as the per-item handler, applies the
   * reader's current index (if any) and subscribes to later 'index' events.
   *
   * @returns {HlsStreamRecorder} this recorder, so that
   *   `new HlsStreamRecorder(...).start()` yields the instance
   */
  HlsStreamRecorder.prototype.start = function() {
    // TODO: make async?
    if (!fs.existsSync(this.dst))
      fs.mkdirSync(this.dst);

    streamprocess(this.reader, this.process.bind(this));

    this.updateIndex(this.reader.index);
    this.reader.on('index', this.updateIndex.bind(this));

    // callers assign the result of start() (see the variant handling in
    // updateIndex), so the recorder itself must be returned
    return this;
  };
  /**
   * Handles a playlist update coming from the reader.
   *
   * The first non-empty update creates the recorder's own playlist:
   *  - media playlist: reset to an empty EVENT playlist starting at `this.seq`,
   *    optionally carrying a start offset;
   *  - variant playlist: when a `subreader` factory was supplied, entries with
   *    an already-seen bandwidth are dropped, every remaining entry is pointed
   *    at `<variantName>/index.m3u8` and gets its own started sub-recorder;
   *    without a subreader the program list is cleared.
   * It also hooks the reader's 'end' event to mark the playlist as ended and
   * flush it.
   *
   * Every update (including the first) is then validated.
   *
   * @param {Object} update - playlist data from the reader; falsy values are ignored
   * @throws {Error} 'Invalid index' when the update's target_duration is
   *   smaller than the one of the recorded playlist
   */
  HlsStreamRecorder.prototype.updateIndex = function(update) {
    var self = this;

    if (!update) return;

    if (!this.index) {
      this.index = new m3u8parse.M3U8Playlist(update);

      if (!this.index.variant) {
        if (this.index.version < 2) {
          // version 2 is required to support the remapped IV attribute
          this.index.version = 2;
          debug('changed index version to:', this.index.version);
        }

        this.index.segments = [];
        this.index.first_seq_no = self.seq;
        this.index.type = 'EVENT';
        this.index.ended = false;
        this.index.discontinuity_sequence = 0; // not allowed in event playlists

        if (!isNaN(this.startOffset)) {
          var offset = this.startOffset;
          // A negative offset is pushed back to at least three target
          // durations. The duration is a property of the playlist, not of the
          // recorder (reading it from `this` yielded NaN).
          if (offset < 0) offset = Math.min(offset, -3 * this.index.target_duration);
          this.index.start.decimalInteger('offset', offset);
        }
      } else {
        debug('programs', this.index.programs);

        if (this.subreader) {
          for (var programNo in this.index.programs) {
            var programs = this.index.programs[programNo];

            // remove backup sources (keep the first entry per bandwidth)
            var used = {};
            programs = programs.filter(function(program) {
              var bw = parseInt(program.info.bandwidth, 10);
              var res = !(bw in used);
              used[bw] = true;
              return res;
            });

            this.index.programs[programNo] = programs;

            programs.forEach(function(program, index) {
              var programUrl = url.resolve(self.reader.baseUrl, program.uri);
              debug('url', programUrl);

              var dir = self.variantName(program.info, index);
              program.uri = path.join(dir, 'index.m3u8');

              // keep the recorder instance itself rather than whatever
              // start() happens to return
              var recorder = new HlsStreamRecorder(self.subreader(programUrl), path.join(self.dst, dir), { startOffset: self.startOffset });
              recorder.start();
              program.recorder = recorder;
            });
          }

          // TODO: handle groups!!
          this.index.groups = {};
          this.index.iframes = {};
        } else {
          this.index.programs = {};
          this.index.groups = {};
          this.index.iframes = {};
        }
      }

      // hook end listener
      this.reader.on('end', function() {
        self.index.ended = true;
        self.flushIndex(function(err) {
          // nothing can be propagated from here, but do not hide the failure
          if (err) debug('index flush error', err.stack || err);
          debug('done');
        });
      });
    }

    // validate update
    if (this.index.target_duration > update.target_duration)
      throw new Error('Invalid index');
  };
  /**
   * Stores one segment delivered by `streamprocess`.
   *
   * The segment is flagged as a discontinuity when its source sequence number
   * does not follow the previous one, gets a recorder-generated file name,
   * and is piped to a file in the target directory. Once the source stream
   * ends or errors, the segment is appended to the playlist and the playlist
   * is flushed; `next` is passed to `flushIndex` as its completion callback.
   *
   * @param {Object} obj - item with `segment`, `meta` (its `mime` is read),
   *   `seq` and `stream` properties
   * @param {Function} next - invoked by `flushIndex` when the playlist write completes
   */
  HlsStreamRecorder.prototype.process = function(obj, next) {
    var self = this;

    var segment = new m3u8parse.M3U8Segment(obj.segment);
    var meta = obj.meta;

    // mark discontinuities
    if (this.nextSegmentSeq !== -1 &&
        this.nextSegmentSeq !== obj.seq)
      segment.discontinuity = true;
    this.nextSegmentSeq = obj.seq + 1;

    // create our own uri
    segment.uri = util.format('%s.%s', this.segmentName(this.seq), mime.extension(meta.mime));

    // save the stream segment
    var stream = oncemore(obj.stream);
    var output = fs.createWriteStream(path.join(this.dst, segment.uri));

    // Without a listener a failed write (missing directory, full disk, ...)
    // is an unhandled 'error' event and takes the whole process down.
    // Report it the same way source errors are reported, and keep the source
    // flowing so that its 'end' still fires and `next` gets called.
    output.on('error', function(err) {
      debug('write error', err.stack || err);
      stream.resume();
    });

    stream.pipe(output);
    stream.once('end', 'error', function(err) {
      // only to report errors
      if (err) debug('stream error', err.stack || err);

      // update index
      self.index.segments.push(segment);
      self.flushIndex(next);
    });

    this.seq++;
  };
  /**
   * Builds the directory name used for a variant stream.
   *
   * @param {Object} info - variant attributes; not used by this implementation
   * @param {number} index - position of the variant within its program list
   * @returns {string} 'v' followed by the index formatted with '%d'
   */
  HlsStreamRecorder.prototype.variantName = function(info, index) {
    var pattern = 'v%d';
    var dirName = util.format(pattern, index);
    return dirName;
  };
  /**
   * Maps a sequence number to a lowercase alphabetic name:
   * 0 -> 'a', 25 -> 'z', 26 -> 'aa', 27 -> 'ab', ... 701 -> 'zz', 702 -> 'aaa'.
   *
   * @param {number} seqNo - recorder sequence number
   * @returns {string} the generated name (without extension)
   */
  HlsStreamRecorder.prototype.segmentName = function(seqNo) {
    var letters = '';
    var remaining = seqNo;

    for (;;) {
      // least significant letter first, so prepend
      letters = String.fromCharCode(97 + remaining % 26) + letters; // 'a' + n

      var carry = ~~(remaining / 26);
      if (!carry) break;

      // there is no zero digit in this numbering, hence the -1
      remaining = carry - 1;
    }

    return letters;
  };
  /**
   * Writes the current playlist to `index.m3u8` inside the target directory.
   *
   * @param {Function} cb - fs.writeFile completion callback, called with an
   *   error or null
   */
  HlsStreamRecorder.prototype.flushIndex = function(cb) {
    // TODO: make atomic by writing to temp file & renaming

    // Serialise explicitly: fs.writeFile only accepts strings and binary
    // data, and current Node.js versions reject arbitrary objects instead of
    // coercing them with toString().
    fs.writeFile(path.join(this.dst, 'index.m3u8'), String(this.index), cb);
  };
  /**
   * Module entry point: factory that creates a recorder without `new`.
   *
   * @param {Object} reader - passed through to the HlsStreamRecorder constructor
   * @param {string} dst - passed through to the HlsStreamRecorder constructor
   * @param {Object} [options] - passed through to the HlsStreamRecorder constructor
   * @returns {HlsStreamRecorder} a new, not yet started recorder
   */
  function hlsrecorder(reader, dst, options) {
    return new HlsStreamRecorder(reader, dst, options);
  }

  // expose the constructor alongside the factory
  hlsrecorder.HlsStreamRecorder = HlsStreamRecorder;

  module.exports = hlsrecorder;