#!/usr/bin/env node
/* eslint-disable no-process-exit */

"use strict";

// hls-dump: fetch an HLS stream, optionally decrypt + prebuffer + PCR-smooth it,
// and relay the resulting MPEG-TS to a file, stdout and/or UDP.

var hlsdump = require('commander');

hlsdump.version('0.0.0')
    .usage('[options] <url>')
    .option('-o, --output <path>', 'target file')
    .option('-u, --udp [host:port]', 'relay TS over UDP', function (val) {
        // default destination when the flag is given without an argument value
        var dst = { host: 'localhost', port: 1234 };
        if (val) {
            var parts = val.split(':');
            if (parts.length === 1) {
                dst.port = parseInt(parts[0], 10);
            } else {
                dst.host = parts[0];
                dst.port = parseInt(parts[1], 10);
            }
        }
        return dst;
    })
    .option('-b, --buffer-size <bytes>|full', 'try to buffer <bytes> of input data (implies -s)', function (val) {
        if (val === 'full') return 0x80000000 - 1; // effectively unbounded
        // radix 0 intentionally auto-detects the base, so hex ("0x...") input works
        return parseInt(val, 0);
    })
    .option('-s, --sync', 'clock sync using stream PCR')
    .option('-f, --full-stream', 'fetch all stream data')
    .option('-c, --concurrent <count>', 'fetch using concurrent connections', parseInt)
    .option('-a, --user-agent <string>', 'HTTP User-Agent')
    .option('-i, --info-port <port>', 'report status using HTTP + json', parseInt)
    .option('--cookie <data>', 'add cookie header to key requests')
    .option('--key <hex>', 'use oob key for decrypting segments', function (opt) {
        // Buffer.from() replaces the deprecated, unsafe new Buffer() constructor
        return Buffer.from(opt, 'hex');
    })
    .parse(process.argv);

// NOTE(review): --user-agent is parsed but not referenced anywhere in this file;
// presumably consumed by hls-segment-reader/uristream internally — verify.

var url = require('url'),
    fs = require('fs'),
    http = require('http'),
    crypto = require('crypto');

var streamprocess = require('streamprocess'),
    oncemore = require('oncemore'),
    uristream = require('uristream'),
    HlsSegmentReader = require('hls-segment-reader'),
    UdpBlast = require('udp-blast');

var tssmooth = require('../lib/tssmooth');
var Passthrough = require('readable-stream/passthrough');

var stats = require('measured').createCollection();

var src = hlsdump.args[0];
if (!src) {
    hlsdump.help();
    process.exit(1); // exit codes are 0-255; -1 would wrap to 255
}

if (hlsdump.bufferSize) hlsdump.sync = true; // -b implies -s

var r = new HlsSegmentReader(src, {
    highWaterMark: (hlsdump.concurrent || 1) - 1,
    fullStream: hlsdump.fullStream
});

var totalDuration = 0;          // sum of processed segment durations (seconds)
var currentSegment = -1;        // sequence number of the segment being fetched
var reading = false;            // NOTE(review): set but never read in this file
var hooked = false;             // outputs attached to the buffer yet?
var keyCache = {};              // AES key cache, keyed by resolved key uri

// Process one segment at a time: time the fetch, decrypt when needed,
// and push the TS data into the shared prebuffer.
streamprocess(r, function (obj, done) {
    var meta = obj.meta;
    var duration = obj.segment.duration;
    var downloadSize = meta.size;
    var stream = oncemore(obj.stream);

    totalDuration += duration;
    console.error('piping segment', meta.url);

    var stopwatch = stats.timer('fetchTime').start();
    stream.once('close', 'end', 'error', function () {
        stopwatch.end();
    });

    reading = true;
    currentSegment = obj.seq;

    // calculate size when missing
    if (downloadSize === -1) {
        downloadSize = 0;
        obj.stream.on('data', function (chunk) {
            downloadSize += chunk.length;
        });
    }

    var keyData = obj.segment.key;
    if (keyData && keyData.method === 'AES-128' && keyData.uri && keyData.uri.length > 2) {
        fetchKey(function (err, key) {
            if (err) {
                // best effort: push the raw (still encrypted) stream
                console.error('key fetch failed:', err);
                return pushBuffer(stream);
            }

            // NOTE(review): assumes an explicit IV attribute is always present; per the
            // HLS spec a missing IV should be derived from the sequence number — confirm.
            var iv = Buffer.from(keyData.iv.slice(-32), 'hex');
            var decrypt;
            try {
                decrypt = crypto.createDecipheriv('aes-128-cbc', key, iv);
            } catch (ex) {
                console.error('crypto setup failed:', ex.stack || ex);
                return pushBuffer(stream);
            }

            // forward source errors so the decrypted stream reports them
            stream.on('error', function (err) {
                decrypt.emit('error', err);
            });
            pushBuffer(oncemore(stream.pipe(decrypt)));
        });
    } else {
        pushBuffer(stream);
    }

    // Resolve the AES-128 key for this segment: oob --key, cache, or fetch.
    function fetchKey(cb) {
        if (hlsdump.key) return cb(null, hlsdump.key);

        var uri = url.resolve(r.url, keyData.uri.slice(1, -1)); // strip surrounding quotes
        var entry = keyCache[uri];
        if (entry && entry.length) return cb(null, entry);

        var key = Buffer.alloc(0);
        var headers = {};
        if (hlsdump.cookie) headers.Cookie = hlsdump.cookie;

        oncemore(uristream(uri, { headers: headers, whitelist: ['http', 'https', 'data'], timeout: 10 * 1000 }))
            .on('data', function (chunk) {
                key = Buffer.concat([key, chunk]);
            })
            .once('error', 'end', function (err) {
                // only cache successfully fetched keys; caching on error (as the old
                // code did) would poison the cache with partial data for later segments
                if (!err) keyCache[uri] = key;
                return cb(err, key);
            });
    }

    // Append the (possibly decrypted) segment stream to the shared prebuffer.
    function pushBuffer(srcStream) {
        if (!hooked) {
            // pull data and detect if we need to hook before end
            var buffered = 0;
            srcStream.on('data', function (chunk) {
                buffered += chunk.length;
                if (!hooked && buffered >= hlsdump.bufferSize) hook(buffer);
            });
        }

        srcStream.pipe(buffer, { end: false });
        srcStream.once('end', 'error', function (err) {
            reading = false;
            console.error('segment done at ' + totalDuration.toFixed(0) + ' seconds, avg bitrate (kbps):',
                          (downloadSize / (duration * 1024 / 8)).toFixed(1));
            if (err) {
                stats.meter('streamErrors').mark();
                console.error('stream error', err.stack || err);
            }
            hook(buffer);
            done();
        });
    }
});

r.once('index', function () {
    // wait until first index is returned before attaching error listener.
    // this will enable initial errors to throw
    r.on('error', function (err) {
        console.error('reader error', err.stack || err);
    });
});

r.on('end', function () {
    console.error('done');
});

var buffer = new Passthrough({ highWaterMark: hlsdump.bufferSize });

var outputs = [];
if (hlsdump.udp) {
    // `true` means the flag was given without a value; let UdpBlast pick its default
    var dst = (hlsdump.udp === true) ? null : hlsdump.udp;
    outputs.push(new UdpBlast(dst, { packetSize: 7 * 188 })); // 7 TS packets per datagram
}

if (hlsdump.output) {
    if (hlsdump.output === '-') outputs.push(process.stdout);
    else outputs.push(fs.createWriteStream(hlsdump.output));
}

// the hook is used to prebuffer: it attaches all outputs (optionally through
// the PCR smoother) to the buffer exactly once
function hook(stream) {
    if (hooked) return;

    console.error('hooking output');

    var s = stream;
    if (hlsdump.sync) {
        var smooth = tssmooth();
        smooth.on('unpipe', function () {
            this.unpipe();
        });
        smooth.on('warning', function (err) {
            console.error('smoothing error', err);
        });
        s = s.pipe(smooth);
    }

    outputs.forEach(function (o) {
        s.pipe(o);
    });

    hooked = true;
}

// hook immediately unless we are prebuffering (-s with a positive -b)
if (!hlsdump.sync || !(hlsdump.bufferSize > 0)) hook(buffer);

// setup stat tracking
stats.gauge('bufferBytes', function () {
    return buffer._readableState.length/* + buffer._writableState.length*/;
});
stats.gauge('currentSegment', function () {
    return currentSegment;
});
stats.gauge('index.first', function () {
    return r.index ? r.index.first_seq_no : -1;
});
stats.gauge('index.last', function () {
    return r.index ? r.index.lastSeqNo() : -1;
});
stats.gauge('totalDuration', function () {
    return totalDuration;
});
stats.meter('streamErrors');

if (hlsdump.infoPort) {
    http.createServer(function (req, res) {
        if (req.method === 'GET') {
            var data = JSON.stringify(stats, null, ' ');
            res.writeHead(200, {
                'Content-Type': 'application/json',
                // byteLength, not .length — the two differ for non-ASCII payloads
                'Content-Length': Buffer.byteLength(data)
            });
            res.write(data);
        }
        res.end();
    }).listen(hlsdump.infoPort);
}