#!/usr/bin/env python3 """ ISR Web — Browse and download recorded audio files. Shows a chronological table of all recordings, allows inline playback, download, and analyses WAV/FLAC files for loud sections using RMS. Usage: python web.py # serves recordings/ on port 8080 python web.py --dir /path/to/audio # custom recordings directory python web.py --port 8888 # custom port python web.py --threshold 0.03 # loudness threshold (0-1, default 0.05) """ import argparse import json import math import os import re import shutil import struct import subprocess import tempfile import wave from datetime import datetime from http.server import BaseHTTPRequestHandler, HTTPServer, ThreadingHTTPServer from pathlib import Path from urllib.parse import parse_qs, unquote, urlparse try: import numpy as np NUMPY_AVAILABLE = True except ImportError: NUMPY_AVAILABLE = False try: import soundfile as sf SOUNDFILE_AVAILABLE = True except ImportError: SOUNDFILE_AVAILABLE = False # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- AUDIO_EXTENSIONS = {'.wav', '.mp3', '.ogg', '.flac', '.aac', '.opus'} WINDOW_SAMPLES = 4800 # 100 ms at 48 kHz LOUD_THRESHOLD = 0.05 # RMS 0–1 scale; sections above this are "interesting" MIN_GAP_SECONDS = 2.0 # merge loud sections separated by less than this MIME_TYPES = { '.wav': 'audio/wav', '.mp3': 'audio/mpeg', '.ogg': 'audio/ogg', '.flac': 'audio/flac', '.aac': 'audio/aac', '.opus': 'audio/ogg', } # --------------------------------------------------------------------------- # Audio analysis helpers # --------------------------------------------------------------------------- def _get_wav_info(path: Path): """Return (duration_seconds, sample_rate, channels) or (None, None, None).""" try: with wave.open(str(path), 'rb') as wf: return wf.getnframes() / wf.getframerate(), wf.getframerate(), wf.getnchannels() except Exception: return None, None, None def _get_audio_duration(path: Path): """Return duration in seconds for any supported audio file, or None.""" ext = path.suffix.lower() if ext == '.wav': dur, _, _ = _get_wav_info(path) return dur if SOUNDFILE_AVAILABLE and ext in ('.flac', '.ogg', '.opus'): try: with sf.SoundFile(path) as f: return round(len(f) / f.samplerate, 2) except Exception: return None return None def _compute_rms_windows_wav(wf, channels: int, sampwidth: int, framerate: int, window_samples: int): """Yield rms_0_to_1 for every window in the open wave file.""" frame_pos = 0 while True: raw = wf.readframes(window_samples) if not raw: break n_samp = len(raw) // (sampwidth * channels) if n_samp == 0: break if NUMPY_AVAILABLE: arr = np.frombuffer(raw[:n_samp * sampwidth * channels], dtype=' 1: arr = arr.reshape(-1, channels).mean(axis=1) rms = float(np.sqrt(np.mean(arr.astype(np.float64) ** 2))) / 32768.0 else: fmt = f'<{n_samp * channels}h' samples = struct.unpack(fmt, raw[:n_samp * sampwidth * channels]) mono = samples[::channels] if channels > 1 else samples rms = math.sqrt(sum(s * s for s in mono) / len(mono)) / 32768.0 yield round(rms, 5) frame_pos += window_samples def _loud_sections(rms_values: list, window_dur: float, duration: float, threshold: float) -> list: sections = [] start_t = None last_loud_t = None for i, rms in enumerate(rms_values): t = i * window_dur if rms >= threshold: if start_t is None: start_t = t last_loud_t = t else: if start_t is not None and (t - last_loud_t) > MIN_GAP_SECONDS: sections.append({'start': round(start_t, 1), 'end': round(last_loud_t + window_dur, 1)}) start_t = None last_loud_t = None if start_t is not None: sections.append({'start': round(start_t, 1), 'end': round(duration, 1)}) return sections def _package_result(rms_values: list, framerate: int, n_frames: int, window_samples: int, threshold: float) -> dict: window_dur = window_samples / framerate duration = n_frames / framerate if len(rms_values) > 800: step = len(rms_values) / 800 rms_display = [rms_values[int(i * step)] for i in range(800)] else: rms_display = rms_values return { 'rms': rms_values, 'rms_display': rms_display, 'sections': _loud_sections(rms_values, window_dur, duration, threshold), 'duration': round(duration, 2), 'window': round(window_dur, 4), } def analyze_wav(path: Path, window_samples: int = WINDOW_SAMPLES, threshold: float = LOUD_THRESHOLD) -> dict: try: with wave.open(str(path), 'rb') as wf: channels = wf.getnchannels() sampwidth = wf.getsampwidth() framerate = wf.getframerate() n_frames = wf.getnframes() rms_values = list(_compute_rms_windows_wav( wf, channels, sampwidth, framerate, window_samples)) except Exception as e: return {'error': str(e)} return _package_result(rms_values, framerate, n_frames, window_samples, threshold) def analyze_flac(path: Path, window_samples: int = WINDOW_SAMPLES, threshold: float = LOUD_THRESHOLD) -> dict: """Analyse a FLAC file for loudness. Requires numpy and soundfile.""" if not NUMPY_AVAILABLE or not SOUNDFILE_AVAILABLE: return {'error': 'FLAC analysis requires: pip install numpy soundfile'} try: with sf.SoundFile(path) as f: framerate = f.samplerate channels = f.channels n_frames = len(f) rms_values = [] while True: frames = f.read(window_samples, dtype='float32', always_2d=True) if len(frames) == 0: break mono = frames.mean(axis=1) if channels > 1 else frames[:, 0] rms = float(np.sqrt(np.mean(mono.astype(np.float64) ** 2))) rms_values.append(round(rms, 5)) except Exception as e: return {'error': str(e)} return _package_result(rms_values, framerate, n_frames, window_samples, threshold) # --------------------------------------------------------------------------- # File listing # --------------------------------------------------------------------------- def list_files(recordings_dir: str): """Return list of audio file metadata dicts, sorted newest first.""" base = Path(recordings_dir) if not base.exists(): return [] # Load active recordings written by isr.py active_files: set = set() status_path = base / 'status.json' if status_path.exists(): try: with open(status_path) as fh: active_files = set(json.load(fh).get('active', [])) except Exception: pass files = [] for path in base.rglob('*'): if path.suffix.lower() not in AUDIO_EXTENSIONS: continue stat = path.stat() rel = str(path.relative_to(base)).replace('\\', '/') is_active = rel in active_files # Skip reading partial headers for in-progress files — the WAV nframes # field and FLAC total_samples are both unfinalized while recording, # producing wildly incorrect values (e.g. 53375995583:39:01). duration = None if is_active else _get_audio_duration(path) files.append({ 'name': rel, 'size': stat.st_size, 'mtime': stat.st_mtime, 'date': datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'), 'duration': duration, 'ext': path.suffix.lower().lstrip('.'), 'recording': is_active, }) files.sort(key=lambda f: f['mtime'], reverse=True) return files # --------------------------------------------------------------------------- # HTTP handler # --------------------------------------------------------------------------- class _Handler(BaseHTTPRequestHandler): recordings_dir: str = 'recordings' threshold: float = LOUD_THRESHOLD def do_DELETE(self): parsed = urlparse(self.path) p = parsed.path if p.startswith('/api/files/'): self._api_delete(unquote(p[len('/api/files/'):])) else: self._send(404, b'Not found', 'text/plain') def do_GET(self): parsed = urlparse(self.path) qs = parse_qs(parsed.query) p = parsed.path if p == '/': self._html() elif p == '/api/files': self._api_files() elif p == '/api/analyze': self._api_analyze(qs) elif p == '/api/status': self._api_status() elif p == '/api/storage': self._api_storage() elif p == '/api/config': self._api_config() elif p == '/api/cut': self._api_cut(qs) elif p.startswith('/download/'): self._download(unquote(p[len('/download/'):])) elif p.startswith('/stream/'): self._stream(unquote(p[len('/stream/'):])) else: self._send(404, b'Not found', 'text/plain') def _html(self): self._send(200, _HTML.encode('utf-8'), 'text/html; charset=utf-8') def _api_files(self): data = json.dumps(list_files(self.recordings_dir)).encode('utf-8') self._send(200, data, 'application/json') def _api_analyze(self, qs): filename = qs.get('file', [None])[0] if not filename: self._json_err(400, 'missing file parameter') return path = self._safe_path(filename) if path is None: return try: threshold = float(qs.get('threshold', [self.threshold])[0]) threshold = max(0.0, min(1.0, threshold)) except (ValueError, TypeError): threshold = self.threshold status_path = Path(self.recordings_dir) / 'status.json' try: with open(status_path) as fh: if filename in set(json.load(fh).get('active', [])): self._json_err(409, 'File is currently being recorded — analysis unavailable until recording stops') return except Exception: pass ext = path.suffix.lower() if ext == '.wav': result = analyze_wav(path, threshold=threshold) elif ext == '.flac': if not (NUMPY_AVAILABLE and SOUNDFILE_AVAILABLE): self._json_err(400, 'FLAC analysis requires: pip install numpy soundfile') return result = analyze_flac(path, threshold=threshold) else: self._json_err(400, f'Loudness analysis is not available for {ext} files') return self._send(200, json.dumps(result).encode('utf-8'), 'application/json') def _api_status(self): status_path = Path(self.recordings_dir) / 'status.json' if status_path.exists(): try: self._send(200, status_path.read_bytes(), 'application/json') return except Exception: pass self._send(200, b'{"active":[]}', 'application/json') def _download(self, filename: str): path = self._safe_path(filename) if path is None: return size = path.stat().st_size self.send_response(200) self.send_header('Content-Type', 'application/octet-stream') self.send_header('Content-Disposition', f'attachment; filename="{path.name}"') self.send_header('Content-Length', str(size)) self.end_headers() with open(path, 'rb') as fh: while True: chunk = fh.read(65536) if not chunk: break self.wfile.write(chunk) def _stream(self, filename: str): """Serve audio for inline playback with HTTP Range support.""" path = self._safe_path(filename) if path is None: return content_type = MIME_TYPES.get(path.suffix.lower(), 'application/octet-stream') size = path.stat().st_size range_header = self.headers.get('Range', '') m = re.match(r'bytes=(\d+)-(\d*)', range_header) if range_header else None if m: start = int(m.group(1)) end = int(m.group(2)) if m.group(2) else size - 1 end = min(end, size - 1) if start > end or start >= size: self._send(416, b'Range Not Satisfiable', 'text/plain') return length = end - start + 1 self.send_response(206) self.send_header('Content-Type', content_type) self.send_header('Content-Range', f'bytes {start}-{end}/{size}') self.send_header('Content-Length', str(length)) self.send_header('Accept-Ranges', 'bytes') self.end_headers() with open(path, 'rb') as fh: fh.seek(start) remaining = length while remaining > 0: chunk = fh.read(min(65536, remaining)) if not chunk: break self.wfile.write(chunk) remaining -= len(chunk) else: self.send_response(200) self.send_header('Content-Type', content_type) self.send_header('Content-Length', str(size)) self.send_header('Accept-Ranges', 'bytes') self.end_headers() with open(path, 'rb') as fh: while True: chunk = fh.read(65536) if not chunk: break self.wfile.write(chunk) def _api_storage(self): base = Path(self.recordings_dir) used = 0 if base.exists(): used = sum( p.stat().st_size for p in base.rglob('*') if p.is_file() and p.suffix.lower() in AUDIO_EXTENSIONS ) try: du = shutil.disk_usage(str(base) if base.exists() else '.') disk_free, disk_total = du.free, du.total except Exception: disk_free = disk_total = None data = json.dumps({'used': used, 'disk_free': disk_free, 'disk_total': disk_total}) self._send(200, data.encode(), 'application/json') def _api_config(self): data = json.dumps({'threshold': self.threshold}) self._send(200, data.encode(), 'application/json') def _api_delete(self, filename: str): status_path = Path(self.recordings_dir) / 'status.json' try: with open(status_path) as fh: if filename in set(json.load(fh).get('active', [])): self._json_err(409, 'Cannot delete a file that is currently being recorded') return except Exception: pass path = self._safe_path(filename) if path is None: return try: path.unlink() except Exception as e: self._json_err(500, f'Failed to delete: {e}') return self._send(200, json.dumps({'deleted': filename}).encode(), 'application/json') def _api_cut(self, qs): filename = qs.get('file', [None])[0] start_s = qs.get('start', [None])[0] end_s = qs.get('end', [None])[0] if not filename or start_s is None or end_s is None: self._json_err(400, 'missing file, start, or end parameter') return try: start = float(start_s) end = float(end_s) except (ValueError, TypeError): self._json_err(400, 'start and end must be numbers (seconds)') return if start < 0 or end <= start: self._json_err(400, 'start must be ≥ 0 and end must be > start') return path = self._safe_path(filename) if path is None: return status_path = Path(self.recordings_dir) / 'status.json' try: with open(status_path) as fh: if filename in set(json.load(fh).get('active', [])): self._json_err(409, 'Cannot cut a file that is currently being recorded') return except Exception: pass if not shutil.which('ffmpeg'): self._json_err(500, 'ffmpeg is not available on this server') return ext = path.suffix.lower() out_name = f'{path.stem}_cut_{int(start)}s-{int(end)}s{ext}' fd, tmp_path = tempfile.mkstemp(suffix=ext) os.close(fd) try: cmd = ['ffmpeg', '-y', '-i', str(path), '-ss', str(start), '-to', str(end), '-c', 'copy', tmp_path] result = subprocess.run(cmd, capture_output=True, timeout=120) if result.returncode != 0: err = result.stderr.decode('utf-8', errors='replace')[-400:] self._json_err(500, f'ffmpeg error: {err}') return tmp_size = os.path.getsize(tmp_path) content_type = MIME_TYPES.get(ext, 'application/octet-stream') self.send_response(200) self.send_header('Content-Type', content_type) self.send_header('Content-Disposition', f'attachment; filename="{out_name}"') self.send_header('Content-Length', str(tmp_size)) self.end_headers() with open(tmp_path, 'rb') as fh: while True: chunk = fh.read(65536) if not chunk: break self.wfile.write(chunk) except subprocess.TimeoutExpired: self._json_err(504, 'ffmpeg timed out — file may be too large') finally: try: os.unlink(tmp_path) except Exception: pass def _safe_path(self, filename: str): base = Path(self.recordings_dir).resolve() try: path = (base / filename).resolve() path.relative_to(base) except (ValueError, Exception): self._send(403, b'Forbidden', 'text/plain') return None if not path.is_file(): self._send(404, b'Not found', 'text/plain') return None return path def _send(self, code: int, body: bytes, content_type: str): self.send_response(code) self.send_header('Content-Type', content_type) self.send_header('Content-Length', str(len(body))) self.end_headers() self.wfile.write(body) def _json_err(self, code: int, msg: str): self._send(code, json.dumps({'error': msg}).encode('utf-8'), 'application/json') def log_message(self, fmt, *args): pass # --------------------------------------------------------------------------- # Embedded HTML / CSS / JS # --------------------------------------------------------------------------- _HTML = r""" ISR Archive

ISR Archive

Loading…
RMS 0–1 · sections above this value are marked loud
File Date Duration Size Loudness Actions
""" # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- def main(): parser = argparse.ArgumentParser(description='ISR Web — audio archive browser') parser.add_argument('--dir', default='recordings', help='Recordings directory (default: recordings)') parser.add_argument('--port', type=int, default=8080, help='HTTP port (default: 8080)') parser.add_argument('--host', default='0.0.0.0', help='Bind address (default: 0.0.0.0)') parser.add_argument('--threshold', type=float, default=LOUD_THRESHOLD, help=f'RMS loudness threshold 0–1 (default: {LOUD_THRESHOLD})') args = parser.parse_args() rec_dir = Path(args.dir) if not rec_dir.exists(): print(f"Warning: recordings directory '{args.dir}' does not exist yet.") class Handler(_Handler): recordings_dir = str(rec_dir.resolve()) threshold = args.threshold server = ThreadingHTTPServer((args.host, args.port), Handler) print(f"ISR Web running → http://{args.host}:{args.port}/") print(f"Recordings dir → {rec_dir.resolve()}") print(f"Loud threshold → {args.threshold}") if not NUMPY_AVAILABLE: print("Note: numpy not installed — WAV RMS uses pure Python (slower); FLAC analysis unavailable") elif not SOUNDFILE_AVAILABLE: print("Note: soundfile not installed — FLAC loudness analysis unavailable") print("Stop with Ctrl+C\n") try: server.serve_forever() except KeyboardInterrupt: print("Stopped.") if __name__ == '__main__': main()