| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- "use strict";
- var fs = require('fs'),
- path = require('path'),
- url = require('url'),
- util = require('util');
- var mime = require('mime'),
- streamprocess = require('streamprocess'),
- oncemore = require('oncemore'),
- m3u8parse = require('m3u8parse'),
- debug = require('debug')('hls:recorder');
- function HlsStreamRecorder(reader, dst, options) {
- options = options || {};
- this.reader = reader;
- this.dst = dst; // target directory
- this.nextSegmentSeq = -1;
- this.seq = 0;
- this.index = null;
- this.subreader = options.subreader;
- }
- HlsStreamRecorder.prototype.start = function() {
- // TODO: make async?
- if (!fs.existsSync(this.dst))
- fs.mkdirSync(this.dst);
- streamprocess(this.reader, this.process.bind(this));
- this.updateIndex(this.reader.index);
- this.reader.on('index', this.updateIndex.bind(this));
- };
- HlsStreamRecorder.prototype.updateIndex = function(update) {
- var self = this;
- if (!update) return;
- if (!this.index) {
- this.index = new m3u8parse.M3U8Playlist(update);
- this.index.segments = [];
- this.index.first_seq_no = self.seq;
- this.index.type = 'EVENT';
- this.index.ended = false;
- debug('programs', this.index.programs);
- if (this.subreader) {
- for (var programNo in this.index.programs) {
- var programs = this.index.programs[programNo];
- // remove backup sources
- var used = {};
- programs = programs.filter(function(program) {
- var bw = parseInt(program.info.bandwidth, 10);
- var res = !(bw in used);
- used[bw] = true;
- return res;
- });
- this.index.programs[programNo] = programs;
- programs.forEach(function(program, index) {
- var programUrl = url.resolve(self.reader.baseUrl, program.uri);
- debug('url', programUrl);
- var dir = self.variantName(program.info, index);
- program.uri = path.join(dir, 'index.m3u8');
- program.recorder = new HlsStreamRecorder(self.subreader(programUrl), path.join(self.dst, dir)).start();
- });
- }
- // TODO: handle groups!!
- this.index.groups = {};
- this.index.iframes = {};
- } else {
- this.index.programs = {};
- this.index.groups = {};
- this.index.iframes = {};
- }
- // hook end listener
- this.reader.on('end', function() {
- self.index.ended = true;
- self.flushIndex(function(/*err*/) {
- debug('done');
- });
- });
- }
- // validate update
- if (this.index.target_duration > update.target_duration)
- throw new Error('Invalid index');
- };
- HlsStreamRecorder.prototype.process = function(obj, next) {
- var self = this;
- var segment = new m3u8parse.M3U8Segment(obj.segment);
- var meta = obj.meta;
- // mark discontinuities
- if (this.nextSegmentSeq !== -1 &&
- this.nextSegmentSeq !== obj.seq)
- segment.discontinuity = true;
- this.nextSegmentSeq = obj.seq + 1;
- // create our own uri
- segment.uri = util.format('%s.%s', this.segmentName(this.seq), mime.extension(meta.mime));
- // manually set iv if sequence based, since we generate our own sequence numbering
- if (segment.key && !segment.key.iv) {
- var seqStr = obj.seq.toString();
- segment.key.iv = '0x00000000000000000000000000000000'.slice(-seqStr.length) + seqStr;
- if (this.index.version > 2) {
- this.index.version = 2;
- debug('changed index version to:', this.index.version);
- }
- }
- // save the stream segment
- var stream = oncemore(obj.stream);
- stream.pipe(fs.createWriteStream(path.join(this.dst, segment.uri)));
- stream.once('end', 'error', function(err) {
- // only to report errors
- if (err) debug('stream error', err.stack || err);
- // update index
- self.index.segments.push(segment);
- self.flushIndex(next);
- });
- this.seq++;
- };
- HlsStreamRecorder.prototype.variantName = function(info, index) {
- return util.format('v%d', index);
- };
- HlsStreamRecorder.prototype.segmentName = function(seqNo) {
- function name(n) {
- var next = ~~(n / 26);
- var chr = String.fromCharCode(97 + n % 26); // 'a' + n
- if (next) return name(next - 1) + chr;
- return chr;
- }
- return name(seqNo);
- };
- HlsStreamRecorder.prototype.flushIndex = function(cb) {
- // TODO: make atomic by writing to temp file & renaming
- fs.writeFile(path.join(this.dst, 'index.m3u8'), this.index, cb);
- };
- var hlsrecorder = module.exports = function hlsrecorder(reader, dst, options) {
- return new HlsStreamRecorder(reader, dst, options);
- };
- hlsrecorder.HlsStreamRecorder = HlsStreamRecorder;
|