Worker Module Overview

The Worker module provides background job processing using ARQ (Async Redis Queue).

Introduction

The Worker module enables:

  • Background task execution
  • Scheduled jobs (cron)
  • Job tracking and monitoring
  • Distributed task processing
  • Integration with domain events

Quick Start

Define Tasks

from fluvius.worker import export_task, export_cron

@export_task(name='process-payment', timeout=300)
async def process_payment(ctx, payment_id):
    """Process a payment in the background"""
    # Your task logic here
    pass

@export_cron(hour=0, minute=0, name='daily-report')
async def generate_daily_report(ctx):
    """Generate daily report at midnight"""
    # Your cron job logic here
    pass
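
To make the `hour=0, minute=0` schedule above concrete, the following is a minimal sketch of how the next run time of a daily cron job could be computed. It uses only the standard library and is not Fluvius or ARQ code; the helper name `next_run` is hypothetical and time zones are ignored.

```python
from datetime import datetime, timedelta


def next_run(now: datetime, hour: int, minute: int) -> datetime:
    """Return the first time strictly after `now` that falls on hour:minute."""
    # Start from today's occurrence of the requested time.
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    # If that moment has already passed (or is exactly now), use tomorrow's.
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


if __name__ == "__main__":
    # A job scheduled with hour=0, minute=0 and checked mid-afternoon
    # would next fire at the following midnight.
    print(next_run(datetime(2024, 1, 1, 15, 30), hour=0, minute=0))
```

A real scheduler also has to handle missed runs and multiple workers competing for the same job, which this sketch leaves out.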

Create Worker

from fluvius.worker import create_worker

worker = create_worker(
    functions=[process_payment, generate_daily_report],
    redis_settings={'host': 'localhost', 'port': 6379}
)

if __name__ == '__main__':
    worker.run()

Enqueue Tasks

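from fluvius.worker import enqueue_job

# Enqueue a task by the name it was registered under.
# `await` must run inside an async function (for example, a request handler).
await enqueue_job('process-payment', payment_id='123')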
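
The task above is enqueued by the string `'process-payment'` rather than by function reference. A minimal sketch of how name-based dispatch can work is shown below. It is an in-memory illustration only, not the Fluvius implementation: `TASKS`, `register`, and `dispatch` are hypothetical names, and a real worker would store jobs in Redis and run them in a separate process.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

# Hypothetical in-memory registry mapping task names to coroutine functions.
TASKS: Dict[str, Callable[..., Awaitable[Any]]] = {}


def register(name: str):
    """Decorator that records a coroutine function under `name`."""
    def decorator(fn):
        TASKS[name] = fn
        return fn
    return decorator


@register("process-payment")
async def process_payment(ctx: dict, payment_id: str) -> str:
    return f"processed {payment_id}"


async def dispatch(name: str, **kwargs: Any) -> Any:
    """Look up a task by name and run it with an empty context."""
    try:
        task = TASKS[name]
    except KeyError:
        raise LookupError(f"no task registered as {name!r}") from None
    return await task({}, **kwargs)


if __name__ == "__main__":
    print(asyncio.run(dispatch("process-payment", payment_id="123")))
```

Referring to tasks by name keeps the caller independent of the worker's code: the enqueuing process only needs the name and the arguments.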

Features

  • Async Tasks: Execute async functions as background jobs
  • Cron Jobs: Schedule recurring tasks
  • Job Tracking: Monitor job status and results
  • Retries: Automatic retry on failure
  • Timeouts: Configurable job timeouts
  • Distributed: Multiple workers can process jobs
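
The Retries and Timeouts features can be illustrated with a small standard-library sketch. It shows the general idea of bounding each attempt with a timeout and retrying on failure. It does not reflect how Fluvius or ARQ implement these features, and `run_with_retries`, `max_tries`, and `flaky` are hypothetical names chosen for the example.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def run_with_retries(
    job: Callable[[int], Awaitable[Any]],
    *,
    timeout: float,
    max_tries: int,
) -> Any:
    """Run `job(attempt)` up to `max_tries` times, each bounded by `timeout` seconds."""
    last_error: Optional[BaseException] = None
    for attempt in range(1, max_tries + 1):
        try:
            # wait_for cancels the attempt and raises a timeout error if it runs too long.
            return await asyncio.wait_for(job(attempt), timeout=timeout)
        except Exception as exc:  # includes the timeout error
            last_error = exc
    raise RuntimeError(f"job failed after {max_tries} tries") from last_error


async def flaky(attempt: int) -> str:
    """Example job that fails on its first two attempts."""
    if attempt < 3:
        raise ConnectionError("transient failure")
    return f"ok on attempt {attempt}"


if __name__ == "__main__":
    print(asyncio.run(run_with_retries(flaky, timeout=1.0, max_tries=3)))
```

Production queues usually add a delay between attempts and persist the attempt count so that a retry can be picked up by a different worker.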

Next Steps