Skip to main content

Integration Tests

info

Learn how to write integration tests for Fluvius Framework applications.

API Integration Tests

Test API Endpoints

import pytest
from fastapi.testclient import TestClient
from myapp.api.main import app

# Module-level test client wrapping the application under test; the API tests
# below send their requests through it.
client = TestClient(app)

def test_create_user():
    """POST a new user to ``/api/users`` and check the response.

    Sends the request through the module-level ``client`` and expects a 200
    status whose JSON body carries back the submitted ``name``.
    """
    payload = {
        "name": "John Doe",
        # example.com is reserved for documentation, so this address is
        # syntactically valid yet can never reach a real mailbox.
        "email": "john.doe@example.com",
    }
    response = client.post("/api/users", json=payload)
    assert response.status_code == 200
    assert response.json()["name"] == "John Doe"

Test with Database

import pytest
from myapp.domains.user import UserDomain
from fluvius.data import SQLiteDriver

@pytest.fixture
def test_database():
    """Yield an in-memory SQLite driver for the duration of one test.

    Declared as a plain generator fixture because nothing in the body is
    awaited. An ``async def`` fixture registered with ``@pytest.fixture`` is
    not resolved by pytest-asyncio in strict mode, so the test would not
    receive the driver itself.
    """
    driver = SQLiteDriver(database_path=":memory:")
    yield driver
    # Cleanup
    # NOTE(review): if teardown ever needs ``await``, switch this to an async
    # fixture registered through pytest-asyncio's own fixture decorator.

@pytest.mark.asyncio
async def test_user_creation(test_database):
    """Process a ``create-user`` command and confirm the user can be fetched.

    Builds a ``UserDomain`` on a context backed by the ``test_database``
    fixture, runs the command, then looks the new record up by the ``id``
    found in the command response.
    """
    # NOTE(review): SanicContext is not imported in this snippet -- confirm
    # which module provides it and add the import.
    domain = UserDomain(
        SanicContext.create(namespace='test', driver=test_database)
    )

    # NOTE(review): `{...}` looks like a placeholder for the real command
    # payload -- fill in before running.
    create_cmd = domain.create_command('create-user', {...})
    result = await domain.process_command(create_cmd)

    # Verify in database
    stored_user = await domain.statemgr.fetch('user', result['id'])
    assert stored_user is not None

Next Steps