Async Farm Workers
The async farm handles background job execution. Operators launch it with `fast-app work` in production, or with `fast-app work --tui` / `fast-app work --verbose` during development to get live diagnostics.
System Overview
Async farm is a RabbitMQ-backed worker pool with three core components:
- Supervisor (`fast_app.integrations.async_farm.supervisor.AsyncFarmSupervisor`): manages queue connections, spawns worker processes, and scales the pool.
- Worker (`fast_app.integrations.async_farm.worker.AsyncFarmWorker`): consumes jobs from `async_farm.jobs`, executes callables, and streams status back.
- Publisher API (`fast_app.integrations.async_farm.publisher.AsyncFarmPublisher`): enqueues jobs by serialising the target callable plus arguments.
Internally, job messages carry the dotted import path of the callable (`func_path`), positional/keyword arguments, optional request context, and soft/hard timeout headers. The worker dynamically imports the callable, restores the context, and executes it (async callables natively, sync callables via `asyncio.to_thread`). stdout/stderr/log output is captured and sent back to the supervisor.
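Conceptually, the import-and-dispatch step looks like the following minimal sketch (stdlib only; context restoration and log capture are omitted, and this is not the actual `AsyncFarmWorker` code):

```python
import asyncio
import importlib

async def run_job(func_path: str, args: tuple, kwargs: dict):
    # Resolve the dotted path to a callable at execution time.
    module_path, _, attr = func_path.rpartition(".")
    func = getattr(importlib.import_module(module_path), attr)
    if asyncio.iscoroutinefunction(func):
        return await func(*args, **kwargs)
    # Sync callables are offloaded so the worker's event loop stays responsive.
    return await asyncio.to_thread(func, *args, **kwargs)
```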
RabbitMQ contracts
| Purpose | Default | Override via |
|---|---|---|
| Broker URL | `amqp://guest:guest@localhost:5672/` | `RABBITMQ_URL` |
| Jobs queue | `async_farm.jobs` | `ASYNC_FARM_JOBS_QUEUE` |
| Supervisor → Worker exchange (fanout) | `async_farm.supervisor` | `ASYNC_FARM_CONTROL_EXCHANGE` |
| Worker → Supervisor exchange (direct) | `async_farm.worker` | `ASYNC_FARM_WORKER_EXCHANGE` |
Workers declare exclusive control queues bound to the fanout exchange so they can receive shutdown or snapshot requests. Supervisors consume a dedicated queue for heartbeat and task events.
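A rough sketch of that topology, written with aio-pika (whether the async farm actually uses aio-pika is an assumption here):

```python
import os
import aio_pika

async def declare_topology():
    conn = await aio_pika.connect_robust(
        os.getenv("RABBITMQ_URL", "amqp://guest:guest@localhost:5672/")
    )
    channel = await conn.channel()

    # Durable jobs queue shared by all workers.
    jobs = await channel.declare_queue(
        os.getenv("ASYNC_FARM_JOBS_QUEUE", "async_farm.jobs"), durable=True
    )

    # Supervisor -> worker fanout: each worker binds an exclusive control queue.
    control = await channel.declare_exchange(
        os.getenv("ASYNC_FARM_CONTROL_EXCHANGE", "async_farm.supervisor"),
        aio_pika.ExchangeType.FANOUT,
    )
    worker_ctl = await channel.declare_queue(exclusive=True)
    await worker_ctl.bind(control)

    # Worker -> supervisor direct exchange for heartbeats and task events.
    await channel.declare_exchange(
        os.getenv("ASYNC_FARM_WORKER_EXCHANGE", "async_farm.worker"),
        aio_pika.ExchangeType.DIRECT,
    )
    return conn, channel, jobs
```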
Running the Supervisor
The CLI wrapper instantiates `AsyncFarmSupervisor`, boots the FastApp container, and enters several async loops:
- `spawn_worker()`: starts new worker processes (forked via `multiprocessing.Process`).
- `heartbeat_loop()`: periodically broadcasts heartbeats to workers, ensuring they stay connected.
- `scaling_loop()`: inspects queue depth against available capacity and scales the worker count up or down within configured bounds.
- `keep_alive_loop()`: handles graceful shutdown, sending `shutdown` commands and joining worker processes within a grace period.
- `monitor_workers_heartbeat_loop()`: watches for stale heartbeats and terminates hung workers.
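Put together, the supervisor's entry point amounts to running these loops concurrently. The sketch below is illustrative; only the loop and method names come from the list above, and the `min_workers` attribute is assumed:

```python
import asyncio

async def run_supervisor(supervisor):
    # Spawn the initial pool, then run the maintenance loops side by side.
    for _ in range(supervisor.min_workers):  # min_workers attribute is assumed
        supervisor.spawn_worker()
    await asyncio.gather(
        supervisor.heartbeat_loop(),
        supervisor.scaling_loop(),
        supervisor.keep_alive_loop(),
        supervisor.monitor_workers_heartbeat_loop(),
    )
```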
Scaling heuristics
- Maintain at least `MIN_WORKERS` live processes (default 1).
- Cap the pool at `MAX_WORKERS` (default 10) to avoid runaway costs.
- Check the queue length every `SCALE_CHECK_INTERVAL_S` seconds (default 1).
- Scale up in batches of `SCALE_UP_BATCH_SIZE` (default 2) when the backlog exceeds aggregate capacity (`prefetch_per_worker × alive workers`).
- Scale down by terminating idle workers in batches of `SCALE_DOWN_BATCH_SIZE` (default 1) when load is light.
All values hydrate from environment variables at supervisor start. Tweaking them lets you optimise for latency or resource usage.
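The decision logic reduces to something like this sketch; the exact thresholds, especially the scale-down condition, are assumptions:

```python
import os

MIN_WORKERS = int(os.getenv("MIN_WORKERS", "1"))
MAX_WORKERS = int(os.getenv("MAX_WORKERS", "10"))
PREFETCH_PER_WORKER = int(os.getenv("PREFETCH_PER_WORKER", "10"))
SCALE_UP_BATCH_SIZE = int(os.getenv("SCALE_UP_BATCH_SIZE", "2"))
SCALE_DOWN_BATCH_SIZE = int(os.getenv("SCALE_DOWN_BATCH_SIZE", "1"))

def desired_worker_count(queue_depth: int, alive_workers: int) -> int:
    """Approximate the scaling rule: grow when the backlog exceeds
    aggregate capacity, shrink in small batches when load is light."""
    capacity = PREFETCH_PER_WORKER * alive_workers
    if queue_depth > capacity:
        target = alive_workers + SCALE_UP_BATCH_SIZE
    elif queue_depth < capacity - PREFETCH_PER_WORKER:
        # "Light load" threshold is an assumption; the real idle check differs.
        target = alive_workers - SCALE_DOWN_BATCH_SIZE
    else:
        target = alive_workers
    return max(MIN_WORKERS, min(MAX_WORKERS, target))
```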
Shutdown lifecycle
- Request shutdown (signal handler or CLI exit).
- Publish a `shutdown` control message with a grace period (`WORKER_SHUTDOWN_GRACE_S`, default 15s).
- Wait for worker processes to exit normally (`await_processes_death`).
- Terminate remaining workers, then force-kill if necessary.
- Close AMQP channels cleanly.
Workers reciprocate by cancelling their job consumers, draining in-flight tasks (bounded by the largest soft timeout), and recording completion snapshots.
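Steps 3 and 4 can be pictured with plain `multiprocessing` primitives; the real `await_processes_death` lives on `AsyncFarmSupervisor` and may differ in detail:

```python
import asyncio

async def await_processes_death(processes, grace_s: float) -> None:
    """Wait out the grace period, then escalate (sketch of steps 3 and 4)."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + grace_s
    for proc in processes:
        remaining = max(0.0, deadline - loop.time())
        # Process.join blocks, so run it off the event loop.
        await asyncio.to_thread(proc.join, remaining)
    for proc in processes:
        if proc.is_alive():
            proc.terminate()                       # SIGTERM stragglers
            await asyncio.to_thread(proc.join, 2.0)
        if proc.is_alive():
            proc.kill()                            # last resort: SIGKILL
```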
Worker Execution Model
Each worker keeps a bounded number of concurrent tasks in flight (`PREFETCH_PER_WORKER`, default 10). For every message:
- Wrap it in a `Task` (loads args, deadlines, and the context snapshot).
- Register lifecycle callbacks for success, failure, and timeouts.
- Execute the callable (async natively, or offloaded to a thread).
- Capture stdout/stderr/logging and attach it to the task snapshot.
- Ack the message once the callbacks finish.
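Assuming an aio-pika-style consumer (an assumption, as above), the prefetch bound and the ack-after-callbacks behaviour look roughly like this:

```python
import os

PREFETCH_PER_WORKER = int(os.getenv("PREFETCH_PER_WORKER", "10"))

async def start_consuming(channel, queue, handle_task):
    # The broker enforces the concurrency bound: at most
    # PREFETCH_PER_WORKER messages are unacked per worker.
    await channel.set_qos(prefetch_count=PREFETCH_PER_WORKER)

    async def on_message(message):
        async with message.process():  # acks on successful exit (step 5)
            await handle_task(message)

    await queue.consume(on_message)
```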
Timeout handling:
- Soft timeout: marks the task as `soft_timeout`, cancels the asyncio wrapper, and notifies the supervisor, but lets the underlying work finish until the hard timeout.
- Hard timeout: publishes an event, acks the job, records the logs, and requests worker shutdown (followed by a forced exit after a 2-minute safety window).
Supervisor watchdog
Workers track the last heartbeat from the supervisor. If it stalls for longer than `7 × HEARTBEAT_INTERVAL_S`, the worker self-terminates to avoid orphaned consumers.
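A minimal sketch of that watchdog, assuming hypothetical `last_heartbeat` and `self_terminate` helpers:

```python
import asyncio
import os
import time

HEARTBEAT_INTERVAL_S = float(os.getenv("HEARTBEAT_INTERVAL_S", "1"))

async def supervisor_watchdog(last_heartbeat, self_terminate):
    while True:
        await asyncio.sleep(HEARTBEAT_INTERVAL_S)
        if time.monotonic() - last_heartbeat() > 7 * HEARTBEAT_INTERVAL_S:
            await self_terminate()  # supervisor looks dead: stop consuming, exit
            return
```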
Publishing Jobs
Use `AsyncFarmPublisher` (or the convenience helpers in your application) to enqueue work:
```python
publisher = AsyncFarmPublisher()
await publisher.publish(
    func_path="app.jobs.send_welcome_email",
    args=(user_id,),
    kwargs={"urgent": True},
    context_snapshot=context.snapshot(),
    soft_timeout_s=10,
    hard_timeout_s=60,
)
```
Jobs are pickled, optionally compressed, and published as durable messages. Consumers must stay compatible with the message schema, so deploy new workers before enqueuing messages that rely on new code paths.
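For illustration, the `app.jobs.send_welcome_email` target used above could be an ordinary importable coroutine; the module layout here is hypothetical:

```python
# app/jobs.py: hypothetical module matching func_path="app.jobs.send_welcome_email"
import logging

logger = logging.getLogger(__name__)

async def send_welcome_email(user_id: int, urgent: bool = False) -> None:
    # Runs inside a worker process; log output is captured and streamed
    # back to the supervisor. Keep the signature compatible with messages
    # that are already sitting in the queue.
    logger.info("Sending welcome email to user %s (urgent=%s)", user_id, urgent)
```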
Operational Tips
- Monitoring: run `fast-app work --tui` to launch the textual dashboard (`SupervisorTUI`). It displays live worker status and task logs, and lets you request snapshots.
- Logging: `--verbose` keeps supervisor prints enabled. In production, rely on structured logging forwarded from workers.
- Graceful deploys: set `WORKER_SHUTDOWN_GRACE_S` to cover your longest soft timeout. Rolling restarts ensure no job is dropped mid-flight.
- Backpressure: tune `PREFETCH_PER_WORKER` to balance throughput against fairness. High values increase parallelism but can starve slow tasks.
- Isolation: each worker process bootstraps FastApp (`fast_app.boot`) before executing tasks, so application singletons and bindings are fresh per process (see the sketch after this list).
- Failure recovery: periodic heartbeats and supervisor watchdogs terminate zombie workers, and RabbitMQ redelivers unacked messages when a worker shuts down.
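To make the isolation point concrete, a per-process entry point might look like this sketch; `fast_app.boot` is named above, but the calling convention and the worker's `run()` method are assumptions:

```python
import asyncio
import multiprocessing

def worker_entrypoint() -> None:
    # Each forked process boots its own FastApp container, so singletons
    # and bindings never leak between workers.
    import fast_app
    fast_app.boot()  # named in the docs above; calling convention assumed
    from fast_app.integrations.async_farm.worker import AsyncFarmWorker
    asyncio.run(AsyncFarmWorker().run())  # .run() is an assumed entry method

if __name__ == "__main__":
    multiprocessing.Process(target=worker_entrypoint).start()
```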
Environment Configuration Summary
```bash
MIN_WORKERS=1
MAX_WORKERS=10
PREFETCH_PER_WORKER=10
SCALE_UP_BATCH_SIZE=2
SCALE_DOWN_BATCH_SIZE=1
SCALE_CHECK_INTERVAL_S=1.0
WORKER_SHUTDOWN_GRACE_S=15
HEARTBEAT_INTERVAL_S=1
RABBITMQ_URL=amqp://guest:guest@localhost:5672/
ASYNC_FARM_JOBS_QUEUE=async_farm.jobs
ASYNC_FARM_CONTROL_EXCHANGE=async_farm.supervisor
ASYNC_FARM_WORKER_EXCHANGE=async_farm.worker
TASK_HISTORY_MAX=300
```
Values may be overridden per deployment. The CLI reads them at process start, so restart the supervisor after changing config.