Middleware Classes

fastpubsub.BaseMiddleware

BaseMiddleware(next_call)

Base class for middlewares.

Extend this class to implement your own middleware.

Initializes the BaseMiddleware.

PARAMETER DESCRIPTION
next_call

The next middleware or command in the chain.

TYPE: Union[BaseMiddleware, None]

Source code in fastpubsub/middlewares/base.py
def __init__(self, next_call: Union["BaseMiddleware", None]):
    """Initializes the BaseMiddleware.

    Args:
        next_call: The next middleware or command in the chain.
    """
    self.next_call = next_call

next_call instance-attribute

next_call = next_call

on_message async

on_message(message)

Handles a message.

When extending this method, always call await super().on_message(...) to continue the chain.

PARAMETER DESCRIPTION
message

The message to handle.

TYPE: Message

Source code in fastpubsub/middlewares/base.py
async def on_message(self, message: "Message") -> Any:
    """Handles a message.

    When extending this method, you should always call
    `await super().on_message(...)` to continue the chain.

    Args:
        message: The message to handle.
    """
    if not self.next_call:
        return

    return await self.next_call.on_message(message)

on_publish async

on_publish(data, ordering_key, attributes)

Handles a publish event.

When extending this method, always call await super().on_publish(...) to continue the chain.

PARAMETER DESCRIPTION
data

The message data.

TYPE: bytes

ordering_key

The ordering key for the message.

TYPE: str

attributes

A dictionary of message attributes.

TYPE: dict[str, str] | None

Source code in fastpubsub/middlewares/base.py
async def on_publish(
    self, data: bytes, ordering_key: str, attributes: dict[str, str] | None
) -> Any:
    """Handles a publish event.

    When extending this method, you should always call
    `await super().on_publish(...)` to continue the chain.

    Args:
        data: The message data.
        ordering_key: The ordering key for the message.
        attributes: A dictionary of message attributes.
    """
    if not self.next_call:
        return

    return await self.next_call.on_publish(data, ordering_key, attributes)
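The chaining rule described above can be illustrated with a small sketch. It does not import fastpubsub: the BaseMiddleware class below is a local stand-in that mirrors the source shown above so the example runs on its own, and RecordingMiddleware and Terminal are hypothetical names invented for illustration.

```python
import asyncio
from typing import Any, Optional


class BaseMiddleware:
    """Local stand-in mirroring the BaseMiddleware source shown above."""

    def __init__(self, next_call: Optional["BaseMiddleware"]):
        self.next_call = next_call

    async def on_message(self, message: Any) -> Any:
        if not self.next_call:
            return None
        return await self.next_call.on_message(message)

    async def on_publish(
        self, data: bytes, ordering_key: str, attributes: Optional[dict]
    ) -> Any:
        if not self.next_call:
            return None
        return await self.next_call.on_publish(data, ordering_key, attributes)


class RecordingMiddleware(BaseMiddleware):
    """Hypothetical middleware that records everything passing through it."""

    def __init__(self, next_call: Optional[BaseMiddleware], events: list):
        super().__init__(next_call)
        self.events = events

    async def on_message(self, message: Any) -> Any:
        self.events.append(("message", message))
        # Delegating to super() keeps the rest of the chain running.
        return await super().on_message(message)

    async def on_publish(
        self, data: bytes, ordering_key: str, attributes: Optional[dict]
    ) -> Any:
        self.events.append(("publish", data))
        return await super().on_publish(data, ordering_key, attributes)


class Terminal(BaseMiddleware):
    """Hypothetical end of the chain, standing in for the final command."""

    def __init__(self) -> None:
        super().__init__(None)

    async def on_message(self, message: Any) -> Any:
        return f"handled:{message}"

    async def on_publish(
        self, data: bytes, ordering_key: str, attributes: Optional[dict]
    ) -> Any:
        return len(data)


events: list = []
chain = RecordingMiddleware(Terminal(), events)
message_result = asyncio.run(chain.on_message("hello"))
publish_result = asyncio.run(chain.on_publish(b"abc", "", None))
```

If RecordingMiddleware skipped the super() calls, Terminal would never run and both results would be None, which is why the docstrings insist on continuing the chain.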

fastpubsub.Middleware

Middleware(cls, *args, **kwargs)

Wrapper class for middlewares.

Use this class only to declare middlewares in class constructors. Its sole purpose is to store the middleware class and its arguments for delayed initialization.

Initializes the Middleware.

PARAMETER DESCRIPTION
cls

The middleware class you want to initialize later.

TYPE: type[BaseMiddleware]

args

The middleware class positional arguments.

TYPE: Any DEFAULT: ()

kwargs

The middleware class keyword arguments.

TYPE: Any DEFAULT: {}

Source code in fastpubsub/middlewares/base.py
def __init__(
    self, cls: type[BaseMiddleware], *args: Any, **kwargs: Any
) -> None:
    """Initializes the Middleware.

    Args:
        cls: The middleware class you want to initialize later.
        args: The middleware class positional arguments.
        kwargs: The middleware class keyword arguments.
    """
    self.cls = cls
    self.args = args
    self.kwargs = kwargs

cls instance-attribute

cls = cls

args instance-attribute

args = args

kwargs instance-attribute

kwargs = kwargs
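As a rough sketch of what delayed initialization could look like, the example below stores two middleware specifications and instantiates them later. The Middleware and BaseMiddleware classes are local stand-ins mirroring the source shown above. PrefixMiddleware and build_chain are hypothetical, and the ordering used here (first entry outermost) is an assumption, since this page does not show how fastpubsub itself assembles the chain.

```python
from typing import Any, Optional


class BaseMiddleware:
    """Local stand-in: only the constructor is needed for this sketch."""

    def __init__(self, next_call: Optional["BaseMiddleware"]):
        self.next_call = next_call


class Middleware:
    """Local stand-in mirroring the Middleware source shown above."""

    def __init__(self, cls: type, *args: Any, **kwargs: Any) -> None:
        self.cls = cls
        self.args = args
        self.kwargs = kwargs


class PrefixMiddleware(BaseMiddleware):
    """Hypothetical middleware with extra constructor arguments."""

    def __init__(
        self, next_call: Optional[BaseMiddleware], prefix: str, upper: bool = False
    ):
        super().__init__(next_call)
        self.prefix = prefix
        self.upper = upper


def build_chain(
    specs: list, terminal: Optional[BaseMiddleware] = None
) -> Optional[BaseMiddleware]:
    """Hypothetical helper: instantiate specs so that specs[0] is outermost."""
    next_call = terminal
    for spec in reversed(specs):
        # next_call is supplied here, at build time, not when the spec was created.
        next_call = spec.cls(next_call, *spec.args, **spec.kwargs)
    return next_call


specs = [
    Middleware(PrefixMiddleware, "a"),
    Middleware(PrefixMiddleware, "b", upper=True),
]
chain = build_chain(specs)
```

The wrapper exists because next_call is not known when the middleware is declared, so only the class and its remaining arguments can be captured up front.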

fastpubsub.middlewares.GZipMiddleware

GZipMiddleware(next_call, compresslevel=9, mtime=None)

Bases: BaseMiddleware

A middleware for compressing and decompressing messages using gzip.

Initializes the GZipMiddleware.

PARAMETER DESCRIPTION
next_call

The next middleware or command in the chain.

TYPE: BaseMiddleware

compresslevel

The compression level passed to gzip.compress, from 0 (no compression) to 9 (maximum compression).

TYPE: int DEFAULT: 9

mtime

The modification time written to the gzip header. Defaults to the current time.

TYPE: int | float | None DEFAULT: None

Source code in fastpubsub/middlewares/gzip.py
def __init__(
    self,
    next_call: BaseMiddleware,
    compresslevel: int = 9,
    mtime: int | float | None = None,
):
    """Initializes the GZipMiddleware.

    Args:
        next_call: The next middleware or command in the chain.
        compresslevel: The compression level passed to
            gzip.compress, from 0 (no compression) to 9 (maximum).
        mtime: The modification time written to the gzip header.
            Defaults to the current time.
    """
    super().__init__(next_call)
    self.compresslevel = compresslevel
    self.mtime = mtime

next_call instance-attribute

next_call = next_call

compresslevel instance-attribute

compresslevel = compresslevel

mtime instance-attribute

mtime = mtime
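To make the effect of compresslevel and mtime concrete, here is a small sketch that calls the standard library's gzip.compress directly, without the middleware. The payload and variable names are made up for illustration.

```python
import gzip

payload = b"fastpubsub " * 200

# compresslevel trades speed for size; any level decompresses the same way.
fast = gzip.compress(payload, compresslevel=1, mtime=0)
best = gzip.compress(payload, compresslevel=9, mtime=0)

# A fixed mtime makes the output reproducible for identical input.
again = gzip.compress(payload, compresslevel=9, mtime=0)

# The mtime is stored as a little-endian 32-bit value in header bytes 4 to 7.
stamped = gzip.compress(payload, compresslevel=9, mtime=1234)
header_mtime = int.from_bytes(stamped[4:8], "little")
```

With the default mtime=None, the header carries the current time, so two publishes of the same payload can yield different bytes. Passing a fixed value is one way to get byte-identical output if that matters for your use case.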

on_message async

on_message(message)

Decompresses a message.

PARAMETER DESCRIPTION
message

The message to decompress.

TYPE: Message

Source code in fastpubsub/middlewares/gzip.py
async def on_message(self, message: Message) -> Any:
    """Decompresses a message.

    Args:
        message: The message to decompress.
    """
    if message.attributes.get("content-encoding", "") == "gzip":
        decompressed_data = gzip.decompress(data=message.data)
        new_message = Message(
            id=message.id,
            size=message.size,
            data=decompressed_data,
            attributes=message.attributes,
            delivery_attempt=message.delivery_attempt,
            project_id=message.project_id,
            topic_name=message.topic_name,
            subscriber_name=message.subscriber_name,
        )
        return await super().on_message(new_message)

    return await super().on_message(message)

on_publish async

on_publish(data, ordering_key, attributes)

Compresses a message.

PARAMETER DESCRIPTION
data

The message data to compress.

TYPE: bytes

ordering_key

The ordering key for the message.

TYPE: str

attributes

A dictionary of message attributes.

TYPE: dict[str, str] | None

Source code in fastpubsub/middlewares/gzip.py
async def on_publish(
    self, data: bytes, ordering_key: str, attributes: dict[str, str] | None
) -> Any:
    """Compresses a message.

    Args:
        data: The message data to compress.
        ordering_key: The ordering key for the message.
        attributes: A dictionary of message attributes.
    """
    if not attributes:
        attributes = {}

    attributes["content-encoding"] = "gzip"
    compressed_data = gzip.compress(
        data=data, compresslevel=self.compresslevel, mtime=self.mtime
    )
    return await super().on_publish(
        compressed_data, ordering_key, attributes
    )
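The publish and subscribe halves shown above form a round trip: the publisher compresses the payload and tags it with content-encoding set to gzip, and the subscriber decompresses only when it sees that tag. The sketch below reproduces that logic with plain functions and the standard library. The function names are hypothetical, and unlike the source shown above, which writes into a non-empty attributes dict passed by the caller, this sketch copies the dict first. That is a deliberate deviation, not the library's behavior.

```python
import gzip
from typing import Optional


def compress_for_publish(
    data: bytes, attributes: Optional[dict[str, str]], compresslevel: int = 9
) -> tuple[bytes, dict[str, str]]:
    """Hypothetical helper: compress data and tag a copy of the attributes."""
    tagged = dict(attributes or {})
    tagged["content-encoding"] = "gzip"
    return gzip.compress(data, compresslevel=compresslevel), tagged


def decompress_if_tagged(data: bytes, attributes: dict[str, str]) -> bytes:
    """Hypothetical helper: decompress only when the gzip tag is present."""
    if attributes.get("content-encoding", "") == "gzip":
        return gzip.decompress(data)
    return data


original_attrs = {"source": "demo"}
wire_data, wire_attrs = compress_for_publish(b'{"id": 1}', original_attrs)
received = decompress_if_tagged(wire_data, wire_attrs)

# Messages without the tag pass through unchanged.
untouched = decompress_if_tagged(b"plain", {})
```

Because the subscriber keys off the attribute rather than inspecting the payload, publishers that do not use the middleware can share a topic with publishers that do.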