#!/usr/bin/env python3
"""
ISR - Audio Recorder
Records from multiple sources: Icecast streams and soundcards.
Supports time-based file splitting and concurrent recording.
"""

import json
import os
import sys
import time
import wave
import struct
import signal
import logging
import threading
import configparser
import subprocess
import shutil
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Dict, Any, List, Callable

# Optional imports - check availability
try:
    import requests
    REQUESTS_AVAILABLE = True
except ImportError:
    REQUESTS_AVAILABLE = False

try:
    import numpy as np
    NUMPY_AVAILABLE = True
except ImportError:
    NUMPY_AVAILABLE = False

try:
    import soundfile as sf
    SOUNDFILE_AVAILABLE = True
except ImportError:
    SOUNDFILE_AVAILABLE = False


# =============================================================================
# Audio Device & Backend System
# =============================================================================

@dataclass
class AudioDevice:
    """Represents an audio input device."""

    id: str                    # Backend-specific identifier
    name: str                  # Human-readable name
    channels: int              # Max input channels
    sample_rate: int           # Default sample rate
    backend: str               # Backend name (pulseaudio, pipewire, portaudio)
    is_default: bool = False   # Is system default
    is_monitor: bool = False   # Is a monitor/loopback source
    description: str = ""      # Extended description
    extra: Dict[str, Any] = field(default_factory=dict)

    def __str__(self):
        """Return ``name [FLAGS] (backend)`` for logs and error messages."""
        flags = []
        if self.is_default:
            flags.append("DEFAULT")
        if self.is_monitor:
            flags.append("MONITOR")
        flag_str = f" [{', '.join(flags)}]" if flags else ""
        return f"{self.name}{flag_str} ({self.backend})"


class AudioBackend(ABC):
    """Abstract base for audio capture backends."""

    name: str = "base"
    priority: int = 0  # Higher = preferred

    @classmethod
    @abstractmethod
    def is_available(cls) -> bool:
        """Check if this backend can be used on the current system."""

    @classmethod
    @abstractmethod
    def list_devices(cls) -> List[AudioDevice]:
        """List all available input devices."""

    @abstractmethod
    def open_stream(self, device: AudioDevice, sample_rate: int, channels: int,
                    callback: Callable[[bytes], None]) -> Any:
        """Open an audio capture stream. Returns a context manager."""


class ALSABackend(AudioBackend):
    """ALSA backend using arecord (raw PCM output, no sound server required)."""

    name = "alsa"
    priority = 5  # Lowest priority — direct hardware access, use when no sound server runs

    @classmethod
    def is_available(cls) -> bool:
        """Return True when the ``arecord`` executable is on PATH."""
        return shutil.which('arecord') is not None

    @classmethod
    def _parse_device_line(cls, line: str) -> Optional[AudioDevice]:
        """Parse one ``card N: ... [Name], device M: ...`` line of ``arecord -l``.

        Returns None for lines that are not device lines or cannot be parsed.
        """
        if not line.startswith('card '):
            return None
        try:
            card_part, rest = line.split(':', 1)
            card_num = card_part.replace('card', '').strip()
            if ', device ' in rest:
                dev_part = rest.split(', device ')[1]
                dev_num = dev_part.split(':')[0].strip()
            else:
                dev_part = ''
                dev_num = '0'
            long_name = rest.split('[')[1].split(']')[0] if '[' in rest else rest.strip()
        except (IndexError, ValueError):
            return None
        # The device-level label is kept as extra information only; ``name``
        # stays the card name so existing ``device = <name>`` configs still match.
        description = dev_part.split(':', 1)[1].strip() if ':' in dev_part else ''
        return AudioDevice(
            id=f"hw:{card_num},{dev_num}",
            name=long_name,
            channels=2,
            sample_rate=44100,
            backend=cls.name,
            is_default=(card_num == '0' and dev_num == '0'),
            is_monitor=False,
            description=description,
        )

    @classmethod
    def list_devices(cls) -> List[AudioDevice]:
        """List capture devices reported by ``arecord -l``.

        Returns an empty list when arecord cannot be run, times out, or fails.
        """
        try:
            result = subprocess.run(
                ['arecord', '-l'],
                capture_output=True,
                text=True,
                timeout=5,
                # arecord's listing is localized; force the C locale so the
                # "card N: ..." lines the parser looks for are always emitted.
                env={**os.environ, 'LC_ALL': 'C'},
            )
        except (OSError, subprocess.SubprocessError, ValueError):
            # Device discovery is best-effort: no arecord, timeout, or
            # undecodable output all mean "no devices from this backend".
            return []
        if result.returncode != 0:
            return []
        devices = []
        for line in result.stdout.splitlines():
            device = cls._parse_device_line(line)
            if device is not None:
                devices.append(device)
        return devices

    def __init__(self, logger: logging.Logger):
        self.logger = logger

    def open_stream(self, device: AudioDevice, sample_rate: int, channels: int,
                    callback: Callable[[bytes], None]):
        """Return an ``ALSAStream`` context manager capturing from *device*."""
        return ALSAStream(device, sample_rate, channels, callback, self.logger)


class ALSAStream:
    """Context manager for ALSA recording using arecord (raw PCM output)."""

    def __init__(self, device: AudioDevice, sample_rate: int, channels: int,
                 callback: Callable[[bytes], None], logger: logging.Logger):
        self.device = device
        self.sample_rate = sample_rate
        self.channels = channels
        self.callback = callback
        self.logger = logger
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._process: Optional[subprocess.Popen] = None
        # ~100 ms of S16_LE audio, rounded down to a whole number of frames so
        # consumers never receive a chunk that ends in the middle of a frame.
        self._chunk_bytes = max(1, sample_rate // 10) * channels * 2

    def __enter__(self):
        """Start arecord and the reader thread.

        Raises:
            RuntimeError: if the arecord executable cannot be found.
        """
        self._running = True
        cmd = [
            'arecord',
            '-D', self.device.id,
            '-f', 'S16_LE',
            '-r', str(self.sample_rate),
            '-c', str(self.channels),
            '--file-type', 'raw',
            '-',
        ]
        try:
            self._process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                # stderr is discarded rather than piped: an undrained pipe
                # would eventually block arecord.
                stderr=subprocess.DEVNULL,
            )
        except FileNotFoundError:
            raise RuntimeError("arecord not found - install alsa-utils: sudo apt install alsa-utils")
        self._thread = threading.Thread(target=self._read_loop, daemon=True)
        self._thread.start()
        return self

    def __exit__(self, *args):
        """Stop the capture process and wait briefly for the reader thread."""
        self._running = False
        process = self._process
        if process:
            process.terminate()
            try:
                process.wait(timeout=2)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()  # reap the killed child so it does not linger as a zombie
        if self._thread:
            self._thread.join(timeout=1)
        # Only close the pipe once the reader is done with it.
        if process and process.stdout and not (self._thread and self._thread.is_alive()):
            process.stdout.close()

    def is_alive(self) -> bool:
        """Return True while the arecord process is still running."""
        return self._process is not None and self._process.poll() is None

    def _read_loop(self):
        """Forward PCM chunks from arecord's stdout to the callback until EOF or stop."""
        stdout = self._process.stdout
        while self._running:
            try:
                data = stdout.read(self._chunk_bytes)
                if not data:
                    break  # EOF: arecord exited or the pipe was closed
                self.callback(data)
            except Exception as e:
                self.logger.error(f"ALSA read error: {e}")
                break


class AudioSystem:
    """Manages audio backends and device discovery."""

    # All available backend classes
    _backend_classes: List[type] = [ALSABackend]

    def __init__(self, logger: logging.Logger):
        self.logger = logger
        self._backends: Dict[str, AudioBackend] = {}
        self._discover_backends()

    def _discover_backends(self):
        """Find and initialize available backends."""
        for cls in self._backend_classes:
            if cls.is_available():
                self._backends[cls.name] = cls(self.logger)
                self.logger.debug(f"Audio backend available: {cls.name}")

    @property
    def available_backends(self) -> List[str]:
        """Names of the backends that were found usable."""
        return list(self._backends.keys())

    def get_backend(self, name: str) -> Optional[AudioBackend]:
        """Return the initialized backend called *name*, or None."""
        return self._backends.get(name)

    def get_preferred_backend(self) -> Optional[AudioBackend]:
        """Get the highest priority available backend."""
        if not self._backends:
            return None
        return max(self._backends.values(), key=lambda b: b.__class__.priority)

    def list_all_devices(self) -> List[AudioDevice]:
        """List devices from all available backends.

        A device name already reported by a higher-priority backend is skipped.
        Devices from the same backend are never merged, even when they share a
        name (ALSA names devices by card, so hw:0,0 and hw:0,1 look identical).
        """
        all_devices = []
        name_owner: Dict[str, str] = {}  # lower-cased name -> backend that first reported it
        # Get devices from backends in priority order
        for cls in sorted(self._backend_classes, key=lambda c: -c.priority):
            if cls.name not in self._backends:
                continue
            for dev in cls.list_devices():
                key = dev.name.lower()
                owner = name_owner.setdefault(key, dev.backend)
                if owner != dev.backend:
                    continue  # same device already seen through a preferred backend
                all_devices.append(dev)
        return all_devices

    def find_device(self, spec: str, preferred_backend: Optional[str] = None) -> Optional[AudioDevice]:
        """Find a device by ID, name, or pattern.

        *spec* may be ``default``, ``monitor``, an exact backend ID, an exact
        name (case-insensitive) or a substring of a name. Returns None when
        nothing matches.
        """
        devices = self.list_all_devices()
        if not devices:
            return None

        spec_lower = spec.lower()

        # "default" - get the default device from preferred backend
        if spec_lower == 'default':
            for dev in devices:
                if dev.is_default and (preferred_backend is None or dev.backend == preferred_backend):
                    return dev
            return devices[0]

        # "monitor" - get first monitor source
        if spec_lower == 'monitor':
            for dev in devices:
                if dev.is_monitor and (preferred_backend is None or dev.backend == preferred_backend):
                    return dev
            return None

        # Try exact ID match first
        for dev in devices:
            if dev.id == spec:
                return dev
        # Try exact name match
        for dev in devices:
            if dev.name.lower() == spec_lower:
                return dev
        # Try partial name match
        for dev in devices:
            if spec_lower in dev.name.lower():
                return dev
        return None


class BaseRecorder(ABC):
    """Abstract base class for all recorder types."""

    def __init__(self, name: str, config: Dict[str, Any], logger: logging.Logger,
                 clock: Optional[Callable[[], datetime]] = None):
        """Store common settings.

        Raises:
            ValueError: if ``split_minutes`` is not a positive integer.
        """
        self.name = name
        self.config = config
        self.logger = logger
        self.running = False
        self.current_file = None
        self.current_filename = None
        self._clock = clock or datetime.now

        # Common settings
        self.split_duration = int(config.get('split_minutes', 60))
        if self.split_duration <= 0:
            # Fail at configuration time instead of with a ZeroDivisionError
            # inside the recorder thread.
            raise ValueError(f"[{name}] split_minutes must be a positive integer")
        self.output_dir = config.get('output_directory', 'recordings')
        self.filename_pattern = config.get('filename_pattern', '%Y%m%d_%H%M%S')
        self.max_retries = config.get('max_retries', 10)
        self.retry_delay = config.get('retry_delay_seconds', 5)
        self.file_format = config.get('format', 'auto')

    def get_next_split_time(self) -> datetime:
        """Calculate the next split time aligned to split_duration boundaries."""
        now = self._clock()
        total_minutes = now.hour * 60 + now.minute
        minutes_to_next = self.split_duration - (total_minutes % self.split_duration)
        next_split = now + timedelta(minutes=minutes_to_next)
        return next_split.replace(second=0, microsecond=0)

    def generate_filename(self, ext: str) -> str:
        """Generate filename from pattern with strftime substitution.

        The parent directory of the resulting path is created if missing.
        """
        now = self._clock()
        filename = now.strftime(self.filename_pattern) + f".{ext}"
        full_path = os.path.join(self.output_dir, filename)
        Path(full_path).parent.mkdir(parents=True, exist_ok=True)
        return full_path

    def close_current_file(self):
        """Close the current recording file if open."""
        if self.current_file:
            try:
                self.current_file.close()
                self.logger.info(f"[{self.name}] Closed file: {self.current_filename}")
            except Exception as e:
                self.logger.error(f"[{self.name}] Error closing file: {e}")
        self.current_file = None
        self.current_filename = None

    def stop(self):
        """Signal the recorder to stop."""
        self.running = False

    @abstractmethod
    def record(self):
        """Main recording loop - must be implemented by subclasses."""


class StreamRecorder(BaseRecorder):
    """Records from Icecast/HTTP audio streams."""

    def __init__(self, name: str, config: Dict[str, Any], logger: logging.Logger,
                 clock: Optional[Callable[[], datetime]] = None):
        """Validate stream settings.

        Raises:
            ImportError: if the ``requests`` library is not installed.
            ValueError: if no stream URL is configured.
        """
        super().__init__(name, config, logger, clock)

        if not REQUESTS_AVAILABLE:
            raise ImportError("The 'requests' library is required for stream recording. Install with: pip install requests")

        self.stream_url = config.get('url')
        if not self.stream_url:
            raise ValueError(f"[{name}] Stream URL is required for stream type")

        self.username = config.get('username') or None
        self.password = config.get('password') or None

        # Stream header storage for formats that require headers in each file
        self.stream_headers = None
        self.header_capture_complete = False
        self.detected_format = None

    def detect_format(self, response) -> str:
        """Detect stream format from HTTP headers (defaults to mp3)."""
        content_type = response.headers.get('Content-Type', '').lower()
        format_map = {
            'audio/mpeg': 'mp3',
            'audio/mp3': 'mp3',
            'audio/ogg': 'ogg',
            'application/ogg': 'ogg',
            'audio/aac': 'aac',
            'audio/aacp': 'aac',
            'audio/x-aac': 'aac',
            'audio/flac': 'flac',
            'audio/opus': 'opus'
        }
        for mime, fmt in format_map.items():
            if mime in content_type:
                return fmt
        self.logger.warning(f"[{self.name}] Unknown content type: {content_type}, defaulting to mp3")
        return 'mp3'

    def parse_ogg_page(self, data: bytes, offset: int) -> Optional[tuple]:
        """Parse an OGG page starting at the given offset.

        Returns ``(page_bytes, is_header, next_offset)``, or None when *data*
        does not hold a complete page at *offset*.
        """
        if len(data) - offset < 27:
            return None
        if data[offset:offset + 4] != b'OggS':
            return None

        header_type = data[offset + 5]
        is_bos = (header_type & 0x02) != 0
        num_segments = data[offset + 26]
        if len(data) - offset < 27 + num_segments:
            return None

        segment_table = data[offset + 27:offset + 27 + num_segments]
        page_data_size = sum(segment_table)
        total_page_size = 27 + num_segments + page_data_size
        if len(data) - offset < total_page_size:
            return None

        page_bytes = data[offset:offset + total_page_size]
        granule_pos = int.from_bytes(data[offset + 6:offset + 14], 'little')
        # Pages flagged beginning-of-stream or with granule position 0 are
        # treated as header pages.
        is_header = is_bos or granule_pos == 0
        return (page_bytes, is_header, offset + total_page_size)

    def extract_ogg_headers(self, data: bytes) -> tuple:
        """Extract OGG header pages from the beginning of stream data.

        Returns ``(header_bytes, remaining_data)``; at most three leading
        header pages are collected.
        """
        headers = bytearray()
        offset = 0
        header_count = 0
        while offset < len(data):
            result = self.parse_ogg_page(data, offset)
            if result is None:
                break
            page_bytes, is_header, next_offset = result
            if is_header and header_count < 3:
                headers.extend(page_bytes)
                header_count += 1
                offset = next_offset
            else:
                break
        return bytes(headers), data[offset:]

    def needs_header_per_file(self) -> bool:
        """Check if the detected format requires headers in each split file."""
        return self.detected_format in ('ogg', 'opus', 'flac')

    def open_new_file(self):
        """Open a new recording file, writing captured stream headers if needed."""
        self.close_current_file()
        ext = self.file_format if self.file_format != 'auto' else self.detected_format
        self.current_filename = self.generate_filename(ext)
        self.current_file = open(self.current_filename, 'wb')
        self.logger.info(f"[{self.name}] Started recording to: {self.current_filename}")
        if self.stream_headers and self.needs_header_per_file():
            self.current_file.write(self.stream_headers)
            self.logger.debug(f"[{self.name}] Wrote {len(self.stream_headers)} bytes of stream headers")

    def connect_stream(self):
        """Connect to the stream.

        Returns the open streaming response, or None when the connection
        failed (the failure is logged).
        """
        auth = None
        if self.username and self.password:
            auth = (self.username, self.password)
        try:
            self.logger.info(f"[{self.name}] Connecting to stream: {self.stream_url}")
            response = requests.get(
                self.stream_url,
                auth=auth,
                stream=True,
                timeout=10
            )
            response.raise_for_status()
            if self.file_format == 'auto':
                self.detected_format = self.detect_format(response)
                self.logger.info(f"[{self.name}] Detected format: {self.detected_format}")
            else:
                self.detected_format = self.file_format
                self.logger.info(f"[{self.name}] Using manual format: {self.file_format}")
            return response
        except requests.exceptions.RequestException as e:
            self.logger.error(f"[{self.name}] Connection failed: {e}")
            return None

    def _capture_headers_and_flush(self, header_buffer: bytearray) -> None:
        """Extract stream headers from the buffered data, then write the buffer out."""
        buffered = bytes(header_buffer)
        self.stream_headers, _ = self.extract_ogg_headers(buffered)
        if self.stream_headers:
            self.logger.info(f"[{self.name}] Captured {len(self.stream_headers)} bytes of stream headers")
        self.header_capture_complete = True
        self.current_file.write(buffered)
        self.current_file.flush()
        header_buffer.clear()

    def record(self):
        """Main recording loop for streams.

        Reconnects after interruptions; gives up after ``max_retries``
        consecutive failed connection attempts.
        """
        self.running = True
        retry_count = 0
        next_split_time = self.get_next_split_time()

        self.logger.info(f"[{self.name}] Starting stream recorder - Split every {self.split_duration} minutes")
        self.logger.info(f"[{self.name}] Next split at: {next_split_time.strftime('%Y-%m-%d %H:%M:%S')}")

        try:
            while self.running:
                response = self.connect_stream()
                if response is None:
                    retry_count += 1
                    if retry_count >= self.max_retries:
                        self.logger.error(f"[{self.name}] Max retries ({self.max_retries}) reached. Stopping.")
                        break
                    self.logger.warning(f"[{self.name}] Retry {retry_count}/{self.max_retries} in {self.retry_delay} seconds...")
                    time.sleep(self.retry_delay)
                    continue

                retry_count = 0
                self.header_capture_complete = False
                self.stream_headers = None
                header_buffer = bytearray()

                try:
                    # Always open a new file on (re)connect — a reconnect means there is
                    # a gap in the stream. For OGG/FLAC this is mandatory (header pages
                    # must appear at the start of each file); for MP3/AAC it avoids
                    # writing audio from two separate connections into the same file.
                    self.open_new_file()

                    for chunk in response.iter_content(chunk_size=8192):
                        if not self.running:
                            break
                        if not chunk:
                            continue
                        if self.needs_header_per_file() and not self.header_capture_complete:
                            # Buffer data until we have enough to extract OGG/FLAC headers.
                            # The chunk must NOT also be written directly — it is already in
                            # header_buffer and is flushed once headers are captured.
                            header_buffer.extend(chunk)
                            if len(header_buffer) >= 16384:
                                self._capture_headers_and_flush(header_buffer)
                            continue

                        self.current_file.write(chunk)
                        self.current_file.flush()

                        if self._clock() >= next_split_time:
                            self.open_new_file()
                            next_split_time = self.get_next_split_time()
                            self.logger.info(f"[{self.name}] Next split at: {next_split_time.strftime('%Y-%m-%d %H:%M:%S')}")
                except requests.exceptions.RequestException as e:
                    self.logger.error(f"[{self.name}] Stream interrupted: {e}")
                finally:
                    # Release the HTTP connection before reconnecting or stopping.
                    response.close()

                # The stream ended (or we were stopped) before the header buffer
                # filled up: write what was received instead of dropping it.
                if header_buffer and self.current_file is not None:
                    self.current_file.write(bytes(header_buffer))
                    self.current_file.flush()
                    header_buffer.clear()

                if self.running:
                    self.logger.info(f"[{self.name}] Attempting to reconnect...")
                    time.sleep(self.retry_delay)
        finally:
            # Close the output file even if an unexpected error ends the loop.
            self.close_current_file()
        self.logger.info(f"[{self.name}] Stream recorder stopped")


class _AudioFileWriter:
    """Unified writer for WAV and FLAC output files.

    Wraps ``wave.Wave_write`` (WAV) and ``soundfile.SoundFile`` (FLAC) behind a
    common interface so the rest of the recorder does not need to branch on
    format.
    """

    def __init__(self, path: str, channels: int, sample_rate: int, fmt: str):
        """Open *path* for writing 16-bit PCM audio.

        Raises:
            ImportError: if FLAC output is requested without numpy/soundfile.
        """
        self.path = path
        self.channels = channels
        self.sample_rate = sample_rate
        self.fmt = fmt
        self._file = None
        self._pending = b''  # bytes of an incomplete frame awaiting the next write (FLAC only)

        if fmt == 'flac':
            if not NUMPY_AVAILABLE:
                raise ImportError(
                    "numpy is required for FLAC output: pip install numpy"
                )
            if not SOUNDFILE_AVAILABLE:
                raise ImportError(
                    "soundfile is required for FLAC output: pip install soundfile"
                )
            self._file = sf.SoundFile(
                path, mode='w',
                samplerate=sample_rate,
                channels=channels,
                subtype='PCM_16',
                format='FLAC',
            )
        else:
            self._file = wave.open(path, 'wb')
            self._file.setnchannels(channels)
            self._file.setsampwidth(2)  # 16-bit
            self._file.setframerate(sample_rate)

    def write(self, data: bytes) -> None:
        """Write raw int16 PCM bytes to the output file."""
        if self.fmt == 'flac':
            frame_bytes = 2 * self.channels
            if self._pending:
                data = self._pending + bytes(data)
            # Only whole frames can be reshaped to (frames, channels); carry
            # any trailing partial frame over to the next call.
            usable = len(data) - (len(data) % frame_bytes)
            self._pending = bytes(data[usable:])
            if not usable:
                return
            arr = np.frombuffer(data, dtype='<i2', count=usable // 2)
            if self.channels > 1:
                arr = arr.reshape(-1, self.channels)
            self._file.write(arr)
        else:
            self._file.writeframes(data)

    def close(self) -> None:
        """Close the underlying file; safe to call more than once."""
        if self._file is not None:
            self._file.close()
            self._file = None


class SoundcardRecorder(BaseRecorder):
    """Records from soundcard/audio devices using multiple backends."""

    def __init__(self, name: str, config: Dict[str, Any], logger: logging.Logger,
                 audio_system: Optional['AudioSystem'] = None,
                 clock: Optional[Callable[[], datetime]] = None):
        """Resolve the capture device and output format.

        Raises:
            RuntimeError: if no audio backend is available.
            ValueError: if the configured device cannot be found.
        """
        super().__init__(name, config, logger, clock)

        # Initialize audio system if not provided
        self.audio_system = audio_system or AudioSystem(logger)
        if not self.audio_system.available_backends:
            raise RuntimeError(
                "No audio backends available.\n"
                "Install ALSA utilities: sudo apt install alsa-utils\n"
                "(In Docker, also ensure /dev/snd is mapped into the container)"
            )

        # Audio settings
        self.device_spec = config.get('device', 'default')
        self.sample_rate = config.get('sample_rate', 44100)
        self.channels = config.get('channels', 2)
        self.preferred_backend = config.get('backend', None)

        # Validate format: only WAV and FLAC can be written; 'auto' means WAV
        if self.file_format == 'auto':
            self.file_format = 'wav'
        elif self.file_format not in ('wav', 'flac'):
            self.logger.warning(f"[{self.name}] Format '{self.file_format}' not supported, using 'wav'")
            self.file_format = 'wav'

        # Resolve device
        self.device = self._resolve_device()

        # Audio buffer filled by the backend callback, drained by the record loop
        self.audio_buffer = []
        self.buffer_lock = threading.Lock()

    def _resolve_device(self) -> AudioDevice:
        """Resolve device specification to AudioDevice.

        Raises:
            ValueError: listing the available devices when nothing matches.
        """
        device = self.audio_system.find_device(self.device_spec, self.preferred_backend)
        if device is None:
            # List available devices in error
            available = self.audio_system.list_all_devices()
            device_list = "\n ".join(str(d) for d in available) if available else "None found"
            raise ValueError(
                f"[{self.name}] Device '{self.device_spec}' not found.\n"
                f"Available devices:\n {device_list}"
            )
        self.logger.info(f"[{self.name}] Using device: {device}")
        return device

    def _audio_callback(self, data: bytes):
        """Callback for audio data from backend."""
        with self.buffer_lock:
            self.audio_buffer.append(data)

    def _open_output_file(self):
        """Open a new WAV or FLAC output file for recording."""
        self._flush_buffer_to_file()
        self.close_current_file()
        filename = self.generate_filename(self.file_format)
        # Create the writer before publishing the filename so a failed open
        # never leaves current_filename pointing at a file that is not open.
        writer = _AudioFileWriter(
            filename, self.channels, self.sample_rate, self.file_format
        )
        self.current_file = writer
        self.current_filename = filename
        self.logger.info(f"[{self.name}] Recording to: {self.current_filename}")

    def _flush_buffer_to_file(self) -> bool:
        """Write buffered audio data to file.

        Returns True when any data was written, False otherwise.
        """
        if self.current_file is None:
            return False
        # Swap the buffer out under the lock and write outside it, so the
        # capture callback is never blocked behind disk I/O.
        with self.buffer_lock:
            pending, self.audio_buffer = self.audio_buffer, []
        for data in pending:
            self.current_file.write(data)
        return bool(pending)

    def close_current_file(self):
        """Close current recording file."""
        try:
            self._flush_buffer_to_file()
        except Exception as e:
            # Still close the file below even if the final flush fails.
            self.logger.error(f"[{self.name}] Error flushing audio buffer: {e}")
        if self.current_file:
            try:
                self.current_file.close()
                self.logger.info(f"[{self.name}] Closed: {self.current_filename}")
            except Exception as e:
                self.logger.error(f"[{self.name}] Error closing file: {e}")
        self.current_file = None
        self.current_filename = None

    def record(self):
        """Main recording loop.

        Restarts the capture stream after errors; gives up after
        ``max_retries`` consecutive failures without any audio received.
        """
        self.running = True
        retry_count = 0
        next_split = self.get_next_split_time()

        backend = self.audio_system.get_backend(self.device.backend)
        if not backend:
            self.logger.error(f"[{self.name}] Backend '{self.device.backend}' not available")
            return

        self.logger.info(f"[{self.name}] Starting recorder - Split every {self.split_duration} min")
        self.logger.info(f"[{self.name}] Backend: {self.device.backend}, Device: {self.device.name}")
        self.logger.info(f"[{self.name}] Sample rate: {self.sample_rate}, Channels: {self.channels}")
        self.logger.info(f"[{self.name}] Next split: {next_split.strftime('%H:%M:%S')}")

        while self.running:
            try:
                if self.current_file is None:
                    self._open_output_file()

                with backend.open_stream(
                    self.device,
                    self.sample_rate,
                    self.channels,
                    self._audio_callback
                ) as stream:
                    self.logger.info(f"[{self.name}] Recording started")
                    # Streams without an is_alive() method are assumed healthy.
                    is_alive = getattr(stream, 'is_alive', None)

                    while self.running:
                        time.sleep(0.5)
                        if self._flush_buffer_to_file():
                            # Audio is actually flowing: the retry budget resets.
                            retry_count = 0

                        if self._clock() >= next_split:
                            self._open_output_file()
                            next_split = self.get_next_split_time()
                            self.logger.info(f"[{self.name}] Next split: {next_split.strftime('%H:%M:%S')}")

                        if self.running and callable(is_alive) and not is_alive():
                            # The capture process died (device busy, unplugged, ...);
                            # go through the retry path instead of idling forever.
                            raise RuntimeError("audio capture stream ended unexpectedly")

            except Exception as e:
                self.logger.error(f"[{self.name}] Error: {e}")
                retry_count += 1
                if retry_count >= self.max_retries:
                    self.logger.error(f"[{self.name}] Max retries reached. Stopping.")
                    break
                self.logger.warning(f"[{self.name}] Retry {retry_count}/{self.max_retries} in {self.retry_delay}s...")
                time.sleep(self.retry_delay)

        self.close_current_file()
        self.logger.info(f"[{self.name}] Recorder stopped")


class RecorderManager:
    """Manages multiple recorders running concurrently."""

    # Per-source options that must be converted to int
    _INT_KEYS = ('split_minutes', 'max_retries', 'retry_delay_seconds', 'sample_rate', 'channels')

    def __init__(self, config_file: str = 'config.ini'):
        self.config_file = config_file
        self.recorders: List[BaseRecorder] = []
        self.threads: List[threading.Thread] = []
        self.logger = None
        self.audio_system = None
        self.output_dir = 'recordings'
        self._status_running = False

        self._load_config()

    def _load_config(self):
        """Load and parse the configuration file.

        Exits the process with status 1 when the file does not exist. A
        source section that cannot be configured is logged and skipped.
        """
        if not os.path.exists(self.config_file):
            print(f"Error: Configuration file '{self.config_file}' not found!")
            print("Usage: python isr.py [config.ini]")
            print(" python isr.py --list-devices")
            sys.exit(1)

        # interpolation=None so strftime patterns such as %Y are read literally
        config = configparser.ConfigParser(interpolation=None)
        config.read(self.config_file)

        # Get general settings (defaults)
        general = {
            'output_directory': config.get('general', 'output_directory', fallback='recordings'),
            'split_minutes': config.getint('general', 'split_minutes', fallback=60),
            'filename_pattern': config.get('general', 'filename_pattern', fallback='%Y%m%d_%H%M%S', raw=True),
            'max_retries': config.getint('general', 'max_retries', fallback=10),
            'retry_delay_seconds': config.getint('general', 'retry_delay_seconds', fallback=5),
            'log_level': config.get('general', 'log_level', fallback='INFO').upper(),
            'log_file': config.get('general', 'log_file', fallback='recorder.log'),
        }
        self.output_dir = general['output_directory']

        # Setup logging
        self._setup_logging(general['log_level'], general['log_file'])

        # Initialize shared audio system for soundcard recorders
        self.audio_system = AudioSystem(self.logger)
        if self.audio_system.available_backends:
            self.logger.info(f"Audio backends: {', '.join(self.audio_system.available_backends)}")

        # Parse source sections
        for section in config.sections():
            if section == 'general':
                continue

            source_type = config.get(section, 'type', fallback=None)
            if source_type is None:
                self.logger.warning(f"Section [{section}] has no 'type', skipping")
                continue

            try:
                # Build source config by merging general with section-specific.
                # Done inside the try so one malformed number only disables
                # this source instead of aborting the whole configuration.
                source_config = general.copy()
                for key, value in config.items(section):
                    if key == 'type':
                        continue
                    source_config[key] = int(value) if key in self._INT_KEYS else value

                # Create recorder
                if source_type == 'stream':
                    recorder = StreamRecorder(section, source_config, self.logger)
                    self.recorders.append(recorder)
                    self.logger.info(f"Configured stream: [{section}]")
                elif source_type == 'soundcard':
                    recorder = SoundcardRecorder(section, source_config, self.logger, self.audio_system)
                    self.recorders.append(recorder)
                    self.logger.info(f"Configured soundcard: [{section}]")
                else:
                    self.logger.warning(f"Unknown type '{source_type}' in [{section}], skipping")
            except Exception as e:
                self.logger.error(f"Failed to configure [{section}]: {e}")

        if not self.recorders:
            self.logger.error("No valid recording sources configured!")

    def _setup_logging(self, level: str, log_file: str):
        """Setup logging configuration (file plus stdout)."""
        numeric_level = getattr(logging, level, logging.INFO)
        if not isinstance(numeric_level, int):
            # e.g. log_level = "basicconfig" resolves to a function, not a level
            numeric_level = logging.INFO

        # Clear any existing handlers, closing them so log files are released
        root_logger = logging.getLogger()
        for handler in list(root_logger.handlers):
            root_logger.removeHandler(handler)
            handler.close()

        logging.basicConfig(
            level=numeric_level,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler(sys.stdout)
            ]
        )
        self.logger = logging.getLogger(__name__)

    def _write_status(self):
        """Write active filenames to status.json for the web UI to read."""
        active = []
        for r in self.recorders:
            fn = r.current_filename
            if fn:
                try:
                    rel = os.path.relpath(fn, self.output_dir).replace('\\', '/')
                    active.append(rel)
                except ValueError:
                    pass  # no relative path exists (e.g. different drive); skip it
        try:
            out = Path(self.output_dir)
            out.mkdir(parents=True, exist_ok=True)
            # Write to a temp file and rename so readers never see a partial file
            tmp = str(out / 'status.json.tmp')
            with open(tmp, 'w') as fh:
                json.dump({'active': active}, fh)
            os.replace(tmp, str(out / 'status.json'))
        except Exception as e:
            # Status reporting is best-effort and must never stop recording.
            self.logger.debug(f"Could not write status file: {e}")

    def _status_writer_loop(self):
        """Refresh status.json every two seconds until stopped."""
        while self._status_running:
            self._write_status()
            time.sleep(2)

    def start(self):
        """Start all recorders in separate threads and block until they end."""
        if not self.recorders:
            self.logger.error("No recorders to start!")
            return

        self.logger.info(f"Starting {len(self.recorders)} recorder(s)...")

        for recorder in self.recorders:
            thread = threading.Thread(target=recorder.record, name=f"Recorder-{recorder.name}")
            thread.daemon = True
            self.threads.append(thread)
            thread.start()

        self._status_running = True
        st = threading.Thread(target=self._status_writer_loop, name='StatusWriter', daemon=True)
        st.start()

        self.logger.info("All recorders started")

        # Wait for interrupt
        try:
            while True:
                if not any(t.is_alive() for t in self.threads):
                    self.logger.info("All recorders have stopped")
                    break
                time.sleep(1)
        except KeyboardInterrupt:
            self.logger.info("Received interrupt signal, stopping all recorders...")

        self.stop()

    def stop(self):
        """Stop all recorders gracefully."""
        self._status_running = False
        for recorder in self.recorders:
            recorder.stop()

        # Use a shared deadline so N recorders don't each burn 5 s sequentially,
        # which would exceed Docker's stop_grace_period for more than 2 recorders.
        deadline = time.time() + 25
        for thread in self.threads:
            remaining = max(0.1, deadline - time.time())
            thread.join(timeout=remaining)

        # Clear status file so the web UI shows no active recordings
        try:
            status_path = Path(self.output_dir) / 'status.json'
            if status_path.exists():
                status_path.unlink()
        except OSError as e:
            self.logger.debug(f"Could not remove status file: {e}")

        self.logger.info("All recorders stopped")


def _print_device(dev: AudioDevice, flags: List[str]) -> None:
    """Print the three-line description of one device for list_audio_devices()."""
    flag_str = f" [{', '.join(flags)}]" if flags else ""
    print(f"\n {dev.name}{flag_str}")
    print(f" ID: {dev.id} | Backend: {dev.backend}")
    print(f" Channels: {dev.channels} | Sample Rate: {dev.sample_rate} Hz")


def list_audio_devices():
    """List available audio input devices from all backends."""
    # Create a minimal logger for device listing
    logger = logging.getLogger('isr-devices')
    logger.setLevel(logging.WARNING)  # Suppress debug output

    print("\n" + "=" * 70)
    print(" ISR Audio Device Discovery")
    print("=" * 70)

    # Check available backends
    available_backends = []
    if ALSABackend.is_available():
        available_backends.append(('alsa', 'ALSA (arecord)', ALSABackend.priority))

    if not available_backends:
        print("\n No audio backends available!")
        print("\n Install one of:")
        print(" sudo apt install alsa-utils (ALSA, always available on Linux)")
        print()
        return

    print("\n Available Backends:")
    top_priority = max(b[2] for b in available_backends)
    for _name, label, priority in sorted(available_backends, key=lambda x: -x[2]):
        marker = " (preferred)" if priority == top_priority else ""
        print(f" - {label}{marker}")

    # Initialize audio system and list devices
    audio_system = AudioSystem(logger)
    devices = audio_system.list_all_devices()

    if not devices:
        print("\n No input devices found!")
        print()
        return

    # Group by type
    monitors = [d for d in devices if d.is_monitor]
    inputs = [d for d in devices if not d.is_monitor]

    if inputs:
        print("\n Input Devices:")
        print(" " + "-" * 68)
        for dev in inputs:
            _print_device(dev, ["DEFAULT"] if dev.is_default else [])

    if monitors:
        print("\n Monitor/Loopback Sources:")
        print(" " + "-" * 68)
        for dev in monitors:
            flags = ["MONITOR"]
            if dev.is_default:
                flags.append("DEFAULT")
            _print_device(dev, flags)

    print("\n" + "=" * 70)
    print(" Configuration Examples:")
    print("-" * 70)
    print(" device = default # Use system default input")
    print(" device = monitor # Use first monitor/loopback source")
    print(" device = <name> # Match by partial name")
    print(" device = <id> # Use exact backend ID")
    print(" backend = pipewire # Force specific backend")
    print("=" * 70 + "\n")


def main():
    """Command-line entry point: list devices or run the configured recorders."""
    # Check for --list-devices flag
    if len(sys.argv) > 1 and sys.argv[1] in ('--list-devices', '-l'):
        list_audio_devices()
        return

    # Get config file
    config_file = 'config.ini'
    if len(sys.argv) > 1:
        config_file = sys.argv[1]

    if not os.path.exists(config_file):
        print(f"Error: Configuration file '{config_file}' not found!")
        print("Usage: python ISR.py [config.ini]")
        print(" python ISR.py --list-devices")
        sys.exit(1)

    # Docker sends SIGTERM before SIGKILL — treat it the same as Ctrl+C
    def _sigterm(sig, frame):
        raise KeyboardInterrupt()

    signal.signal(signal.SIGTERM, _sigterm)

    manager = RecorderManager(config_file)
    manager.start()


if __name__ == '__main__':
    main()