Google Cloud Tasks API client library for managing distributed task queues.
—
Advanced queue configuration options including rate limiting, retry policies, App Engine routing, and logging settings for fine-tuned task processing behavior.
Core queue configuration container with routing, rate limits, retry policies, and logging settings.
class Queue:
    """Core queue configuration: routing, rate limits, retry policy, logging.

    Annotations that name types declared further down (or living in modules
    not imported at this point) are written as quoted forward references so
    the class body can be evaluated exactly as written.
    """

    name: str  # Caller-specified queue name (required in create_queue)
    app_engine_routing_override: "AppEngineRouting"  # Overrides for task-level App Engine routing
    rate_limits: "RateLimits"  # Rate limits for task dispatches
    retry_config: "RetryConfig"  # Settings that determine retry behavior
    state: "Queue.State"  # Output only queue state
    purge_time: "timestamp_pb2.Timestamp"  # Output only last purge time
    stackdriver_logging_config: "StackdriverLoggingConfig"  # Logging configuration

    class State:
        """States a queue can report through ``Queue.state``."""

        STATE_UNSPECIFIED = 0  # Unspecified state
        RUNNING = 1  # Queue is running, tasks can be dispatched
        PAUSED = 2  # Tasks are paused by user
        DISABLED = 3  # Queue is disabled


# Control task dispatch frequency and concurrency to manage system load and
# external service rate limits.
class RateLimits:
    """Rate limits for task dispatches: dispatch rate, burst size, concurrency."""

    max_dispatches_per_second: float  # Maximum rate at which tasks are dispatched (max 500)
    max_burst_size: int  # Output only max burst size for token bucket algorithm
    max_concurrent_dispatches: int  # Maximum number of concurrent tasks (max 5000)


# Fine-tune automatic retry behavior for failed tasks with exponential backoff
# and limits.
class RetryConfig:
    """Settings that determine retry behavior for failed tasks.

    ``duration_pb2`` is not in scope where this stub is declared, so the
    duration annotations are quoted forward references.
    """

    max_attempts: int  # Number of attempts per task (-1 for unlimited)
    max_retry_duration: "duration_pb2.Duration"  # Time limit for retrying failed tasks
    min_backoff: "duration_pb2.Duration"  # Minimum backoff between retries
    max_backoff: "duration_pb2.Duration"  # Maximum backoff between retries
    max_doublings: int  # Number of times retry interval doubles


# Control what task execution information is logged to Cloud Logging for
# monitoring and debugging.
class StackdriverLoggingConfig:
    """Logging configuration for a queue."""

    sampling_ratio: float  # Fraction of operations to log (0.0-1.0, default 0.0)


# Default routing settings applied to all App Engine tasks in the queue unless
# overridden at the task level.
class AppEngineRouting:
    """App Engine routing target (service, version, instance) for tasks."""

    service: str  # App service (default service if not specified)
    version: str  # App version (default version if not specified)
    instance: str  # App instance (available instance if not specified)
    host: str  # Output only host that task is sent to


from google.cloud import tasks
from google.protobuf import duration_pb2
# Example: create a queue whose dispatch rate and concurrency are capped.
client = tasks.CloudTasksClient()

# Create a queue with rate limits
queue_path = client.queue_path('my-project-id', 'us-central1', 'configured-queue')
parent = client.common_location_path('my-project-id', 'us-central1')

queue = tasks.Queue(
    name=queue_path,
    rate_limits=tasks.RateLimits(
        max_dispatches_per_second=5.0,  # 5 tasks per second max
        max_concurrent_dispatches=10,  # Max 10 concurrent tasks
    ),
)

created_queue = client.create_queue(parent=parent, queue=queue)
print(f'Created queue with rate limits: {created_queue.name}')


from google.cloud import tasks
from google.protobuf import duration_pb2
# Example: create a queue with an explicit retry policy.
client = tasks.CloudTasksClient()

queue_path = client.queue_path('my-project-id', 'us-central1', 'retry-queue')
parent = client.common_location_path('my-project-id', 'us-central1')

# Configure detailed retry behavior
retry_config = tasks.RetryConfig(
    max_attempts=5,  # Try up to 5 times
    max_retry_duration=duration_pb2.Duration(seconds=3600),  # 1 hour max
    min_backoff=duration_pb2.Duration(seconds=5),  # Start with 5 second backoff
    max_backoff=duration_pb2.Duration(seconds=300),  # Cap at 5 minutes
    # The interval doubles 3 times (5s, 10s, 20s, 40s), then grows linearly by
    # 2**3 * min_backoff = 40s per retry until it reaches max_backoff (300s).
    max_doublings=3,
)

queue = tasks.Queue(
    name=queue_path,
    retry_config=retry_config,
)

created_queue = client.create_queue(parent=parent, queue=queue)
print(f'Created queue with retry config: {created_queue.retry_config.max_attempts}')


from google.cloud import tasks
# Example: create a queue with a queue-level App Engine routing override.
client = tasks.CloudTasksClient()

queue_path = client.queue_path('my-project-id', 'us-central1', 'appengine-queue')
parent = client.common_location_path('my-project-id', 'us-central1')

# Configure default App Engine routing for all tasks in queue
app_engine_routing = tasks.AppEngineRouting(
    service='worker-service',
    version='v2',
)

queue = tasks.Queue(
    name=queue_path,
    app_engine_routing_override=app_engine_routing,
)

created_queue = client.create_queue(parent=parent, queue=queue)
print(f'Created App Engine queue: {created_queue.app_engine_routing_override.service}')


from google.cloud import tasks
# Example: create a queue that logs every operation.
client = tasks.CloudTasksClient()

queue_path = client.queue_path('my-project-id', 'us-central1', 'logged-queue')
parent = client.common_location_path('my-project-id', 'us-central1')

# Configure logging for debugging
logging_config = tasks.StackdriverLoggingConfig(
    sampling_ratio=1.0,  # Log all operations (for debugging)
)

queue = tasks.Queue(
    name=queue_path,
    stackdriver_logging_config=logging_config,
)

created_queue = client.create_queue(parent=parent, queue=queue)
print(f'Created queue with full logging: {created_queue.stackdriver_logging_config.sampling_ratio}')


from google.cloud import tasks
from google.protobuf import duration_pb2
# Example: create one queue that combines rate limits, retry policy,
# App Engine routing and sampled logging.
client = tasks.CloudTasksClient()

queue_path = client.queue_path('my-project-id', 'us-central1', 'production-queue')
parent = client.common_location_path('my-project-id', 'us-central1')

# Production-ready queue with all configurations
queue = tasks.Queue(
    name=queue_path,
    # Rate limiting for external service protection
    rate_limits=tasks.RateLimits(
        max_dispatches_per_second=10.0,
        max_concurrent_dispatches=20,
    ),
    # Robust retry configuration
    retry_config=tasks.RetryConfig(
        max_attempts=3,
        max_retry_duration=duration_pb2.Duration(seconds=1800),  # 30 minutes
        min_backoff=duration_pb2.Duration(seconds=10),
        max_backoff=duration_pb2.Duration(seconds=300),
        max_doublings=2,
    ),
    # Default App Engine routing
    app_engine_routing_override=tasks.AppEngineRouting(
        service='task-processor',
        version='stable',
    ),
    # Moderate logging for monitoring
    stackdriver_logging_config=tasks.StackdriverLoggingConfig(
        sampling_ratio=0.1,  # Log 10% of operations
    ),
)

created_queue = client.create_queue(parent=parent, queue=queue)
print(f'Created production queue: {created_queue.name}')
print(f'- Rate limit: {created_queue.rate_limits.max_dispatches_per_second}/sec')
print(f'- Max attempts: {created_queue.retry_config.max_attempts}')
print(f'- Default service: {created_queue.app_engine_routing_override.service}')


from google.cloud import tasks
from google.protobuf import field_mask_pb2, duration_pb2
# Example: change an existing queue's settings in place, one group of
# fields at a time, using a field mask on each update call.
client = tasks.CloudTasksClient()

queue_path = client.queue_path('my-project-id', 'us-central1', 'dynamic-queue')

# Get current queue configuration
current_queue = client.get_queue(name=queue_path)

# Modify rate limits for high traffic period
current_queue.rate_limits.max_dispatches_per_second = 50.0
current_queue.rate_limits.max_concurrent_dispatches = 100

# Update only the rate limits
update_mask = field_mask_pb2.FieldMask(
    paths=[
        'rate_limits.max_dispatches_per_second',
        'rate_limits.max_concurrent_dispatches',
    ]
)
updated_queue = client.update_queue(
    queue=current_queue,
    update_mask=update_mask,
)
print(f'Updated rate limits to {updated_queue.rate_limits.max_dispatches_per_second}/sec')

# Later, modify retry configuration for better reliability
current_queue.retry_config.max_attempts = 5
current_queue.retry_config.min_backoff = duration_pb2.Duration(seconds=30)

retry_update_mask = field_mask_pb2.FieldMask(
    paths=[
        'retry_config.max_attempts',
        'retry_config.min_backoff',
    ]
)
updated_queue = client.update_queue(
    queue=current_queue,
    update_mask=retry_update_mask,
)
print(f'Updated retry config: {updated_queue.retry_config.max_attempts} attempts')


from google.cloud import tasks
# Example: inspect a queue's state, then pause and resume it.
client = tasks.CloudTasksClient()

queue_path = client.queue_path('my-project-id', 'us-central1', 'state-queue')

# Check queue state
queue = client.get_queue(name=queue_path)
print(f'Queue state: {queue.state}')

if queue.state == tasks.Queue.State.RUNNING:
    print('Queue is actively processing tasks')
elif queue.state == tasks.Queue.State.PAUSED:
    print('Queue is paused - tasks are not being dispatched')
elif queue.state == tasks.Queue.State.DISABLED:
    print('Queue is disabled')

# Pause queue for maintenance
paused_queue = client.pause_queue(name=queue_path)
print(f'Queue paused: {paused_queue.state == tasks.Queue.State.PAUSED}')

# Resume normal operations
resumed_queue = client.resume_queue(name=queue_path)
print(f'Queue resumed: {resumed_queue.state == tasks.Queue.State.RUNNING}')


# Install with Tessl CLI
#   npx tessl i tessl/pypi-google-cloud-tasks