CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-google-cloud-tasks

Google Cloud Tasks API client library for managing distributed task queues.

Pending
Overview
Eval results
Files

docs/queue-configuration.md

Queue Configuration

Advanced queue configuration options including rate limiting, retry policies, App Engine routing, and logging settings for fine-tuned task processing behavior.

Capabilities

Queue Structure

Core queue configuration container with routing, rate limits, retry policies, and logging settings.

class Queue:
    """Core configuration container for a Cloud Tasks queue.

    Groups routing defaults, rate limits, retry policy, and logging
    settings. Fields marked "output only" are set by the service and
    cannot be supplied by the caller.
    """
    name: str  # Caller-specified queue name (required in create_queue)
    app_engine_routing_override: AppEngineRouting  # Overrides for task-level App Engine routing
    rate_limits: RateLimits  # Rate limits for task dispatches
    retry_config: RetryConfig  # Settings that determine retry behavior
    state: Queue.State  # Output only queue state
    purge_time: timestamp_pb2.Timestamp  # Output only last purge time
    stackdriver_logging_config: StackdriverLoggingConfig  # Logging configuration
    
    class State:
        """Enumeration of the possible run states of a queue."""
        STATE_UNSPECIFIED = 0  # Unspecified state
        RUNNING = 1  # Queue is running, tasks can be dispatched
        PAUSED = 2  # Tasks are paused by user
        DISABLED = 3  # Queue is disabled

Rate Limiting

Control task dispatch frequency and concurrency to manage system load and external service rate limits.

class RateLimits:
    """Throttling settings controlling how fast tasks are dispatched.

    Applies to the whole queue; use these to protect downstream
    services from bursts and overload.
    """
    max_dispatches_per_second: float  # Maximum rate at which tasks are dispatched (max 500)
    max_burst_size: int  # Output only max burst size for token bucket algorithm
    max_concurrent_dispatches: int  # Maximum number of concurrent tasks (max 5000)

Retry Configuration

Fine-tune automatic retry behavior for failed tasks with exponential backoff and limits.

class RetryConfig:
    """Retry policy for tasks that fail, using exponential backoff.

    The retry interval starts at min_backoff, doubles up to
    max_doublings times, and is capped at max_backoff.
    """
    max_attempts: int  # Number of attempts per task (-1 for unlimited)
    max_retry_duration: duration_pb2.Duration  # Time limit for retrying failed tasks
    min_backoff: duration_pb2.Duration  # Minimum backoff between retries
    max_backoff: duration_pb2.Duration  # Maximum backoff between retries
    max_doublings: int  # Number of times retry interval doubles

Logging Configuration

Control what task execution information is logged to Cloud Logging for monitoring and debugging.

class StackdriverLoggingConfig:
    """Controls how much task execution activity is written to Cloud Logging."""
    sampling_ratio: float  # Fraction of operations to log (0.0-1.0, default 0.0)

App Engine Routing Override

Default routing settings applied to all App Engine tasks in the queue unless overridden at the task level.

class AppEngineRouting:
    """Routing target (service/version/instance) for App Engine tasks.

    When set on a queue as app_engine_routing_override, these values
    apply to all App Engine tasks in the queue.
    """
    service: str  # App service (default service if not specified)
    version: str  # App version (default version if not specified)
    instance: str  # App instance (available instance if not specified)
    host: str  # Output only host that task is sent to

Usage Examples

Basic Queue Configuration

from google.cloud import tasks
from google.protobuf import duration_pb2

client = tasks.CloudTasksClient()

# Build fully-qualified resource names for the queue and its parent location.
target_queue = client.queue_path('my-project-id', 'us-central1', 'configured-queue')
location = client.common_location_path('my-project-id', 'us-central1')

# Throttle dispatch: at most 5 tasks per second, 10 in flight at once.
limits = tasks.RateLimits(
    max_dispatches_per_second=5.0,
    max_concurrent_dispatches=10
)
queue_spec = tasks.Queue(name=target_queue, rate_limits=limits)

created_queue = client.create_queue(parent=location, queue=queue_spec)
print(f'Created queue with rate limits: {created_queue.name}')

Advanced Retry Configuration

from google.cloud import tasks
from google.protobuf import duration_pb2

client = tasks.CloudTasksClient()
queue_path = client.queue_path('my-project-id', 'us-central1', 'retry-queue')
parent = client.common_location_path('my-project-id', 'us-central1')

# Configure detailed retry behavior.
retry_config = tasks.RetryConfig(
    max_attempts=5,  # Try up to 5 times
    max_retry_duration=duration_pb2.Duration(seconds=3600),  # 1 hour max
    min_backoff=duration_pb2.Duration(seconds=5),   # Start with 5 second backoff
    max_backoff=duration_pb2.Duration(seconds=300), # Cap at 5 minutes
    # Backoff doubles 3 times (5s, 10s, 20s, 40s), then grows linearly
    # until it reaches max_backoff (300s).
    max_doublings=3
)

queue = tasks.Queue(
    name=queue_path,
    retry_config=retry_config
)

created_queue = client.create_queue(parent=parent, queue=queue)
print(f'Created queue with retry config: {created_queue.retry_config.max_attempts}')

App Engine Routing Configuration

from google.cloud import tasks

client = tasks.CloudTasksClient()
target = client.queue_path('my-project-id', 'us-central1', 'appengine-queue')
location = client.common_location_path('my-project-id', 'us-central1')

# Every App Engine task in this queue defaults to worker-service, version v2.
queue_spec = tasks.Queue(
    name=target,
    app_engine_routing_override=tasks.AppEngineRouting(
        service='worker-service',
        version='v2'
    )
)

created_queue = client.create_queue(parent=location, queue=queue_spec)
print(f'Created App Engine queue: {created_queue.app_engine_routing_override.service}')

Logging Configuration

from google.cloud import tasks

client = tasks.CloudTasksClient()
target = client.queue_path('my-project-id', 'us-central1', 'logged-queue')
location = client.common_location_path('my-project-id', 'us-central1')

# Sample every operation into Cloud Logging while debugging.
debug_logging = tasks.StackdriverLoggingConfig(sampling_ratio=1.0)

queue_spec = tasks.Queue(
    name=target,
    stackdriver_logging_config=debug_logging
)

created_queue = client.create_queue(parent=location, queue=queue_spec)
print(f'Created queue with full logging: {created_queue.stackdriver_logging_config.sampling_ratio}')

Complete Queue Configuration

from google.cloud import tasks
from google.protobuf import duration_pb2

client = tasks.CloudTasksClient()
target = client.queue_path('my-project-id', 'us-central1', 'production-queue')
location = client.common_location_path('my-project-id', 'us-central1')

# Throttling keeps the downstream service within its rate budget.
limits = tasks.RateLimits(
    max_dispatches_per_second=10.0,
    max_concurrent_dispatches=20
)

# Retries: 3 tries max, 30 minutes overall, backoff between 10s and 300s.
retries = tasks.RetryConfig(
    max_attempts=3,
    max_retry_duration=duration_pb2.Duration(seconds=1800),
    min_backoff=duration_pb2.Duration(seconds=10),
    max_backoff=duration_pb2.Duration(seconds=300),
    max_doublings=2
)

# App Engine tasks default to the stable task-processor service.
routing = tasks.AppEngineRouting(service='task-processor', version='stable')

# Sample 10% of operations into Cloud Logging for monitoring.
logging_cfg = tasks.StackdriverLoggingConfig(sampling_ratio=0.1)

# Assemble the production-ready queue from the pieces above.
queue_spec = tasks.Queue(
    name=target,
    rate_limits=limits,
    retry_config=retries,
    app_engine_routing_override=routing,
    stackdriver_logging_config=logging_cfg
)

created_queue = client.create_queue(parent=location, queue=queue_spec)
print(f'Created production queue: {created_queue.name}')
print(f'- Rate limit: {created_queue.rate_limits.max_dispatches_per_second}/sec')
print(f'- Max attempts: {created_queue.retry_config.max_attempts}')
print(f'- Default service: {created_queue.app_engine_routing_override.service}')

Dynamic Queue Reconfiguration

from google.cloud import tasks
from google.protobuf import field_mask_pb2, duration_pb2

client = tasks.CloudTasksClient()
queue_name = client.queue_path('my-project-id', 'us-central1', 'dynamic-queue')

# Fetch the live configuration so the update starts from current values.
live_queue = client.get_queue(name=queue_name)

# Raise throughput for a high-traffic window.
live_queue.rate_limits.max_dispatches_per_second = 50.0
live_queue.rate_limits.max_concurrent_dispatches = 100

# A field mask restricts the write to just the rate-limit fields.
rate_mask = field_mask_pb2.FieldMask(paths=[
    'rate_limits.max_dispatches_per_second',
    'rate_limits.max_concurrent_dispatches',
])

updated_queue = client.update_queue(queue=live_queue, update_mask=rate_mask)

print(f'Updated rate limits to {updated_queue.rate_limits.max_dispatches_per_second}/sec')

# Second pass: tighten the retry policy for better reliability.
live_queue.retry_config.max_attempts = 5
live_queue.retry_config.min_backoff = duration_pb2.Duration(seconds=30)

retry_mask = field_mask_pb2.FieldMask(paths=[
    'retry_config.max_attempts',
    'retry_config.min_backoff',
])

updated_queue = client.update_queue(queue=live_queue, update_mask=retry_mask)

print(f'Updated retry config: {updated_queue.retry_config.max_attempts} attempts')

Queue State Management

from google.cloud import tasks

client = tasks.CloudTasksClient()
queue_name = client.queue_path('my-project-id', 'us-central1', 'state-queue')

# Inspect the current state.
queue = client.get_queue(name=queue_name)
print(f'Queue state: {queue.state}')

# Map each known state to its description; unknown states print nothing.
state_messages = {
    tasks.Queue.State.RUNNING: 'Queue is actively processing tasks',
    tasks.Queue.State.PAUSED: 'Queue is paused - tasks are not being dispatched',
    tasks.Queue.State.DISABLED: 'Queue is disabled',
}
description = state_messages.get(queue.state)
if description is not None:
    print(description)

# Pause queue for maintenance
paused_queue = client.pause_queue(name=queue_name)
print(f'Queue paused: {paused_queue.state == tasks.Queue.State.PAUSED}')

# Resume normal operations
resumed_queue = client.resume_queue(name=queue_name)
print(f'Queue resumed: {resumed_queue.state == tasks.Queue.State.RUNNING}')

Install with Tessl CLI

npx tessl i tessl/pypi-google-cloud-tasks

docs

client-operations.md

iam-security.md

index.md

queue-configuration.md

queue-management.md

task-management.md

task-targeting.md

tile.json