Skip to content

Subscriber class

fastpubsub.Subscriber

Subscriber(
    func,
    topic_name,
    subscription_name,
    retry_policy,
    lifecycle_policy,
    delivery_policy,
    control_flow_policy,
    dead_letter_policy=None,
    middlewares=(),
    project_id="",
)

A class representing a Pub/Sub subscriber.

Initializes the Subscriber.

PARAMETER DESCRIPTION
func

The function to call when a message is received.

TYPE: AsyncCallable

topic_name

The name of the topic to subscribe to.

TYPE: str

subscription_name

The name of the subscription.

TYPE: str

retry_policy

The retry policy for the subscription.

TYPE: MessageRetryPolicy

lifecycle_policy

The lifecycle policy for the subscription.

TYPE: LifecyclePolicy

delivery_policy

The delivery policy for the subscription.

TYPE: MessageDeliveryPolicy

control_flow_policy

The control flow policy for the subscription.

TYPE: MessageControlFlowPolicy

dead_letter_policy

The dead-letter policy for the subscription.

TYPE: DeadLetterPolicy | None DEFAULT: None

middlewares

A sequence of middlewares to apply.

TYPE: Sequence[Middleware] DEFAULT: ()

project_id

An alternative project id to create a subscription

TYPE: str DEFAULT: ''

Source code in fastpubsub/pubsub/subscriber.py
def __init__(
    self,
    func: AsyncCallable,
    topic_name: str,
    subscription_name: str,
    retry_policy: MessageRetryPolicy,
    lifecycle_policy: LifecyclePolicy,
    delivery_policy: MessageDeliveryPolicy,
    control_flow_policy: MessageControlFlowPolicy,
    dead_letter_policy: DeadLetterPolicy | None = None,
    middlewares: Sequence[Middleware] = (),
    project_id: str = "",
) -> None:
    """Initializes the Subscriber.

    Args:
        func: The function to call when a message is received.
        topic_name: The name of the topic to subscribe to.
        subscription_name: The name of the subscription.
        retry_policy: The retry policy for the subscription.
        lifecycle_policy: The lifecycle policy for the subscription.
        delivery_policy: The delivery policy for the subscription.
        control_flow_policy: The control flow policy for the subscription.
        dead_letter_policy: The dead-letter policy for the subscription.
        middlewares: A sequence of middlewares to apply.
        project_id: An alternative project id to create a subscription
        and consume messages from.
        If set the broker's project id will be ignored.
    """
    self.func = func
    self.project_id = project_id
    self.topic_name = topic_name
    self.subscription_name = subscription_name
    self.retry_policy = retry_policy
    self.lifecycle_policy = lifecycle_policy
    self.delivery_policy = delivery_policy
    self.dead_letter_policy = dead_letter_policy
    self.control_flow_policy = control_flow_policy
    self.middlewares: MutableSequence[Middleware] = []

    if middlewares and isinstance(middlewares, Sequence):
        for middleware, args, kwargs in middlewares:
            self.include_middleware(middleware, *args, **kwargs)

func instance-attribute

func = func

project_id instance-attribute

project_id = project_id

topic_name instance-attribute

topic_name = topic_name

subscription_name instance-attribute

subscription_name = subscription_name

retry_policy instance-attribute

retry_policy = retry_policy

lifecycle_policy instance-attribute

lifecycle_policy = lifecycle_policy

delivery_policy instance-attribute

delivery_policy = delivery_policy

dead_letter_policy instance-attribute

dead_letter_policy = dead_letter_policy

control_flow_policy instance-attribute

control_flow_policy = control_flow_policy

middlewares instance-attribute

middlewares = []

name property

name

The name of the subscriber.

include_middleware

include_middleware(middleware, *args, **kwargs)

Includes a middleware in the subscriber.

PARAMETER DESCRIPTION
middleware

The middleware to include.

TYPE: type[BaseMiddleware]

args

The positional arguments used on the middleware instantiation.

TYPE: Any DEFAULT: ()

kwargs

The keyword arguments used on the middleware instantiation.

TYPE: Any DEFAULT: {}

Source code in fastpubsub/pubsub/subscriber.py
@validate_call(config=ConfigDict(strict=True))
def include_middleware(
    self, middleware: type[BaseMiddleware], *args: Any, **kwargs: Any
) -> None:
    """Includes a middleware in the subscriber.

    Args:
        middleware: The middleware to include.
        args: The positional arguments used on the middleware instantiation.
        kwargs: The keyword  arguments used on the middleware instantiation.
    """
    ensure_async_middleware(middleware)

    wrapper_middleware = Middleware(middleware, *args, **kwargs)
    if wrapper_middleware in self.middlewares:
        return

    self.middlewares.append(wrapper_middleware)