#!/usr/bin/env python3 """ ISR Web — Browse and download recorded audio files. Shows a chronological table of all recordings, allows inline playback, download, and analyses WAV/FLAC files for loud sections using RMS. Usage: python web.py # serves recordings/ on port 8080 python web.py --dir /path/to/audio # custom recordings directory python web.py --port 8888 # custom port python web.py --threshold 0.03 # loudness threshold (0-1, default 0.05) """ import argparse import json import math import os import re import shutil import struct import subprocess import tempfile import threading import wave from datetime import datetime from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from urllib.parse import parse_qs, unquote, urlparse try: import numpy as np NUMPY_AVAILABLE = True except ImportError: NUMPY_AVAILABLE = False try: import soundfile as sf SOUNDFILE_AVAILABLE = True except ImportError: SOUNDFILE_AVAILABLE = False # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- AUDIO_EXTENSIONS = {'.wav', '.mp3', '.ogg', '.flac', '.aac', '.opus'} WINDOW_SAMPLES = 4800 # 100 ms at 48 kHz LOUD_THRESHOLD = 0.05 # RMS 0–1 scale; sections above this are "interesting" MIN_GAP_SECONDS = 2.0 # merge loud sections separated by less than this MIME_TYPES = { '.wav': 'audio/wav', '.mp3': 'audio/mpeg', '.ogg': 'audio/ogg', '.flac': 'audio/flac', '.aac': 'audio/aac', '.opus': 'audio/ogg', } # --------------------------------------------------------------------------- # Audio analysis helpers # --------------------------------------------------------------------------- def _live_wav_header(path: Path, size: int): """Return the WAV header (through the 'data' chunk header) with RIFF and data sizes rewritten to match the current file size, or None. While a WAV file is still being recorded its header claims ~0 frames, so browsers show no duration and refuse to seek. Serving a header patched to the bytes recorded so far fixes both; the patch is the same length as the original header, so all byte offsets and Range math stay valid. """ try: with open(path, 'rb') as fh: hdr = fh.read(512) if len(hdr) < 44 or hdr[:4] != b'RIFF' or hdr[8:12] != b'WAVE': return None pos = 12 while pos + 8 <= len(hdr): chunk_id = hdr[pos:pos + 4] chunk_size = int.from_bytes(hdr[pos + 4:pos + 8], 'little') if chunk_id == b'data': data_off = pos + 8 patched = bytearray(hdr[:data_off]) patched[4:8] = (size - 8).to_bytes(4, 'little') patched[pos + 4:pos + 8] = (size - data_off).to_bytes(4, 'little') return bytes(patched) pos += 8 + chunk_size + (chunk_size & 1) return None except Exception: return None # CRC-8 (poly 0x07) used by FLAC frame headers _CRC8_TABLE = [] for _i in range(256): _c = _i for _ in range(8): _c = ((_c << 1) ^ 0x07) & 0xFF if _c & 0x80 else (_c << 1) & 0xFF _CRC8_TABLE.append(_c) _FLAC_BLOCKSIZES = {1: 192, 2: 576, 3: 1152, 4: 2304, 5: 4608, 8: 256, 9: 512, 10: 1024, 11: 2048, 12: 4096, 13: 8192, 14: 16384, 15: 32768} def _crc8(data: bytes) -> int: crc = 0 for b in data: crc = _CRC8_TABLE[crc ^ b] return crc def _flac_coded_number(buf: bytes, pos: int): """Decode the UTF-8-style frame/sample number; returns (value, next_pos).""" b0 = buf[pos] if b0 < 0x80: return b0, pos + 1 n, mask = 0, 0x40 while b0 & mask: n += 1 mask >>= 1 if n < 1 or n > 6: # 10xxxxxx is not a valid leading byte return None val = b0 & (mask - 1) for i in range(1, n + 1): c = buf[pos + i] if c & 0xC0 != 0x80: return None val = (val << 6) | (c & 0x3F) return val, pos + 1 + n def _flac_frame_samples(buf: bytes, pos: int, fixed_bs: int): """If a valid FLAC frame header starts at pos, return the stream sample count through the end of that frame, else None. Validity is confirmed by the header's CRC-8, so false sync matches in compressed data are rejected.""" try: variable = buf[pos + 1] & 0x01 bs_code = buf[pos + 2] >> 4 sr_code = buf[pos + 2] & 0x0F if bs_code == 0 or sr_code == 15 or buf[pos + 3] & 0x01: return None if (buf[pos + 3] >> 4) > 10: # reserved channel assignment return None coded = _flac_coded_number(buf, pos + 4) if coded is None: return None val, p = coded bs = _FLAC_BLOCKSIZES.get(bs_code) if bs_code == 6: bs = buf[p] + 1 p += 1 elif bs_code == 7: bs = int.from_bytes(buf[p:p + 2], 'big') + 1 p += 2 if sr_code == 12: p += 1 elif sr_code in (13, 14): p += 2 if _crc8(buf[pos:p]) != buf[p]: return None if variable: # val is the frame's starting sample number return val + (bs or 0) return val * (fixed_bs or bs or 4096) + (bs or 0) except IndexError: return None def _live_flac_header(path: Path, size: int): """Return the first 26 bytes of a FLAC file with STREAMINFO total_samples patched to the samples recorded so far, or None. Like _live_wav_header, but FLAC duration cannot be derived from the byte count (variable compression). Instead the sample count is parsed out of the last frame header in the file tail — each FLAC frame carries its own frame/sample number. """ try: with open(path, 'rb') as fh: head = fh.read(42) if len(head) < 42 or head[:4] != b'fLaC': return None # STREAMINFO must be the first metadata block if head[4] & 0x7F != 0 or int.from_bytes(head[5:8], 'big') != 34: return None fixed_bs = int.from_bytes(head[8:10], 'big') tail_len = min(size, 65536) fh.seek(size - tail_len) buf = fh.read(tail_len) samples = None for i in range(len(buf) - 20, -1, -1): if buf[i] == 0xFF and (buf[i + 1] & 0xFC) == 0xF8: samples = _flac_frame_samples(buf, i, fixed_bs) if samples: break if not samples: return None # Bytes 18-25 hold: sample rate (20 bits) | channels-1 (3) | # bps-1 (5) | total_samples (36). Replace only the low 36 bits. field = int.from_bytes(head[18:26], 'big') field = (field & ~((1 << 36) - 1)) | min(samples, (1 << 36) - 1) patched = bytearray(head[:26]) patched[18:26] = field.to_bytes(8, 'big') return bytes(patched) except Exception: return None def _get_audio_duration(path: Path): """Return duration in seconds for any supported audio file, or None.""" ext = path.suffix.lower() if ext == '.wav': try: with wave.open(str(path), 'rb') as wf: return wf.getnframes() / wf.getframerate() except Exception: return None if SOUNDFILE_AVAILABLE and ext in ('.flac', '.ogg', '.opus'): try: with sf.SoundFile(path) as f: return round(len(f) / f.samplerate, 2) except Exception: return None return None def _compute_rms_windows_wav(wf, channels: int, sampwidth: int, framerate: int, window_samples: int): """Yield rms_0_to_1 for every window in the open wave file.""" while True: raw = wf.readframes(window_samples) if not raw: break n_samp = len(raw) // (sampwidth * channels) if n_samp == 0: break if NUMPY_AVAILABLE: arr = np.frombuffer(raw[:n_samp * sampwidth * channels], dtype=' 1: arr = arr.reshape(-1, channels).mean(axis=1) rms = float(np.sqrt(np.mean(arr.astype(np.float64) ** 2))) / 32768.0 else: fmt = f'<{n_samp * channels}h' samples = struct.unpack(fmt, raw[:n_samp * sampwidth * channels]) mono = samples[::channels] if channels > 1 else samples rms = math.sqrt(sum(s * s for s in mono) / len(mono)) / 32768.0 yield round(rms, 5) def _loud_sections(rms_values: list, window_dur: float, duration: float, threshold: float, min_gap: float = MIN_GAP_SECONDS) -> list: sections = [] start_t = None last_loud_t = None for i, rms in enumerate(rms_values): t = i * window_dur if rms >= threshold: if start_t is None: start_t = t last_loud_t = t else: if start_t is not None and (t - last_loud_t) > min_gap: sections.append({'start': round(start_t, 1), 'end': round(last_loud_t + window_dur, 1)}) start_t = None last_loud_t = None if start_t is not None: sections.append({'start': round(start_t, 1), 'end': round(duration, 1)}) return sections def _package_result(rms_values: list, framerate: int, n_frames: int, window_samples: int, threshold: float, min_gap: float = MIN_GAP_SECONDS) -> dict: window_dur = window_samples / framerate duration = n_frames / framerate if len(rms_values) > 800: step = len(rms_values) / 800 rms_display = [rms_values[int(i * step)] for i in range(800)] else: rms_display = rms_values # Note: the full per-window RMS list is deliberately NOT returned — the UI # only renders rms_display (~800 points), and the full list is ~45x larger. return { 'rms_display': rms_display, 'sections': _loud_sections(rms_values, window_dur, duration, threshold, min_gap), 'duration': round(duration, 2), 'window': round(window_dur, 4), } def analyze_wav(path: Path, window_samples: int = WINDOW_SAMPLES, threshold: float = LOUD_THRESHOLD, min_gap: float = MIN_GAP_SECONDS) -> dict: try: with wave.open(str(path), 'rb') as wf: channels = wf.getnchannels() sampwidth = wf.getsampwidth() framerate = wf.getframerate() n_frames = wf.getnframes() rms_values = list(_compute_rms_windows_wav( wf, channels, sampwidth, framerate, window_samples)) except Exception as e: return {'error': str(e)} return _package_result(rms_values, framerate, n_frames, window_samples, threshold, min_gap) def analyze_flac(path: Path, window_samples: int = WINDOW_SAMPLES, threshold: float = LOUD_THRESHOLD, min_gap: float = MIN_GAP_SECONDS) -> dict: """Analyse a FLAC file for loudness. Requires numpy and soundfile.""" if not NUMPY_AVAILABLE or not SOUNDFILE_AVAILABLE: return {'error': 'FLAC analysis requires: pip install numpy soundfile'} try: with sf.SoundFile(path) as f: framerate = f.samplerate channels = f.channels n_frames = len(f) rms_values = [] while True: frames = f.read(window_samples, dtype='float32', always_2d=True) if len(frames) == 0: break mono = frames.mean(axis=1) if channels > 1 else frames[:, 0] rms = float(np.sqrt(np.mean(mono.astype(np.float64) ** 2))) rms_values.append(round(rms, 5)) except Exception as e: return {'error': str(e)} return _package_result(rms_values, framerate, n_frames, window_samples, threshold, min_gap) # --------------------------------------------------------------------------- # Analysis cache helpers # --------------------------------------------------------------------------- def _analysis_cache_path(analyses_base: Path, recordings_base: Path, audio_path: Path) -> Path: rel = audio_path.relative_to(recordings_base) return analyses_base / rel.parent / (rel.name + '.analysis.json') def _cached_analysis_params(cache_path: Path): """Read just threshold/min_gap from a cache file without parsing the whole JSON (the embedded result can be hundreds of KB). Relies on the writer in _api_analyze putting these two keys first.""" try: with open(cache_path, 'r', encoding='utf-8') as fh: head = fh.read(256) except OSError: return None m = re.search(r'"threshold":\s*([0-9.eE+-]+),\s*"min_gap":\s*([0-9.eE+-]+)', head) if not m: return None return {'threshold': float(m.group(1)), 'min_gap': float(m.group(2))} def prune_orphan_analyses(analyses_base: Path, recordings_base: Path): if not analyses_base.exists(): return removed = 0 for cache in analyses_base.rglob('*.analysis.json'): rel = cache.relative_to(analyses_base) audio_path = recordings_base / rel.parent / rel.name[:-len('.analysis.json')] if not audio_path.exists(): try: cache.unlink() removed += 1 except Exception: pass if removed: print(f'Pruned {removed} orphaned analysis cache file(s)') # --------------------------------------------------------------------------- # File listing # --------------------------------------------------------------------------- # rel-path -> ((mtime_ns, size), duration); avoids re-opening every audio # header on each /api/files request _DURATION_CACHE: dict = {} _DURATION_CACHE_LOCK = threading.Lock() def _cached_duration(path: Path, rel: str, stat) -> float: sig = (stat.st_mtime_ns, stat.st_size) with _DURATION_CACHE_LOCK: hit = _DURATION_CACHE.get(rel) if hit is not None and hit[0] == sig: return hit[1] duration = _get_audio_duration(path) with _DURATION_CACHE_LOCK: _DURATION_CACHE[rel] = (sig, duration) return duration def list_files(recordings_dir: str): """Return list of audio file metadata dicts, sorted newest first.""" base = Path(recordings_dir) if not base.exists(): return [] # Load active recordings written by isr.py active_files: set = set() status_path = base / 'status.json' if status_path.exists(): try: with open(status_path) as fh: active_files = set(json.load(fh).get('active', [])) except Exception: pass files = [] for path in base.rglob('*'): if path.suffix.lower() not in AUDIO_EXTENSIONS: continue try: stat = path.stat() except OSError: continue # deleted between rglob and stat rel = str(path.relative_to(base)).replace('\\', '/') is_active = rel in active_files # Skip reading partial headers for in-progress files — the WAV nframes # field and FLAC total_samples are both unfinalized while recording, # producing wildly incorrect values (e.g. 53375995583:39:01). duration = None if is_active else _cached_duration(path, rel, stat) files.append({ 'name': rel, 'size': stat.st_size, 'mtime': stat.st_mtime, 'date': datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'), 'duration': duration, 'ext': path.suffix.lower().lstrip('.'), 'recording': is_active, }) files.sort(key=lambda f: f['mtime'], reverse=True) return files # --------------------------------------------------------------------------- # HTTP handler # --------------------------------------------------------------------------- class _Handler(BaseHTTPRequestHandler): # Keep-alive: browsers reuse connections instead of a TCP handshake per # request. Safe because every response sets Content-Length. protocol_version = 'HTTP/1.1' recordings_dir: str = 'recordings' analyses_dir: str = 'recordings/analyses' threshold: float = LOUD_THRESHOLD min_gap: float = MIN_GAP_SECONDS def do_DELETE(self): parsed = urlparse(self.path) p = parsed.path if p.startswith('/api/files/'): self._api_delete(unquote(p[len('/api/files/'):])) else: self._send(404, b'Not found', 'text/plain') def do_GET(self): parsed = urlparse(self.path) qs = parse_qs(parsed.query) p = parsed.path if p == '/': self._html() elif p == '/api/files': self._api_files() elif p == '/api/analyze': self._api_analyze(qs) elif p == '/api/status': self._api_status() elif p == '/api/storage': self._api_storage() elif p == '/api/config': self._api_config() elif p == '/api/cut': self._api_cut(qs) elif p.startswith('/download/'): self._download(unquote(p[len('/download/'):])) elif p.startswith('/stream/'): self._stream(unquote(p[len('/stream/'):])) else: self._send(404, b'Not found', 'text/plain') def _html(self): self._send(200, _HTML.encode('utf-8'), 'text/html; charset=utf-8') def _api_files(self): files = list_files(self.recordings_dir) recordings_base = Path(self.recordings_dir).resolve() analyses_base = Path(self.analyses_dir).resolve() for f in files: if f.get('ext') in ('wav', 'flac') and not f.get('recording'): cache_path = _analysis_cache_path( analyses_base, recordings_base, recordings_base / f['name']) f['cached_analysis'] = _cached_analysis_params(cache_path) else: f['cached_analysis'] = None self._send(200, json.dumps(files).encode('utf-8'), 'application/json') def _api_analyze(self, qs): filename = qs.get('file', [None])[0] if not filename: self._json_err(400, 'missing file parameter') return path = self._safe_path(filename) if path is None: return try: threshold = float(qs.get('threshold', [self.threshold])[0]) threshold = max(0.0, min(1.0, threshold)) except (ValueError, TypeError): threshold = self.threshold try: min_gap = float(qs.get('min_gap', [self.min_gap])[0]) min_gap = max(0.0, min(300.0, min_gap)) except (ValueError, TypeError): min_gap = self.min_gap if self._is_active(filename): self._json_err(409, 'File is currently being recorded — analysis unavailable until recording stops') return recordings_base = Path(self.recordings_dir).resolve() analyses_base = Path(self.analyses_dir).resolve() cache_path = _analysis_cache_path(analyses_base, recordings_base, path) try: cached = json.loads(cache_path.read_text('utf-8')) if cached.get('threshold') == threshold and cached.get('min_gap') == min_gap: payload = dict(cached['result']) payload.pop('rms', None) # caches written before the full-RMS field was dropped payload['cached'] = True self._send(200, json.dumps(payload).encode('utf-8'), 'application/json') return except Exception: pass ext = path.suffix.lower() if ext == '.wav': result = analyze_wav(path, threshold=threshold, min_gap=min_gap) elif ext == '.flac': if not (NUMPY_AVAILABLE and SOUNDFILE_AVAILABLE): self._json_err(400, 'FLAC analysis requires: pip install numpy soundfile') return result = analyze_flac(path, threshold=threshold, min_gap=min_gap) else: self._json_err(400, f'Loudness analysis is not available for {ext} files') return try: cache_path.parent.mkdir(parents=True, exist_ok=True) tmp = cache_path.with_suffix('.tmp') tmp.write_text(json.dumps({'threshold': threshold, 'min_gap': min_gap, 'result': result}), 'utf-8') os.replace(tmp, cache_path) except Exception as e: print(f'Warning: could not write analysis cache {cache_path}: {e}', flush=True) self._send(200, json.dumps(result).encode('utf-8'), 'application/json') def _api_status(self): status_path = Path(self.recordings_dir) / 'status.json' if status_path.exists(): try: self._send(200, status_path.read_bytes(), 'application/json') return except Exception: pass self._send(200, b'{"active":[]}', 'application/json') def _download(self, filename: str): path = self._safe_path(filename) if path is None: return size = path.stat().st_size self.send_response(200) self.send_header('Content-Type', 'application/octet-stream') self.send_header('Content-Disposition', f'attachment; filename="{path.name}"') self.send_header('Content-Length', str(size)) self.end_headers() with open(path, 'rb') as fh: self._copy_to_response(fh, size) def _stream(self, filename: str): """Serve audio for inline playback with HTTP Range support. In-progress recordings are served with Cache-Control: no-store (the content is still growing) and, for WAV/FLAC, with a header patched to the duration recorded so far so the browser can show it and seek. """ path = self._safe_path(filename) if path is None: return content_type = MIME_TYPES.get(path.suffix.lower(), 'application/octet-stream') size = path.stat().st_size is_active = self._is_active(filename) prefix = b'' if is_active: ext = path.suffix.lower() if ext == '.wav': prefix = _live_wav_header(path, size) or b'' elif ext == '.flac': prefix = _live_flac_header(path, size) or b'' range_header = self.headers.get('Range', '') m = re.match(r'bytes=(\d+)-(\d*)', range_header) if range_header else None if m: start = int(m.group(1)) end = int(m.group(2)) if m.group(2) else size - 1 end = min(end, size - 1) if start > end or start >= size: self._send(416, b'Range Not Satisfiable', 'text/plain') return length = end - start + 1 self.send_response(206) self.send_header('Content-Type', content_type) self.send_header('Content-Range', f'bytes {start}-{end}/{size}') self.send_header('Content-Length', str(length)) self.send_header('Accept-Ranges', 'bytes') if is_active: self.send_header('Cache-Control', 'no-store') self.end_headers() with open(path, 'rb') as fh: sent = 0 if start < len(prefix): head = prefix[start:start + length] self.wfile.write(head) sent = len(head) if sent < length: fh.seek(start + sent) self._copy_to_response(fh, length - sent) else: self.send_response(200) self.send_header('Content-Type', content_type) self.send_header('Content-Length', str(size)) self.send_header('Accept-Ranges', 'bytes') if is_active: self.send_header('Cache-Control', 'no-store') self.end_headers() with open(path, 'rb') as fh: if prefix: self.wfile.write(prefix) fh.seek(len(prefix)) self._copy_to_response(fh, size - len(prefix)) else: # Bound the copy: the file may grow while we serve it, and # writing more than Content-Length desyncs keep-alive. self._copy_to_response(fh, size) def _api_storage(self): # 'used' is computed client-side from the file list; walking the whole # tree again here doubled the I/O of every page load. base = Path(self.recordings_dir) try: du = shutil.disk_usage(str(base) if base.exists() else '.') disk_free, disk_total = du.free, du.total except Exception: disk_free = disk_total = None data = json.dumps({'disk_free': disk_free, 'disk_total': disk_total}) self._send(200, data.encode(), 'application/json') def _api_config(self): data = json.dumps({'threshold': self.threshold, 'min_gap': self.min_gap}) self._send(200, data.encode(), 'application/json') def _api_delete(self, filename: str): if self._is_active(filename): self._json_err(409, 'Cannot delete a file that is currently being recorded') return path = self._safe_path(filename) if path is None: return try: path.unlink() except Exception as e: self._json_err(500, f'Failed to delete: {e}') return try: _analysis_cache_path( Path(self.analyses_dir).resolve(), Path(self.recordings_dir).resolve(), path, ).unlink() except Exception: pass self._send(200, json.dumps({'deleted': filename}).encode(), 'application/json') def _api_cut(self, qs): filename = qs.get('file', [None])[0] start_s = qs.get('start', [None])[0] end_s = qs.get('end', [None])[0] if not filename or start_s is None or end_s is None: self._json_err(400, 'missing file, start, or end parameter') return try: start = float(start_s) end = float(end_s) except (ValueError, TypeError): self._json_err(400, 'start and end must be numbers (seconds)') return if start < 0 or end <= start: self._json_err(400, 'start must be ≥ 0 and end must be > start') return path = self._safe_path(filename) if path is None: return if self._is_active(filename): self._json_err(409, 'Cannot cut a file that is currently being recorded') return if not shutil.which('ffmpeg'): self._json_err(500, 'ffmpeg is not available on this server') return ext = path.suffix.lower() out_name = f'{path.stem}_cut_{int(start)}s-{int(end)}s{ext}' # For lossless formats, re-encode (not copy) so the container header # is rewritten with the correct duration/size. For lossy formats, # copy is fine — the audio stops at the right frame regardless. _lossless = {'.wav': ['-c:a', 'pcm_s16le'], '.flac': ['-c:a', 'flac']} codec_args = _lossless.get(ext, ['-c', 'copy']) fd, tmp_path = tempfile.mkstemp(suffix=ext) os.close(fd) try: cmd = ['ffmpeg', '-y', '-i', str(path), '-ss', str(start), '-to', str(end), '-vn'] + codec_args + [tmp_path] result = subprocess.run(cmd, capture_output=True, timeout=120) if result.returncode != 0: err = result.stderr.decode('utf-8', errors='replace')[-400:] self._json_err(500, f'ffmpeg error: {err}') return tmp_size = os.path.getsize(tmp_path) content_type = MIME_TYPES.get(ext, 'application/octet-stream') self.send_response(200) self.send_header('Content-Type', content_type) self.send_header('Content-Disposition', f'attachment; filename="{out_name}"') self.send_header('Content-Length', str(tmp_size)) self.end_headers() with open(tmp_path, 'rb') as fh: self._copy_to_response(fh, tmp_size) except subprocess.TimeoutExpired: self._json_err(504, 'ffmpeg timed out — file may be too large') finally: try: os.unlink(tmp_path) except Exception: pass def _is_active(self, filename: str) -> bool: """True if isr.py reports this file as currently being recorded.""" try: with open(Path(self.recordings_dir) / 'status.json') as fh: return filename in json.load(fh).get('active', []) except Exception: return False def _copy_to_response(self, fh, length=None): """Stream an open binary file to the client in 64 KB chunks.""" remaining = length while remaining is None or remaining > 0: chunk = fh.read(65536 if remaining is None else min(65536, remaining)) if not chunk: break self.wfile.write(chunk) if remaining is not None: remaining -= len(chunk) # Sent fewer bytes than Content-Length promised (file truncated while # serving): the keep-alive connection is desynced, force it closed. if remaining is not None and remaining > 0: self.close_connection = True def _safe_path(self, filename: str): base = Path(self.recordings_dir).resolve() try: path = (base / filename).resolve() path.relative_to(base) except Exception: self._send(403, b'Forbidden', 'text/plain') return None if not path.is_file(): self._send(404, b'Not found', 'text/plain') return None return path def _send(self, code: int, body: bytes, content_type: str): self.send_response(code) self.send_header('Content-Type', content_type) self.send_header('Content-Length', str(len(body))) self.end_headers() self.wfile.write(body) def _json_err(self, code: int, msg: str): self._send(code, json.dumps({'error': msg}).encode('utf-8'), 'application/json') def log_message(self, fmt, *args): pass class _Server(ThreadingHTTPServer): """ThreadingHTTPServer that stays quiet when clients disconnect mid-stream (browsers abort audio range requests constantly while seeking).""" def handle_error(self, request, client_address): import sys exc = sys.exc_info()[1] if isinstance(exc, (ConnectionError, TimeoutError)): return super().handle_error(request, client_address) # --------------------------------------------------------------------------- # UI page — single-page HTML/CSS/JS, loaded once at startup # --------------------------------------------------------------------------- _HTML = (Path(__file__).resolve().parent / 'webui.html').read_text(encoding='utf-8') # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- def main(): parser = argparse.ArgumentParser(description='ISR Web — audio archive browser') parser.add_argument('--dir', default='recordings', help='Recordings directory (default: recordings)') parser.add_argument('--port', type=int, default=8080, help='HTTP port (default: 8080)') parser.add_argument('--host', default='0.0.0.0', help='Bind address (default: 0.0.0.0)') parser.add_argument('--threshold', type=float, default=LOUD_THRESHOLD, help=f'RMS loudness threshold 0–1 (default: {LOUD_THRESHOLD})') parser.add_argument('--min-gap', type=float, default=MIN_GAP_SECONDS, help=f'Seconds gap for merging loud sections (default: {MIN_GAP_SECONDS})') parser.add_argument('--analyses-dir', default=None, help='Directory for analysis cache files (default: /analyses)') args = parser.parse_args() rec_dir = Path(args.dir).resolve() analyses_dir = Path(args.analyses_dir).resolve() if args.analyses_dir else rec_dir / 'analyses' if not rec_dir.exists(): print(f"Warning: recordings directory '{rec_dir}' does not exist yet.") prune_orphan_analyses(analyses_dir, rec_dir) _analyses_dir = analyses_dir # class body can't close over a name it also assigns class Handler(_Handler): recordings_dir = str(rec_dir) analyses_dir = str(_analyses_dir) threshold = args.threshold min_gap = args.min_gap server = _Server((args.host, args.port), Handler) print(f"ISR Web running on http://{args.host}:{args.port}/") print(f"Recordings dir: {rec_dir}") print(f"Analyses dir: {analyses_dir}") print(f"Loud threshold: {args.threshold}") if not NUMPY_AVAILABLE: print("Note: numpy not installed — WAV RMS uses pure Python (slower); FLAC analysis unavailable") elif not SOUNDFILE_AVAILABLE: print("Note: soundfile not installed — FLAC loudness analysis unavailable") print("Stop with Ctrl+C\n") try: server.serve_forever() except KeyboardInterrupt: print("Stopped.") if __name__ == '__main__': main()