Troubleshooting and FAQ¶
This guide helps you resolve common issues when working with FastPubSub.
Common Issues¶
Authentication and Credentials¶
"Could not load the default credentials"¶
Error:
Solutions:
-
For local development with emulator:
-
For production/cloud:
-
Verify credentials:
"Permission denied" errors¶
Error:
Required permissions:
| Action | Required Permissions |
|---|---|
| Subscribers | pubsub.subscriptions.consume, pubsub.subscriptions.get |
| Publishers | pubsub.topics.publish |
| Autocreate | pubsub.topics.create, pubsub.subscriptions.create |
Solution: Check if you Service Account has enough permissions.
Message Delivery Issues¶
Messages not being consumed¶
Debugging steps:
-
Check if subscriber is running:
-
Verify topic and subscription exist:
-
Check subscription is attached to correct topic:
-
Test with manual publish:
-
Check filter expressions: If using
filter_expression, ensure published messages have matching attributes.
Messages being nacked repeatedly¶
Common causes:
-
Unhandled exceptions:
@broker.subscriber( alias="validated-handler", topic_name="validated-events", subscription_name="validated-events-subscription", ) async def validated_handler(message: Message): try: data = SomeModel.model_validate_json(message.data) except ValidationError as e: # Any other error will nack the message logger.error(f"Invalid message: {e}") raise Drop("Invalid message format") -
Processing timeout (ack deadline exceeded):
-
Blocking operations:
See Message Lifecycle for Drop/Retry behavior and Performance Tuning for ack deadline guidance.
Duplicate message processing¶
Solutions:
-
Make handlers idempotent:
@broker.subscriber( alias="idempotent-handler", topic_name="idempotent-events", subscription_name="idempotent-events-subscription", ) async def idempotent_handler(message: Message): event_id = message.attributes.get("event_id") if await redis.exists(f"processed:{event_id}"): return # Already handled await do_work(message.data) await redis.set(f"processed:{event_id}", "1", ex=86400) -
Enable exactly-once delivery:
Performance Issues¶
High latency / slow processing¶
Solutions:
-
Increase
max_messages: -
Profile with middleware:
See Performance Tuning for guidance on max_messages and ack_deadline_seconds.
High memory usage¶
Solutions:
-
Limit concurrent messages:
-
Avoid global mutable state:
Graceful Shutdown Issues¶
Messages lost during shutdown¶
Solutions:
-
Increase shutdown timeout:
-
In Kubernetes, set adequate termination period:
Development Issues¶
Emulator not connecting¶
Error:
Solutions:
-
Start the emulator:
-
Verify it's running:
-
Set environment variable:
-
Check port conflicts:
Frequently Asked Questions¶
What's the difference between a topic and a subscription?¶
- Topic: A named channel where messages are published
- Subscription: A named consumer of messages from a topic
- One topic can have multiple subscriptions (fan-out pattern)
- Each subscription receives a copy of every message
What happens if my handler raises an exception?¶
| Exception | Action | Message Destiny |
|---|---|---|
| None (success) | ack() |
Removed |
Drop |
ack() |
Removed |
Retry |
nack() |
Redelivered |
| Any other | nack() |
Redelivered |
How do I process messages in order?¶
Enable message ordering and use ordering keys:
@broker.subscriber(
alias="ordered-handler",
topic_name="ordered-events",
subscription_name="ordered-events-subscription",
enable_message_ordering=True,
)
async def ordered_handler(message: Message):
user_id = message.ordering_key
await process_in_order(user_id, message.data)
Can I use FastPubSub without FastAPI?¶
Currently, FastPubSub has tight coupling with FastAPI. The core PubSubBroker functionality doesn't strictly require FastAPI, but the framework is designed to work with it. Standalone usage is planned for future releases.
How do I test without the emulator?¶
Use PubSubTestClient:
from fastpubsub.testing import PubSubTestClient
async def test_handler():
async with PubSubTestClient(broker) as client:
await client.publish("topic", data=b"test")
What's the maximum message size?¶
Google Pub/Sub has a 10MB limit. For larger data:
- Store data in Cloud Storage and publish the URL
- Split into multiple messages
- Use compression (GZipMiddleware)
from fastpubsub import Middleware, GZipMiddleware
broker = PubSubBroker(
project_id="your-project-id",
middlewares=[Middleware(GZipMiddleware, compresslevel=6)]
)
How do I handle deployments without losing messages?¶
Use graceful shutdown:
Getting Help¶
If you encounter an issue not covered here:
- Check logs with
--log-level debug - Search existing issues: GitHub Issues
- Create a minimal reproduction and file an issue with:
- FastPubSub version (
pip show fastpubsub) - Python version
- Error messages and stack traces
- Relevant code snippets
Recap¶
- Authentication: Set
PUBSUB_EMULATOR_HOSTfor local,GOOGLE_APPLICATION_CREDENTIALSfor cloud - Message delivery: Check autocreate, filters, and ack deadlines
- Performance: Tune
max_messagesbased on workload - Graceful shutdown: Set adequate
shutdown_timeout - Testing: Use
PubSubTestClientfor fast unit tests - Production: Use idempotent handlers, monitoring, and proper IAM permissions