소스 검색

implement group recording + identical stream collation feature

Gil Pedersen 11 년 전
부모
커밋
75f252e079
2개의 변경된 파일82개의 추가작업 그리고 26개의 파일을 삭제
  1. 80 25
      lib/recorder.js
  2. 2 1
      package.json

+ 80 - 25
lib/recorder.js

@@ -9,13 +9,14 @@ var mime = require('mime'),
     streamprocess = require('streamprocess'),
     oncemore = require('oncemore'),
     m3u8parse = require('m3u8parse'),
+    mkdirp = require('mkdirp'),
     debug = require('debug')('hls:recorder');
 
 function HlsStreamRecorder(reader, dst, options) {
   options = options || {};
 
   this.reader = reader;
-  this.dst = dst; // target directory 
+  this.dst = dst; // target directory
 
   this.nextSegmentSeq = -1;
   this.seq = 0;
@@ -23,12 +24,14 @@ function HlsStreamRecorder(reader, dst, options) {
 
   this.startOffset = parseFloat(options.startOffset);
   this.subreader = options.subreader;
+
+  this.recorders = [];
 }
 
 HlsStreamRecorder.prototype.start = function() {
   // TODO: make async?
   if (!fs.existsSync(this.dst))
-    fs.mkdirSync(this.dst);
+    mkdirp.sync(this.dst);
 
   streamprocess(this.reader, this.process.bind(this));
 
@@ -62,31 +65,68 @@ HlsStreamRecorder.prototype.updateIndex = function(update) {
     } else {
       debug('programs', this.index.programs);
       if (this.subreader) {
-        for (var programNo in this.index.programs) {
-          var programs = this.index.programs[programNo];
-
-          // remove backup sources
-          var used = {};
-          programs = programs.filter(function(program) {
-            var bw = parseInt(program.info.bandwidth, 10);
-            var res = !(bw in used);
-            used[bw] = true;
-            return res;
-          });
-
-          this.index.programs[programNo] = programs;
-
-          programs.forEach(function(program, index) {
-            var programUrl = url.resolve(self.reader.baseUrl, program.uri);
-            debug('url', programUrl);
+        var programNo = Object.keys(this.index.programs)[0];
+        var programs = this.index.programs[programNo];
+
+        // remove backup sources
+        var used = {};
+        programs = programs.filter(function(program) {
+          var bw = parseInt(program.info.bandwidth, 10);
+          var res = !(bw in used);
+          used[bw] = true;
+          return res;
+        });
+
+        this.index.programs[programNo] = programs;
+
+        programs.forEach(function(program, index) {
+          var programUrl = url.resolve(self.reader.baseUrl, program.uri);
+          debug('url', programUrl);
+
+          // check for duplicate source urls
+          var rec = this.recorderForUrl(programUrl);
+          if (!rec || !rec.localUrl) {
             var dir = self.variantName(program.info, index);
-            program.uri = path.join(dir, 'index.m3u8');
-            program.recorder = new HlsStreamRecorder(self.subreader(programUrl), path.join(self.dst, dir), { startOffset: self.startOffset }).start();
-          });
-        }
+            rec = new HlsStreamRecorder(self.subreader(programUrl), path.join(self.dst, dir), { startOffset: self.startOffset });
+            rec.localUrl = url.format({pathname: path.join(dir, 'index.m3u8')});
+            rec.remoteUrl = programUrl;
+
+            this.recorders.push(rec);
+          }
+
+          program.uri = rec.localUrl;
+        }, this);
+
+        var allGroups = [];
+        for (var group in this.index.groups)
+          [].push.apply(allGroups, this.index.groups[group]);
+
+
+        allGroups.forEach(function(groupItem, index) {
+          var srcUri = groupItem.quotedString('uri');
+          if (srcUri) {
+            var itemUrl = url.resolve(self.reader.baseUrl, srcUri);
+            debug('url', itemUrl);
+
+            var rec = this.recorderForUrl(itemUrl);
+            if (!rec || !rec.localUrl) {
+              var dir = self.groupSrcName(groupItem, index);
+              rec = new HlsStreamRecorder(self.subreader(itemUrl), path.join(self.dst, dir), { startOffset: self.startOffset });
+              rec.localUrl = url.format({pathname: path.join(dir, 'index.m3u8')});
+              rec.remoteUrl = itemUrl;
+
+              this.recorders.push(rec);
+            }
+
+            groupItem.quotedString('uri', rec.localUrl);
+          }
+        }, this);
+
+        // start all recordings
+        this.recorders.forEach(function(recording) {
+          recording.start();
+        });
 
-        // TODO: handle groups!!
-        this.index.groups = {};
         this.index.iframes = {};
       } else {
         this.index.programs = {};
@@ -143,6 +183,12 @@ HlsStreamRecorder.prototype.variantName = function(info, index) {
   return util.format('v%d', index);
 };
 
+HlsStreamRecorder.prototype.groupSrcName = function(info, index) {
+  var lang = (info.quotedString('language') || '').replace(/\W/g, '').toLowerCase();
+  var id = (info.quotedString('group-id') || 'unk').replace(/\W/g, '').toLowerCase();
+  return util.format('grp/%s/%s%d', id, lang ? lang + '-' : '', index);
+};
+
 HlsStreamRecorder.prototype.segmentName = function(seqNo) {
   function name(n) {
     var next = ~~(n / 26);
@@ -156,6 +202,15 @@ HlsStreamRecorder.prototype.segmentName = function(seqNo) {
 HlsStreamRecorder.prototype.flushIndex = function(cb) {
   // TODO: make atomic by writing to temp file & renaming
   fs.writeFile(path.join(this.dst, 'index.m3u8'), this.index, cb);
+
+HlsStreamRecorder.prototype.recorderForUrl = function(remoteUrl) {
+  var idx, len = this.recorders.length;
+  for (idx = 0; idx < len; idx++) {
+    var rec = this.recorders[idx];
+    if (rec.remoteUrl === remoteUrl)
+      return rec;
+  }
+  return null;
 };
 
 

+ 2 - 1
package.json

@@ -30,9 +30,10 @@
     "commander": "~1.1.1",
     "debug": "~0.7.0",
     "deep-equal": "0.0.0",
-    "m3u8parse": "^0.1.10",
+    "m3u8parse": "^0.1.13",
     "measured": "~0.1.3",
     "mime": "^1.2.11",
+    "mkdirp": "^0.5.0",
     "oncemore": "~0.1.0",
     "readable-stream": "~1.0.0",
     "streamprocess": "0.0.1",