Skip to content

PubSubRouter class

fastpubsub.PubSubRouter

PubSubRouter(
    prefix="", *, project_id="", routers=(), middlewares=()
)

A router for organizing publishers and subscribers.

Initializes the PubSubRouter.

PARAMETER DESCRIPTION
prefix

A prefix to apply to all subscribers and publishers in the router. If set, the subscriber alias will be: {prefix}.{alias}. Also, it affects the subscription name. A subscription will be {prefix}.{subscription_name}.

TYPE: str DEFAULT: ''

project_id

An alternative project id to the router's project id. All the publishers and subscriber created with this router will use this attribute instead of the project id set at router-level.

TYPE: str DEFAULT: ''

routers

A sequence of children routers to include.

TYPE: Sequence[PubSubRouter] DEFAULT: ()

middlewares

A sequence of middlewares to apply to all subscribers in this router and its children.

TYPE: Sequence[Middleware] DEFAULT: ()

Source code in fastpubsub/router.py
def __init__(
    self,
    prefix: str = "",
    *,
    project_id: str = "",
    routers: Sequence["PubSubRouter"] = (),
    middlewares: Sequence[Middleware] = (),
):
    """Initializes the PubSubRouter.

    Args:
        prefix: A prefix to apply to all subscribers and publishers in the
            router. If set, the subscriber alias will be: {prefix}.{alias}.
            Also, it affects the subscription name. A subscription will be
            {prefix}.{subscription_name}.
        project_id: An alternative project id to the router's project id.
            All the publishers and subscriber created with this router
            will use this attribute instead of the project id set at
            router-level.
        routers: A sequence of children routers to include.
        middlewares: A sequence of middlewares to apply to all subscribers
            in this router and its children.
    """
    if prefix and not _PREFIX_REGEX.match(prefix):
        raise FastPubSubException(
            "Prefix must be a string that starts and "
            "ends with a letter or number, "
            "and can only contain periods, "
            "slashes, or underscores in the middle."
        )

    self.prefix = prefix
    self.project_id = project_id
    self.routers: list[PubSubRouter] = []
    self.subscribers: dict[str, Subscriber] = {}
    self.publishers: WeakSet[Publisher] = WeakSet()
    self.middlewares: MutableSequence[Middleware] = []

    if routers:
        if not isinstance(routers, Sequence):
            raise FastPubSubException(
                "Your routers should be passed as a sequence"
            )

        for router in routers:
            self.include_router(router)

    if middlewares:
        if not isinstance(middlewares, Sequence):
            raise FastPubSubException(
                "Your routers should be passed as a sequence"
            )

        for middleware, args, kwargs in middlewares:
            self.include_middleware(middleware, *args, **kwargs)

prefix instance-attribute

prefix = prefix

project_id instance-attribute

project_id = project_id

routers instance-attribute

routers = []

subscribers instance-attribute

subscribers = {}

publishers instance-attribute

publishers = WeakSet()

middlewares instance-attribute

middlewares = []

include_router

include_router(router)

Includes a child router in the current router.

PARAMETER DESCRIPTION
router

The router to include.

TYPE: PubSubRouter

Source code in fastpubsub/router.py
def include_router(self, router: "PubSubRouter") -> None:
    """Includes a child router in the current router.

    Args:
        router: The router to include.
    """
    if not (router and isinstance(router, PubSubRouter)):
        raise FastPubSubException(
            f"Your routers must be of type {self.__class__.__name__}"
        )

    if self == router:
        # V2: Create a algorithm to detect cycles on these routers.
        # For now, let us assume that the router is well configured
        # and this is the only error case.
        raise FastPubSubException(
            f"There is a cyclical reference on router {self.prefix}."
        )

    router._add_prefix(self.prefix)
    router._set_project_id(self.project_id)

    for middleware, args, kwargs in self.middlewares:
        router.include_middleware(middleware, *args, **kwargs)

    self.routers.append(router)

subscriber

subscriber(
    alias,
    *,
    topic_name,
    subscription_name,
    project_id="",
    autocreate=True,
    autoupdate=False,
    filter_expression="",
    dead_letter_topic="",
    max_delivery_attempts=5,
    ack_deadline_seconds=60,
    enable_message_ordering=False,
    enable_exactly_once_delivery=False,
    min_backoff_delay_secs=10,
    max_backoff_delay_secs=600,
    max_messages=1000,
    middlewares=(),
)

Decorator to register a function as a subscriber.

PARAMETER DESCRIPTION
alias

A unique name for the subscriber. You can use this alias to select which subscription to use on the CLI.

TYPE: str

topic_name

The name of the topic to subscribe to.

TYPE: str

subscription_name

The name of the subscription for the topic.

TYPE: str

project_id

An alternative project id to create a subscription and consume messages from. If set the router project id will be ignored.

TYPE: str DEFAULT: ''

autocreate

Automatically creates the topic and subscription if they do not exists.

TYPE: bool DEFAULT: True

autoupdate

Automatically updates the subscription.

TYPE: bool DEFAULT: False

filter_expression

A filter expression to apply to the subscription to filter messages.

TYPE: str DEFAULT: ''

dead_letter_topic

The name of the dead-letter topic.

TYPE: str DEFAULT: ''

max_delivery_attempts

The maximum number of delivery attempts before sending the message to the dead-letter.

TYPE: int DEFAULT: 5

ack_deadline_seconds

The acknowledgment deadline in seconds.

TYPE: int DEFAULT: 60

enable_message_ordering

Enables message ordering. It can only be set when creating the subscription.

TYPE: bool DEFAULT: False

enable_exactly_once_delivery

Enables exactly-once delivery. It can only be set when creating the subscription.

TYPE: bool DEFAULT: False

min_backoff_delay_secs

The minimum backoff delay in seconds.

TYPE: int DEFAULT: 10

max_backoff_delay_secs

The maximum backoff delay in seconds.

TYPE: int DEFAULT: 600

max_messages

The maximum number of messages to fetch from the broker at a time.

TYPE: int DEFAULT: 1000

middlewares

A sequence of middlewares to apply to the subscriber. It is subscriber specific, not application-wide.

TYPE: Sequence[Middleware] DEFAULT: ()

RETURNS DESCRIPTION
SubscribedCallable

A decorator that registers the function as a subscriber.

Source code in fastpubsub/router.py
@validate_call(
    config=ConfigDict(strict=True, arbitrary_types_allowed=True)
)
def subscriber(
    self,
    alias: str,
    *,
    topic_name: str,
    subscription_name: str,
    project_id: str = "",
    autocreate: bool = True,
    autoupdate: bool = False,
    filter_expression: str = "",
    dead_letter_topic: str = "",
    max_delivery_attempts: int = 5,
    ack_deadline_seconds: int = 60,
    enable_message_ordering: bool = False,
    enable_exactly_once_delivery: bool = False,
    min_backoff_delay_secs: int = 10,
    max_backoff_delay_secs: int = 600,
    max_messages: int = 1000,
    middlewares: Sequence[Middleware] = (),
) -> SubscribedCallable:
    """Decorator to register a function as a subscriber.

    Args:
        alias: A unique name for the subscriber. You can use this alias to
            select which subscription to use on the CLI.
        topic_name: The name of the topic to subscribe to.
        subscription_name: The name of the subscription for the topic.
        project_id: An alternative project id to create a subscription
            and consume messages from. If set the router project id
            will be ignored.
        autocreate: Automatically creates the topic and
            subscription if they do not exists.
        autoupdate: Automatically updates the subscription.
        filter_expression: A filter expression to apply to the
            subscription to filter messages.
        dead_letter_topic: The name of the dead-letter topic.
        max_delivery_attempts: The maximum number of delivery attempts
            before sending the message to the dead-letter.
        ack_deadline_seconds: The acknowledgment deadline in seconds.
        enable_message_ordering: Enables message ordering.
            It can only be set when creating the subscription.
        enable_exactly_once_delivery: Enables exactly-once delivery.
            It can only be set when creating the subscription.
        min_backoff_delay_secs: The minimum backoff delay in seconds.
        max_backoff_delay_secs: The maximum backoff delay in seconds.
        max_messages: The maximum number of messages to fetch from
            the broker at a time.
        middlewares: A sequence of middlewares to apply to the subscriber.
            It is subscriber specific, not application-wide.

    Returns:
        A decorator that registers the function as a subscriber.
    """

    def decorator(func: AsyncDecoratedCallable) -> AsyncDecoratedCallable:
        ensure_async_callable_function(func)

        prefixed_alias = alias
        prefixed_subscription_name = subscription_name

        if self.prefix and isinstance(self.prefix, str):
            prefixed_alias = f"{self.prefix}.{prefixed_alias}"
            prefixed_subscription_name = (
                f"{self.prefix}.{prefixed_subscription_name}"
            )

        if prefixed_alias in self.subscribers:
            raise FastPubSubException(
                f"The alias '{prefixed_alias}' already exists."
                " The alias must be unique among all subscribers"
            )

        dead_letter_policy = None
        if dead_letter_topic:
            dead_letter_policy = DeadLetterPolicy(
                topic_name=dead_letter_topic,
                max_delivery_attempts=max_delivery_attempts,
            )

        retry_policy = MessageRetryPolicy(
            min_backoff_delay_secs=min_backoff_delay_secs,
            max_backoff_delay_secs=max_backoff_delay_secs,
        )

        delivery_policy = MessageDeliveryPolicy(
            filter_expression=filter_expression,
            ack_deadline_seconds=ack_deadline_seconds,
            enable_message_ordering=enable_message_ordering,
            enable_exactly_once_delivery=enable_exactly_once_delivery,
        )

        lifecycle_policy = LifecyclePolicy(
            autocreate=autocreate, autoupdate=autoupdate
        )

        control_flow_policy = MessageControlFlowPolicy(
            max_messages=max_messages,
        )

        subscriber_middlewares = list(middlewares) if middlewares else []
        for middleware in self.middlewares:
            subscriber_middlewares.append(middleware)

        chosen_project_id = project_id or self.project_id
        subscriber = Subscriber(
            func=func,
            topic_name=topic_name,
            subscription_name=prefixed_subscription_name,
            retry_policy=retry_policy,
            delivery_policy=delivery_policy,
            lifecycle_policy=lifecycle_policy,
            control_flow_policy=control_flow_policy,
            dead_letter_policy=dead_letter_policy,
            middlewares=subscriber_middlewares,
            project_id=chosen_project_id,
        )
        self.subscribers[prefixed_alias.lower()] = subscriber
        return func

    return decorator

publisher

publisher(topic_name, project_id='')

Returns a publisher for the given topic.

PARAMETER DESCRIPTION
topic_name

The name of the topic.

TYPE: str

project_id

An alternative project id to publish messages. If set the router project id will be ignored.

TYPE: str DEFAULT: ''

RETURNS DESCRIPTION
Publisher

A publisher for the given topic.

Source code in fastpubsub/router.py
@validate_call(config=ConfigDict(strict=True))
def publisher(self, topic_name: str, project_id: str = "") -> Publisher:
    """Returns a publisher for the given topic.

    Args:
        topic_name: The name of the topic.
        project_id: An alternative project id to publish messages.
                    If set the router project id will be ignored.

    Returns:
        A publisher for the given topic.
    """
    chosen_project_id = project_id or self.project_id
    publisher = Publisher(
        topic_name=topic_name,
        project_id=chosen_project_id,
        middlewares=self.middlewares,
    )
    self.publishers.add(publisher)
    return publisher

publish async

publish(
    topic_name,
    data,
    project_id="",
    ordering_key="",
    attributes=None,
    autocreate=True,
)

Publishes a message to the given topic.

PARAMETER DESCRIPTION
topic_name

The name of the topic.

TYPE: str

data

The message data.

TYPE: dict[str, Any] | str | bytes | BaseModel

project_id

An alternative project id to publish messages. If set the router project id will be ignored.

TYPE: str DEFAULT: ''

ordering_key

The ordering key for the message.

TYPE: str DEFAULT: ''

attributes

A dictionary of message attributes.

TYPE: dict[str, str] | None DEFAULT: None

autocreate

Automatically creates the topic if it does not exists.

TYPE: bool DEFAULT: True

Source code in fastpubsub/router.py
@validate_call(config=ConfigDict(strict=True))
async def publish(
    self,
    topic_name: str,
    data: dict[str, Any] | str | bytes | BaseModel,
    project_id: str = "",
    ordering_key: str = "",
    attributes: dict[str, str] | None = None,
    autocreate: bool = True,
) -> None:
    """Publishes a message to the given topic.

    Args:
        topic_name: The name of the topic.
        data: The message data.
        project_id: An alternative project id to publish messages.
                    If set the router project id will be ignored.
        ordering_key: The ordering key for the message.
        attributes: A dictionary of message attributes.
        autocreate: Automatically creates the topic if it does not exists.
    """
    publisher = self.publisher(
        topic_name=topic_name, project_id=project_id
    )
    await publisher.publish(
        data=data,
        ordering_key=ordering_key,
        attributes=attributes,
        autocreate=autocreate,
    )

include_middleware

include_middleware(middleware, *args, **kwargs)

Includes a middleware in the router.

PARAMETER DESCRIPTION
middleware

The middleware to include.

TYPE: type[BaseMiddleware]

args

The positional arguments used on middleware instantiation.

TYPE: Any DEFAULT: ()

kwargs

The keyword arguments used on middleware instantiation.

TYPE: Any DEFAULT: {}

Source code in fastpubsub/router.py
@validate_call(config=ConfigDict(strict=True))
def include_middleware(
    self, middleware: type[BaseMiddleware], *args: Any, **kwargs: Any
) -> None:
    """Includes a middleware in the router.

    Args:
        middleware: The middleware to include.
        args: The positional arguments used on middleware instantiation.
        kwargs: The keyword  arguments used on middleware instantiation.
    """
    for publisher in self.publishers:
        publisher.include_middleware(middleware, *args, **kwargs)

    for subscriber in self.subscribers.values():
        subscriber.include_middleware(middleware, *args, **kwargs)

    for router in self.routers:
        router.include_middleware(middleware, *args, **kwargs)

    wrapper_middleware = Middleware(middleware, *args, **kwargs)
    if wrapper_middleware not in self.middlewares:
        self.middlewares.append(wrapper_middleware)