Skip to content

PubSubClient class

fastpubsub.clients.pubsub.PubSubClient

PubSubClient(project_id)

A client for interacting with Google Cloud Pub/Sub.

Initializes the PubSubClient.

PARAMETER DESCRIPTION
project_id

The Google Cloud project ID.

TYPE: str

Source code in fastpubsub/clients/pubsub.py
def __init__(self, project_id: str) -> None:
    """Initializes the PubSubClient.

    Args:
        project_id: The Google Cloud project ID.
    """
    self.project_id = project_id
    self.is_emulator = True if os.getenv("PUBSUB_EMULATOR_HOST") else False

project_id instance-attribute

project_id = project_id

is_emulator instance-attribute

is_emulator = (
    True if getenv("PUBSUB_EMULATOR_HOST") else False
)

create_subscription async

create_subscription(
    topic_name,
    subscription_name,
    retry_policy,
    delivery_policy,
    dead_letter_policy=None,
)

Creates a subscription.

PARAMETER DESCRIPTION
topic_name

The name of the topic.

TYPE: str

subscription_name

The name of the subscription.

TYPE: str

retry_policy

The retry policy for the subscription.

TYPE: MessageRetryPolicy

delivery_policy

The delivery policy for the subscription.

TYPE: MessageDeliveryPolicy

dead_letter_policy

The dead-letter policy for the subscription.

TYPE: DeadLetterPolicy | None DEFAULT: None

Source code in fastpubsub/clients/pubsub.py
async def create_subscription(
    self,
    topic_name: str,
    subscription_name: str,
    retry_policy: MessageRetryPolicy,
    delivery_policy: MessageDeliveryPolicy,
    dead_letter_policy: DeadLetterPolicy | None = None,
) -> None:
    """Creates a subscription.

    Args:
        topic_name: The name of the topic.
        subscription_name: The name of the subscription.
        retry_policy: The retry policy for the subscription.
        delivery_policy: The delivery policy for the subscription.
        dead_letter_policy: The dead-letter policy for the subscription.
    """
    subscription_request = await self._create_subscription_request(
        topic_name=topic_name,
        subscription_name=subscription_name,
        retry_policy=retry_policy,
        delivery_policy=delivery_policy,
        dead_letter_policy=dead_letter_policy,
    )

    subscriber_client = await PubSubClientFactory.get_subscriber(
        self.project_id
    )
    with suppress(AlreadyExists):
        logger.debug(
            "Attempting to create subscription: "
            f"{subscription_request.name}"
        )
        await apply_async(
            subscriber_client.create_subscription,
            request=subscription_request,
            timeout=DEFAULT_PUBSUB_TIMEOUT,
        )

        logger.debug(
            "Successfully created subscription: "
            f"{subscription_request.name}"
        )

update_subscription async

update_subscription(
    topic_name,
    subscription_name,
    retry_policy,
    delivery_policy,
    dead_letter_policy=None,
)

Updates a subscription.

PARAMETER DESCRIPTION
topic_name

The name of the topic.

TYPE: str

subscription_name

The name of the subscription.

TYPE: str

retry_policy

The retry policy for the subscription.

TYPE: MessageRetryPolicy

delivery_policy

The delivery policy for the subscription.

TYPE: MessageDeliveryPolicy

dead_letter_policy

The dead-letter policy for the subscription.

TYPE: DeadLetterPolicy | None DEFAULT: None

Source code in fastpubsub/clients/pubsub.py
async def update_subscription(
    self,
    topic_name: str,
    subscription_name: str,
    retry_policy: MessageRetryPolicy,
    delivery_policy: MessageDeliveryPolicy,
    dead_letter_policy: DeadLetterPolicy | None = None,
) -> None:
    """Updates a subscription.

    Args:
        topic_name: The name of the topic.
        subscription_name: The name of the subscription.
        retry_policy: The retry policy for the subscription.
        delivery_policy: The delivery policy for the subscription.
        dead_letter_policy: The dead-letter policy for the subscription.
    """
    subscription_request = await self._create_subscription_request(
        topic_name=topic_name,
        subscription_name=subscription_name,
        retry_policy=retry_policy,
        delivery_policy=delivery_policy,
        dead_letter_policy=dead_letter_policy,
    )

    update_fields = [
        "ack_deadline_seconds",
        "dead_letter_policy",
        "retry_policy",
        "enable_exactly_once_delivery",
    ]

    if not self.is_emulator:
        update_fields.append("filter")

    update_mask = FieldMask(paths=update_fields)

    try:
        subscriber_client = await PubSubClientFactory.get_subscriber(
            self.project_id
        )
        logger.debug(
            "Attempting to update the subscription: "
            f"{subscription_request.name}"
        )
        response = await apply_async(
            subscriber_client.update_subscription,
            subscription=subscription_request,
            update_mask=update_mask,
            timeout=DEFAULT_PUBSUB_TIMEOUT,
        )

        logger.debug(
            "Successfully updated "
            f"the subscription: {subscription_request.name}"
        )
        logger.debug(
            "The subscription is now "
            f"following the configuration: {response}"
        )
    except NotFound as e:
        raise FastPubSubException(
            "We could not update the subscription configuration. "
            f"The topic {subscription_request.topic} or "
            f"subscription {subscription_request.name} were not found. "
            "They may be deleted or not autocreated. "
            "Please, setup your @subscriber with the 'autocreate=True' "
            "option to automatically create them."
        ) from e

create_topic async

create_topic(topic_name, create_default_subscription=True)

Creates a topic.

PARAMETER DESCRIPTION
topic_name

The name of the topic.

TYPE: str

create_default_subscription

Whether to create a default subscription for the topic.

TYPE: bool DEFAULT: True

Source code in fastpubsub/clients/pubsub.py
async def create_topic(
    self, topic_name: str, create_default_subscription: bool = True
) -> None:
    """Creates a topic.

    Args:
        topic_name: The name of the topic.
        create_default_subscription: Whether to create a default
            subscription for the topic.
    """
    subscriber_client = await PubSubClientFactory.get_subscriber(
        self.project_id
    )
    publisher_client = await PubSubClientFactory.get_publisher(
        self.project_id
    )

    with suppress(AlreadyExists):
        logger.debug(f"Creating topic '{topic_name}'.")
        topic_path = publisher_client.topic_path(
            self.project_id, topic_name
        )

        topic = await apply_async(
            publisher_client.create_topic, name=topic_path
        )
        logger.debug(f"Created topic '{topic.name}' successfully.")

        if not create_default_subscription:
            return

        logger.debug(f"Creating default subscription for '{topic_path}'.")
        default_subscription_path = subscriber_client.subscription_path(
            self.project_id, topic_name
        )
        subscription = await apply_async(
            subscriber_client.create_subscription,
            name=default_subscription_path,
            topic=topic_path,
            timeout=DEFAULT_PULL_TIMEOUT,
        )

        logger.debug(
            "Creating default subscription created successfully for "
            f"'{topic_path}' as {subscription.name}."
        )

publish async

publish(topic_name, *, data, ordering_key, attributes)

Publishes a message.

PARAMETER DESCRIPTION
topic_name

The name of the topic.

TYPE: str

data

The message data.

TYPE: bytes

ordering_key

The ordering key for the message.

TYPE: str

attributes

A dictionary of message attributes.

TYPE: dict[str, str] | None

Source code in fastpubsub/clients/pubsub.py
async def publish(
    self,
    topic_name: str,
    *,
    data: bytes,
    ordering_key: str,
    attributes: dict[str, str] | None,
) -> None:
    """Publishes a message.

    Args:
        topic_name: The name of the topic.
        data: The message data.
        ordering_key: The ordering key for the message.
        attributes: A dictionary of message attributes.
    """
    publisher_client = await PubSubClientFactory.get_publisher(
        self.project_id, bool(ordering_key)
    )
    topic_path = publisher_client.topic_path(self.project_id, topic_name)
    new_attributes = {} if attributes is None else attributes

    try:
        response: Future[str] = publisher_client.publish(
            topic=topic_path,
            data=data,
            ordering_key=ordering_key,
            timeout=DEFAULT_PUSH_TIMEOUT,
            **new_attributes,
        )

        message_id = await apply_async(response.result)
        logger.info(
            "Message published for "
            f"topic {topic_path} with "
            f"id {message_id}"
        )
        logger.debug(f"We sent {data!r} with metadata {attributes}")
    except Exception:
        logger.exception("Publisher failure", stacklevel=5)
        raise

subscribe async

subscribe(
    callback, subscription_name, max_messages, scheduler
)

Starts the subscription listening on background.

PARAMETER DESCRIPTION
callback

The function called when a message is received.

TYPE: Callable[[Message], Any]

subscription_name

The name of the subscription.

TYPE: str

max_messages

The maximum number of messages to pull.

TYPE: int

scheduler

The object that allocates messages to callbacks.

TYPE: AsyncScheduler

RETURNS DESCRIPTION
StreamingPullFuture

A future that can be used to check the progress and get the result.

Source code in fastpubsub/clients/pubsub.py
async def subscribe(
    self,
    callback: Callable[[PubSubMessage], Any],
    subscription_name: str,
    max_messages: int,
    scheduler: AsyncScheduler,
) -> StreamingPullFuture:
    """Starts the subscription listening on background.

    Args:
        callback: The function called when a message is received.
        subscription_name: The name of the subscription.
        max_messages: The maximum number of messages to pull.
        scheduler: The object that allocates messages to callbacks.

    Returns:
        A future that can be used to check the progress and get the result.
    """
    subscriber_client = await PubSubClientFactory.get_subscriber(
        self.project_id
    )
    subscription_path = subscriber_client.subscription_path(
        self.project_id, subscription_name
    )

    future: StreamingPullFuture = subscriber_client.subscribe(
        callback=callback,
        subscription=subscription_path,
        scheduler=scheduler,
        flow_control=FlowControl(max_messages=max_messages),
        await_callbacks_on_shutdown=True,
    )

    return future