| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386 |
- import json
- import subprocess
- from datetime import UTC, datetime
- from pathlib import Path
- from unittest.mock import patch
- import pytest
- from agency_swarm.ui.demos import persistence
- from agency_swarm.ui.demos.launcher import TerminalDemoLauncher
- from agency_swarm.utils.thread import ThreadManager
- class _Agency:
- def __init__(self, messages: list[dict[str, str]]) -> None:
- self.thread_manager = ThreadManager()
- self.thread_manager.replace_messages(messages)
- def test_persistence_roundtrip_exercises_helpers(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
- persistence.set_chats_dir(str(tmp_path))
- # Stabilise branch detection inside save_current_chat.
- monkeypatch.setattr(
- persistence.subprocess,
- "check_output",
- lambda *_, **__: b"main\n",
- )
- user_msg = {"role": "user", "content": "call me maybe"}
- assistant_msg = {"role": "assistant", "content": "ok"}
- agency = _Agency([user_msg, assistant_msg])
- chat_id = "chat_coverage"
- persistence.save_current_chat(agency, chat_id)
- # Index and file should exist with non-trivial metadata.
- chat_file = Path(persistence.chat_file_path(chat_id))
- assert chat_file.exists()
- index_file = Path(persistence.index_file_path())
- assert index_file.exists()
- # Loading should honour replace_messages and return True even when files exist.
- fresh_agency = _Agency([])
- assert persistence.load_chat(fresh_agency, chat_id) is True
- assert fresh_agency.thread_manager.get_all_messages() == [user_msg, assistant_msg]
- # Helpers: summarise by user content and render relative timestamps without crashing.
- summary = persistence.summarize_messages([assistant_msg, user_msg])
- assert summary == "call me maybe"
- timestamp = datetime.now(UTC).isoformat()
- assert persistence.format_relative(timestamp).endswith("ago")
- # list_chat_records should surface the saved entry and include metadata counts.
- records = TerminalDemoLauncher.list_chat_records()
- assert records and records[0]["chat_id"] == chat_id
- assert records[0]["msgs"] == 2
- # Sanity check that raw payload matches expectations.
- payload = json.loads(chat_file.read_text())
- assert payload["metadata"]["summary"] == "call me maybe"
- def test_save_current_chat_persists_usage_in_metadata(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
- persistence.set_chats_dir(str(tmp_path))
- # Stabilise branch detection inside save_current_chat.
- monkeypatch.setattr(
- persistence.subprocess,
- "check_output",
- lambda *_, **__: b"main\n",
- )
- agency = _Agency([{"role": "user", "content": "hello"}])
- chat_id = "chat_with_usage"
- usage = {
- "request_count": 3,
- "cached_tokens": 1,
- "input_tokens": 10,
- "output_tokens": 20,
- "total_tokens": 30,
- "total_cost": 0.123,
- "reasoning_tokens": 2,
- "audio_tokens": 0,
- }
- persistence.save_current_chat(agency, chat_id, usage=usage)
- payload = json.loads(Path(persistence.chat_file_path(chat_id)).read_text())
- assert payload["metadata"]["usage"] == usage
- meta = persistence.load_chat_metadata(chat_id)
- assert isinstance(meta, dict)
- assert meta["usage"] == usage
- launcher_meta = TerminalDemoLauncher.load_chat_metadata(chat_id)
- assert isinstance(launcher_meta, dict)
- assert launcher_meta["usage"] == usage
- def test_summarize_messages_edge_cases() -> None:
- """Test summarize_messages with various message patterns."""
- # Test with assistant message first (fallback to assistant)
- messages = [
- {"role": "assistant", "content": "I can help you with that"},
- {"role": "system", "content": "System message"},
- ]
- assert persistence.summarize_messages(messages) == "I can help you with that"
- # Test with system message containing quoted text
- system_msg = {
- "role": "system",
- "content": 'All user messages:\n1. "Hello world" - user greeting\n2. Something else',
- }
- assert persistence.summarize_messages([system_msg]) == "Hello world"
- # Test with system message with quotes but no "All user messages:" pattern
- system_msg_quotes = {"role": "system", "content": 'Some instructions with "quoted text" and more content'}
- assert persistence.summarize_messages([system_msg_quotes]) == "quoted text"
- # Test with system message but no quoted content
- system_msg_no_quotes = {"role": "system", "content": "No quotes here at all"}
- assert persistence.summarize_messages([system_msg_no_quotes]) == "(no summary)"
- # Test with empty messages
- assert persistence.summarize_messages([]) == "(no summary)"
- # Test with non-string content
- messages_non_string = [{"role": "user", "content": None}, {"role": "assistant", "content": 123}]
- assert persistence.summarize_messages(messages_non_string) == "(no summary)"
- # Test clipping long text
- long_message = {"role": "user", "content": "x" * 100}
- summary = persistence.summarize_messages([long_message])
- assert len(summary) <= 64
- assert summary.endswith("…")
- def test_format_relative_edge_cases() -> None:
- """Test format_relative with various timestamp scenarios."""
- # Test with None timestamp
- assert persistence.format_relative(None) == "-"
- # Test with empty string
- assert persistence.format_relative("") == "-"
- # Test with invalid timestamp format
- assert persistence.format_relative("invalid-timestamp") == "-"
- # Test with naive datetime (no timezone)
- now = datetime.now()
- naive_iso = now.isoformat()
- result = persistence.format_relative(naive_iso)
- assert result.endswith("ago")
- # Test various time intervals
- now_utc = datetime.now(UTC)
- # 30 seconds ago
- past_30s = now_utc.timestamp() - 30
- past_30s_dt = datetime.fromtimestamp(past_30s, UTC)
- assert "s ago" in persistence.format_relative(past_30s_dt.isoformat())
- # 30 minutes ago
- past_30m = now_utc.timestamp() - 30 * 60
- past_30m_dt = datetime.fromtimestamp(past_30m, UTC)
- assert "m ago" in persistence.format_relative(past_30m_dt.isoformat())
- # 12 hours ago
- past_12h = now_utc.timestamp() - 12 * 3600
- past_12h_dt = datetime.fromtimestamp(past_12h, UTC)
- assert "h ago" in persistence.format_relative(past_12h_dt.isoformat())
- # 3 days ago
- past_3d = now_utc.timestamp() - 3 * 24 * 3600
- past_3d_dt = datetime.fromtimestamp(past_3d, UTC)
- result = persistence.format_relative(past_3d_dt.isoformat())
- assert result == "3 days ago"
- # 10 days ago (plural)
- past_10d = now_utc.timestamp() - 10 * 24 * 3600
- past_10d_dt = datetime.fromtimestamp(past_10d, UTC)
- assert "days ago" in persistence.format_relative(past_10d_dt.isoformat())
- # 2 weeks ago
- past_2w = now_utc.timestamp() - 14 * 24 * 3600
- past_2w_dt = datetime.fromtimestamp(past_2w, UTC)
- result = persistence.format_relative(past_2w_dt.isoformat())
- assert "week" in result
- def test_list_chat_records_fallback_scanning(tmp_path: Path) -> None:
- """Test list_chat_records fallback when no index exists."""
- persistence.set_chats_dir(str(tmp_path))
- # Create message files without an index
- chat1_path = tmp_path / "messages_chat1.json"
- chat2_path = tmp_path / "messages_chat2.json"
- # Chat1: New format with metadata
- chat1_data = {
- "items": [{"role": "user", "content": "Hello"}, {"role": "assistant", "content": "Hi there"}],
- "metadata": {
- "created_at": "2024-01-01T12:00:00Z",
- "modified_at": "2024-01-01T12:05:00Z",
- "msgs": 2,
- "branch": "main",
- "summary": "Hello",
- },
- }
- with open(chat1_path, "w") as f:
- json.dump(chat1_data, f)
- # Chat2: Legacy format (just list of messages)
- chat2_data = [{"role": "user", "content": "Legacy message"}, {"role": "assistant", "content": "Legacy response"}]
- with open(chat2_path, "w") as f:
- json.dump(chat2_data, f)
- # Chat3: Invalid JSON format (should be skipped)
- chat3_path = tmp_path / "messages_chat3.json"
- with open(chat3_path, "w") as f:
- f.write("invalid json content")
- # Chat4: New format but with invalid metadata types
- chat4_path = tmp_path / "messages_chat4.json"
- chat4_data = {
- "items": [{"role": "user", "content": "Test"}],
- "metadata": "invalid_metadata_type", # Should be dict
- }
- with open(chat4_path, "w") as f:
- json.dump(chat4_data, f)
- records = persistence.list_chat_records()
- # Should have 3 valid records (chat1, chat2, chat4)
- assert len(records) == 3
- # Find chat1 record
- chat1_record = next(r for r in records if r["chat_id"] == "chat1")
- assert chat1_record["created_at"] == "2024-01-01T12:00:00Z"
- assert chat1_record["modified_at"] == "2024-01-01T12:05:00Z"
- assert chat1_record["msgs"] == 2
- assert chat1_record["branch"] == "main"
- assert chat1_record["summary"] == "Hello"
- # Find chat2 record (legacy format)
- chat2_record = next(r for r in records if r["chat_id"] == "chat2")
- assert chat2_record["created_at"] is None
- assert chat2_record["modified_at"] is None
- assert chat2_record["msgs"] == 2
- assert chat2_record["branch"] == ""
- assert chat2_record["summary"] == "Legacy message"
- # Find chat4 record (invalid metadata)
- chat4_record = next(r for r in records if r["chat_id"] == "chat4")
- assert chat4_record["msgs"] == 1
- assert chat4_record["summary"] == "Test"
- def test_save_current_chat_error_handling(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
- """Test save_current_chat error handling scenarios."""
- persistence.set_chats_dir(str(tmp_path))
- # Test with existing file that has invalid JSON (should handle gracefully)
- chat_id = "error_test"
- file_path = persistence.chat_file_path(chat_id)
- # Create invalid JSON file
- with open(file_path, "w") as f:
- f.write("invalid json content")
- agency = _Agency([{"role": "user", "content": "Test message"}])
- # Mock subprocess to fail (git command not available)
- def mock_subprocess_error(*args, **kwargs):
- raise subprocess.CalledProcessError(1, "git")
- monkeypatch.setattr(subprocess, "check_output", mock_subprocess_error)
- # Should not raise exception despite errors
- persistence.save_current_chat(agency, chat_id)
- # Verify file was created successfully
- assert Path(file_path).exists()
- # Verify content is valid JSON now
- with open(file_path) as f:
- data = json.load(f)
- assert "items" in data
- assert "metadata" in data
- assert data["metadata"]["branch"] == "" # Should be empty due to git error
- def test_load_chat_error_scenarios(tmp_path: Path) -> None:
- """Test load_chat error handling and edge cases."""
- persistence.set_chats_dir(str(tmp_path))
- agency = _Agency([])
- # Test loading non-existent chat
- assert persistence.load_chat(agency, "nonexistent") is False
- # Test loading chat with invalid JSON
- invalid_chat_id = "invalid_json"
- invalid_path = persistence.chat_file_path(invalid_chat_id)
- with open(invalid_path, "w") as f:
- f.write("invalid json content")
- assert persistence.load_chat(agency, invalid_chat_id) is False
- # Test loading chat with valid JSON but wrong structure
- wrong_structure_id = "wrong_structure"
- wrong_path = persistence.chat_file_path(wrong_structure_id)
- with open(wrong_path, "w") as f:
- json.dump({"wrong": "structure"}, f)
- # This should raise ValueError in _read_chat_messages
- assert persistence.load_chat(agency, wrong_structure_id) is False
- # Test loading empty chat (should return True)
- empty_chat_id = "empty_chat"
- empty_path = persistence.chat_file_path(empty_chat_id)
- with open(empty_path, "w") as f:
- json.dump({"items": [], "metadata": {}}, f)
- assert persistence.load_chat(agency, empty_chat_id) is True
- assert len(agency.thread_manager.get_all_messages()) == 0
- def test_read_chat_messages_error_cases(tmp_path: Path) -> None:
- """Test _read_chat_messages with various error scenarios."""
- persistence.set_chats_dir(str(tmp_path))
- # Test with non-existent file
- messages = persistence._read_chat_messages("nonexistent")
- assert messages == []
- # Test with invalid payload structure
- invalid_id = "invalid_payload"
- invalid_path = persistence.chat_file_path(invalid_id)
- with open(invalid_path, "w") as f:
- json.dump("string_payload", f) # Should be dict or list
- with pytest.raises(ValueError, match="Chat payload must be a list of messages"):
- persistence._read_chat_messages(invalid_id)
- def test_get_chats_dir_environment_variable(monkeypatch: pytest.MonkeyPatch) -> None:
- """Test get_chats_dir with environment variable."""
- # Reset global state
- persistence.set_chats_dir("")
- test_dir = "/tmp/test_chats"
- monkeypatch.setenv("AGENCY_SWARM_CHATS_DIR", test_dir)
- with patch("pathlib.Path.mkdir") as mock_mkdir:
- result = persistence.get_chats_dir()
- assert result == test_dir
- mock_mkdir.assert_called_once_with(parents=True, exist_ok=True)
- def test_load_index_with_invalid_data(tmp_path: Path) -> None:
- """Test load_index with invalid JSON data."""
- persistence.set_chats_dir(str(tmp_path))
- # Create index file with non-dict data
- index_path = persistence.index_file_path()
- with open(index_path, "w") as f:
- json.dump(["not", "a", "dict"], f)
- # Should return empty dict for non-dict data
- index = persistence.load_index()
- assert index == {}
- # Test with invalid JSON
- with open(index_path, "w") as f:
- f.write("invalid json")
- index = persistence.load_index()
- assert index == {}
|