#!/usr/bin/env python3 """ ISR Web — Browse and download recorded audio files. Shows a chronological table of all recordings, allows download, and analyses WAV files for loud sections using RMS. Usage: python web.py # serves recordings/ on port 8080 python web.py --dir /path/to/audio # custom recordings directory python web.py --port 8888 # custom port python web.py --threshold 0.03 # loudness threshold (0-1, default 0.05) """ import argparse import json import math import os import struct import wave from datetime import datetime from http.server import BaseHTTPRequestHandler, HTTPServer from pathlib import Path from urllib.parse import parse_qs, unquote, urlparse try: import numpy as np NUMPY_AVAILABLE = True except ImportError: NUMPY_AVAILABLE = False # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- AUDIO_EXTENSIONS = {'.wav', '.mp3', '.ogg', '.flac', '.aac', '.opus'} WINDOW_SAMPLES = 4800 # 100 ms at 48 kHz LOUD_THRESHOLD = 0.05 # RMS 0–1 scale; sections above this are "interesting" MIN_GAP_SECONDS = 2.0 # merge loud sections separated by less than this # --------------------------------------------------------------------------- # Audio helpers # --------------------------------------------------------------------------- def _get_wav_info(path: Path): """Return (duration_seconds, sample_rate, channels) or (None, None, None).""" try: with wave.open(str(path), 'rb') as wf: return wf.getnframes() / wf.getframerate(), wf.getframerate(), wf.getnchannels() except Exception: return None, None, None def _compute_rms_windows(wf, channels: int, sampwidth: int, framerate: int, window_samples: int): """Yield (time_seconds, rms_0_to_1) for every window in the open wave file.""" frame_pos = 0 while True: raw = wf.readframes(window_samples) if not raw: break n_samp = len(raw) // (sampwidth * channels) if n_samp == 0: break if NUMPY_AVAILABLE: arr = np.frombuffer(raw[:n_samp * sampwidth * channels], dtype=' 1: arr = arr.reshape(-1, channels).mean(axis=1) rms = float(np.sqrt(np.mean(arr.astype(np.float64) ** 2))) / 32768.0 else: fmt = f'<{n_samp * channels}h' samples = struct.unpack(fmt, raw[:n_samp * sampwidth * channels]) mono = samples[::channels] if channels > 1 else samples rms = math.sqrt(sum(s * s for s in mono) / len(mono)) / 32768.0 yield frame_pos / framerate, round(rms, 5) frame_pos += window_samples def analyze_wav(path: Path, window_samples: int = WINDOW_SAMPLES, threshold: float = LOUD_THRESHOLD): """ Analyse a WAV file. Returns a dict with: rms — full list of RMS values (one per window) rms_display — downsampled to ≤800 points for the sparkline sections — list of {start, end} dicts for loud passages duration — total seconds window — window duration in seconds """ try: with wave.open(str(path), 'rb') as wf: channels = wf.getnchannels() sampwidth = wf.getsampwidth() framerate = wf.getframerate() n_frames = wf.getnframes() rms_values = [rms for _, rms in _compute_rms_windows(wf, channels, sampwidth, framerate, window_samples)] except Exception as e: return {'error': str(e)} window_dur = window_samples / framerate duration = n_frames / framerate # Downsample for the sparkline (max 800 bars) if len(rms_values) > 800: step = len(rms_values) / 800 rms_display = [rms_values[int(i * step)] for i in range(800)] else: rms_display = rms_values # Find loud sections sections: list = [] start_t = None last_loud_t = None for i, rms in enumerate(rms_values): t = i * window_dur if rms >= threshold: if start_t is None: start_t = t last_loud_t = t else: if start_t is not None: gap = t - last_loud_t if gap > MIN_GAP_SECONDS: sections.append({'start': round(start_t, 1), 'end': round(last_loud_t + window_dur, 1)}) start_t = None last_loud_t = None if start_t is not None: sections.append({'start': round(start_t, 1), 'end': round(duration, 1)}) return { 'rms': rms_values, 'rms_display': rms_display, 'sections': sections, 'duration': round(duration, 2), 'window': round(window_dur, 4), } # --------------------------------------------------------------------------- # File listing # --------------------------------------------------------------------------- def list_files(recordings_dir: str): """Return list of audio file metadata dicts, sorted newest first.""" base = Path(recordings_dir) if not base.exists(): return [] files = [] for path in base.rglob('*'): if path.suffix.lower() not in AUDIO_EXTENSIONS: continue stat = path.stat() rel = path.relative_to(base) if path.suffix.lower() == '.wav': duration, _, _ = _get_wav_info(path) else: duration = None files.append({ 'name': str(rel).replace('\\', '/'), 'size': stat.st_size, 'mtime': stat.st_mtime, 'date': datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'), 'duration': duration, 'ext': path.suffix.lower().lstrip('.'), }) files.sort(key=lambda f: f['mtime'], reverse=True) return files # --------------------------------------------------------------------------- # HTTP handler # --------------------------------------------------------------------------- class _Handler(BaseHTTPRequestHandler): recordings_dir: str = 'recordings' threshold: float = LOUD_THRESHOLD # ---- routing ----------------------------------------------------------- def do_GET(self): parsed = urlparse(self.path) qs = parse_qs(parsed.query) p = parsed.path if p == '/': self._html() elif p == '/api/files': self._api_files() elif p == '/api/analyze': self._api_analyze(qs) elif p.startswith('/download/'): self._download(unquote(p[len('/download/'):])) else: self._send(404, b'Not found', 'text/plain') # ---- endpoints --------------------------------------------------------- def _html(self): self._send(200, _HTML.encode('utf-8'), 'text/html; charset=utf-8') def _api_files(self): data = json.dumps(list_files(self.recordings_dir)).encode('utf-8') self._send(200, data, 'application/json') def _api_analyze(self, qs): filename = qs.get('file', [None])[0] if not filename: self._json_err(400, 'missing file parameter') return path = self._safe_path(filename) if path is None: return if path.suffix.lower() != '.wav': self._json_err(400, 'loudness analysis is only available for WAV files') return result = analyze_wav(path, threshold=self.threshold) data = json.dumps(result).encode('utf-8') self._send(200, data, 'application/json') def _download(self, filename: str): path = self._safe_path(filename) if path is None: return size = path.stat().st_size self.send_response(200) self.send_header('Content-Type', 'application/octet-stream') self.send_header('Content-Disposition', f'attachment; filename="{path.name}"') self.send_header('Content-Length', str(size)) self.end_headers() with open(path, 'rb') as fh: while True: chunk = fh.read(65536) if not chunk: break self.wfile.write(chunk) # ---- helpers ----------------------------------------------------------- def _safe_path(self, filename: str): """Resolve filename within recordings_dir; return None and send error if invalid.""" base = Path(self.recordings_dir).resolve() try: path = (base / filename).resolve() path.relative_to(base) # raises ValueError if outside base except (ValueError, Exception): self._send(403, b'Forbidden', 'text/plain') return None if not path.exists(): self._send(404, b'Not found', 'text/plain') return None return path def _send(self, code: int, body: bytes, content_type: str): self.send_response(code) self.send_header('Content-Type', content_type) self.send_header('Content-Length', str(len(body))) self.end_headers() self.wfile.write(body) def _json_err(self, code: int, msg: str): self._send(code, json.dumps({'error': msg}).encode('utf-8'), 'application/json') def log_message(self, fmt, *args): pass # suppress default access log noise # --------------------------------------------------------------------------- # Embedded HTML/CSS/JS # --------------------------------------------------------------------------- _HTML = r""" ISR Archive

ISR Archive

Loading…
File Date Duration Size Waveform / Loud sections
""" # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- def main(): parser = argparse.ArgumentParser(description='ISR Web — audio archive browser') parser.add_argument('--dir', default='recordings', help='Recordings directory (default: recordings)') parser.add_argument('--port', type=int, default=8080, help='HTTP port (default: 8080)') parser.add_argument('--host', default='0.0.0.0', help='Bind address (default: 0.0.0.0)') parser.add_argument('--threshold', type=float, default=LOUD_THRESHOLD, help=f'RMS loudness threshold 0–1 (default: {LOUD_THRESHOLD})') args = parser.parse_args() rec_dir = Path(args.dir) if not rec_dir.exists(): print(f"Warning: recordings directory '{args.dir}' does not exist yet.") class Handler(_Handler): recordings_dir = str(rec_dir.resolve()) threshold = args.threshold server = HTTPServer((args.host, args.port), Handler) print(f"ISR Web running → http://{args.host}:{args.port}/") print(f"Recordings dir → {rec_dir.resolve()}") print(f"Loud threshold → {args.threshold}") if not NUMPY_AVAILABLE: print("Note: numpy not installed — RMS analysis uses pure Python (slower for large files)") print("Stop with Ctrl+C\n") try: server.serve_forever() except KeyboardInterrupt: print("Stopped.") if __name__ == '__main__': main()