PubSubTestClient class¶
fastpubsub.testing.PubSubTestClient
¶
A test wrapper for PubSubBroker that enables in-memory message routing.
This allows testing subscriber handlers without needing a real PubSub emulator,making tests fast and isolated. Supports filter expression evaluation to match Google Pub/Sub behavior.
Example
broker = PubSubBroker(project_id="test")
@broker.subscriber(
alias="orders",
topic_name="events",
subscription_name="orders-sub",
filter_expression='attributes.type = "order"',
)
async def handler(msg: Message) -> None:
logger.info(f"Processed: {msg.data}")
async with PubSubTestClient(broker) as test_client:
# This message will be routed to handler
await test_client.publish(
"Hello",
topic="events",
attributes={"type": "order"},
)
# This message will NOT be routed (wrong type)
await test_client.publish(
"Ignored",
topic="events",
attributes={"type": "user"},
)
Initialize test broker wrapper.
| PARAMETER | DESCRIPTION |
|---|---|
broker
|
The real PubSubBroker to wrap.
TYPE:
|
**kwargs
|
Additional configuration (for future extensibility).
TYPE:
|
Source code in fastpubsub/testing.py
publish
async
¶
Publish a message for testing.
| PARAMETER | DESCRIPTION |
|---|---|
data
|
Message data (will be encoded).
TYPE:
|
topic
|
Topic name.
TYPE:
|
ordering_key
|
Ordering key.
TYPE:
|
attributes
|
Message attributes. |
project_id
|
Target project ID. Defaults to broker's project_id.
TYPE:
|
Source code in fastpubsub/testing.py
get_published_messages
¶
Get all published messages for inspection.
| RETURNS | DESCRIPTION |
|---|---|
list[PublishedMessage]
|
A copy of the list of published messages. |
Example
from fastpubsub import PubSubBroker, Message
from fastpubsub.testing import PubSubTestClient
broker = PubSubBroker(project_id="my-project")
@broker.subscriber(
alias="orders",
topic_name="order-events",
subscription_name="orders-sub",
)
async def process_order(msg: Message) -> None: ...
async with PubSubTestClient(broker) as client:
await client.publish(
{"id": 1},
topic="order-events",
attributes={"region": "us"},
)
await client.publish(
{"id": 2}, topic="order-events", project_id="other-project"
)
messages = client.get_published_messages()
assert len(messages) == 2
assert messages[0].topic_name == "order-events"
assert messages[0].data == b'{"id": 1}'
assert messages[0].attributes == {"region": "us"}
assert messages[0].project_id == "my-project"
assert messages[1].topic_name == "order-events"
assert messages[1].data == b'{"id": 2}'
assert messages[1].attributes == {}
assert messages[1].project_id == "other-project"
Source code in fastpubsub/testing.py
get_results
¶
Get all processing results recorded during this session.
Each result holds the message delivered to the subscriber,
the handler's return_value, and any error that was raised.
Metadata like subscriber name, topic, and project are available
through result.message.
| RETURNS | DESCRIPTION |
|---|---|
list[ProcessingResult]
|
A copy of the list of processing results. |
Example
from fastpubsub import PubSubBroker, Message
from fastpubsub.testing import PubSubTestClient
broker = PubSubBroker(project_id="my-project")
@broker.subscriber(
alias="payments",
topic_name="payment-events",
subscription_name="payments-sub",
)
async def process_payment(msg: Message) -> str:
return "accepted"
@broker.subscriber(
alias="analytics",
topic_name="payment-events",
subscription_name="analytics-sub",
project_id="analytics-project",
)
async def track_payment(msg: Message) -> None:
raise ValueError("tracking failed")
async with PubSubTestClient(broker) as client:
# Publish to the default project: Only process_payment runs
await client.publish({"amount": 100}, topic="payment-events")
# Publish to the analytics project: Only track_payment runs
await client.publish(
{"amount": 100},
topic="payment-events",
project_id="analytics-project",
)
results = client.get_results()
assert len(results) == 2
assert results[0].message.subscriber_name == "process_payment"
assert results[0].return_value == "accepted"
assert results[0].error is None
assert results[1].message.subscriber_name == "track_payment"
assert results[1].return_value is None
assert isinstance(errors[1].error, ValueError)