#!/usr/bin/env node "use strict"; var hlsdump = require('commander'); hlsdump.version('0.0.0') .usage('[options] ') .option('-o, --output ', 'target file') .option('-u, --udp [host:port]', 'relay TS over UDP', function(val) { var r = { host:'localhost', port:1234 }; if (val) { var s = val.split(':'); if (s.length === 1) { r.port = parseInt(s[0], 10); } else { r.host = s[0]; r.port = parseInt(s[1], 10); } } return r; }) .option('-b, --buffer-size |full', 'try to buffer of input data (implies -s)', function(val) { if (val === 'full') return 0x80000000-1; return parseInt(val, 0); }) .option('-s, --sync', 'clock sync using stream PCR') .option('-f, --full-stream', 'fetch all stream data') .option('-c, --concurrent ', 'fetch using concurrent connections', parseInt) .option('-a, --user-agent ', 'HTTP User-Agent') .option('-i, --info-port ', 'report status using HTTP + json', parseInt) .parse(process.argv); var util = require('util'), url = require('url'), fs = require('fs'), http = require('http'); var streamprocess = require('streamprocess'), oncemore = require('oncemore'); var reader = require('../lib/reader'), tssmooth = require('../lib/tssmooth'), tsblast = require('../lib/tsblast'); try { var Passthrough = require('stream').Passthrough; assert(Passthrough); } catch (e) { var Passthrough = require('readable-stream/passthrough'); } var stats = require('measured').createCollection(); var src = hlsdump.args[0]; if (!src) return hlsdump.help(); if (hlsdump.bufferSize) hlsdump.sync = true; var r = reader(src, {highWaterMark:(hlsdump.concurrent || 1) - 1, fullStream:hlsdump.fullStream}); var totalDuration = 0, currentSegment = -1; var reading = false; streamprocess(r, function (obj, done) { var meta = obj.meta; var duration = obj.segment.duration; var size = meta.size; var stream = oncemore(obj.stream); totalDuration += duration; console.error('piping segment', meta.url); var stopwatch = stats.timer('fetchTime').start(); stream.once('close', 'end', 'error', function(err) { stopwatch.end(); }); reading = true; currentSegment = obj.seq; stream.pipe(buffer, { end: false }); if (size === -1 || !hooked) { size = 0; obj.stream.on('data', function(chunk) { size += chunk.length; if (!hooked && size >= hlsdump.bufferSize) hook(buffer); }); } stream.once('end', 'error', function(err) { reading = false; console.error('segment done at '+totalDuration.toFixed(0)+' seconds, avg bitrate (kbps):', (size / (duration * 1024/8)).toFixed(1)); if (err) { stats.meter('streamErrors').mark(); console.error('stream error', err.stack || err); } hook(buffer); done(); }); }); r.once('index', function() { // wait until first index is returned before attaching error listener. // this will enable initials errors to throw r.on('error', function(err) { console.error('reader error', err.stack || err); }); }); r.on('end', function() { console.error('done'); }); var buffer = new Passthrough({highWaterMark:hlsdump.bufferSize}); var outputs = []; if (hlsdump.udp) outputs.push(tsblast(hlsdump.udp)); if (hlsdump.output) { if (hlsdump.output === '-') outputs.push(process.stdout); else outputs.push(fs.createWriteStream(hlsdump.output)); } // the hook is used to prebuffer var hooked = false; function hook(stream) { if (hooked) return; console.error('hooking output'); var s = stream; if (hlsdump.sync) { var smooth = tssmooth(); smooth.on('unpipe', function() { this.unpipe(); }); smooth.on('warning', function(err) { console.error('smoothing error', err); }); s = s.pipe(smooth); } outputs.forEach(function (o) { s.pipe(o); }); hooked = true; } if (!hlsdump.sync) hook(buffer); // setup stat tracking stats.gauge('bufferBytes', function() { return buffer._readableState.length/* + buffer._writableState.length*/; }); stats.gauge('currentSegment', function() { return currentSegment; }); stats.gauge('index.first', function() { return r.index ? r.index.first_seq_no : -1; }); stats.gauge('index.last', function() { return r.index ? r.index.lastSeqNo() : -1; }); stats.gauge('totalDuration', function() { return totalDuration; }); stats.meter('streamErrors'); if (hlsdump.infoPort) { http.createServer(function (req, res) { if (req.method === 'GET') { var data = JSON.stringify(stats, null, ' '); res.writeHead(200, { 'Content-Type': 'application/json', 'Content-Length': data.length }); res.write(data); } res.end(); }).listen(hlsdump.infoPort); }