Skip to content

PubSubStreamingPullTask class

fastpubsub.concurrency.tasks.PubSubStreamingPullTask

PubSubStreamingPullTask(subscriber)

A task for polling messages from a Pub/Sub subscription.

Initializes the PubSubStreamingPullTask.

PARAMETER DESCRIPTION
subscriber

The subscriber to poll messages for.

TYPE: Subscriber

Source code in fastpubsub/concurrency/tasks.py
def __init__(self, subscriber: Subscriber) -> None:
    """Set up a PubSubStreamingPullTask for a single subscriber.

    Must be called while an asyncio event loop is running, because the
    running loop is captured and handed to the scheduler.

    Args:
        subscriber: The subscriber to poll messages for.
    """
    running_loop = asyncio.get_running_loop()
    self.loop = running_loop
    self.scheduler: AsyncScheduler = AsyncScheduler(running_loop)

    self.subscriber: Subscriber = subscriber
    self.mapper = MessageMapper(subscriber)
    self.client = PubSubClient(subscriber.project_id)
    # Stays None until start() stores the future returned by the client.
    self.task: StreamingPullFuture | None = None

loop instance-attribute

scheduler instance-attribute

scheduler = AsyncScheduler(loop)

subscriber instance-attribute

subscriber = subscriber

mapper instance-attribute

mapper = MessageMapper(subscriber)

client instance-attribute

client = PubSubClient(project_id)

task instance-attribute

task = None

start async

start()

Starts the message polling loop.

Source code in fastpubsub/concurrency/tasks.py
async def start(self) -> None:
    """Subscribe to the subscriber's subscription and keep the result.

    Logs that the handler is waiting, then awaits the client's
    ``subscribe`` call and stores whatever it returns on ``self.task``.
    """
    logger.info(
        f"The {self.subscriber.name} handler is waiting for messages."
    )
    # Collect the arguments first so the call itself stays on one line.
    subscribe_kwargs = {
        "callback": self._on_message,
        "subscription_name": self.subscriber.subscription_name,
        "scheduler": self.scheduler,
        "max_messages": self.subscriber.control_flow_policy.max_messages,
    }
    self.task = await self.client.subscribe(**subscribe_kwargs)

task_ready

task_ready()

Checks if the task is ready.

RETURNS DESCRIPTION
bool

True if the task is ready, False otherwise.

Source code in fastpubsub/concurrency/tasks.py
def task_ready(self) -> bool:
    """Report whether the streaming pull future is currently running.

    Returns:
        True when ``self.task`` is a StreamingPullFuture whose
        ``running()`` is truthy; False when no task has been stored, the
        stored object is not a StreamingPullFuture, or it is not running.
    """
    future = self.task
    if future and isinstance(future, StreamingPullFuture):
        return bool(future.running())

    return False

task_alive

task_alive()

Checks if the task is alive.

RETURNS DESCRIPTION
bool

True if the task is alive, False otherwise.

Source code in fastpubsub/concurrency/tasks.py
def task_alive(self) -> bool:
    """Report whether the streaming pull future has not finished yet.

    Returns:
        True when ``self.task`` is a StreamingPullFuture whose ``done()``
        is falsy; False when no task has been stored, the stored object
        is not a StreamingPullFuture, or it is already done.
    """
    pull_future = self.task
    # The truthiness check comes first so a missing task never reaches
    # the isinstance test.
    has_stream = bool(pull_future) and isinstance(
        pull_future, StreamingPullFuture
    )
    return has_stream and not pull_future.done()

shutdown async

shutdown(timeout=30.0)

Shuts down the task.

Source code in fastpubsub/concurrency/tasks.py
async def shutdown(self, timeout: float = 30.0) -> None:
    """Shuts down the task.

    If a task exists and reports ``running()``, this waits for the
    scheduler to complete, cancels the task, and then waits for the
    task's result. Otherwise only the two log lines are emitted.

    Args:
        timeout: Seconds allowed for each of the two waits (scheduler
            completion and the task's result). The limit is applied to
            each wait separately, so the whole call may take longer than
            a single ``timeout``.

    Raises:
        Whatever ``wait_for_completion`` or the task's ``result`` raise
        is propagated unchanged (presumably a timeout error when the
        stream does not close in time; confirm against the client
        library).
    """
    logger.info(f"The {self.subscriber.name} handler is turning off...")
    task = self.task
    if task and task.running():
        await self.scheduler.wait_for_completion(timeout=timeout)
        task.cancel()
        # result() is a blocking wait. Calling it directly inside this
        # coroutine would stall the event loop for up to `timeout`
        # seconds, so run it in the default executor and await it.
        await asyncio.get_running_loop().run_in_executor(
            None, lambda: task.result(timeout=timeout)
        )

    logger.info(f"The {self.subscriber.name} handler is shutdown...")