Performance Tuning¶
Performance tuning in FastPubSub is the process of matching subscription and runtime parameters to workload shape. The objective is not maximum raw throughput in isolation, but stable throughput with predictable latency and bounded resource usage.
Control Surface Overview¶
| Parameter | Scope | Primary Effect |
|---|---|---|
| `max_messages` | Subscriber (client-side) | Concurrency and memory pressure |
| `ack_deadline_seconds` | Subscription (server-side) | Processing window before redelivery |
| `min_backoff_delay_secs` / `max_backoff_delay_secs` | Subscription (server-side) | Retry pacing under failure |
| `max_delivery_attempts` + `dead_letter_topic` | Subscription (server-side) | Bounded failure handling |
| `shutdown_timeout` | Broker runtime | Graceful drain on shutdown |
Tuning Workflow¶
Apply tuning in a controlled loop:
- Measure baseline metrics (latency, throughput, failure rates, memory).
- Change one major parameter group at a time.
- Re-run under representative load.
- Keep changes that improve SLOs and roll back regressions.
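The steps above can be sketched as a small driver loop. Here, `measure` and `meets_slo` are hypothetical stand-ins for your own load harness and SLO check, not FastPubSub API:

```python
def tune(baseline_config, candidates, measure, meets_slo):
    """One-change-at-a-time tuning loop (illustrative sketch).

    `measure` runs a representative load test and returns metrics;
    `meets_slo` decides whether new metrics beat the kept ones.
    """
    kept = dict(baseline_config)
    kept_metrics = measure(kept)
    for change in candidates:  # each candidate touches ONE parameter group
        trial = {**kept, **change}
        metrics = measure(trial)
        if meets_slo(metrics, kept_metrics):
            kept, kept_metrics = trial, metrics  # keep the improvement
        # otherwise: roll back by simply not adopting the change
    return kept, kept_metrics
```

The point of the structure is that a rejected change never contaminates the next trial: every candidate is measured against the best configuration kept so far.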
Concurrency with max_messages¶
`max_messages` sets the upper bound on in-flight messages per subscriber task.
@broker.subscriber(
    alias="high-throughput",
    topic_name="high-events",
    subscription_name="high-events-subscription",
    max_messages=500,
)
async def high_throughput_handler(message: Message):
    await fast_async_operation(message.data)
A compact load-probe setup for this parameter:
import asyncio
import logging
import random
from asyncio import TaskGroup

logger = logging.getLogger(__name__)
MAX_MESSAGES = 100  # illustrative probe value; vary it between runs

@broker.subscriber(
    "test-alias",
    topic_name="test-topic",
    subscription_name="test-basic-subscription",
    max_messages=MAX_MESSAGES,
)
async def process_message(message: Message) -> None:
    logger.info(f"Processed message: {message}")
    value = random.randint(1, 5)
    await asyncio.sleep(value)

@app.after_startup
async def test_publish() -> None:
    async with TaskGroup() as tg:
        for _ in range(MAX_MESSAGES * 5):
            tg.create_task(broker.publish("test-topic", "hi!"))
Workload-Specific Profiles¶
I/O-Bound Workloads¶
# High concurrency for I/O-bound tasks (API calls, database queries)
@broker.subscriber(
    alias="api-caller",
    topic_name="api-requests",
    subscription_name="api-requests-subscription",
    max_messages=500,  # High - most time is spent waiting
)
async def call_external_api(message: Message):
    await http_client.post("/api/endpoint", json={"data": message.data})
CPU-Bound Workloads¶
# Low concurrency for CPU-bound tasks
@broker.subscriber(
    alias="data-processor",
    topic_name="processing-jobs",
    subscription_name="processing-subscription",
    max_messages=10,  # Low - use multiple workers instead
)
async def process_data(message: Message):
    result = compute_heavy_operation(message.data)
    await save_result(result)
Rate-Limited Dependencies¶
# Match the API rate limit
@broker.subscriber(
    alias="rate-limited-api",
    topic_name="rate-limited-requests",
    subscription_name="rate-limited-subscription",
    max_messages=50,  # Match API's rate limit
)
async def call_rate_limited_api(message: Message):
    await rate_limited_client.call(message.data)
Practical Heuristics¶
| Workload | Typical `max_messages` Range | Rationale |
|---|---|---|
| Async I/O dominant | 100-1000 | Waiting time dominates, so concurrency pays off |
| DB-bound | 50-200 | Align with pool limits and transaction pressure |
| CPU-heavy | 10-50 | Prefer process scaling over coroutine fan-out |
| External API quotas | Depends on quota | Prevent downstream throttling cascades |
Acknowledgment Deadline¶
`ack_deadline_seconds` must exceed the expected processing time with a safety margin.
@broker.subscriber(
    alias="slow-processor",
    topic_name="heavy-tasks",
    subscription_name="heavy-tasks-subscription",
    ack_deadline_seconds=600,
    max_messages=10,
)
async def slow_handler(message: Message):
    await complex_ml_inference(message.data)
If configured too low, valid in-flight work can be redelivered before completion, creating duplicate processing pressure.
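Handlers can absorb such redeliveries by tracking already-processed message IDs. A minimal in-memory sketch, assuming each delivery carries a unique ID (a real deployment would key on the broker's message ID and use a shared store such as Redis, not a process-local set):

```python
import asyncio

processed_ids: set[str] = set()  # process-local; use a shared store in production
work_done = []                   # stands in for the handler's side effects

async def idempotent_handler(message_id: str, data: str) -> None:
    """Skip work that already completed before a redelivery arrived."""
    if message_id in processed_ids:
        return  # duplicate delivery: acknowledge without reprocessing
    work_done.append(data)       # the real processing step goes here
    processed_ids.add(message_id)

async def main() -> None:
    await idempotent_handler("m-1", "payload")
    await idempotent_handler("m-1", "payload")  # redelivery: ignored

asyncio.run(main())
print(len(work_done))  # 1
```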
| Expected Processing Time | Suggested ack_deadline_seconds |
|---|---|
| < 10 seconds | 30 |
| 10-60 seconds | 60 |
| 1-5 minutes | 300 |
| 5-10 minutes | 600 |
Upper Bound
Pub/Sub caps acknowledgment deadline at 600 seconds. Workloads beyond this window should be decomposed into smaller steps.
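The table and the 600-second cap fold into a small helper. This is an illustrative sketch, not part of FastPubSub:

```python
def suggested_ack_deadline(expected_secs: float) -> int:
    """Map expected processing time to a deadline per the table above."""
    if expected_secs < 10:
        return 30
    if expected_secs <= 60:
        return 60
    if expected_secs <= 300:
        return 300
    if expected_secs <= 600:
        return 600  # Pub/Sub's hard maximum
    raise ValueError("decompose work exceeding the 600 s cap into smaller steps")
```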
Retry Backoff and Failure Pacing¶
Tune retry policy to the failure mode.
@broker.subscriber(
    alias="api-with-backoff",
    topic_name="api-calls",
    subscription_name="api-calls-subscription",
    min_backoff_delay_secs=10,
    max_backoff_delay_secs=600,
    max_delivery_attempts=10,
    dead_letter_topic="api-calls-dlq",
)
async def call_api_with_backoff(message: Message):
    await external_service.process(message.data)
Short Backoff for Transient Instability¶
# Short backoff for transient issues (network blips)
@broker.subscriber(
    alias="network-sensitive",
    topic_name="transient-events",
    subscription_name="transient-events-subscription",
    min_backoff_delay_secs=5,
    max_backoff_delay_secs=60,
    max_delivery_attempts=5,
)
async def handle_transient_event(message: Message):
    await send_notification(message.data)
Long Backoff for External Outages¶
# Longer backoff for external service outages
@broker.subscriber(
    alias="external-api",
    topic_name="external-api-calls",
    subscription_name="external-api-subscription",
    min_backoff_delay_secs=30,
    max_backoff_delay_secs=600,
    max_delivery_attempts=10,
)
async def call_external_service(message: Message):
    await external_service.process(message.data)
Longer backoff reduces retry storms and protects downstream dependencies during partial outages.
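For intuition, retry delays under these bounds grow roughly exponentially until they hit the configured maximum. This is a sketch of the pacing only; the service also applies jitter, so real delays will vary:

```python
def retry_delays(min_backoff: int, max_backoff: int, attempts: int) -> list[int]:
    """Doubling backoff schedule clamped to the configured maximum."""
    return [min(max_backoff, min_backoff * 2 ** i) for i in range(attempts)]

# The long-backoff profile above (30 s min, 600 s max):
print(retry_delays(30, 600, 6))  # [30, 60, 120, 240, 480, 600]
```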
Multi-Process Scaling¶
For CPU-constrained workloads, scale workers before inflating max_messages.
Effective theoretical concurrency is `workers * max_messages`.
Monitor total memory because each worker process has independent state.
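A quick back-of-the-envelope check, using hypothetical deployment numbers:

```python
workers = 4           # worker processes (hypothetical)
max_messages = 200    # per-subscriber in-flight cap
avg_message_kib = 16  # assumed average payload size

in_flight = workers * max_messages                # theoretical concurrency ceiling
payload_mib = in_flight * avg_message_kib / 1024  # payload memory alone, per fleet
print(in_flight, payload_mib)  # 800 messages, 12.5 MiB
```

Note this counts only raw payloads; per-message handler state usually dominates, which is why memory must be measured rather than derived.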
Integrated Configuration Example¶
tuned_broker = PubSubBroker(
    project_id="fastpubsub-pubsub-local",
    shutdown_timeout=30.0,
    middlewares=[Middleware(GZipMiddleware, compresslevel=6)],
)

tuned_app = FastPubSub(tuned_broker)

@tuned_broker.subscriber(
    alias="optimized-processor",
    topic_name="optimized-events",
    subscription_name="optimized-events-subscription",
    # Concurrency
    max_messages=200,
    # Timeouts
    ack_deadline_seconds=120,
    # Retry policy
    min_backoff_delay_secs=10,
    max_backoff_delay_secs=300,
    max_delivery_attempts=5,
    # Error handling
    dead_letter_topic="events-dlq",
    autocreate=True,
)
async def process_optimized_event(message: Message):
    await handle_event(message.data)
This profile combines graceful shutdown, GZipMiddleware, bounded retries, and dead-letter isolation.
Metrics to Observe During Tuning¶
Track these continuously while iterating:
- Throughput (`messages/s`).
- Latency distribution (`p50`, `p95`, `p99`).
- Retry rate and retry burst shape.
- Dead-letter ingress count.
- Worker CPU and memory saturation.
Validation Approach¶
For behavior-level checks, PubSubTestClient can validate handler correctness.
For performance conclusions, rely on controlled load tests because in-memory tests do not represent broker/network pressure.
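A minimal standalone harness in that spirit, using a stand-in handler rather than a live broker (`load_test` and `handler` are illustrative names, not FastPubSub API; the semaphore mimics the `max_messages` cap):

```python
import asyncio
import random
import statistics
import time

async def handler(payload: int) -> None:
    # Stand-in for real work; replace with a representative operation.
    await asyncio.sleep(random.uniform(0.001, 0.005))

async def load_test(n_messages: int, concurrency: int) -> dict:
    """Drive n_messages through handler with bounded concurrency and
    report throughput plus latency percentiles."""
    sem = asyncio.Semaphore(concurrency)
    latencies: list[float] = []

    async def one(i: int) -> None:
        async with sem:
            t0 = time.perf_counter()
            await handler(i)
            latencies.append(time.perf_counter() - t0)

    start = time.perf_counter()
    await asyncio.gather(*(one(i) for i in range(n_messages)))
    elapsed = time.perf_counter() - start
    q = statistics.quantiles(latencies, n=100)
    return {
        "throughput_msgs_per_s": n_messages / elapsed,
        "p50_s": q[49],
        "p95_s": q[94],
        "p99_s": q[98],
    }

metrics = asyncio.run(load_test(200, 50))
print(metrics)
```

Swapping the stand-in handler for real downstream calls turns this into a crude but honest probe of how `max_messages`-style concurrency limits shape latency.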
Common Failure Modes¶
- Increasing `max_messages` without observing memory growth.
- Setting the acknowledgment deadline below realistic processing time.
- Applying a CPU-bound profile to an I/O-bound workload (or the inverse).
- Using aggressive retries with no dead-letter strategy.
Recap¶
- Tune for workload characteristics, not generic defaults.
- Start with `max_messages`, then align deadlines and retry policy.
- Use dead-letter topics and backoff to bound and shape failures.
- Scale by workers for CPU pressure, not coroutine count alone.
- Validate behavior with tests and validate performance with load measurement.