Gil Pedersen 12 роки тому
батько
коміт
26c149245b
2 змінених файлів з 34 додано та 40 видалено
  1. 32 39
      bin/hlsdump
  2. 2 1
      package.json

+ 32 - 39
bin/hlsdump

@@ -35,6 +35,7 @@ var util = require('util'),
     fs = require('fs'),
     http = require('http');
 
+var streamprocess = require('streamprocess');
 var reader = require('../lib/reader'),
     tssmooth = require('../lib/tssmooth'),
     tsblast = require('../lib/tsblast'),
@@ -59,50 +60,42 @@ var r = reader(src, {highWaterMark:(hlsdump.concurrent || 1) - 1, fullStream:hls
 var totalDuration = 0, currentSegment = -1;
 var reading = false;
 
-r.on('readable', function() {
-  if (reading) return;// console.error('readable call error');
+streamprocess(r, function (obj, done) {
+  var meta = obj.meta;
+  var duration = obj.segment.duration;
+  var size = meta.size;
+  var stream = oncemore(obj.stream);
+  totalDuration += duration;
 
-  function grabnext() {
-    var obj = r.read();
-    if (obj) {
-      var meta = obj.meta;
-      var duration = obj.segment.duration;
-      var size = meta.size;
-      var stream = oncemore(obj.stream);
-      totalDuration += duration;
+  console.error('piping segment', meta.url);
 
-      console.error('piping segment', meta.url);
-
-      var stopwatch = stats.timer('fetchTime').start();
-      stream.once('close', 'end', 'error', function(err) {
-        stopwatch.end();
-      });
+  var stopwatch = stats.timer('fetchTime').start();
+  stream.once('close', 'end', 'error', function(err) {
+    stopwatch.end();
+  });
 
-      reading = true;
-      currentSegment = obj.seq;
-      stream.pipe(buffer, { end: false });
-      if (size === -1 || !hooked) {
-        size = 0;
-        obj.stream.on('data', function(chunk) {
-          size += chunk.length;
-          if (!hooked && size >= hlsdump.bufferSize)
-            hook(buffer);
-        });
-      }
-
-      stream.once('end', 'error', function(err) {
-        reading = false;
-        console.error('segment done at '+totalDuration.toFixed(0)+' seconds, avg bitrate (kbps):', (size / (duration * 1024/8)).toFixed(1));
-        if (err) {
-          stats.meter('streamErrors').mark();
-          console.error('stream error', err.stack || err);
-        }
+  reading = true;
+  currentSegment = obj.seq;
+  stream.pipe(buffer, { end: false });
+  if (size === -1 || !hooked) {
+    size = 0;
+    obj.stream.on('data', function(chunk) {
+      size += chunk.length;
+      if (!hooked && size >= hlsdump.bufferSize)
         hook(buffer);
-        grabnext();
-      });
-    }
+    });
   }
-  grabnext();
+
+  stream.once('end', 'error', function(err) {
+    reading = false;
+    console.error('segment done at '+totalDuration.toFixed(0)+' seconds, avg bitrate (kbps):', (size / (duration * 1024/8)).toFixed(1));
+    if (err) {
+      stats.meter('streamErrors').mark();
+      console.error('stream error', err.stack || err);
+    }
+    hook(buffer);
+    done();
+  });
 });
 
 r.once('index', function() {

+ 2 - 1
package.json

@@ -32,7 +32,8 @@
     "request": "~2.21.0",
     "xtend": "~2.0.3",
     "deep-equal": "0.0.0",
-    "measured": "~0.1.3"
+    "measured": "~0.1.3",
+    "streamprocess": "0.0.1"
   },
   "devDependencies": {
     "mocha": "~1.11.0",