| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236 |
- """Tests for the Vault class and secret management."""
- import pytest
- import uuid
- import os
- from unittest.mock import Mock, MagicMock
- from flowsint_core.core.vault import Vault
- from flowsint_core.core.models import Key
- @pytest.fixture
- def mock_db():
- """Create a mock database session."""
- return Mock()
- @pytest.fixture
- def owner_id():
- """Create a test owner ID."""
- return uuid.uuid4()
- @pytest.fixture
- def vault(mock_db, owner_id):
- """Create a Vault instance with mocked database."""
- return Vault(db=mock_db, owner_id=owner_id)
- @pytest.fixture(autouse=True)
- def mock_master_key(monkeypatch):
- """Mock the master key environment variable."""
- # Use a base64-encoded 32-byte key
- test_key = "base64:qnHTmwYb+uoygIw9MsRMY22vS5YPchY+QOi/E79GAvM="
- monkeypatch.setenv("MASTER_VAULT_KEY_V1", test_key)
- class TestVaultInitialization:
- """Tests for Vault initialization."""
- def test_vault_requires_owner_id(self, mock_db):
- """Test that Vault requires an owner_id."""
- with pytest.raises(ValueError, match="owner_id is required"):
- Vault(db=mock_db, owner_id=None)
- def test_vault_initialization_success(self, mock_db, owner_id):
- """Test successful Vault initialization."""
- vault = Vault(db=mock_db, owner_id=owner_id)
- assert vault.db == mock_db
- assert vault.owner_id == str(owner_id)
- assert vault.version == "V1"
- class TestVaultSetSecret:
- """Tests for Vault.set_secret() method."""
- def test_set_secret_creates_key(self, vault, mock_db):
- """Test that set_secret creates a new Key in the database."""
- vault_ref = "TEST_API_KEY"
- plain_key = "my-secret-api-key-12345"
- vault.set_secret(vault_ref, plain_key)
- # Verify that db.add, db.commit, and db.refresh were called
- assert mock_db.add.called
- assert mock_db.commit.called
- assert mock_db.refresh.called
- # Get the Key object that was added
- added_key = mock_db.add.call_args[0][0]
- assert isinstance(added_key, Key)
- assert added_key.name == vault_ref
- assert added_key.owner_id == vault.owner_id
- assert added_key.key_version == "V1"
- assert added_key.iv is not None
- assert added_key.salt is not None
- assert added_key.ciphertext is not None
- def test_set_secret_encrypts_data(self, vault, mock_db):
- """Test that set_secret properly encrypts the secret."""
- vault_ref = "TEST_API_KEY"
- plain_key = "my-secret-api-key-12345"
- vault.set_secret(vault_ref, plain_key)
- added_key = mock_db.add.call_args[0][0]
- # Ciphertext should not contain the plaintext
- assert plain_key.encode() not in added_key.ciphertext
- # IV and salt should be different lengths (12 and 16 bytes)
- assert len(added_key.iv) == 12
- assert len(added_key.salt) == 16
- class TestVaultGetSecret:
- """Tests for Vault.get_secret() method."""
- def test_get_secret_by_name_found(self, vault, mock_db, owner_id):
- """Test getting a secret by name when it exists."""
- vault_ref = "TEST_API_KEY"
- plain_key = "my-secret-api-key-12345"
- # Set a secret first to get encrypted data
- real_vault = Vault(db=MagicMock(), owner_id=owner_id)
- encrypted_data = real_vault._encrypt_key(plain_key)
- # Create a mock Key object
- mock_key = Mock()
- mock_key.name = vault_ref
- mock_key.id = uuid.uuid4()
- mock_key.owner_id = str(owner_id)
- mock_key.salt = encrypted_data["salt"]
- mock_key.iv = encrypted_data["iv"]
- mock_key.ciphertext = encrypted_data["ciphertext"]
- # Mock the database query
- mock_result = Mock()
- mock_result.scalars().first.return_value = mock_key
- mock_db.execute.return_value = mock_result
- # Get the secret
- result = vault.get_secret(vault_ref)
- assert result == plain_key
- assert mock_db.execute.called
- def test_get_secret_by_uuid_found(self, vault, mock_db, owner_id):
- """Test getting a secret by UUID when it exists."""
- key_id = uuid.uuid4()
- plain_key = "my-secret-api-key-12345"
- # Set a secret first to get encrypted data
- real_vault = Vault(db=MagicMock(), owner_id=owner_id)
- encrypted_data = real_vault._encrypt_key(plain_key)
- # Create a mock Key object
- mock_key = Mock()
- mock_key.name = "TEST_API_KEY"
- mock_key.id = key_id
- mock_key.owner_id = str(owner_id)
- mock_key.salt = encrypted_data["salt"]
- mock_key.iv = encrypted_data["iv"]
- mock_key.ciphertext = encrypted_data["ciphertext"]
- # Mock the database query
- mock_result = Mock()
- mock_result.scalars().first.return_value = mock_key
- mock_db.execute.return_value = mock_result
- # Get the secret by UUID
- result = vault.get_secret(str(key_id))
- assert result == plain_key
- assert mock_db.execute.called
- def test_get_secret_not_found(self, vault, mock_db):
- """Test getting a secret that doesn't exist."""
- vault_ref = "NONEXISTENT_KEY"
- # Mock the database query to return None
- mock_result = Mock()
- mock_result.scalars().first.return_value = None
- mock_db.execute.return_value = mock_result
- result = vault.get_secret(vault_ref)
- assert result is None
- def test_get_secret_wrong_owner(self, vault, mock_db):
- """Test that secrets from other owners cannot be accessed."""
- vault_ref = "TEST_API_KEY"
- # Mock the database query to return None (no key found for this owner)
- mock_result = Mock()
- mock_result.scalars().first.return_value = None
- mock_db.execute.return_value = mock_result
- result = vault.get_secret(vault_ref)
- assert result is None
- class TestVaultEncryptionDecryption:
- """Tests for encryption and decryption methods."""
- def test_encrypt_decrypt_roundtrip(self, vault):
- """Test that encryption and decryption work correctly."""
- plaintext = "my-secret-api-key-12345"
- # Encrypt
- encrypted_data = vault._encrypt_key(plaintext)
- assert "ciphertext" in encrypted_data
- assert "iv" in encrypted_data
- assert "salt" in encrypted_data
- assert plaintext.encode() not in encrypted_data["ciphertext"]
- # Decrypt
- decrypted = vault._decrypt_key(encrypted_data)
- assert decrypted == plaintext
- def test_different_salts_produce_different_ciphertexts(self, vault):
- """Test that the same plaintext with different salts produces different ciphertexts."""
- plaintext = "my-secret-api-key-12345"
- encrypted1 = vault._encrypt_key(plaintext)
- encrypted2 = vault._encrypt_key(plaintext)
- # Different salts and IVs
- assert encrypted1["salt"] != encrypted2["salt"]
- assert encrypted1["iv"] != encrypted2["iv"]
- # Different ciphertexts
- assert encrypted1["ciphertext"] != encrypted2["ciphertext"]
- def test_master_key_derivation(self, vault):
- """Test that master key is properly derived."""
- master_key = vault._get_master_key()
- assert isinstance(master_key, bytes)
- assert len(master_key) == 32 # 256 bits
- def test_invalid_master_key_length(self, vault, monkeypatch):
- """Test that invalid master key length raises error."""
- import base64
- # Set a valid base64 but wrong length (16 bytes instead of 32)
- short_key = base64.b64encode(b"0" * 16).decode()
- monkeypatch.setenv("MASTER_VAULT_KEY_V1", f"base64:{short_key}")
- with pytest.raises(ValueError, match="Master key must be 32 bytes \\(256 bits\\)"):
- vault._get_master_key()
- def test_missing_master_key(self, vault, monkeypatch):
- """Test that missing master key raises error."""
- monkeypatch.delenv("MASTER_VAULT_KEY_V1", raising=False)
- with pytest.raises(ValueError, match="Missing master key"):
- vault._get_master_key()
|