da67523170
docker-compose.yml: mount ./recordings at /app/recordings (matches output_directory = recordings in config.ini); previously the recorder wrote to /app/recordings while the web container read from /recordings, causing all files to appear missing — explaining the 9-byte Not-found download from /stream/ and the 0-byte recordings in the UI. Add stop_grace_period: 30s so Docker waits long enough for files to close. isr.py: replace per-thread join(timeout=5) with a shared 25 s deadline; with N recorders the old code could exceed Docker's SIGKILL window and leave WAV/FLAC files unclosed (corrupt headers). web.py: add Content-Disposition: inline to /stream/ responses so browsers never treat the audio response as a file download. CLAUDE.md: document web.py endpoints, status.json lifecycle, corrected Docker volume layout, and web.py CLI flags.
1070 lines
38 KiB
Python
1070 lines
38 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
ISR - Audio Recorder
|
|
Records from multiple sources: Icecast streams and soundcards.
|
|
Supports time-based file splitting and concurrent recording.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
import wave
|
|
import struct
|
|
import signal
|
|
import logging
|
|
import threading
|
|
import configparser
|
|
import subprocess
|
|
import shutil
|
|
from abc import ABC, abstractmethod
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, Any, List, Callable
|
|
|
|
# Optional imports - check availability
|
|
try:
|
|
import requests
|
|
REQUESTS_AVAILABLE = True
|
|
except ImportError:
|
|
REQUESTS_AVAILABLE = False
|
|
|
|
try:
|
|
import numpy as np
|
|
NUMPY_AVAILABLE = True
|
|
except ImportError:
|
|
NUMPY_AVAILABLE = False
|
|
|
|
try:
|
|
import soundfile as sf
|
|
SOUNDFILE_AVAILABLE = True
|
|
except ImportError:
|
|
SOUNDFILE_AVAILABLE = False
|
|
|
|
|
|
# =============================================================================
|
|
# Audio Device & Backend System
|
|
# =============================================================================
|
|
|
|
@dataclass
|
|
class AudioDevice:
|
|
"""Represents an audio input device."""
|
|
id: str # Backend-specific identifier
|
|
name: str # Human-readable name
|
|
channels: int # Max input channels
|
|
sample_rate: int # Default sample rate
|
|
backend: str # Backend name (pulseaudio, pipewire, portaudio)
|
|
is_default: bool = False # Is system default
|
|
is_monitor: bool = False # Is a monitor/loopback source
|
|
description: str = "" # Extended description
|
|
extra: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
def __str__(self):
|
|
flags = []
|
|
if self.is_default:
|
|
flags.append("DEFAULT")
|
|
if self.is_monitor:
|
|
flags.append("MONITOR")
|
|
flag_str = f" [{', '.join(flags)}]" if flags else ""
|
|
return f"{self.name}{flag_str} ({self.backend})"
|
|
|
|
|
|
class AudioBackend(ABC):
|
|
"""Abstract base for audio capture backends."""
|
|
|
|
name: str = "base"
|
|
priority: int = 0 # Higher = preferred
|
|
|
|
@classmethod
|
|
@abstractmethod
|
|
def is_available(cls) -> bool:
|
|
"""Check if this backend can be used on the current system."""
|
|
pass
|
|
|
|
@classmethod
|
|
@abstractmethod
|
|
def list_devices(cls) -> List[AudioDevice]:
|
|
"""List all available input devices."""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def open_stream(self, device: AudioDevice, sample_rate: int, channels: int,
|
|
callback: Callable[[bytes], None]) -> Any:
|
|
"""Open an audio capture stream. Returns a context manager."""
|
|
pass
|
|
|
|
|
|
class ALSABackend(AudioBackend):
|
|
"""ALSA backend using arecord (raw PCM output, no sound server required)."""
|
|
|
|
name = "alsa"
|
|
priority = 5 # Lowest priority — direct hardware access, use when no sound server runs
|
|
|
|
@classmethod
|
|
def is_available(cls) -> bool:
|
|
return shutil.which('arecord') is not None
|
|
|
|
@classmethod
|
|
def list_devices(cls) -> List[AudioDevice]:
|
|
devices = []
|
|
try:
|
|
result = subprocess.run(
|
|
['arecord', '-l'],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if result.returncode != 0:
|
|
return devices
|
|
|
|
for line in result.stdout.split('\n'):
|
|
if not line.startswith('card '):
|
|
continue
|
|
try:
|
|
card_part, rest = line.split(':', 1)
|
|
card_num = card_part.replace('card', '').strip()
|
|
|
|
if ', device ' in rest:
|
|
dev_part = rest.split(', device ')[1]
|
|
dev_num = dev_part.split(':')[0].strip()
|
|
else:
|
|
dev_num = '0'
|
|
|
|
long_name = rest.split('[')[1].split(']')[0] if '[' in rest else rest.strip()
|
|
hw_id = f"hw:{card_num},{dev_num}"
|
|
|
|
devices.append(AudioDevice(
|
|
id=hw_id,
|
|
name=long_name,
|
|
channels=2,
|
|
sample_rate=44100,
|
|
backend=cls.name,
|
|
is_default=(card_num == '0' and dev_num == '0'),
|
|
is_monitor=False,
|
|
))
|
|
except (IndexError, ValueError):
|
|
continue
|
|
except Exception:
|
|
pass
|
|
return devices
|
|
|
|
def __init__(self, logger: logging.Logger):
|
|
self.logger = logger
|
|
|
|
def open_stream(self, device: AudioDevice, sample_rate: int, channels: int,
|
|
callback: Callable[[bytes], None]):
|
|
return ALSAStream(device, sample_rate, channels, callback, self.logger)
|
|
|
|
|
|
class ALSAStream:
|
|
"""Context manager for ALSA recording using arecord (raw PCM output)."""
|
|
|
|
def __init__(self, device: AudioDevice, sample_rate: int, channels: int,
|
|
callback: Callable[[bytes], None], logger: logging.Logger):
|
|
self.device = device
|
|
self.sample_rate = sample_rate
|
|
self.channels = channels
|
|
self.callback = callback
|
|
self.logger = logger
|
|
self._running = False
|
|
self._thread = None
|
|
self._process = None
|
|
|
|
def __enter__(self):
|
|
self._running = True
|
|
cmd = [
|
|
'arecord',
|
|
'-D', self.device.id,
|
|
'-f', 'S16_LE',
|
|
'-r', str(self.sample_rate),
|
|
'-c', str(self.channels),
|
|
'--file-type', 'raw',
|
|
'-',
|
|
]
|
|
try:
|
|
self._process = subprocess.Popen(
|
|
cmd,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.DEVNULL,
|
|
)
|
|
except FileNotFoundError:
|
|
raise RuntimeError("arecord not found - install alsa-utils: sudo apt install alsa-utils")
|
|
|
|
self._thread = threading.Thread(target=self._read_loop, daemon=True)
|
|
self._thread.start()
|
|
return self
|
|
|
|
def __exit__(self, *args):
|
|
self._running = False
|
|
if self._process:
|
|
self._process.terminate()
|
|
try:
|
|
self._process.wait(timeout=2)
|
|
except subprocess.TimeoutExpired:
|
|
self._process.kill()
|
|
if self._thread:
|
|
self._thread.join(timeout=1)
|
|
|
|
def _read_loop(self):
|
|
chunk_size = self.sample_rate * self.channels * 2 // 10 # 100 ms chunks
|
|
while self._running and self._process.poll() is None:
|
|
try:
|
|
data = self._process.stdout.read(chunk_size)
|
|
if data:
|
|
self.callback(data)
|
|
except Exception as e:
|
|
self.logger.error(f"ALSA read error: {e}")
|
|
break
|
|
|
|
|
|
class AudioSystem:
|
|
"""Manages audio backends and device discovery."""
|
|
|
|
# All available backend classes
|
|
_backend_classes: List[type] = [ALSABackend]
|
|
|
|
def __init__(self, logger: logging.Logger):
|
|
self.logger = logger
|
|
self._backends: Dict[str, AudioBackend] = {}
|
|
self._discover_backends()
|
|
|
|
def _discover_backends(self):
|
|
"""Find and initialize available backends."""
|
|
for cls in self._backend_classes:
|
|
if cls.is_available():
|
|
self._backends[cls.name] = cls(self.logger)
|
|
self.logger.debug(f"Audio backend available: {cls.name}")
|
|
|
|
@property
|
|
def available_backends(self) -> List[str]:
|
|
return list(self._backends.keys())
|
|
|
|
def get_backend(self, name: str) -> Optional[AudioBackend]:
|
|
return self._backends.get(name)
|
|
|
|
def get_preferred_backend(self) -> Optional[AudioBackend]:
|
|
"""Get the highest priority available backend."""
|
|
if not self._backends:
|
|
return None
|
|
return max(self._backends.values(), key=lambda b: b.__class__.priority)
|
|
|
|
def list_all_devices(self) -> List[AudioDevice]:
|
|
"""List devices from all available backends."""
|
|
all_devices = []
|
|
seen_names = set()
|
|
|
|
# Get devices from backends in priority order
|
|
for cls in sorted(self._backend_classes, key=lambda c: -c.priority):
|
|
if cls.name in self._backends:
|
|
for dev in cls.list_devices():
|
|
# Deduplicate by name (same device may appear in multiple backends)
|
|
key = dev.name.lower()
|
|
if key not in seen_names:
|
|
all_devices.append(dev)
|
|
seen_names.add(key)
|
|
|
|
return all_devices
|
|
|
|
def find_device(self, spec: str, preferred_backend: str = None) -> Optional[AudioDevice]:
|
|
"""Find a device by ID, name, or pattern."""
|
|
devices = self.list_all_devices()
|
|
|
|
if not devices:
|
|
return None
|
|
|
|
# "default" - get the default device from preferred backend
|
|
if spec.lower() == 'default':
|
|
for dev in devices:
|
|
if dev.is_default:
|
|
if preferred_backend is None or dev.backend == preferred_backend:
|
|
return dev
|
|
return devices[0] if devices else None
|
|
|
|
# "monitor" - get first monitor source
|
|
if spec.lower() == 'monitor':
|
|
for dev in devices:
|
|
if dev.is_monitor:
|
|
if preferred_backend is None or dev.backend == preferred_backend:
|
|
return dev
|
|
return None
|
|
|
|
spec_lower = spec.lower()
|
|
|
|
# Try exact ID match first
|
|
for dev in devices:
|
|
if dev.id == spec:
|
|
return dev
|
|
|
|
# Try exact name match
|
|
for dev in devices:
|
|
if dev.name.lower() == spec_lower:
|
|
return dev
|
|
|
|
# Try partial name match
|
|
for dev in devices:
|
|
if spec_lower in dev.name.lower():
|
|
return dev
|
|
|
|
return None
|
|
|
|
|
|
class BaseRecorder(ABC):
|
|
"""Abstract base class for all recorder types."""
|
|
|
|
def __init__(self, name: str, config: Dict[str, Any], logger: logging.Logger,
|
|
clock: Callable[[], datetime] = None):
|
|
self.name = name
|
|
self.config = config
|
|
self.logger = logger
|
|
self.running = False
|
|
self.current_file = None
|
|
self.current_filename = None
|
|
self._clock = clock or datetime.now
|
|
|
|
# Common settings
|
|
self.split_duration = config.get('split_minutes', 60)
|
|
self.output_dir = config.get('output_directory', 'recordings')
|
|
self.filename_pattern = config.get('filename_pattern', '%Y%m%d_%H%M%S')
|
|
self.max_retries = config.get('max_retries', 10)
|
|
self.retry_delay = config.get('retry_delay_seconds', 5)
|
|
self.file_format = config.get('format', 'auto')
|
|
|
|
def get_next_split_time(self) -> datetime:
|
|
"""Calculate the next split time aligned to split_duration boundaries."""
|
|
now = self._clock()
|
|
total_minutes = now.hour * 60 + now.minute
|
|
minutes_to_next = self.split_duration - (total_minutes % self.split_duration)
|
|
next_split = now + timedelta(minutes=minutes_to_next)
|
|
return next_split.replace(second=0, microsecond=0)
|
|
|
|
def generate_filename(self, ext: str) -> str:
|
|
"""Generate filename from pattern with strftime substitution."""
|
|
now = self._clock()
|
|
filename = now.strftime(self.filename_pattern) + f".{ext}"
|
|
full_path = os.path.join(self.output_dir, filename)
|
|
Path(full_path).parent.mkdir(parents=True, exist_ok=True)
|
|
return full_path
|
|
|
|
def close_current_file(self):
|
|
"""Close the current recording file if open."""
|
|
if self.current_file:
|
|
try:
|
|
self.current_file.close()
|
|
self.logger.info(f"[{self.name}] Closed file: {self.current_filename}")
|
|
except Exception as e:
|
|
self.logger.error(f"[{self.name}] Error closing file: {e}")
|
|
self.current_file = None
|
|
self.current_filename = None
|
|
|
|
def stop(self):
|
|
"""Signal the recorder to stop."""
|
|
self.running = False
|
|
|
|
@abstractmethod
|
|
def record(self):
|
|
"""Main recording loop - must be implemented by subclasses."""
|
|
pass
|
|
|
|
|
|
class StreamRecorder(BaseRecorder):
|
|
"""Records from Icecast/HTTP audio streams."""
|
|
|
|
def __init__(self, name: str, config: Dict[str, Any], logger: logging.Logger,
|
|
clock: Callable[[], datetime] = None):
|
|
super().__init__(name, config, logger, clock)
|
|
|
|
if not REQUESTS_AVAILABLE:
|
|
raise ImportError("The 'requests' library is required for stream recording. Install with: pip install requests")
|
|
|
|
self.stream_url = config.get('url')
|
|
if not self.stream_url:
|
|
raise ValueError(f"[{name}] Stream URL is required for stream type")
|
|
|
|
self.username = config.get('username') or None
|
|
self.password = config.get('password') or None
|
|
|
|
# Stream header storage for formats that require headers in each file
|
|
self.stream_headers = None
|
|
self.header_capture_complete = False
|
|
self.detected_format = None
|
|
|
|
def detect_format(self, response) -> str:
|
|
"""Detect stream format from HTTP headers."""
|
|
content_type = response.headers.get('Content-Type', '').lower()
|
|
|
|
format_map = {
|
|
'audio/mpeg': 'mp3',
|
|
'audio/mp3': 'mp3',
|
|
'audio/ogg': 'ogg',
|
|
'application/ogg': 'ogg',
|
|
'audio/aac': 'aac',
|
|
'audio/aacp': 'aac',
|
|
'audio/x-aac': 'aac',
|
|
'audio/flac': 'flac',
|
|
'audio/opus': 'opus'
|
|
}
|
|
|
|
for mime, fmt in format_map.items():
|
|
if mime in content_type:
|
|
return fmt
|
|
|
|
self.logger.warning(f"[{self.name}] Unknown content type: {content_type}, defaulting to mp3")
|
|
return 'mp3'
|
|
|
|
def parse_ogg_page(self, data: bytes, offset: int) -> Optional[tuple]:
|
|
"""Parse an OGG page starting at the given offset."""
|
|
if len(data) - offset < 27:
|
|
return None
|
|
|
|
if data[offset:offset+4] != b'OggS':
|
|
return None
|
|
|
|
header_type = data[offset + 5]
|
|
is_bos = (header_type & 0x02) != 0
|
|
num_segments = data[offset + 26]
|
|
|
|
if len(data) - offset < 27 + num_segments:
|
|
return None
|
|
|
|
segment_table = data[offset + 27:offset + 27 + num_segments]
|
|
page_data_size = sum(segment_table)
|
|
total_page_size = 27 + num_segments + page_data_size
|
|
|
|
if len(data) - offset < total_page_size:
|
|
return None
|
|
|
|
page_bytes = data[offset:offset + total_page_size]
|
|
granule_pos = int.from_bytes(data[offset + 6:offset + 14], 'little')
|
|
is_header = is_bos or granule_pos == 0
|
|
|
|
return (page_bytes, is_header, offset + total_page_size)
|
|
|
|
def extract_ogg_headers(self, data: bytes) -> tuple:
|
|
"""Extract OGG header pages from the beginning of stream data."""
|
|
headers = bytearray()
|
|
offset = 0
|
|
header_count = 0
|
|
|
|
while offset < len(data):
|
|
result = self.parse_ogg_page(data, offset)
|
|
if result is None:
|
|
break
|
|
|
|
page_bytes, is_header, next_offset = result
|
|
|
|
if is_header and header_count < 3:
|
|
headers.extend(page_bytes)
|
|
header_count += 1
|
|
offset = next_offset
|
|
else:
|
|
break
|
|
|
|
return bytes(headers), data[offset:]
|
|
|
|
def needs_header_per_file(self) -> bool:
|
|
"""Check if the detected format requires headers in each split file."""
|
|
return self.detected_format in ('ogg', 'opus', 'flac')
|
|
|
|
def open_new_file(self):
|
|
"""Open a new recording file."""
|
|
self.close_current_file()
|
|
|
|
ext = self.file_format if self.file_format != 'auto' else self.detected_format
|
|
self.current_filename = self.generate_filename(ext)
|
|
self.current_file = open(self.current_filename, 'wb')
|
|
self.logger.info(f"[{self.name}] Started recording to: {self.current_filename}")
|
|
|
|
if self.stream_headers and self.needs_header_per_file():
|
|
self.current_file.write(self.stream_headers)
|
|
self.logger.debug(f"[{self.name}] Wrote {len(self.stream_headers)} bytes of stream headers")
|
|
|
|
def connect_stream(self):
|
|
"""Connect to the stream."""
|
|
auth = None
|
|
if self.username and self.password:
|
|
auth = (self.username, self.password)
|
|
|
|
try:
|
|
self.logger.info(f"[{self.name}] Connecting to stream: {self.stream_url}")
|
|
response = requests.get(
|
|
self.stream_url,
|
|
auth=auth,
|
|
stream=True,
|
|
timeout=10
|
|
)
|
|
response.raise_for_status()
|
|
|
|
if self.file_format == 'auto':
|
|
self.detected_format = self.detect_format(response)
|
|
self.logger.info(f"[{self.name}] Detected format: {self.detected_format}")
|
|
else:
|
|
self.detected_format = self.file_format
|
|
self.logger.info(f"[{self.name}] Using manual format: {self.file_format}")
|
|
|
|
return response
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
self.logger.error(f"[{self.name}] Connection failed: {e}")
|
|
return None
|
|
|
|
def record(self):
|
|
"""Main recording loop for streams."""
|
|
self.running = True
|
|
retry_count = 0
|
|
next_split_time = self.get_next_split_time()
|
|
|
|
self.logger.info(f"[{self.name}] Starting stream recorder - Split every {self.split_duration} minutes")
|
|
self.logger.info(f"[{self.name}] Next split at: {next_split_time.strftime('%Y-%m-%d %H:%M:%S')}")
|
|
|
|
while self.running:
|
|
response = self.connect_stream()
|
|
|
|
if response is None:
|
|
retry_count += 1
|
|
if retry_count >= self.max_retries:
|
|
self.logger.error(f"[{self.name}] Max retries ({self.max_retries}) reached. Stopping.")
|
|
break
|
|
|
|
self.logger.warning(f"[{self.name}] Retry {retry_count}/{self.max_retries} in {self.retry_delay} seconds...")
|
|
time.sleep(self.retry_delay)
|
|
continue
|
|
|
|
retry_count = 0
|
|
self.header_capture_complete = False
|
|
self.stream_headers = None
|
|
header_buffer = bytearray()
|
|
|
|
# Always open a new file on (re)connect — a reconnect means there is
|
|
# a gap in the stream. For OGG/FLAC this is mandatory (header pages
|
|
# must appear at the start of each file); for MP3/AAC it avoids
|
|
# writing audio from two separate connections into the same file.
|
|
self.open_new_file()
|
|
|
|
try:
|
|
for chunk in response.iter_content(chunk_size=8192):
|
|
if not self.running:
|
|
break
|
|
|
|
if chunk:
|
|
if self.needs_header_per_file() and not self.header_capture_complete:
|
|
# Buffer data until we have enough to extract OGG/FLAC headers.
|
|
# The chunk must NOT also be written directly — it is already in
|
|
# header_buffer and will be flushed once headers are captured.
|
|
header_buffer.extend(chunk)
|
|
|
|
if len(header_buffer) >= 16384:
|
|
self.stream_headers, _ = self.extract_ogg_headers(bytes(header_buffer))
|
|
if self.stream_headers:
|
|
self.logger.info(f"[{self.name}] Captured {len(self.stream_headers)} bytes of stream headers")
|
|
self.header_capture_complete = True
|
|
self.current_file.write(bytes(header_buffer))
|
|
self.current_file.flush()
|
|
header_buffer.clear()
|
|
# Chunk is in the buffer; do not fall through to the write below.
|
|
continue
|
|
|
|
self.current_file.write(chunk)
|
|
self.current_file.flush()
|
|
|
|
if self._clock() >= next_split_time:
|
|
self.open_new_file()
|
|
next_split_time = self.get_next_split_time()
|
|
self.logger.info(f"[{self.name}] Next split at: {next_split_time.strftime('%Y-%m-%d %H:%M:%S')}")
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
self.logger.error(f"[{self.name}] Stream interrupted: {e}")
|
|
if self.running:
|
|
self.logger.info(f"[{self.name}] Attempting to reconnect...")
|
|
time.sleep(self.retry_delay)
|
|
continue
|
|
|
|
self.close_current_file()
|
|
self.logger.info(f"[{self.name}] Stream recorder stopped")
|
|
|
|
|
|
class _AudioFileWriter:
|
|
"""Unified writer for WAV and FLAC output files.
|
|
|
|
Wraps ``wave.Wave_write`` (WAV) and ``soundfile.SoundFile`` (FLAC) behind a
|
|
common interface so the rest of the recorder does not need to branch on format.
|
|
"""
|
|
|
|
def __init__(self, path: str, channels: int, sample_rate: int, fmt: str):
|
|
self.path = path
|
|
self.channels = channels
|
|
self.sample_rate = sample_rate
|
|
self.fmt = fmt
|
|
self._file = None
|
|
|
|
if fmt == 'flac':
|
|
if not NUMPY_AVAILABLE:
|
|
raise ImportError(
|
|
"numpy is required for FLAC output: pip install numpy"
|
|
)
|
|
if not SOUNDFILE_AVAILABLE:
|
|
raise ImportError(
|
|
"soundfile is required for FLAC output: pip install soundfile"
|
|
)
|
|
self._file = sf.SoundFile(
|
|
path, mode='w',
|
|
samplerate=sample_rate,
|
|
channels=channels,
|
|
subtype='PCM_16',
|
|
format='FLAC',
|
|
)
|
|
else:
|
|
self._file = wave.open(path, 'wb')
|
|
self._file.setnchannels(channels)
|
|
self._file.setsampwidth(2) # 16-bit
|
|
self._file.setframerate(sample_rate)
|
|
|
|
def write(self, data: bytes) -> None:
|
|
"""Write raw int16 PCM bytes to the output file."""
|
|
if self.fmt == 'flac':
|
|
arr = np.frombuffer(data, dtype='<i2')
|
|
if self.channels > 1:
|
|
arr = arr.reshape(-1, self.channels)
|
|
self._file.write(arr)
|
|
else:
|
|
self._file.writeframes(data)
|
|
|
|
def close(self) -> None:
|
|
if self._file is not None:
|
|
self._file.close()
|
|
self._file = None
|
|
|
|
|
|
class SoundcardRecorder(BaseRecorder):
|
|
"""Records from soundcard/audio devices using multiple backends."""
|
|
|
|
def __init__(self, name: str, config: Dict[str, Any], logger: logging.Logger,
|
|
audio_system: 'AudioSystem' = None,
|
|
clock: Callable[[], datetime] = None):
|
|
super().__init__(name, config, logger, clock)
|
|
|
|
# Initialize audio system if not provided
|
|
self.audio_system = audio_system or AudioSystem(logger)
|
|
|
|
if not self.audio_system.available_backends:
|
|
raise RuntimeError(
|
|
"No audio backends available.\n"
|
|
"Install ALSA utilities: sudo apt install alsa-utils\n"
|
|
"(In Docker, also ensure /dev/snd is mapped into the container)"
|
|
)
|
|
|
|
# Audio settings
|
|
self.device_spec = config.get('device', 'default')
|
|
self.sample_rate = config.get('sample_rate', 44100)
|
|
self.channels = config.get('channels', 2)
|
|
self.preferred_backend = config.get('backend', None)
|
|
|
|
# Validate format
|
|
if self.file_format not in ('wav', 'flac') and self.file_format != 'auto':
|
|
self.logger.warning(f"[{self.name}] Format '{self.file_format}' not supported, using 'wav'")
|
|
self.file_format = 'wav'
|
|
elif self.file_format == 'auto':
|
|
self.file_format = 'wav'
|
|
|
|
# Resolve device
|
|
self.device = self._resolve_device()
|
|
|
|
# Audio buffer
|
|
self.audio_buffer = []
|
|
self.buffer_lock = threading.Lock()
|
|
|
|
def _resolve_device(self) -> AudioDevice:
|
|
"""Resolve device specification to AudioDevice."""
|
|
device = self.audio_system.find_device(self.device_spec, self.preferred_backend)
|
|
|
|
if device is None:
|
|
# List available devices in error
|
|
available = self.audio_system.list_all_devices()
|
|
device_list = "\n ".join(str(d) for d in available) if available else "None found"
|
|
raise ValueError(
|
|
f"[{self.name}] Device '{self.device_spec}' not found.\n"
|
|
f"Available devices:\n {device_list}"
|
|
)
|
|
|
|
self.logger.info(f"[{self.name}] Using device: {device}")
|
|
return device
|
|
|
|
def _audio_callback(self, data: bytes):
|
|
"""Callback for audio data from backend."""
|
|
with self.buffer_lock:
|
|
self.audio_buffer.append(data)
|
|
|
|
def _open_output_file(self):
|
|
"""Open a new WAV or FLAC output file for recording."""
|
|
self._flush_buffer_to_file()
|
|
self.close_current_file()
|
|
|
|
self.current_filename = self.generate_filename(self.file_format)
|
|
self.current_file = _AudioFileWriter(
|
|
self.current_filename, self.channels, self.sample_rate, self.file_format
|
|
)
|
|
self.logger.info(f"[{self.name}] Recording to: {self.current_filename}")
|
|
|
|
def _flush_buffer_to_file(self):
|
|
"""Write buffered audio data to file."""
|
|
if self.current_file is None:
|
|
return
|
|
|
|
with self.buffer_lock:
|
|
if self.audio_buffer:
|
|
for data in self.audio_buffer:
|
|
self.current_file.write(data)
|
|
self.audio_buffer.clear()
|
|
|
|
def close_current_file(self):
|
|
"""Close current recording file."""
|
|
self._flush_buffer_to_file()
|
|
if self.current_file:
|
|
try:
|
|
self.current_file.close()
|
|
self.logger.info(f"[{self.name}] Closed: {self.current_filename}")
|
|
except Exception as e:
|
|
self.logger.error(f"[{self.name}] Error closing file: {e}")
|
|
self.current_file = None
|
|
self.current_filename = None
|
|
|
|
def record(self):
|
|
"""Main recording loop."""
|
|
self.running = True
|
|
retry_count = 0
|
|
next_split = self.get_next_split_time()
|
|
|
|
backend = self.audio_system.get_backend(self.device.backend)
|
|
if not backend:
|
|
self.logger.error(f"[{self.name}] Backend '{self.device.backend}' not available")
|
|
return
|
|
|
|
self.logger.info(f"[{self.name}] Starting recorder - Split every {self.split_duration} min")
|
|
self.logger.info(f"[{self.name}] Backend: {self.device.backend}, Device: {self.device.name}")
|
|
self.logger.info(f"[{self.name}] Sample rate: {self.sample_rate}, Channels: {self.channels}")
|
|
self.logger.info(f"[{self.name}] Next split: {next_split.strftime('%H:%M:%S')}")
|
|
|
|
while self.running:
|
|
try:
|
|
if self.current_file is None:
|
|
self._open_output_file()
|
|
|
|
with backend.open_stream(
|
|
self.device, self.sample_rate, self.channels, self._audio_callback
|
|
):
|
|
self.logger.info(f"[{self.name}] Recording started")
|
|
retry_count = 0
|
|
|
|
while self.running:
|
|
time.sleep(0.5)
|
|
self._flush_buffer_to_file()
|
|
|
|
if self._clock() >= next_split:
|
|
self._open_output_file()
|
|
next_split = self.get_next_split_time()
|
|
self.logger.info(f"[{self.name}] Next split: {next_split.strftime('%H:%M:%S')}")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"[{self.name}] Error: {e}")
|
|
retry_count += 1
|
|
|
|
if retry_count >= self.max_retries:
|
|
self.logger.error(f"[{self.name}] Max retries reached. Stopping.")
|
|
break
|
|
|
|
self.logger.warning(f"[{self.name}] Retry {retry_count}/{self.max_retries} in {self.retry_delay}s...")
|
|
time.sleep(self.retry_delay)
|
|
|
|
self.close_current_file()
|
|
self.logger.info(f"[{self.name}] Recorder stopped")
|
|
|
|
|
|
class RecorderManager:
|
|
"""Manages multiple recorders running concurrently."""
|
|
|
|
def __init__(self, config_file: str = 'config.ini'):
|
|
self.config_file = config_file
|
|
self.recorders: List[BaseRecorder] = []
|
|
self.threads: List[threading.Thread] = []
|
|
self.logger = None
|
|
self.audio_system = None
|
|
self.output_dir = 'recordings'
|
|
self._status_running = False
|
|
|
|
self._load_config()
|
|
|
|
def _load_config(self):
|
|
"""Load and parse the configuration file."""
|
|
if not os.path.exists(self.config_file):
|
|
print(f"Error: Configuration file '{self.config_file}' not found!")
|
|
print("Usage: python isr.py [config.ini]")
|
|
print(" python isr.py --list-devices")
|
|
sys.exit(1)
|
|
|
|
config = configparser.ConfigParser(interpolation=None)
|
|
config.read(self.config_file)
|
|
|
|
# Get general settings (defaults)
|
|
general = {
|
|
'output_directory': config.get('general', 'output_directory', fallback='recordings'),
|
|
'split_minutes': config.getint('general', 'split_minutes', fallback=60),
|
|
'filename_pattern': config.get('general', 'filename_pattern', fallback='%Y%m%d_%H%M%S', raw=True),
|
|
'max_retries': config.getint('general', 'max_retries', fallback=10),
|
|
'retry_delay_seconds': config.getint('general', 'retry_delay_seconds', fallback=5),
|
|
'log_level': config.get('general', 'log_level', fallback='INFO').upper(),
|
|
'log_file': config.get('general', 'log_file', fallback='recorder.log'),
|
|
}
|
|
|
|
self.output_dir = general['output_directory']
|
|
|
|
# Setup logging
|
|
self._setup_logging(general['log_level'], general['log_file'])
|
|
|
|
# Initialize shared audio system for soundcard recorders
|
|
self.audio_system = AudioSystem(self.logger)
|
|
if self.audio_system.available_backends:
|
|
self.logger.info(f"Audio backends: {', '.join(self.audio_system.available_backends)}")
|
|
|
|
# Parse source sections
|
|
for section in config.sections():
|
|
if section == 'general':
|
|
continue
|
|
|
|
source_type = config.get(section, 'type', fallback=None)
|
|
if source_type is None:
|
|
self.logger.warning(f"Section [{section}] has no 'type', skipping")
|
|
continue
|
|
|
|
# Build source config by merging general with section-specific
|
|
source_config = general.copy()
|
|
|
|
for key, value in config.items(section):
|
|
if key == 'type':
|
|
continue
|
|
if key in ('split_minutes', 'max_retries', 'retry_delay_seconds', 'sample_rate', 'channels'):
|
|
source_config[key] = int(value)
|
|
else:
|
|
source_config[key] = value
|
|
|
|
# Create recorder
|
|
try:
|
|
if source_type == 'stream':
|
|
recorder = StreamRecorder(section, source_config, self.logger)
|
|
self.recorders.append(recorder)
|
|
self.logger.info(f"Configured stream: [{section}]")
|
|
elif source_type == 'soundcard':
|
|
recorder = SoundcardRecorder(section, source_config, self.logger, self.audio_system)
|
|
self.recorders.append(recorder)
|
|
self.logger.info(f"Configured soundcard: [{section}]")
|
|
else:
|
|
self.logger.warning(f"Unknown type '{source_type}' in [{section}], skipping")
|
|
except Exception as e:
|
|
self.logger.error(f"Failed to configure [{section}]: {e}")
|
|
|
|
if not self.recorders:
|
|
self.logger.error("No valid recording sources configured!")
|
|
|
|
def _setup_logging(self, level: str, log_file: str):
|
|
"""Setup logging configuration."""
|
|
numeric_level = getattr(logging, level, logging.INFO)
|
|
|
|
# Clear any existing handlers
|
|
root_logger = logging.getLogger()
|
|
root_logger.handlers.clear()
|
|
|
|
logging.basicConfig(
|
|
level=numeric_level,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.FileHandler(log_file),
|
|
logging.StreamHandler(sys.stdout)
|
|
]
|
|
)
|
|
self.logger = logging.getLogger(__name__)
|
|
|
|
def _write_status(self):
|
|
"""Write active filenames to status.json for the web UI to read."""
|
|
active = []
|
|
for r in self.recorders:
|
|
fn = r.current_filename
|
|
if fn:
|
|
try:
|
|
rel = os.path.relpath(fn, self.output_dir).replace('\\', '/')
|
|
active.append(rel)
|
|
except ValueError:
|
|
pass
|
|
try:
|
|
out = Path(self.output_dir)
|
|
out.mkdir(parents=True, exist_ok=True)
|
|
tmp = str(out / 'status.json.tmp')
|
|
with open(tmp, 'w') as fh:
|
|
json.dump({'active': active}, fh)
|
|
os.replace(tmp, str(out / 'status.json'))
|
|
except Exception:
|
|
pass
|
|
|
|
def _status_writer_loop(self):
|
|
while self._status_running:
|
|
self._write_status()
|
|
time.sleep(2)
|
|
|
|
def start(self):
|
|
"""Start all recorders in separate threads."""
|
|
if not self.recorders:
|
|
self.logger.error("No recorders to start!")
|
|
return
|
|
|
|
self.logger.info(f"Starting {len(self.recorders)} recorder(s)...")
|
|
|
|
for recorder in self.recorders:
|
|
thread = threading.Thread(target=recorder.record, name=f"Recorder-{recorder.name}")
|
|
thread.daemon = True
|
|
self.threads.append(thread)
|
|
thread.start()
|
|
|
|
self._status_running = True
|
|
st = threading.Thread(target=self._status_writer_loop, name='StatusWriter', daemon=True)
|
|
st.start()
|
|
|
|
self.logger.info("All recorders started")
|
|
|
|
# Wait for interrupt
|
|
try:
|
|
while True:
|
|
alive = [t for t in self.threads if t.is_alive()]
|
|
if not alive:
|
|
self.logger.info("All recorders have stopped")
|
|
break
|
|
time.sleep(1)
|
|
except KeyboardInterrupt:
|
|
self.logger.info("Received interrupt signal, stopping all recorders...")
|
|
self.stop()
|
|
|
|
def stop(self):
|
|
"""Stop all recorders gracefully."""
|
|
self._status_running = False
|
|
for recorder in self.recorders:
|
|
recorder.stop()
|
|
|
|
# Use a shared deadline so N recorders don't each burn 5 s sequentially,
|
|
# which would exceed Docker's stop_grace_period for more than 2 recorders.
|
|
deadline = time.time() + 25
|
|
for thread in self.threads:
|
|
remaining = max(0.1, deadline - time.time())
|
|
thread.join(timeout=remaining)
|
|
|
|
# Clear status file so the web UI shows no active recordings
|
|
try:
|
|
status_path = Path(self.output_dir) / 'status.json'
|
|
if status_path.exists():
|
|
status_path.unlink()
|
|
except Exception:
|
|
pass
|
|
|
|
self.logger.info("All recorders stopped")
|
|
|
|
|
|
def list_audio_devices():
|
|
"""List available audio input devices from all backends."""
|
|
# Create a minimal logger for device listing
|
|
logger = logging.getLogger('isr-devices')
|
|
logger.setLevel(logging.WARNING) # Suppress debug output
|
|
|
|
print("\n" + "=" * 70)
|
|
print(" ISR Audio Device Discovery")
|
|
print("=" * 70)
|
|
|
|
# Check available backends
|
|
available_backends = []
|
|
if ALSABackend.is_available():
|
|
available_backends.append(('alsa', 'ALSA (arecord)', 5))
|
|
|
|
if not available_backends:
|
|
print("\n No audio backends available!")
|
|
print("\n Install one of:")
|
|
print(" sudo apt install alsa-utils (ALSA, always available on Linux)")
|
|
print()
|
|
return
|
|
|
|
print("\n Available Backends:")
|
|
for name, label, priority in sorted(available_backends, key=lambda x: -x[2]):
|
|
marker = " (preferred)" if priority == max(b[2] for b in available_backends) else ""
|
|
print(f" - {label}{marker}")
|
|
|
|
# Initialize audio system and list devices
|
|
audio_system = AudioSystem(logger)
|
|
devices = audio_system.list_all_devices()
|
|
|
|
if not devices:
|
|
print("\n No input devices found!")
|
|
print()
|
|
return
|
|
|
|
# Group by type
|
|
monitors = [d for d in devices if d.is_monitor]
|
|
inputs = [d for d in devices if not d.is_monitor]
|
|
|
|
if inputs:
|
|
print("\n Input Devices:")
|
|
print(" " + "-" * 68)
|
|
for dev in inputs:
|
|
flags = []
|
|
if dev.is_default:
|
|
flags.append("DEFAULT")
|
|
flag_str = f" [{', '.join(flags)}]" if flags else ""
|
|
print(f"\n {dev.name}{flag_str}")
|
|
print(f" ID: {dev.id} | Backend: {dev.backend}")
|
|
print(f" Channels: {dev.channels} | Sample Rate: {dev.sample_rate} Hz")
|
|
|
|
if monitors:
|
|
print("\n Monitor/Loopback Sources:")
|
|
print(" " + "-" * 68)
|
|
for dev in monitors:
|
|
flags = ["MONITOR"]
|
|
if dev.is_default:
|
|
flags.append("DEFAULT")
|
|
flag_str = f" [{', '.join(flags)}]"
|
|
print(f"\n {dev.name}{flag_str}")
|
|
print(f" ID: {dev.id} | Backend: {dev.backend}")
|
|
print(f" Channels: {dev.channels} | Sample Rate: {dev.sample_rate} Hz")
|
|
|
|
print("\n" + "=" * 70)
|
|
print(" Configuration Examples:")
|
|
print("-" * 70)
|
|
print(" device = default # Use system default input")
|
|
print(" device = monitor # Use first monitor/loopback source")
|
|
print(" device = <name> # Match by partial name")
|
|
print(" device = <id> # Use exact backend ID")
|
|
print(" backend = pipewire # Force specific backend")
|
|
print("=" * 70 + "\n")
|
|
|
|
|
|
def main():
|
|
# Check for --list-devices flag
|
|
if len(sys.argv) > 1 and sys.argv[1] in ('--list-devices', '-l'):
|
|
list_audio_devices()
|
|
return
|
|
|
|
# Get config file
|
|
config_file = 'config.ini'
|
|
if len(sys.argv) > 1:
|
|
config_file = sys.argv[1]
|
|
|
|
if not os.path.exists(config_file):
|
|
print(f"Error: Configuration file '{config_file}' not found!")
|
|
print("Usage: python ISR.py [config.ini]")
|
|
print(" python ISR.py --list-devices")
|
|
sys.exit(1)
|
|
|
|
# Docker sends SIGTERM before SIGKILL — treat it the same as Ctrl+C
|
|
def _sigterm(sig, frame):
|
|
raise KeyboardInterrupt()
|
|
signal.signal(signal.SIGTERM, _sigterm)
|
|
|
|
manager = RecorderManager(config_file)
|
|
manager.start()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|