فهرست منبع

Basic s3 upload support

Gil Pedersen 8 سال پیش
والد
کامیت
87cfa7e367
4فایلهای تغییر یافته به همراه71 افزوده شده و 20 حذف شده
  1. 2 3
      bin/hlsrecord
  2. 62 12
      lib/hls-uploader.js
  3. 6 5
      lib/recorder.js
  4. 1 0
      package.json

+ 2 - 3
bin/hlsrecord

@@ -5,8 +5,7 @@
 
 // record a live hls-stream storing an on-demand ready version
 
-var fs = require('fs'),
-    path = require('path');
+var fs = require('fs');
 var nopt;
 try {
   nopt = require('noptify/node_modules/nopt');
@@ -40,7 +39,7 @@ nopt.typeDefs[HexValue] = { type: HexValue, validate: function (data, key, val)
 var hlsrecord = require('noptify')(process.argv, { program: 'hlsrecord <url>' });
 hlsrecord.version(require('../package').version)
    .option('collect', '-C', 'Collect output segments to a single file', Boolean)
-   .option('output', '-o', 'Output directory', path)
+   .option('output', '-o', 'Output directory', String)
    .option('create-dir', '-c', 'Explicitly create output dir', Boolean)
    .option('begin-date', '-b', 'Start recording at', DateValue)
    .option('end-date', '-e', 'Stop recording at', DateValue)

+ 62 - 12
lib/hls-uploader.js

@@ -6,6 +6,7 @@ const Path = require('path');
 const Url = require('url');
 const Util = require('util');
 
+const Aws = require('aws-sdk');
 const Mkdirp = require('mkdirp');
 const Pati = require('pati');
 const WriteFileAtomic = require('write-file-atomic');
@@ -24,28 +25,64 @@ class HlsUploader {
 
     constructor(targetUri, options) {
 
-        Assert.equal(Url.parse(targetUri).protocol, null);
+        const url = Url.parse(targetUri);
+        Assert.ok(url.protocol === null || url.protocol === 's3:');
 
         this.targetUri = targetUri;
 
         this.indexName = options.indexName || 'index.m3u8';
         this.collect = !!options.collect;
+        this.cacheDuration = options.cacheDuration || 7 * 24 * 3600 * 1000;
 
         // State
 
         this.lastIndexString = '';
         this.segmentBytes = 0;
 
-        // TODO: make async?
-        if (!Fs.existsSync(this.targetUri)) {
-            Mkdirp.sync(this.targetUri);
+        if (url.protocol === 's3:') {
+            Assert.equal(options.collect, false, 'Collect not supported with s3:');
+
+            const params = {
+                params: {
+                    Bucket: url.host,
+                    ACL: 'public-read',
+                    StorageClass: 'REDUCED_REDUNDANCY'
+                }
+            };
+
+            this.s3 = new Aws.S3(params);
+            this.baseKey = (url.pathname || '/').slice(1);
+        } else {
+            // TODO: make async?
+            if (!Fs.existsSync(this.targetUri)) {
+                Mkdirp.sync(this.targetUri);
+            }
         }
     }
 
-    async pushSegment(stream, name) {
+    async pushSegment(stream, name, meta) {
 
-        const target = this.prepareTargetStream(name);
+        const append = this.collect && this.segmentBytes !== 0;
+
+        if (this.s3) {
+            const params = {
+                Body: stream,
+                Key: Path.join(this.baseKey, name),
+                ContentType: meta.mime || 'video/MP2T',
+                CacheControl: `max-age=${Math.floor(this.cacheDuration / 1000)}, public`,
+                ContentLength: meta.size
+            };
+
+            return new Promise((resolve, reject) => {
+
+                this.s3.upload(params, (err, data) => {
 
+                    return err ? reject(err) : resolve(data);
+                });
+            });
+        }
+
+        const target = Fs.createWriteStream(Path.join(this.targetUri, name), { flags: append ? 'a' : 'w' });
         stream.pipe(target);
 
         const dispatcher = new Pati.EventDispatcher(stream);
@@ -68,15 +105,28 @@ class HlsUploader {
         }
     }
 
-    prepareTargetStream(name) {
+    async flushIndex(index) {
 
-        const append = this.collect && this.segmentBytes !== 0;
-        return Fs.createWriteStream(Path.join(this.targetUri, name), { flags: append ? 'a' : 'w' });
-    }
+        const indexString = index.toString().trim();
 
-    flushIndex(index) {
+        if (this.s3) {
+            const cacheTime = index.ended ? this.cacheDuration : index.target_duration * 1000 / 2;
 
-        const indexString = index.toString().trim();
+            const params = {
+                Body: indexString,
+                Key: Path.join(this.baseKey, this.indexName),
+                ContentType: 'application/vnd.apple.mpegURL',
+                CacheControl: `max-age=${Math.floor(cacheTime / 1000)}, public`
+            };
+
+            return new Promise((resolve, reject) => {
+
+                this.s3.putObject(params, (err, data) => {
+
+                    return err ? reject(err) : resolve(data);
+                });
+            });
+        }
 
         let appendString;
         if (this.lastIndexString && indexString.startsWith(this.lastIndexString)) {

+ 6 - 5
lib/recorder.js

@@ -22,7 +22,7 @@ function HlsStreamRecorder(reader, dst, options) {
   options = options || {};
 
   this.reader = reader;
-  this.dst = dst; // target directory
+  this.dst = dst; // target directory / s3 url
 
   this.nextSegmentSeq = -1;
   this.seq = 0;
@@ -191,7 +191,7 @@ HlsStreamRecorder.prototype.processInfo = function (segmentInfo, callback) {
   const meta = segmentInfo.file;
   const uri = `${this.segmentName(this.mapSeq, true)}.${Mime.extension(meta.mime)}`;
 
-  this.writeStream(segmentInfo.stream, uri, (err, bytesWritten) => {
+  this.writeStream(segmentInfo.stream, uri, meta, (err, bytesWritten) => {
 
     // only to report errors
     if (err) debug('stream error', err.stack || err);
@@ -244,9 +244,10 @@ HlsStreamRecorder.prototype.processSegment = function (segmentInfo, callback) {
     }
     else if (decrypted) {
       segment.keys = null;
+      meta = { mime: meta.mime, modified: meta.modified }; // size is no longer valid
     }
 
-    this.writeStream(stream, segment.uri, (err, bytesWritten) => {
+    this.writeStream(stream, segment.uri, meta, (err, bytesWritten) => {
 
       // only to report errors
       if (err) debug('stream error', err.stack || err);
@@ -271,9 +272,9 @@ HlsStreamRecorder.prototype.processSegment = function (segmentInfo, callback) {
   });
 };
 
-HlsStreamRecorder.prototype.writeStream = function (stream, name, callback) {
+HlsStreamRecorder.prototype.writeStream = function (stream, name, meta, callback) {
 
-  this.uploader.pushSegment(stream, name).then((written) => callback(null, written), callback);
+  this.uploader.pushSegment(stream, name, meta).then((written) => callback(null, written), callback);
 };
 
 HlsStreamRecorder.prototype.variantName = function(info, index) {

+ 1 - 0
package.json

@@ -26,6 +26,7 @@
   "author": "Gil Pedersen <gpdev@gpost.dk>",
   "license": "BSD-2-Clause",
   "dependencies": {
+    "aws-sdk": "^2.179.0",
     "commander": "^2.3.0",
     "debug": "^2.0.0",
     "hls-segment-reader": "^4.0.1",