Skip to content

PubSubRouter class

fastpubsub.PubSubRouter

PubSubRouter(
    prefix="", *, project_id="", routers=(), middlewares=()
)

A router for organizing publishers and subscribers.

Initializes the PubSubRouter.

PARAMETER DESCRIPTION
prefix

A prefix to apply to all subscribers and publishers in the router. If set, the subscriber alias will be: .. Also, it affects the subscription name. A subscription will be ..

TYPE: str DEFAULT: ''

project_id

An alternative project id to the broker's project id. All the publishers and subscriber created with this router will use this attribute instead of the project id set at broker-level.

TYPE: str DEFAULT: ''

routers

A sequence of children routers to include.

TYPE: Sequence[PubSubRouter] DEFAULT: ()

middlewares

A sequence of middlewares to apply to all subscribers in this router and its children.

TYPE: Sequence[Middleware] DEFAULT: ()

Source code in fastpubsub/router.py
def __init__(
    self,
    prefix: str = "",
    *,
    project_id: str = "",
    routers: Sequence["PubSubRouter"] = (),
    middlewares: Sequence[Middleware] = (),
):
    """Initializes the PubSubRouter.

    Args:
        prefix: A prefix to apply to all subscribers and publishers in the
            router. If set, the subscriber alias will be: <prefix>.<alias>.
            Also, it affects the subscription name. A subscription will be
            <prefix>.<subscription_name>.
        project_id: An alternative project id to the broker's project id.
            All the publishers and subscriber created with this router
            will use this attribute instead of the project id set at broker-level.
        routers: A sequence of children routers to include.
        middlewares: A sequence of middlewares to apply to all subscribers
            in this router and its children.
    """
    if prefix and not _PREFIX_REGEX.match(prefix):
        raise FastPubSubException(
            "Prefix must be a string that starts and ends with a letter or number, "
            "and can only contain periods, slashes, or underscores in the middle."
        )

    self.prefix = prefix
    self.project_id = project_id
    self.routers: list[PubSubRouter] = []
    self.subscribers: dict[str, Subscriber] = {}
    self.publishers: WeakSet[Publisher] = WeakSet()
    self.middlewares: MutableSequence[Middleware] = []

    if routers:
        if not isinstance(routers, Sequence):
            raise FastPubSubException("Your routers should be passed as a sequence")

        for router in routers:
            self.include_router(router)

    if middlewares:
        if not isinstance(middlewares, Sequence):
            raise FastPubSubException("Your routers should be passed as a sequence")

        for middleware, args, kwargs in middlewares:
            self.include_middleware(middleware, *args, **kwargs)

prefix instance-attribute

prefix = prefix

project_id instance-attribute

project_id = project_id

routers instance-attribute

routers = []

subscribers instance-attribute

subscribers = {}

publishers instance-attribute

publishers = WeakSet()

middlewares instance-attribute

middlewares = []

include_router

include_router(router)

Includes a child router in the current router.

PARAMETER DESCRIPTION
router

The router to include.

TYPE: PubSubRouter

Source code in fastpubsub/router.py
def include_router(self, router: "PubSubRouter") -> None:
    """Includes a child router in the current router.

    Args:
        router: The router to include.
    """
    if not (router and isinstance(router, PubSubRouter)):
        raise FastPubSubException(f"Your routers must be of type {self.__class__.__name__}")

    if self == router:
        # V2: Create a algorithm to detect cycles on these routers.
        # For now, let us assume that the router is well configured
        # and this is the only error case.
        raise FastPubSubException(f"There is a cyclical reference on router {self.prefix}.")

    router._add_prefix(self.prefix)
    router._set_project_id(self.project_id)

    for middleware, args, kwargs in self.middlewares:
        router.include_middleware(middleware, *args, **kwargs)

    self.routers.append(router)

subscriber

subscriber(
    alias,
    *,
    topic_name,
    subscription_name,
    project_id="",
    autocreate=True,
    autoupdate=False,
    filter_expression="",
    dead_letter_topic="",
    max_delivery_attempts=5,
    ack_deadline_seconds=60,
    enable_message_ordering=False,
    enable_exactly_once_delivery=False,
    min_backoff_delay_secs=10,
    max_backoff_delay_secs=600,
    max_messages=1000,
    middlewares=(),
)

Decorator to register a function as a subscriber.

PARAMETER DESCRIPTION
alias

A unique name for the subscriber. You can use this alias to select which subscription to use on the CLI.

TYPE: str

topic_name

The name of the topic to subscribe to.

TYPE: str

subscription_name

The name of the subscription attached to the topic.

TYPE: str

project_id

An alternative project id to create a subscription and consume messages from. If set the router project id will be ignored.

TYPE: str DEFAULT: ''

autocreate

Whether to automatically create the topic and subscription if they do not exists.

TYPE: bool DEFAULT: True

autoupdate

Whether to automatically update the subscription.

TYPE: bool DEFAULT: False

filter_expression

A filter expression to apply to the subscription to filter messages.

TYPE: str DEFAULT: ''

dead_letter_topic

The name of the dead-letter topic.

TYPE: str DEFAULT: ''

max_delivery_attempts

The maximum number of delivery attempts before sending the message to the dead-letter.

TYPE: int DEFAULT: 5

ack_deadline_seconds

The acknowledgment deadline in seconds.

TYPE: int DEFAULT: 60

enable_message_ordering

Whether the message must be delivered in order.

TYPE: bool DEFAULT: False

enable_exactly_once_delivery

Whether to enable exactly-once delivery.

TYPE: bool DEFAULT: False

min_backoff_delay_secs

The minimum backoff delay in seconds.

TYPE: int DEFAULT: 10

max_backoff_delay_secs

The maximum backoff delay in seconds.

TYPE: int DEFAULT: 600

max_messages

The maximum number of messages to fetch from the broker.

TYPE: int DEFAULT: 1000

middlewares

A sequence of middlewares to apply only to the subscriber.

TYPE: Sequence[Middleware] DEFAULT: ()

RETURNS DESCRIPTION
SubscribedCallable

A decorator that registers the function as a subscriber.

Source code in fastpubsub/router.py
@validate_call(config=ConfigDict(strict=True, arbitrary_types_allowed=True))
def subscriber(
    self,
    alias: str,
    *,
    topic_name: str,
    subscription_name: str,
    project_id: str = "",
    autocreate: bool = True,
    autoupdate: bool = False,
    filter_expression: str = "",
    dead_letter_topic: str = "",
    max_delivery_attempts: int = 5,
    ack_deadline_seconds: int = 60,
    enable_message_ordering: bool = False,
    enable_exactly_once_delivery: bool = False,
    min_backoff_delay_secs: int = 10,
    max_backoff_delay_secs: int = 600,
    max_messages: int = 1000,
    middlewares: Sequence[Middleware] = (),
) -> SubscribedCallable:
    """Decorator to register a function as a subscriber.

    Args:
        alias: A unique name for the subscriber. You can use this alias to
            select which subscription to use on the CLI.
        topic_name: The name of the topic to subscribe to.
        subscription_name: The name of the subscription attached to the topic.
        project_id: An alternative project id to create a subscription
            and consume messages from. If set the router project id
            will be ignored.
        autocreate: Whether to automatically create the topic and
            subscription if they do not exists.
        autoupdate: Whether to automatically update the subscription.
        filter_expression: A filter expression to apply to the
            subscription to filter messages.
        dead_letter_topic: The name of the dead-letter topic.
        max_delivery_attempts: The maximum number of delivery attempts
            before sending the message to the dead-letter.
        ack_deadline_seconds: The acknowledgment deadline in seconds.
        enable_message_ordering: Whether the message must be delivered in order.
        enable_exactly_once_delivery: Whether to enable exactly-once delivery.
        min_backoff_delay_secs: The minimum backoff delay in seconds.
        max_backoff_delay_secs: The maximum backoff delay in seconds.
        max_messages: The maximum number of messages to fetch from the broker.
        middlewares: A sequence of middlewares to apply **only to the subscriber**.

    Returns:
        A decorator that registers the function as a subscriber.
    """

    def decorator(func: AsyncDecoratedCallable) -> AsyncDecoratedCallable:
        ensure_async_callable_function(func)

        prefixed_alias = alias
        prefixed_subscription_name = subscription_name

        if self.prefix and isinstance(self.prefix, str):
            prefixed_alias = f"{self.prefix}.{prefixed_alias}"
            prefixed_subscription_name = f"{self.prefix}.{prefixed_subscription_name}"

        if prefixed_alias in self.subscribers:
            raise FastPubSubException(
                f"The alias '{prefixed_alias}' already exists."
                " The alias must be unique among all subscribers"
            )

        dead_letter_policy = None
        if dead_letter_topic:
            dead_letter_policy = DeadLetterPolicy(
                topic_name=dead_letter_topic, max_delivery_attempts=max_delivery_attempts
            )

        retry_policy = MessageRetryPolicy(
            min_backoff_delay_secs=min_backoff_delay_secs,
            max_backoff_delay_secs=max_backoff_delay_secs,
        )

        delivery_policy = MessageDeliveryPolicy(
            filter_expression=filter_expression,
            ack_deadline_seconds=ack_deadline_seconds,
            enable_message_ordering=enable_message_ordering,
            enable_exactly_once_delivery=enable_exactly_once_delivery,
        )

        lifecycle_policy = LifecyclePolicy(autocreate=autocreate, autoupdate=autoupdate)

        control_flow_policy = MessageControlFlowPolicy(
            max_messages=max_messages,
        )

        subscriber_middlewares = list(middlewares) if middlewares else []
        for middleware in self.middlewares:
            subscriber_middlewares.append(middleware)

        chosen_project_id = project_id or self.project_id
        subscriber = Subscriber(
            func=func,
            topic_name=topic_name,
            subscription_name=prefixed_subscription_name,
            retry_policy=retry_policy,
            delivery_policy=delivery_policy,
            lifecycle_policy=lifecycle_policy,
            control_flow_policy=control_flow_policy,
            dead_letter_policy=dead_letter_policy,
            middlewares=subscriber_middlewares,
            project_id=chosen_project_id,
        )
        self.subscribers[prefixed_alias.lower()] = subscriber
        return func

    return decorator

publisher

publisher(topic_name, project_id='')

Returns a publisher for the given topic.

PARAMETER DESCRIPTION
topic_name

The name of the topic.

TYPE: str

project_id

An alternative project id to publish messages. If set the router project id will be ignored.

TYPE: str DEFAULT: ''

RETURNS DESCRIPTION
Publisher

A publisher for the given topic.

Source code in fastpubsub/router.py
@validate_call(config=ConfigDict(strict=True))
def publisher(self, topic_name: str, project_id: str = "") -> Publisher:
    """Returns a publisher for the given topic.

    Args:
        topic_name: The name of the topic.
        project_id: An alternative project id to publish messages.
                    If set the router project id will be ignored.

    Returns:
        A publisher for the given topic.
    """
    chosen_project_id = project_id or self.project_id
    publisher = Publisher(
        topic_name=topic_name, project_id=chosen_project_id, middlewares=self.middlewares
    )
    self.publishers.add(publisher)
    return publisher

publish async

publish(
    topic_name,
    data,
    project_id="",
    ordering_key="",
    attributes=None,
    autocreate=True,
)

Publishes a message to the given topic.

PARAMETER DESCRIPTION
topic_name

The name of the topic.

TYPE: str

data

The message data.

TYPE: dict[str, Any] | str | bytes | BaseModel

project_id

An alternative project id to publish messages. If set the router project id will be ignored.

TYPE: str DEFAULT: ''

ordering_key

The ordering key for the message.

TYPE: str DEFAULT: ''

attributes

A dictionary of message attributes.

TYPE: dict[str, str] | None DEFAULT: None

autocreate

Whether to automatically create the topic if it does not exists.

TYPE: bool DEFAULT: True

Source code in fastpubsub/router.py
@validate_call(config=ConfigDict(strict=True))
async def publish(
    self,
    topic_name: str,
    data: dict[str, Any] | str | bytes | BaseModel,
    project_id: str = "",
    ordering_key: str = "",
    attributes: dict[str, str] | None = None,
    autocreate: bool = True,
) -> None:
    """Publishes a message to the given topic.

    Args:
        topic_name: The name of the topic.
        data: The message data.
        project_id: An alternative project id to publish messages.
                    If set the router project id will be ignored.
        ordering_key: The ordering key for the message.
        attributes: A dictionary of message attributes.
        autocreate: Whether to automatically create the topic if it does not exists.
    """
    publisher = self.publisher(topic_name=topic_name, project_id=project_id)
    await publisher.publish(
        data=data, ordering_key=ordering_key, attributes=attributes, autocreate=autocreate
    )

include_middleware

include_middleware(middleware, *args, **kwargs)

Includes a middleware in the router.

PARAMETER DESCRIPTION
middleware

The middleware to include.

TYPE: type[BaseMiddleware]

args

The positional arguments used on the middleware instantiation.

TYPE: Any DEFAULT: ()

kwargs

The keyword arguments used on the middleware instantiation.

TYPE: Any DEFAULT: {}

Source code in fastpubsub/router.py
@validate_call(config=ConfigDict(strict=True))
def include_middleware(
    self, middleware: type[BaseMiddleware], *args: Any, **kwargs: Any
) -> None:
    """Includes a middleware in the router.

    Args:
        middleware: The middleware to include.
        args: The positional arguments used on the middleware instantiation.
        kwargs: The keyword  arguments used on the middleware instantiation.
    """
    for publisher in self.publishers:
        publisher.include_middleware(middleware, *args, **kwargs)

    for subscriber in self.subscribers.values():
        subscriber.include_middleware(middleware, *args, **kwargs)

    for router in self.routers:
        router.include_middleware(middleware, *args, **kwargs)

    wrapper_middleware = Middleware(middleware, *args, **kwargs)
    if wrapper_middleware not in self.middlewares:
        self.middlewares.append(wrapper_middleware)