Middlewares¶
A "middleware" is a component that runs around the processing of every received message: before the message reaches any specific subscriber handler, and again when processing ends. FastPubSub's middleware system intercepts incoming messages before they reach your handler and outgoing messages before they are sent. Middlewares are ideal for implementing cross-cutting concerns without cluttering business logic.
How Middlewares Work¶
Think of middlewares as layers of an onion:
- Incoming messages: Start at the outermost layer and travel inward through each middleware until reaching your handler
- Outgoing messages: Start at the core (your code) and travel outward through the stack before being sent to Pub/Sub
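The onion model can be sketched in plain Python, independent of FastPubSub. The names below (`make_layer`, `handler`, `trace`) are illustrative stand-ins and not part of the library. The sketch covers only the incoming direction and shows how each layer wraps the next one, so a message travels inward to the handler and the result travels back out through the same layers.

```python
import asyncio

trace: list[str] = []


async def handler(message: str) -> str:
    # The core of the onion: the subscriber's business logic.
    trace.append("handler")
    return message.upper()


def make_layer(name, call_next):
    # Wrap `call_next` in one onion layer that records entry and exit.
    async def layer(message: str) -> str:
        trace.append(f"{name}:in")
        result = await call_next(message)
        trace.append(f"{name}:out")
        return result

    return layer


# Build the onion from the inside out: the last layer added is the outermost.
pipeline = handler
for layer_name in ["inner", "middle", "outer"]:
    pipeline = make_layer(layer_name, pipeline)

result = asyncio.run(pipeline("hello"))
```

After the run, `trace` lists the layers from outermost to innermost on the way in and in reverse on the way out.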
Common Use Cases¶
- Adding contextual logging for every message
- Measuring processing time with metrics and tracing
- Validating authentication tokens
- Automatically adding trace IDs to outgoing messages
- Implementing global error handling
Creating Middlewares¶
Inherit from BaseMiddleware and implement one or both methods:
- async def on_message(...): Intercepts incoming messages
- async def on_publish(...): Intercepts outgoing messages
Always Call Super
You must call await super().on_message(...) or await super().on_publish(...) to pass control to the next middleware. Without this, the chain breaks and the application fails.
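Why the super() call matters can be illustrated with a small stand-in chain. `StandInMiddleware` and its `next_call` attribute are hypothetical and only mimic the delegation pattern; they are not FastPubSub's BaseMiddleware. In this sketch a missing super() call silently drops the message, which is one way a broken chain can show up.

```python
import asyncio


class StandInMiddleware:
    """Hypothetical stand-in for a base middleware; not the FastPubSub class."""

    def __init__(self, next_call):
        self.next_call = next_call

    async def on_message(self, message):
        # Default behaviour: hand the message to the next link in the chain.
        return await self.next_call(message)


class GoodMiddleware(StandInMiddleware):
    async def on_message(self, message):
        # Delegating to super() keeps the chain intact.
        return await super().on_message(message)


class BrokenMiddleware(StandInMiddleware):
    async def on_message(self, message):
        # No super() call: the message stops here and the handler never runs.
        return None


handled: list[str] = []


async def handler(message):
    handled.append(message)
    return "ok"


good_result = asyncio.run(GoodMiddleware(handler).on_message("m1"))
broken_result = asyncio.run(BrokenMiddleware(handler).on_message("m2"))
```

Only "m1" reaches the handler; "m2" is swallowed by the middleware that skipped super().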
Example: Logging Middleware¶
import time
from typing import Any

# BaseMiddleware, Message, and logger are provided by FastPubSub;
# import them as appropriate for your installation.


class FullLoggingMiddleware(BaseMiddleware):
    async def on_message(self, message: Message) -> Any:
        start_time = time.monotonic()
        try:
            # Call the next middleware or handler
            response = await super().on_message(message)
            processing_time = (time.monotonic() - start_time) * 1000
            logger.info(f"Message processed in {processing_time:.2f}ms")
            return response
        except Exception as e:
            logger.error(
                f"Message {message.id} failed with error: {e}",
                extra={"message_id": message.id},
            )
            # Re-raise to trigger nack
            raise

    async def on_publish(
        self, data: bytes, ordering_key: str, attributes: dict[str, str] | None
    ) -> Any:
        logger.info(f"Publishing message with {len(data)} bytes")
        # Copy the attributes so the caller's dict is not mutated
        attributes = dict(attributes or {})
        # Add a trace ID to all outgoing messages
        attributes["x-trace-id"] = "some-trace-id"
        # Call the next middleware or publisher
        return await super().on_publish(data, ordering_key, attributes)
Step-by-Step¶
- Inherit from BaseMiddleware.
- Implement on_message and/or on_publish.
- Call await super().on_message(...) or await super().on_publish(...).
- Register the middleware at the desired level.
Applying Middlewares¶
Middlewares can be applied at four levels, from broadest to most specific.
Broker Level (Global)¶
Applied to all subscribers and publishers in the application.
Router Level¶
Applied to all subscribers and publishers in a specific router (and its nested routers).
Subscriber Level¶
Applied to a single subscriber:
@broker.subscriber(
    alias="debug-handler",
    topic_name="events",
    subscription_name="events-subscription",
    middlewares=[Middleware(DebugMiddleware)],
)
async def handle_message(message: Message):
    print(message)
Publisher Level¶
Applied to a dedicated publisher instance:
my_publisher = broker.publisher("events")
my_publisher.include_middleware(AddTraceMiddleware)


@app.after_startup
async def publish_with_trace():
    await my_publisher.publish(data={"hello": "world"})
The Middleware Wrapper¶
Use the Middleware(...) wrapper when you need to pass constructor arguments to a middleware class. It lets you configure a middleware instance at registration time while keeping the registration API consistent.
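The idea behind such a wrapper, storing a class together with its constructor arguments and instantiating it later, can be sketched as follows. `MiddlewareSpec`, `build`, and `PrefixMiddleware` are hypothetical names invented for this illustration; the real Middleware(...) wrapper may have a different signature and behaviour.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class MiddlewareSpec:
    """Hypothetical stand-in for a middleware wrapper; not FastPubSub's Middleware."""

    cls: type
    kwargs: dict[str, Any] = field(default_factory=dict)

    def build(self, next_call: Any) -> Any:
        # Instantiate lazily, once the next link in the chain is known.
        return self.cls(next_call, **self.kwargs)


class PrefixMiddleware:
    """Illustrative middleware that takes a constructor argument."""

    def __init__(self, next_call, prefix: str = ""):
        self.next_call = next_call
        self.prefix = prefix

    def on_message(self, message: str) -> str:
        # Apply the configured prefix, then pass the message on.
        return self.next_call(self.prefix + message)


# Configuration is captured at registration time...
spec = MiddlewareSpec(PrefixMiddleware, {"prefix": "[debug] "})
# ...and the instance is created later, when the chain is assembled.
middleware = spec.build(next_call=lambda m: m)
output = middleware.on_message("hello")
```

Deferring construction this way keeps registration uniform: every entry is a spec, whether or not the middleware needs arguments.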
Only one way to add
Subscriber and publisher middlewares can only be added through the constructor functions subscriber(...) and publisher(...), respectively. You cannot pass a middleware to broker.publish(...).
Common Pitfalls¶
- Forgetting to call super() breaks the chain.
- Adding middleware at the wrong level (broker vs router vs subscriber).
- Doing slow I/O inside middleware without await.
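The last pitfall can be demonstrated with the standard library alone. In this sketch, `time.sleep` stands in for a slow blocking I/O call, and the two coroutine functions are illustrative, not FastPubSub middlewares. The blocking version stalls the event loop, so the second task cannot start until the first finishes. The awaiting version uses `asyncio.to_thread` to move the blocking call to a worker thread, so both tasks start right away.

```python
import asyncio
import time


async def blocking_middleware(name: str, log: list[str]) -> None:
    log.append(f"{name}:start")
    time.sleep(0.05)  # Blocks the whole event loop; nothing else can run.
    log.append(f"{name}:end")


async def awaiting_middleware(name: str, log: list[str]) -> None:
    log.append(f"{name}:start")
    # Runs the blocking call in a worker thread; the loop stays free.
    await asyncio.to_thread(time.sleep, 0.05)
    log.append(f"{name}:end")


async def run_pair(middleware) -> list[str]:
    # Run two "messages" concurrently and record the order of events.
    log: list[str] = []
    await asyncio.gather(middleware("a", log), middleware("b", log))
    return log


blocking_log = asyncio.run(run_pair(blocking_middleware))
awaiting_log = asyncio.run(run_pair(awaiting_middleware))
```

With the blocking version, message "b" only starts after "a" has finished; with the awaited version, both start before either finishes.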
Middleware Hierarchy and Flow¶
The execution order depends on the message direction:
graph TD
    subgraph "Incoming Flow"
        direction TB
        A[Message from Pub/Sub] --> B(Broker Middleware)
        B --> C(Router Middleware)
        C --> D(Subscriber Middleware)
        D --> E[Your Handler]
    end
    subgraph "Outgoing Flow"
        direction TB
        F[Your code calls publish] --> G(Publisher Middleware)
        G --> H(Router Middleware)
        H --> I(Broker Middleware)
        I --> J[Send to Pub/Sub]
    end
Incoming messages: Broker → Router → Subscriber → Handler
Outgoing messages: Publisher → Router → Broker → Pub/Sub
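As a rough sketch, the two orderings amount to concatenating the per-level middleware lists in opposite directions. The function and middleware names below are hypothetical, and the order within a single level (assumed here to be registration order) is not specified by the text above.

```python
def incoming_order(
    broker: list[str], router: list[str], subscriber: list[str]
) -> list[str]:
    # Broadest level first: broker, then router, then subscriber, then the handler.
    return [*broker, *router, *subscriber, "handler"]


def outgoing_order(
    publisher: list[str], router: list[str], broker: list[str]
) -> list[str]:
    # Most specific level first: publisher, then router, then broker, then Pub/Sub.
    return [*publisher, *router, *broker, "Pub/Sub"]


incoming = incoming_order(["BrokerLog"], ["RouterAuth"], ["SubDebug"])
outgoing = outgoing_order(["PubTrace"], ["RouterAuth"], ["BrokerLog"])
```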
Recap¶
- Purpose: Implement cross-cutting concerns (logging, auth, metrics) without cluttering handlers
- Dual function: Intercept incoming messages via on_message and outgoing messages via on_publish
- Creating: Inherit from BaseMiddleware, implement on_message and/or on_publish
- The super() call: Always call await super().on_message(...) or await super().on_publish(...) to continue the chain
- Multiple levels: Apply at Broker, Router, Subscriber, or Publisher level
- Execution order:
    - Incoming: Broker → Router → Subscriber → Handler
    - Outgoing: Publisher → Router → Broker → Pub/Sub