Files
ISR/tests/test_isr.py
T
admin 5e7620627b feat: name cut clips by wall-clock time; fix recording filename format
Cut downloads were named by byte offsets (`..._cut_740s-750s.flac`). They are
now named by the actual recording time the slice covers, e.g.
`20260523_22-31-30_22-32-30.flac` for a 22:31:30->22:32:30 cut of a recording
started at 22:00:00.

To make this reliable, the recording filename is now a fixed
`%Y%m%d_%H%M%S` start-time format (`FILENAME_FORMAT`) shared by isr.py and
web.py, replacing the user-configurable `filename_pattern` (web.py never reads
config.ini, so a custom pattern could not be parsed back). web.py parses the
start time out of the filename via `_recording_start()` and builds cut names
with `_cut_filename()`. The DATE column now also comes from the filename
(falling back to mtime only for non-standard names), since mtime is the last
write, not the start.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-11 14:30:30 +02:00

663 lines
23 KiB
Python

"""
Tests for isr.py
Run with: pytest tests/
"""
import logging
import struct
import wave
from datetime import datetime
from pathlib import Path
from typing import List
from unittest.mock import MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def make_logger():
logger = logging.getLogger("test")
logger.setLevel(logging.CRITICAL) # suppress noise during tests
return logger
def fixed_clock(dt: datetime):
"""Return a zero-argument callable that always returns *dt*."""
return lambda: dt
# ---------------------------------------------------------------------------
# Import the module under test
# ---------------------------------------------------------------------------
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import isr # noqa: E402 (must come after sys.path manipulation)
# ===========================================================================
# AudioDevice / AudioSystem
# ===========================================================================
class TestAudioSystemFindDevice:
"""Tests for AudioSystem.find_device()."""
def _make_system(self, devices: List[isr.AudioDevice]) -> isr.AudioSystem:
system = isr.AudioSystem.__new__(isr.AudioSystem)
system.logger = make_logger()
system._backends = {}
system.list_all_devices = lambda: devices
return system
def _dev(self, id_, name, *, is_default=False, is_monitor=False, backend="portaudio"):
return isr.AudioDevice(
id=id_, name=name, channels=2, sample_rate=44100,
backend=backend, is_default=is_default, is_monitor=is_monitor,
)
def test_default_returns_default_device(self):
devices = [
self._dev("0", "Microphone"),
self._dev("1", "Stereo Mix", is_default=True),
]
system = self._make_system(devices)
result = system.find_device("default")
assert result.id == "1"
def test_default_falls_back_to_first_when_none_marked(self):
devices = [self._dev("0", "Mic"), self._dev("1", "Line In")]
system = self._make_system(devices)
result = system.find_device("default")
assert result.id == "0"
def test_monitor_returns_monitor_device(self):
devices = [
self._dev("0", "Microphone"),
self._dev("1", "Monitor of Built-in Audio", is_monitor=True),
]
system = self._make_system(devices)
result = system.find_device("monitor")
assert result.id == "1"
def test_monitor_returns_none_when_no_monitor(self):
devices = [self._dev("0", "Mic")]
system = self._make_system(devices)
assert system.find_device("monitor") is None
def test_exact_id_match(self):
devices = [self._dev("42", "Device 42"), self._dev("7", "Device 7")]
system = self._make_system(devices)
assert system.find_device("42").name == "Device 42"
def test_exact_name_match_case_insensitive(self):
devices = [self._dev("0", "Stereo Mix")]
system = self._make_system(devices)
assert system.find_device("stereo mix").id == "0"
def test_partial_name_match(self):
devices = [self._dev("0", "Realtek Stereo Mix")]
system = self._make_system(devices)
assert system.find_device("stereo").id == "0"
def test_returns_none_for_unknown_spec(self):
devices = [self._dev("0", "Mic")]
system = self._make_system(devices)
assert system.find_device("nonexistent_device_xyz") is None
def test_preferred_backend_filter_on_default(self):
devices = [
self._dev("0", "Mic", is_default=True, backend="pulseaudio"),
self._dev("1", "Mic", is_default=True, backend="portaudio"),
]
system = self._make_system(devices)
result = system.find_device("default", preferred_backend="portaudio")
assert result.backend == "portaudio"
# ===========================================================================
# BaseRecorder helpers
# ===========================================================================
class TestGetNextSplitTime:
"""Tests for BaseRecorder.get_next_split_time()."""
def _recorder(self, split_minutes: int, now: datetime) -> isr.BaseRecorder:
"""Create a concrete BaseRecorder subclass with an injected clock."""
class _Rec(isr.BaseRecorder):
def record(self): pass
cfg = {"split_minutes": split_minutes, "output_directory": "/tmp/isr_test"}
r = _Rec.__new__(_Rec)
r.name = "test"
r.config = cfg
r.logger = make_logger()
r.running = False
r.current_file = None
r.current_filename = None
r._clock = fixed_clock(now)
r.split_duration = split_minutes
r.output_dir = cfg["output_directory"]
r.max_retries = 3
r.retry_delay = 1
r.file_format = "auto"
return r
def test_30min_splits_at_next_half_hour(self):
now = datetime(2024, 1, 1, 14, 10, 30)
r = self._recorder(30, now)
nxt = r.get_next_split_time()
assert nxt == datetime(2024, 1, 1, 14, 30, 0)
def test_60min_splits_at_next_hour(self):
now = datetime(2024, 1, 1, 14, 45, 0)
r = self._recorder(60, now)
nxt = r.get_next_split_time()
assert nxt == datetime(2024, 1, 1, 15, 0, 0)
def test_exactly_on_boundary_schedules_next_period(self):
# At exactly 14:00 with 60-min splits → next split is 15:00
now = datetime(2024, 1, 1, 14, 0, 0)
r = self._recorder(60, now)
nxt = r.get_next_split_time()
assert nxt == datetime(2024, 1, 1, 15, 0, 0)
def test_120min_split_crosses_hour_boundary(self):
# 15:30 with 120-min splits → boundaries at 00:00, 02:00, …, 16:00
now = datetime(2024, 1, 1, 15, 30, 0)
r = self._recorder(120, now)
nxt = r.get_next_split_time()
assert nxt == datetime(2024, 1, 1, 16, 0, 0)
def test_seconds_are_zeroed(self):
now = datetime(2024, 1, 1, 12, 5, 45)
r = self._recorder(30, now)
assert r.get_next_split_time().second == 0
assert r.get_next_split_time().microsecond == 0
class TestGenerateFilename:
"""Tests for BaseRecorder.generate_filename()."""
def _recorder(self, now: datetime, output_dir: str) -> isr.BaseRecorder:
class _Rec(isr.BaseRecorder):
def record(self): pass
r = _Rec.__new__(_Rec)
r.name = "test"
r.config = {}
r.logger = make_logger()
r.running = False
r.current_file = None
r.current_filename = None
r._clock = fixed_clock(now)
r.split_duration = 60
r.output_dir = output_dir
r.max_retries = 3
r.retry_delay = 1
r.file_format = "auto"
return r
def test_fixed_format(self, tmp_path):
# Filenames are always <YYYYMMDD>_<HHMMSS>.<ext> — the recording start.
now = datetime(2024, 12, 25, 14, 30, 0)
r = self._recorder(now, str(tmp_path))
name = r.generate_filename("mp3")
assert name.endswith("20241225_143000.mp3")
def test_output_dir_prefix(self, tmp_path):
now = datetime(2024, 1, 1, 0, 0, 0)
r = self._recorder(now, str(tmp_path))
name = r.generate_filename("wav")
assert name.startswith(str(tmp_path))
# ===========================================================================
# StreamRecorder — format detection
# ===========================================================================
class TestDetectFormat:
"""Tests for StreamRecorder.detect_format()."""
def _recorder(self) -> isr.StreamRecorder:
cfg = {
"url": "http://example.com/stream",
"output_directory": "/tmp",
"split_minutes": 60,
"max_retries": 1,
"retry_delay_seconds": 0,
"format": "auto",
}
with patch.object(isr, "REQUESTS_AVAILABLE", True):
r = isr.StreamRecorder.__new__(isr.StreamRecorder)
r.name = "test"
r.config = cfg
r.logger = make_logger()
r.running = False
r.current_file = None
r.current_filename = None
r._clock = datetime.now
r.split_duration = 60
r.output_dir = "/tmp"
r.max_retries = 1
r.retry_delay = 0
r.file_format = "auto"
r.stream_url = "http://example.com/stream"
r.username = None
r.password = None
r.stream_headers = None
r.header_capture_complete = False
r.detected_format = None
return r
def _response(self, content_type: str) -> MagicMock:
resp = MagicMock()
resp.headers = {"Content-Type": content_type}
return resp
@pytest.mark.parametrize("ct,expected", [
("audio/mpeg", "mp3"),
("audio/mp3", "mp3"),
("audio/ogg", "ogg"),
("application/ogg", "ogg"),
("audio/aac", "aac"),
("audio/aacp", "aac"),
("audio/flac", "flac"),
("audio/opus", "opus"),
])
def test_known_content_types(self, ct, expected):
r = self._recorder()
assert r.detect_format(self._response(ct)) == expected
def test_unknown_content_type_defaults_to_mp3(self):
r = self._recorder()
assert r.detect_format(self._response("application/octet-stream")) == "mp3"
def test_needs_header_per_file_ogg(self):
r = self._recorder()
r.detected_format = "ogg"
assert r.needs_header_per_file() is True
def test_needs_header_per_file_mp3(self):
r = self._recorder()
r.detected_format = "mp3"
assert r.needs_header_per_file() is False
# ===========================================================================
# StreamRecorder — OGG page parsing
# ===========================================================================
def _build_ogg_page(granule_pos: int = 0, is_bos: bool = False,
payload: bytes = b"\x00" * 10) -> bytes:
"""Build a minimal valid OGG page for testing."""
header_type = 0x02 if is_bos else 0x00
# Single segment containing the entire payload
num_segments = 1
segment_table = bytes([len(payload)])
# Granule position (8 bytes little-endian)
granule_bytes = struct.pack("<Q", granule_pos)
# serial number, page sequence number, checksum (all zeroed for simplicity)
header = (
b"OggS"
+ bytes([0x00]) # version
+ bytes([header_type]) # header type
+ granule_bytes # granule position
+ b"\x00" * 4 # serial number
+ b"\x00" * 4 # page sequence number
+ b"\x00" * 4 # checksum
+ bytes([num_segments]) # number of segments
+ segment_table # lacing values
+ payload
)
return header
class TestParseOggPage:
def _recorder(self) -> isr.StreamRecorder:
r = isr.StreamRecorder.__new__(isr.StreamRecorder)
r.name = "test"
r.logger = make_logger()
return r
def test_parses_valid_page(self):
page = _build_ogg_page(granule_pos=0, is_bos=True, payload=b"X" * 10)
r = self._recorder()
result = r.parse_ogg_page(page, 0)
assert result is not None
page_bytes, is_header, next_offset = result
assert page_bytes == page
assert next_offset == len(page)
def test_returns_none_for_non_ogg_data(self):
r = self._recorder()
assert r.parse_ogg_page(b"ID3\x00" + b"\x00" * 50, 0) is None
def test_returns_none_when_data_too_short(self):
r = self._recorder()
assert r.parse_ogg_page(b"OggS\x00\x02", 0) is None
def test_bos_flag_detected(self):
page = _build_ogg_page(granule_pos=0, is_bos=True, payload=b"\x01" * 5)
r = self._recorder()
_, is_header, _ = r.parse_ogg_page(page, 0)
assert is_header is True
def test_non_zero_granule_is_not_header(self):
page = _build_ogg_page(granule_pos=1000, is_bos=False, payload=b"\x00" * 5)
r = self._recorder()
_, is_header, _ = r.parse_ogg_page(page, 0)
assert is_header is False
class TestExtractOggHeaders:
def _recorder(self) -> isr.StreamRecorder:
r = isr.StreamRecorder.__new__(isr.StreamRecorder)
r.name = "test"
r.logger = make_logger()
return r
def test_extracts_bos_pages(self):
header1 = _build_ogg_page(granule_pos=0, is_bos=True, payload=b"H1" * 5)
header2 = _build_ogg_page(granule_pos=0, is_bos=False, payload=b"H2" * 5)
audio = _build_ogg_page(granule_pos=500, is_bos=False, payload=b"audio" * 3)
data = header1 + header2 + audio
r = self._recorder()
headers, remaining = r.extract_ogg_headers(data)
# Both header pages (granule_pos == 0) should be captured
assert header1 in headers
assert header2 in headers
def test_non_ogg_data_returns_empty_headers(self):
r = self._recorder()
headers, remaining = r.extract_ogg_headers(b"ID3\x00garbage")
assert headers == b""
def test_remaining_data_preserved(self):
header = _build_ogg_page(granule_pos=0, is_bos=True, payload=b"hdr")
audio = _build_ogg_page(granule_pos=100, payload=b"aud")
r = self._recorder()
headers, remaining = r.extract_ogg_headers(header + audio)
assert len(headers) > 0
# The audio page should be in remaining
assert audio in remaining or len(remaining) >= len(audio) - 5
# ===========================================================================
# _AudioFileWriter
# ===========================================================================
class TestAudioFileWriter:
def test_wav_writes_valid_file(self, tmp_path):
path = str(tmp_path / "out.wav")
writer = isr._AudioFileWriter(path, channels=2, sample_rate=44100, fmt="wav")
# 1 frame of stereo int16 silence
writer.write(b"\x00" * 4)
writer.close()
with wave.open(path, "rb") as wf:
assert wf.getnchannels() == 2
assert wf.getsampwidth() == 2
assert wf.getframerate() == 44100
def test_flac_raises_when_soundfile_unavailable(self, tmp_path):
path = str(tmp_path / "out.flac")
with patch.object(isr, "SOUNDFILE_AVAILABLE", False):
with pytest.raises(ImportError, match="soundfile"):
isr._AudioFileWriter(path, channels=2, sample_rate=44100, fmt="flac")
@pytest.mark.skipif(not isr.SOUNDFILE_AVAILABLE, reason="soundfile not installed")
def test_flac_writes_valid_file(self, tmp_path):
import soundfile as sf
path = str(tmp_path / "out.flac")
writer = isr._AudioFileWriter(path, channels=2, sample_rate=44100, fmt="flac")
# 100ms of stereo silence at 44100 Hz → 44100 * 0.1 * 2 channels * 2 bytes = 17640 bytes
writer.write(b"\x00" * 17640)
writer.close()
info = sf.info(path)
assert info.channels == 2
assert info.samplerate == 44100
# ===========================================================================
# RecorderManager — config loading
# ===========================================================================
class TestRecorderManagerLoadConfig:
"""Verify that config.ini settings are correctly parsed and applied."""
def _minimal_config(self, tmp_path) -> str:
log_file = str(tmp_path / "test.log")
return f"""
[general]
output_directory = {str(tmp_path / "recordings")}
split_minutes = 30
max_retries = 3
retry_delay_seconds = 2
log_level = WARNING
log_file = {log_file}
[my_stream]
type = stream
url = http://stream.example.com:8000/live
format = ogg
"""
def test_stream_recorder_created(self, tmp_path):
cfg_path = tmp_path / "config.ini"
cfg_path.write_text(self._minimal_config(tmp_path))
# Patch AudioSystem so it doesn't probe real hardware
with patch.object(isr, "AudioSystem") as mock_as:
mock_as.return_value.available_backends = []
mgr = isr.RecorderManager(str(cfg_path))
assert len(mgr.recorders) == 1
rec = mgr.recorders[0]
assert isinstance(rec, isr.StreamRecorder)
assert rec.name == "my_stream"
assert rec.stream_url == "http://stream.example.com:8000/live"
def test_general_settings_inherited_by_source(self, tmp_path):
cfg_path = tmp_path / "config.ini"
cfg_path.write_text(self._minimal_config(tmp_path))
with patch.object(isr, "AudioSystem") as mock_as:
mock_as.return_value.available_backends = []
mgr = isr.RecorderManager(str(cfg_path))
rec = mgr.recorders[0]
assert rec.split_duration == 30
assert rec.max_retries == 3
def test_source_overrides_general(self, tmp_path):
log_file = str(tmp_path / "test.log")
config_text = f"""
[general]
output_directory = {str(tmp_path / "recordings")}
split_minutes = 60
max_retries = 10
retry_delay_seconds = 5
log_level = WARNING
log_file = {log_file}
[overriding_stream]
type = stream
url = http://example.com/stream
split_minutes = 15
"""
cfg_path = tmp_path / "config.ini"
cfg_path.write_text(config_text)
with patch.object(isr, "AudioSystem") as mock_as:
mock_as.return_value.available_backends = []
mgr = isr.RecorderManager(str(cfg_path))
rec = mgr.recorders[0]
assert rec.split_duration == 15
def test_unknown_type_is_skipped(self, tmp_path):
log_file = str(tmp_path / "test.log")
config_text = f"""
[general]
log_level = WARNING
log_file = {log_file}
[bad_source]
type = unknown_type
url = http://x.com/s
[good_source]
type = stream
url = http://x.com/s2
"""
cfg_path = tmp_path / "config.ini"
cfg_path.write_text(config_text)
with patch.object(isr, "AudioSystem") as mock_as:
mock_as.return_value.available_backends = []
mgr = isr.RecorderManager(str(cfg_path))
assert len(mgr.recorders) == 1
assert mgr.recorders[0].name == "good_source"
def test_missing_config_file_exits(self, tmp_path):
with pytest.raises(SystemExit):
# main() calls sys.exit(1) when the file is missing
with patch("sys.argv", ["isr.py", str(tmp_path / "nonexistent.ini")]):
isr.main()
# ===========================================================================
# StreamRecorder — record() integration (mocked HTTP)
# ===========================================================================
class TestStreamRecorderRecord:
"""Integration-level tests with a mocked requests session."""
def _recorder(self, fmt="mp3", clock=None) -> isr.StreamRecorder:
cfg = {
"url": "http://example.com/stream",
"output_directory": "", # overridden per-test with tmp_path
"split_minutes": 60,
"max_retries": 2,
"retry_delay_seconds": 0,
"format": fmt,
}
with patch.object(isr, "REQUESTS_AVAILABLE", True):
r = isr.StreamRecorder(
"test_stream", cfg, make_logger(), clock=clock or datetime.now
)
return r
def test_connection_failure_retries(self, tmp_path):
rec = self._recorder()
rec.output_dir = str(tmp_path)
rec.max_retries = 2
with patch.object(rec, "connect_stream", return_value=None) as mock_connect:
# connect_stream always fails → recorder gives up after max_retries
rec.record()
assert mock_connect.call_count == rec.max_retries
# ===========================================================================
# SoundcardRecorder — unit tests
# ===========================================================================
class TestSoundcardRecorder:
def _make_device(self, backend="portaudio") -> isr.AudioDevice:
return isr.AudioDevice(
id="0", name="Test Device", channels=2,
sample_rate=44100, backend=backend,
)
def _make_audio_system(self, device: isr.AudioDevice) -> isr.AudioSystem:
system = MagicMock(spec=isr.AudioSystem)
system.available_backends = [device.backend]
system.find_device.return_value = device
return system
def test_wav_file_written_correctly(self, tmp_path):
device = self._make_device()
audio_system = self._make_audio_system(device)
audio_system.get_backend.return_value = MagicMock()
cfg = {
"device": "default",
"sample_rate": 44100,
"channels": 2,
"format": "wav",
"output_directory": str(tmp_path),
"split_minutes": 60,
"max_retries": 1,
"retry_delay_seconds": 0,
}
rec = isr.SoundcardRecorder("sc_test", cfg, make_logger(), audio_system=audio_system)
rec._open_output_file()
# Simulate audio callback delivering one chunk
silence = b"\x00" * (44100 * 2 * 2 // 10) # 100ms of stereo int16 silence
rec._audio_callback(silence)
rec.close_current_file()
out_files = list(tmp_path.glob("*.wav"))
assert len(out_files) == 1
with wave.open(str(out_files[0]), "rb") as wf:
assert wf.getnchannels() == 2
assert wf.getframerate() == 44100
def test_flac_raises_when_soundfile_unavailable(self, tmp_path):
device = self._make_device()
audio_system = self._make_audio_system(device)
cfg = {
"device": "default",
"sample_rate": 44100,
"channels": 2,
"format": "flac",
"output_directory": str(tmp_path),
"split_minutes": 60,
"max_retries": 1,
"retry_delay_seconds": 0,
}
with patch.object(isr, "SOUNDFILE_AVAILABLE", False):
rec = isr.SoundcardRecorder("sc_test", cfg, make_logger(), audio_system=audio_system)
with pytest.raises(ImportError, match="soundfile"):
rec._open_output_file()
@pytest.mark.skipif(not isr.SOUNDFILE_AVAILABLE, reason="soundfile not installed")
def test_flac_file_written_correctly(self, tmp_path):
import soundfile as sf
device = self._make_device()
audio_system = self._make_audio_system(device)
cfg = {
"device": "default",
"sample_rate": 44100,
"channels": 2,
"format": "flac",
"output_directory": str(tmp_path),
"split_minutes": 60,
"max_retries": 1,
"retry_delay_seconds": 0,
}
rec = isr.SoundcardRecorder("sc_test", cfg, make_logger(), audio_system=audio_system)
rec._open_output_file()
silence = b"\x00" * (44100 * 2 * 2 // 10)
rec._audio_callback(silence)
rec.close_current_file()
out_files = list(tmp_path.glob("*.flac"))
assert len(out_files) == 1
info = sf.info(str(out_files[0]))
assert info.channels == 2
assert info.samplerate == 44100