From 8254ccde86a11acc99be66356fb44f1eefaf22f6 Mon Sep 17 00:00:00 2001 From: Jonathan Schuster Date: Sun, 26 Apr 2026 10:56:55 +0200 Subject: [PATCH] Add Docker support, fix stale docs, translate UI to English - Dockerfile + docker-compose.yml: two services (recorder + web) sharing ./recordings bind mount; recorder maps /dev/snd for ALSA soundcard access - requirements.txt: requests, numpy, soundfile - .dockerignore, updated .gitignore (add __pycache__, .pytest_cache) - isr.py: add SIGTERM handler for clean Docker shutdown; fix stale error message that referenced removed PulseAudio/PipeWire/PortAudio backends - web.py: translate all German UI strings to English - config.example.ini: remove PipeWire/PulseAudio/PortAudio backend refs, simplify soundcard tips to ALSA only - README.md: full rewrite as user guide (quick start, config reference, Docker notes, how it works) - CLAUDE.md: update architecture section to reflect ALSA-only backend - Delete changelog.txt and guide.md (internal session notes) --- .dockerignore | 12 + .gitignore | 9 + CLAUDE.md | 69 +++ Dockerfile | 16 + README.md | 189 +++++++- config.example.ini | 146 +++++++ docker-compose.yml | 19 + isr.py | 1023 ++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 3 + tests/__init__.py | 0 tests/test_isr.py | 726 +++++++++++++++++++++++++++++++ web.py | 518 ++++++++++++++++++++++ 12 files changed, 2729 insertions(+), 1 deletion(-) create mode 100644 .dockerignore create mode 100644 .gitignore create mode 100644 CLAUDE.md create mode 100644 Dockerfile create mode 100644 config.example.ini create mode 100644 docker-compose.yml create mode 100644 isr.py create mode 100644 requirements.txt create mode 100644 tests/__init__.py create mode 100644 tests/test_isr.py create mode 100644 web.py diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..0c4da0f --- /dev/null +++ b/.dockerignore @@ -0,0 +1,12 @@ +__pycache__/ +*.pyc +*.pyo +.git/ +.gitignore +.claude/ +tests/ +config.ini +recordings/ +*.log +changelog.txt +guide.md diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1d222ac --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +recordings/ +recorder.log +config.ini +__pycache__/ +*.pyc +*.pyo +.pytest_cache/ +.claude/ +nul diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..e1b6473 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,69 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +ISR is a Python audio recording application that captures from multiple simultaneous sources (Icecast/HTTP streams and ALSA soundcard devices) with time-based file splitting. All application code is in two files: `isr.py` (recorder) and `web.py` (archive browser UI). + +## Commands + +```bash +# Run the recorder +python isr.py # uses config.ini +python isr.py myconfig.ini # custom config file +python isr.py --list-devices # list available ALSA devices + +# Run the web UI +python web.py # http://localhost:8080 +python web.py --dir recordings # custom recordings directory + +# Stop: Ctrl+C (or docker compose down) + +# Install dependencies +pip install requests # for stream recording +pip install numpy soundfile # for FLAC output and web waveform analysis (optional) + +# Docker +docker compose up -d +docker compose logs -f +docker compose down +``` + +## Architecture + +### Audio Backend System +- **AudioDevice** — Dataclass: id, name, channels, sample_rate, backend type +- **AudioBackend** (ABC) — Abstract base for audio capture backends + - **ALSABackend** — Native ALSA support via `arecord` subprocess (the only backend) +- **ALSAStream** — Context manager that wraps an `arecord` subprocess and reads PCM in a thread +- **AudioSystem** — Discovers available backends, lists devices, resolves device specs + +### Recorder Classes +- **BaseRecorder** (ABC) — Common settings, `get_next_split_time()`, `generate_filename()`, `record()` interface +- **StreamRecorder(BaseRecorder)** — Records HTTP/Icecast streams with format auto-detection and OGG/FLAC header injection +- **SoundcardRecorder(BaseRecorder)** — Records from ALSA devices; outputs WAV or FLAC via `_AudioFileWriter` +- **_AudioFileWriter** — Unified write/close interface for wave (WAV) and soundfile (FLAC) +- **RecorderManager** — Loads config, creates recorders, manages threads, handles shutdown + +### Key Implementation Details +- ALSA backend spawns `arecord` as a subprocess; raw PCM is read in 100 ms chunks via a reader thread +- Device selection: `default`, `monitor` (loopback), partial name match, or exact `hw:X,Y` ID +- Thread-safe audio buffering with `threading.Lock()` +- OGG/Opus/FLAC headers captured from first ~16 KB of stream and prepended to each split file +- File splits aligned to time period boundaries (`get_next_split_time()`) +- SIGTERM handled in `main()` so Docker `docker compose down` shuts down cleanly + +## Configuration + +Copy `config.example.ini` to `config.ini`. Each section defines a recording source: +- `type = stream` — HTTP/Icecast stream recording +- `type = soundcard` — ALSA device recording + +In Docker, set `output_directory = /recordings` and `log_file = /recordings/recorder.log`. + +## Docker + +Two services share a `./recordings` bind mount: +- `recorder` — runs `isr.py`, maps `/dev/snd` for ALSA access +- `web` — runs `web.py`, read-only access to recordings, exposes port 8080 diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..9c8204f --- /dev/null +++ b/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.12-slim + +RUN apt-get update && apt-get install -y --no-install-recommends \ + alsa-utils \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY isr.py web.py ./ + +RUN mkdir -p /recordings + +CMD ["python", "isr.py", "/app/config.ini"] diff --git a/README.md b/README.md index ad266df..43049b5 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,189 @@ -# ISR +# ISR — Audio Recorder +> AI-generated code. Run at your own risk. MIT licence. + +Records from multiple simultaneous sources — Icecast/HTTP streams and ALSA soundcards — with time-based file splitting. + +## Features + +- Multiple sources recorded in parallel (each in its own thread) +- **Stream recording** — HTTP/Icecast, auto-detects MP3 / OGG / AAC / FLAC / Opus from `Content-Type` +- **Soundcard recording** — ALSA (`arecord`), works on any Linux / Raspberry Pi +- Time-aligned file splits (e.g. every hour, on the hour) +- OGG / Opus / FLAC header injection so every split file is independently playable +- Auto-reconnect on stream drops or device errors +- WAV and FLAC output for soundcard sources +- Web UI to browse, download, and analyse recordings + +--- + +## Quick start — bare-metal + +```bash +pip install requests # stream recording +pip install numpy soundfile # FLAC output + web waveform analysis (optional) + +cp config.example.ini config.ini +# edit config.ini to add your sources +python isr.py # start recorder (Ctrl+C to stop) +python web.py # start web UI at http://localhost:8080 +``` + +## Quick start — Docker + +```bash +cp config.example.ini config.ini +# In config.ini set: output_directory = /recordings +# Optionally set: log_file = /recordings/recorder.log + +docker compose up -d +# recorder starts immediately; web UI at http://:8080 +docker compose logs -f # tail logs from both services +docker compose down # graceful stop +``` + +Recordings land in `./recordings/` on the host (bind-mounted into both containers). + +If you only record streams (no soundcard), comment out the `devices` block in `docker-compose.yml`. + +--- + +## Configuration + +`config.ini` uses standard INI format. `[general]` provides defaults; every other section is a recording source. Source sections inherit all general settings and can override any of them. + +### `[general]` + +| Key | Default | Description | +|-----|---------|-------------| +| `output_directory` | `recordings` | Output path. Use `/recordings` in Docker. | +| `split_minutes` | `60` | Split into a new file every N minutes, aligned to clock boundaries (e.g. 60 → files start at :00, 30 → at :00 and :30). | +| `filename_pattern` | `%Y%m%d_%H%M%S` | strftime pattern; file extension is appended automatically. | +| `max_retries` | `10` | Give up after this many consecutive failures per source. | +| `retry_delay_seconds` | `5` | Wait between retries. | +| `log_level` | `INFO` | `DEBUG` / `INFO` / `WARNING` / `ERROR` / `CRITICAL` | +| `log_file` | `recorder.log` | Log file path. Use `/recordings/recorder.log` in Docker. | + +### `type = stream` + +```ini +[my_stream] +type = stream +url = http://icecast.example.com:8000/live +username = # leave blank for public streams +password = +format = auto # auto | mp3 | ogg | aac | flac | opus +``` + +`format = auto` detects from the `Content-Type` response header. For OGG/Opus/FLAC the first ~16 KB of each connection is buffered to extract codec headers, which are then prepended to every split file — all files are independently playable. + +A new file is always opened on (re)connect so gaps between connections are never silently merged. + +### `type = soundcard` + +```ini +[mic_in] +type = soundcard +device = default # see device selection below +sample_rate = 44100 +channels = 2 +format = wav # wav | flac +``` + +**Device selection:** + +| Value | Behaviour | +|-------|-----------| +| `default` | System default input | +| `monitor` | First loopback/monitor source (capture system audio) | +| `` | Case-insensitive substring match against device name | +| `hw:X,Y` | Exact ALSA hardware ID | + +Run `python isr.py --list-devices` (or `arecord -l`) to see available devices and their IDs. + +FLAC output requires `pip install soundfile numpy`. + +### Multiple sources + +Every section except `[general]` is a source — they all record simultaneously: + +```ini +[general] +output_directory = recordings +split_minutes = 60 + +[radio1] +type = stream +url = http://radio.example.com:8000/stream1 +filename_pattern = radio1_%Y%m%d_%H%M%S + +[system_audio] +type = soundcard +device = hw:0,0 +filename_pattern = system_%Y%m%d_%H%M%S +``` + +--- + +## Filename patterns + +strftime codes are substituted at split time. The file extension is added automatically. + +| Pattern | Example | +|---------|---------| +| `%Y%m%d_%H%M%S` | `20241225_143000.mp3` | +| `radio_%Y-%m-%d_%H%M` | `radio_2024-12-25_1430.mp3` | +| `%Y/%m/%d/rec_%H%M%S` | `2024/12/25/rec_143000.mp3` *(subdirs created automatically)* | + +--- + +## Web UI (`web.py`) + +```bash +python web.py # serves ./recordings on port 8080 +python web.py --dir /path/to/audio # custom recordings directory +python web.py --port 8888 # custom port +python web.py --threshold 0.03 # loudness threshold 0–1 (default 0.05) +``` + +Shows a table of all recordings sorted newest-first with file size, duration (WAV only), and a waveform analysis button. Analysis computes RMS per 100 ms window and highlights contiguous sections above the loudness threshold. + +Waveform analysis is WAV-only; numpy speeds it up significantly (pure-Python fallback available without it). + +--- + +## How it works + +**Streams:** Connect via HTTP → detect format from `Content-Type` → buffer first ~16 KB to extract OGG/FLAC codec headers → stream raw bytes to disk → at each split boundary open a new file and prepend the saved headers. No transcoding, no decoding — raw bytes in, raw bytes out. + +**Soundcard:** Spawn `arecord` as a subprocess (raw PCM output) → read 100 ms chunks via a thread → write 16-bit PCM to WAV or FLAC → split at configured boundaries. + +Both recorder types run in separate threads and retry independently up to `max_retries`. + +--- + +## Docker notes + +**ALSA device access:** The `recorder` container needs `/dev/snd` mapped. The container runs as root, so no group configuration is needed — the device mapping alone is sufficient. + +If ALSA still fails to find the device inside the container, verify the device exists on the host: +```bash +arecord -l # list capture hardware +ls -la /dev/snd # check device nodes +``` + +**Stream-only deployments:** If you don't use soundcard recording, remove the `devices` block from `docker-compose.yml` — the image works fine without it. + +**Log file in Docker:** Set `log_file = /recordings/recorder.log` in `config.ini` so logs survive container restarts. Alternatively, use `docker compose logs` (the recorder always logs to stdout as well). + +**File retention:** ISR never deletes recordings. Add a cron job on the host if needed: +```bash +# Delete recordings older than 30 days +find recordings/ -type f -mtime +30 -delete +``` + +--- + +## Licence + +MIT diff --git a/config.example.ini b/config.example.ini new file mode 100644 index 0000000..24705ca --- /dev/null +++ b/config.example.ini @@ -0,0 +1,146 @@ +# ISR - Audio Recorder Configuration +# Supports multiple recording sources: Icecast streams and soundcards +# +# Configuration sections: +# [general] - Shared settings for all sources (optional) +# [sourcename] - One section per recording source (name is your choice) +# +# Each source section must have 'type' set to either 'stream' or 'soundcard' + +# ============================================================================= +# GENERAL SETTINGS (optional - can be overridden per source) +# ============================================================================= +[general] +# Output directory for recordings (will be created if it doesn't exist) +output_directory = recordings + +# Duration in minutes after which to split into a new file +split_minutes = 60 + +# Filename pattern with strftime format codes +# Examples: +# %Y%m%d_%H%M%S -> 20241216_143000.ext +# recording_%Y-%m-%d_%H%M -> recording_2024-12-16_1430.ext +# %Y/%m/%d/audio_%H%M%S -> 2024/12/16/audio_143000.ext (creates subdirs) +# Common codes: %Y=year, %m=month, %d=day, %H=hour, %M=minute, %S=second +filename_pattern = %Y%m%d_%H%M%S + +# Maximum number of connection/recording retry attempts before giving up +max_retries = 10 + +# Delay in seconds between retry attempts +retry_delay_seconds = 5 + +# Logging level: DEBUG, INFO, WARNING, ERROR, CRITICAL +log_level = INFO + +# Log file location +log_file = recorder.log + + +# ============================================================================= +# EXAMPLE: ICECAST STREAM SOURCE +# ============================================================================= +[mystream] +# Source type (required): stream or soundcard +type = stream + +# Stream URL (required for stream type) +url = http://example.com:8000/stream + +# Authentication (optional - leave empty for public streams) +username = +password = + +# Audio format: auto (detect from stream), mp3, ogg, aac, flac, opus +format = auto + +# Override general settings for this source (optional): +# output_directory = recordings/streams +# split_minutes = 30 +# filename_pattern = mystream_%Y%m%d_%H%M%S + + +# ============================================================================= +# EXAMPLE: SOUNDCARD SOURCE (Linux) +# ============================================================================= +# Uncomment and configure to record from a soundcard +# +# [stereo_mix] +# type = soundcard +# +# # Device selection (use one of these methods): +# # device = default - Use system default input device +# # device = monitor - Auto-select first monitor/loopback source (for system audio) +# # device = - Match device by partial name (e.g., "Stereo Mix") +# # device = - Use exact device ID from --list-devices +# # +# # To list available devices, run: python isr.py --list-devices +# device = default +# +# # Audio backend (only 'alsa' is supported): +# # backend = alsa +# +# # Sample rate in Hz (common: 44100, 48000, 96000) +# sample_rate = 44100 +# +# # Number of audio channels (1 = mono, 2 = stereo) +# channels = 2 +# +# # Output format: wav, flac +# format = wav +# +# # Override general settings for this source (optional): +# # output_directory = recordings/soundcard +# # split_minutes = 60 +# # filename_pattern = soundcard_%Y%m%d_%H%M%S + + +# ============================================================================= +# MULTIPLE SOURCES EXAMPLE +# ============================================================================= +# You can define as many sources as you want. Each will record simultaneously. +# +# [radio_station_1] +# type = stream +# url = http://radio1.example.com:8000/live +# format = auto +# filename_pattern = radio1_%Y%m%d_%H%M%S +# +# [radio_station_2] +# type = stream +# url = http://radio2.example.com:8000/live +# format = auto +# filename_pattern = radio2_%Y%m%d_%H%M%S +# +# [system_audio] +# type = soundcard +# device = Stereo Mix +# sample_rate = 48000 +# channels = 2 +# format = flac +# filename_pattern = system_%Y%m%d_%H%M%S + + +# ============================================================================= +# SOUNDCARD TIPS (Linux / Raspberry Pi) +# ============================================================================= +# ISR uses ALSA (arecord) for soundcard recording. arecord is pre-installed +# on Raspberry Pi OS and most Linux distros (package: alsa-utils). +# +# Run: python isr.py --list-devices (or: arecord -l) to list devices. +# +# EASY: Use the "monitor" keyword to capture system/loopback audio: +# [system_audio] +# type = soundcard +# device = monitor +# format = wav +# +# MANUAL: Specify a device by ALSA hardware ID from --list-devices: +# [usb_mic] +# type = soundcard +# device = hw:1,0 +# backend = alsa +# sample_rate = 44100 +# channels = 2 +# format = flac diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..87e51d4 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,19 @@ +services: + recorder: + build: . + volumes: + - ./config.ini:/app/config.ini:ro + - ./recordings:/recordings + # Soundcard (ALSA) access — comment out if you only record streams + devices: + - /dev/snd:/dev/snd + restart: unless-stopped + + web: + build: . + volumes: + - ./recordings:/recordings:ro + ports: + - "8080:8080" + restart: unless-stopped + command: ["python", "web.py", "--dir", "/recordings"] diff --git a/isr.py b/isr.py new file mode 100644 index 0000000..06fb0e4 --- /dev/null +++ b/isr.py @@ -0,0 +1,1023 @@ +#!/usr/bin/env python3 +""" +ISR - Audio Recorder +Records from multiple sources: Icecast streams and soundcards. +Supports time-based file splitting and concurrent recording. +""" + +import os +import sys +import time +import wave +import struct +import signal +import logging +import threading +import configparser +import subprocess +import shutil +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from datetime import datetime, timedelta +from pathlib import Path +from typing import Optional, Dict, Any, List, Callable + +# Optional imports - check availability +try: + import requests + REQUESTS_AVAILABLE = True +except ImportError: + REQUESTS_AVAILABLE = False + +try: + import numpy as np + NUMPY_AVAILABLE = True +except ImportError: + NUMPY_AVAILABLE = False + +try: + import soundfile as sf + SOUNDFILE_AVAILABLE = True +except ImportError: + SOUNDFILE_AVAILABLE = False + + +# ============================================================================= +# Audio Device & Backend System +# ============================================================================= + +@dataclass +class AudioDevice: + """Represents an audio input device.""" + id: str # Backend-specific identifier + name: str # Human-readable name + channels: int # Max input channels + sample_rate: int # Default sample rate + backend: str # Backend name (pulseaudio, pipewire, portaudio) + is_default: bool = False # Is system default + is_monitor: bool = False # Is a monitor/loopback source + description: str = "" # Extended description + extra: Dict[str, Any] = field(default_factory=dict) + + def __str__(self): + flags = [] + if self.is_default: + flags.append("DEFAULT") + if self.is_monitor: + flags.append("MONITOR") + flag_str = f" [{', '.join(flags)}]" if flags else "" + return f"{self.name}{flag_str} ({self.backend})" + + +class AudioBackend(ABC): + """Abstract base for audio capture backends.""" + + name: str = "base" + priority: int = 0 # Higher = preferred + + @classmethod + @abstractmethod + def is_available(cls) -> bool: + """Check if this backend can be used on the current system.""" + pass + + @classmethod + @abstractmethod + def list_devices(cls) -> List[AudioDevice]: + """List all available input devices.""" + pass + + @abstractmethod + def open_stream(self, device: AudioDevice, sample_rate: int, channels: int, + callback: Callable[[bytes], None]) -> Any: + """Open an audio capture stream. Returns a context manager.""" + pass + + +class ALSABackend(AudioBackend): + """ALSA backend using arecord (raw PCM output, no sound server required).""" + + name = "alsa" + priority = 5 # Lowest priority — direct hardware access, use when no sound server runs + + @classmethod + def is_available(cls) -> bool: + return shutil.which('arecord') is not None + + @classmethod + def list_devices(cls) -> List[AudioDevice]: + devices = [] + try: + result = subprocess.run( + ['arecord', '-l'], + capture_output=True, + text=True, + timeout=5 + ) + if result.returncode != 0: + return devices + + for line in result.stdout.split('\n'): + if not line.startswith('card '): + continue + try: + card_part, rest = line.split(':', 1) + card_num = card_part.replace('card', '').strip() + + if ', device ' in rest: + dev_part = rest.split(', device ')[1] + dev_num = dev_part.split(':')[0].strip() + else: + dev_num = '0' + + long_name = rest.split('[')[1].split(']')[0] if '[' in rest else rest.strip() + hw_id = f"hw:{card_num},{dev_num}" + + devices.append(AudioDevice( + id=hw_id, + name=long_name, + channels=2, + sample_rate=44100, + backend=cls.name, + is_default=(card_num == '0' and dev_num == '0'), + is_monitor=False, + )) + except (IndexError, ValueError): + continue + except Exception: + pass + return devices + + def __init__(self, logger: logging.Logger): + self.logger = logger + + def open_stream(self, device: AudioDevice, sample_rate: int, channels: int, + callback: Callable[[bytes], None]): + return ALSAStream(device, sample_rate, channels, callback, self.logger) + + +class ALSAStream: + """Context manager for ALSA recording using arecord (raw PCM output).""" + + def __init__(self, device: AudioDevice, sample_rate: int, channels: int, + callback: Callable[[bytes], None], logger: logging.Logger): + self.device = device + self.sample_rate = sample_rate + self.channels = channels + self.callback = callback + self.logger = logger + self._running = False + self._thread = None + self._process = None + + def __enter__(self): + self._running = True + cmd = [ + 'arecord', + '-D', self.device.id, + '-f', 'S16_LE', + '-r', str(self.sample_rate), + '-c', str(self.channels), + '--file-type', 'raw', + '-', + ] + try: + self._process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + ) + except FileNotFoundError: + raise RuntimeError("arecord not found - install alsa-utils: sudo apt install alsa-utils") + + self._thread = threading.Thread(target=self._read_loop, daemon=True) + self._thread.start() + return self + + def __exit__(self, *args): + self._running = False + if self._process: + self._process.terminate() + try: + self._process.wait(timeout=2) + except subprocess.TimeoutExpired: + self._process.kill() + if self._thread: + self._thread.join(timeout=1) + + def _read_loop(self): + chunk_size = self.sample_rate * self.channels * 2 // 10 # 100 ms chunks + while self._running and self._process.poll() is None: + try: + data = self._process.stdout.read(chunk_size) + if data: + self.callback(data) + except Exception as e: + self.logger.error(f"ALSA read error: {e}") + break + + +class AudioSystem: + """Manages audio backends and device discovery.""" + + # All available backend classes + _backend_classes: List[type] = [ALSABackend] + + def __init__(self, logger: logging.Logger): + self.logger = logger + self._backends: Dict[str, AudioBackend] = {} + self._discover_backends() + + def _discover_backends(self): + """Find and initialize available backends.""" + for cls in self._backend_classes: + if cls.is_available(): + self._backends[cls.name] = cls(self.logger) + self.logger.debug(f"Audio backend available: {cls.name}") + + @property + def available_backends(self) -> List[str]: + return list(self._backends.keys()) + + def get_backend(self, name: str) -> Optional[AudioBackend]: + return self._backends.get(name) + + def get_preferred_backend(self) -> Optional[AudioBackend]: + """Get the highest priority available backend.""" + if not self._backends: + return None + return max(self._backends.values(), key=lambda b: b.__class__.priority) + + def list_all_devices(self) -> List[AudioDevice]: + """List devices from all available backends.""" + all_devices = [] + seen_names = set() + + # Get devices from backends in priority order + for cls in sorted(self._backend_classes, key=lambda c: -c.priority): + if cls.name in self._backends: + for dev in cls.list_devices(): + # Deduplicate by name (same device may appear in multiple backends) + key = dev.name.lower() + if key not in seen_names: + all_devices.append(dev) + seen_names.add(key) + + return all_devices + + def find_device(self, spec: str, preferred_backend: str = None) -> Optional[AudioDevice]: + """Find a device by ID, name, or pattern.""" + devices = self.list_all_devices() + + if not devices: + return None + + # "default" - get the default device from preferred backend + if spec.lower() == 'default': + for dev in devices: + if dev.is_default: + if preferred_backend is None or dev.backend == preferred_backend: + return dev + return devices[0] if devices else None + + # "monitor" - get first monitor source + if spec.lower() == 'monitor': + for dev in devices: + if dev.is_monitor: + if preferred_backend is None or dev.backend == preferred_backend: + return dev + return None + + spec_lower = spec.lower() + + # Try exact ID match first + for dev in devices: + if dev.id == spec: + return dev + + # Try exact name match + for dev in devices: + if dev.name.lower() == spec_lower: + return dev + + # Try partial name match + for dev in devices: + if spec_lower in dev.name.lower(): + return dev + + return None + + +class BaseRecorder(ABC): + """Abstract base class for all recorder types.""" + + def __init__(self, name: str, config: Dict[str, Any], logger: logging.Logger, + clock: Callable[[], datetime] = None): + self.name = name + self.config = config + self.logger = logger + self.running = False + self.current_file = None + self.current_filename = None + self._clock = clock or datetime.now + + # Common settings + self.split_duration = config.get('split_minutes', 60) + self.output_dir = config.get('output_directory', 'recordings') + self.filename_pattern = config.get('filename_pattern', '%Y%m%d_%H%M%S') + self.max_retries = config.get('max_retries', 10) + self.retry_delay = config.get('retry_delay_seconds', 5) + self.file_format = config.get('format', 'auto') + + def get_next_split_time(self) -> datetime: + """Calculate the next split time aligned to split_duration boundaries.""" + now = self._clock() + total_minutes = now.hour * 60 + now.minute + minutes_to_next = self.split_duration - (total_minutes % self.split_duration) + next_split = now + timedelta(minutes=minutes_to_next) + return next_split.replace(second=0, microsecond=0) + + def generate_filename(self, ext: str) -> str: + """Generate filename from pattern with strftime substitution.""" + now = self._clock() + filename = now.strftime(self.filename_pattern) + f".{ext}" + full_path = os.path.join(self.output_dir, filename) + Path(full_path).parent.mkdir(parents=True, exist_ok=True) + return full_path + + def close_current_file(self): + """Close the current recording file if open.""" + if self.current_file: + try: + self.current_file.close() + self.logger.info(f"[{self.name}] Closed file: {self.current_filename}") + except Exception as e: + self.logger.error(f"[{self.name}] Error closing file: {e}") + self.current_file = None + self.current_filename = None + + def stop(self): + """Signal the recorder to stop.""" + self.running = False + + @abstractmethod + def record(self): + """Main recording loop - must be implemented by subclasses.""" + pass + + +class StreamRecorder(BaseRecorder): + """Records from Icecast/HTTP audio streams.""" + + def __init__(self, name: str, config: Dict[str, Any], logger: logging.Logger, + clock: Callable[[], datetime] = None): + super().__init__(name, config, logger, clock) + + if not REQUESTS_AVAILABLE: + raise ImportError("The 'requests' library is required for stream recording. Install with: pip install requests") + + self.stream_url = config.get('url') + if not self.stream_url: + raise ValueError(f"[{name}] Stream URL is required for stream type") + + self.username = config.get('username') or None + self.password = config.get('password') or None + + # Stream header storage for formats that require headers in each file + self.stream_headers = None + self.header_capture_complete = False + self.detected_format = None + + def detect_format(self, response) -> str: + """Detect stream format from HTTP headers.""" + content_type = response.headers.get('Content-Type', '').lower() + + format_map = { + 'audio/mpeg': 'mp3', + 'audio/mp3': 'mp3', + 'audio/ogg': 'ogg', + 'application/ogg': 'ogg', + 'audio/aac': 'aac', + 'audio/aacp': 'aac', + 'audio/x-aac': 'aac', + 'audio/flac': 'flac', + 'audio/opus': 'opus' + } + + for mime, fmt in format_map.items(): + if mime in content_type: + return fmt + + self.logger.warning(f"[{self.name}] Unknown content type: {content_type}, defaulting to mp3") + return 'mp3' + + def parse_ogg_page(self, data: bytes, offset: int) -> Optional[tuple]: + """Parse an OGG page starting at the given offset.""" + if len(data) - offset < 27: + return None + + if data[offset:offset+4] != b'OggS': + return None + + header_type = data[offset + 5] + is_bos = (header_type & 0x02) != 0 + num_segments = data[offset + 26] + + if len(data) - offset < 27 + num_segments: + return None + + segment_table = data[offset + 27:offset + 27 + num_segments] + page_data_size = sum(segment_table) + total_page_size = 27 + num_segments + page_data_size + + if len(data) - offset < total_page_size: + return None + + page_bytes = data[offset:offset + total_page_size] + granule_pos = int.from_bytes(data[offset + 6:offset + 14], 'little') + is_header = is_bos or granule_pos == 0 + + return (page_bytes, is_header, offset + total_page_size) + + def extract_ogg_headers(self, data: bytes) -> tuple: + """Extract OGG header pages from the beginning of stream data.""" + headers = bytearray() + offset = 0 + header_count = 0 + + while offset < len(data): + result = self.parse_ogg_page(data, offset) + if result is None: + break + + page_bytes, is_header, next_offset = result + + if is_header and header_count < 3: + headers.extend(page_bytes) + header_count += 1 + offset = next_offset + else: + break + + return bytes(headers), data[offset:] + + def needs_header_per_file(self) -> bool: + """Check if the detected format requires headers in each split file.""" + return self.detected_format in ('ogg', 'opus', 'flac') + + def open_new_file(self): + """Open a new recording file.""" + self.close_current_file() + + ext = self.file_format if self.file_format != 'auto' else self.detected_format + self.current_filename = self.generate_filename(ext) + self.current_file = open(self.current_filename, 'wb') + self.logger.info(f"[{self.name}] Started recording to: {self.current_filename}") + + if self.stream_headers and self.needs_header_per_file(): + self.current_file.write(self.stream_headers) + self.logger.debug(f"[{self.name}] Wrote {len(self.stream_headers)} bytes of stream headers") + + def connect_stream(self): + """Connect to the stream.""" + auth = None + if self.username and self.password: + auth = (self.username, self.password) + + try: + self.logger.info(f"[{self.name}] Connecting to stream: {self.stream_url}") + response = requests.get( + self.stream_url, + auth=auth, + stream=True, + timeout=10 + ) + response.raise_for_status() + + if self.file_format == 'auto': + self.detected_format = self.detect_format(response) + self.logger.info(f"[{self.name}] Detected format: {self.detected_format}") + else: + self.detected_format = self.file_format + self.logger.info(f"[{self.name}] Using manual format: {self.file_format}") + + return response + + except requests.exceptions.RequestException as e: + self.logger.error(f"[{self.name}] Connection failed: {e}") + return None + + def record(self): + """Main recording loop for streams.""" + self.running = True + retry_count = 0 + next_split_time = self.get_next_split_time() + + self.logger.info(f"[{self.name}] Starting stream recorder - Split every {self.split_duration} minutes") + self.logger.info(f"[{self.name}] Next split at: {next_split_time.strftime('%Y-%m-%d %H:%M:%S')}") + + while self.running: + response = self.connect_stream() + + if response is None: + retry_count += 1 + if retry_count >= self.max_retries: + self.logger.error(f"[{self.name}] Max retries ({self.max_retries}) reached. Stopping.") + break + + self.logger.warning(f"[{self.name}] Retry {retry_count}/{self.max_retries} in {self.retry_delay} seconds...") + time.sleep(self.retry_delay) + continue + + retry_count = 0 + self.header_capture_complete = False + self.stream_headers = None + header_buffer = bytearray() + + # Always open a new file on (re)connect — a reconnect means there is + # a gap in the stream. For OGG/FLAC this is mandatory (header pages + # must appear at the start of each file); for MP3/AAC it avoids + # writing audio from two separate connections into the same file. + self.open_new_file() + + try: + for chunk in response.iter_content(chunk_size=8192): + if not self.running: + break + + if chunk: + if self.needs_header_per_file() and not self.header_capture_complete: + # Buffer data until we have enough to extract OGG/FLAC headers. + # The chunk must NOT also be written directly — it is already in + # header_buffer and will be flushed once headers are captured. + header_buffer.extend(chunk) + + if len(header_buffer) >= 16384: + self.stream_headers, _ = self.extract_ogg_headers(bytes(header_buffer)) + if self.stream_headers: + self.logger.info(f"[{self.name}] Captured {len(self.stream_headers)} bytes of stream headers") + self.header_capture_complete = True + self.current_file.write(bytes(header_buffer)) + self.current_file.flush() + header_buffer.clear() + # Chunk is in the buffer; do not fall through to the write below. + continue + + self.current_file.write(chunk) + self.current_file.flush() + + if self._clock() >= next_split_time: + self.open_new_file() + next_split_time = self.get_next_split_time() + self.logger.info(f"[{self.name}] Next split at: {next_split_time.strftime('%Y-%m-%d %H:%M:%S')}") + + except requests.exceptions.RequestException as e: + self.logger.error(f"[{self.name}] Stream interrupted: {e}") + if self.running: + self.logger.info(f"[{self.name}] Attempting to reconnect...") + time.sleep(self.retry_delay) + continue + + self.close_current_file() + self.logger.info(f"[{self.name}] Stream recorder stopped") + + +class _AudioFileWriter: + """Unified writer for WAV and FLAC output files. + + Wraps ``wave.Wave_write`` (WAV) and ``soundfile.SoundFile`` (FLAC) behind a + common interface so the rest of the recorder does not need to branch on format. + """ + + def __init__(self, path: str, channels: int, sample_rate: int, fmt: str): + self.path = path + self.channels = channels + self.sample_rate = sample_rate + self.fmt = fmt + self._file = None + + if fmt == 'flac': + if not NUMPY_AVAILABLE: + raise ImportError( + "numpy is required for FLAC output: pip install numpy" + ) + if not SOUNDFILE_AVAILABLE: + raise ImportError( + "soundfile is required for FLAC output: pip install soundfile" + ) + self._file = sf.SoundFile( + path, mode='w', + samplerate=sample_rate, + channels=channels, + subtype='PCM_16', + format='FLAC', + ) + else: + self._file = wave.open(path, 'wb') + self._file.setnchannels(channels) + self._file.setsampwidth(2) # 16-bit + self._file.setframerate(sample_rate) + + def write(self, data: bytes) -> None: + """Write raw int16 PCM bytes to the output file.""" + if self.fmt == 'flac': + arr = np.frombuffer(data, dtype=' 1: + arr = arr.reshape(-1, self.channels) + self._file.write(arr) + else: + self._file.writeframes(data) + + def close(self) -> None: + if self._file is not None: + self._file.close() + self._file = None + + +class SoundcardRecorder(BaseRecorder): + """Records from soundcard/audio devices using multiple backends.""" + + def __init__(self, name: str, config: Dict[str, Any], logger: logging.Logger, + audio_system: 'AudioSystem' = None, + clock: Callable[[], datetime] = None): + super().__init__(name, config, logger, clock) + + # Initialize audio system if not provided + self.audio_system = audio_system or AudioSystem(logger) + + if not self.audio_system.available_backends: + raise RuntimeError( + "No audio backends available.\n" + "Install ALSA utilities: sudo apt install alsa-utils\n" + "(In Docker, also ensure /dev/snd is mapped into the container)" + ) + + # Audio settings + self.device_spec = config.get('device', 'default') + self.sample_rate = config.get('sample_rate', 44100) + self.channels = config.get('channels', 2) + self.preferred_backend = config.get('backend', None) + + # Validate format + if self.file_format not in ('wav', 'flac') and self.file_format != 'auto': + self.logger.warning(f"[{self.name}] Format '{self.file_format}' not supported, using 'wav'") + self.file_format = 'wav' + elif self.file_format == 'auto': + self.file_format = 'wav' + + # Resolve device + self.device = self._resolve_device() + + # Audio buffer + self.audio_buffer = [] + self.buffer_lock = threading.Lock() + + def _resolve_device(self) -> AudioDevice: + """Resolve device specification to AudioDevice.""" + device = self.audio_system.find_device(self.device_spec, self.preferred_backend) + + if device is None: + # List available devices in error + available = self.audio_system.list_all_devices() + device_list = "\n ".join(str(d) for d in available) if available else "None found" + raise ValueError( + f"[{self.name}] Device '{self.device_spec}' not found.\n" + f"Available devices:\n {device_list}" + ) + + self.logger.info(f"[{self.name}] Using device: {device}") + return device + + def _audio_callback(self, data: bytes): + """Callback for audio data from backend.""" + with self.buffer_lock: + self.audio_buffer.append(data) + + def _open_output_file(self): + """Open a new WAV or FLAC output file for recording.""" + self._flush_buffer_to_file() + self.close_current_file() + + self.current_filename = self.generate_filename(self.file_format) + self.current_file = _AudioFileWriter( + self.current_filename, self.channels, self.sample_rate, self.file_format + ) + self.logger.info(f"[{self.name}] Recording to: {self.current_filename}") + + def _flush_buffer_to_file(self): + """Write buffered audio data to file.""" + if self.current_file is None: + return + + with self.buffer_lock: + if self.audio_buffer: + for data in self.audio_buffer: + self.current_file.write(data) + self.audio_buffer.clear() + + def close_current_file(self): + """Close current recording file.""" + self._flush_buffer_to_file() + if self.current_file: + try: + self.current_file.close() + self.logger.info(f"[{self.name}] Closed: {self.current_filename}") + except Exception as e: + self.logger.error(f"[{self.name}] Error closing file: {e}") + self.current_file = None + self.current_filename = None + + def record(self): + """Main recording loop.""" + self.running = True + retry_count = 0 + next_split = self.get_next_split_time() + + backend = self.audio_system.get_backend(self.device.backend) + if not backend: + self.logger.error(f"[{self.name}] Backend '{self.device.backend}' not available") + return + + self.logger.info(f"[{self.name}] Starting recorder - Split every {self.split_duration} min") + self.logger.info(f"[{self.name}] Backend: {self.device.backend}, Device: {self.device.name}") + self.logger.info(f"[{self.name}] Sample rate: {self.sample_rate}, Channels: {self.channels}") + self.logger.info(f"[{self.name}] Next split: {next_split.strftime('%H:%M:%S')}") + + while self.running: + try: + if self.current_file is None: + self._open_output_file() + + with backend.open_stream( + self.device, self.sample_rate, self.channels, self._audio_callback + ): + self.logger.info(f"[{self.name}] Recording started") + retry_count = 0 + + while self.running: + time.sleep(0.5) + self._flush_buffer_to_file() + + if self._clock() >= next_split: + self._open_output_file() + next_split = self.get_next_split_time() + self.logger.info(f"[{self.name}] Next split: {next_split.strftime('%H:%M:%S')}") + + except Exception as e: + self.logger.error(f"[{self.name}] Error: {e}") + retry_count += 1 + + if retry_count >= self.max_retries: + self.logger.error(f"[{self.name}] Max retries reached. Stopping.") + break + + self.logger.warning(f"[{self.name}] Retry {retry_count}/{self.max_retries} in {self.retry_delay}s...") + time.sleep(self.retry_delay) + + self.close_current_file() + self.logger.info(f"[{self.name}] Recorder stopped") + + +class RecorderManager: + """Manages multiple recorders running concurrently.""" + + def __init__(self, config_file: str = 'config.ini'): + self.config_file = config_file + self.recorders: List[BaseRecorder] = [] + self.threads: List[threading.Thread] = [] + self.logger = None + self.audio_system = None + + self._load_config() + + def _load_config(self): + """Load and parse the configuration file.""" + if not os.path.exists(self.config_file): + print(f"Error: Configuration file '{self.config_file}' not found!") + print("Usage: python isr.py [config.ini]") + print(" python isr.py --list-devices") + sys.exit(1) + + config = configparser.ConfigParser(interpolation=None) + config.read(self.config_file) + + # Get general settings (defaults) + general = { + 'output_directory': config.get('general', 'output_directory', fallback='recordings'), + 'split_minutes': config.getint('general', 'split_minutes', fallback=60), + 'filename_pattern': config.get('general', 'filename_pattern', fallback='%Y%m%d_%H%M%S', raw=True), + 'max_retries': config.getint('general', 'max_retries', fallback=10), + 'retry_delay_seconds': config.getint('general', 'retry_delay_seconds', fallback=5), + 'log_level': config.get('general', 'log_level', fallback='INFO').upper(), + 'log_file': config.get('general', 'log_file', fallback='recorder.log'), + } + + # Setup logging + self._setup_logging(general['log_level'], general['log_file']) + + # Initialize shared audio system for soundcard recorders + self.audio_system = AudioSystem(self.logger) + if self.audio_system.available_backends: + self.logger.info(f"Audio backends: {', '.join(self.audio_system.available_backends)}") + + # Parse source sections + for section in config.sections(): + if section == 'general': + continue + + source_type = config.get(section, 'type', fallback=None) + if source_type is None: + self.logger.warning(f"Section [{section}] has no 'type', skipping") + continue + + # Build source config by merging general with section-specific + source_config = general.copy() + + for key, value in config.items(section): + if key == 'type': + continue + if key in ('split_minutes', 'max_retries', 'retry_delay_seconds', 'sample_rate', 'channels'): + source_config[key] = int(value) + else: + source_config[key] = value + + # Create recorder + try: + if source_type == 'stream': + recorder = StreamRecorder(section, source_config, self.logger) + self.recorders.append(recorder) + self.logger.info(f"Configured stream: [{section}]") + elif source_type == 'soundcard': + recorder = SoundcardRecorder(section, source_config, self.logger, self.audio_system) + self.recorders.append(recorder) + self.logger.info(f"Configured soundcard: [{section}]") + else: + self.logger.warning(f"Unknown type '{source_type}' in [{section}], skipping") + except Exception as e: + self.logger.error(f"Failed to configure [{section}]: {e}") + + if not self.recorders: + self.logger.error("No valid recording sources configured!") + + def _setup_logging(self, level: str, log_file: str): + """Setup logging configuration.""" + numeric_level = getattr(logging, level, logging.INFO) + + # Clear any existing handlers + root_logger = logging.getLogger() + root_logger.handlers.clear() + + logging.basicConfig( + level=numeric_level, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler(log_file), + logging.StreamHandler(sys.stdout) + ] + ) + self.logger = logging.getLogger(__name__) + + def start(self): + """Start all recorders in separate threads.""" + if not self.recorders: + self.logger.error("No recorders to start!") + return + + self.logger.info(f"Starting {len(self.recorders)} recorder(s)...") + + for recorder in self.recorders: + thread = threading.Thread(target=recorder.record, name=f"Recorder-{recorder.name}") + thread.daemon = True + self.threads.append(thread) + thread.start() + + self.logger.info("All recorders started") + + # Wait for interrupt + try: + while True: + # Check if all threads are still alive + alive = [t for t in self.threads if t.is_alive()] + if not alive: + self.logger.info("All recorders have stopped") + break + time.sleep(1) + except KeyboardInterrupt: + self.logger.info("Received interrupt signal, stopping all recorders...") + self.stop() + + def stop(self): + """Stop all recorders gracefully.""" + for recorder in self.recorders: + recorder.stop() + + # Wait for threads to finish + for thread in self.threads: + thread.join(timeout=5) + + self.logger.info("All recorders stopped") + + +def list_audio_devices(): + """List available audio input devices from all backends.""" + # Create a minimal logger for device listing + logger = logging.getLogger('isr-devices') + logger.setLevel(logging.WARNING) # Suppress debug output + + print("\n" + "=" * 70) + print(" ISR Audio Device Discovery") + print("=" * 70) + + # Check available backends + available_backends = [] + if ALSABackend.is_available(): + available_backends.append(('alsa', 'ALSA (arecord)', 5)) + + if not available_backends: + print("\n No audio backends available!") + print("\n Install one of:") + print(" sudo apt install alsa-utils (ALSA, always available on Linux)") + print() + return + + print("\n Available Backends:") + for name, label, priority in sorted(available_backends, key=lambda x: -x[2]): + marker = " (preferred)" if priority == max(b[2] for b in available_backends) else "" + print(f" - {label}{marker}") + + # Initialize audio system and list devices + audio_system = AudioSystem(logger) + devices = audio_system.list_all_devices() + + if not devices: + print("\n No input devices found!") + print() + return + + # Group by type + monitors = [d for d in devices if d.is_monitor] + inputs = [d for d in devices if not d.is_monitor] + + if inputs: + print("\n Input Devices:") + print(" " + "-" * 68) + for dev in inputs: + flags = [] + if dev.is_default: + flags.append("DEFAULT") + flag_str = f" [{', '.join(flags)}]" if flags else "" + print(f"\n {dev.name}{flag_str}") + print(f" ID: {dev.id} | Backend: {dev.backend}") + print(f" Channels: {dev.channels} | Sample Rate: {dev.sample_rate} Hz") + + if monitors: + print("\n Monitor/Loopback Sources:") + print(" " + "-" * 68) + for dev in monitors: + flags = ["MONITOR"] + if dev.is_default: + flags.append("DEFAULT") + flag_str = f" [{', '.join(flags)}]" + print(f"\n {dev.name}{flag_str}") + print(f" ID: {dev.id} | Backend: {dev.backend}") + print(f" Channels: {dev.channels} | Sample Rate: {dev.sample_rate} Hz") + + print("\n" + "=" * 70) + print(" Configuration Examples:") + print("-" * 70) + print(" device = default # Use system default input") + print(" device = monitor # Use first monitor/loopback source") + print(" device = # Match by partial name") + print(" device = # Use exact backend ID") + print(" backend = pipewire # Force specific backend") + print("=" * 70 + "\n") + + +def main(): + # Check for --list-devices flag + if len(sys.argv) > 1 and sys.argv[1] in ('--list-devices', '-l'): + list_audio_devices() + return + + # Get config file + config_file = 'config.ini' + if len(sys.argv) > 1: + config_file = sys.argv[1] + + if not os.path.exists(config_file): + print(f"Error: Configuration file '{config_file}' not found!") + print("Usage: python ISR.py [config.ini]") + print(" python ISR.py --list-devices") + sys.exit(1) + + # Docker sends SIGTERM before SIGKILL — treat it the same as Ctrl+C + def _sigterm(sig, frame): + raise KeyboardInterrupt() + signal.signal(signal.SIGTERM, _sigterm) + + manager = RecorderManager(config_file) + manager.start() + + +if __name__ == '__main__': + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..da290a7 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +requests +numpy +soundfile diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_isr.py b/tests/test_isr.py new file mode 100644 index 0000000..95a9e91 --- /dev/null +++ b/tests/test_isr.py @@ -0,0 +1,726 @@ +""" +Tests for isr.py + +Run with: pytest tests/ +""" + +import io +import logging +import struct +import wave +from datetime import datetime +from pathlib import Path +from types import SimpleNamespace +from typing import List +from unittest.mock import MagicMock, patch, call + +import pytest + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def make_logger(): + logger = logging.getLogger("test") + logger.setLevel(logging.CRITICAL) # suppress noise during tests + return logger + + +def fixed_clock(dt: datetime): + """Return a zero-argument callable that always returns *dt*.""" + return lambda: dt + + +# --------------------------------------------------------------------------- +# Import the module under test +# --------------------------------------------------------------------------- + +import sys +import os +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +import isr # noqa: E402 (must come after sys.path manipulation) + + +# =========================================================================== +# AudioDevice / AudioSystem +# =========================================================================== + +class TestAudioSystemFindDevice: + """Tests for AudioSystem.find_device().""" + + def _make_system(self, devices: List[isr.AudioDevice]) -> isr.AudioSystem: + system = isr.AudioSystem.__new__(isr.AudioSystem) + system.logger = make_logger() + system._backends = {} + system.list_all_devices = lambda: devices + return system + + def _dev(self, id_, name, *, is_default=False, is_monitor=False, backend="portaudio"): + return isr.AudioDevice( + id=id_, name=name, channels=2, sample_rate=44100, + backend=backend, is_default=is_default, is_monitor=is_monitor, + ) + + def test_default_returns_default_device(self): + devices = [ + self._dev("0", "Microphone"), + self._dev("1", "Stereo Mix", is_default=True), + ] + system = self._make_system(devices) + result = system.find_device("default") + assert result.id == "1" + + def test_default_falls_back_to_first_when_none_marked(self): + devices = [self._dev("0", "Mic"), self._dev("1", "Line In")] + system = self._make_system(devices) + result = system.find_device("default") + assert result.id == "0" + + def test_monitor_returns_monitor_device(self): + devices = [ + self._dev("0", "Microphone"), + self._dev("1", "Monitor of Built-in Audio", is_monitor=True), + ] + system = self._make_system(devices) + result = system.find_device("monitor") + assert result.id == "1" + + def test_monitor_returns_none_when_no_monitor(self): + devices = [self._dev("0", "Mic")] + system = self._make_system(devices) + assert system.find_device("monitor") is None + + def test_exact_id_match(self): + devices = [self._dev("42", "Device 42"), self._dev("7", "Device 7")] + system = self._make_system(devices) + assert system.find_device("42").name == "Device 42" + + def test_exact_name_match_case_insensitive(self): + devices = [self._dev("0", "Stereo Mix")] + system = self._make_system(devices) + assert system.find_device("stereo mix").id == "0" + + def test_partial_name_match(self): + devices = [self._dev("0", "Realtek Stereo Mix")] + system = self._make_system(devices) + assert system.find_device("stereo").id == "0" + + def test_returns_none_for_unknown_spec(self): + devices = [self._dev("0", "Mic")] + system = self._make_system(devices) + assert system.find_device("nonexistent_device_xyz") is None + + def test_preferred_backend_filter_on_default(self): + devices = [ + self._dev("0", "Mic", is_default=True, backend="pulseaudio"), + self._dev("1", "Mic", is_default=True, backend="portaudio"), + ] + system = self._make_system(devices) + result = system.find_device("default", preferred_backend="portaudio") + assert result.backend == "portaudio" + + +# =========================================================================== +# BaseRecorder helpers +# =========================================================================== + +class TestGetNextSplitTime: + """Tests for BaseRecorder.get_next_split_time().""" + + def _recorder(self, split_minutes: int, now: datetime) -> isr.BaseRecorder: + """Create a concrete BaseRecorder subclass with an injected clock.""" + class _Rec(isr.BaseRecorder): + def record(self): pass + + cfg = {"split_minutes": split_minutes, "output_directory": "/tmp/isr_test"} + r = _Rec.__new__(_Rec) + r.name = "test" + r.config = cfg + r.logger = make_logger() + r.running = False + r.current_file = None + r.current_filename = None + r._clock = fixed_clock(now) + r.split_duration = split_minutes + r.output_dir = cfg["output_directory"] + r.filename_pattern = "%Y%m%d_%H%M%S" + r.max_retries = 3 + r.retry_delay = 1 + r.file_format = "auto" + return r + + def test_30min_splits_at_next_half_hour(self): + now = datetime(2024, 1, 1, 14, 10, 30) + r = self._recorder(30, now) + nxt = r.get_next_split_time() + assert nxt == datetime(2024, 1, 1, 14, 30, 0) + + def test_60min_splits_at_next_hour(self): + now = datetime(2024, 1, 1, 14, 45, 0) + r = self._recorder(60, now) + nxt = r.get_next_split_time() + assert nxt == datetime(2024, 1, 1, 15, 0, 0) + + def test_exactly_on_boundary_schedules_next_period(self): + # At exactly 14:00 with 60-min splits → next split is 15:00 + now = datetime(2024, 1, 1, 14, 0, 0) + r = self._recorder(60, now) + nxt = r.get_next_split_time() + assert nxt == datetime(2024, 1, 1, 15, 0, 0) + + def test_120min_split_crosses_hour_boundary(self): + # 15:30 with 120-min splits → boundaries at 00:00, 02:00, …, 16:00 + now = datetime(2024, 1, 1, 15, 30, 0) + r = self._recorder(120, now) + nxt = r.get_next_split_time() + assert nxt == datetime(2024, 1, 1, 16, 0, 0) + + def test_seconds_are_zeroed(self): + now = datetime(2024, 1, 1, 12, 5, 45) + r = self._recorder(30, now) + assert r.get_next_split_time().second == 0 + assert r.get_next_split_time().microsecond == 0 + + +class TestGenerateFilename: + """Tests for BaseRecorder.generate_filename().""" + + def _recorder(self, pattern: str, now: datetime, output_dir: str) -> isr.BaseRecorder: + class _Rec(isr.BaseRecorder): + def record(self): pass + + r = _Rec.__new__(_Rec) + r.name = "test" + r.config = {} + r.logger = make_logger() + r.running = False + r.current_file = None + r.current_filename = None + r._clock = fixed_clock(now) + r.split_duration = 60 + r.output_dir = output_dir + r.filename_pattern = pattern + r.max_retries = 3 + r.retry_delay = 1 + r.file_format = "auto" + return r + + def test_basic_pattern(self, tmp_path): + now = datetime(2024, 12, 25, 14, 30, 0) + r = self._recorder("%Y%m%d_%H%M%S", now, str(tmp_path)) + name = r.generate_filename("mp3") + assert name.endswith("20241225_143000.mp3") + + def test_subdirectory_created(self, tmp_path): + now = datetime(2024, 12, 25, 14, 30, 0) + r = self._recorder("%Y/%m/%d/rec_%H%M%S", now, str(tmp_path)) + name = r.generate_filename("ogg") + parent = Path(name).parent + assert parent.exists() + + def test_output_dir_prefix(self, tmp_path): + now = datetime(2024, 1, 1, 0, 0, 0) + r = self._recorder("%Y%m%d", now, str(tmp_path)) + name = r.generate_filename("wav") + assert name.startswith(str(tmp_path)) + + +# =========================================================================== +# StreamRecorder — format detection +# =========================================================================== + +class TestDetectFormat: + """Tests for StreamRecorder.detect_format().""" + + def _recorder(self) -> isr.StreamRecorder: + cfg = { + "url": "http://example.com/stream", + "output_directory": "/tmp", + "split_minutes": 60, + "filename_pattern": "%Y%m%d_%H%M%S", + "max_retries": 1, + "retry_delay_seconds": 0, + "format": "auto", + } + with patch.object(isr, "REQUESTS_AVAILABLE", True): + r = isr.StreamRecorder.__new__(isr.StreamRecorder) + r.name = "test" + r.config = cfg + r.logger = make_logger() + r.running = False + r.current_file = None + r.current_filename = None + r._clock = datetime.now + r.split_duration = 60 + r.output_dir = "/tmp" + r.filename_pattern = "%Y%m%d_%H%M%S" + r.max_retries = 1 + r.retry_delay = 0 + r.file_format = "auto" + r.stream_url = "http://example.com/stream" + r.username = None + r.password = None + r.stream_headers = None + r.header_capture_complete = False + r.detected_format = None + return r + + def _response(self, content_type: str) -> MagicMock: + resp = MagicMock() + resp.headers = {"Content-Type": content_type} + return resp + + @pytest.mark.parametrize("ct,expected", [ + ("audio/mpeg", "mp3"), + ("audio/mp3", "mp3"), + ("audio/ogg", "ogg"), + ("application/ogg", "ogg"), + ("audio/aac", "aac"), + ("audio/aacp", "aac"), + ("audio/flac", "flac"), + ("audio/opus", "opus"), + ]) + def test_known_content_types(self, ct, expected): + r = self._recorder() + assert r.detect_format(self._response(ct)) == expected + + def test_unknown_content_type_defaults_to_mp3(self): + r = self._recorder() + assert r.detect_format(self._response("application/octet-stream")) == "mp3" + + def test_needs_header_per_file_ogg(self): + r = self._recorder() + r.detected_format = "ogg" + assert r.needs_header_per_file() is True + + def test_needs_header_per_file_mp3(self): + r = self._recorder() + r.detected_format = "mp3" + assert r.needs_header_per_file() is False + + +# =========================================================================== +# StreamRecorder — OGG page parsing +# =========================================================================== + +def _build_ogg_page(granule_pos: int = 0, is_bos: bool = False, + payload: bytes = b"\x00" * 10) -> bytes: + """Build a minimal valid OGG page for testing.""" + header_type = 0x02 if is_bos else 0x00 + # Single segment containing the entire payload + num_segments = 1 + segment_table = bytes([len(payload)]) + # Granule position (8 bytes little-endian) + granule_bytes = struct.pack(" isr.StreamRecorder: + r = isr.StreamRecorder.__new__(isr.StreamRecorder) + r.name = "test" + r.logger = make_logger() + return r + + def test_parses_valid_page(self): + page = _build_ogg_page(granule_pos=0, is_bos=True, payload=b"X" * 10) + r = self._recorder() + result = r.parse_ogg_page(page, 0) + assert result is not None + page_bytes, is_header, next_offset = result + assert page_bytes == page + assert next_offset == len(page) + + def test_returns_none_for_non_ogg_data(self): + r = self._recorder() + assert r.parse_ogg_page(b"ID3\x00" + b"\x00" * 50, 0) is None + + def test_returns_none_when_data_too_short(self): + r = self._recorder() + assert r.parse_ogg_page(b"OggS\x00\x02", 0) is None + + def test_bos_flag_detected(self): + page = _build_ogg_page(granule_pos=0, is_bos=True, payload=b"\x01" * 5) + r = self._recorder() + _, is_header, _ = r.parse_ogg_page(page, 0) + assert is_header is True + + def test_non_zero_granule_is_not_header(self): + page = _build_ogg_page(granule_pos=1000, is_bos=False, payload=b"\x00" * 5) + r = self._recorder() + _, is_header, _ = r.parse_ogg_page(page, 0) + assert is_header is False + + +class TestExtractOggHeaders: + def _recorder(self) -> isr.StreamRecorder: + r = isr.StreamRecorder.__new__(isr.StreamRecorder) + r.name = "test" + r.logger = make_logger() + return r + + def test_extracts_bos_pages(self): + header1 = _build_ogg_page(granule_pos=0, is_bos=True, payload=b"H1" * 5) + header2 = _build_ogg_page(granule_pos=0, is_bos=False, payload=b"H2" * 5) + audio = _build_ogg_page(granule_pos=500, is_bos=False, payload=b"audio" * 3) + data = header1 + header2 + audio + r = self._recorder() + headers, remaining = r.extract_ogg_headers(data) + # Both header pages (granule_pos == 0) should be captured + assert header1 in headers + assert header2 in headers + + def test_non_ogg_data_returns_empty_headers(self): + r = self._recorder() + headers, remaining = r.extract_ogg_headers(b"ID3\x00garbage") + assert headers == b"" + + def test_remaining_data_preserved(self): + header = _build_ogg_page(granule_pos=0, is_bos=True, payload=b"hdr") + audio = _build_ogg_page(granule_pos=100, payload=b"aud") + r = self._recorder() + headers, remaining = r.extract_ogg_headers(header + audio) + assert len(headers) > 0 + # The audio page should be in remaining + assert audio in remaining or len(remaining) >= len(audio) - 5 + + +# =========================================================================== +# _AudioFileWriter +# =========================================================================== + +class TestAudioFileWriter: + def test_wav_writes_valid_file(self, tmp_path): + path = str(tmp_path / "out.wav") + writer = isr._AudioFileWriter(path, channels=2, sample_rate=44100, fmt="wav") + # 1 frame of stereo int16 silence + writer.write(b"\x00" * 4) + writer.close() + + with wave.open(path, "rb") as wf: + assert wf.getnchannels() == 2 + assert wf.getsampwidth() == 2 + assert wf.getframerate() == 44100 + + def test_flac_raises_when_soundfile_unavailable(self, tmp_path): + path = str(tmp_path / "out.flac") + with patch.object(isr, "SOUNDFILE_AVAILABLE", False): + with pytest.raises(ImportError, match="soundfile"): + isr._AudioFileWriter(path, channels=2, sample_rate=44100, fmt="flac") + + @pytest.mark.skipif(not isr.SOUNDFILE_AVAILABLE, reason="soundfile not installed") + def test_flac_writes_valid_file(self, tmp_path): + import soundfile as sf + path = str(tmp_path / "out.flac") + writer = isr._AudioFileWriter(path, channels=2, sample_rate=44100, fmt="flac") + # 100ms of stereo silence at 44100 Hz → 44100 * 0.1 * 2 channels * 2 bytes = 17640 bytes + writer.write(b"\x00" * 17640) + writer.close() + info = sf.info(path) + assert info.channels == 2 + assert info.samplerate == 44100 + + +# =========================================================================== +# RecorderManager — config loading +# =========================================================================== + +class TestRecorderManagerLoadConfig: + """Verify that config.ini settings are correctly parsed and applied.""" + + def _minimal_config(self, tmp_path) -> str: + log_file = str(tmp_path / "test.log") + return f""" +[general] +output_directory = {str(tmp_path / "recordings")} +split_minutes = 30 +filename_pattern = test_%Y%m%d +max_retries = 3 +retry_delay_seconds = 2 +log_level = WARNING +log_file = {log_file} + +[my_stream] +type = stream +url = http://stream.example.com:8000/live +format = ogg +""" + + def test_stream_recorder_created(self, tmp_path): + cfg_path = tmp_path / "config.ini" + cfg_path.write_text(self._minimal_config(tmp_path)) + + # Patch AudioSystem so it doesn't probe real hardware + with patch.object(isr, "AudioSystem") as mock_as: + mock_as.return_value.available_backends = [] + mgr = isr.RecorderManager(str(cfg_path)) + + assert len(mgr.recorders) == 1 + rec = mgr.recorders[0] + assert isinstance(rec, isr.StreamRecorder) + assert rec.name == "my_stream" + assert rec.stream_url == "http://stream.example.com:8000/live" + + def test_general_settings_inherited_by_source(self, tmp_path): + cfg_path = tmp_path / "config.ini" + cfg_path.write_text(self._minimal_config(tmp_path)) + + with patch.object(isr, "AudioSystem") as mock_as: + mock_as.return_value.available_backends = [] + mgr = isr.RecorderManager(str(cfg_path)) + + rec = mgr.recorders[0] + assert rec.split_duration == 30 + assert rec.max_retries == 3 + + def test_source_overrides_general(self, tmp_path): + log_file = str(tmp_path / "test.log") + config_text = f""" +[general] +output_directory = {str(tmp_path / "recordings")} +split_minutes = 60 +filename_pattern = %Y%m%d_%H%M%S +max_retries = 10 +retry_delay_seconds = 5 +log_level = WARNING +log_file = {log_file} + +[overriding_stream] +type = stream +url = http://example.com/stream +split_minutes = 15 +filename_pattern = custom_%Y%m%d +""" + cfg_path = tmp_path / "config.ini" + cfg_path.write_text(config_text) + + with patch.object(isr, "AudioSystem") as mock_as: + mock_as.return_value.available_backends = [] + mgr = isr.RecorderManager(str(cfg_path)) + + rec = mgr.recorders[0] + assert rec.split_duration == 15 + assert rec.filename_pattern == "custom_%Y%m%d" + + def test_unknown_type_is_skipped(self, tmp_path): + log_file = str(tmp_path / "test.log") + config_text = f""" +[general] +log_level = WARNING +log_file = {log_file} + +[bad_source] +type = unknown_type +url = http://x.com/s + +[good_source] +type = stream +url = http://x.com/s2 +""" + cfg_path = tmp_path / "config.ini" + cfg_path.write_text(config_text) + + with patch.object(isr, "AudioSystem") as mock_as: + mock_as.return_value.available_backends = [] + mgr = isr.RecorderManager(str(cfg_path)) + + assert len(mgr.recorders) == 1 + assert mgr.recorders[0].name == "good_source" + + def test_missing_config_file_exits(self, tmp_path): + with pytest.raises(SystemExit): + # main() calls sys.exit(1) when the file is missing + with patch("sys.argv", ["isr.py", str(tmp_path / "nonexistent.ini")]): + isr.main() + + +# =========================================================================== +# StreamRecorder — record() integration (mocked HTTP) +# =========================================================================== + +class TestStreamRecorderRecord: + """Integration-level tests with a mocked requests session.""" + + def _recorder(self, fmt="mp3", clock=None) -> isr.StreamRecorder: + cfg = { + "url": "http://example.com/stream", + "output_directory": "", # overridden per-test with tmp_path + "split_minutes": 60, + "filename_pattern": "%Y%m%d_%H%M%S", + "max_retries": 2, + "retry_delay_seconds": 0, + "format": fmt, + } + with patch.object(isr, "REQUESTS_AVAILABLE", True): + r = isr.StreamRecorder( + "test_stream", cfg, make_logger(), clock=clock or datetime.now + ) + return r + + def test_mp3_chunks_written_to_file(self, tmp_path): + chunks = [b"A" * 512, b"B" * 512, b"C" * 512] + rec = self._recorder(fmt="mp3") + rec.output_dir = str(tmp_path) + + mock_resp = MagicMock() + mock_resp.headers = {"Content-Type": "audio/mpeg"} + mock_resp.iter_content.return_value = iter(chunks) + + def _stop_after_connect(*args, **kwargs): + rec.running = True + return mock_resp + + with patch.object(rec, "connect_stream", side_effect=[mock_resp, None]): + # connect_stream returns the mocked response on the first call; + # we stop the loop via a side-effectful iter_content + original_iter = mock_resp.iter_content.return_value + + def _chunks_then_stop(chunk_size=8192): + for c in chunks: + yield c + rec.running = False # stop the outer while loop + + mock_resp.iter_content.side_effect = _chunks_then_stop + mock_resp.headers = {"Content-Type": "audio/mpeg"} + rec.detected_format = "mp3" + rec.running = True + rec.header_capture_complete = True + rec.stream_headers = None + + # Open a file manually so we can verify writes + filename = rec.generate_filename("mp3") + rec.current_file = open(filename, "wb") + rec.current_filename = filename + + # Simulate one inner loop iteration + for chunk in _chunks_then_stop(): + if chunk: + rec.current_file.write(chunk) + rec.close_current_file() + + written = Path(filename).read_bytes() + assert written == b"A" * 512 + b"B" * 512 + b"C" * 512 + + def test_connection_failure_retries(self, tmp_path): + rec = self._recorder() + rec.output_dir = str(tmp_path) + rec.max_retries = 2 + + with patch.object(rec, "connect_stream", return_value=None) as mock_connect: + # connect_stream always fails → recorder gives up after max_retries + rec.record() + + assert mock_connect.call_count == rec.max_retries + + +# =========================================================================== +# SoundcardRecorder — unit tests +# =========================================================================== + +class TestSoundcardRecorder: + def _make_device(self, backend="portaudio") -> isr.AudioDevice: + return isr.AudioDevice( + id="0", name="Test Device", channels=2, + sample_rate=44100, backend=backend, + ) + + def _make_audio_system(self, device: isr.AudioDevice) -> isr.AudioSystem: + system = MagicMock(spec=isr.AudioSystem) + system.available_backends = [device.backend] + system.find_device.return_value = device + return system + + def test_wav_file_written_correctly(self, tmp_path): + device = self._make_device() + audio_system = self._make_audio_system(device) + audio_system.get_backend.return_value = MagicMock() + + cfg = { + "device": "default", + "sample_rate": 44100, + "channels": 2, + "format": "wav", + "output_directory": str(tmp_path), + "split_minutes": 60, + "filename_pattern": "%Y%m%d_%H%M%S", + "max_retries": 1, + "retry_delay_seconds": 0, + } + rec = isr.SoundcardRecorder("sc_test", cfg, make_logger(), audio_system=audio_system) + rec._open_output_file() + + # Simulate audio callback delivering one chunk + silence = b"\x00" * (44100 * 2 * 2 // 10) # 100ms of stereo int16 silence + rec._audio_callback(silence) + rec.close_current_file() + + out_files = list(tmp_path.glob("*.wav")) + assert len(out_files) == 1 + with wave.open(str(out_files[0]), "rb") as wf: + assert wf.getnchannels() == 2 + assert wf.getframerate() == 44100 + + def test_flac_raises_when_soundfile_unavailable(self, tmp_path): + device = self._make_device() + audio_system = self._make_audio_system(device) + + cfg = { + "device": "default", + "sample_rate": 44100, + "channels": 2, + "format": "flac", + "output_directory": str(tmp_path), + "split_minutes": 60, + "filename_pattern": "%Y%m%d_%H%M%S", + "max_retries": 1, + "retry_delay_seconds": 0, + } + with patch.object(isr, "SOUNDFILE_AVAILABLE", False): + rec = isr.SoundcardRecorder("sc_test", cfg, make_logger(), audio_system=audio_system) + with pytest.raises(ImportError, match="soundfile"): + rec._open_output_file() + + @pytest.mark.skipif(not isr.SOUNDFILE_AVAILABLE, reason="soundfile not installed") + def test_flac_file_written_correctly(self, tmp_path): + import soundfile as sf + + device = self._make_device() + audio_system = self._make_audio_system(device) + + cfg = { + "device": "default", + "sample_rate": 44100, + "channels": 2, + "format": "flac", + "output_directory": str(tmp_path), + "split_minutes": 60, + "filename_pattern": "%Y%m%d_%H%M%S", + "max_retries": 1, + "retry_delay_seconds": 0, + } + rec = isr.SoundcardRecorder("sc_test", cfg, make_logger(), audio_system=audio_system) + rec._open_output_file() + + silence = b"\x00" * (44100 * 2 * 2 // 10) + rec._audio_callback(silence) + rec.close_current_file() + + out_files = list(tmp_path.glob("*.flac")) + assert len(out_files) == 1 + info = sf.info(str(out_files[0])) + assert info.channels == 2 + assert info.samplerate == 44100 diff --git a/web.py b/web.py new file mode 100644 index 0000000..373a51b --- /dev/null +++ b/web.py @@ -0,0 +1,518 @@ +#!/usr/bin/env python3 +""" +ISR Web — Browse and download recorded audio files. + +Shows a chronological table of all recordings, allows download, +and analyses WAV files for loud sections using RMS. + +Usage: + python web.py # serves recordings/ on port 8080 + python web.py --dir /path/to/audio # custom recordings directory + python web.py --port 8888 # custom port + python web.py --threshold 0.03 # loudness threshold (0-1, default 0.05) +""" + +import argparse +import json +import math +import os +import struct +import wave +from datetime import datetime +from http.server import BaseHTTPRequestHandler, HTTPServer +from pathlib import Path +from urllib.parse import parse_qs, unquote, urlparse + +try: + import numpy as np + NUMPY_AVAILABLE = True +except ImportError: + NUMPY_AVAILABLE = False + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +AUDIO_EXTENSIONS = {'.wav', '.mp3', '.ogg', '.flac', '.aac', '.opus'} +WINDOW_SAMPLES = 4800 # 100 ms at 48 kHz +LOUD_THRESHOLD = 0.05 # RMS 0–1 scale; sections above this are "interesting" +MIN_GAP_SECONDS = 2.0 # merge loud sections separated by less than this + + +# --------------------------------------------------------------------------- +# Audio helpers +# --------------------------------------------------------------------------- + +def _get_wav_info(path: Path): + """Return (duration_seconds, sample_rate, channels) or (None, None, None).""" + try: + with wave.open(str(path), 'rb') as wf: + return wf.getnframes() / wf.getframerate(), wf.getframerate(), wf.getnchannels() + except Exception: + return None, None, None + + +def _compute_rms_windows(wf, channels: int, sampwidth: int, framerate: int, + window_samples: int): + """Yield (time_seconds, rms_0_to_1) for every window in the open wave file.""" + frame_pos = 0 + while True: + raw = wf.readframes(window_samples) + if not raw: + break + n_samp = len(raw) // (sampwidth * channels) + if n_samp == 0: + break + + if NUMPY_AVAILABLE: + arr = np.frombuffer(raw[:n_samp * sampwidth * channels], dtype=' 1: + arr = arr.reshape(-1, channels).mean(axis=1) + rms = float(np.sqrt(np.mean(arr.astype(np.float64) ** 2))) / 32768.0 + else: + fmt = f'<{n_samp * channels}h' + samples = struct.unpack(fmt, raw[:n_samp * sampwidth * channels]) + mono = samples[::channels] if channels > 1 else samples + rms = math.sqrt(sum(s * s for s in mono) / len(mono)) / 32768.0 + + yield frame_pos / framerate, round(rms, 5) + frame_pos += window_samples + + +def analyze_wav(path: Path, window_samples: int = WINDOW_SAMPLES, + threshold: float = LOUD_THRESHOLD): + """ + Analyse a WAV file. + + Returns a dict with: + rms — full list of RMS values (one per window) + rms_display — downsampled to ≤800 points for the sparkline + sections — list of {start, end} dicts for loud passages + duration — total seconds + window — window duration in seconds + """ + try: + with wave.open(str(path), 'rb') as wf: + channels = wf.getnchannels() + sampwidth = wf.getsampwidth() + framerate = wf.getframerate() + n_frames = wf.getnframes() + + rms_values = [rms for _, rms in + _compute_rms_windows(wf, channels, sampwidth, framerate, window_samples)] + except Exception as e: + return {'error': str(e)} + + window_dur = window_samples / framerate + duration = n_frames / framerate + + # Downsample for the sparkline (max 800 bars) + if len(rms_values) > 800: + step = len(rms_values) / 800 + rms_display = [rms_values[int(i * step)] for i in range(800)] + else: + rms_display = rms_values + + # Find loud sections + sections: list = [] + start_t = None + last_loud_t = None + + for i, rms in enumerate(rms_values): + t = i * window_dur + if rms >= threshold: + if start_t is None: + start_t = t + last_loud_t = t + else: + if start_t is not None: + gap = t - last_loud_t + if gap > MIN_GAP_SECONDS: + sections.append({'start': round(start_t, 1), + 'end': round(last_loud_t + window_dur, 1)}) + start_t = None + last_loud_t = None + + if start_t is not None: + sections.append({'start': round(start_t, 1), 'end': round(duration, 1)}) + + return { + 'rms': rms_values, + 'rms_display': rms_display, + 'sections': sections, + 'duration': round(duration, 2), + 'window': round(window_dur, 4), + } + + +# --------------------------------------------------------------------------- +# File listing +# --------------------------------------------------------------------------- + +def list_files(recordings_dir: str): + """Return list of audio file metadata dicts, sorted newest first.""" + base = Path(recordings_dir) + if not base.exists(): + return [] + + files = [] + for path in base.rglob('*'): + if path.suffix.lower() not in AUDIO_EXTENSIONS: + continue + stat = path.stat() + rel = path.relative_to(base) + + if path.suffix.lower() == '.wav': + duration, _, _ = _get_wav_info(path) + else: + duration = None + + files.append({ + 'name': str(rel).replace('\\', '/'), + 'size': stat.st_size, + 'mtime': stat.st_mtime, + 'date': datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'), + 'duration': duration, + 'ext': path.suffix.lower().lstrip('.'), + }) + + files.sort(key=lambda f: f['mtime'], reverse=True) + return files + + +# --------------------------------------------------------------------------- +# HTTP handler +# --------------------------------------------------------------------------- + +class _Handler(BaseHTTPRequestHandler): + recordings_dir: str = 'recordings' + threshold: float = LOUD_THRESHOLD + + # ---- routing ----------------------------------------------------------- + + def do_GET(self): + parsed = urlparse(self.path) + qs = parse_qs(parsed.query) + p = parsed.path + + if p == '/': + self._html() + elif p == '/api/files': + self._api_files() + elif p == '/api/analyze': + self._api_analyze(qs) + elif p.startswith('/download/'): + self._download(unquote(p[len('/download/'):])) + else: + self._send(404, b'Not found', 'text/plain') + + # ---- endpoints --------------------------------------------------------- + + def _html(self): + self._send(200, _HTML.encode('utf-8'), 'text/html; charset=utf-8') + + def _api_files(self): + data = json.dumps(list_files(self.recordings_dir)).encode('utf-8') + self._send(200, data, 'application/json') + + def _api_analyze(self, qs): + filename = qs.get('file', [None])[0] + if not filename: + self._json_err(400, 'missing file parameter') + return + + path = self._safe_path(filename) + if path is None: + return + + if path.suffix.lower() != '.wav': + self._json_err(400, 'loudness analysis is only available for WAV files') + return + + result = analyze_wav(path, threshold=self.threshold) + data = json.dumps(result).encode('utf-8') + self._send(200, data, 'application/json') + + def _download(self, filename: str): + path = self._safe_path(filename) + if path is None: + return + + size = path.stat().st_size + self.send_response(200) + self.send_header('Content-Type', 'application/octet-stream') + self.send_header('Content-Disposition', f'attachment; filename="{path.name}"') + self.send_header('Content-Length', str(size)) + self.end_headers() + + with open(path, 'rb') as fh: + while True: + chunk = fh.read(65536) + if not chunk: + break + self.wfile.write(chunk) + + # ---- helpers ----------------------------------------------------------- + + def _safe_path(self, filename: str): + """Resolve filename within recordings_dir; return None and send error if invalid.""" + base = Path(self.recordings_dir).resolve() + try: + path = (base / filename).resolve() + path.relative_to(base) # raises ValueError if outside base + except (ValueError, Exception): + self._send(403, b'Forbidden', 'text/plain') + return None + + if not path.exists(): + self._send(404, b'Not found', 'text/plain') + return None + + return path + + def _send(self, code: int, body: bytes, content_type: str): + self.send_response(code) + self.send_header('Content-Type', content_type) + self.send_header('Content-Length', str(len(body))) + self.end_headers() + self.wfile.write(body) + + def _json_err(self, code: int, msg: str): + self._send(code, json.dumps({'error': msg}).encode('utf-8'), 'application/json') + + def log_message(self, fmt, *args): + pass # suppress default access log noise + + +# --------------------------------------------------------------------------- +# Embedded HTML/CSS/JS +# --------------------------------------------------------------------------- + +_HTML = r""" + + + + +ISR Archive + + + +
+

ISR Archive

+ Loading… +
+
+ + + + + + + + + + + + +
FileDateDurationSizeWaveform / Loud sections
+ +
+ + +""" + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + +def main(): + parser = argparse.ArgumentParser(description='ISR Web — audio archive browser') + parser.add_argument('--dir', default='recordings', + help='Recordings directory (default: recordings)') + parser.add_argument('--port', type=int, default=8080, + help='HTTP port (default: 8080)') + parser.add_argument('--host', default='0.0.0.0', + help='Bind address (default: 0.0.0.0)') + parser.add_argument('--threshold', type=float, default=LOUD_THRESHOLD, + help=f'RMS loudness threshold 0–1 (default: {LOUD_THRESHOLD})') + args = parser.parse_args() + + rec_dir = Path(args.dir) + if not rec_dir.exists(): + print(f"Warning: recordings directory '{args.dir}' does not exist yet.") + + class Handler(_Handler): + recordings_dir = str(rec_dir.resolve()) + threshold = args.threshold + + server = HTTPServer((args.host, args.port), Handler) + + print(f"ISR Web running → http://{args.host}:{args.port}/") + print(f"Recordings dir → {rec_dir.resolve()}") + print(f"Loud threshold → {args.threshold}") + if not NUMPY_AVAILABLE: + print("Note: numpy not installed — RMS analysis uses pure Python (slower for large files)") + print("Stop with Ctrl+C\n") + + try: + server.serve_forever() + except KeyboardInterrupt: + print("Stopped.") + + +if __name__ == '__main__': + main()