Files
ISR/web.py
T
admin 907fd90a5e refactor: extract web UI page into webui.html
web.py was 1700 lines, more than half of it a single embedded HTML
string. The page now lives in webui.html (loaded once at startup),
so the frontend gets real syntax highlighting and web.py is pure
Python. Dockerfile copies the new file alongside web.py.

Also: ASCII startup banner (the arrow glyph crashed web.py on Windows
when stdout was redirected to a cp1252 file), and README fixes —
document the ALSA PCM-name device fallback and drop the monitor
device row, which the ALSA backend never supported.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 11:49:35 +02:00

691 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
ISR Web — Browse and download recorded audio files.
Shows a chronological table of all recordings, allows inline playback,
download, and analyses WAV/FLAC files for loud sections using RMS.
Usage:
python web.py # serves recordings/ on port 8080
python web.py --dir /path/to/audio # custom recordings directory
python web.py --port 8888 # custom port
python web.py --threshold 0.03 # loudness threshold (0-1, default 0.05)
"""
import argparse
import json
import math
import os
import re
import shutil
import struct
import subprocess
import tempfile
import wave
from datetime import datetime
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from urllib.parse import parse_qs, unquote, urlparse
try:
import numpy as np
NUMPY_AVAILABLE = True
except ImportError:
NUMPY_AVAILABLE = False
try:
import soundfile as sf
SOUNDFILE_AVAILABLE = True
except ImportError:
SOUNDFILE_AVAILABLE = False
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Content-Type sent for each supported audio extension.
MIME_TYPES = {
    '.wav': 'audio/wav',
    '.mp3': 'audio/mpeg',
    '.ogg': 'audio/ogg',
    '.flac': 'audio/flac',
    '.aac': 'audio/aac',
    '.opus': 'audio/ogg',
}
# A file counts as a recording exactly when its extension has a MIME type.
AUDIO_EXTENSIONS = set(MIME_TYPES)

WINDOW_SAMPLES = 4800  # 100 ms at 48 kHz
LOUD_THRESHOLD = 0.05  # RMS on a 0-1 scale; sections above this are "interesting"
MIN_GAP_SECONDS = 2.0  # merge loud sections separated by less than this
# ---------------------------------------------------------------------------
# Audio analysis helpers
# ---------------------------------------------------------------------------
def _get_audio_duration(path: Path):
    """Return the length of *path* in seconds, or None if it cannot be read.

    WAV files are measured with the stdlib ``wave`` module.  FLAC, OGG and
    Opus files need the optional ``soundfile`` package and give a value
    rounded to two decimals.  Every other extension, and any error while
    opening or reading the file, yields None.
    """
    suffix = path.suffix.lower()

    if suffix == '.wav':
        try:
            with wave.open(str(path), 'rb') as wav_file:
                frame_count = wav_file.getnframes()
                rate = wav_file.getframerate()
                return frame_count / rate
        except Exception:
            return None

    if not SOUNDFILE_AVAILABLE or suffix not in ('.flac', '.ogg', '.opus'):
        return None

    try:
        with sf.SoundFile(path) as snd:
            seconds = len(snd) / snd.samplerate
            return round(seconds, 2)
    except Exception:
        return None
def _window_rms_np(raw: bytes, channels: int, sampwidth: int) -> float:
    """Return the RMS, in integer sample units, of one PCM window (numpy path)."""
    if sampwidth == 1:
        # 8-bit WAV samples are unsigned with the zero level at 128.
        samples = np.frombuffer(raw, dtype=np.uint8).astype(np.float64) - 128.0
    elif sampwidth == 3:
        # numpy has no 24-bit dtype: build each sample from its three bytes.
        octets = np.frombuffer(raw, dtype=np.uint8).reshape(-1, 3).astype(np.int32)
        values = octets[:, 0] | (octets[:, 1] << 8) | (octets[:, 2] << 16)
        values -= (values & 0x800000) << 1  # sign-extend from 24 bits
        samples = values.astype(np.float64)
    else:
        dtype = '<i2' if sampwidth == 2 else '<i4'
        samples = np.frombuffer(raw, dtype=dtype).astype(np.float64)
    if channels > 1:
        samples = samples.reshape(-1, channels).mean(axis=1)
    return float(np.sqrt(np.mean(samples ** 2)))


def _window_rms_py(raw: bytes, channels: int, sampwidth: int) -> float:
    """Return the RMS, in integer sample units, of one PCM window (stdlib path)."""
    if sampwidth == 1:
        # 8-bit WAV samples are unsigned with the zero level at 128.
        samples = [byte - 128 for byte in raw]
    elif sampwidth == 3:
        samples = [int.from_bytes(raw[i:i + 3], 'little', signed=True)
                   for i in range(0, len(raw), 3)]
    else:
        code = 'h' if sampwidth == 2 else 'i'
        samples = struct.unpack(f'<{len(raw) // sampwidth}{code}', raw)
    if channels > 1:
        # Average the channels of each frame, matching the numpy path.
        samples = [sum(samples[i:i + channels]) / channels
                   for i in range(0, len(samples), channels)]
    return math.sqrt(sum(s * s for s in samples) / len(samples))


def _compute_rms_windows_wav(wf, channels: int, sampwidth: int, framerate: int,
                             window_samples: int):
    """Yield the RMS level (0 to 1) of each successive window of an open WAV file.

    Args:
        wf: an open ``wave`` reader positioned where analysis should start.
        channels: number of interleaved channels; they are averaged to mono.
        sampwidth: bytes per sample; 1, 2, 3 and 4 are supported.
        framerate: sample rate in Hz (accepted for interface compatibility,
            not used by the computation).
        window_samples: number of frames read per window.

    Yields:
        The window's RMS divided by full scale for the sample width, rounded
        to five decimals.  The final window may be shorter than the others.

    Raises:
        ValueError: if *sampwidth* is not a supported width.
    """
    if sampwidth not in (1, 2, 3, 4):
        raise ValueError(f'unsupported WAV sample width: {sampwidth} bytes')
    frame_bytes = sampwidth * channels
    full_scale = float(1 << (8 * sampwidth - 1))
    window_rms = _window_rms_np if NUMPY_AVAILABLE else _window_rms_py
    while True:
        raw = wf.readframes(window_samples)
        n_frames = len(raw) // frame_bytes
        if n_frames == 0:
            break
        # Drop any trailing partial frame so the decoders see whole frames only.
        rms = window_rms(raw[:n_frames * frame_bytes], channels, sampwidth) / full_scale
        yield round(rms, 5)
def _loud_sections(rms_values: list, window_dur: float, duration: float,
                   threshold: float, min_gap: float = MIN_GAP_SECONDS) -> list:
    """Group loud windows into ``{'start': s, 'end': s}`` sections (seconds).

    A window is loud when its RMS is at least *threshold*.  Loud windows that
    are separated by no more than *min_gap* seconds of quiet belong to the
    same section.  Every section ends one window after its last loud window;
    the final section is additionally capped at *duration* so a short last
    window cannot push the end past the end of the recording.

    Args:
        rms_values: one RMS value per analysis window, in order.
        window_dur: length of one window in seconds.
        duration: total length of the recording in seconds.
        threshold: minimum RMS for a window to count as loud.
        min_gap: quiet time, in seconds, that must be exceeded to split sections.

    Returns:
        List of dicts with ``start`` and ``end`` rounded to one decimal.
    """
    sections = []
    start_t = None
    last_loud_t = None
    for i, rms in enumerate(rms_values):
        t = i * window_dur
        if rms >= threshold:
            if start_t is None:
                start_t = t
            last_loud_t = t
        elif start_t is not None and (t - last_loud_t) > min_gap:
            sections.append({'start': round(start_t, 1),
                             'end': round(last_loud_t + window_dur, 1)})
            start_t = None
            last_loud_t = None
    if start_t is not None:
        # Close the section that was still open at the end of the input the
        # same way as the others, instead of stretching it over trailing quiet.
        end_t = min(last_loud_t + window_dur, duration)
        sections.append({'start': round(start_t, 1), 'end': round(end_t, 1)})
    return sections
def _package_result(rms_values: list, framerate: int, n_frames: int,
                    window_samples: int, threshold: float,
                    min_gap: float = MIN_GAP_SECONDS) -> dict:
    """Assemble the analysis payload from per-window RMS values.

    Returns a dict with:
        ``rms``          every RMS value, unchanged;
        ``rms_display``  at most 800 values picked at an even stride;
        ``sections``     the output of ``_loud_sections``;
        ``duration``     ``n_frames / framerate`` rounded to two decimals;
        ``window``       seconds per window rounded to four decimals.
    """
    max_points = 800
    seconds_per_window = window_samples / framerate
    total_seconds = n_frames / framerate

    count = len(rms_values)
    if count <= max_points:
        display = rms_values
    else:
        stride = count / max_points
        display = [rms_values[int(k * stride)] for k in range(max_points)]

    sections = _loud_sections(rms_values, seconds_per_window, total_seconds,
                              threshold, min_gap)
    return {
        'rms': rms_values,
        'rms_display': display,
        'sections': sections,
        'duration': round(total_seconds, 2),
        'window': round(seconds_per_window, 4),
    }
def analyze_wav(path: Path, window_samples: int = WINDOW_SAMPLES,
                threshold: float = LOUD_THRESHOLD,
                min_gap: float = MIN_GAP_SECONDS) -> dict:
    """Analyse a WAV file for loudness.

    Args:
        path: the WAV file to read.
        window_samples: frames per RMS window.
        threshold: RMS level at or above which a window counts as loud.
        min_gap: quiet time in seconds that separates two loud sections.

    Returns:
        The dict built by ``_package_result``, or ``{'error': message}`` when
        the file cannot be opened, decoded, or has an unusable header.
    """
    try:
        with wave.open(str(path), 'rb') as wf:
            channels = wf.getnchannels()
            sampwidth = wf.getsampwidth()
            framerate = wf.getframerate()
            n_frames = wf.getnframes()
            # The wave module accepts a zero sample rate; reject it here so
            # the duration maths below cannot divide by zero.
            if framerate <= 0:
                raise ValueError(f'invalid sample rate in WAV header: {framerate}')
            rms_values = list(_compute_rms_windows_wav(
                wf, channels, sampwidth, framerate, window_samples))
    except Exception as e:
        return {'error': str(e)}
    return _package_result(rms_values, framerate, n_frames, window_samples, threshold, min_gap)
def analyze_flac(path: Path, window_samples: int = WINDOW_SAMPLES,
                 threshold: float = LOUD_THRESHOLD,
                 min_gap: float = MIN_GAP_SECONDS) -> dict:
    """Analyse a FLAC file for loudness; needs both numpy and soundfile.

    Returns the dict built by ``_package_result``, or ``{'error': message}``
    when a dependency is missing or the file cannot be read.
    """
    if not (NUMPY_AVAILABLE and SOUNDFILE_AVAILABLE):
        return {'error': 'FLAC analysis requires: pip install numpy soundfile'}

    try:
        with sf.SoundFile(path) as snd:
            framerate = snd.samplerate
            channels = snd.channels
            n_frames = len(snd)
            levels = []
            chunk = snd.read(window_samples, dtype='float32', always_2d=True)
            while len(chunk) != 0:
                # Mix down to mono before measuring the window.
                if channels > 1:
                    mono = chunk.mean(axis=1)
                else:
                    mono = chunk[:, 0]
                level = float(np.sqrt(np.mean(mono.astype(np.float64) ** 2)))
                levels.append(round(level, 5))
                chunk = snd.read(window_samples, dtype='float32', always_2d=True)
    except Exception as e:
        return {'error': str(e)}

    return _package_result(levels, framerate, n_frames, window_samples, threshold, min_gap)
# ---------------------------------------------------------------------------
# Analysis cache helpers
# ---------------------------------------------------------------------------
def _analysis_cache_path(analyses_base: Path, recordings_base: Path, audio_path: Path) -> Path:
    """Return the cache file for *audio_path*, mirroring its relative location.

    ``<recordings_base>/a/b.wav`` maps to ``<analyses_base>/a/b.wav.analysis.json``.
    ``Path.relative_to`` raises ValueError if *audio_path* is not located
    under *recordings_base*.
    """
    relative = audio_path.relative_to(recordings_base)
    cache_name = f'{relative.name}.analysis.json'
    return analyses_base.joinpath(relative.parent, cache_name)
def prune_orphan_analyses(analyses_base: Path, recordings_base: Path):
    """Delete cached analyses whose recording no longer exists.

    Walks *analyses_base* recursively for ``*.analysis.json`` files, maps each
    one back to the recording it was computed from, and removes the cache file
    when that recording is gone.  Files that cannot be removed are skipped.
    Prints a one-line summary when at least one file was removed.
    """
    if not analyses_base.exists():
        return

    suffix = '.analysis.json'
    pruned = 0
    for cache_file in analyses_base.rglob('*' + suffix):
        relative = cache_file.relative_to(analyses_base)
        recording_name = relative.name[:-len(suffix)]
        recording = recordings_base / relative.parent / recording_name
        if recording.exists():
            continue
        try:
            cache_file.unlink()
        except Exception:
            continue
        pruned += 1

    if pruned:
        print(f'Pruned {pruned} orphaned analysis cache file(s)')
# ---------------------------------------------------------------------------
# File listing
# ---------------------------------------------------------------------------
def list_files(recordings_dir: str):
    """Return metadata dicts for every audio file under *recordings_dir*, newest first.

    Each dict has ``name`` (path relative to the directory, ``/``-separated),
    ``size``, ``mtime``, ``date`` (local time, ``YYYY-MM-DD HH:MM:SS``),
    ``duration`` (seconds or None), ``ext`` (without the dot) and
    ``recording`` (True when ``status.json`` lists the file as active).

    Directories are ignored even if their name ends in an audio extension,
    and files that vanish while the directory is being scanned are skipped.
    Returns an empty list when the directory does not exist.
    """
    base = Path(recordings_dir)
    if not base.exists():
        return []

    # Load active recordings written by isr.py
    active_files: set = set()
    status_path = base / 'status.json'
    if status_path.exists():
        try:
            with open(status_path) as fh:
                active_files = set(json.load(fh).get('active', []))
        except Exception:
            pass

    files = []
    for path in base.rglob('*'):
        if path.suffix.lower() not in AUDIO_EXTENSIONS:
            continue
        try:
            if not path.is_file():
                continue
            stat = path.stat()
        except OSError:
            # Deleted or became unreadable between the scan and the stat call.
            continue
        rel = str(path.relative_to(base)).replace('\\', '/')
        is_active = rel in active_files
        # Skip reading partial headers for in-progress files — the WAV nframes
        # field and FLAC total_samples are both unfinalized while recording,
        # producing wildly incorrect values (e.g. 53375995583:39:01).
        duration = None if is_active else _get_audio_duration(path)
        files.append({
            'name': rel,
            'size': stat.st_size,
            'mtime': stat.st_mtime,
            'date': datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
            'duration': duration,
            'ext': path.suffix.lower().lstrip('.'),
            'recording': is_active,
        })
    files.sort(key=lambda f: f['mtime'], reverse=True)
    return files
# ---------------------------------------------------------------------------
# HTTP handler
# ---------------------------------------------------------------------------
class _Handler(BaseHTTPRequestHandler):
    """Request handler for the recordings browser page and its JSON API.

    The class attributes below hold the server configuration; ``main()``
    overrides them in a subclass built from the command-line arguments.
    """

    recordings_dir: str = 'recordings'
    analyses_dir: str = 'recordings/analyses'
    threshold: float = LOUD_THRESHOLD
    min_gap: float = MIN_GAP_SECONDS

    # -- routing -----------------------------------------------------------

    def do_DELETE(self):
        """Route DELETE requests; only ``/api/files/<name>`` is supported."""
        parsed = urlparse(self.path)
        p = parsed.path
        if p.startswith('/api/files/'):
            self._api_delete(unquote(p[len('/api/files/'):]))
        else:
            self._send(404, b'Not found', 'text/plain')

    def do_GET(self):
        """Route GET requests to the page, the API endpoints or file transfer."""
        parsed = urlparse(self.path)
        qs = parse_qs(parsed.query)
        p = parsed.path
        if p == '/':
            self._html()
        elif p == '/api/files':
            self._api_files()
        elif p == '/api/analyze':
            self._api_analyze(qs)
        elif p == '/api/status':
            self._api_status()
        elif p == '/api/storage':
            self._api_storage()
        elif p == '/api/config':
            self._api_config()
        elif p == '/api/cut':
            self._api_cut(qs)
        elif p.startswith('/download/'):
            self._download(unquote(p[len('/download/'):]))
        elif p.startswith('/stream/'):
            self._stream(unquote(p[len('/stream/'):]))
        else:
            self._send(404, b'Not found', 'text/plain')

    # -- endpoints ---------------------------------------------------------

    def _html(self):
        """Serve the single-page UI."""
        self._send(200, _HTML.encode('utf-8'), 'text/html; charset=utf-8')

    def _api_files(self):
        """Send the file listing, annotated with cached-analysis parameters.

        ``cached_analysis`` is ``{'threshold', 'min_gap'}`` for finished
        WAV/FLAC files that have a readable cache entry, otherwise None.
        """
        files = list_files(self.recordings_dir)
        recordings_base = Path(self.recordings_dir).resolve()
        analyses_base = Path(self.analyses_dir).resolve()
        for f in files:
            f['cached_analysis'] = None
            if f.get('ext') not in ('wav', 'flac') or f.get('recording'):
                continue
            cache_path = _analysis_cache_path(
                analyses_base, recordings_base, recordings_base / f['name'])
            try:
                cached = json.loads(cache_path.read_text('utf-8'))
                f['cached_analysis'] = {
                    'threshold': cached['threshold'],
                    'min_gap': cached['min_gap'],
                }
            except Exception:
                # Missing or unreadable cache simply means "not analysed yet".
                pass
        self._send(200, json.dumps(files).encode('utf-8'), 'application/json')

    def _api_analyze(self, qs):
        """Run (or replay from cache) the loudness analysis of one file.

        Query parameters: ``file`` (required), ``threshold`` (clamped to 0-1)
        and ``min_gap`` (clamped to 0-300 seconds).  Successful results are
        cached per file together with the parameters that produced them;
        results carrying an ``error`` key are returned but never cached, so a
        transient failure is retried on the next request.
        """
        filename = qs.get('file', [None])[0]
        if not filename:
            self._json_err(400, 'missing file parameter')
            return
        path = self._safe_path(filename)
        if path is None:
            return
        threshold = self._float_param(qs, 'threshold', self.threshold, 0.0, 1.0)
        min_gap = self._float_param(qs, 'min_gap', self.min_gap, 0.0, 300.0)
        if self._is_active(filename):
            self._json_err(409, 'File is currently being recorded — analysis unavailable until recording stops')
            return
        recordings_base = Path(self.recordings_dir).resolve()
        analyses_base = Path(self.analyses_dir).resolve()
        cache_path = _analysis_cache_path(analyses_base, recordings_base, path)
        try:
            cached = json.loads(cache_path.read_text('utf-8'))
            if (cached.get('threshold') == threshold
                    and cached.get('min_gap') == min_gap
                    and 'error' not in cached['result']):
                payload = dict(cached['result'])
                payload['cached'] = True
                self._send(200, json.dumps(payload).encode('utf-8'), 'application/json')
                return
        except Exception:
            # No usable cache entry: fall through and analyse the file.
            pass
        ext = path.suffix.lower()
        if ext == '.wav':
            result = analyze_wav(path, threshold=threshold, min_gap=min_gap)
        elif ext == '.flac':
            if not (NUMPY_AVAILABLE and SOUNDFILE_AVAILABLE):
                self._json_err(400, 'FLAC analysis requires: pip install numpy soundfile')
                return
            result = analyze_flac(path, threshold=threshold, min_gap=min_gap)
        else:
            self._json_err(400, f'Loudness analysis is not available for {ext} files')
            return
        if 'error' not in result:
            try:
                cache_path.parent.mkdir(parents=True, exist_ok=True)
                # Write to a sibling file first so readers never see a partial cache.
                tmp = cache_path.with_suffix('.tmp')
                tmp.write_text(json.dumps({'threshold': threshold, 'min_gap': min_gap, 'result': result}), 'utf-8')
                os.replace(tmp, cache_path)
            except Exception as e:
                print(f'Warning: could not write analysis cache {cache_path}: {e}', flush=True)
        self._send(200, json.dumps(result).encode('utf-8'), 'application/json')

    def _api_status(self):
        """Send ``status.json`` verbatim, or an empty active list if unavailable."""
        status_path = Path(self.recordings_dir) / 'status.json'
        if status_path.exists():
            try:
                self._send(200, status_path.read_bytes(), 'application/json')
                return
            except Exception:
                pass
        self._send(200, b'{"active":[]}', 'application/json')

    def _download(self, filename: str):
        """Send a file as an attachment."""
        path = self._safe_path(filename)
        if path is None:
            return
        size = path.stat().st_size
        self.send_response(200)
        self.send_header('Content-Type', 'application/octet-stream')
        self.send_header('Content-Disposition', self._attachment_header(path.name))
        self.send_header('Content-Length', str(size))
        self.end_headers()
        with open(path, 'rb') as fh:
            self._copy_to_response(fh)

    def _stream(self, filename: str):
        """Serve audio for inline playback with HTTP Range support.

        A ``Range: bytes=<start>-[<end>]`` header yields a 206 response with
        that slice; an unsatisfiable range yields 416.  Without a usable
        Range header the whole file is sent with status 200.
        """
        path = self._safe_path(filename)
        if path is None:
            return
        content_type = MIME_TYPES.get(path.suffix.lower(), 'application/octet-stream')
        size = path.stat().st_size
        range_header = self.headers.get('Range', '')
        m = re.match(r'bytes=(\d+)-(\d*)', range_header) if range_header else None
        if m:
            start = int(m.group(1))
            end = int(m.group(2)) if m.group(2) else size - 1
            end = min(end, size - 1)
            if start > end or start >= size:
                # Tell the client the real size so it can retry with a valid range.
                self._send(416, b'Range Not Satisfiable', 'text/plain',
                           {'Content-Range': f'bytes */{size}'})
                return
            length = end - start + 1
            self.send_response(206)
            self.send_header('Content-Type', content_type)
            self.send_header('Content-Range', f'bytes {start}-{end}/{size}')
            self.send_header('Content-Length', str(length))
            self.send_header('Accept-Ranges', 'bytes')
            self.end_headers()
            with open(path, 'rb') as fh:
                fh.seek(start)
                self._copy_to_response(fh, length)
        else:
            self.send_response(200)
            self.send_header('Content-Type', content_type)
            self.send_header('Content-Length', str(size))
            self.send_header('Accept-Ranges', 'bytes')
            self.end_headers()
            with open(path, 'rb') as fh:
                self._copy_to_response(fh)

    def _api_storage(self):
        """Send bytes used by audio files plus free/total space of the disk."""
        base = Path(self.recordings_dir)
        used = 0
        if base.exists():
            for p in base.rglob('*'):
                try:
                    if p.is_file() and p.suffix.lower() in AUDIO_EXTENSIONS:
                        used += p.stat().st_size
                except OSError:
                    # File disappeared while the directory was being scanned.
                    continue
        try:
            du = shutil.disk_usage(str(base) if base.exists() else '.')
            disk_free, disk_total = du.free, du.total
        except Exception:
            disk_free = disk_total = None
        data = json.dumps({'used': used, 'disk_free': disk_free, 'disk_total': disk_total})
        self._send(200, data.encode(), 'application/json')

    def _api_config(self):
        """Send the server's default analysis parameters."""
        data = json.dumps({'threshold': self.threshold, 'min_gap': self.min_gap})
        self._send(200, data.encode(), 'application/json')

    def _api_delete(self, filename: str):
        """Delete one recording and its analysis cache entry.

        Refuses files that are being recorded (409) and anything that is not
        an audio file (403), so bookkeeping files inside the recordings
        directory cannot be removed through this endpoint.
        """
        if self._is_active(filename):
            self._json_err(409, 'Cannot delete a file that is currently being recorded')
            return
        path = self._safe_path(filename)
        if path is None:
            return
        if path.suffix.lower() not in AUDIO_EXTENSIONS:
            self._json_err(403, 'Only audio files can be deleted')
            return
        try:
            path.unlink()
        except Exception as e:
            self._json_err(500, f'Failed to delete: {e}')
            return
        try:
            _analysis_cache_path(
                Path(self.analyses_dir).resolve(),
                Path(self.recordings_dir).resolve(),
                path,
            ).unlink()
        except Exception:
            # Best effort: the recording may never have been analysed.
            pass
        self._send(200, json.dumps({'deleted': filename}).encode(), 'application/json')

    def _api_cut(self, qs):
        """Send the ``start``..``end`` (seconds) excerpt of a file, cut with ffmpeg.

        Responds 400 for missing, non-numeric or non-finite bounds and for an
        empty or negative range, 409 while the file is being recorded, 500 if
        ffmpeg is missing or fails, and 504 if ffmpeg runs longer than 120 s.
        """
        filename = qs.get('file', [None])[0]
        start_s = qs.get('start', [None])[0]
        end_s = qs.get('end', [None])[0]
        if not filename or start_s is None or end_s is None:
            self._json_err(400, 'missing file, start, or end parameter')
            return
        try:
            start = float(start_s)
            end = float(end_s)
            # float() accepts 'nan' and 'inf'; both would slip through the
            # range check below and then make int() raise.
            if not (math.isfinite(start) and math.isfinite(end)):
                raise ValueError('non-finite bound')
        except (ValueError, TypeError):
            self._json_err(400, 'start and end must be numbers (seconds)')
            return
        if start < 0 or end <= start:
            self._json_err(400, 'start must be ≥ 0 and end must be > start')
            return
        path = self._safe_path(filename)
        if path is None:
            return
        if self._is_active(filename):
            self._json_err(409, 'Cannot cut a file that is currently being recorded')
            return
        if not shutil.which('ffmpeg'):
            self._json_err(500, 'ffmpeg is not available on this server')
            return
        ext = path.suffix.lower()
        out_name = f'{path.stem}_cut_{int(start)}s-{int(end)}s{ext}'
        # For lossless formats, re-encode (not copy) so the container header
        # is rewritten with the correct duration/size. For lossy formats,
        # copy is fine — the audio stops at the right frame regardless.
        _lossless = {'.wav': ['-c:a', 'pcm_s16le'], '.flac': ['-c:a', 'flac']}
        codec_args = _lossless.get(ext, ['-c', 'copy'])
        fd, tmp_path = tempfile.mkstemp(suffix=ext)
        os.close(fd)
        try:
            cmd = ['ffmpeg', '-y',
                   '-i', str(path),
                   '-ss', str(start), '-to', str(end),
                   '-vn'] + codec_args + [tmp_path]
            result = subprocess.run(cmd, capture_output=True, timeout=120)
            if result.returncode != 0:
                err = result.stderr.decode('utf-8', errors='replace')[-400:]
                self._json_err(500, f'ffmpeg error: {err}')
                return
            tmp_size = os.path.getsize(tmp_path)
            content_type = MIME_TYPES.get(ext, 'application/octet-stream')
            self.send_response(200)
            self.send_header('Content-Type', content_type)
            self.send_header('Content-Disposition', self._attachment_header(out_name))
            self.send_header('Content-Length', str(tmp_size))
            self.end_headers()
            with open(tmp_path, 'rb') as fh:
                self._copy_to_response(fh)
        except subprocess.TimeoutExpired:
            self._json_err(504, 'ffmpeg timed out — file may be too large')
        finally:
            try:
                os.unlink(tmp_path)
            except Exception:
                pass

    # -- helpers -----------------------------------------------------------

    @staticmethod
    def _float_param(qs, name: str, default: float, low: float, high: float) -> float:
        """Read a float query parameter and clamp it to ``[low, high]``.

        A missing value uses *default* (clamped the same way); an unparsable
        or NaN value returns *default* unchanged.
        """
        try:
            value = float(qs.get(name, [default])[0])
        except (ValueError, TypeError):
            return default
        if math.isnan(value):
            return default
        return max(low, min(high, value))

    @staticmethod
    def _attachment_header(name: str) -> str:
        """Build a ``Content-Disposition: attachment`` value for any file name.

        The plain ``filename`` parameter carries a printable-ASCII fallback
        (quotes, backslashes, control and non-ASCII characters become ``_``)
        so the header stays well-formed and Latin-1 encodable; ``filename*``
        carries the exact name, percent-encoded as UTF-8.
        """
        from urllib.parse import quote  # only needed for this header

        fallback = ''.join(
            ch if (' ' <= ch <= '~' and ch not in '"\\') else '_'
            for ch in name)
        encoded = quote(name, safe='', errors='replace')
        return f"attachment; filename=\"{fallback}\"; filename*=UTF-8''{encoded}"

    def _is_active(self, filename: str) -> bool:
        """True if isr.py reports this file as currently being recorded.

        The name is checked both as given and in canonical form (resolved
        against the recordings directory, ``/``-separated), so spellings such
        as ``./rec.wav`` cannot slip past the check.  Any failure to read or
        interpret ``status.json`` counts as "not active".
        """
        try:
            with open(Path(self.recordings_dir) / 'status.json') as fh:
                active = json.load(fh).get('active', [])
            if filename in active:
                return True
            base = Path(self.recordings_dir).resolve()
            canonical = (base / filename).resolve().relative_to(base).as_posix()
            return canonical in active
        except Exception:
            return False

    def _copy_to_response(self, fh, length=None):
        """Stream an open binary file to the client in 64 KB chunks.

        Sends at most *length* bytes, or everything up to end of file when
        *length* is None.  Stops quietly if the client closes the connection.
        """
        remaining = length
        try:
            while remaining is None or remaining > 0:
                chunk = fh.read(65536 if remaining is None else min(65536, remaining))
                if not chunk:
                    break
                self.wfile.write(chunk)
                if remaining is not None:
                    remaining -= len(chunk)
        except ConnectionError:
            # The client went away mid-transfer (e.g. the player seeked or the
            # tab was closed); nothing more can be sent on this connection.
            self.close_connection = True

    def _safe_path(self, filename: str):
        """Resolve *filename* inside the recordings directory.

        Returns the resolved path of an existing regular file.  Otherwise a
        response has already been sent (403 when the name escapes the
        recordings directory or cannot be resolved, 404 when it is not a
        file) and None is returned.
        """
        base = Path(self.recordings_dir).resolve()
        try:
            path = (base / filename).resolve()
            path.relative_to(base)
        except Exception:
            self._send(403, b'Forbidden', 'text/plain')
            return None
        if not path.is_file():
            self._send(404, b'Not found', 'text/plain')
            return None
        return path

    def _send(self, code: int, body: bytes, content_type: str, extra_headers=None):
        """Send a complete response with the given status, type and body.

        *extra_headers* is an optional mapping of additional header names to
        values, written after Content-Type and Content-Length.
        """
        self.send_response(code)
        self.send_header('Content-Type', content_type)
        self.send_header('Content-Length', str(len(body)))
        for header, value in (extra_headers or {}).items():
            self.send_header(header, value)
        self.end_headers()
        self.wfile.write(body)

    def _json_err(self, code: int, msg: str):
        """Send ``{"error": msg}`` as JSON with the given status code."""
        self._send(code, json.dumps({'error': msg}).encode('utf-8'), 'application/json')

    def log_message(self, fmt, *args):
        """Suppress the default per-request logging."""
        pass
# ---------------------------------------------------------------------------
# UI page — single-page HTML/CSS/JS, loaded once at startup
# ---------------------------------------------------------------------------
# webui.html must sit next to this module.  It is read eagerly, so a missing
# file raises at import time rather than on the first page request.
_HTML = Path(__file__).resolve().with_name('webui.html').read_text(encoding='utf-8')
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main():
    """Parse the command line, configure the handler and serve until Ctrl+C.

    Orphaned analysis cache files are pruned once before the server starts.
    The listening socket is closed when the server stops, whether through
    Ctrl+C or an unexpected error.
    """
    parser = argparse.ArgumentParser(description='ISR Web — audio archive browser')
    parser.add_argument('--dir', default='recordings',
                        help='Recordings directory (default: recordings)')
    parser.add_argument('--port', type=int, default=8080,
                        help='HTTP port (default: 8080)')
    parser.add_argument('--host', default='0.0.0.0',
                        help='Bind address (default: 0.0.0.0)')
    parser.add_argument('--threshold', type=float, default=LOUD_THRESHOLD,
                        help=f'RMS loudness threshold 0-1 (default: {LOUD_THRESHOLD})')
    parser.add_argument('--min-gap', type=float, default=MIN_GAP_SECONDS,
                        help=f'Seconds gap for merging loud sections (default: {MIN_GAP_SECONDS})')
    parser.add_argument('--analyses-dir', default=None,
                        help='Directory for analysis cache files (default: <recordings-dir>/analyses)')
    args = parser.parse_args()

    rec_dir = Path(args.dir).resolve()
    analyses_dir = Path(args.analyses_dir).resolve() if args.analyses_dir else rec_dir / 'analyses'
    if not rec_dir.exists():
        print(f"Warning: recordings directory '{rec_dir}' does not exist yet.")
    prune_orphan_analyses(analyses_dir, rec_dir)

    # A class body can't close over a name it also assigns: inside Handler,
    # ``analyses_dir = str(analyses_dir)`` would not see the local above, so
    # the value is passed in under a different name.
    _analyses_dir = analyses_dir

    class Handler(_Handler):
        recordings_dir = str(rec_dir)
        analyses_dir = str(_analyses_dir)
        threshold = args.threshold
        min_gap = args.min_gap

    server = ThreadingHTTPServer((args.host, args.port), Handler)
    print(f"ISR Web running on http://{args.host}:{args.port}/")
    print(f"Recordings dir: {rec_dir}")
    print(f"Analyses dir: {analyses_dir}")
    print(f"Loud threshold: {args.threshold}")
    if not NUMPY_AVAILABLE:
        print("Note: numpy not installed — WAV RMS uses pure Python (slower); FLAC analysis unavailable")
    elif not SOUNDFILE_AVAILABLE:
        print("Note: soundfile not installed — FLAC loudness analysis unavailable")
    print("Stop with Ctrl+C\n")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("Stopped.")
    finally:
        # Release the listening socket so the port is free immediately.
        server.server_close()


if __name__ == '__main__':
    main()