瀏覽代碼

Refactor stream saving

Gil Pedersen 8 年之前
父節點
當前提交
4db5872dd0
共有 3 個文件被更改,包括 110 次插入49 次删除
  1. 98 0
      lib/hls-uploader.js
  2. 10 48
      lib/recorder.js
  3. 2 1
      package.json

+ 98 - 0
lib/hls-uploader.js

@@ -0,0 +1,98 @@
+'use strict';
+
+const Assert = require('assert');
+const Fs = require('fs');
+const Path = require('path');
+const Url = require('url');
+const Util = require('util');
+
+const Mkdirp = require('mkdirp');
+const Pati = require('pati');
+const WriteFileAtomic = require('write-file-atomic');
+
+
+const internals = {};
+
+
+internals.fs = {
+    appendFile: Util.promisify(Fs.appendFile),
+    writeFile: Util.promisify(WriteFileAtomic)
+};
+
+
+class HlsUploader {
+
+    constructor(targetUri, options) {
+
+        Assert.equal(Url.parse(targetUri).protocol, null);
+
+        this.targetUri = targetUri;
+
+        this.indexName = options.indexName || 'index.m3u8';
+        this.collect = !!options.collect;
+
+        // State
+
+        this.lastIndexString = '';
+        this.segmentBytes = 0;
+
+        // TODO: make async?
+        if (!Fs.existsSync(this.targetUri)) {
+            Mkdirp.sync(this.targetUri);
+        }
+    }
+
+    async pushSegment(stream, name) {
+
+        const target = this.prepareTargetStream(name);
+
+        stream.pipe(target);
+
+        const dispatcher = new Pati.EventDispatcher(stream);
+
+        dispatcher.on('end', Pati.EventDispatcher.end);
+
+        let bytesWritten = 0;
+        dispatcher.on('data', (chunk) => {
+
+            bytesWritten += +chunk.length;
+        });
+
+        try {
+            // TODO: handle target errors & wait for end?
+            await dispatcher.finish();
+            return bytesWritten;
+        }
+        finally {
+            this.segmentBytes += bytesWritten;
+        }
+    }
+
+    prepareTargetStream(name) {
+
+        const append = this.collect && this.segmentBytes !== 0;
+        return Fs.createWriteStream(Path.join(this.targetUri, name), { flags: append ? 'a' : 'w' });
+    }
+
+    flushIndex(index) {
+
+        const indexString = index.toString().trim();
+
+        let appendString;
+        if (this.lastIndexString && indexString.startsWith(this.lastIndexString)) {
+            const lastLength = this.lastIndexString.length;
+            appendString = indexString.substr(lastLength);
+        }
+        this.lastIndexString = indexString;
+
+        if (appendString) {
+            return internals.fs.appendFile(Path.join(this.targetUri, this.indexName), appendString);
+        }
+        else {
+            return internals.fs.writeFile(Path.join(this.targetUri, this.indexName), indexString);
+        }
+    }
+};
+
+
+module.exports = HlsUploader;

+ 10 - 48
lib/recorder.js

@@ -1,17 +1,14 @@
 'use strict';
 
-const Fs = require('fs');
 const Path = require('path');
 const Url = require('url');
 
 const Mime = require('mime-types');
 const StreamEach = require('stream-each');
-const Oncemore = require('oncemore');
 const M3U8Parse = require('m3u8parse');
-const Mkdirp = require('mkdirp');
-const writeFileAtomic = require('write-file-atomic');
 const debug = require('debug')('hls:recorder');
 
+const HlsUploader = require('./hls-uploader');
 const SegmentDecrypt = require('./segment-decrypt');
 
 
@@ -41,15 +38,13 @@ function HlsStreamRecorder(reader, dst, options) {
   this.mapSeq = 0;
   this.nextMap = null;
 
-  this.writing = null; // tracks writing state
+  this.uploader = null;
+  this.segmentHead = 0;
 }
 
 HlsStreamRecorder.prototype.start = function() {
 
-  // TODO: make async?
-  if (!Fs.existsSync(this.dst)) {
-    Mkdirp.sync(this.dst);
-  }
+  this.uploader = new HlsUploader(this.dst, { collect: this.collect });
 
   StreamEach(this.reader, this.process.bind(this));
 
@@ -207,7 +202,7 @@ HlsStreamRecorder.prototype.processInfo = function (segmentInfo, callback) {
 
     // handle byterange
     if (this.collect) {
-      map.quotedString('byterange', `${bytesWritten}@${this.writing.bytes - bytesWritten}`);
+      map.quotedString('byterange', `${bytesWritten}@${this.uploader.segmentBytes - bytesWritten}`);
     }
 
     this.nextMap = map;
@@ -258,13 +253,13 @@ HlsStreamRecorder.prototype.processSegment = function (segmentInfo, callback) {
 
       // handle byterange
       if (this.collect) {
-        const isContigious = this.writing.segmentHead > 0 && ((this.writing.segmentHead + bytesWritten) === this.writing.bytes);
+        const isContigious = this.segmentHead > 0 && ((this.segmentHead + bytesWritten) === this.uploader.segmentBytes);
         segment.byterange = {
           length: bytesWritten,
-          offset: isContigious ? null : this.writing.bytes - bytesWritten
+          offset: isContigious ? null : this.uploader.segmentBytes - bytesWritten
         }
 
-        this.writing.segmentHead = this.writing.bytes;
+        this.segmentHead = this.uploader.segmentBytes;
       }
 
       // update index
@@ -278,28 +273,7 @@ HlsStreamRecorder.prototype.processSegment = function (segmentInfo, callback) {
 
 HlsStreamRecorder.prototype.writeStream = function (stream, name, callback) {
 
-  if (!this.writing || !this.collect) {
-    this.writing = {
-      bytes: 0,
-      segmentHead: 0
-    };
-  }
-
-  stream.pipe(Fs.createWriteStream(Path.join(this.dst, name), { flags: this.writing.bytes === 0 ? 'w' : 'a' }));
-
-  let bytesWritten = 0;
-  if (this.collect) {
-    stream.on('data', (chunk) => {
-
-      bytesWritten += +chunk.length;
-    });
-  }
-
-  Oncemore(stream).once('end', 'error', (err) => {
-
-    this.writing.bytes += bytesWritten;
-    return callback(err, bytesWritten);
-  });
+  this.uploader.pushSegment(stream, name).then((written) => callback(null, written), callback);
 };
 
 HlsStreamRecorder.prototype.variantName = function(info, index) {
@@ -329,19 +303,7 @@ HlsStreamRecorder.prototype.segmentName = function(seqNo, isInit) {
 
 HlsStreamRecorder.prototype.flushIndex = function(cb) {
 
-  let appendString, indexString = this.index.toString().trim();
-  if (this.lastIndexString && indexString.lastIndexOf(this.lastIndexString, 0) === 0) {
-    let lastLength = this.lastIndexString.length;
-    appendString = indexString.substr(lastLength);
-  }
-  this.lastIndexString = indexString;
-
-  if (appendString) {
-    Fs.appendFile(Path.join(this.dst, 'index.m3u8'), appendString, cb);
-  }
-  else {
-    writeFileAtomic(Path.join(this.dst, 'index.m3u8'), indexString, cb);
-  }
+  this.uploader.flushIndex(this.index).then(() => cb(), cb);
 };
 
 HlsStreamRecorder.prototype.recorderForUrl = function(remoteUrl) {

+ 2 - 1
package.json

@@ -35,11 +35,12 @@
     "mkdirp": "^0.5.0",
     "noptify": "0.0.3",
     "oncemore": "^1.0.0",
+    "pati": "^1.1.0",
     "readable-stream": "^2.0.2",
     "stream-each": "^1.1.2",
     "udp-blast": "^1.0.0",
     "uristream": "^2.0.0",
-    "write-file-atomic": "^1.1.0"
+    "write-file-atomic": "^2.3.0"
   },
   "devDependencies": {},
   "engines": {