Command-Line Interface (CLI)¶
The fastpubsub CLI is a production-ready tool for running and managing your applications. Built with Typer and powered by Uvicorn, it provides a seamless experience for both local development and production deployments.
How It Works¶
When you execute the run command, the CLI:
- Checks authentication: Verifies Google Cloud credentials are set up (
GOOGLE_APPLICATION_CREDENTIALSorPUBSUB_EMULATOR_HOST) - Loads configuration: Parses command-line arguments and sets environment variables
- Imports the application: Loads your FastPubSub app using the specified path (e.g.,
my_app.main:app) - Starts Uvicorn: Hands over to Uvicorn, which runs your app. Subscribers start as background tasks within the event loop
Prerequisites¶
Before running the CLI, authenticate with Google Cloud or start the emulator:
# For cloud environments
gcloud auth application-default login
# For local development with emulator
export PUBSUB_EMULATOR_HOST=localhost:8085
Basic Usage¶
The primary command is run, which takes one required argument: the path to your FastPubSub application in the format path.to.module:variable_name.
Example application (my_project/main.py):
from fastpubsub import FastPubSub, Message, PubSubBroker
broker = PubSubBroker(project_id="your-project-id")
app = FastPubSub(broker)
@broker.subscriber(
alias="process-orders",
topic_name="orders",
subscription_name="orders-sub",
)
async def handle_orders(message: Message):
"""Process order messages."""
pass
@broker.subscriber(
alias="send-notifications",
topic_name="notifications",
subscription_name="notifications-sub",
)
async def handle_notifications(message: Message):
"""Process notification messages."""
pass
Run with default settings:
Step-by-Step¶
- Set credentials or emulator host.
- Point to your app:
module.path:app. - Run
fastpubsub run. - Check logs for subscriber startup.
Development Mode¶
Hot-Reloading¶
For local development, use --reload to automatically restart when you save a file:
Note
When using --reload, the --workers option is ignored. The application runs in a single process.
Running Specific Subscribers¶
In larger applications, you might want to run only a subset of subscribers. Use the -s or --subscribers flag:
# Run only one subscriber
fastpubsub run my_project.main:app -s process-orders
# Run multiple specific subscribers
fastpubsub run my_project.main:app -s process-orders -s send-notifications
This is useful for:
- Running different subscribers on different machines
- Testing specific handlers in isolation
- Scaling individual subscribers independently
Wildcard Patterns¶
You can use glob patterns to select subscribers by alias prefix, suffix, or hierarchy:
# All subscribers under the "orders" prefix
fastpubsub run my_project.main:app -s 'orders.*'
# All "process" subscribers across any prefix
fastpubsub run my_project.main:app -s '*.process'
# All subscribers at any depth under "orders"
fastpubsub run my_project.main:app -s 'orders.**'
# "process" subscribers at any depth
fastpubsub run my_project.main:app -s '**.process'
# Compose patterns: ** and * together
fastpubsub run my_project.main:app -s '**.orders.*.process'
# Mix exact names and patterns
fastpubsub run my_project.main:app -s 'orders.*' -s payments.refund
Given the following application:
@orders_router.subscriber(
alias="process",
topic_name="new-orders",
subscription_name="process-sub",
)
async def process_order(message: Message):
"""Process incoming orders."""
pass
@orders_router.subscriber(
alias="validate",
topic_name="new-orders",
subscription_name="validate-sub",
)
async def validate_order(message: Message):
"""Validate incoming orders."""
pass
@orders_router.subscriber(
alias="notify",
topic_name="order-events",
subscription_name="notify-sub",
)
async def notify_order(message: Message):
"""Send order notifications."""
pass
@payments_router.subscriber(
alias="process",
topic_name="payment-events",
subscription_name="process-sub",
)
async def process_payment(message: Message):
"""Process payments."""
pass
broker.include_router(orders_router)
broker.include_router(payments_router)
The registered aliases are: orders.process, orders.validate, orders.notify, payments.process.
| Pattern | Matches |
|---|---|
orders.* |
orders.process, orders.validate, orders.notify |
*.process |
orders.process, payments.process |
orders.** |
orders.process, orders.validate, orders.notify |
**.process |
orders.process, payments.process |
order?.* |
orders.process, orders.validate, orders.notify |
Supported wildcards:
| Wildcard | Meaning |
|---|---|
* |
Matches any characters within a single segment (between dots) |
? |
Matches exactly one character |
** |
Matches zero or more dot-separated segments |
Note
If a pattern does not match any subscriber, a warning is logged and the application continues. The application only fails to start if no subscribers at all are selected.
Production Mode¶
Multiple Workers¶
For production, run multiple worker processes to utilize multiple CPU cores:
Each worker is a separate Python process with its own event loop, allowing true parallel execution.
Worker Count
A common recommendation is (2 * CPU_CORES) + 1. For a 4-core machine: (2 * 4) + 1 = 9 workers.
Host and Port¶
Configure the network binding:
Logging Options¶
Control log verbosity and format:
# Debug-level logging
fastpubsub run my_project.main:app --log-level debug
# Structured JSON logging for production
fastpubsub run my_project.main:app --log-serialize
# Combine options
fastpubsub run my_project.main:app --log-level info --log-serialize
Available log levels: debug, info, warning, error, critical
Future Development¶
The framework is actively developed with planned features:
- Full Support for Uvicorn: At the moment we only support a few set of basic configurations for uvicorn. We intend to support them all in the future.
- Publish Messages: Publish messages locally to the configured emulator or directly sending to your handler.
- Inspecting Components: See Inspecting Your Application for listing subscribers, topics, and more.
Recap¶
- Core command:
fastpubsub run module:appis the main entry point - Development: Use
--reloadfor efficient local development - Production: Use
--workers Nto scale across CPU cores - Granular control: Use
--subscribersto run specific handlers - Logging: Configure with
--log-leveland--log-serialize - Flexibility: Options can be set via CLI flags or environment variables