Skip to content

Lifespan and Hooks

FastPubSub manages the application lifecycle using a built-in lifespan event handler. This implementation is based on the standard FastAPI lifespan context manager and runs the broker and subscribers within the same event loop as the web server.

When your FastPubSub application starts, the internal lifespan function handles starting and stopping the broker. It provides four hook decorators for adding custom logic at specific moments.

The Four Event Hooks

@app.on_startup

Runs after the application process starts but before broker.start() is called.

Use it for: Setting up essential resources that subscribers need before they start pulling messages.

@app.on_startup
async def setup_database():
    logger.info("Connecting to the database...")
    # Example: app.state.db_pool = await asyncpg.create_pool(...)
    logger.info("Database connection pool created.")

@app.after_startup

Runs immediately after broker.start() completes successfully. Subscribers are now running and polling for messages.

Use it for: Logic that needs to interact with the active broker or subscribers.

@app.after_startup
async def announce_startup():
    logger.info("Subscribers are running. Publishing startup message.")
    await broker.publish("system-logs", data={"status": "online"})

@app.on_shutdown

Runs when the application receives a shutdown signal (e.g., SIGTERM), but before broker.shutdown() is called. Subscribers are still running.

Use it for: Initiating graceful shutdown. Set a flag to tell long-running handlers to finish up.

@app.on_shutdown
async def prepare_for_shutdown():
    logger.info("Shutdown signal received. Preparing to stop...")
    app.state.is_shutting_down = True

@app.after_shutdown

Runs after broker.shutdown() completes and all subscriber tasks have stopped. This is the last FastPubSub code to execute.

Use it for: Final cleanup and releasing resources created in on_startup.

@app.after_shutdown
async def cleanup_database():
    logger.info("Closing database connection pool...")
    # Example: await app.state.db_pool.close()
    logger.info("Database pool closed.")

Execution Order

The lifecycle follows a strict order. The on_* hooks run before the broker action, and after_* hooks run after.

sequenceDiagram
    participant CLI
    participant App
    participant Broker

    CLI->>App: fastpubsub run ...

    Note over App: Enters lifespan (before yield)
    App->>App: Executes @app.on_startup hooks
    App->>Broker: broker.start()
    Broker-->>App: Subscribers are running
    App->>App: Executes @app.after_startup hooks
    Note over App, Broker: Application is running...

    CLI->>App: (Shutdown signal received)

    Note over App: Resumes lifespan (after yield)
    App->>App: Executes @app.on_shutdown hooks
    App->>Broker: broker.shutdown()
    Broker-->>App: Subscribers are stopped
    App->>App: Executes @app.after_shutdown hooks
    App-->>CLI: Process exits

Custom Lifespan

For advanced use cases, pass a custom lifespan context manager to FastPubSub. The built-in lifecycle (including all four hooks and broker start/stop) executes within the yield block of your custom function.

from contextlib import asynccontextmanager

import httpx

from fastpubsub import FastPubSub, Message, PubSubBroker
from fastpubsub.logger import logger


@asynccontextmanager
async def global_lifespan(app: FastPubSub):
    logger.info("GLOBAL LIFESPAN: Starting up...")
    # Create a shared HTTP client
    async with httpx.AsyncClient() as client:
        app.state.http_client = client
        logger.info("GLOBAL LIFESPAN: HTTP client created.")
        yield
    logger.info("GLOBAL LIFESPAN: HTTP client closed.")
    logger.info("GLOBAL LIFESPAN: Shutting down...")


broker = PubSubBroker(project_id="fastpubsub-pubsub-local")
app = FastPubSub(broker, lifespan=global_lifespan)


@app.on_startup
async def on_startup_hook():
    # Runs after http_client is created
    logger.info("  FastPubSub Hook: @app.on_startup")


@app.after_startup
async def after_startup_hook():
    logger.info("  FastPubSub Hook: @app.after_startup (Broker is running)")


@app.on_shutdown
async def on_shutdown_hook():
    logger.info("  FastPubSub Hook: @app.on_shutdown")


@app.after_shutdown
async def after_shutdown_hook():
    logger.info("  FastPubSub Hook: @app.after_shutdown (Broker is stopped)")


@broker.subscriber(
    "test-handler",
    topic_name="test-topic",
    subscription_name="test-subscription",
)
async def handle_message(message: Message) -> None:
    logger.info(f"Received: {message.data.decode()}")

This separates global resources (HTTP client) from broker-specific logic.


Step-by-Step

  1. Define a custom lifespan context manager.
  2. Allocate global resources before yield.
  3. Let FastPubSub run its built-in lifecycle inside yield.
  4. Clean up resources after yield.

Recap

  • Built on FastAPI's Lifespan: FastPubSub uses a built-in function that follows the standard lifespan context manager pattern
  • Four hooks for precise control:
    • on_startup: Before broker starts
    • after_startup: After broker starts
    • on_shutdown: Before broker stops
    • after_shutdown: After broker stops
  • Custom lifespan support: Pass your own lifespan function for advanced resource management