Configuring a bus
Note
This guide assumes the [injection] extra is installed. If you use a different DI framework, register the factory below with your own container, not with @injectable.
Each bus can be customized by attaching listeners and middlewares. The recommended pattern is a factory function that builds a configured bus and registers it in the DI container:
from cq import CommandBus, new_command_bus
from injection import injectable

async def listener(message):
    ...

async def middleware(message):
    # runs before the handler
    result = yield
    # runs after the handler

@injectable
def command_bus_factory() -> CommandBus:
    bus = new_command_bus()
    bus.add_listeners(listener)
    bus.add_middlewares(middleware)
    return bus
The same pattern applies to QueryBus and EventBus, with new_query_bus() and new_event_bus().
Listeners
Listeners are fire-and-forget callables that receive the message. They are useful for logging, metrics, or any side effect that does not need to influence the handler.
Listeners are scheduled in an anyio task group, so several listeners run concurrently. The timing depends on the bus type:
CommandBus and QueryBus: every listener must finish before the handler runs. The handler cannot start until listeners have settled, and dispatch returns the handler's value as soon as it completes.
EventBus: listeners and handlers share the same task group, so they all run concurrently. dispatch returns once everything has finished.
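The two timing rules can be sketched with the standard library alone. This is an illustration of the ordering described above, not the library's implementation: it uses asyncio.gather in place of an anyio task group, and run_command_style and run_event_style are hypothetical helper names.

```python
import asyncio

async def run_command_style(listeners, handler, message):
    # Listeners run concurrently and must all settle before the handler starts.
    await asyncio.gather(*(listener(message) for listener in listeners))
    return await handler(message)

async def run_event_style(listeners, handlers, message):
    # Listeners and handlers start together; return once everything has finished.
    await asyncio.gather(*(fn(message) for fn in (*listeners, *handlers)))

async def demo():
    order = []

    async def slow_listener(message):
        await asyncio.sleep(0.01)
        order.append("listener")

    async def handler(message):
        order.append("handler")
        return message.upper()

    result = await run_command_style([slow_listener], handler, "ping")
    command_order = list(order)

    order.clear()
    await run_event_style([slow_listener], [handler], "ping")
    return result, command_order, list(order)
```

With the command-style helper, the slow listener always finishes before the handler starts. With the event-style helper, the handler can finish first because nothing waits for the listener.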
Middlewares
A middleware wraps handler execution. Use it to run logic before and after the handler processes the message, or to handle exceptions.
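import time

async def timing_middleware(message):
    # perf_counter is monotonic, which makes it safer than time.time() for measuring durations
    start = time.perf_counter()
    yield
    print(f"Execution time: {time.perf_counter() - start}s")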
For commands and queries, the middleware stack wraps the single registered handler once. For events, the stack is applied around each handler independently, so a middleware sees one invocation per event handler.
The yield form makes the middleware look like a try/finally around the handler call. The expression result = yield receives the handler's return value, but only for inspection: middlewares of this form cannot replace it. This was a deliberate choice to keep the message and the result read-only by default.
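To make the read-only behaviour concrete, here is a minimal sketch of how a caller could drive a yield-style middleware around a handler. It is an assumption about the general technique (Python async generators and asend), not the library's actual code. call_with_generator_middleware is a hypothetical helper, and the error path is left out for brevity.

```python
import asyncio

async def call_with_generator_middleware(middleware, handler, message):
    gen = middleware(message)   # calling an async generator function returns an async generator
    await gen.__anext__()       # run the code placed before `yield`
    result = await handler(message)
    try:
        await gen.asend(result)  # resume the middleware; `result = yield` receives this value
    except StopAsyncIteration:
        pass                     # the middleware ran to completion
    return result                # the caller always gets the handler's own value

seen = []

async def inspecting_middleware(message):
    result = yield
    seen.append(result)  # the value can be inspected here, but not replaced

async def handler(message):
    return len(message)
```

Because the driver returns the value it got from the handler, nothing the middleware does after the yield can change what the caller receives.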
Classic middlewares
If you need to replace the return value rather than just inspect it, write a "classic" middleware. It takes call_next as its first argument and returns the value it wants to expose to the caller:
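import time

async def timing_middleware(call_next, message):
    # perf_counter is monotonic, which makes it safer than time.time() for measuring durations
    start = time.perf_counter()
    result = await call_next(message)
    print(f"Execution time: {time.perf_counter() - start}s")
    return result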
Both styles can be mixed freely in the same bus.
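As an illustration of substituting a result, the sketch below chains a classic middleware around a handler by hand. The wrap helper is hypothetical and only mimics the call_next convention described above. It makes no claim about how the bus orders or composes its middlewares internally.

```python
import asyncio
from functools import partial

async def default_middleware(call_next, message):
    # A classic middleware may return something other than the handler's value.
    result = await call_next(message)
    return "n/a" if result is None else result

def wrap(handler, *middlewares):
    # Hypothetical helper: the first middleware listed becomes the outermost layer.
    call = handler
    for middleware in reversed(middlewares):
        call = partial(middleware, call)
    return call

async def handler(message):
    return None if message == "missing" else f"found {message}"

pipeline = wrap(handler, default_middleware)
```

Awaiting pipeline("missing") yields the substituted value, while any other message passes through unchanged.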
Class-based listeners and middlewares
Listeners and middlewares can also be classes with a __call__ method, which is convenient when they need their own dependencies:
import time
from dataclasses import dataclass
from logging import Logger

# MetricsService stands for your application's own metrics type; import it from your code.

@dataclass
class LogListener:
    logger: Logger

    async def __call__(self, message):
        self.logger.info("Received: %s", message)

@dataclass
class TimingMiddleware:
    metrics: MetricsService

    async def __call__(self, message):
        start = time.perf_counter()
        yield
        self.metrics.record(time.perf_counter() - start)

@dataclass
class ClassicTimingMiddleware:
    metrics: MetricsService

    async def __call__(self, call_next, message):
        start = time.perf_counter()
        result = await call_next(message)
        self.metrics.record(time.perf_counter() - start)
        return result
If you build these classes through your DI container, you get the same constructor injection as for handlers.
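One benefit of taking dependencies through the constructor is that a class-based middleware can be exercised in isolation, without a bus or a container. The sketch below assumes a hypothetical InMemoryMetrics stand-in for MetricsService and a fake handler passed in as call_next.

```python
import asyncio
import time
from dataclasses import dataclass, field

@dataclass
class InMemoryMetrics:
    """Hypothetical stand-in for the application's MetricsService."""
    durations: list = field(default_factory=list)

    def record(self, duration: float) -> None:
        self.durations.append(duration)

@dataclass
class ClassicTimingMiddleware:
    metrics: InMemoryMetrics

    async def __call__(self, call_next, message):
        start = time.perf_counter()
        result = await call_next(message)
        self.metrics.record(time.perf_counter() - start)
        return result

async def fake_handler(message):
    return f"handled {message}"

async def exercise():
    metrics = InMemoryMetrics()
    middleware = ClassicTimingMiddleware(metrics)
    # Call the middleware directly, passing the fake handler as call_next.
    result = await middleware(fake_handler, "ping")
    return result, metrics.durations
```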
Built-in middlewares
RetryMiddleware
cq.middlewares.retry.RetryMiddleware retries the wrapped call when it raises one of the configured exception types:
from cq import new_command_bus
from cq.middlewares.retry import RetryMiddleware
bus = new_command_bus()
bus.add_middlewares(RetryMiddleware(retry=3, delay=0.5, exceptions=(TimeoutError,)))
The parameters are:
retry: total number of attempts (including the first one). With retry=3, the call runs at most three times.
delay: seconds to wait between attempts. Defaults to 0.
exceptions: the exception types that trigger a retry. Defaults to (Exception,), which retries on any failure derived from Exception. Exceptions that derive only from BaseException, such as KeyboardInterrupt, are not retried.
If every attempt fails, the last exception is re-raised.
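The semantics described above can be sketched as a small classic middleware. RetrySketch is a hypothetical stand-in written for illustration, not the source of RetryMiddleware. It uses asyncio.sleep for the delay and assumes retry is at least 1.

```python
import asyncio

class RetrySketch:
    """Illustrative stand-in that mirrors the documented retry, delay and exceptions parameters."""

    def __init__(self, retry, delay=0, exceptions=(Exception,)):
        self.retry = retry
        self.delay = delay
        self.exceptions = exceptions

    async def __call__(self, call_next, message):
        for attempt in range(1, self.retry + 1):
            try:
                return await call_next(message)
            except self.exceptions:
                if attempt == self.retry:
                    raise  # every attempt failed: re-raise the last exception
                await asyncio.sleep(self.delay)

async def always_fails(message):
    raise TimeoutError("still down")

async def demo():
    attempts = []

    async def flaky(message):
        # Fails twice, then succeeds on the third attempt.
        attempts.append(message)
        if len(attempts) < 3:
            raise TimeoutError("transient failure")
        return "ok"

    retrying = RetrySketch(retry=3, delay=0, exceptions=(TimeoutError,))
    result = await retrying(flaky, "ping")
    return result, len(attempts)
```

In the demo, the handler succeeds on its third and final permitted attempt. A handler that keeps raising, like always_fails, surfaces its last TimeoutError to the caller.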