""" Tests for isr.py Run with: pytest tests/ """ import logging import struct import wave from datetime import datetime from pathlib import Path from typing import List from unittest.mock import MagicMock, patch import pytest # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def make_logger(): logger = logging.getLogger("test") logger.setLevel(logging.CRITICAL) # suppress noise during tests return logger def fixed_clock(dt: datetime): """Return a zero-argument callable that always returns *dt*.""" return lambda: dt # --------------------------------------------------------------------------- # Import the module under test # --------------------------------------------------------------------------- import sys import os sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) import isr # noqa: E402 (must come after sys.path manipulation) # =========================================================================== # AudioDevice / AudioSystem # =========================================================================== class TestAudioSystemFindDevice: """Tests for AudioSystem.find_device().""" def _make_system(self, devices: List[isr.AudioDevice]) -> isr.AudioSystem: system = isr.AudioSystem.__new__(isr.AudioSystem) system.logger = make_logger() system._backends = {} system.list_all_devices = lambda: devices return system def _dev(self, id_, name, *, is_default=False, is_monitor=False, backend="portaudio"): return isr.AudioDevice( id=id_, name=name, channels=2, sample_rate=44100, backend=backend, is_default=is_default, is_monitor=is_monitor, ) def test_default_returns_default_device(self): devices = [ self._dev("0", "Microphone"), self._dev("1", "Stereo Mix", is_default=True), ] system = self._make_system(devices) result = system.find_device("default") assert result.id == "1" def test_default_falls_back_to_first_when_none_marked(self): devices = [self._dev("0", "Mic"), self._dev("1", "Line In")] system = self._make_system(devices) result = system.find_device("default") assert result.id == "0" def test_monitor_returns_monitor_device(self): devices = [ self._dev("0", "Microphone"), self._dev("1", "Monitor of Built-in Audio", is_monitor=True), ] system = self._make_system(devices) result = system.find_device("monitor") assert result.id == "1" def test_monitor_returns_none_when_no_monitor(self): devices = [self._dev("0", "Mic")] system = self._make_system(devices) assert system.find_device("monitor") is None def test_exact_id_match(self): devices = [self._dev("42", "Device 42"), self._dev("7", "Device 7")] system = self._make_system(devices) assert system.find_device("42").name == "Device 42" def test_exact_name_match_case_insensitive(self): devices = [self._dev("0", "Stereo Mix")] system = self._make_system(devices) assert system.find_device("stereo mix").id == "0" def test_partial_name_match(self): devices = [self._dev("0", "Realtek Stereo Mix")] system = self._make_system(devices) assert system.find_device("stereo").id == "0" def test_returns_none_for_unknown_spec(self): devices = [self._dev("0", "Mic")] system = self._make_system(devices) assert system.find_device("nonexistent_device_xyz") is None def test_preferred_backend_filter_on_default(self): devices = [ self._dev("0", "Mic", is_default=True, backend="pulseaudio"), self._dev("1", "Mic", is_default=True, backend="portaudio"), ] system = self._make_system(devices) result = system.find_device("default", preferred_backend="portaudio") assert result.backend == "portaudio" # =========================================================================== # BaseRecorder helpers # =========================================================================== class TestGetNextSplitTime: """Tests for BaseRecorder.get_next_split_time().""" def _recorder(self, split_minutes: int, now: datetime) -> isr.BaseRecorder: """Create a concrete BaseRecorder subclass with an injected clock.""" class _Rec(isr.BaseRecorder): def record(self): pass cfg = {"split_minutes": split_minutes, "output_directory": "/tmp/isr_test"} r = _Rec.__new__(_Rec) r.name = "test" r.config = cfg r.logger = make_logger() r.running = False r.current_file = None r.current_filename = None r._clock = fixed_clock(now) r.split_duration = split_minutes r.output_dir = cfg["output_directory"] r.filename_pattern = "%Y%m%d_%H%M%S" r.max_retries = 3 r.retry_delay = 1 r.file_format = "auto" return r def test_30min_splits_at_next_half_hour(self): now = datetime(2024, 1, 1, 14, 10, 30) r = self._recorder(30, now) nxt = r.get_next_split_time() assert nxt == datetime(2024, 1, 1, 14, 30, 0) def test_60min_splits_at_next_hour(self): now = datetime(2024, 1, 1, 14, 45, 0) r = self._recorder(60, now) nxt = r.get_next_split_time() assert nxt == datetime(2024, 1, 1, 15, 0, 0) def test_exactly_on_boundary_schedules_next_period(self): # At exactly 14:00 with 60-min splits → next split is 15:00 now = datetime(2024, 1, 1, 14, 0, 0) r = self._recorder(60, now) nxt = r.get_next_split_time() assert nxt == datetime(2024, 1, 1, 15, 0, 0) def test_120min_split_crosses_hour_boundary(self): # 15:30 with 120-min splits → boundaries at 00:00, 02:00, …, 16:00 now = datetime(2024, 1, 1, 15, 30, 0) r = self._recorder(120, now) nxt = r.get_next_split_time() assert nxt == datetime(2024, 1, 1, 16, 0, 0) def test_seconds_are_zeroed(self): now = datetime(2024, 1, 1, 12, 5, 45) r = self._recorder(30, now) assert r.get_next_split_time().second == 0 assert r.get_next_split_time().microsecond == 0 class TestGenerateFilename: """Tests for BaseRecorder.generate_filename().""" def _recorder(self, pattern: str, now: datetime, output_dir: str) -> isr.BaseRecorder: class _Rec(isr.BaseRecorder): def record(self): pass r = _Rec.__new__(_Rec) r.name = "test" r.config = {} r.logger = make_logger() r.running = False r.current_file = None r.current_filename = None r._clock = fixed_clock(now) r.split_duration = 60 r.output_dir = output_dir r.filename_pattern = pattern r.max_retries = 3 r.retry_delay = 1 r.file_format = "auto" return r def test_basic_pattern(self, tmp_path): now = datetime(2024, 12, 25, 14, 30, 0) r = self._recorder("%Y%m%d_%H%M%S", now, str(tmp_path)) name = r.generate_filename("mp3") assert name.endswith("20241225_143000.mp3") def test_subdirectory_created(self, tmp_path): now = datetime(2024, 12, 25, 14, 30, 0) r = self._recorder("%Y/%m/%d/rec_%H%M%S", now, str(tmp_path)) name = r.generate_filename("ogg") parent = Path(name).parent assert parent.exists() def test_output_dir_prefix(self, tmp_path): now = datetime(2024, 1, 1, 0, 0, 0) r = self._recorder("%Y%m%d", now, str(tmp_path)) name = r.generate_filename("wav") assert name.startswith(str(tmp_path)) # =========================================================================== # StreamRecorder — format detection # =========================================================================== class TestDetectFormat: """Tests for StreamRecorder.detect_format().""" def _recorder(self) -> isr.StreamRecorder: cfg = { "url": "http://example.com/stream", "output_directory": "/tmp", "split_minutes": 60, "filename_pattern": "%Y%m%d_%H%M%S", "max_retries": 1, "retry_delay_seconds": 0, "format": "auto", } with patch.object(isr, "REQUESTS_AVAILABLE", True): r = isr.StreamRecorder.__new__(isr.StreamRecorder) r.name = "test" r.config = cfg r.logger = make_logger() r.running = False r.current_file = None r.current_filename = None r._clock = datetime.now r.split_duration = 60 r.output_dir = "/tmp" r.filename_pattern = "%Y%m%d_%H%M%S" r.max_retries = 1 r.retry_delay = 0 r.file_format = "auto" r.stream_url = "http://example.com/stream" r.username = None r.password = None r.stream_headers = None r.header_capture_complete = False r.detected_format = None return r def _response(self, content_type: str) -> MagicMock: resp = MagicMock() resp.headers = {"Content-Type": content_type} return resp @pytest.mark.parametrize("ct,expected", [ ("audio/mpeg", "mp3"), ("audio/mp3", "mp3"), ("audio/ogg", "ogg"), ("application/ogg", "ogg"), ("audio/aac", "aac"), ("audio/aacp", "aac"), ("audio/flac", "flac"), ("audio/opus", "opus"), ]) def test_known_content_types(self, ct, expected): r = self._recorder() assert r.detect_format(self._response(ct)) == expected def test_unknown_content_type_defaults_to_mp3(self): r = self._recorder() assert r.detect_format(self._response("application/octet-stream")) == "mp3" def test_needs_header_per_file_ogg(self): r = self._recorder() r.detected_format = "ogg" assert r.needs_header_per_file() is True def test_needs_header_per_file_mp3(self): r = self._recorder() r.detected_format = "mp3" assert r.needs_header_per_file() is False # =========================================================================== # StreamRecorder — OGG page parsing # =========================================================================== def _build_ogg_page(granule_pos: int = 0, is_bos: bool = False, payload: bytes = b"\x00" * 10) -> bytes: """Build a minimal valid OGG page for testing.""" header_type = 0x02 if is_bos else 0x00 # Single segment containing the entire payload num_segments = 1 segment_table = bytes([len(payload)]) # Granule position (8 bytes little-endian) granule_bytes = struct.pack(" isr.StreamRecorder: r = isr.StreamRecorder.__new__(isr.StreamRecorder) r.name = "test" r.logger = make_logger() return r def test_parses_valid_page(self): page = _build_ogg_page(granule_pos=0, is_bos=True, payload=b"X" * 10) r = self._recorder() result = r.parse_ogg_page(page, 0) assert result is not None page_bytes, is_header, next_offset = result assert page_bytes == page assert next_offset == len(page) def test_returns_none_for_non_ogg_data(self): r = self._recorder() assert r.parse_ogg_page(b"ID3\x00" + b"\x00" * 50, 0) is None def test_returns_none_when_data_too_short(self): r = self._recorder() assert r.parse_ogg_page(b"OggS\x00\x02", 0) is None def test_bos_flag_detected(self): page = _build_ogg_page(granule_pos=0, is_bos=True, payload=b"\x01" * 5) r = self._recorder() _, is_header, _ = r.parse_ogg_page(page, 0) assert is_header is True def test_non_zero_granule_is_not_header(self): page = _build_ogg_page(granule_pos=1000, is_bos=False, payload=b"\x00" * 5) r = self._recorder() _, is_header, _ = r.parse_ogg_page(page, 0) assert is_header is False class TestExtractOggHeaders: def _recorder(self) -> isr.StreamRecorder: r = isr.StreamRecorder.__new__(isr.StreamRecorder) r.name = "test" r.logger = make_logger() return r def test_extracts_bos_pages(self): header1 = _build_ogg_page(granule_pos=0, is_bos=True, payload=b"H1" * 5) header2 = _build_ogg_page(granule_pos=0, is_bos=False, payload=b"H2" * 5) audio = _build_ogg_page(granule_pos=500, is_bos=False, payload=b"audio" * 3) data = header1 + header2 + audio r = self._recorder() headers, remaining = r.extract_ogg_headers(data) # Both header pages (granule_pos == 0) should be captured assert header1 in headers assert header2 in headers def test_non_ogg_data_returns_empty_headers(self): r = self._recorder() headers, remaining = r.extract_ogg_headers(b"ID3\x00garbage") assert headers == b"" def test_remaining_data_preserved(self): header = _build_ogg_page(granule_pos=0, is_bos=True, payload=b"hdr") audio = _build_ogg_page(granule_pos=100, payload=b"aud") r = self._recorder() headers, remaining = r.extract_ogg_headers(header + audio) assert len(headers) > 0 # The audio page should be in remaining assert audio in remaining or len(remaining) >= len(audio) - 5 # =========================================================================== # _AudioFileWriter # =========================================================================== class TestAudioFileWriter: def test_wav_writes_valid_file(self, tmp_path): path = str(tmp_path / "out.wav") writer = isr._AudioFileWriter(path, channels=2, sample_rate=44100, fmt="wav") # 1 frame of stereo int16 silence writer.write(b"\x00" * 4) writer.close() with wave.open(path, "rb") as wf: assert wf.getnchannels() == 2 assert wf.getsampwidth() == 2 assert wf.getframerate() == 44100 def test_flac_raises_when_soundfile_unavailable(self, tmp_path): path = str(tmp_path / "out.flac") with patch.object(isr, "SOUNDFILE_AVAILABLE", False): with pytest.raises(ImportError, match="soundfile"): isr._AudioFileWriter(path, channels=2, sample_rate=44100, fmt="flac") @pytest.mark.skipif(not isr.SOUNDFILE_AVAILABLE, reason="soundfile not installed") def test_flac_writes_valid_file(self, tmp_path): import soundfile as sf path = str(tmp_path / "out.flac") writer = isr._AudioFileWriter(path, channels=2, sample_rate=44100, fmt="flac") # 100ms of stereo silence at 44100 Hz → 44100 * 0.1 * 2 channels * 2 bytes = 17640 bytes writer.write(b"\x00" * 17640) writer.close() info = sf.info(path) assert info.channels == 2 assert info.samplerate == 44100 # =========================================================================== # RecorderManager — config loading # =========================================================================== class TestRecorderManagerLoadConfig: """Verify that config.ini settings are correctly parsed and applied.""" def _minimal_config(self, tmp_path) -> str: log_file = str(tmp_path / "test.log") return f""" [general] output_directory = {str(tmp_path / "recordings")} split_minutes = 30 filename_pattern = test_%Y%m%d max_retries = 3 retry_delay_seconds = 2 log_level = WARNING log_file = {log_file} [my_stream] type = stream url = http://stream.example.com:8000/live format = ogg """ def test_stream_recorder_created(self, tmp_path): cfg_path = tmp_path / "config.ini" cfg_path.write_text(self._minimal_config(tmp_path)) # Patch AudioSystem so it doesn't probe real hardware with patch.object(isr, "AudioSystem") as mock_as: mock_as.return_value.available_backends = [] mgr = isr.RecorderManager(str(cfg_path)) assert len(mgr.recorders) == 1 rec = mgr.recorders[0] assert isinstance(rec, isr.StreamRecorder) assert rec.name == "my_stream" assert rec.stream_url == "http://stream.example.com:8000/live" def test_general_settings_inherited_by_source(self, tmp_path): cfg_path = tmp_path / "config.ini" cfg_path.write_text(self._minimal_config(tmp_path)) with patch.object(isr, "AudioSystem") as mock_as: mock_as.return_value.available_backends = [] mgr = isr.RecorderManager(str(cfg_path)) rec = mgr.recorders[0] assert rec.split_duration == 30 assert rec.max_retries == 3 def test_source_overrides_general(self, tmp_path): log_file = str(tmp_path / "test.log") config_text = f""" [general] output_directory = {str(tmp_path / "recordings")} split_minutes = 60 filename_pattern = %Y%m%d_%H%M%S max_retries = 10 retry_delay_seconds = 5 log_level = WARNING log_file = {log_file} [overriding_stream] type = stream url = http://example.com/stream split_minutes = 15 filename_pattern = custom_%Y%m%d """ cfg_path = tmp_path / "config.ini" cfg_path.write_text(config_text) with patch.object(isr, "AudioSystem") as mock_as: mock_as.return_value.available_backends = [] mgr = isr.RecorderManager(str(cfg_path)) rec = mgr.recorders[0] assert rec.split_duration == 15 assert rec.filename_pattern == "custom_%Y%m%d" def test_unknown_type_is_skipped(self, tmp_path): log_file = str(tmp_path / "test.log") config_text = f""" [general] log_level = WARNING log_file = {log_file} [bad_source] type = unknown_type url = http://x.com/s [good_source] type = stream url = http://x.com/s2 """ cfg_path = tmp_path / "config.ini" cfg_path.write_text(config_text) with patch.object(isr, "AudioSystem") as mock_as: mock_as.return_value.available_backends = [] mgr = isr.RecorderManager(str(cfg_path)) assert len(mgr.recorders) == 1 assert mgr.recorders[0].name == "good_source" def test_missing_config_file_exits(self, tmp_path): with pytest.raises(SystemExit): # main() calls sys.exit(1) when the file is missing with patch("sys.argv", ["isr.py", str(tmp_path / "nonexistent.ini")]): isr.main() # =========================================================================== # StreamRecorder — record() integration (mocked HTTP) # =========================================================================== class TestStreamRecorderRecord: """Integration-level tests with a mocked requests session.""" def _recorder(self, fmt="mp3", clock=None) -> isr.StreamRecorder: cfg = { "url": "http://example.com/stream", "output_directory": "", # overridden per-test with tmp_path "split_minutes": 60, "filename_pattern": "%Y%m%d_%H%M%S", "max_retries": 2, "retry_delay_seconds": 0, "format": fmt, } with patch.object(isr, "REQUESTS_AVAILABLE", True): r = isr.StreamRecorder( "test_stream", cfg, make_logger(), clock=clock or datetime.now ) return r def test_connection_failure_retries(self, tmp_path): rec = self._recorder() rec.output_dir = str(tmp_path) rec.max_retries = 2 with patch.object(rec, "connect_stream", return_value=None) as mock_connect: # connect_stream always fails → recorder gives up after max_retries rec.record() assert mock_connect.call_count == rec.max_retries # =========================================================================== # SoundcardRecorder — unit tests # =========================================================================== class TestSoundcardRecorder: def _make_device(self, backend="portaudio") -> isr.AudioDevice: return isr.AudioDevice( id="0", name="Test Device", channels=2, sample_rate=44100, backend=backend, ) def _make_audio_system(self, device: isr.AudioDevice) -> isr.AudioSystem: system = MagicMock(spec=isr.AudioSystem) system.available_backends = [device.backend] system.find_device.return_value = device return system def test_wav_file_written_correctly(self, tmp_path): device = self._make_device() audio_system = self._make_audio_system(device) audio_system.get_backend.return_value = MagicMock() cfg = { "device": "default", "sample_rate": 44100, "channels": 2, "format": "wav", "output_directory": str(tmp_path), "split_minutes": 60, "filename_pattern": "%Y%m%d_%H%M%S", "max_retries": 1, "retry_delay_seconds": 0, } rec = isr.SoundcardRecorder("sc_test", cfg, make_logger(), audio_system=audio_system) rec._open_output_file() # Simulate audio callback delivering one chunk silence = b"\x00" * (44100 * 2 * 2 // 10) # 100ms of stereo int16 silence rec._audio_callback(silence) rec.close_current_file() out_files = list(tmp_path.glob("*.wav")) assert len(out_files) == 1 with wave.open(str(out_files[0]), "rb") as wf: assert wf.getnchannels() == 2 assert wf.getframerate() == 44100 def test_flac_raises_when_soundfile_unavailable(self, tmp_path): device = self._make_device() audio_system = self._make_audio_system(device) cfg = { "device": "default", "sample_rate": 44100, "channels": 2, "format": "flac", "output_directory": str(tmp_path), "split_minutes": 60, "filename_pattern": "%Y%m%d_%H%M%S", "max_retries": 1, "retry_delay_seconds": 0, } with patch.object(isr, "SOUNDFILE_AVAILABLE", False): rec = isr.SoundcardRecorder("sc_test", cfg, make_logger(), audio_system=audio_system) with pytest.raises(ImportError, match="soundfile"): rec._open_output_file() @pytest.mark.skipif(not isr.SOUNDFILE_AVAILABLE, reason="soundfile not installed") def test_flac_file_written_correctly(self, tmp_path): import soundfile as sf device = self._make_device() audio_system = self._make_audio_system(device) cfg = { "device": "default", "sample_rate": 44100, "channels": 2, "format": "flac", "output_directory": str(tmp_path), "split_minutes": 60, "filename_pattern": "%Y%m%d_%H%M%S", "max_retries": 1, "retry_delay_seconds": 0, } rec = isr.SoundcardRecorder("sc_test", cfg, make_logger(), audio_system=audio_system) rec._open_output_file() silence = b"\x00" * (44100 * 2 * 2 // 10) rec._audio_callback(silence) rec.close_current_file() out_files = list(tmp_path.glob("*.flac")) assert len(out_files) == 1 info = sf.info(str(out_files[0])) assert info.channels == 2 assert info.samplerate == 44100