Main Abstractions
Understanding the main abstractions in Fluvius Framework helps you build effective domain-driven applications.
Domain
The Domain is the main entry point for domain logic. It coordinates aggregates, commands, events, and state management.
Key Responsibilities
- Aggregate Management: Manages aggregate instances
- Command Processing: Routes commands to aggregates
- Event Handling: Manages event generation and storage
- State Management: Provides access to current state
Example
from fluvius.domain import Domain
from fluvius.domain.context import SanicContext

class UserDomain(Domain):
    # UserAggregate is defined in the Aggregate section below
    __aggregate__ = UserAggregate

    class Meta:
        revision = 1
        tags = ["user", "identity"]

# Create domain instance
ctx = SanicContext.create(namespace='app-user')
domain = UserDomain(ctx)
Aggregate
Aggregates contain business logic and manage state. They enforce business invariants and generate events.
Key Responsibilities
- Business Logic: Implement business rules
- State Management: Manage aggregate state
- Event Generation: Generate domain events
- Invariant Enforcement: Enforce business rules
Example
from fluvius.domain import Aggregate, action
from fluvius.data import DataModel, field
class UserState(DataModel):
    name: str = field()
    email: str = field()
    active: bool = field(initial=True)

class UserAggregate(Aggregate):
    def __init__(self, domain):
        super().__init__(domain)
        self._state = None

    @action(evt_key='user-created', resources=['user'])
    async def create_user(self, name: str, email: str):
        # Business logic
        if not email or '@' not in email:
            raise ValueError("Invalid email")
        # State update
        self._state = UserState(name=name, email=email, active=True)
        return self._state
Command
Commands represent intentions to change state. They are immutable and validated before processing.
Key Characteristics
- Immutable: Cannot be changed after creation
- Validated: Validated before processing
- Routed: Routed to appropriate aggregate action
- Idempotent: Can be safely retried
Example
# Create command
command = domain.create_command('create-user', {
    'name': 'John Doe',
    'email': 'john.doe@example.com'  # placeholder address
})
# Process command
response = await domain.process_command(command)
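The immutability and validation characteristics above can be illustrated without the framework. The following is a minimal sketch in plain Python, assuming a command can be modeled as a frozen dataclass; `CreateUserCommand` and its fields are hypothetical names and are not part of the Fluvius API.

```python
from dataclasses import dataclass, FrozenInstanceError


@dataclass(frozen=True)
class CreateUserCommand:
    """Hypothetical command: frozen, so fields cannot be reassigned."""
    name: str
    email: str

    def __post_init__(self):
        # Validate once, at construction time, before any processing.
        if not self.name:
            raise ValueError("name is required")
        if "@" not in self.email:
            raise ValueError("Invalid email")


cmd = CreateUserCommand(name="John Doe", email="john.doe@example.com")

try:
    cmd.email = "other@example.com"  # mutation attempt is rejected
except FrozenInstanceError:
    mutation_rejected = True
else:
    mutation_rejected = False
```

Because the command cannot change after creation, retrying it resubmits exactly the same data, which is one precondition for safe retries.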
Event
Events represent things that have happened. They are immutable and stored in the event log.
Key Characteristics
- Immutable: Cannot be changed
- Append-Only: Added to event log
- Ordered: Maintain chronological order
- Replayable: Can be replayed to reconstruct state
Example
@action(evt_key='user-created', resources=['user'])
async def create_user(self, name: str, email: str):
    # Event 'user-created' is automatically generated
    # with payload: {'name': name, 'email': email}
    pass
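To make the idea of an automatically generated event concrete, here is a minimal sketch of how a decorator could record an event from the arguments of a call. It is an illustration under stated assumptions, not Fluvius's actual `@action` implementation: `recording_action`, `ToyUserAggregate`, and `pending_events` are hypothetical names.

```python
import asyncio
import functools
import inspect


def recording_action(evt_key):
    """Hypothetical stand-in for an action decorator (not Fluvius's @action)."""
    def decorator(func):
        signature = inspect.signature(func)

        @functools.wraps(func)
        async def wrapper(self, *args, **kwargs):
            result = await func(self, *args, **kwargs)
            # Build the payload from the call arguments, excluding `self`.
            bound = signature.bind(self, *args, **kwargs)
            payload = {k: v for k, v in bound.arguments.items() if k != "self"}
            # Record the event only after the action has succeeded.
            self.pending_events.append({"key": evt_key, "payload": payload})
            return result
        return wrapper
    return decorator


class ToyUserAggregate:
    def __init__(self):
        self.pending_events = []

    @recording_action("user-created")
    async def create_user(self, name: str, email: str):
        if "@" not in email:
            raise ValueError("Invalid email")


aggregate = ToyUserAggregate()
asyncio.run(aggregate.create_user("John Doe", email="john.doe@example.com"))
print(aggregate.pending_events)
```

In this sketch a failed validation raises before the event is appended, so rejected actions leave no trace in `pending_events`.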
State
State is the current snapshot of an aggregate's data. It is derived from events and optimized for reads.
Key Characteristics
- Derived: Computed from events
- Read-Optimized: Optimized for queries
- Denormalized: May be denormalized for performance
- Cached: Optional caching layer
Example
# Fetch current state
user = await domain.statemgr.fetch('user', user_id)
# Query state
users = await domain.statemgr.find('user', active=True)
State Manager
The State Manager provides read access to current state. It queries a state store that is optimized for reads.
Key Methods
- fetch(): Fetch single entity
- find(): Find multiple entities
- count(): Count entities
- exists(): Check if entity exists
Example
# Fetch single entity
user = await domain.statemgr.fetch('user', user_id)
# Find entities
users = await domain.statemgr.find('user', active=True)
# Query with filters
active_users = await domain.statemgr.find(
    'user',
    active=True,
    limit=10,
    offset=0
)
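The four read methods can be sketched against an in-memory store to show how filters, `limit`, and `offset` might interact. This is an assumption-laden illustration: `InMemoryStateManager` is a hypothetical class, and its signatures only mirror the call shapes used in the examples above, not the real Fluvius State Manager.

```python
import asyncio


class InMemoryStateManager:
    """Hypothetical read-only state manager backed by nested dicts."""

    def __init__(self, records):
        # records: {resource_name: {entity_id: entity_dict}}
        self._records = records

    def _matching(self, resource, filters):
        rows = self._records.get(resource, {}).values()
        return [r for r in rows if all(r.get(k) == v for k, v in filters.items())]

    async def fetch(self, resource, entity_id):
        # Raises KeyError when the resource or entity does not exist.
        return self._records[resource][entity_id]

    async def find(self, resource, limit=None, offset=0, **filters):
        rows = self._matching(resource, filters)
        end = None if limit is None else offset + limit
        return rows[offset:end]

    async def count(self, resource, **filters):
        return len(self._matching(resource, filters))

    async def exists(self, resource, entity_id):
        return entity_id in self._records.get(resource, {})


async def demo():
    statemgr = InMemoryStateManager({
        "user": {
            1: {"id": 1, "name": "Ann", "active": True},
            2: {"id": 2, "name": "Bob", "active": False},
            3: {"id": 3, "name": "Cy", "active": True},
        }
    })
    page = await statemgr.find("user", active=True, limit=1, offset=1)
    total = await statemgr.count("user", active=True)
    return page, total, await statemgr.exists("user", 4)


page, total, has_user_4 = asyncio.run(demo())
```

Filtering happens before pagination here, so `offset=1, limit=1` returns the second active user rather than the second stored user.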
Command Processor
The Command Processor routes commands to aggregates and handles command processing.
Key Responsibilities
- Command Routing: Route commands to aggregates
- Action Invocation: Invoke aggregate actions
- Event Handling: Handle event generation
- Error Handling: Handle processing errors
Example
# Process single command
response = await domain.process_command(command)
# Process multiple commands
async for response in domain.command_processor.process(*commands):
    print(response)
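The routing and error-handling responsibilities can be pictured with a small dispatch table. The sketch below assumes commands are `(key, payload)` pairs and that responses are plain dicts; `ToyCommandProcessor`, `ToyAggregate`, and the response shape are hypothetical and do not describe the Fluvius implementation.

```python
import asyncio


class ToyAggregate:
    async def create_user(self, name, email):
        if "@" not in email:
            raise ValueError("Invalid email")
        return {"name": name, "email": email}


class ToyCommandProcessor:
    """Hypothetical processor: maps command keys to aggregate methods."""

    def __init__(self, aggregate, routes):
        self._aggregate = aggregate
        self._routes = routes  # e.g. {"create-user": "create_user"}

    async def process(self, *commands):
        for key, payload in commands:
            # Command routing: look up the action registered for this key.
            handler = getattr(self._aggregate, self._routes[key])
            try:
                # Action invocation.
                data = await handler(**payload)
                response = {"command": key, "status": "ok", "data": data}
            except ValueError as exc:
                # Error handling: report the failure, then keep going.
                response = {"command": key, "status": "error", "error": str(exc)}
            yield response


async def run_all():
    processor = ToyCommandProcessor(ToyAggregate(), {"create-user": "create_user"})
    commands = [
        ("create-user", {"name": "Ann", "email": "ann@example.com"}),
        ("create-user", {"name": "Bob", "email": "invalid"}),
    ]
    return [response async for response in processor.process(*commands)]


responses = asyncio.run(run_all())
```

Yielding one response per command lets the caller consume results with `async for`, in the same way as the multi-command example above.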
Event Store
The Event Store persists events in an append-only log.
Key Features
- Append-Only: New events are only ever added to the end of the log
- Immutable: Events cannot be changed
- Ordered: Events maintain order
- Queryable: Query events by criteria
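These features can be sketched with an in-memory log that also shows how replaying ordered events reconstructs state. Everything here is an assumption for illustration: `InMemoryEventStore`, `StoredEvent`, `replay`, and the `user-deactivated` event key are hypothetical and not taken from Fluvius.

```python
from dataclasses import dataclass
from types import MappingProxyType


@dataclass(frozen=True)
class StoredEvent:
    sequence: int                 # position in the log, starting at 1
    key: str
    payload: MappingProxyType     # read-only view of the payload


class InMemoryEventStore:
    """Hypothetical append-only store: no update or delete methods exist."""

    def __init__(self):
        self._log = []

    def append(self, key, payload):
        # Copy the payload so later changes by the caller cannot leak in.
        event = StoredEvent(len(self._log) + 1, key, MappingProxyType(dict(payload)))
        self._log.append(event)
        return event

    def query(self, key=None):
        # Return events in log order, optionally filtered by key.
        return tuple(e for e in self._log if key is None or e.key == key)


def replay(events):
    """Fold events, in order, into the current state."""
    state = {}
    for event in events:
        if event.key == "user-created":
            state = {**event.payload, "active": True}
        elif event.key == "user-deactivated":
            state = {**state, "active": False}
    return state


store = InMemoryEventStore()
store.append("user-created", {"name": "John Doe", "email": "john.doe@example.com"})
store.append("user-deactivated", {})
current = replay(store.query())
```

Since the log is ordered and never rewritten, replaying the same events always produces the same state.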
Context
Context provides request-scoped information and services.
Key Responsibilities
- Request Information: Provide request metadata
- Service Access: Access to services
- Transaction Management: Manage transactions
- Namespace Isolation: Isolate by namespace
Example
from fluvius.domain.context import SanicContext
# Create context
ctx = SanicContext.create(namespace='app-user')
# Access context in domain
domain = UserDomain(ctx)
Data Model
Data Models define the structure of aggregate state.
Key Features
- Type Safety: Type hints for validation
- Immutability: Immutable updates
- Validation: Automatic validation
- Serialization: JSON serialization
Example
from fluvius.data import DataModel, field
from datetime import datetime
class UserState(DataModel):
    name: str = field()
    email: str = field()
    active: bool = field(initial=True)
    created_at: datetime = field(default_factory=datetime.now)
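The immutable-update and serialization features can be demonstrated with the standard library alone. The sketch below uses a frozen dataclass as a stand-in; it assumes nothing about how Fluvius's `DataModel` works internally, and `UserSnapshot` is a hypothetical name.

```python
import json
from dataclasses import asdict, dataclass, field, replace
from datetime import datetime, timezone


@dataclass(frozen=True)
class UserSnapshot:
    """Hypothetical stand-in for a data model, built on dataclasses."""
    name: str
    email: str
    active: bool = True
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


original = UserSnapshot(name="John Doe", email="john.doe@example.com")

# An immutable update returns a new instance; the original is unchanged.
deactivated = replace(original, active=False)

# datetime is not JSON-serializable by default, so convert it to ISO 8601.
serialized = json.dumps(asdict(deactivated), default=lambda v: v.isoformat())
```

Keeping `original` untouched means any code still holding a reference to it sees a consistent value, which is the main benefit of immutable updates.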
Abstractions Working Together
Client Request
    │
    ▼
Domain
    │
    ├──► Command Processor
    │         │
    │         └──► Aggregate
    │                   │
    │                   ├──► Business Logic
    │                   │
    │                   ├──► State Update
    │                   │
    │                   └──► Event Generation
    │                             │
    │                             └──► Event Store
    │
    └──► State Manager
              │
              └──► State Store (Queries)
Best Practices
1. Keep Aggregates Focused
Each aggregate should have a single responsibility:
# Good: Focused aggregate
class UserAggregate(Aggregate):
    # User-related logic only
    pass

# Bad: Too many responsibilities
class UserOrderPaymentAggregate(Aggregate):
    # Too many concerns
    pass
2. Use Actions for State Changes
Always use the @action decorator for state changes, so that every change generates an event:
# Good: Uses action decorator
@action(evt_key='user-created')
async def create_user(self, name: str, email: str):
    pass

# Bad: Direct state modification
async def create_user(self, name: str, email: str):
    self._state = UserState(...)  # No event generated
3. Validate in Aggregates
Enforce business rules in aggregates:
@action(evt_key='user-created')
async def create_user(self, name: str, email: str):
    # Validate business rules
    if not email or '@' not in email:
        raise ValueError("Invalid email")
    # Business logic
    pass
4. Use State Manager for Queries
Use the state manager for read operations instead of querying the database directly:
# Good: Use state manager
user = await domain.statemgr.fetch('user', user_id)
# Bad: Direct database access
user = await db.query("SELECT * FROM users WHERE id = ?", user_id)
Next Steps
- Learn about Request Lifecycle
- Explore Messaging & Events
- Check Data Persistence