Publishers¶
A Publisher sends messages to a Google Cloud Pub/Sub topic in a given project. FastPubSub provides two patterns for publishing:
- A central broker method for flexibility.
- A dedicated publisher object for cleaner code.
Core Responsibilities¶
The publisher handles:
- Connection Management: Efficiently manages gRPC connections, opening and closing them appropriately to reduce resource consumption.
- Asynchronous Operations: All publishing calls are async, allowing the event loop to handle other tasks while waiting.
- Automatic Serialization: Converts Python data into byte strings that Pub/Sub requires.
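To illustrate why awaitable publishing matters, here is a minimal sketch that uses a stand-in coroutine instead of FastPubSub itself. The name `fake_publish` and its simulated latency are hypothetical. Because each call yields to the event loop while it waits, several publishes can overlap.

```python
import asyncio


async def fake_publish(topic: str, data: dict) -> str:
    """Hypothetical stand-in for an awaitable publish call (not FastPubSub's API)."""
    await asyncio.sleep(0.1)  # simulated network round trip
    return f"{topic}:{data['id']}"


async def publish_many() -> list[str]:
    # The three calls overlap while each one waits on its simulated I/O.
    return await asyncio.gather(
        *(fake_publish("test-topic", {"id": i}) for i in range(3))
    )


results = asyncio.run(publish_many())
```

`asyncio.gather` returns the results in the order the coroutines were passed in, regardless of which one finishes first.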
Serialization Strategy¶
The publisher automatically converts your data:
| Type | Serialization |
|---|---|
| Pydantic BaseModel | JSON bytes ({"key": "value"} → b'{"key":"value"}') |
| dict | JSON bytes ({"key": "value"} → b'{"key":"value"}') |
| str | UTF-8 bytes ("hello" → b'hello') |
| bytes | Sent as-is (for custom formats like Protobuf or Avro) |
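The mapping in the table can be sketched as a small function. This is an illustration only, not FastPubSub's actual serializer: `to_bytes` is a hypothetical name, and the Pydantic case is detected by the presence of `model_dump_json()` (a Pydantic v2 method) so the sketch runs without Pydantic installed.

```python
import json
from typing import Any


def to_bytes(data: Any) -> bytes:
    """Illustrative mapping only; FastPubSub's real serializer may differ."""
    if isinstance(data, bytes):
        return data  # sent as-is
    if isinstance(data, str):
        return data.encode("utf-8")
    if hasattr(data, "model_dump_json"):
        # Pydantic v2 models expose model_dump_json(), which returns a JSON string.
        return data.model_dump_json().encode("utf-8")
    if isinstance(data, dict):
        # Compact separators reproduce the whitespace-free form shown in the table.
        return json.dumps(data, separators=(",", ":")).encode("utf-8")
    raise TypeError(f"Unsupported message type: {type(data).__name__}")
```

Checking `bytes` first means pre-encoded payloads such as Protobuf or Avro are never re-encoded.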
Publishing with the Broker¶
This is the most direct and flexible way to publish a message. You call the broker.publish() method on your central broker object, specifying the destination topic for each call.
Example¶
from fastpubsub import FastPubSub, Message, PubSubBroker
from fastpubsub.logger import logger

broker = PubSubBroker(project_id="fastpubsub-pubsub-local")
app = FastPubSub(broker)


@broker.subscriber(
    "test-alias",
    topic_name="test-topic",
    subscription_name="test-publish",
)
async def handle(message: Message) -> None:
    logger.info(f"Processed message: {message}")


@app.after_startup
async def test_publish() -> None:
    await broker.publish("test-topic", {"hello": "world"})
When to Use¶
- Publishing to many different topics from the same function.
- Simple or infrequent publishing needs.
- Debugging or quick scripts.
- Topic name determined at runtime.
Trade-offs¶
- Flexible and simple, but can become repetitive if you frequently publish to the same topic.
- The topic is specified every time, which can lead to typos caught only at runtime.
- Dependency injection becomes harder.
Step-by-Step¶
- Create a broker and app.
- Decide on a topic and message schema.
- Publish using await broker.publish(...).
- Confirm delivery by checking subscriber logs.
Using Dedicated Publisher Objects¶
This approach uses a Publisher object that is pre-configured for a specific topic. It is the ideal pattern when a part of your application is dedicated to publishing messages to a single topic, as it leads to cleaner, more maintainable, and testable code.
publisher: Publisher = broker.publisher("test-topic")


@app.after_startup
async def test_publish() -> None:
    await publisher.publish({"hello": "world"})
Example with Dependency Injection¶
This pattern works well with clean architecture and dependency injection:
from dataclasses import dataclass
from typing import Any

from pydantic import BaseModel

from fastpubsub import FastPubSub, Message, Publisher, PubSubBroker
from fastpubsub.logger import logger


@dataclass
class MyAwesomeUseCase:
    publisher: Publisher

    async def execute(self, data: dict) -> Any:
        # Business logic here...
        # Then publish the event
        return await self.publisher.publish(data=data)


class User(BaseModel):
    name: str
    age: int


broker = PubSubBroker(project_id="fastpubsub-pubsub-local")
app = FastPubSub(broker)

# Create a dedicated publisher for user events
user_publisher = broker.publisher("new-users-topic")


@broker.subscriber(
    "user-events-handler",
    topic_name="new-users-topic",
    subscription_name="new-users-subscription",
)
async def handle_user_event(message: Message) -> None:
    logger.info(f"Received user event: {message.data.decode()}")


@app.post("/new-user")
async def receive_new_user(user: User) -> dict[str, str]:
    logger.info(f"Received a new user: {user.name}")
    # Inject the dedicated publisher into the use case
    # Easy to mock in tests
    use_case = MyAwesomeUseCase(publisher=user_publisher)
    await use_case.execute(user.model_dump())
    return {"message": "Use case executed successfully"}
When to Use¶
- A part of your application is dedicated to a single topic
- You want readable, reusable code (user_publisher.publish(...))
- Using dependency injection
- Unit testing (easily mock the publisher)
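As a rough sketch of the unit-testing point, the use case can be exercised with the standard library's `AsyncMock` in place of a real publisher. The use case is redefined here with a loosely typed `publisher` field so the snippet runs without FastPubSub installed; the test name and sample payload are hypothetical.

```python
import asyncio
from dataclasses import dataclass
from typing import Any
from unittest.mock import AsyncMock


@dataclass
class MyAwesomeUseCase:
    publisher: Any  # a fastpubsub Publisher in production, a mock in tests

    async def execute(self, data: dict) -> Any:
        return await self.publisher.publish(data=data)


def test_execute_publishes_event() -> None:
    fake_publisher = AsyncMock()  # stand-in; no broker or network involved
    use_case = MyAwesomeUseCase(publisher=fake_publisher)

    asyncio.run(use_case.execute({"name": "Ada", "age": 36}))

    # The use case should publish exactly once, passing the payload through.
    fake_publisher.publish.assert_awaited_once_with(data={"name": "Ada", "age": 36})


test_execute_publishes_event()
```

Because the use case only depends on an object with an awaitable `publish` method, the test needs no emulator and no patching of module-level state.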
Trade-offs¶
- It requires a minor, one-time setup for each dedicated topic.
- This might feel like boilerplate if you have dozens of topics being published from a single module.
Other Common Usages¶
Google Pub/Sub has features that let you control how data is delivered to the consumer. The next sections describe common configurations you will use when working with FastPubSub.
Publishing with Attributes¶
Sometimes you need to add metadata to give context to your message events without modifying your schema. This is useful for server-side filtering or routing. In FastPubSub, you can add information to messages using their attributes. These map directly to Pub/Sub message attributes rather than the payload.
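The separation between payload and attributes can be sketched in plain Python. This does not call FastPubSub, and the `OutgoingMessage` class and `matches` helper are hypothetical. It only illustrates that Pub/Sub attributes are string key-value pairs carried next to the payload, so filtering or routing can inspect them without decoding the body.

```python
from dataclasses import dataclass, field


@dataclass
class OutgoingMessage:
    """Hypothetical container: payload and attributes travel separately."""

    data: bytes
    attributes: dict[str, str] = field(default_factory=dict)


def matches(message: OutgoingMessage, key: str, expected: str) -> bool:
    # Mimics a subscription filter such as: attributes.region = "eu"
    return message.attributes.get(key) == expected


messages = [
    OutgoingMessage(b'{"user_id":"user-123"}', {"region": "eu", "event": "signup"}),
    OutgoingMessage(b'{"user_id":"user-456"}', {"region": "us", "event": "signup"}),
]

# Routing looks only at attributes; the payload bytes are never parsed.
eu_only = [m for m in messages if matches(m, "region", "eu")]
```

Keeping routing hints in attributes means the message schema stays unchanged when a new filter is introduced.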
Publishing with Ordering¶
For ordered message delivery, set enable_message_ordering on the receiving subscriber and provide an ordering key when publishing. FastPubSub's internal engine handles the Publisher configuration required to enable message ordering in Google's SDK. Messages with the same ordering key are then delivered in the order they were published.
@app.after_startup
async def publish_with_broker() -> None:
    await broker.publish(
        topic_name="user-events",
        data={"action": "login", "user_id": "user-123"},
        ordering_key="user-123",  # Same key ensures order
    )
    await broker.publish(
        topic_name="user-events",
        data={"action": "update_profile", "user_id": "user-123"},
        ordering_key="user-123",  # Same key ensures order
    )


ordered_publisher: Publisher = broker.publisher("user-events")


@app.after_startup
async def publish_with_publisher() -> None:
    # Publish with ordering key
    await ordered_publisher.publish(
        data={"action": "login", "user_id": "user-123"},
        ordering_key="user-123",  # Same key ensures order
    )
    await ordered_publisher.publish(
        data={"action": "update_profile", "user_id": "user-123"},
        ordering_key="user-123",  # Same key ensures order
    )
Cross-Project Publishing¶
In some scenarios, you may need to publish messages into projects that are not directly linked to the subscribers you created. FastPubSub allows you to publish to a topic in a different GCP project by overriding the project_id attribute.
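Pub/Sub identifies a topic by its fully qualified resource name, which has the form `projects/{project}/topics/{topic}`. The sketch below shows why the same topic name under a different project ID refers to a different topic. The `topic_path` helper and the `analytics-project` ID are hypothetical, and how FastPubSub resolves the name internally is not shown here.

```python
def topic_path(project_id: str, topic_name: str) -> str:
    """Build the fully qualified Pub/Sub topic name."""
    return f"projects/{project_id}/topics/{topic_name}"


default_project = "fastpubsub-pubsub-local"
other_project = "analytics-project"  # hypothetical second project

local = topic_path(default_project, "user-events")
remote = topic_path(other_project, "user-events")
```

The publishing identity also needs permission to publish in the target project, since access is granted per project or per topic.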
Recap¶
- Two publishing patterns: Direct broker.publish() for flexibility, dedicated Publisher objects for cleaner code.
- Automatic serialization: Pydantic models and dicts become JSON then bytes, strings become UTF-8, bytes sent as-is.
- Attributes: Add metadata for filtering and routing.
- Ordering: Enable ordering and use ordering keys for sequential delivery.
- Cross-project: Publish to topics in different GCP projects.
- Always async: All publishing must be awaited.