
Custom Middlewares

FastPubSub middlewares provide explicit interception points for cross-cutting concerns such as validation, rate control, telemetry, and lifecycle error mapping.

This page focuses on designing production-grade custom middlewares and composing them safely. For baseline usage, see Middlewares.

Execution Topology

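Middleware execution order follows the registration hierarchy. For incoming messages, broker-level middlewares run first, then router-level, then subscriber-level, and the handler runs last; return values and exceptions travel back through the same layers in reverse.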

sequenceDiagram
    participant P as Pub/Sub Message
    participant B as Broker Middleware
    participant R as Router Middleware
    participant S as Subscriber Middleware
    participant H as Handler

    P->>B: on_message
    B->>R: on_message
    R->>S: on_message
    S->>H: on_message
    H-->>S: return
    S-->>R: return
    R-->>B: return

For the publish flow, the direction is inverted: publisher-level middlewares run first, then router-level, then broker-level.
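The nesting in the diagram can be illustrated without the framework. The following is a minimal sketch that uses a hypothetical `Layer` class and `build_chain` helper (neither is part of FastPubSub) to show how wrapping from the inside out produces the broker, router, subscriber, handler order on the way in and the reverse order on the way out.

```python
import asyncio
from typing import Any, Awaitable, Callable


class Layer:
    """Hypothetical stand-in for one middleware level; not the FastPubSub API."""

    def __init__(
        self,
        name: str,
        next_call: Callable[[str], Awaitable[Any]],
        trace: list[str],
    ) -> None:
        self.name = name
        self.next_call = next_call
        self.trace = trace

    async def on_message(self, message: str) -> Any:
        self.trace.append(f"enter {self.name}")
        try:
            return await self.next_call(message)
        finally:
            # Runs on the way back out, whether the inner call returned or raised.
            self.trace.append(f"exit {self.name}")


def build_chain(trace: list[str]) -> Callable[[str], Awaitable[Any]]:
    async def handler(message: str) -> str:
        trace.append("handler")
        return message.upper()

    # Wrap from the inside out so the broker layer ends up outermost.
    call: Callable[[str], Awaitable[Any]] = handler
    for name in ("subscriber", "router", "broker"):
        call = Layer(name, call, trace).on_message
    return call


trace: list[str] = []
result = asyncio.run(build_chain(trace)("ping"))
print(trace)
```

Each layer sees the message before the layers nested inside it and sees the result, or the exception, after them.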

Why Use Custom Middlewares

Custom middlewares are best when logic must be:

  • Applied consistently across many subscribers or publishers.
  • Independent from domain-specific business handlers.
  • Reusable and testable as an isolated unit.

Typical examples include:

  • Structured validation gates.
  • Rate limiting and admission control.
  • Metadata enrichment and contract tagging.
  • Latency and status telemetry.

Configured Middleware (Stateless-First)

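Passing configuration and collaborators through the constructor is usually preferable to hard-coding constants in the middleware class: the same class can then be reused with different settings and with test doubles.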

class RateLimiterService:
    """External rate limiter contract (Redis, API gateway, etc.)."""

    async def acquire(self, key: str) -> None:
        pass


class RateLimitMiddleware(BaseMiddleware):
    """Rate limiting middleware that delegates state to an external service."""

    def __init__(self, next_call: BaseMiddleware, limiter: RateLimiterService):
        super().__init__(next_call)
        self.limiter = limiter

    async def on_message(self, message: Message) -> Any:
        await self.limiter.acquire(key=message.subscriber_name)
        return await super().on_message(message)

Register configured middleware using the Middleware(...) wrapper:

rate_limiter = RateLimiterService()

broker = PubSubBroker(
    project_id="fastpubsub-pubsub-local",
    middlewares=[
        Middleware(RateLimitMiddleware, limiter=rate_limiter),
    ],
)

The wrapper keeps constructor arguments explicit while preserving declarative broker setup.

Prefer Stateless Middlewares

Middlewares should be stateless whenever possible. If state is required (for example, rate limiting counters, distributed locks, dedup keys), store it in dedicated external systems such as Redis or a persistence service. Do not keep mutable operational state inside middleware instances.
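As a rough illustration of externalized state, the sketch below keeps deduplication keys in a store object that the middleware-like class only references. `DedupStore`, `InMemoryDedupStore`, and `DedupGate` are hypothetical names introduced for this example; the in-memory store is a test double standing in for a shared system such as Redis.

```python
import asyncio
from typing import Protocol


class DedupStore(Protocol):
    """Hypothetical contract for an external store (for example, Redis SET with NX)."""

    async def mark_if_new(self, key: str) -> bool:
        """Return True the first time a key is seen, False afterwards."""
        ...


class InMemoryDedupStore:
    """Test double only; a real deployment would use a store shared by all workers."""

    def __init__(self) -> None:
        self._seen: set[str] = set()

    async def mark_if_new(self, key: str) -> bool:
        if key in self._seen:
            return False
        self._seen.add(key)
        return True


class DedupGate:
    """Holds a reference to the store, but no counters or caches of its own."""

    def __init__(self, store: DedupStore) -> None:
        self._store = store

    async def should_process(self, message_id: str) -> bool:
        return await self._store.mark_if_new(message_id)


async def demo() -> list[bool]:
    store = InMemoryDedupStore()
    # Two gate instances share one store, as two workers would share Redis.
    first, second = DedupGate(store), DedupGate(store)
    return [
        await first.should_process("msg-1"),
        await second.should_process("msg-1"),
        await second.should_process("msg-2"),
    ]


decisions = asyncio.run(demo())
print(decisions)
```

Because the gate itself is stateless, any instance gives the same answer for the same message, which is what makes horizontal scaling and restarts safe.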

Directional Middlewares

Some middlewares should affect only one direction.

Subscriber-Side Validation

class ValidationMiddleware(BaseMiddleware):
    """Rejects non-JSON payloads before they reach the handler."""

    async def on_message(self, message: Message) -> Any:
        if not self._is_valid_json(message.data):
            raise Drop("Invalid payload: expected JSON bytes")

        return await super().on_message(message)

    @staticmethod
    def _is_valid_json(data: bytes) -> bool:
        try:
            json.loads(data)
            return True
        except ValueError:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError (non-UTF-8 bytes).
            return False

Publisher-Side Metadata Enrichment

class PublisherMetadataMiddleware(BaseMiddleware):
    """Adds delivery metadata to outgoing messages."""

    async def on_publish(
        self,
        data: bytes,
        ordering_key: str,
        attributes: dict[str, str] | None,
    ) -> Any:
        metadata = {} if attributes is None else dict(attributes)
        metadata["schema-version"] = "v1"
        metadata["source-service"] = "orders-service"
        return await super().on_publish(data, ordering_key, metadata)

Use Built-In Compression Middleware

For payload compression, prefer the built-in GZipMiddleware. Reserve custom middlewares for behavior the framework core does not already provide.

Error Classification Strategy

Middleware is an appropriate location to convert broad exceptions into explicit message lifecycle intent (Drop vs Retry).

class ErrorHandlingMiddleware(BaseMiddleware):
    """Maps domain errors to explicit lifecycle outcomes."""

    async def on_message(self, message: Message) -> Any:
        try:
            return await super().on_message(message)
        except ValueError as error:
            logger.warning(
                "Dropping message due to validation error",
                extra={"error": str(error)},
            )
            raise Drop(str(error)) from error
        except TimeoutError as error:
            logger.info(
                "Retrying message due to transient timeout",
                extra={"error": str(error)},
            )
            raise Retry(str(error)) from error

This keeps domain handlers focused on business behavior while centralizing policy for transient and permanent errors.
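One way to keep such a policy testable is to express it as a small lookup that a middleware could consult. The sketch below is an assumption-laden illustration: `Outcome`, `POLICY`, and `classify` are hypothetical names, and the table only mirrors the ValueError and TimeoutError mapping shown above. It also lists `asyncio.TimeoutError` explicitly, because before Python 3.11 that class is separate from the built-in `TimeoutError` and would not be caught by `except TimeoutError`.

```python
import asyncio
from enum import Enum


class Outcome(Enum):
    DROP = "drop"
    RETRY = "retry"
    UNCLASSIFIED = "unclassified"


# Ordered policy table; the first matching entry wins.
# asyncio.TimeoutError is an alias of TimeoutError on Python 3.11+,
# and a distinct class on earlier versions, so both are listed.
POLICY = (
    ((TimeoutError, asyncio.TimeoutError), Outcome.RETRY),
    ((ValueError,), Outcome.DROP),
)


def classify(error: BaseException) -> Outcome:
    """Map an exception to a lifecycle outcome without raising anything."""
    for error_types, outcome in POLICY:
        if isinstance(error, error_types):
            return outcome
    # Unknown errors are left for the caller to re-raise unchanged.
    return Outcome.UNCLASSIFIED


print(classify(ValueError("bad payload")))
print(classify(TimeoutError("upstream timed out")))
print(classify(RuntimeError("unexpected")))
```

A middleware would then raise Drop or Retry based on the returned outcome and re-raise the original exception when the outcome is unclassified.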

Observability Middleware Pattern

class MetricsMiddleware(BaseMiddleware):
    """Records processing latency and status per subscriber."""

    def __init__(self, next_call: BaseMiddleware, subscriber_name: str):
        super().__init__(next_call)
        self.subscriber_name = subscriber_name

    async def on_message(self, message: Message) -> Any:
        start = time.monotonic()
        status = "success"

        try:
            return await super().on_message(message)
        except Exception:
            status = "error"
            raise
        finally:
            elapsed = time.monotonic() - start
            logger.info(
                "subscriber.metrics",
                extra={
                    "subscriber": self.subscriber_name,
                    "status": status,
                    # Keep latency numeric so log backends can aggregate it.
                    "latency_seconds": round(elapsed, 6),
                },
            )

Even simple latency and status logging at the middleware level gives responders a per-subscriber timeline to start from, which can shorten diagnosis during incident response.

Composition Strategy

Prefer multiple single-responsibility middlewares over one monolithic middleware.

broker_with_composition = PubSubBroker(
    project_id="fastpubsub-pubsub-local",
    middlewares=[
        Middleware(ErrorHandlingMiddleware),
        Middleware(MetricsMiddleware, subscriber_name="orders-handler"),
        Middleware(ValidationMiddleware),
    ],
)

Suggested order for inbound message handling:

  1. Admission/validation first.
  2. Policy mapping (error handling) next.
  3. Metrics/logging around the call boundary.
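Placement determines what each layer observes. The following sketch uses plain wrapper functions and a stand-in `Drop` class (not the framework's own types) to show that a recording layer outside the error mapper sees the mapped exception, while the same layer inside it sees the raw one.

```python
import asyncio
from typing import Any, Awaitable, Callable

Call = Callable[[bytes], Awaitable[Any]]


class Drop(Exception):
    """Stand-in for the framework's Drop exception, for illustration only."""


def map_errors(next_call: Call) -> Call:
    async def wrapped(data: bytes) -> Any:
        try:
            return await next_call(data)
        except ValueError as error:
            raise Drop(str(error)) from error

    return wrapped


def record_errors(next_call: Call, seen: list[str]) -> Call:
    async def wrapped(data: bytes) -> Any:
        try:
            return await next_call(data)
        except Exception as error:
            seen.append(type(error).__name__)
            raise

    return wrapped


async def handler(data: bytes) -> Any:
    raise ValueError("bad payload")


async def run(chain: Call) -> None:
    try:
        await chain(b"x")
    except Drop:
        pass  # The mapped outcome is expected in both arrangements.


outside: list[str] = []
inside: list[str] = []
asyncio.run(run(record_errors(map_errors(handler), outside)))  # recorder outermost
asyncio.run(run(map_errors(record_errors(handler, inside))))   # recorder innermost
print(outside, inside)
```

Whether metrics should report the raw error class or the lifecycle outcome is a design decision; the point is to choose the position deliberately.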

Anti-Pattern: Cross-Middleware Dependencies

Avoid designing middleware A to depend on side effects from middleware B. Middleware chains should remain composable and order-tolerant.

Instead of hidden dependencies between middlewares:

  • Place shared state in a dedicated service (cache, database, message store).
  • Inject that service into each middleware through Middleware(...) arguments.
  • Keep each middleware independently testable and replaceable.
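The bullets above can be sketched as two components that each receive the same service instead of reading values left behind by one another. `TenantDirectory`, `AuditTagger`, and `QuotaGate` are hypothetical names used only for this illustration.

```python
import asyncio


class TenantDirectory:
    """Hypothetical shared service; in production it might wrap a cache or database."""

    def __init__(self, tenants_by_topic: dict[str, str]) -> None:
        self._tenants_by_topic = tenants_by_topic

    async def tenant_for(self, topic: str) -> str:
        return self._tenants_by_topic.get(topic, "unknown")


class AuditTagger:
    """Depends only on the directory, not on any other middleware having run."""

    def __init__(self, directory: TenantDirectory) -> None:
        self._directory = directory

    async def tags(self, topic: str) -> dict[str, str]:
        return {"tenant": await self._directory.tenant_for(topic)}


class QuotaGate:
    """Resolves the tenant itself instead of reading a value left by AuditTagger."""

    def __init__(self, directory: TenantDirectory, blocked: set[str]) -> None:
        self._directory = directory
        self._blocked = blocked

    async def allows(self, topic: str) -> bool:
        return (await self._directory.tenant_for(topic)) not in self._blocked


directory = TenantDirectory({"orders": "acme", "refunds": "globex"})
tagger = AuditTagger(directory)
gate = QuotaGate(directory, blocked={"globex"})

tags = asyncio.run(tagger.tags("orders"))
orders_allowed = asyncio.run(gate.allows("orders"))
refunds_allowed = asyncio.run(gate.allows("refunds"))
print(tags, orders_allowed, refunds_allowed)
```

Either class can be removed, reordered, or tested alone, because neither relies on the other's side effects.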

Validation with PubSubTestClient

Use PubSubTestClient to assert middleware outcomes directly in tests.

@pytest.mark.asyncio
async def test_validation_middleware_drops_invalid_payload() -> None:
    test_broker = PubSubBroker(
        project_id="test-project",
        middlewares=[Middleware(ValidationMiddleware)],
    )

    @test_broker.subscriber(
        alias="validator",
        topic_name="events",
        subscription_name="events-subscription",
    )
    async def handler(message: Message) -> str:
        return message.data.decode("utf-8")

    async with PubSubTestClient(test_broker) as client:
        await client.publish(topic="events", data=b'{"valid":true}')
        await client.publish(topic="events", data=b"{invalid-json")

        results = client.get_results()

    assert len(results) == 2
    assert results[0].error is None
    assert isinstance(results[1].error, Drop)

This approach validates middleware chain behavior in-process, without depending on the Pub/Sub emulator.

Design Rules for Reliable Middleware

Always Continue the Chain

Call await super().on_message(...) or await super().on_publish(...) unless the middleware intentionally terminates flow.
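The effect of a missing call is easy to overlook because nothing fails loudly. The sketch below uses plain wrapper functions in place of real middleware classes (an assumption made to keep it framework-independent) to show that the handler simply never runs.

```python
import asyncio
from typing import Any, Awaitable, Callable

Call = Callable[[str], Awaitable[Any]]
handled: list[str] = []


async def handler(message: str) -> str:
    handled.append(message)
    return "ok"


def forwarding(next_call: Call) -> Call:
    async def wrapped(message: str) -> Any:
        # Continues the chain and passes the result back to the caller.
        return await next_call(message)

    return wrapped


def broken(next_call: Call) -> Call:
    async def wrapped(message: str) -> Any:
        # Bug: next_call is never awaited, so the handler is silently skipped.
        return None

    return wrapped


good_result = asyncio.run(forwarding(handler)("a"))
bad_result = asyncio.run(broken(handler)("b"))
print(handled, good_result, bad_result)
```

No exception is raised in the broken case, so a test that asserts the handler actually ran is the most reliable guard.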

Keep Middleware Fast

Middlewares execute on every message. Avoid blocking operations and unbounded in-memory state.
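When a middleware has to call a synchronous client, one option is to move the call off the event loop. The sketch below assumes Python 3.9 or later for `asyncio.to_thread`; `blocking_lookup` is a hypothetical stand-in for such a client call.

```python
import asyncio
import time


def blocking_lookup(key: str) -> str:
    """Hypothetical synchronous call, for example a legacy SDK without async support."""
    time.sleep(0.05)
    return f"value-for-{key}"


async def lookup_without_blocking(key: str) -> str:
    # asyncio.to_thread runs the call in a worker thread, so the event loop
    # stays free to process other messages while this one waits.
    return await asyncio.to_thread(blocking_lookup, key)


async def demo() -> list[str]:
    # The three lookups overlap instead of running back to back.
    return await asyncio.gather(
        *(lookup_without_blocking(key) for key in ("a", "b", "c"))
    )


values = asyncio.run(demo())
print(values)
```

Calling `blocking_lookup` directly inside an async method would stall every other coroutine on the loop for the duration of the call.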

Preserve Determinism

Non-deterministic behavior, such as random waits or external side effects without safeguards, makes failures harder to reproduce and debug.

Emit Actionable Context

Logs and metrics should include stable identifiers (message_id, subscriber alias, error class) to support cross-system correlation.
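A small helper can keep these fields consistent across middlewares. This is a sketch only: `build_log_context` is a hypothetical function, and the identifiers are passed in as plain strings rather than read from a framework message object. Note that the standard `logging` module rejects `extra` keys that collide with built-in record attributes such as `message` or `name`, so prefixed or specific key names are safer.

```python
import logging
from typing import Optional


def build_log_context(
    message_id: str,
    subscriber_alias: str,
    error: Optional[BaseException] = None,
) -> dict[str, str]:
    """Return stable, low-cardinality fields suitable for logging's extra argument."""
    context = {"message_id": message_id, "subscriber": subscriber_alias}
    if error is not None:
        # The class name is stable across occurrences; the error text often is not.
        context["error_class"] = type(error).__name__
    return context


logger = logging.getLogger("middleware.sketch")
context = build_log_context("1234", "validator", ValueError("bad payload"))
logger.warning("message failed", extra=context)
```

Using the same keys in every middleware lets a single query follow one message across validation, error mapping, and metrics records.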

Common Failure Modes

  • Omitting the await super().on_message(...) or await super().on_publish(...) call and silently breaking the chain.
  • Mixing unrelated concerns into one middleware class.
  • Keeping mutable runtime state inside middleware instances.
  • Creating hidden dependencies between middleware classes.
  • Raising generic exceptions instead of Drop/Retry when policy is known.

Recap

  • Custom middlewares are the primary extension point for cross-cutting runtime policy.
  • Use Middleware(...) for configured, reusable classes.
  • Keep middlewares stateless and externalize mutable state when needed.
  • Separate subscriber and publisher concerns when behavior differs by direction.
  • Avoid middleware-to-middleware dependencies; share state through explicit services.
  • Compose small middlewares in explicit order.
  • Validate middleware behavior with PubSubTestClient before production rollout.