Executing multiple commands

In CQRS, the saga pattern is typically used to orchestrate a sequence of commands. Sagas are designed for distributed systems and can feel overengineered for a local workflow. python-cq offers ContextCommandPipeline as a lightweight alternative for chaining commands in-process.

Pipeline basics

A pipeline runs a sequence of commands. Each step receives the result of the previous command and produces the next command to dispatch. The pipeline itself is the context: instance attributes you assign in one step are visible in the next.

from cq import ContextCommandPipeline

class PaymentContext:
    transaction_id: int

    pipeline: ContextCommandPipeline[ValidateCartCommand] = ContextCommandPipeline()

    @pipeline.step
    def _(self, result: CartValidatedResult) -> CreateTransactionCommand:
        return CreateTransactionCommand(cart_id=result.cart_id, amount=result.total)

    @pipeline.step
    def _(self, result: TransactionCreatedResult) -> NotifyMerchantCommand:
        self.transaction_id = result.transaction_id
        return NotifyMerchantCommand(transaction_id=result.transaction_id)

    @pipeline.step
    def _(self, result: MerchantNotifiedResult):
        ...

ContextCommandPipeline() uses the default CQ instance. If you manage your own CQ (see Custom DI adapter), pass its DI adapter explicitly: ContextCommandPipeline(cq.di).

Steps

A step is a method decorated with @pipeline.step. It receives the value returned by the previous command's handler and returns the next command to dispatch. Step methods can be either sync or async.

A step that returns None stops the pipeline immediately. This is the natural way to write a terminal step that only consumes the previous result without producing another command, and it also lets earlier steps abort the pipeline conditionally:

@pipeline.step
def _(self, result: ValidationResult):
    if result.is_valid:
        return PersistCommand(...)

    return None  # stop here, skip the rest of the pipeline
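Conceptually, the driving loop behaves like the following plain-Python sketch (a simplified, hypothetical model for illustration only, not the library's actual implementation; `run_pipeline` and `dispatch` are made-up names):

```python
import asyncio
import inspect

async def run_pipeline(ctx, steps, dispatch, initial_command):
    """Sketch: dispatch a command, feed the handler's result to the next
    step, dispatch whatever that step returns, and stop as soon as a step
    returns None (or the steps are exhausted)."""
    message = initial_command
    for step in steps:
        result = await dispatch(message)   # run the command's handler
        message = step(ctx, result)        # step produces the next command
        if inspect.isawaitable(message):   # steps may be sync or async
            message = await message
        if message is None:                # None stops the pipeline
            break
    return ctx
```

The `inspect.isawaitable` check is what lets steps be either sync or async, and the early `break` is the None-stops-the-pipeline rule described above.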

Dispatching queries

@pipeline.query_step works the same way as @pipeline.step but dispatches its produced message through the QueryBus instead of the CommandBus:

@pipeline.query_step
def _(self, result: TransactionCreatedResult) -> GetMerchantByIdQuery:
    return GetMerchantByIdQuery(merchant_id=result.merchant_id)
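In other words, the kind of decorator chooses which bus the step's produced message travels on. A hypothetical sketch of that selection logic, in plain Python rather than the library's code (`run_mixed_pipeline` and the `"query"`/`"command"` tags are illustrative assumptions):

```python
import asyncio

async def run_mixed_pipeline(ctx, steps, command_bus, query_bus, initial_command):
    """Sketch: the initial command always goes to the command bus; each
    step's produced message is dispatched on the bus matching its
    decorator (@pipeline.step vs @pipeline.query_step)."""
    result = await command_bus(initial_command)
    for kind, step in steps:
        message = step(ctx, result)
        if message is None:  # None still stops the pipeline
            break
        bus = query_bus if kind == "query" else command_bus
        result = await bus(message)
    return ctx
```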

Static steps

If a step does not depend on the previous result, declare it with add_static_step (for commands) or add_static_query_step (for queries). The given message is dispatched as-is each time the pipeline runs:

class WarmCachePipeline:
    pipeline: ContextCommandPipeline[StartCommand] = ContextCommandPipeline()

    pipeline.add_static_step(LoadConfigCommand())
    pipeline.add_static_query_step(GetActiveUsersQuery())

    @pipeline.step
    def _(self, users: list[User]) -> WarmCacheCommand:
        return WarmCacheCommand(user_ids=[u.id for u in users])

Static steps and decorated steps are interleaved in the order they are declared in the class body.

Running a pipeline

Instantiating the context is implicit: calling Context.pipeline(initial_command) builds a fresh context instance, runs every step against it, and returns the populated context.

async def process_payment(cart_id: int) -> int:
    command = ValidateCartCommand(cart_id=cart_id)
    context = await PaymentContext.pipeline(command)
    return context.transaction_id

Each pipeline run gets its own context instance, so attributes you set in one run do not leak into the next.

Seeding the context with base values

If a step needs values that are known before the first command runs, instantiate the context yourself and call pipeline on the instance. The same object is returned at the end, mutated by every step that ran.

Because the input context and the returned one are the same object, the recommended pattern is to centralize construction and the pipeline call behind a single run classmethod:

from cq import ContextCommandPipeline
from typing import ClassVar, Self
from uuid import UUID

class LinkOAuthAccountContext:
    pipeline: ClassVar[ContextCommandPipeline[VerifyIDTokenCommand]] = ContextCommandPipeline()

    def __init__(self, user_id: UUID, provider: OAuthProvider) -> None:
        self.user_id = user_id
        self.provider = provider

    @pipeline.step
    def _(self, user: OAuthUser) -> LinkOAuthAccountCommand:
        return LinkOAuthAccountCommand(
            user_id=self.user_id,
            provider=self.provider,
            provider_user_id=user.id,
            email=user.email,
        )

    @classmethod
    async def run(cls, user_id: UUID, command: VerifyIDTokenCommand) -> Self:
        return await cls(user_id, command.provider).pipeline(command)

The initial VerifyIDTokenCommand only depends on the token it carries. Once the OAuth identity is verified, the seeded user_id (taken from the auth context) is consumed by the step that builds LinkOAuthAccountCommand. provider is extracted from the input command, so callers only pass user_id and the command to run.

Middlewares

A pipeline is itself a dispatcher. You can wrap it with middlewares the same way you would wrap a bus, using add_middlewares:

class PaymentContext:
    pipeline = ContextCommandPipeline().add_middlewares(timing_middleware, retry_middleware)
    # ...

Middlewares wrap the whole pipeline execution, not each individual step. Individual command middlewares configured on the underlying CommandBus still apply to every command dispatched by the pipeline.
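As an illustration of the shape such a middleware might take, here is a timing middleware written as a plain async wrapper. The signature (message plus next handler) is an assumption for the sketch, not taken from python-cq's API; the point it demonstrates is that the wrapper surrounds one whole pipeline run:

```python
import asyncio
import time
from typing import Any, Awaitable, Callable

# Hypothetical middleware shape: a callable receiving the message and the
# next handler in the chain. The exact signature add_middlewares expects
# may differ; check the library's middleware documentation.
async def timing_middleware(
    message: Any,
    next_handler: Callable[[Any], Awaitable[Any]],
) -> Any:
    start = time.perf_counter()
    try:
        return await next_handler(message)
    finally:
        elapsed = time.perf_counter() - start
        print(f"{type(message).__name__} handled in {elapsed:.3f}s")
```

Because the pipeline is the dispatcher being wrapped, the timing covers every step and every dispatched command in the run, not a single handler.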