Subscribers
A subscriber is a message handler attached to a Pub/Sub subscription. A subscription represents a stream of messages from a specific topic. FastPubSub manages your subscribers using its asyncio-native broker.
Defining a Subscriber
Use the @broker.subscriber decorator to register an async function as a message handler:
broker = PubSubBroker(project_id="fastpubsub-pubsub-local")
app = FastPubSub(broker)

@broker.subscriber(
    alias="my_handler",
    topic_name="in_topic",
    subscription_name="sub_name",
)
async def handle_message(message: Message):
    logger.info(f"The message {message.id} is processed.")
    await broker.publish(topic_name="out_topic", data="Hi!")
The decorator automatically manages connection, message fetching, and acknowledgment.
You only need to provide alias, topic_name, and subscription_name while the rest of the configuration has sensible defaults.
Step-by-Step
- Create a PubSubBroker.
- Add a @broker.subscriber(...) handler.
- Set topic_name and subscription_name using your naming convention.
- Run the app with fastpubsub run.
- Publish a test message and confirm logs.
Configuration Options
The subscriber decorator accepts a number of optional parameters that give you full control over how messages are consumed, should the defaults not fit your needs.
@broker.subscriber(
    alias: str,
    *,
    topic_name: str,
    subscription_name: str,
    project_id: str = "",
    autocreate: bool = True,
    autoupdate: bool = False,
    filter_expression: str = "",
    dead_letter_topic: str = "",
    max_delivery_attempts: int = 5,
    ack_deadline_seconds: int = 60,
    enable_message_ordering: bool = False,
    enable_exactly_once_delivery: bool = False,
    min_backoff_delay_secs: int = 10,
    max_backoff_delay_secs: int = 600,
    max_messages: int = 1000,
    middlewares: Sequence[Middleware] = (),
)
Most of these parameters map directly to Google's Pub/Sub Python SDK. The exceptions are the parameters for lifecycle control and consumer behavior, which are described below.
Lifecycle Control
These settings manage Pub/Sub resource creation on startup.
| Parameter | Description |
|---|---|
| autocreate | If True, creates the topic and subscription on startup if they don't exist |
| autoupdate | If True, updates the existing subscription configuration to match the decorator parameters |
autoupdate Limitations
Remember: autoupdate won't create a missing topic.
Subscription Behavior (Server-Side)
These configure the subscription on Google Cloud.
| Parameter | Description | Updatable |
|---|---|---|
| filter_expression | Server-side filter for matching messages | Yes |
| dead_letter_topic | Topic for failed messages | Yes |
| max_delivery_attempts | Attempts before sending to dead-letter topic | Yes |
| ack_deadline_seconds | Time to acknowledge before redelivery | Yes |
| enable_message_ordering | Process messages with same key in order | No |
| enable_exactly_once_delivery | Guarantee no duplicate processing | No |
| min_backoff_delay_secs | Initial backoff delay for retries | No |
| max_backoff_delay_secs | Maximum backoff delay for retries | No |
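To get a feel for how min_backoff_delay_secs and max_backoff_delay_secs bound retry delays, here is a minimal sketch of a clamped exponential backoff. The helper backoff_delays and the doubling factor are assumptions made for illustration only; the actual delay between redeliveries is chosen by the Pub/Sub service, not by this formula.

```python
def backoff_delays(
    attempts: int,
    min_backoff_delay_secs: float = 10,
    max_backoff_delay_secs: float = 600,
    factor: float = 2.0,  # assumption: doubling per retry, for illustration
) -> list[float]:
    """Return an illustrative delay (in seconds) for each retry attempt."""
    delays = []
    delay = float(min_backoff_delay_secs)
    for _ in range(attempts):
        # Never wait longer than the configured maximum.
        delays.append(min(delay, float(max_backoff_delay_secs)))
        delay *= factor
    return delays


if __name__ == "__main__":
    print(backoff_delays(8))
```

With the default values from the signature above, the delays start at the minimum and stop growing once they reach the maximum.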
Consumer Behavior (Client-Side)
These control how FastPubSub processes messages.
| Parameter | Description |
|---|---|
| max_messages | Maximum concurrent messages being processed |
| middlewares | List of middleware classes to wrap the handler |
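As a rough illustration of what a cap on concurrent messages means, the sketch below limits in-flight handlers with an asyncio.Semaphore. This is not FastPubSub's implementation, and the helper run_with_limit is hypothetical; it only shows the idea behind a limit such as max_messages.

```python
import asyncio


async def run_with_limit(payloads: list[str], max_messages: int) -> int:
    """Handle every payload with at most `max_messages` handlers running at once.

    Returns the highest number of handlers that were in flight together.
    """
    semaphore = asyncio.Semaphore(max_messages)
    in_flight = 0
    peak = 0

    async def handle(payload: str) -> None:
        nonlocal in_flight, peak
        async with semaphore:  # wait here if the limit is already reached
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # simulated I/O work for this payload
            in_flight -= 1

    await asyncio.gather(*(handle(p) for p in payloads))
    return peak


if __name__ == "__main__":
    payloads = [f"msg-{i}" for i in range(10)]
    print(asyncio.run(run_with_limit(payloads, max_messages=3)))
```

A lower limit protects downstream resources such as database connections, while a higher one favors throughput for I/O-bound handlers.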
The Importance of Async
FastPubSub is an asyncio-native framework, which means your message handlers must be async def functions. This design is critical for performance. An asyncio application runs on a single thread managed by an event loop. This single thread juggles multiple tasks, such as handling incoming Pub/Sub messages and serving HTTP requests (if using the FastAPI integration).
A task can only be paused when it encounters an await keyword, allowing the event loop to switch to another task. If you use a blocking operation like time.sleep() instead of await asyncio.sleep(), you freeze the entire thread. No other tasks can run, causing your application to become unresponsive.
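The effect is easy to reproduce with the standard library alone. The sketch below runs a handler next to a heartbeat task that stands in for other work on the loop (for example, an HTTP health check) and counts how often the heartbeat gets to run. All function names are hypothetical, and the 0.2-second sleeps are shortened stand-ins for real work.

```python
import asyncio
import contextlib
import time


async def heartbeat(ticks: list[float], interval: float = 0.01) -> None:
    """Stand-in for other tasks sharing the event loop."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def blocking_handler() -> None:
    time.sleep(0.2)  # blocks the thread; the loop cannot switch tasks


async def non_blocking_handler() -> None:
    await asyncio.sleep(0.2)  # yields to the loop while waiting


async def count_ticks_during(handler) -> int:
    """Run `handler` alongside the heartbeat and count heartbeat ticks."""
    ticks: list[float] = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # give the heartbeat a chance to start
    start = len(ticks)
    await handler()
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return len(ticks) - start


async def main() -> tuple[int, int]:
    blocked = await count_ticks_during(blocking_handler)
    free = await count_ticks_during(non_blocking_handler)
    return blocked, free


if __name__ == "__main__":
    blocked, free = asyncio.run(main())
    print(f"ticks while blocking: {blocked}, ticks while awaiting: {free}")
```

While the blocking handler runs, the heartbeat never gets a turn; with the awaiting handler it keeps ticking throughout.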
Blocking (Don't Do This)
As mentioned, blocking calls freeze the event loop. Any other tasks, like incoming HTTP requests, are stalled until the blocking call finishes. The diagram below shows an example of such an occurrence:
sequenceDiagram
    participant Client as HTTP Client
    participant App as FastPubSub App
    par Message arrives
        App->>App: process_message(message)
        App->>App: time.sleep(5) # BLOCKS!
        Note over App: Event loop frozen!
    and API request arrives
        Client->>App: GET /alive
        Note right of App: Request stuck
        App--xClient: Request Times Out
    end
What is happening here:
- A message is pulled from Pub/Sub, and the event loop gives control to the Pub/Sub Handler (your function).
- The handler executes time.sleep(5), a blocking call that freezes the entire thread.
- While the handler is blocked, a Client sends a GET /alive request to the API.
- The application is completely unresponsive and cannot even begin to process the API request because the event loop is blocked.
- After a long wait, the client's request times out, and the application's performance is severely degraded.
Non-Blocking (Correct)
On the other hand, non-blocking calls with an explicit await yield control back to the event loop, allowing it to work on other tasks while waiting for the operation to complete. The diagram below shows how this plays out:
sequenceDiagram
    participant Client as HTTP Client
    participant App as FastPubSub App
    par Message arrives
        App->>App: process_message(message)
        App->>App: await asyncio.sleep(5)
        Note right of App: Event loop free!
    and API request arrives
        Client->>App: GET /alive
        App->>Client: 200 OK (Instant)
        Note over Client, App: After 5s, handler resumes
    end
What is happening here:
- A message is pulled, and the event loop gives control to the Pub/Sub Handler.
- The handler executes await asyncio.sleep(5). The await keyword pauses the handler and yields control back to the event loop.
- While the handler is paused, a Client sends a GET /alive request to the API.
- The event loop is free and immediately processes the request, calls the API endpoint, and returns a 200 OK response to the client. The API remains fast and responsive.
- After the 5-second non-blocking sleep is over, the event loop resumes the handler where it left off.
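Sometimes a handler has to call code that only exists in blocking form. One common option, sketched here with the standard library's asyncio.to_thread (Python 3.9+), is to run that call in a worker thread so the event loop stays free. The function legacy_blocking_call is a hypothetical stand-in for such a synchronous dependency, and this is a general asyncio pattern rather than anything specific to FastPubSub.

```python
import asyncio
import time


def legacy_blocking_call(item: int) -> str:
    """Hypothetical synchronous function, e.g. a client without async support."""
    time.sleep(0.2)
    return f"processed {item}"


async def handler(item: int) -> str:
    # Run the blocking call in a worker thread; the await yields to the loop.
    return await asyncio.to_thread(legacy_blocking_call, item)


async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    # Three handlers overlap instead of running one after another.
    results = await asyncio.gather(*(handler(i) for i in range(3)))
    elapsed = time.perf_counter() - start
    return list(results), elapsed


if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(results, f"{elapsed:.2f}s")
```

Because each blocking call runs in its own thread, the three handlers finish in roughly the time of one call rather than the sum of all three.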
Handling Push Subscriptions
While FastPubSub is designed for high-performance pull-based consumption, you can easily handle messages from an existing push subscription using its FastAPI integration.
A push subscription sends messages via an HTTP POST request to a webhook endpoint. You can create a FastAPI endpoint to receive and process these messages. FastPubSub even provides a Pydantic model, PushMessage, to automatically parse the incoming request body.
from fastpubsub import FastPubSub, PubSubBroker, PushMessage
from fastpubsub.logger import logger

broker = PubSubBroker(project_id="fastpubsub-pubsub-local")
app = FastPubSub(broker)

@app.post("/push-handler/")
async def handle_push_message(data: PushMessage):
    logger.info(f"Received push message: {data.message}")
    # Returning 2xx acknowledges the message
    return {"status": "ok"}
Push Subscription Checklist
- Create the push subscription in GCP with your HTTPS endpoint.
- Ensure the endpoint is publicly reachable or exposed via a gateway.
- Verify authentication and signature requirements for the push endpoint.
- Decode the base64 payload and validate it before processing.
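For the last item, here is a minimal sketch of decoding and validating a push request body by hand, using only the standard library. It assumes the wrapped JSON envelope that Pub/Sub push subscriptions send (a message object carrying base64-encoded data, plus a subscription field). The helper decode_push_payload is hypothetical; how much of this the PushMessage model already handles for you is not covered here.

```python
import base64
import binascii
import json


def decode_push_payload(body: bytes) -> dict:
    """Parse a Pub/Sub push envelope and return its decoded parts.

    Raises ValueError if the envelope is malformed.
    """
    try:
        envelope = json.loads(body)
    except json.JSONDecodeError as exc:
        raise ValueError("request body is not valid JSON") from exc

    message = envelope.get("message") if isinstance(envelope, dict) else None
    if not isinstance(message, dict):
        raise ValueError("envelope has no 'message' object")

    raw = message.get("data", "")
    if not isinstance(raw, str):
        raise ValueError("'data' must be a base64 string")
    try:
        data = base64.b64decode(raw, validate=True)
    except binascii.Error as exc:
        raise ValueError("'data' is not valid base64") from exc

    return {
        "message_id": message.get("messageId"),
        "attributes": message.get("attributes", {}),
        "data": data,
    }


if __name__ == "__main__":
    body = json.dumps({
        "message": {
            "data": base64.b64encode(b"Hi!").decode("ascii"),
            "messageId": "1",
            "attributes": {"source": "demo"},
        },
        "subscription": "projects/my-project/subscriptions/my-sub",
    }).encode("utf-8")
    print(decode_push_payload(body))
```

Rejecting malformed envelopes before any business logic runs keeps invalid input from reaching your handler code.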
Recap
In this section, you've learned:
- How to define a Pub/Sub subscriber declaratively using the @broker.subscriber decorator.
- How to configure a subscription's server-side behavior and client-side handling.
- Why using non-blocking, async code is essential for building responsive applications with FastPubSub.
- How to receive messages from a push subscription using the FastAPI integration.