Observability and Logging¶
FastPubSub includes a powerful logging system designed for production environments. You get context-aware logging with minimal setup.
Built-in Logger¶
FastPubSub provides a pre-configured logger that you can import and use immediately:
from fastpubsub import FastPubSub, Message, PubSubBroker
from fastpubsub.logger import logger

broker = PubSubBroker(project_id="your-project-id")
app = FastPubSub(broker)

@broker.subscriber(
    alias="task-handler",
    topic_name="tasks",
    subscription_name="tasks-subscription",
)
async def handle_task(message: Message):
    logger.info("This is a log message!")
Context-Aware Logging¶
The logger uses Python's ContextVars to safely add metadata in async environments. FastPubSub automatically adds context to every message:
- Message ID
- Topic name
- Subscriber handler name
However, you can also add your own context using logger.contextualize():
@broker.subscriber(
    alias="user-task-handler",
    topic_name="user-tasks",
    subscription_name="user-tasks-subscription",
)
async def handle_user_task(message: Message):
    user_id = message.attributes.get("user_id")
    with logger.contextualize(user_id=user_id):
        logger.info("Processing task for user.")
        # ... some work ...
        logger.warning("User processing had a minor issue.")
    logger.info("This log will NOT have the user_id tag.")
Output:
2025-10-25 13:30:00 | INFO | Processing task for user. | message_id=12345 topic_name=user-tasks user_id=123
2025-10-25 13:30:00 | WARNING | User processing had a minor issue. | message_id=12345 topic_name=user-tasks user_id=123
2025-10-25 13:30:01 | INFO | This log will NOT have the user_id tag. | message_id=12345 topic_name=user-tasks
Structured JSON Logging¶
For production, switch to structured JSON output for log aggregation platforms:
# Via environment variable
export FASTPUBSUB_ENABLE_LOG_SERIALIZE=1
# Or via CLI flag
fastpubsub run app:app --log-serialize
Step-by-Step¶
- Enable JSON logging with FASTPUBSUB_ENABLE_LOG_SERIALIZE=1 or --log-serialize.
- Run the app and confirm logs are JSON objects.
- Filter by fields like message_id, topic_name, or your custom keys (see the jq sketch below).
JSON Output:
{
    "timestamp": "2025-10-25 13:30:00,123",
    "level": "INFO",
    "name": "fastpubsub",
    "message": "Processing task for user.",
    "module": "my_app",
    "function": "handle_user_task",
    "line": 15,
    "message_id": "12345",
    "topic_name": "user-tasks",
    "user_id": "123"
}
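Once serialized, each record is a single JSON object, so you can slice logs with standard tooling. A minimal sketch using jq, assuming jq is installed and logs reach the terminal (stderr is redirected in case your sink writes there):
# Show only records for a given topic
fastpubsub run app:app --log-serialize 2>&1 | jq 'select(.topic_name == "user-tasks")'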
Log Levels¶
The logger supports the standard log levels; use the one that matches the severity of each event:
@broker.subscriber(
    alias="levels-handler",
    topic_name="levels",
    subscription_name="levels-subscription",
)
async def handler(message: Message):
    order_id = "12345"
    sku = "SKU-001"
    qty = 5
    reason = "insufficient funds"
    attempts = 3

    logger.debug("Detailed debug info", extra={"raw_data": message.data})
    logger.info("Processing order", extra={"order_id": order_id})
    logger.warning("Inventory low", extra={"sku": sku, "quantity": qty})
    logger.error("Payment failed", extra={"reason": reason})
    logger.critical("Database unreachable", extra={"attempts": attempts})
In production, raise the minimum log level (for example, to INFO or WARNING) so that debug output stays out of your aggregated logs.
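A sketch of how that could look when launching the app; the --log-level flag name is an assumption here, so check fastpubsub run --help for the exact option in your version:
# Hypothetical flag name -- verify against your installed version
fastpubsub run app:app --log-level WARNING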
Health Check Endpoints¶
FastPubSub provides built-in health checks for orchestration:
Liveness Probe¶
Checks if the application is running:
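By analogy with the readiness probe below (the response codes here are an assumption):
curl http://localhost:8000/consumers/alive
# Response: 200 OK if the application is running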
Kubernetes configuration:
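A minimal sketch, assuming the HTTP server listens on port 8000 as in the curl examples; tune the timing values for your deployment:
livenessProbe:
  httpGet:
    path: /consumers/alive
    port: 8000
  initialDelaySeconds: 10
  periodSeconds: 30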
Readiness Probe¶
Checks if subscribers are actively polling:
curl http://localhost:8000/consumers/ready
# Response: 200 OK if subscribers are running
# Response: 503 Service Unavailable if not ready
Kubernetes configuration:
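Again a minimal sketch under the same port assumption:
readinessProbe:
  httpGet:
    path: /consumers/ready
    port: 8000
  initialDelaySeconds: 5
  periodSeconds: 10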
Cloud Logging Integration¶
For Google Cloud Logging:
import google.cloud.logging

from fastpubsub import FastPubSub, Message, PubSubBroker
from fastpubsub.logger import logger

# Set up Cloud Logging
client = google.cloud.logging.Client()
client.setup_logging()

broker = PubSubBroker(project_id="your-project-id")
app = FastPubSub(broker)

@broker.subscriber(
    alias="cloud-handler",
    topic_name="events",
    subscription_name="events-subscription",
)
async def handler(message: Message):
    logger.info("Message processed")  # Sent to Cloud Logging
Recap¶
- Built-in logger: Import from fastpubsub.logger and use immediately
- Context-aware: Automatic message_id, topic_name, and handler context
- Custom context: Use logger.contextualize() for additional tags
- JSON logs: Enable with --log-serialize for production
- Health checks: /consumers/alive and /consumers/ready for orchestration
- Log levels: Use appropriate levels (debug, info, warning, error, critical)