Routers¶
As your application grows, managing all subscribers and publishers in a single file becomes cumbersome. Routers, inspired by FastAPI's APIRouter, help you organize and scale your application by dividing it into smaller, modular components.
Why Use Routers?¶
- Logical Separation: Organize your subscribers and publishers by domain or feature (e.g., `users`, `orders`, `notifications`). This makes your codebase easier to navigate and understand.
- Code Modularity: Split your application into multiple Python files and directories. Each module can define its own self-contained router, which is then included in the main application.
- Avoid Naming Conflicts: Routers use a `prefix` to namespace the aliases of their subscribers. This prevents two subscribers from different domains from having the same alias (e.g., `users.create` and `orders.create`).
- Scoped Configurations: Apply configurations, such as a specific set of middlewares, to an entire group of subscribers at once.
Core Concepts¶
Prefix and Aliasing¶
The key to understanding routers is the aliasing mechanism. When you create a router, you give it a string prefix that will be prepended to the alias of every subscriber defined within it. The final alias is then used by the CLI as prefix.alias to define which subscriber should be started.
Example:
- Router: `users_router = PubSubRouter(prefix="users")`
- Subscriber: `@users_router.subscriber(alias="new-user", subscription_name="new-accounts", ...)`
- Final alias: `users.new-user`
- Subscription name: `new-accounts`
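The naming rule in the example above can be sketched in a few lines of plain Python. This is only an illustration of the scheme as described here, not fastpubsub's implementation; the helper name `qualified_alias` is hypothetical.

```python
def qualified_alias(prefix: str, alias: str) -> str:
    """Join a router prefix and a subscriber alias with a dot (hypothetical helper)."""
    return f"{prefix}.{alias}"


# Only the alias is namespaced; the subscription name is not part of it.
print(qualified_alias("users", "new-user"))  # users.new-user

# The same alias under two prefixes yields two distinct final aliases.
print(qualified_alias("users", "create") != qualified_alias("orders", "create"))  # True
```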
How to Use Routers¶
Using routers is a three-step process:
- Create a Router: Instantiate `PubSubRouter` in a separate module, providing a unique `prefix`.
- Define Subscribers: Use the router's `@router.subscriber()` decorator, just as you would with a broker.
- Include the Router: In your main application file, import the router instance and include it in your central broker using `broker.include_router()` or via the constructor.
Step-by-Step¶
- Create a router with a clear prefix.
- Define subscribers on the router.
- Include the router in the broker.
- Run specific subscribers with `-s prefix.alias`.
Example: Multi-File Application¶
Project structure:

    my_app/
    ├── users_domain/
    │   └── router.py
    ├── posts_domain/
    │   └── router.py
    └── main.py
File: `my_app/users_domain/router.py`

    from fastpubsub import Message, PubSubRouter
    from fastpubsub.logger import logger

    # Every subscriber on this router gets the "users" prefix.
    users_router = PubSubRouter(prefix="users")


    # Final alias: users.created
    @users_router.subscriber(
        alias="created",
        topic_name="users-topic",
        subscription_name="users-subscription",
    )
    async def handle_user_message(message: Message):
        logger.info(f"Processing message {message.id} in users domain.")
File: `my_app/posts_domain/router.py`

    from fastpubsub import Message, PubSubRouter
    from fastpubsub.logger import logger

    # Every subscriber on this router gets the "posts" prefix.
    posts_router = PubSubRouter(prefix="posts")


    # Final alias: posts.published
    @posts_router.subscriber(
        alias="published",
        topic_name="posts-topic",
        subscription_name="posts-subscription",
    )
    async def handle_post_message(message: Message):
        logger.info(f"Processing message {message.id} in posts domain.")
File: `my_app/main.py`

    from fastpubsub import FastPubSub, PubSubBroker

    from my_app.posts_domain.router import posts_router
    from my_app.users_domain.router import users_router

    broker = PubSubBroker(project_id="fastpubsub-pubsub-local")

    # Include the routers as part of the broker
    broker.include_router(users_router)
    broker.include_router(posts_router)

    app = FastPubSub(broker)


    @app.after_startup
    async def publish_test_messages():
        await users_router.publish("users-topic", data={"username": "Yugi"})
        await posts_router.publish("posts-topic", data={"title": "My New Post"})
Running Specific Subscribers¶
Use fully-qualified aliases (`prefix.alias`) with the `-s` flag:

    # Set up your environment
    export PUBSUB_EMULATOR_HOST=localhost:8085

    # Run only the users subscriber
    fastpubsub run my_app.main:app -s users.created

    # Run subscribers from both routers
    fastpubsub run my_app.main:app -s users.created -s posts.published
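To make the selection step concrete, here is a minimal sketch of how repeated `-s` flags could be matched against a table of fully-qualified aliases. It uses only the standard library and is not fastpubsub's CLI code; `REGISTRY` and `select_subscribers` are hypothetical names, and the behavior of selecting every subscriber when no `-s` flag is given is an assumption.

```python
import argparse

# Hypothetical registry: fully-qualified alias -> handler name.
REGISTRY = {
    "users.created": "handle_user_message",
    "posts.published": "handle_post_message",
}


def select_subscribers(argv: list[str]) -> dict[str, str]:
    """Return the registry entries named by repeated -s flags.

    Assumption: with no -s flag, every registered subscriber is selected.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-s", dest="selected", action="append", default=[])
    args = parser.parse_args(argv)
    if not args.selected:
        return dict(REGISTRY)
    unknown = [alias for alias in args.selected if alias not in REGISTRY]
    if unknown:
        raise ValueError(f"Unknown subscriber aliases: {unknown}")
    return {alias: REGISTRY[alias] for alias in args.selected}


print(select_subscribers(["-s", "users.created"]))
# {'users.created': 'handle_user_message'}
```

Matching on the full `prefix.alias` string is what lets two routers reuse the same short alias without ambiguity.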
Nested Routers¶
For large applications, nest routers to create a hierarchical structure. When you nest a router, prefixes are stacked (concatenated with a dot).
Example: Financial Application¶

    from fastpubsub import FastPubSub, Message, PubSubBroker, PubSubRouter
    from fastpubsub.logger import logger

    # Level 2: Sub-domain routers
    banking_router = PubSubRouter(prefix="banking")
    finance_router = PubSubRouter(prefix="finance")

    # Level 1: Main domain router
    core_router = PubSubRouter(prefix="core")
    core_router.include_router(banking_router)
    core_router.include_router(finance_router)


    @core_router.subscriber(
        alias="core_handler",
        topic_name="core-topic",
        subscription_name="core-subscription",
    )
    async def handle_message_core(message: Message):
        logger.info(f"CORE handler received message {message.id}")


    @banking_router.subscriber(
        alias="banking_handler",
        topic_name="banking-topic",
        subscription_name="banking-subscription",
    )
    async def handle_message_banking(message: Message):
        logger.info(f"BANKING handler received message {message.id}")


    @finance_router.subscriber(
        alias="finance_handler",
        topic_name="finance-topic",
        subscription_name="finance-subscription",
    )
    async def handle_message_finance(message: Message):
        logger.info(f"FINANCE handler received message {message.id}")


    # Application setup
    broker = PubSubBroker(project_id="fastpubsub-pubsub-local")
    broker.include_router(core_router)

    app = FastPubSub(broker)


    @app.after_startup
    async def publish_test_messages():
        await broker.publish("core-topic", {"event": "core"})
        await broker.publish("banking-topic", {"event": "banking"})
        await broker.publish("finance-topic", {"event": "finance"})
Resulting Aliases¶
| Handler | Stacked Prefixes | Subscriber Alias | Final Alias |
|---|---|---|---|
| `handle_message_core` | `core` | `core_handler` | `core.core_handler` |
| `handle_message_banking` | `core` + `banking` | `banking_handler` | `core.banking.banking_handler` |
| `handle_message_finance` | `core` + `finance` | `finance_handler` | `core.finance.finance_handler` |
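The stacking rule behind this table can be reproduced with a small stand-in class. This is a sketch of the dot-joined prefix rule only, not fastpubsub's `PubSubRouter`; `ToyRouter`, `add_subscriber`, and `final_aliases` are hypothetical names introduced for illustration.

```python
class ToyRouter:
    """Toy stand-in for a router; it only tracks prefixes and aliases."""

    def __init__(self, prefix: str) -> None:
        self.prefix = prefix
        self.aliases: list[str] = []
        self.children: list["ToyRouter"] = []

    def add_subscriber(self, alias: str) -> None:
        self.aliases.append(alias)

    def include_router(self, router: "ToyRouter") -> None:
        self.children.append(router)

    def final_aliases(self, parent_prefixes: tuple[str, ...] = ()) -> list[str]:
        """Collect dot-joined aliases for this router and all nested routers."""
        prefixes = parent_prefixes + (self.prefix,)
        result = [".".join(prefixes + (alias,)) for alias in self.aliases]
        for child in self.children:
            result.extend(child.final_aliases(prefixes))
        return result


core = ToyRouter("core")
banking = ToyRouter("banking")
finance = ToyRouter("finance")
core.include_router(banking)
core.include_router(finance)
core.add_subscriber("core_handler")
banking.add_subscriber("banking_handler")
finance.add_subscriber("finance_handler")

for alias in core.final_aliases():
    print(alias)
# core.core_handler
# core.banking.banking_handler
# core.finance.finance_handler
```

Because the toy computes aliases lazily by walking the tree, it does not matter that the child routers are included before their subscribers are added.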
CLI Commands¶

    # Run only the finance handler
    fastpubsub run main:app -s core.finance.finance_handler

    # Run both core and banking handlers
    fastpubsub run main:app -s core.core_handler -s core.banking.banking_handler
Interface: PubSubBroker vs PubSubRouter¶
Routers expose a broker-like interface: most things you can do with a broker can also be done with a router.
Publishing from Routers¶
You can publish messages directly from a router:

    # `app`, `users_router`, and `posts_router` come from the multi-file example above.
    @app.after_startup
    async def publish_test_messages():
        await users_router.publish("users-topic", data={"username": "Yugi"})
        await posts_router.publish("posts-topic", data={"title": "My New Post"})
Cross-Project Routers¶
Configure a router to use a different GCP project:

    from fastpubsub import Message, PubSubRouter
    from fastpubsub.logger import logger

    # All subscribers in this router use project-b
    external_router = PubSubRouter(
        prefix="external",
        project_id="project-b",
    )


    @external_router.subscriber(
        alias="handler",
        topic_name="events",
        subscription_name="events-subscription",
    )
    async def handle_external_event(message: Message):
        logger.info(f"External event: {message.data.decode()}")


    # `broker` is the application's PubSubBroker, defined elsewhere.
    broker.include_router(external_router)
Router Middlewares¶
Middlewares attached to a router apply to all subscribers in that router.
See the Middlewares guide for more details on middleware hierarchy.
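As a library-independent illustration of what router-scoped middlewares mean, the sketch below wraps every handler registered on a toy router with the same middleware list. It does not use or describe fastpubsub's middleware API; `ToyRouter`, `tag`, and the ordering rule (first middleware in the list is outermost) are assumptions made for this example.

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[str], Awaitable[str]]
Middleware = Callable[[Handler], Handler]


def tag(label: str) -> Middleware:
    """Build a toy middleware that wraps the handler's result in its label."""

    def middleware(handler: Handler) -> Handler:
        async def wrapped(message: str) -> str:
            return f"{label}({await handler(message)})"

        return wrapped

    return middleware


class ToyRouter:
    """Toy router that applies its middlewares to every registered handler."""

    def __init__(self, middlewares: list[Middleware]) -> None:
        self.middlewares = middlewares
        self.handlers: dict[str, Handler] = {}

    def subscriber(self, alias: str) -> Callable[[Handler], Handler]:
        def register(handler: Handler) -> Handler:
            wrapped = handler
            # Wrap in reverse so the first middleware in the list is outermost.
            for middleware in reversed(self.middlewares):
                wrapped = middleware(wrapped)
            self.handlers[alias] = wrapped
            return handler

        return register


router = ToyRouter(middlewares=[tag("auth"), tag("log")])


@router.subscriber("created")
async def handle(message: str) -> str:
    return message.upper()


print(asyncio.run(router.handlers["created"]("hi")))  # auth(log(HI))
```

The point of the sketch is the scope: the middleware list is declared once on the router, and each subscriber picks it up at registration time without repeating it.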
Recap¶
- Purpose: Routers organize large applications into domain-specific modules.
- Prefix is key: Each router has a prefix that namespaces subscriber aliases.
- Modular structure: Define routers in separate files, then use `broker.include_router()` to plug them in.
- CLI integration: Select subscribers by fully-qualified alias: `prefix.alias`.
- Nested routers: Stack prefixes for deep hierarchies (e.g., `core.banking.handler`).
- Broker-like interface: Routers can define subscribers and publish messages.