""" Tests for isr.py Run with: pytest tests/ """ import io import logging import struct import wave from datetime import datetime from pathlib import Path from types import SimpleNamespace from typing import List from unittest.mock import MagicMock, patch, call import pytest # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def make_logger(): logger = logging.getLogger("test") logger.setLevel(logging.CRITICAL) # suppress noise during tests return logger def fixed_clock(dt: datetime): """Return a zero-argument callable that always returns *dt*.""" return lambda: dt # --------------------------------------------------------------------------- # Import the module under test # --------------------------------------------------------------------------- import sys import os sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) import isr # noqa: E402 (must come after sys.path manipulation) # =========================================================================== # AudioDevice / AudioSystem # =========================================================================== class TestAudioSystemFindDevice: """Tests for AudioSystem.find_device().""" def _make_system(self, devices: List[isr.AudioDevice]) -> isr.AudioSystem: system = isr.AudioSystem.__new__(isr.AudioSystem) system.logger = make_logger() system._backends = {} system.list_all_devices = lambda: devices return system def _dev(self, id_, name, *, is_default=False, is_monitor=False, backend="portaudio"): return isr.AudioDevice( id=id_, name=name, channels=2, sample_rate=44100, backend=backend, is_default=is_default, is_monitor=is_monitor, ) def test_default_returns_default_device(self): devices = [ self._dev("0", "Microphone"), self._dev("1", "Stereo Mix", is_default=True), ] system = self._make_system(devices) result = system.find_device("default") assert result.id == "1" def test_default_falls_back_to_first_when_none_marked(self): devices = [self._dev("0", "Mic"), self._dev("1", "Line In")] system = self._make_system(devices) result = system.find_device("default") assert result.id == "0" def test_monitor_returns_monitor_device(self): devices = [ self._dev("0", "Microphone"), self._dev("1", "Monitor of Built-in Audio", is_monitor=True), ] system = self._make_system(devices) result = system.find_device("monitor") assert result.id == "1" def test_monitor_returns_none_when_no_monitor(self): devices = [self._dev("0", "Mic")] system = self._make_system(devices) assert system.find_device("monitor") is None def test_exact_id_match(self): devices = [self._dev("42", "Device 42"), self._dev("7", "Device 7")] system = self._make_system(devices) assert system.find_device("42").name == "Device 42" def test_exact_name_match_case_insensitive(self): devices = [self._dev("0", "Stereo Mix")] system = self._make_system(devices) assert system.find_device("stereo mix").id == "0" def test_partial_name_match(self): devices = [self._dev("0", "Realtek Stereo Mix")] system = self._make_system(devices) assert system.find_device("stereo").id == "0" def test_returns_none_for_unknown_spec(self): devices = [self._dev("0", "Mic")] system = self._make_system(devices) assert system.find_device("nonexistent_device_xyz") is None def test_preferred_backend_filter_on_default(self): devices = [ self._dev("0", "Mic", is_default=True, backend="pulseaudio"), self._dev("1", "Mic", is_default=True, backend="portaudio"), ] system = self._make_system(devices) result = system.find_device("default", preferred_backend="portaudio") assert result.backend == "portaudio" # =========================================================================== # BaseRecorder helpers # =========================================================================== class TestGetNextSplitTime: """Tests for BaseRecorder.get_next_split_time().""" def _recorder(self, split_minutes: int, now: datetime) -> isr.BaseRecorder: """Create a concrete BaseRecorder subclass with an injected clock.""" class _Rec(isr.BaseRecorder): def record(self): pass cfg = {"split_minutes": split_minutes, "output_directory": "/tmp/isr_test"} r = _Rec.__new__(_Rec) r.name = "test" r.config = cfg r.logger = make_logger() r.running = False r.current_file = None r.current_filename = None r._clock = fixed_clock(now) r.split_duration = split_minutes r.output_dir = cfg["output_directory"] r.filename_pattern = "%Y%m%d_%H%M%S" r.max_retries = 3 r.retry_delay = 1 r.file_format = "auto" return r def test_30min_splits_at_next_half_hour(self): now = datetime(2024, 1, 1, 14, 10, 30) r = self._recorder(30, now) nxt = r.get_next_split_time() assert nxt == datetime(2024, 1, 1, 14, 30, 0) def test_60min_splits_at_next_hour(self): now = datetime(2024, 1, 1, 14, 45, 0) r = self._recorder(60, now) nxt = r.get_next_split_time() assert nxt == datetime(2024, 1, 1, 15, 0, 0) def test_exactly_on_boundary_schedules_next_period(self): # At exactly 14:00 with 60-min splits → next split is 15:00 now = datetime(2024, 1, 1, 14, 0, 0) r = self._recorder(60, now) nxt = r.get_next_split_time() assert nxt == datetime(2024, 1, 1, 15, 0, 0) def test_120min_split_crosses_hour_boundary(self): # 15:30 with 120-min splits → boundaries at 00:00, 02:00, …, 16:00 now = datetime(2024, 1, 1, 15, 30, 0) r = self._recorder(120, now) nxt = r.get_next_split_time() assert nxt == datetime(2024, 1, 1, 16, 0, 0) def test_seconds_are_zeroed(self): now = datetime(2024, 1, 1, 12, 5, 45) r = self._recorder(30, now) assert r.get_next_split_time().second == 0 assert r.get_next_split_time().microsecond == 0 class TestGenerateFilename: """Tests for BaseRecorder.generate_filename().""" def _recorder(self, pattern: str, now: datetime, output_dir: str) -> isr.BaseRecorder: class _Rec(isr.BaseRecorder): def record(self): pass r = _Rec.__new__(_Rec) r.name = "test" r.config = {} r.logger = make_logger() r.running = False r.current_file = None r.current_filename = None r._clock = fixed_clock(now) r.split_duration = 60 r.output_dir = output_dir r.filename_pattern = pattern r.max_retries = 3 r.retry_delay = 1 r.file_format = "auto" return r def test_basic_pattern(self, tmp_path): now = datetime(2024, 12, 25, 14, 30, 0) r = self._recorder("%Y%m%d_%H%M%S", now, str(tmp_path)) name = r.generate_filename("mp3") assert name.endswith("20241225_143000.mp3") def test_subdirectory_created(self, tmp_path): now = datetime(2024, 12, 25, 14, 30, 0) r = self._recorder("%Y/%m/%d/rec_%H%M%S", now, str(tmp_path)) name = r.generate_filename("ogg") parent = Path(name).parent assert parent.exists() def test_output_dir_prefix(self, tmp_path): now = datetime(2024, 1, 1, 0, 0, 0) r = self._recorder("%Y%m%d", now, str(tmp_path)) name = r.generate_filename("wav") assert name.startswith(str(tmp_path)) # =========================================================================== # StreamRecorder — format detection # =========================================================================== class TestDetectFormat: """Tests for StreamRecorder.detect_format().""" def _recorder(self) -> isr.StreamRecorder: cfg = { "url": "http://example.com/stream", "output_directory": "/tmp", "split_minutes": 60, "filename_pattern": "%Y%m%d_%H%M%S", "max_retries": 1, "retry_delay_seconds": 0, "format": "auto", } with patch.object(isr, "REQUESTS_AVAILABLE", True): r = isr.StreamRecorder.__new__(isr.StreamRecorder) r.name = "test" r.config = cfg r.logger = make_logger() r.running = False r.current_file = None r.current_filename = None r._clock = datetime.now r.split_duration = 60 r.output_dir = "/tmp" r.filename_pattern = "%Y%m%d_%H%M%S" r.max_retries = 1 r.retry_delay = 0 r.file_format = "auto" r.stream_url = "http://example.com/stream" r.username = None r.password = None r.stream_headers = None r.header_capture_complete = False r.detected_format = None return r def _response(self, content_type: str) -> MagicMock: resp = MagicMock() resp.headers = {"Content-Type": content_type} return resp @pytest.mark.parametrize("ct,expected", [ ("audio/mpeg", "mp3"), ("audio/mp3", "mp3"), ("audio/ogg", "ogg"), ("application/ogg", "ogg"), ("audio/aac", "aac"), ("audio/aacp", "aac"), ("audio/flac", "flac"), ("audio/opus", "opus"), ]) def test_known_content_types(self, ct, expected): r = self._recorder() assert r.detect_format(self._response(ct)) == expected def test_unknown_content_type_defaults_to_mp3(self): r = self._recorder() assert r.detect_format(self._response("application/octet-stream")) == "mp3" def test_needs_header_per_file_ogg(self): r = self._recorder() r.detected_format = "ogg" assert r.needs_header_per_file() is True def test_needs_header_per_file_mp3(self): r = self._recorder() r.detected_format = "mp3" assert r.needs_header_per_file() is False # =========================================================================== # StreamRecorder — OGG page parsing # =========================================================================== def _build_ogg_page(granule_pos: int = 0, is_bos: bool = False, payload: bytes = b"\x00" * 10) -> bytes: """Build a minimal valid OGG page for testing.""" header_type = 0x02 if is_bos else 0x00 # Single segment containing the entire payload num_segments = 1 segment_table = bytes([len(payload)]) # Granule position (8 bytes little-endian) granule_bytes = struct.pack(" isr.StreamRecorder: r = isr.StreamRecorder.__new__(isr.StreamRecorder) r.name = "test" r.logger = make_logger() return r def test_parses_valid_page(self): page = _build_ogg_page(granule_pos=0, is_bos=True, payload=b"X" * 10) r = self._recorder() result = r.parse_ogg_page(page, 0) assert result is not None page_bytes, is_header, next_offset = result assert page_bytes == page assert next_offset == len(page) def test_returns_none_for_non_ogg_data(self): r = self._recorder() assert r.parse_ogg_page(b"ID3\x00" + b"\x00" * 50, 0) is None def test_returns_none_when_data_too_short(self): r = self._recorder() assert r.parse_ogg_page(b"OggS\x00\x02", 0) is None def test_bos_flag_detected(self): page = _build_ogg_page(granule_pos=0, is_bos=True, payload=b"\x01" * 5) r = self._recorder() _, is_header, _ = r.parse_ogg_page(page, 0) assert is_header is True def test_non_zero_granule_is_not_header(self): page = _build_ogg_page(granule_pos=1000, is_bos=False, payload=b"\x00" * 5) r = self._recorder() _, is_header, _ = r.parse_ogg_page(page, 0) assert is_header is False class TestExtractOggHeaders: def _recorder(self) -> isr.StreamRecorder: r = isr.StreamRecorder.__new__(isr.StreamRecorder) r.name = "test" r.logger = make_logger() return r def test_extracts_bos_pages(self): header1 = _build_ogg_page(granule_pos=0, is_bos=True, payload=b"H1" * 5) header2 = _build_ogg_page(granule_pos=0, is_bos=False, payload=b"H2" * 5) audio = _build_ogg_page(granule_pos=500, is_bos=False, payload=b"audio" * 3) data = header1 + header2 + audio r = self._recorder() headers, remaining = r.extract_ogg_headers(data) # Both header pages (granule_pos == 0) should be captured assert header1 in headers assert header2 in headers def test_non_ogg_data_returns_empty_headers(self): r = self._recorder() headers, remaining = r.extract_ogg_headers(b"ID3\x00garbage") assert headers == b"" def test_remaining_data_preserved(self): header = _build_ogg_page(granule_pos=0, is_bos=True, payload=b"hdr") audio = _build_ogg_page(granule_pos=100, payload=b"aud") r = self._recorder() headers, remaining = r.extract_ogg_headers(header + audio) assert len(headers) > 0 # The audio page should be in remaining assert audio in remaining or len(remaining) >= len(audio) - 5 # =========================================================================== # _AudioFileWriter # =========================================================================== class TestAudioFileWriter: def test_wav_writes_valid_file(self, tmp_path): path = str(tmp_path / "out.wav") writer = isr._AudioFileWriter(path, channels=2, sample_rate=44100, fmt="wav") # 1 frame of stereo int16 silence writer.write(b"\x00" * 4) writer.close() with wave.open(path, "rb") as wf: assert wf.getnchannels() == 2 assert wf.getsampwidth() == 2 assert wf.getframerate() == 44100 def test_flac_raises_when_soundfile_unavailable(self, tmp_path): path = str(tmp_path / "out.flac") with patch.object(isr, "SOUNDFILE_AVAILABLE", False): with pytest.raises(ImportError, match="soundfile"): isr._AudioFileWriter(path, channels=2, sample_rate=44100, fmt="flac") @pytest.mark.skipif(not isr.SOUNDFILE_AVAILABLE, reason="soundfile not installed") def test_flac_writes_valid_file(self, tmp_path): import soundfile as sf path = str(tmp_path / "out.flac") writer = isr._AudioFileWriter(path, channels=2, sample_rate=44100, fmt="flac") # 100ms of stereo silence at 44100 Hz → 44100 * 0.1 * 2 channels * 2 bytes = 17640 bytes writer.write(b"\x00" * 17640) writer.close() info = sf.info(path) assert info.channels == 2 assert info.samplerate == 44100 # =========================================================================== # RecorderManager — config loading # =========================================================================== class TestRecorderManagerLoadConfig: """Verify that config.ini settings are correctly parsed and applied.""" def _minimal_config(self, tmp_path) -> str: log_file = str(tmp_path / "test.log") return f""" [general] output_directory = {str(tmp_path / "recordings")} split_minutes = 30 filename_pattern = test_%Y%m%d max_retries = 3 retry_delay_seconds = 2 log_level = WARNING log_file = {log_file} [my_stream] type = stream url = http://stream.example.com:8000/live format = ogg """ def test_stream_recorder_created(self, tmp_path): cfg_path = tmp_path / "config.ini" cfg_path.write_text(self._minimal_config(tmp_path)) # Patch AudioSystem so it doesn't probe real hardware with patch.object(isr, "AudioSystem") as mock_as: mock_as.return_value.available_backends = [] mgr = isr.RecorderManager(str(cfg_path)) assert len(mgr.recorders) == 1 rec = mgr.recorders[0] assert isinstance(rec, isr.StreamRecorder) assert rec.name == "my_stream" assert rec.stream_url == "http://stream.example.com:8000/live" def test_general_settings_inherited_by_source(self, tmp_path): cfg_path = tmp_path / "config.ini" cfg_path.write_text(self._minimal_config(tmp_path)) with patch.object(isr, "AudioSystem") as mock_as: mock_as.return_value.available_backends = [] mgr = isr.RecorderManager(str(cfg_path)) rec = mgr.recorders[0] assert rec.split_duration == 30 assert rec.max_retries == 3 def test_source_overrides_general(self, tmp_path): log_file = str(tmp_path / "test.log") config_text = f""" [general] output_directory = {str(tmp_path / "recordings")} split_minutes = 60 filename_pattern = %Y%m%d_%H%M%S max_retries = 10 retry_delay_seconds = 5 log_level = WARNING log_file = {log_file} [overriding_stream] type = stream url = http://example.com/stream split_minutes = 15 filename_pattern = custom_%Y%m%d """ cfg_path = tmp_path / "config.ini" cfg_path.write_text(config_text) with patch.object(isr, "AudioSystem") as mock_as: mock_as.return_value.available_backends = [] mgr = isr.RecorderManager(str(cfg_path)) rec = mgr.recorders[0] assert rec.split_duration == 15 assert rec.filename_pattern == "custom_%Y%m%d" def test_unknown_type_is_skipped(self, tmp_path): log_file = str(tmp_path / "test.log") config_text = f""" [general] log_level = WARNING log_file = {log_file} [bad_source] type = unknown_type url = http://x.com/s [good_source] type = stream url = http://x.com/s2 """ cfg_path = tmp_path / "config.ini" cfg_path.write_text(config_text) with patch.object(isr, "AudioSystem") as mock_as: mock_as.return_value.available_backends = [] mgr = isr.RecorderManager(str(cfg_path)) assert len(mgr.recorders) == 1 assert mgr.recorders[0].name == "good_source" def test_missing_config_file_exits(self, tmp_path): with pytest.raises(SystemExit): # main() calls sys.exit(1) when the file is missing with patch("sys.argv", ["isr.py", str(tmp_path / "nonexistent.ini")]): isr.main() # =========================================================================== # StreamRecorder — record() integration (mocked HTTP) # =========================================================================== class TestStreamRecorderRecord: """Integration-level tests with a mocked requests session.""" def _recorder(self, fmt="mp3", clock=None) -> isr.StreamRecorder: cfg = { "url": "http://example.com/stream", "output_directory": "", # overridden per-test with tmp_path "split_minutes": 60, "filename_pattern": "%Y%m%d_%H%M%S", "max_retries": 2, "retry_delay_seconds": 0, "format": fmt, } with patch.object(isr, "REQUESTS_AVAILABLE", True): r = isr.StreamRecorder( "test_stream", cfg, make_logger(), clock=clock or datetime.now ) return r def test_mp3_chunks_written_to_file(self, tmp_path): chunks = [b"A" * 512, b"B" * 512, b"C" * 512] rec = self._recorder(fmt="mp3") rec.output_dir = str(tmp_path) mock_resp = MagicMock() mock_resp.headers = {"Content-Type": "audio/mpeg"} mock_resp.iter_content.return_value = iter(chunks) def _stop_after_connect(*args, **kwargs): rec.running = True return mock_resp with patch.object(rec, "connect_stream", side_effect=[mock_resp, None]): # connect_stream returns the mocked response on the first call; # we stop the loop via a side-effectful iter_content original_iter = mock_resp.iter_content.return_value def _chunks_then_stop(chunk_size=8192): for c in chunks: yield c rec.running = False # stop the outer while loop mock_resp.iter_content.side_effect = _chunks_then_stop mock_resp.headers = {"Content-Type": "audio/mpeg"} rec.detected_format = "mp3" rec.running = True rec.header_capture_complete = True rec.stream_headers = None # Open a file manually so we can verify writes filename = rec.generate_filename("mp3") rec.current_file = open(filename, "wb") rec.current_filename = filename # Simulate one inner loop iteration for chunk in _chunks_then_stop(): if chunk: rec.current_file.write(chunk) rec.close_current_file() written = Path(filename).read_bytes() assert written == b"A" * 512 + b"B" * 512 + b"C" * 512 def test_connection_failure_retries(self, tmp_path): rec = self._recorder() rec.output_dir = str(tmp_path) rec.max_retries = 2 with patch.object(rec, "connect_stream", return_value=None) as mock_connect: # connect_stream always fails → recorder gives up after max_retries rec.record() assert mock_connect.call_count == rec.max_retries # =========================================================================== # SoundcardRecorder — unit tests # =========================================================================== class TestSoundcardRecorder: def _make_device(self, backend="portaudio") -> isr.AudioDevice: return isr.AudioDevice( id="0", name="Test Device", channels=2, sample_rate=44100, backend=backend, ) def _make_audio_system(self, device: isr.AudioDevice) -> isr.AudioSystem: system = MagicMock(spec=isr.AudioSystem) system.available_backends = [device.backend] system.find_device.return_value = device return system def test_wav_file_written_correctly(self, tmp_path): device = self._make_device() audio_system = self._make_audio_system(device) audio_system.get_backend.return_value = MagicMock() cfg = { "device": "default", "sample_rate": 44100, "channels": 2, "format": "wav", "output_directory": str(tmp_path), "split_minutes": 60, "filename_pattern": "%Y%m%d_%H%M%S", "max_retries": 1, "retry_delay_seconds": 0, } rec = isr.SoundcardRecorder("sc_test", cfg, make_logger(), audio_system=audio_system) rec._open_output_file() # Simulate audio callback delivering one chunk silence = b"\x00" * (44100 * 2 * 2 // 10) # 100ms of stereo int16 silence rec._audio_callback(silence) rec.close_current_file() out_files = list(tmp_path.glob("*.wav")) assert len(out_files) == 1 with wave.open(str(out_files[0]), "rb") as wf: assert wf.getnchannels() == 2 assert wf.getframerate() == 44100 def test_flac_raises_when_soundfile_unavailable(self, tmp_path): device = self._make_device() audio_system = self._make_audio_system(device) cfg = { "device": "default", "sample_rate": 44100, "channels": 2, "format": "flac", "output_directory": str(tmp_path), "split_minutes": 60, "filename_pattern": "%Y%m%d_%H%M%S", "max_retries": 1, "retry_delay_seconds": 0, } with patch.object(isr, "SOUNDFILE_AVAILABLE", False): rec = isr.SoundcardRecorder("sc_test", cfg, make_logger(), audio_system=audio_system) with pytest.raises(ImportError, match="soundfile"): rec._open_output_file() @pytest.mark.skipif(not isr.SOUNDFILE_AVAILABLE, reason="soundfile not installed") def test_flac_file_written_correctly(self, tmp_path): import soundfile as sf device = self._make_device() audio_system = self._make_audio_system(device) cfg = { "device": "default", "sample_rate": 44100, "channels": 2, "format": "flac", "output_directory": str(tmp_path), "split_minutes": 60, "filename_pattern": "%Y%m%d_%H%M%S", "max_retries": 1, "retry_delay_seconds": 0, } rec = isr.SoundcardRecorder("sc_test", cfg, make_logger(), audio_system=audio_system) rec._open_output_file() silence = b"\x00" * (44100 * 2 * 2 // 10) rec._audio_callback(silence) rec.close_current_file() out_files = list(tmp_path.glob("*.flac")) assert len(out_files) == 1 info = sf.info(str(out_files[0])) assert info.channels == 2 assert info.samplerate == 44100