Skip to main content

User Domain Example

info

This example demonstrates a complete user management domain with create, update, and deactivate operations.

Overview

This example shows how to build a user management domain using Fluvius Framework. It includes:

  • User aggregate with business logic
  • Commands for user operations
  • Events for state changes
  • State management

Complete Example

Aggregate Definition

from fluvius.domain import Aggregate, action
from fluvius.data import DataModel, field, UUID_TYPE

class UserState(DataModel):
name: str = field()
email: str = field()
active: bool = field(initial=True)
created_at: datetime = field(factory=timestamp)

class UserAggregate(Aggregate):
def __init__(self, domain):
super().__init__(domain)
self._state = None

@action(evt_key='user-created', resources=['user'])
async def create_user(self, name: str, email: str):
"""Create a new user"""
if self._state is not None:
raise ValueError("User already exists")

self._state = UserState(
name=name,
email=email,
active=True
)
return self._state

@action(evt_key='user-updated', resources=['user'])
async def update_user(self, name: str = None, email: str = None):
"""Update user information"""
if self._state is None:
raise ValueError("User does not exist")

updates = {}
if name is not None:
updates['name'] = name
if email is not None:
updates['email'] = email

self._state = self._state.update(**updates)
return self._state

@action(evt_key='user-deactivated', resources=['user'])
async def deactivate_user(self):
"""Deactivate a user"""
if self._state is None:
raise ValueError("User does not exist")

self._state = self._state.update(active=False)
return self._state

Domain Definition

from fluvius.domain import Domain
from .aggregate import UserAggregate

class UserDomain(Domain):
__aggregate__ = UserAggregate

class Meta:
revision = 1
tags = ["user", "identity"]
title = "User Management Domain"
description = "Domain for managing user accounts"

Usage

import asyncio
from fluvius.domain.context import SanicContext
from fluvius.data import UUID_GENR
from user_domain import UserDomain

async def main():
# Create context
ctx = SanicContext.create(namespace='app-user')

# Create domain
domain = UserDomain(ctx)

# Set aggregate root
user_id = UUID_GENR()
domain.set_aggroot('user', user_id)

# Create and process commands
commands = [
domain.create_command('create-user', {
'name': 'John Doe',
'email': '[email protected]'
}),
domain.create_command('update-user', {
'name': 'Jane Doe'
}),
domain.create_command('deactivate-user')
]

# Process commands
async for response in domain.command_processor.process(*commands):
print(f'Response: {response}')

# Query state
user = await domain.statemgr.fetch('user', user_id)
print(f'Final state: {user}')

if __name__ == '__main__':
asyncio.run(main())

Key Concepts

  1. Aggregate: Contains business logic and manages state
  2. Actions: Methods decorated with @action that generate events
  3. Commands: Created using domain.create_command()
  4. State: Managed through the aggregate and accessible via statemgr
  5. Events: Automatically generated from actions

Next Steps