Skip to content

PubSubTestClient class

fastpubsub.testing.PubSubTestClient

PubSubTestClient(broker, **kwargs)

A test wrapper for PubSubBroker that enables in-memory message routing.

This allows testing subscriber handlers without needing a real PubSub emulator, making tests fast and isolated. Supports filter expression evaluation to match Google Pub/Sub behavior.

Example
broker = PubSubBroker(project_id="test")


@broker.subscriber(
    alias="orders",
    topic_name="events",
    subscription_name="orders-sub",
    filter_expression='attributes.type = "order"',
)
async def handler(msg: Message) -> None:
    logger.info(f"Processed: {msg.data}")


async with PubSubTestClient(broker) as test_client:
    # This message will be routed to handler
    await test_client.publish(
        "Hello",
        topic="events",
        attributes={"type": "order"},
    )

    # This message will NOT be routed (wrong type)
    await test_client.publish(
        "Ignored",
        topic="events",
        attributes={"type": "user"},
    )

Initialize test broker wrapper.

PARAMETER DESCRIPTION
broker

The real PubSubBroker to wrap.

TYPE: PubSubBroker

**kwargs

Additional configuration (for future extensibility).

TYPE: Any DEFAULT: {}

Source code in fastpubsub/testing.py
def __init__(self, broker: PubSubBroker, **kwargs: Any) -> None:
    """Wrap ``broker`` and start with empty recording state.

    Args:
        broker: The real PubSubBroker to wrap.
        **kwargs: Additional configuration (for future extensibility);
            this initializer does not read it.
    """
    # Nothing is patched or mocked at construction time.
    self._mock_client: MagicMock | None = None
    self._patchers: list[Any] = []

    # Records later returned by get_results() / get_published_messages().
    self._processing_results: list[ProcessingResult] = []
    self._published_messages: list[PublishedMessage] = []

    self.broker = broker

broker instance-attribute

broker = broker

publish async

publish(
    data,
    topic,
    ordering_key=None,
    attributes=None,
    project_id="",
)

Publish a message for testing.

PARAMETER DESCRIPTION
data

Message data (will be encoded).

TYPE: Any

topic

Topic name.

TYPE: str

ordering_key

Ordering key.

TYPE: str | None DEFAULT: None

attributes

Message attributes.

TYPE: dict[str, str] | None DEFAULT: None

project_id

Target project ID. Defaults to broker's project_id.

TYPE: str DEFAULT: ''

Source code in fastpubsub/testing.py
async def publish(
    self,
    data: Any,
    topic: str,
    ordering_key: str | None = None,
    attributes: dict[str, str] | None = None,
    project_id: str = "",
) -> None:
    """Serialize ``data`` and hand it to the in-memory fake publisher.

    Args:
        data: Message data; it is passed through
            ``Publisher._serialize_message`` before being published.
        topic: Topic name.
        ordering_key: Ordering key.
        attributes: Message attributes.
        project_id: Target project ID. An empty value falls back to the
            wrapped broker's ``project_id``.
    """
    # Resolve the project first so an explicit argument always wins.
    if project_id:
        target_project = project_id
    else:
        target_project = self.broker.project_id

    payload = await Publisher._serialize_message(data)
    await self._fake_publish(
        topic,
        payload,
        ordering_key,
        attributes,
        project_id=target_project,
    )

get_published_messages

get_published_messages()

Get all published messages for inspection.

RETURNS DESCRIPTION
list[PublishedMessage]

A copy of the list of published messages.

Example
from fastpubsub import PubSubBroker, Message
from fastpubsub.testing import PubSubTestClient

broker = PubSubBroker(project_id="my-project")


@broker.subscriber(
    alias="orders",
    topic_name="order-events",
    subscription_name="orders-sub",
)
async def process_order(msg: Message) -> None: ...


async with PubSubTestClient(broker) as client:
    await client.publish(
        {"id": 1},
        topic="order-events",
        attributes={"region": "us"},
    )
    await client.publish(
        {"id": 2}, topic="order-events", project_id="other-project"
    )

    messages = client.get_published_messages()
    assert len(messages) == 2
    assert messages[0].topic_name == "order-events"
    assert messages[0].data == b'{"id": 1}'
    assert messages[0].attributes == {"region": "us"}
    assert messages[0].project_id == "my-project"

    assert messages[1].topic_name == "order-events"
    assert messages[1].data == b'{"id": 2}'
    assert messages[1].attributes == {}
    assert messages[1].project_id == "other-project"
Source code in fastpubsub/testing.py
def get_published_messages(self) -> list[PublishedMessage]:
    """Return the messages published so far, for inspection.

    The returned list is a new list object, so adding or removing items
    on it does not affect the client's own record.

    Returns:
        A copy of the list of published messages.

    Example:
        ```python
        from fastpubsub import PubSubBroker, Message
        from fastpubsub.testing import PubSubTestClient

        broker = PubSubBroker(project_id="my-project")


        @broker.subscriber(
            alias="orders",
            topic_name="order-events",
            subscription_name="orders-sub",
        )
        async def process_order(msg: Message) -> None: ...


        async with PubSubTestClient(broker) as client:
            await client.publish(
                {"id": 1},
                topic="order-events",
                attributes={"region": "us"},
            )
            await client.publish(
                {"id": 2}, topic="order-events", project_id="other-project"
            )

            messages = client.get_published_messages()
            assert len(messages) == 2
            assert messages[0].topic_name == "order-events"
            assert messages[0].data == b'{"id": 1}'
            assert messages[0].attributes == {"region": "us"}
            assert messages[0].project_id == "my-project"

            assert messages[1].topic_name == "order-events"
            assert messages[1].data == b'{"id": 2}'
            assert messages[1].attributes == {}
            assert messages[1].project_id == "other-project"
        ```
    """
    snapshot = list(self._published_messages)
    return snapshot

get_results

get_results()

Get all processing results recorded during this session.

Each result holds the message delivered to the subscriber, the handler's return_value, and any error that was raised. Metadata like subscriber name, topic, and project are available through result.message.

RETURNS DESCRIPTION
list[ProcessingResult]

A copy of the list of processing results.

Example
from fastpubsub import PubSubBroker, Message
from fastpubsub.testing import PubSubTestClient

broker = PubSubBroker(project_id="my-project")


@broker.subscriber(
    alias="payments",
    topic_name="payment-events",
    subscription_name="payments-sub",
)
async def process_payment(msg: Message) -> str:
    return "accepted"


@broker.subscriber(
    alias="analytics",
    topic_name="payment-events",
    subscription_name="analytics-sub",
    project_id="analytics-project",
)
async def track_payment(msg: Message) -> None:
    raise ValueError("tracking failed")


async with PubSubTestClient(broker) as client:
    # Publish to the default project: Only process_payment runs
    await client.publish({"amount": 100}, topic="payment-events")

    # Publish to the analytics project: Only track_payment runs
    await client.publish(
        {"amount": 100},
        topic="payment-events",
        project_id="analytics-project",
    )

    results = client.get_results()
    assert len(results) == 2

    assert results[0].message.subscriber_name == "process_payment"
    assert results[0].return_value == "accepted"
    assert results[0].error is None

    assert results[1].message.subscriber_name == "track_payment"
    assert results[1].return_value is None
    assert isinstance(results[1].error, ValueError)
Source code in fastpubsub/testing.py
def get_results(self) -> list[ProcessingResult]:
    """Get all processing results recorded during this session.

    Each result holds the ``message`` delivered to the subscriber,
    the handler's ``return_value``, and any ``error`` that was raised.
    Metadata like subscriber name, topic, and project are available
    through ``result.message``.

    Returns:
        A copy of the list of processing results.

    Example:
        ```python
        from fastpubsub import PubSubBroker, Message
        from fastpubsub.testing import PubSubTestClient

        broker = PubSubBroker(project_id="my-project")


        @broker.subscriber(
            alias="payments",
            topic_name="payment-events",
            subscription_name="payments-sub",
        )
        async def process_payment(msg: Message) -> str:
            return "accepted"


        @broker.subscriber(
            alias="analytics",
            topic_name="payment-events",
            subscription_name="analytics-sub",
            project_id="analytics-project",
        )
        async def track_payment(msg: Message) -> None:
            raise ValueError("tracking failed")


        async with PubSubTestClient(broker) as client:
            # Publish to the default project: Only process_payment runs
            await client.publish({"amount": 100}, topic="payment-events")

            # Publish to the analytics project: Only track_payment runs
            await client.publish(
                {"amount": 100},
                topic="payment-events",
                project_id="analytics-project",
            )

            results = client.get_results()
            assert len(results) == 2

            assert results[0].message.subscriber_name == "process_payment"
            assert results[0].return_value == "accepted"
            assert results[0].error is None

            assert results[1].message.subscriber_name == "track_payment"
            assert results[1].return_value is None
            assert isinstance(results[1].error, ValueError)
        ```
    """
    # Return a shallow copy so callers cannot alter the recorded list.
    return self._processing_results.copy()

clear_published_messages

clear_published_messages()

Clear all published messages.

Source code in fastpubsub/testing.py
def clear_published_messages(self) -> None:
    """Discard every recorded published message.

    The underlying list object is emptied in place rather than replaced.
    """
    del self._published_messages[:]

clear_results

clear_results()

Clear all results.

Source code in fastpubsub/testing.py
def clear_results(self) -> None:
    """Discard every recorded processing result.

    The underlying list object is emptied in place rather than replaced.
    """
    del self._processing_results[:]