Lifespan and Hooks¶
FastPubSub manages the application lifecycle using a built-in lifespan event handler. This implementation is based on the standard FastAPI lifespan context manager and runs the broker and subscribers within the same event loop as the web server.
When your FastPubSub application starts, the internal lifespan function handles starting and stopping the broker. It provides four hook decorators for adding custom logic at specific moments.
The Four Event Hooks¶
@app.on_startup¶
Runs after the application process starts but before broker.start() is called.
Use it for: Setting up essential resources that subscribers need before they start pulling messages.
@app.on_startup
async def setup_database():
logger.info("Connecting to the database...")
# Example: app.state.db_pool = await asyncpg.create_pool(...)
logger.info("Database connection pool created.")
@app.after_startup¶
Runs immediately after broker.start() completes successfully. Subscribers are now running and polling for messages.
Use it for: Logic that needs to interact with the active broker or subscribers.
@app.after_startup
async def announce_startup():
logger.info("Subscribers are running. Publishing startup message.")
await broker.publish("system-logs", data={"status": "online"})
@app.on_shutdown¶
Runs when the application receives a shutdown signal (e.g., SIGTERM), but before broker.shutdown() is called. Subscribers are still running.
Use it for: Initiating graceful shutdown. Set a flag to tell long-running handlers to finish up.
@app.on_shutdown
async def prepare_for_shutdown():
logger.info("Shutdown signal received. Preparing to stop...")
app.state.is_shutting_down = True
@app.after_shutdown¶
Runs after broker.shutdown() completes and all subscriber tasks have stopped. This is the last FastPubSub code to execute.
Use it for: Final cleanup and releasing resources created in on_startup.
@app.after_shutdown
async def cleanup_database():
logger.info("Closing database connection pool...")
# Example: await app.state.db_pool.close()
logger.info("Database pool closed.")
Execution Order¶
The lifecycle follows a strict order. The on_* hooks run before the broker action, and after_* hooks run after.
sequenceDiagram
participant CLI
participant App
participant Broker
CLI->>App: fastpubsub run ...
Note over App: Enters lifespan (before yield)
App->>App: Executes @app.on_startup hooks
App->>Broker: broker.start()
Broker-->>App: Subscribers are running
App->>App: Executes @app.after_startup hooks
Note over App, Broker: Application is running...
CLI->>App: (Shutdown signal received)
Note over App: Resumes lifespan (after yield)
App->>App: Executes @app.on_shutdown hooks
App->>Broker: broker.shutdown()
Broker-->>App: Subscribers are stopped
App->>App: Executes @app.after_shutdown hooks
App-->>CLI: Process exits
Custom Lifespan¶
For advanced use cases, pass a custom lifespan context manager to FastPubSub. The built-in lifecycle (including all four hooks and broker start/stop) executes within the yield block of your custom function.
from contextlib import asynccontextmanager
import httpx
from fastpubsub import FastPubSub, Message, PubSubBroker
from fastpubsub.logger import logger
@asynccontextmanager
async def global_lifespan(app: FastPubSub):
logger.info("GLOBAL LIFESPAN: Starting up...")
# Create a shared HTTP client
async with httpx.AsyncClient() as client:
app.state.http_client = client
logger.info("GLOBAL LIFESPAN: HTTP client created.")
yield
logger.info("GLOBAL LIFESPAN: HTTP client closed.")
logger.info("GLOBAL LIFESPAN: Shutting down...")
broker = PubSubBroker(project_id="fastpubsub-pubsub-local")
app = FastPubSub(broker, lifespan=global_lifespan)
@app.on_startup
async def on_startup_hook():
# Runs after http_client is created
logger.info(" FastPubSub Hook: @app.on_startup")
@app.after_startup
async def after_startup_hook():
logger.info(" FastPubSub Hook: @app.after_startup (Broker is running)")
@app.on_shutdown
async def on_shutdown_hook():
logger.info(" FastPubSub Hook: @app.on_shutdown")
@app.after_shutdown
async def after_shutdown_hook():
logger.info(" FastPubSub Hook: @app.after_shutdown (Broker is stopped)")
@broker.subscriber(
"test-handler",
topic_name="test-topic",
subscription_name="test-subscription",
)
async def handle_message(message: Message) -> None:
logger.info(f"Received: {message.data.decode()}")
This separates global resources (HTTP client) from broker-specific logic.
Step-by-Step¶
- Define a custom lifespan context manager.
- Allocate global resources before
yield. - Let FastPubSub run its built-in lifecycle inside
yield. - Clean up resources after
yield.
Recap¶
- Built on FastAPI's Lifespan: FastPubSub uses a built-in function that follows the standard lifespan context manager pattern
- Four hooks for precise control:
on_startup: Before broker startsafter_startup: After broker startson_shutdown: Before broker stopsafter_shutdown: After broker stops
- Custom lifespan support: Pass your own lifespan function for advanced resource management