Pydantic Integration¶
FastPubSub integrates with Pydantic for data validation and serialization. This integration enables type-safe message handling and automatic JSON conversion.
How FastPubSub Uses Pydantic¶
FastPubSub leverages Pydantic in three ways:
- Message Serialization - Pydantic models are automatically converted to JSON when publishing.
- Push Message Handling - Built-in models for HTTP push endpoints.
- Parameter Validation - Public APIs use
@validate_callfor type checking.
Step-by-Step¶
- Define a Pydantic model for your message schema.
- Publish model instances with
broker.publish(...). - Parse incoming bytes with
model_validate_json()in handlers. - Handle validation errors by raising
Drop.
Publishing Pydantic Models¶
When you publish a Pydantic model, FastPubSub automatically serializes it to JSON:
class OrderEvent(BaseModel):
order_id: str
customer_id: str
total: float
items: list[str]
@app.post("/create-order")
async def create_order(order: OrderEvent):
# Pydantic model is automatically serialized to JSON
await broker.publish("orders", order)
return {"status": "created"}
- FastPubSub calls
order.model_dump_json()internally
Pydantic Version
The examples use Pydantic v2 (model_dump_json, model_validate_json). We will not provide Pydantic v1 as it is already deprecated.
Supported Data Types¶
FastPubSub accepts multiple data types for publishing:
| Type | Serialization |
|---|---|
bytes |
Sent as-is |
str |
UTF-8 encoded |
dict |
JSON serialized |
BaseModel |
JSON serialized via model_dump_json() |
# All of these work:
await broker.publish("topic", b"raw bytes")
await broker.publish("topic", "string message")
await broker.publish("topic", {"key": "value"})
await broker.publish("topic", OrderEvent(order_id="123", ...))
Validating Incoming Messages¶
Parse and validate incoming message data using Pydantic models in your handlers:
class UserEvent(BaseModel):
user_id: str
email: str
action: str
@broker.subscriber(
alias="user-handler",
topic_name="user-events",
subscription_name="user-events-subscription",
)
async def handle_user_event(message: Message):
try:
# Parse and validate the message data
event = UserEvent.model_validate_json(message.data)
await process_user_event(event)
except ValidationError as e:
# Invalid data - drop the message
raise Drop(f"Invalid user event: {e}")
- Use
model_validate_json()to parse bytes directly
Validation Patterns¶
class PaymentEvent(BaseModel):
payment_id: str
amount: float # Required field
@broker.subscriber(
alias="payment-handler",
topic_name="payments",
subscription_name="payments-subscription",
)
async def handle_payment(message: Message):
# Raises ValidationError if amount is missing
event = PaymentEvent.model_validate_json(message.data)
class NotificationEvent(BaseModel):
user_id: str
title: str
body: str | None = None # Optional field
@broker.subscriber(
alias="notification-handler",
topic_name="notifications",
subscription_name="notifications-subscription",
)
async def handle_notification(message: Message):
event = NotificationEvent.model_validate_json(message.data)
# body will be None if not provided
class ConstrainedOrderEvent(BaseModel):
order_id: str = Field(min_length=1)
quantity: int = Field(gt=0, le=1000)
email: str = Field(pattern=r"^[\w.-]+@[\w.-]+\.\w+$")
@broker.subscriber(
alias="constrained-order-handler",
topic_name="constrained-orders",
subscription_name="constrained-orders-subscription",
)
async def handle_constrained_order(message: Message):
# Validates constraints automatically
event = ConstrainedOrderEvent.model_validate_json(message.data)
Push Message Models¶
FastPubSub provides built-in Pydantic models for handling HTTP push subscriptions:
@app.post("/push-endpoint")
async def receive_push(push_message: PushMessage):
# Access the nested message content
message_id = push_message.message.id
subscription = push_message.subscription
# Decode base64 data
raw_data = base64.b64decode(push_message.message.data)
# Parse as your domain model
event = OrderEvent.model_validate_json(raw_data)
await process_event(event)
return {"status": "ok"}
- FastAPI automatically validates the incoming JSON against
PushMessage
PushMessage Structure¶
class PushMessageContent(BaseModel):
id: str # Message ID (alias: messageId)
data: str # Base64-encoded message data
publish_time: str # Publish timestamp (alias: publishTime)
attributes: dict[str, str] = {}
class PushMessage(BaseModel):
subscription: str # Full subscription path
message: PushMessageContent
Base64 Encoding
Push messages from Pub/Sub have their data base64-encoded. Use base64.b64decode() to get the raw bytes before parsing.
Schema Evolution¶
Handle message schema changes gracefully:
Adding New Fields¶
class OrderEventV2(BaseModel):
order_id: str
customer_id: str
total: float
# New field with default - backward compatible
priority: str = "normal"
@broker.subscriber(
alias="order-v2-handler",
topic_name="orders-v2",
subscription_name="orders-v2-subscription",
)
async def handle_order_v2(message: Message):
# Works with both old (no priority) and new messages
event = OrderEventV2.model_validate_json(message.data)
Handling Unknown Fields¶
class FlexibleEvent(BaseModel):
model_config = ConfigDict(extra="ignore")
order_id: str
# Unknown fields are silently ignored
class StrictEvent(BaseModel):
model_config = ConfigDict(extra="forbid")
order_id: str
# Unknown fields raise ValidationError
- Ignore extra fields from newer message versions
- Fail if message has unexpected fields
Best Practices¶
- Use Explicit Models: Define Pydantic models for all message types. This documents your message schema and catches errors early.
- Handle Validation Errors: Always wrap
model_validate_json()in try/except. Invalid messages should be dropped, not retried forever. - Version Your Schemas: Use optional fields with defaults when evolving schemas. This maintains backward compatibility during deployments.
- Validate at Boundaries: Validate incoming messages at the handler entry point. Trust your own Pydantic models when publishing.
Recap¶
- FastPubSub automatically serializes Pydantic models to JSON when publishing
- Use
model_validate_json()to parse and validate incoming messages - Handle
ValidationErrorby raising Drop for invalid messages - Built-in
PushMessagemodel handles HTTP push endpoints - Use optional fields with defaults for backward-compatible schema evolution
- Next: Learn about Uvicorn Integration for production deployments