Custom Middlewares¶
FastPubSub middlewares provide explicit interception points for cross-cutting concerns such as validation, rate control, telemetry, and lifecycle error mapping.
This page focuses on designing production-grade custom middlewares and composing them safely. For baseline usage, see Middlewares.
Execution Topology¶
Middleware execution order follows registration hierarchy.
sequenceDiagram
participant P as Pub/Sub Message
participant B as Broker Middleware
participant R as Router Middleware
participant S as Subscriber Middleware
participant H as Handler
P->>B: on_message
B->>R: on_message
R->>S: on_message
S->>H: on_message
H-->>S: return
S-->>R: return
R-->>B: return
For publish flow, direction is inverted (publisher -> router -> broker).
Why Use Custom Middlewares¶
Custom middlewares are best when logic must be:
- Applied consistently across many subscribers or publishers.
- Independent from domain-specific business handlers.
- Reusable and testable as an isolated unit.
Typical examples include:
- Structured validation gates.
- Rate limiting and admission control.
- Metadata enrichment and contract tagging.
- Latency and status telemetry.
Configured Middleware (Stateless-First)¶
A configured middleware instance is often preferable to hard-coded constants.
class RateLimiterService:
"""External rate limiter contract (Redis, API gateway, etc.)."""
async def acquire(self, key: str) -> None:
pass
class RateLimitMiddleware(BaseMiddleware):
"""Rate limiting middleware that delegates state to an external service."""
def __init__(self, next_call: BaseMiddleware, limiter: RateLimiterService):
super().__init__(next_call)
self.limiter = limiter
async def on_message(self, message: Message) -> Any:
await self.limiter.acquire(key=message.subscriber_name)
return await super().on_message(message)
Register configured middleware using the Middleware(...) wrapper:
rate_limiter = RateLimiterService()
broker = PubSubBroker(
project_id="fastpubsub-pubsub-local",
middlewares=[
Middleware(RateLimitMiddleware, limiter=rate_limiter),
],
)
The wrapper keeps constructor arguments explicit while preserving declarative broker setup.
Prefer Stateless Middlewares
Middlewares should be stateless whenever possible. If state is required (for example, rate limiting counters, distributed locks, dedup keys), store it in dedicated external systems such as Redis or a persistence service. Do not keep mutable operational state inside middleware instances.
Directional Middlewares¶
Some middlewares should affect only one direction.
Subscriber-Side Validation¶
class ValidationMiddleware(BaseMiddleware):
"""Rejects non-JSON payloads before they reach the handler."""
async def on_message(self, message: Message) -> Any:
if not self._is_valid_json(message.data):
raise Drop("Invalid payload: expected JSON bytes")
return await super().on_message(message)
@staticmethod
def _is_valid_json(data: bytes) -> bool:
try:
json.loads(data)
return True
except json.JSONDecodeError:
return False
Publisher-Side Metadata Enrichment¶
class PublisherMetadataMiddleware(BaseMiddleware):
"""Adds delivery metadata to outgoing messages."""
async def on_publish(
self,
data: bytes,
ordering_key: str,
attributes: dict[str, str] | None,
) -> Any:
metadata = {} if attributes is None else dict(attributes)
metadata["schema-version"] = "v1"
metadata["source-service"] = "orders-service"
return await super().on_publish(data, ordering_key, metadata)
Use Built-In Compression Middleware
For payload compression, prefer the built-in GZipMiddleware.
Custom middlewares should demonstrate behavior not already covered by the framework core.
Error Classification Strategy¶
Middleware is an appropriate location to convert broad exceptions into explicit message lifecycle intent (Drop vs Retry).
class ErrorHandlingMiddleware(BaseMiddleware):
"""Maps domain errors to explicit lifecycle outcomes."""
async def on_message(self, message: Message) -> Any:
try:
return await super().on_message(message)
except ValueError as error:
logger.warning(
"Dropping message due to validation error",
extra={"error": str(error)},
)
raise Drop(str(error)) from error
except TimeoutError as error:
logger.info(
"Retrying message due to transient timeout",
extra={"error": str(error)},
)
raise Retry(str(error)) from error
This keeps domain handlers focused on business behavior while centralizing policy for transient and permanent errors.
Observability Middleware Pattern¶
class MetricsMiddleware(BaseMiddleware):
"""Records processing latency and status per subscriber."""
def __init__(self, next_call: BaseMiddleware, subscriber_name: str):
super().__init__(next_call)
self.subscriber_name = subscriber_name
async def on_message(self, message: Message) -> Any:
start = time.monotonic()
status = "success"
try:
return await super().on_message(message)
except Exception:
status = "error"
raise
finally:
elapsed = time.monotonic() - start
logger.info(
"subscriber.metrics",
extra={
"subscriber": self.subscriber_name,
"status": status,
"latency_seconds": f"{elapsed:.6f}",
},
)
Even simple latency/status logging at middleware level can significantly reduce mean-time-to-diagnosis during incident response.
Composition Strategy¶
Prefer multiple single-responsibility middlewares over one monolithic middleware.
broker_with_composition = PubSubBroker(
project_id="fastpubsub-pubsub-local",
middlewares=[
Middleware(ErrorHandlingMiddleware),
Middleware(MetricsMiddleware, subscriber_name="orders-handler"),
Middleware(ValidationMiddleware),
],
)
Suggested order for inbound message handling:
- Admission/validation first.
- Policy mapping (error handling) next.
- Metrics/logging around the call boundary.
Anti-Pattern: Cross-Middleware Dependencies¶
Avoid designing middleware A to depend on side effects from middleware B. Middleware chains should remain composable and order-tolerant.
Instead of hidden dependencies between middlewares:
- Place shared state in a dedicated service (cache, database, message store).
- Inject that service into each middleware through
Middleware(...)arguments. - Keep each middleware independently testable and replaceable.
Validation with PubSubTestClient¶
Use PubSubTestClient to assert middleware outcomes directly in tests.
@pytest.mark.asyncio
async def test_validation_middleware_drops_invalid_payload() -> None:
test_broker = PubSubBroker(
project_id="test-project",
middlewares=[Middleware(ValidationMiddleware)],
)
@test_broker.subscriber(
alias="validator",
topic_name="events",
subscription_name="events-subscription",
)
async def handler(message: Message) -> str:
return message.data.decode("utf-8")
async with PubSubTestClient(test_broker) as client:
await client.publish(topic="events", data=b'{"valid":true}')
await client.publish(topic="events", data=b"{invalid-json")
results = client.get_results()
assert len(results) == 2
assert results[0].error is None
assert isinstance(results[1].error, Drop)
This approach validates middleware chain behavior without emulator dependency.
Design Rules for Reliable Middleware¶
Always Continue the Chain¶
Call await super().on_message(...) or await super().on_publish(...) unless the middleware intentionally terminates flow.
Keep Middleware Fast¶
Middlewares execute on every message. Avoid blocking operations and unbounded in-memory state.
Preserve Determinism¶
Any non-deterministic behavior (random waits, external side effects without safeguards) increases debugging complexity.
Emit Actionable Context¶
Logs and metrics should include stable identifiers (message_id, subscriber alias, error class) to support cross-system correlation.
Common Failure Modes¶
- Omitting
super()and breaking the chain. - Mixing unrelated concerns into one middleware class.
- Keeping mutable runtime state inside middleware instances.
- Creating hidden dependencies between middleware classes.
- Raising generic exceptions instead of
Drop/Retrywhen policy is known.
Recap¶
- Custom middlewares are the primary extension point for cross-cutting runtime policy.
- Use
Middleware(...)for configured, reusable classes. - Keep middlewares stateless and externalize mutable state when needed.
- Separate subscriber and publisher concerns when behavior differs by direction.
- Avoid middleware-to-middleware dependencies; share state through explicit services.
- Compose small middlewares in explicit order.
- Validate middleware behavior with
PubSubTestClientbefore production rollout.