Skip to content

Publisher class

fastpubsub.Publisher

Publisher(topic_name, project_id='', middlewares=())

A class for publishing messages to a Pub/Sub topic.

Initializes the Publisher.

PARAMETER DESCRIPTION
topic_name

The name of the topic.

TYPE: str

project_id

An alternative project ID to publish messages to. If set, the broker's project ID will be ignored.

TYPE: str DEFAULT: ''

middlewares

A list of middlewares to apply.

TYPE: Sequence[Middleware] DEFAULT: ()

Source code in fastpubsub/pubsub/publisher.py
def __init__(
    self,
    topic_name: str,
    project_id: str = "",
    middlewares: Sequence[Middleware] = (),
):
    """Create a publisher bound to a single topic.

    Args:
        topic_name: Name of the topic this publisher targets.
        project_id: Alternative project ID to publish messages to.
                    If set, the broker's project ID will be ignored.
        middlewares: Middlewares to register on this publisher. Each entry
                     is unpacked into ``(middleware, args, kwargs)`` and
                     forwarded to ``include_middleware``.
    """
    self.topic_name = topic_name
    self.project_id = project_id
    self.middlewares: MutableSequence[Middleware] = []

    # Nothing to register for an empty value or for anything that is not
    # a Sequence; such input is silently skipped.
    if not middlewares or not isinstance(middlewares, Sequence):
        return

    for entry in middlewares:
        middleware_cls, positional, keyword = entry
        self.include_middleware(middleware_cls, *positional, **keyword)

topic_name instance-attribute

topic_name = topic_name

project_id instance-attribute

project_id = project_id

middlewares instance-attribute

middlewares = []

publish async

publish(
    data, ordering_key="", attributes=None, autocreate=True
)

Publishes a message to the topic.

PARAMETER DESCRIPTION
data

The message data.

TYPE: dict[str, Any] | str | bytes | BaseModel

ordering_key

The ordering key for the message.

TYPE: str DEFAULT: ''

attributes

A dictionary of message attributes.

TYPE: dict[str, str] | None DEFAULT: None

autocreate

Whether to automatically create the topic.

TYPE: bool DEFAULT: True

Source code in fastpubsub/pubsub/publisher.py
@validate_call(config=ConfigDict(strict=True))
async def publish(
    self,
    data: dict[str, Any] | str | bytes | BaseModel,
    ordering_key: str = "",
    attributes: dict[str, str] | None = None,
    autocreate: bool = True,
) -> None:
    """Send a single message to this publisher's topic.

    Args:
        data: Payload to publish; it is serialized before being handed
            to the publish callstack.
        ordering_key: Ordering key attached to the message.
        attributes: Optional mapping of message attributes.
        autocreate: Whether to automatically create the topic; forwarded
            to the callstack builder.
    """
    # The callstack is built first and the payload serialized second,
    # so any failure surfaces in that order.
    publish_chain = self._build_callstack(autocreate=autocreate)
    payload = await self._serialize_message(data)

    await publish_chain.on_publish(
        data=payload,
        ordering_key=ordering_key,
        attributes=attributes,
    )

include_middleware

include_middleware(middleware, *args, **kwargs)

Includes a middleware in the publisher.

PARAMETER DESCRIPTION
middleware

The middleware to include.

TYPE: type[BaseMiddleware]

args

The positional arguments passed when instantiating the middleware.

TYPE: Any DEFAULT: ()

kwargs

The keyword arguments passed when instantiating the middleware.

TYPE: Any DEFAULT: {}

Source code in fastpubsub/pubsub/publisher.py
@validate_call(config=ConfigDict(strict=True))
def include_middleware(
    self, middleware: type[BaseMiddleware], *args: Any, **kwargs: Any
) -> None:
    """Register a middleware on this publisher, skipping duplicates.

    Args:
        middleware: The middleware class to register.
        args: Positional arguments used when instantiating the middleware.
        kwargs: Keyword arguments used when instantiating the middleware.
    """
    ensure_async_middleware(middleware)

    candidate = Middleware(middleware, *args, **kwargs)
    # Append only when no equal entry is already registered, so repeated
    # calls with the same middleware and arguments leave the list unchanged.
    if candidate not in self.middlewares:
        self.middlewares.append(candidate)