Skip to content

PubSubBroker class

fastpubsub.PubSubBroker

PubSubBroker(
    project_id,
    shutdown_timeout=30.0,
    routers=(),
    middlewares=(),
)

Manages the connection with PubSub Broker.

Initializes the PubSubBroker.

PARAMETER DESCRIPTION
project_id

The Google Cloud project ID.

TYPE: str

shutdown_timeout

Maximum time to wait for in-flight messages per subscription during shutdown (seconds). Defaults to 30 seconds.

TYPE: float DEFAULT: 30.0

routers

A sequence of routers to include.

TYPE: Sequence[PubSubRouter] DEFAULT: ()

middlewares

A sequence of middlewares to apply to all messages incoming to subscribers and publishers.

TYPE: Sequence[Middleware] DEFAULT: ()

Source code in fastpubsub/broker.py
def __init__(
    self,
    project_id: str,
    shutdown_timeout: float = 30.0,
    routers: Sequence[PubSubRouter] = (),
    middlewares: Sequence[Middleware] = (),
):
    """Initializes the PubSubBroker.

    Args:
        project_id: The Google Cloud project ID.
        shutdown_timeout: Maximum time to wait for in-flight messages per
            subscription during shutdown (seconds). Defaults to 30 seconds.
        routers: A sequence of routers to include.
        middlewares: A sequence of middlewares to apply to all messages
            incoming to subscribers and publishers.
    """
    if not (project_id and isinstance(project_id, str) and len(project_id.strip()) > 0):
        raise FastPubSubException(f"The project id value ({project_id}) is invalid.")

    self.project_id = project_id
    self.shutdown_timeout = shutdown_timeout
    self.router = PubSubRouter(routers=routers, project_id=project_id, middlewares=middlewares)
    self.task_manager = AsyncTaskManager()

project_id instance-attribute

project_id = project_id

shutdown_timeout instance-attribute

shutdown_timeout = shutdown_timeout

router instance-attribute

router = PubSubRouter(
    routers=routers,
    project_id=project_id,
    middlewares=middlewares,
)

task_manager instance-attribute

task_manager = AsyncTaskManager()

subscriber

subscriber(
    alias,
    *,
    topic_name,
    subscription_name,
    project_id="",
    autocreate=True,
    autoupdate=False,
    filter_expression="",
    dead_letter_topic="",
    max_delivery_attempts=5,
    ack_deadline_seconds=60,
    enable_message_ordering=False,
    enable_exactly_once_delivery=False,
    min_backoff_delay_secs=10,
    max_backoff_delay_secs=600,
    max_messages=1000,
    middlewares=(),
)

Decorator to register a function as a subscriber.

PARAMETER DESCRIPTION
alias

A unique name for the subscriber. You can use this alias to select which subscription to use on the CLI.

TYPE: str

topic_name

The name of the topic to subscribe to.

TYPE: str

subscription_name

The name of the subscription attached to the topic.

TYPE: str

project_id

An alternative project id to create a subscription and consume messages from. If set the broker project id will be ignored.

TYPE: str DEFAULT: ''

autocreate

Whether to automatically create the topic and subscription if they do not exists.

TYPE: bool DEFAULT: True

autoupdate

Whether to automatically update the subscription.

TYPE: bool DEFAULT: False

filter_expression

A filter expression to apply to the subscription to filter messages.

TYPE: str DEFAULT: ''

dead_letter_topic

The name of the dead-letter topic.

TYPE: str DEFAULT: ''

max_delivery_attempts

The maximum number of delivery attempts before sending the message to the dead-letter.

TYPE: int DEFAULT: 5

ack_deadline_seconds

The acknowledgment deadline in seconds.

TYPE: int DEFAULT: 60

enable_message_ordering

Whether the message must be delivered in order.

TYPE: bool DEFAULT: False

enable_exactly_once_delivery

Whether to enable exactly-once delivery.

TYPE: bool DEFAULT: False

min_backoff_delay_secs

The minimum backoff delay in seconds.

TYPE: int DEFAULT: 10

max_backoff_delay_secs

The maximum backoff delay in seconds.

TYPE: int DEFAULT: 600

max_messages

The maximum number of messages to fetch from the broker.

TYPE: int DEFAULT: 1000

middlewares

A sequence of middlewares to apply only to the subscriber.

TYPE: Sequence[Middleware] DEFAULT: ()

RETURNS DESCRIPTION
SubscribedCallable

A decorator that registers the function as a subscriber.

Source code in fastpubsub/broker.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def subscriber(
    self,
    alias: str,
    *,
    topic_name: str,
    subscription_name: str,
    project_id: str = "",
    autocreate: bool = True,
    autoupdate: bool = False,
    filter_expression: str = "",
    dead_letter_topic: str = "",
    max_delivery_attempts: int = 5,
    ack_deadline_seconds: int = 60,
    enable_message_ordering: bool = False,
    enable_exactly_once_delivery: bool = False,
    min_backoff_delay_secs: int = 10,
    max_backoff_delay_secs: int = 600,
    max_messages: int = 1000,
    middlewares: Sequence[Middleware] = (),
) -> SubscribedCallable:
    """Decorator to register a function as a subscriber.

    Args:
        alias: A unique name for the subscriber. You can use this alias to
            select which subscription to use on the CLI.
        topic_name: The name of the topic to subscribe to.
        subscription_name: The name of the subscription attached to the topic.
        project_id: An alternative project id to create a subscription
            and consume messages from. If set the broker project id
            will be ignored.
        autocreate: Whether to automatically create the topic and
            subscription if they do not exists.
        autoupdate: Whether to automatically update the subscription.
        filter_expression: A filter expression to apply to the
            subscription to filter messages.
        dead_letter_topic: The name of the dead-letter topic.
        max_delivery_attempts: The maximum number of delivery attempts
            before sending the message to the dead-letter.
        ack_deadline_seconds: The acknowledgment deadline in seconds.
        enable_message_ordering: Whether the message must be delivered in order.
        enable_exactly_once_delivery: Whether to enable exactly-once delivery.
        min_backoff_delay_secs: The minimum backoff delay in seconds.
        max_backoff_delay_secs: The maximum backoff delay in seconds.
        max_messages: The maximum number of messages to fetch from the broker.
        middlewares: A sequence of middlewares to apply **only to the subscriber**.

    Returns:
        A decorator that registers the function as a subscriber.
    """
    return self.router.subscriber(
        alias=alias,
        topic_name=topic_name,
        subscription_name=subscription_name,
        project_id=project_id,
        autocreate=autocreate,
        autoupdate=autoupdate,
        filter_expression=filter_expression,
        dead_letter_topic=dead_letter_topic,
        max_delivery_attempts=max_delivery_attempts,
        ack_deadline_seconds=ack_deadline_seconds,
        enable_message_ordering=enable_message_ordering,
        enable_exactly_once_delivery=enable_exactly_once_delivery,
        min_backoff_delay_secs=min_backoff_delay_secs,
        max_backoff_delay_secs=max_backoff_delay_secs,
        max_messages=max_messages,
        middlewares=middlewares,
    )

publisher

publisher(topic_name, project_id='')

Returns a publisher for the given topic.

PARAMETER DESCRIPTION
topic_name

The name of the topic.

TYPE: str

project_id

An alternative project id to publish messages. If set the broker's project id will be ignored.

TYPE: str DEFAULT: ''

RETURNS DESCRIPTION
Publisher

A publisher for the given topic.

Source code in fastpubsub/broker.py
@validate_call(config=ConfigDict(strict=True))
def publisher(self, topic_name: str, project_id: str = "") -> Publisher:
    """Returns a publisher for the given topic.

    Args:
        topic_name: The name of the topic.
        project_id: An alternative project id to publish messages.
            If set the broker's project id will be ignored.

    Returns:
        A publisher for the given topic.
    """
    return self.router.publisher(topic_name=topic_name, project_id=project_id)

publish async

publish(
    topic_name,
    data,
    project_id="",
    ordering_key="",
    attributes=None,
    autocreate=True,
)

Publishes a message to the given topic.

PARAMETER DESCRIPTION
topic_name

The name of the topic.

TYPE: str

data

The message data.

TYPE: dict[str, Any] | str | bytes | BaseModel

project_id

An alternative project id to publish messages. If set the broker's project id will be ignored.

TYPE: str DEFAULT: ''

ordering_key

The ordering key for the message.

TYPE: str DEFAULT: ''

attributes

A dictionary of message attributes.

TYPE: dict[str, str] | None DEFAULT: None

autocreate

Whether to automatically create the topic if it does not exists.

TYPE: bool DEFAULT: True

Source code in fastpubsub/broker.py
@validate_call(config=ConfigDict(strict=True))
async def publish(
    self,
    topic_name: str,
    data: dict[str, Any] | str | bytes | BaseModel,
    project_id: str = "",
    ordering_key: str = "",
    attributes: dict[str, str] | None = None,
    autocreate: bool = True,
) -> None:
    """Publishes a message to the given topic.

    Args:
        topic_name: The name of the topic.
        data: The message data.
        project_id: An alternative project id to publish messages.
                    If set the broker's project id will be ignored.
        ordering_key: The ordering key for the message.
        attributes: A dictionary of message attributes.
        autocreate: Whether to automatically create the topic if it does not exists.
    """
    return await self.router.publish(
        topic_name=topic_name,
        data=data,
        project_id=project_id,
        ordering_key=ordering_key,
        attributes=attributes,
        autocreate=autocreate,
    )

include_router

include_router(router)

Includes a router in the broker.

PARAMETER DESCRIPTION
router

The router to include.

TYPE: PubSubRouter

Source code in fastpubsub/broker.py
def include_router(self, router: PubSubRouter) -> None:
    """Includes a router in the broker.

    Args:
        router: The router to include.
    """
    return self.router.include_router(router)

include_middleware

include_middleware(middleware, *args, **kwargs)

Includes a middleware in the broker.

PARAMETER DESCRIPTION
middleware

The middleware to include.

TYPE: type[BaseMiddleware]

args

The positional arguments used on the middleware instantiation.

TYPE: Any DEFAULT: ()

kwargs

The keyword arguments used on the middleware instantiation.

TYPE: Any DEFAULT: {}

Source code in fastpubsub/broker.py
@validate_call(config=ConfigDict(strict=True))
def include_middleware(
    self, middleware: type[BaseMiddleware], *args: Any, **kwargs: Any
) -> None:
    """Includes a middleware in the broker.

    Args:
        middleware: The middleware to include.
        args: The positional arguments used on the middleware instantiation.
        kwargs: The keyword  arguments used on the middleware instantiation.
    """
    return self.router.include_middleware(middleware, *args, **kwargs)

start async

start()

Starts the broker.

Source code in fastpubsub/broker.py
async def start(self) -> None:
    """Starts the broker."""
    subscribers = self._filter_subscribers()
    if not subscribers:
        logger.error("No subscriber found for running.")
        raise FastPubSubException(
            "You must select the subscribers using --subscribers flag or run them all."
        )

    for subscriber in subscribers:
        subscription_builder = PubSubSubscriptionBuilder(project_id=subscriber.project_id)
        await subscription_builder.build(subscriber)
        self.task_manager.create_task(subscriber)

    await self.task_manager.start()

alive

alive()

Checks if the message consumer tasks are alive.

RETURNS DESCRIPTION
bool

True if they are alive, False otherwise.

Source code in fastpubsub/broker.py
def alive(self) -> bool:
    """Checks if the message consumer tasks are alive.

    Returns:
        True if they are alive, False otherwise.
    """
    subscribers = self.task_manager.alive()
    if not subscribers:
        logger.info("The subscribers are not active. May be they are deactivated?")
        return False

    for name, liveness in subscribers.items():
        if not liveness:
            logger.error(f"The {name} subscriber handler is not alive")
            return False

    return True

ready

ready()

Checks if the message consumer tasks are ready.

RETURNS DESCRIPTION
bool

True if they are ready, False otherwise.

Source code in fastpubsub/broker.py
def ready(self) -> bool:
    """Checks if the message consumer tasks are ready.

    Returns:
        True if they are ready, False otherwise.
    """
    subscribers = self.task_manager.ready()
    if not subscribers:
        logger.info("The subscribers are not active. May be they are deactivated?")
        return False

    for name, readiness in subscribers.items():
        if not readiness:
            logger.error(f"The {name} subscriber handler is not ready")
            return False

    return True

shutdown async

shutdown()

Gracefully shuts down the broker using the configured timeout.

Source code in fastpubsub/broker.py
async def shutdown(self) -> None:
    """Gracefully shuts down the broker using the configured timeout."""
    await self.task_manager.shutdown(timeout=self.shutdown_timeout)