Files
ISR/web.py
T
admin f6031cfa16 feat: onset-aware section scoring so slow swells rank at the bottom
A section's score is now its peak dB above the noise floor capped by
the sharpest rise within ONSET_SECONDS (0.5 s). Real events (voices,
impacts, barks) rise fast and keep their full prominence; a gradual
swell that outruns the 30 s floor blocks (gusts, distant approaching
cars) still flags but scores near zero, so score-ranked review (chips,
U/I highlights, "Highlights only" mode) surfaces events first. A
section starting in a file's first 0.5 s is scored against the floor
instead, so events cut off by a file split are not punished as swells.

Old cached analyses carry now-wrong scores, so the cache gains a
leading "detector" version key (DETECTOR_VERSION = 2) checked by both
_cached_analysis_params() and the /api/analyze cache hit path; v1
caches never match and are recomputed on the next analyse.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 14:57:19 +02:00

1176 lines
45 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
ISR Web — Browse and download recorded audio files.
Shows a chronological table of all recordings, allows inline playback,
download, and analyses WAV/FLAC files for sections that stand out above the
background noise.
Usage:
python web.py # serves recordings/ on port 8080
python web.py --dir /path/to/audio # custom recordings directory
python web.py --port 8888 # custom port
python web.py --margin 15 # dB above noise floor (default 12)
"""
import argparse
import json
import math
import os
import re
import shutil
import struct
import subprocess
import tempfile
import threading
import wave
from datetime import datetime, timedelta
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from urllib.parse import parse_qs, unquote, urlparse
try:
import numpy as np
NUMPY_AVAILABLE = True
except ImportError:
NUMPY_AVAILABLE = False
try:
import soundfile as sf
SOUNDFILE_AVAILABLE = True
except ImportError:
SOUNDFILE_AVAILABLE = False
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
AUDIO_EXTENSIONS = {'.wav', '.mp3', '.ogg', '.flac', '.aac', '.opus'}
WINDOW_SAMPLES = 4800 # 100 ms at 48 kHz
MARGIN_DB = 12.0 # sections must rise this many dB above the noise floor
MIN_GAP_SECONDS = 2.0 # merge loud sections separated by less than this
MIN_DURATION_SECONDS = 0.5 # discard loud sections shorter than this
NOISE_BLOCK_SECONDS = 30.0 # noise floor is estimated per block of this length
NOISE_PERCENTILE = 20 # percentile of windowed dB levels taken as the floor
MIN_RMS = 0.002 # ≈ 54 dBFS; the floor never drops below this, so
# digital silence does not make every tiny sound loud
ONSET_SECONDS = 0.5 # a section's score is capped by its sharpest dB rise
# within this span, so slow swells rank low
# Bumped whenever section detection/scoring changes in a way that makes old
# cached results wrong (not just differently parameterised). Caches written
# with another version never match and are recomputed on the next analyse.
DETECTOR_VERSION = 2
CLIP_MAX_SECONDS = 600 # upper bound on /api/clip length
# Recording filenames encode the start time as this strftime format (kept in
# sync with isr.FILENAME_FORMAT). It is the authoritative recording start and
# the only reliable clock anchor — mtime drifts to the last write, so cut clip
# names and the displayed date are both derived from this.
FILENAME_FORMAT = '%Y%m%d_%H%M%S'
MIME_TYPES = {
'.wav': 'audio/wav',
'.mp3': 'audio/mpeg',
'.ogg': 'audio/ogg',
'.flac': 'audio/flac',
'.aac': 'audio/aac',
'.opus': 'audio/ogg',
}
# ---------------------------------------------------------------------------
# Audio analysis helpers
# ---------------------------------------------------------------------------
def _recording_start(stem: str):
"""Parse the recording start time encoded in a filename stem.
Returns a datetime, or None if the stem is not in FILENAME_FORMAT
(e.g. a manually renamed file). strptime ignores any extension because
callers pass Path.stem.
"""
try:
return datetime.strptime(stem, FILENAME_FORMAT)
except ValueError:
return None
def _cut_filename(stem: str, ext: str, start: float, end: float) -> str:
"""Name a cut by the real wall-clock span it covers.
For a recording that started at 22:00:00, a 22:31:30→22:32:30 slice
(start=1890, end=1950) becomes ``20260523_22-31-30_22-32-30.flac``.
Falls back to the source stem plus second offsets when the filename is
not in FILENAME_FORMAT (e.g. a manually renamed recording).
"""
started = _recording_start(stem)
if started is None:
return f'{stem}_cut_{int(start)}s-{int(end)}s{ext}'
cut_start = started + timedelta(seconds=start)
cut_end = started + timedelta(seconds=end)
return f'{cut_start:%Y%m%d}_{cut_start:%H-%M-%S}_{cut_end:%H-%M-%S}{ext}'
def _live_wav_header(path: Path, size: int):
"""Return the WAV header (through the 'data' chunk header) with RIFF and
data sizes rewritten to match the current file size, or None.
While a WAV file is still being recorded its header claims ~0 frames, so
browsers show no duration and refuse to seek. Serving a header patched to
the bytes recorded so far fixes both; the patch is the same length as the
original header, so all byte offsets and Range math stay valid.
"""
try:
with open(path, 'rb') as fh:
hdr = fh.read(512)
if len(hdr) < 44 or hdr[:4] != b'RIFF' or hdr[8:12] != b'WAVE':
return None
pos = 12
while pos + 8 <= len(hdr):
chunk_id = hdr[pos:pos + 4]
chunk_size = int.from_bytes(hdr[pos + 4:pos + 8], 'little')
if chunk_id == b'data':
data_off = pos + 8
patched = bytearray(hdr[:data_off])
patched[4:8] = (size - 8).to_bytes(4, 'little')
patched[pos + 4:pos + 8] = (size - data_off).to_bytes(4, 'little')
return bytes(patched)
pos += 8 + chunk_size + (chunk_size & 1)
return None
except Exception:
return None
# CRC-8 (poly 0x07) used by FLAC frame headers
_CRC8_TABLE = []
for _i in range(256):
_c = _i
for _ in range(8):
_c = ((_c << 1) ^ 0x07) & 0xFF if _c & 0x80 else (_c << 1) & 0xFF
_CRC8_TABLE.append(_c)
_FLAC_BLOCKSIZES = {1: 192, 2: 576, 3: 1152, 4: 2304, 5: 4608, 8: 256, 9: 512,
10: 1024, 11: 2048, 12: 4096, 13: 8192, 14: 16384, 15: 32768}
def _crc8(data: bytes) -> int:
crc = 0
for b in data:
crc = _CRC8_TABLE[crc ^ b]
return crc
def _flac_coded_number(buf: bytes, pos: int):
"""Decode the UTF-8-style frame/sample number; returns (value, next_pos)."""
b0 = buf[pos]
if b0 < 0x80:
return b0, pos + 1
n, mask = 0, 0x40
while b0 & mask:
n += 1
mask >>= 1
if n < 1 or n > 6: # 10xxxxxx is not a valid leading byte
return None
val = b0 & (mask - 1)
for i in range(1, n + 1):
c = buf[pos + i]
if c & 0xC0 != 0x80:
return None
val = (val << 6) | (c & 0x3F)
return val, pos + 1 + n
def _flac_frame_samples(buf: bytes, pos: int, fixed_bs: int):
"""If a valid FLAC frame header starts at pos, return the stream sample
count through the end of that frame, else None. Validity is confirmed by
the header's CRC-8, so false sync matches in compressed data are rejected."""
try:
variable = buf[pos + 1] & 0x01
bs_code = buf[pos + 2] >> 4
sr_code = buf[pos + 2] & 0x0F
if bs_code == 0 or sr_code == 15 or buf[pos + 3] & 0x01:
return None
if (buf[pos + 3] >> 4) > 10: # reserved channel assignment
return None
coded = _flac_coded_number(buf, pos + 4)
if coded is None:
return None
val, p = coded
bs = _FLAC_BLOCKSIZES.get(bs_code)
if bs_code == 6:
bs = buf[p] + 1
p += 1
elif bs_code == 7:
bs = int.from_bytes(buf[p:p + 2], 'big') + 1
p += 2
if sr_code == 12:
p += 1
elif sr_code in (13, 14):
p += 2
if _crc8(buf[pos:p]) != buf[p]:
return None
if variable: # val is the frame's starting sample number
return val + (bs or 0)
return val * (fixed_bs or bs or 4096) + (bs or 0)
except IndexError:
return None
def _live_flac_header(path: Path, size: int):
"""Return the first 26 bytes of a FLAC file with STREAMINFO total_samples
patched to the samples recorded so far, or None.
Like _live_wav_header, but FLAC duration cannot be derived from the byte
count (variable compression). Instead the sample count is parsed out of
the last frame header in the file tail — each FLAC frame carries its own
frame/sample number.
"""
try:
with open(path, 'rb') as fh:
head = fh.read(42)
if len(head) < 42 or head[:4] != b'fLaC':
return None
# STREAMINFO must be the first metadata block
if head[4] & 0x7F != 0 or int.from_bytes(head[5:8], 'big') != 34:
return None
fixed_bs = int.from_bytes(head[8:10], 'big')
tail_len = min(size, 65536)
fh.seek(size - tail_len)
buf = fh.read(tail_len)
samples = None
for i in range(len(buf) - 20, -1, -1):
if buf[i] == 0xFF and (buf[i + 1] & 0xFC) == 0xF8:
samples = _flac_frame_samples(buf, i, fixed_bs)
if samples:
break
if not samples:
return None
# Bytes 18-25 hold: sample rate (20 bits) | channels-1 (3) |
# bps-1 (5) | total_samples (36). Replace only the low 36 bits.
field = int.from_bytes(head[18:26], 'big')
field = (field & ~((1 << 36) - 1)) | min(samples, (1 << 36) - 1)
patched = bytearray(head[:26])
patched[18:26] = field.to_bytes(8, 'big')
return bytes(patched)
except Exception:
return None
def _wav_header(n_frames: int, channels: int, framerate: int, sampwidth: int) -> bytes:
"""Standard 44-byte PCM WAV header for a clip of known length."""
data_len = n_frames * channels * sampwidth
byte_rate = framerate * channels * sampwidth
return (b'RIFF' + (36 + data_len).to_bytes(4, 'little') + b'WAVE'
+ b'fmt ' + (16).to_bytes(4, 'little')
+ struct.pack('<HHIIHH', 1, channels, framerate, byte_rate,
channels * sampwidth, sampwidth * 8)
+ b'data' + data_len.to_bytes(4, 'little'))
def _get_audio_duration(path: Path):
"""Return duration in seconds for any supported audio file, or None."""
ext = path.suffix.lower()
if ext == '.wav':
try:
with wave.open(str(path), 'rb') as wf:
return wf.getnframes() / wf.getframerate()
except Exception:
return None
if SOUNDFILE_AVAILABLE and ext in ('.flac', '.ogg', '.opus'):
try:
with sf.SoundFile(path) as f:
return round(len(f) / f.samplerate, 2)
except Exception:
return None
return None
def _compute_rms_windows_wav(wf, channels: int, sampwidth: int, framerate: int,
window_samples: int):
"""Yield rms_0_to_1 for every window in the open wave file."""
while True:
raw = wf.readframes(window_samples)
if not raw:
break
n_samp = len(raw) // (sampwidth * channels)
if n_samp == 0:
break
if NUMPY_AVAILABLE:
arr = np.frombuffer(raw[:n_samp * sampwidth * channels], dtype='<i2')
if channels > 1:
arr = arr.reshape(-1, channels).mean(axis=1)
rms = float(np.sqrt(np.mean(arr.astype(np.float64) ** 2))) / 32768.0
else:
fmt = f'<{n_samp * channels}h'
samples = struct.unpack(fmt, raw[:n_samp * sampwidth * channels])
mono = samples[::channels] if channels > 1 else samples
rms = math.sqrt(sum(s * s for s in mono) / len(mono)) / 32768.0
yield round(rms, 5)
def _noise_floor_db(db_values: list, window_dur: float) -> list:
"""Per-window background noise floor in dBFS.
The floor is the NOISE_PERCENTILE-th percentile of the windowed dB levels
in each NOISE_BLOCK_SECONDS block, then min-smoothed over ±2 neighbouring
blocks so an event spanning a whole block cannot raise its own floor.
Tracks slow ambience changes (day/night, rain, traffic hum) so detection
is relative to "how loud it normally is right now"."""
n = len(db_values)
block = max(1, int(round(NOISE_BLOCK_SECONDS / window_dur)))
floors = []
for i in range(0, n, block):
chunk = sorted(db_values[i:i + block])
floors.append(chunk[int(len(chunk) * NOISE_PERCENTILE / 100)])
smoothed = [min(floors[max(0, b - 2):b + 3]) for b in range(len(floors))]
return [smoothed[min(i // block, len(smoothed) - 1)] for i in range(n)]
def _loud_sections(rms_values: list, window_dur: float, duration: float,
margin_db: float, min_gap: float = MIN_GAP_SECONDS,
min_duration: float = MIN_DURATION_SECONDS) -> list:
"""Sections whose level rises at least margin_db above the local noise
floor. Each section carries a 'score': its peak dB above the floor,
capped by the sharpest rise observed within ONSET_SECONDS. Real events
(voices, impacts, barks) have steep onsets, so their cap equals their
peak; a swell that drifts up slower than the noise-floor blocks can track
(wind, a distant approaching car) still flags but scores near zero, so
score-ranked review (chips, U/I highlights) surfaces events first.
Sections shorter than min_duration (after min_gap merging) are discarded:
without this, every isolated 100 ms window that pops above the floor — a
click, a single raindrop — becomes its own section and a day can drown in
thousands of sub-second clips."""
db = [20 * math.log10(max(r, 1e-6)) for r in rms_values]
floor = _noise_floor_db(db, window_dur)
min_db = 20 * math.log10(MIN_RMS)
onset_win = max(1, int(round(ONSET_SECONDS / window_dur)))
sections = []
start_t = None
last_loud_t = None
peak = 0.0
onset = 0.0
def _score():
return round(max(0.0, min(peak, onset)), 1)
for i, d in enumerate(db):
t = i * window_dur
floor_eff = max(floor[i], min_db)
if d >= floor_eff + margin_db:
if start_t is None:
start_t = t
peak = 0.0
onset = 0.0
last_loud_t = t
peak = max(peak, d - floor_eff)
# Rise within the onset span; a section starting before the file
# has history is measured against the floor instead (an event cut
# off by a file split must not be punished as a swell).
rise = d - db[i - onset_win] if i >= onset_win else d - floor_eff
onset = max(onset, rise)
else:
if start_t is not None and (t - last_loud_t) > min_gap:
end_t = last_loud_t + window_dur
if end_t - start_t >= min_duration - 1e-9:
sections.append({'start': round(start_t, 1),
'end': round(end_t, 1),
'score': _score()})
start_t = None
last_loud_t = None
if start_t is not None and (last_loud_t + window_dur - start_t) >= min_duration - 1e-9:
sections.append({'start': round(start_t, 1), 'end': round(duration, 1),
'score': _score()})
return sections
def _package_result(rms_values: list, framerate: int, n_frames: int,
window_samples: int, margin_db: float,
min_gap: float = MIN_GAP_SECONDS,
min_duration: float = MIN_DURATION_SECONDS) -> dict:
window_dur = window_samples / framerate
duration = n_frames / framerate
if len(rms_values) > 800:
step = len(rms_values) / 800
rms_display = [rms_values[int(i * step)] for i in range(800)]
else:
rms_display = rms_values
# Note: the full per-window RMS list is deliberately NOT returned — the UI
# only renders rms_display (~800 points), and the full list is ~45x larger.
return {
'rms_display': rms_display,
'sections': _loud_sections(rms_values, window_dur, duration, margin_db, min_gap, min_duration),
'duration': round(duration, 2),
'window': round(window_dur, 4),
}
def analyze_wav(path: Path, window_samples: int = WINDOW_SAMPLES,
margin_db: float = MARGIN_DB,
min_gap: float = MIN_GAP_SECONDS,
min_duration: float = MIN_DURATION_SECONDS) -> dict:
try:
with wave.open(str(path), 'rb') as wf:
channels = wf.getnchannels()
sampwidth = wf.getsampwidth()
framerate = wf.getframerate()
n_frames = wf.getnframes()
rms_values = list(_compute_rms_windows_wav(
wf, channels, sampwidth, framerate, window_samples))
except Exception as e:
return {'error': str(e)}
return _package_result(rms_values, framerate, n_frames, window_samples, margin_db, min_gap, min_duration)
def analyze_flac(path: Path, window_samples: int = WINDOW_SAMPLES,
margin_db: float = MARGIN_DB,
min_gap: float = MIN_GAP_SECONDS,
min_duration: float = MIN_DURATION_SECONDS) -> dict:
"""Analyse a FLAC file for loudness. Requires numpy and soundfile."""
if not NUMPY_AVAILABLE or not SOUNDFILE_AVAILABLE:
return {'error': 'FLAC analysis requires: pip install numpy soundfile'}
try:
with sf.SoundFile(path) as f:
framerate = f.samplerate
channels = f.channels
n_frames = len(f)
rms_values = []
while True:
frames = f.read(window_samples, dtype='float32', always_2d=True)
if len(frames) == 0:
break
mono = frames.mean(axis=1) if channels > 1 else frames[:, 0]
rms = float(np.sqrt(np.mean(mono.astype(np.float64) ** 2)))
rms_values.append(round(rms, 5))
except Exception as e:
return {'error': str(e)}
return _package_result(rms_values, framerate, n_frames, window_samples, margin_db, min_gap, min_duration)
# ---------------------------------------------------------------------------
# Analysis cache helpers
# ---------------------------------------------------------------------------
def _analysis_cache_path(analyses_base: Path, recordings_base: Path, audio_path: Path) -> Path:
rel = audio_path.relative_to(recordings_base)
return analyses_base / rel.parent / (rel.name + '.analysis.json')
def _cached_analysis_params(cache_path: Path):
"""Read just detector/margin/min_gap/min_duration from a cache file
without parsing the whole JSON (the embedded result can be hundreds of
KB). Relies on the writer in _api_analyze putting these keys first.
Caches written by other detector versions (or so old they lack a key)
simply never match and get recomputed on the next analyse."""
try:
with open(cache_path, 'r', encoding='utf-8') as fh:
head = fh.read(256)
except OSError:
return None
m = re.search(r'"detector":\s*(\d+),\s*"margin":\s*([0-9.eE+-]+),'
r'\s*"min_gap":\s*([0-9.eE+-]+),\s*"min_duration":\s*([0-9.eE+-]+)', head)
if not m or int(m.group(1)) != DETECTOR_VERSION:
return None
return {'margin': float(m.group(2)), 'min_gap': float(m.group(3)),
'min_duration': float(m.group(4))}
def prune_orphan_analyses(analyses_base: Path, recordings_base: Path):
if not analyses_base.exists():
return
removed = 0
for cache in analyses_base.rglob('*.analysis.json'):
rel = cache.relative_to(analyses_base)
audio_path = recordings_base / rel.parent / rel.name[:-len('.analysis.json')]
if not audio_path.exists():
try:
cache.unlink()
removed += 1
except Exception:
pass
if removed:
print(f'Pruned {removed} orphaned analysis cache file(s)')
# ---------------------------------------------------------------------------
# File listing
# ---------------------------------------------------------------------------
# rel-path -> ((mtime_ns, size), duration); avoids re-opening every audio
# header on each /api/files request
_DURATION_CACHE: dict = {}
_DURATION_CACHE_LOCK = threading.Lock()
def _cached_duration(path: Path, rel: str, stat) -> float:
sig = (stat.st_mtime_ns, stat.st_size)
with _DURATION_CACHE_LOCK:
hit = _DURATION_CACHE.get(rel)
if hit is not None and hit[0] == sig:
return hit[1]
duration = _get_audio_duration(path)
with _DURATION_CACHE_LOCK:
_DURATION_CACHE[rel] = (sig, duration)
return duration
def list_files(recordings_dir: str):
"""Return list of audio file metadata dicts, sorted newest first."""
base = Path(recordings_dir)
if not base.exists():
return []
# Load active recordings written by isr.py
active_files: set = set()
status_path = base / 'status.json'
if status_path.exists():
try:
with open(status_path) as fh:
active_files = set(json.load(fh).get('active', []))
except Exception:
pass
files = []
for path in base.rglob('*'):
if path.suffix.lower() not in AUDIO_EXTENSIONS:
continue
try:
stat = path.stat()
except OSError:
continue # deleted between rglob and stat
rel = str(path.relative_to(base)).replace('\\', '/')
is_active = rel in active_files
# The recording start is encoded in the filename and is the true clock
# anchor; mtime is only a fallback for files not in FILENAME_FORMAT.
started = _recording_start(path.stem)
date = (started or datetime.fromtimestamp(stat.st_mtime)).strftime('%Y-%m-%d %H:%M:%S')
# Skip reading partial headers for in-progress files — the WAV nframes
# field and FLAC total_samples are both unfinalized while recording,
# producing wildly incorrect values (e.g. 53375995583:39:01).
duration = None if is_active else _cached_duration(path, rel, stat)
files.append({
'name': rel,
'size': stat.st_size,
'mtime': stat.st_mtime,
'date': date,
'duration': duration,
'ext': path.suffix.lower().lstrip('.'),
'recording': is_active,
})
files.sort(key=lambda f: f['mtime'], reverse=True)
return files
# ---------------------------------------------------------------------------
# HTTP handler
# ---------------------------------------------------------------------------
class _Handler(BaseHTTPRequestHandler):
# Keep-alive: browsers reuse connections instead of a TCP handshake per
# request. Safe because every response sets Content-Length.
protocol_version = 'HTTP/1.1'
recordings_dir: str = 'recordings'
analyses_dir: str = 'recordings/analyses'
margin_db: float = MARGIN_DB
min_gap: float = MIN_GAP_SECONDS
min_duration: float = MIN_DURATION_SECONDS
def do_DELETE(self):
parsed = urlparse(self.path)
p = parsed.path
if p.startswith('/api/files/'):
self._api_delete(unquote(p[len('/api/files/'):]))
else:
self._send(404, b'Not found', 'text/plain')
def do_GET(self):
parsed = urlparse(self.path)
qs = parse_qs(parsed.query)
p = parsed.path
if p == '/':
self._html()
elif p == '/api/files':
self._api_files()
elif p == '/api/analyze':
self._api_analyze(qs)
elif p == '/api/status':
self._api_status()
elif p == '/api/storage':
self._api_storage()
elif p == '/api/config':
self._api_config()
elif p == '/api/cut':
self._api_cut(qs)
elif p == '/api/clip':
self._api_clip(qs)
elif p.startswith('/download/'):
self._download(unquote(p[len('/download/'):]))
elif p.startswith('/stream/'):
self._stream(unquote(p[len('/stream/'):]))
else:
self._send(404, b'Not found', 'text/plain')
def _html(self):
self._send(200, _HTML.encode('utf-8'), 'text/html; charset=utf-8')
def _api_files(self):
files = list_files(self.recordings_dir)
recordings_base = Path(self.recordings_dir).resolve()
analyses_base = Path(self.analyses_dir).resolve()
for f in files:
if f.get('ext') in ('wav', 'flac') and not f.get('recording'):
cache_path = _analysis_cache_path(
analyses_base, recordings_base, recordings_base / f['name'])
f['cached_analysis'] = _cached_analysis_params(cache_path)
else:
f['cached_analysis'] = None
self._send(200, json.dumps(files).encode('utf-8'), 'application/json')
def _api_analyze(self, qs):
filename = qs.get('file', [None])[0]
if not filename:
self._json_err(400, 'missing file parameter')
return
path = self._safe_path(filename)
if path is None:
return
try:
margin = float(qs.get('margin', [self.margin_db])[0])
margin = max(1.0, min(60.0, margin))
except (ValueError, TypeError):
margin = self.margin_db
try:
min_gap = float(qs.get('min_gap', [self.min_gap])[0])
min_gap = max(0.0, min(300.0, min_gap))
except (ValueError, TypeError):
min_gap = self.min_gap
try:
min_duration = float(qs.get('min_duration', [self.min_duration])[0])
min_duration = max(0.0, min(60.0, min_duration))
except (ValueError, TypeError):
min_duration = self.min_duration
if self._is_active(filename):
self._json_err(409, 'File is currently being recorded — analysis unavailable until recording stops')
return
recordings_base = Path(self.recordings_dir).resolve()
analyses_base = Path(self.analyses_dir).resolve()
cache_path = _analysis_cache_path(analyses_base, recordings_base, path)
try:
cached = json.loads(cache_path.read_text('utf-8'))
if (cached.get('detector') == DETECTOR_VERSION
and cached.get('margin') == margin and cached.get('min_gap') == min_gap
and cached.get('min_duration') == min_duration):
payload = dict(cached['result'])
payload.pop('rms', None) # caches written before the full-RMS field was dropped
payload['cached'] = True
self._send(200, json.dumps(payload).encode('utf-8'), 'application/json')
return
except Exception:
pass
ext = path.suffix.lower()
if ext == '.wav':
result = analyze_wav(path, margin_db=margin, min_gap=min_gap, min_duration=min_duration)
elif ext == '.flac':
if not (NUMPY_AVAILABLE and SOUNDFILE_AVAILABLE):
self._json_err(400, 'FLAC analysis requires: pip install numpy soundfile')
return
result = analyze_flac(path, margin_db=margin, min_gap=min_gap, min_duration=min_duration)
else:
self._json_err(400, f'Loudness analysis is not available for {ext} files')
return
try:
cache_path.parent.mkdir(parents=True, exist_ok=True)
tmp = cache_path.with_suffix('.tmp')
# detector, margin, min_gap and min_duration MUST stay first:
# _cached_analysis_params reads only the first 256 bytes of this file
tmp.write_text(json.dumps({'detector': DETECTOR_VERSION,
'margin': margin, 'min_gap': min_gap,
'min_duration': min_duration, 'result': result}), 'utf-8')
os.replace(tmp, cache_path)
except Exception as e:
print(f'Warning: could not write analysis cache {cache_path}: {e}', flush=True)
self._send(200, json.dumps(result).encode('utf-8'), 'application/json')
def _api_status(self):
status_path = Path(self.recordings_dir) / 'status.json'
if status_path.exists():
try:
self._send(200, status_path.read_bytes(), 'application/json')
return
except Exception:
pass
self._send(200, b'{"active":[]}', 'application/json')
def _download(self, filename: str):
path = self._safe_path(filename)
if path is None:
return
size = path.stat().st_size
self.send_response(200)
self.send_header('Content-Type', 'application/octet-stream')
self.send_header('Content-Disposition', f'attachment; filename="{path.name}"')
self.send_header('Content-Length', str(size))
self.end_headers()
with open(path, 'rb') as fh:
self._copy_to_response(fh, size)
def _stream(self, filename: str):
"""Serve audio for inline playback with HTTP Range support.
In-progress recordings are served with Cache-Control: no-store (the
content is still growing) and, for WAV/FLAC, with a header patched to
the duration recorded so far so the browser can show it and seek.
"""
path = self._safe_path(filename)
if path is None:
return
content_type = MIME_TYPES.get(path.suffix.lower(), 'application/octet-stream')
size = path.stat().st_size
is_active = self._is_active(filename)
prefix = b''
if is_active:
ext = path.suffix.lower()
if ext == '.wav':
prefix = _live_wav_header(path, size) or b''
elif ext == '.flac':
prefix = _live_flac_header(path, size) or b''
range_header = self.headers.get('Range', '')
m = re.match(r'bytes=(\d+)-(\d*)', range_header) if range_header else None
if m:
start = int(m.group(1))
end = int(m.group(2)) if m.group(2) else size - 1
end = min(end, size - 1)
if start > end or start >= size:
self._send(416, b'Range Not Satisfiable', 'text/plain')
return
length = end - start + 1
self.send_response(206)
self.send_header('Content-Type', content_type)
self.send_header('Content-Range', f'bytes {start}-{end}/{size}')
self.send_header('Content-Length', str(length))
self.send_header('Accept-Ranges', 'bytes')
if is_active:
self.send_header('Cache-Control', 'no-store')
self.end_headers()
with open(path, 'rb') as fh:
sent = 0
if start < len(prefix):
head = prefix[start:start + length]
self.wfile.write(head)
sent = len(head)
if sent < length:
fh.seek(start + sent)
self._copy_to_response(fh, length - sent)
else:
self.send_response(200)
self.send_header('Content-Type', content_type)
self.send_header('Content-Length', str(size))
self.send_header('Accept-Ranges', 'bytes')
if is_active:
self.send_header('Cache-Control', 'no-store')
self.end_headers()
with open(path, 'rb') as fh:
if prefix:
self.wfile.write(prefix)
fh.seek(len(prefix))
self._copy_to_response(fh, size - len(prefix))
else:
# Bound the copy: the file may grow while we serve it, and
# writing more than Content-Length desyncs keep-alive.
self._copy_to_response(fh, size)
def _api_storage(self):
# 'used' is computed client-side from the file list; walking the whole
# tree again here doubled the I/O of every page load.
base = Path(self.recordings_dir)
try:
du = shutil.disk_usage(str(base) if base.exists() else '.')
disk_free, disk_total = du.free, du.total
except Exception:
disk_free = disk_total = None
data = json.dumps({'disk_free': disk_free, 'disk_total': disk_total})
self._send(200, data.encode(), 'application/json')
def _api_config(self):
data = json.dumps({'margin': self.margin_db, 'min_gap': self.min_gap,
'min_duration': self.min_duration})
self._send(200, data.encode(), 'application/json')
def _api_delete(self, filename: str):
if self._is_active(filename):
self._json_err(409, 'Cannot delete a file that is currently being recorded')
return
path = self._safe_path(filename)
if path is None:
return
try:
path.unlink()
except Exception as e:
self._json_err(500, f'Failed to delete: {e}')
return
try:
_analysis_cache_path(
Path(self.analyses_dir).resolve(),
Path(self.recordings_dir).resolve(),
path,
).unlink()
except Exception:
pass
self._send(200, json.dumps({'deleted': filename}).encode(), 'application/json')
def _api_cut(self, qs):
filename = qs.get('file', [None])[0]
start_s = qs.get('start', [None])[0]
end_s = qs.get('end', [None])[0]
if not filename or start_s is None or end_s is None:
self._json_err(400, 'missing file, start, or end parameter')
return
try:
start = float(start_s)
end = float(end_s)
except (ValueError, TypeError):
self._json_err(400, 'start and end must be numbers (seconds)')
return
if start < 0 or end <= start:
self._json_err(400, 'start must be ≥ 0 and end must be > start')
return
path = self._safe_path(filename)
if path is None:
return
if self._is_active(filename):
self._json_err(409, 'Cannot cut a file that is currently being recorded')
return
if not shutil.which('ffmpeg'):
self._json_err(500, 'ffmpeg is not available on this server')
return
ext = path.suffix.lower()
out_name = _cut_filename(path.stem, ext, start, end)
# For lossless formats, re-encode (not copy) so the container header
# is rewritten with the correct duration/size. For lossy formats,
# copy is fine — the audio stops at the right frame regardless.
_lossless = {'.wav': ['-c:a', 'pcm_s16le'], '.flac': ['-c:a', 'flac']}
codec_args = _lossless.get(ext, ['-c', 'copy'])
fd, tmp_path = tempfile.mkstemp(suffix=ext)
os.close(fd)
try:
cmd = ['ffmpeg', '-y',
'-i', str(path),
'-ss', str(start), '-to', str(end),
'-vn'] + codec_args + [tmp_path]
result = subprocess.run(cmd, capture_output=True, timeout=120)
if result.returncode != 0:
err = result.stderr.decode('utf-8', errors='replace')[-400:]
self._json_err(500, f'ffmpeg error: {err}')
return
tmp_size = os.path.getsize(tmp_path)
content_type = MIME_TYPES.get(ext, 'application/octet-stream')
self.send_response(200)
self.send_header('Content-Type', content_type)
self.send_header('Content-Disposition', f'attachment; filename="{out_name}"')
self.send_header('Content-Length', str(tmp_size))
self.end_headers()
with open(tmp_path, 'rb') as fh:
self._copy_to_response(fh, tmp_size)
except subprocess.TimeoutExpired:
self._json_err(504, 'ffmpeg timed out — file may be too large')
finally:
try:
os.unlink(tmp_path)
except Exception:
pass
def _api_clip(self, qs):
"""Serve a section of a WAV/FLAC file as a small standalone WAV.
Decoding the slice server-side means the browser plays a tiny file
instantly instead of seeking inside a multi-hundred-MB recording
(FLACs have no seek table, so browser seeks bisect the whole file
with Range requests)."""
filename = qs.get('file', [None])[0]
start_s = qs.get('start', [None])[0]
end_s = qs.get('end', [None])[0]
if not filename or start_s is None or end_s is None:
self._json_err(400, 'missing file, start, or end parameter')
return
try:
start = max(0.0, float(start_s))
end = float(end_s)
except (ValueError, TypeError):
self._json_err(400, 'start and end must be numbers (seconds)')
return
if end <= start:
self._json_err(400, 'end must be > start')
return
end = min(end, start + CLIP_MAX_SECONDS)
path = self._safe_path(filename)
if path is None:
return
if self._is_active(filename):
self._json_err(409, 'File is currently being recorded — clips unavailable until recording stops')
return
ext = path.suffix.lower()
try:
if ext == '.wav':
self._clip_wav(path, start, end)
elif ext == '.flac':
if not (NUMPY_AVAILABLE and SOUNDFILE_AVAILABLE):
self._json_err(400, 'FLAC clips require: pip install numpy soundfile')
return
self._clip_flac(path, start, end)
else:
self._json_err(400, f'Clips are not available for {ext} files')
except (ConnectionError, BrokenPipeError):
raise
except Exception as e:
self._json_err(500, f'Clip failed: {e}')
def _send_clip_headers(self, header: bytes, n_frames: int, channels: int,
sampwidth: int):
self.send_response(200)
self.send_header('Content-Type', 'audio/wav')
self.send_header('Content-Length', str(len(header) + n_frames * channels * sampwidth))
# Finished recordings are immutable, so stepping back to a section
# replays the clip from the browser cache
self.send_header('Cache-Control', 'private, max-age=86400')
self.end_headers()
self.wfile.write(header)
def _clip_wav(self, path: Path, start: float, end: float):
with wave.open(str(path), 'rb') as wf:
channels = wf.getnchannels()
sampwidth = wf.getsampwidth()
framerate = wf.getframerate()
n_frames = wf.getnframes()
f0 = min(int(start * framerate), n_frames)
f1 = min(int(end * framerate), n_frames)
if f1 <= f0:
self._json_err(400, 'clip range is beyond the end of the file')
return
header = _wav_header(f1 - f0, channels, framerate, sampwidth)
self._send_clip_headers(header, f1 - f0, channels, sampwidth)
wf.setpos(f0)
remaining = f1 - f0
while remaining > 0:
chunk = wf.readframes(min(32768, remaining))
if not chunk:
self.close_connection = True # under-delivered
break
self.wfile.write(chunk)
remaining -= len(chunk) // (channels * sampwidth)
def _clip_flac(self, path: Path, start: float, end: float):
with sf.SoundFile(path) as f:
framerate = f.samplerate
channels = f.channels
n_frames = len(f)
f0 = min(int(start * framerate), n_frames)
f1 = min(int(end * framerate), n_frames)
if f1 <= f0:
self._json_err(400, 'clip range is beyond the end of the file')
return
header = _wav_header(f1 - f0, channels, framerate, 2)
self._send_clip_headers(header, f1 - f0, channels, 2)
f.seek(f0)
remaining = f1 - f0
while remaining > 0:
frames = f.read(min(32768, remaining), dtype='int16', always_2d=True)
if len(frames) == 0:
self.close_connection = True # under-delivered
break
self.wfile.write(frames.tobytes())
remaining -= len(frames)
def _is_active(self, filename: str) -> bool:
"""True if isr.py reports this file as currently being recorded."""
try:
with open(Path(self.recordings_dir) / 'status.json') as fh:
return filename in json.load(fh).get('active', [])
except Exception:
return False
def _copy_to_response(self, fh, length=None):
"""Stream an open binary file to the client in 64 KB chunks."""
remaining = length
while remaining is None or remaining > 0:
chunk = fh.read(65536 if remaining is None else min(65536, remaining))
if not chunk:
break
self.wfile.write(chunk)
if remaining is not None:
remaining -= len(chunk)
# Sent fewer bytes than Content-Length promised (file truncated while
# serving): the keep-alive connection is desynced, force it closed.
if remaining is not None and remaining > 0:
self.close_connection = True
def _safe_path(self, filename: str):
base = Path(self.recordings_dir).resolve()
try:
path = (base / filename).resolve()
path.relative_to(base)
except Exception:
self._send(403, b'Forbidden', 'text/plain')
return None
if not path.is_file():
self._send(404, b'Not found', 'text/plain')
return None
return path
def _send(self, code: int, body: bytes, content_type: str):
self.send_response(code)
self.send_header('Content-Type', content_type)
self.send_header('Content-Length', str(len(body)))
self.end_headers()
self.wfile.write(body)
def _json_err(self, code: int, msg: str):
self._send(code, json.dumps({'error': msg}).encode('utf-8'), 'application/json')
def log_message(self, fmt, *args):
pass
class _Server(ThreadingHTTPServer):
"""ThreadingHTTPServer that stays quiet when clients disconnect mid-stream
(browsers abort audio range requests constantly while seeking)."""
def handle_error(self, request, client_address):
import sys
exc = sys.exc_info()[1]
if isinstance(exc, (ConnectionError, TimeoutError)):
return
super().handle_error(request, client_address)
# ---------------------------------------------------------------------------
# UI page — single-page HTML/CSS/JS, loaded once at startup
# ---------------------------------------------------------------------------
_HTML = (Path(__file__).resolve().parent / 'webui.html').read_text(encoding='utf-8')
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(description='ISR Web — audio archive browser')
parser.add_argument('--dir', default='recordings',
help='Recordings directory (default: recordings)')
parser.add_argument('--port', type=int, default=8080,
help='HTTP port (default: 8080)')
parser.add_argument('--host', default='0.0.0.0',
help='Bind address (default: 0.0.0.0)')
parser.add_argument('--margin', type=float, default=MARGIN_DB,
help=f'dB above the background noise floor for a section '
f'to count as loud (default: {MARGIN_DB})')
parser.add_argument('--min-gap', type=float, default=MIN_GAP_SECONDS,
help=f'Seconds gap for merging loud sections (default: {MIN_GAP_SECONDS})')
parser.add_argument('--min-duration', type=float, default=MIN_DURATION_SECONDS,
help=f'Discard loud sections shorter than this many seconds '
f'(default: {MIN_DURATION_SECONDS})')
parser.add_argument('--analyses-dir', default=None,
help='Directory for analysis cache files (default: <recordings-dir>/analyses)')
args = parser.parse_args()
rec_dir = Path(args.dir).resolve()
analyses_dir = Path(args.analyses_dir).resolve() if args.analyses_dir else rec_dir / 'analyses'
if not rec_dir.exists():
print(f"Warning: recordings directory '{rec_dir}' does not exist yet.")
prune_orphan_analyses(analyses_dir, rec_dir)
_analyses_dir = analyses_dir # class body can't close over a name it also assigns
class Handler(_Handler):
recordings_dir = str(rec_dir)
analyses_dir = str(_analyses_dir)
margin_db = args.margin
min_gap = args.min_gap
min_duration = args.min_duration
server = _Server((args.host, args.port), Handler)
print(f"ISR Web running on http://{args.host}:{args.port}/")
print(f"Recordings dir: {rec_dir}")
print(f"Analyses dir: {analyses_dir}")
print(f"Loudness margin: {args.margin} dB above noise floor")
if not NUMPY_AVAILABLE:
print("Note: numpy not installed — WAV RMS uses pure Python (slower); FLAC analysis unavailable")
elif not SOUNDFILE_AVAILABLE:
print("Note: soundfile not installed — FLAC loudness analysis unavailable")
print("Stop with Ctrl+C\n")
try:
server.serve_forever()
except KeyboardInterrupt:
print("Stopped.")
if __name__ == '__main__':
main()