Google Cloud Tasks API client library for managing distributed task queues.
npx @tessl/cli install tessl/pypi-google-cloud-tasks@2.19.0Google Cloud Tasks is a fully managed service that allows you to manage the execution, dispatch and delivery of a large number of distributed tasks. It enables asynchronous task execution with HTTP endpoints or App Engine applications, featuring automatic retry logic, rate limiting, and comprehensive queue management.
pip install google-cloud-tasksfrom google.cloud import tasksFor specific components:
from google.cloud.tasks import CloudTasksClient, CloudTasksAsyncClient
from google.cloud.tasks import Queue, Task, HttpRequest, AppEngineHttpRequest
from typing import Dictfrom google.cloud import tasks
# Create a client
client = tasks.CloudTasksClient()
# Define project, location, and queue
project = 'my-project-id'
location = 'us-central1'
queue_name = 'my-queue'
# Create a queue
parent = client.common_location_path(project, location)
queue_path = client.queue_path(project, location, queue_name)
queue = tasks.Queue(name=queue_path)
created_queue = client.create_queue(parent=parent, queue=queue)
# Create an HTTP task
task = tasks.Task(
http_request=tasks.HttpRequest(
url='https://example.com/task-handler',
http_method=tasks.HttpMethod.POST,
body=b'{"data": "example"}'
)
)
# Add the task to the queue
response = client.create_task(parent=queue_path, task=task)
print(f'Created task: {response.name}')
# Using context manager for automatic resource cleanup
with tasks.CloudTasksClient() as client:
queue = client.get_queue(name=queue_path)
print(f'Queue state: {queue.state}')Google Cloud Tasks provides a comprehensive task queue system with these key components:
The service integrates with Google Cloud IAM for access control and provides automatic scaling, monitoring, and error handling for distributed task processing.
Core client functionality for managing Cloud Tasks service connections, authentication, and transport configuration. Includes both synchronous and asynchronous clients with identical APIs.
class CloudTasksClient:
def __init__(
self,
*,
credentials=None,
transport=None,
client_options=None,
client_info=None
): ...
@classmethod
def from_service_account_file(cls, filename: str, *args, **kwargs): ...
@classmethod
def from_service_account_info(cls, info: dict, *args, **kwargs): ...
class CloudTasksAsyncClient:
def __init__(
self,
*,
credentials=None,
transport=None,
client_options=None,
client_info=None
): ...Comprehensive queue lifecycle management including creation, configuration, monitoring, and control operations. Queues serve as containers for tasks with configurable rate limits, retry policies, and routing rules.
def list_queues(self, request=None, *, parent=None, retry=DEFAULT, timeout=DEFAULT, metadata=()) -> ListQueuesPager: ...
def get_queue(self, request=None, *, name=None, retry=DEFAULT, timeout=DEFAULT, metadata=()) -> Queue: ...
def create_queue(self, request=None, *, parent=None, queue=None, retry=DEFAULT, timeout=DEFAULT, metadata=()) -> Queue: ...
def update_queue(self, request=None, *, queue=None, update_mask=None, retry=DEFAULT, timeout=DEFAULT, metadata=()) -> Queue: ...
def delete_queue(self, request=None, *, name=None, retry=DEFAULT, timeout=DEFAULT, metadata=()) -> None: ...
def purge_queue(self, request=None, *, name=None, retry=DEFAULT, timeout=DEFAULT, metadata=()) -> Queue: ...
def pause_queue(self, request=None, *, name=None, retry=DEFAULT, timeout=DEFAULT, metadata=()) -> Queue: ...
def resume_queue(self, request=None, *, name=None, retry=DEFAULT, timeout=DEFAULT, metadata=()) -> Queue: ...Task lifecycle operations for creating, monitoring, and executing distributed work units. Tasks can target HTTP endpoints or App Engine applications with configurable scheduling, authentication, and retry behavior.
def list_tasks(self, request=None, *, parent=None, retry=DEFAULT, timeout=DEFAULT, metadata=()) -> ListTasksPager: ...
def get_task(self, request=None, *, name=None, retry=DEFAULT, timeout=DEFAULT, metadata=()) -> Task: ...
def create_task(self, request=None, *, parent=None, task=None, retry=DEFAULT, timeout=DEFAULT, metadata=()) -> Task: ...
def delete_task(self, request=None, *, name=None, retry=DEFAULT, timeout=DEFAULT, metadata=()) -> None: ...
def run_task(self, request=None, *, name=None, retry=DEFAULT, timeout=DEFAULT, metadata=()) -> Task: ...Advanced queue configuration options including rate limiting, retry policies, App Engine routing, and logging settings for fine-tuned task processing behavior.
class Queue:
name: str
app_engine_routing_override: AppEngineRouting
rate_limits: RateLimits
retry_config: RetryConfig
state: Queue.State
purge_time: Timestamp
stackdriver_logging_config: StackdriverLoggingConfig
class RateLimits:
max_dispatches_per_second: float
max_burst_size: int
max_concurrent_dispatches: int
class RetryConfig:
max_attempts: int
max_retry_duration: Duration
min_backoff: Duration
max_backoff: Duration
max_doublings: intTask target configuration for HTTP endpoints and App Engine applications, including authentication, routing, and request formatting options.
class HttpRequest:
url: str
http_method: HttpMethod
headers: MutableMapping[str, str]
body: bytes
oauth_token: OAuthToken # mutually exclusive with oidc_token
oidc_token: OidcToken # mutually exclusive with oauth_token
class AppEngineHttpRequest:
http_method: HttpMethod
app_engine_routing: AppEngineRouting
relative_uri: str
headers: MutableMapping[str, str]
body: bytes
class AppEngineRouting:
service: str
version: str
instance: str
host: strIdentity and Access Management integration for queue-level access control, including policy management and permission testing for secure task queue operations.
def get_iam_policy(self, request=None, *, resource=None, retry=DEFAULT, timeout=DEFAULT, metadata=()) -> Policy: ...
def set_iam_policy(self, request=None, *, resource=None, retry=DEFAULT, timeout=DEFAULT, metadata=()) -> Policy: ...
def test_iam_permissions(self, request=None, *, resource=None, permissions=None, retry=DEFAULT, timeout=DEFAULT, metadata=()): ...Service location discovery and information retrieval for managing Google Cloud Tasks across different geographic regions.
def get_location(self, request=None, *, name=None, retry=DEFAULT, timeout=DEFAULT, metadata=()) -> Location: ...
def list_locations(self, request=None, *, name=None, retry=DEFAULT, timeout=DEFAULT, metadata=()) -> ListLocationsResponse: ...Resource path construction and parsing utilities for building proper resource names and extracting components from resource paths.
@staticmethod
def queue_path(project: str, location: str, queue: str) -> str: ...
@staticmethod
def task_path(project: str, location: str, queue: str, task: str) -> str: ...
@staticmethod
def common_location_path(project: str, location: str) -> str: ...
@staticmethod
def common_project_path(project: str) -> str: ...
@staticmethod
def parse_queue_path(path: str) -> Dict[str, str]: ...
@staticmethod
def parse_task_path(path: str) -> Dict[str, str]: ...
@staticmethod
def parse_common_location_path(path: str) -> Dict[str, str]: ...Usage example:
# Build resource paths
queue_path = client.queue_path('my-project', 'us-central1', 'my-queue')
task_path = client.task_path('my-project', 'us-central1', 'my-queue', 'task-123')
# Parse resource paths
components = client.parse_queue_path(queue_path)
# Returns: {'project': 'my-project', 'location': 'us-central1', 'queue': 'my-queue'}Advanced client configuration and introspection capabilities for transport management and endpoint configuration.
@property
def transport(self) -> CloudTasksTransport: ...
@property
def api_endpoint(self) -> str: ...
@property
def universe_domain(self) -> str: ...class HttpMethod:
HTTP_METHOD_UNSPECIFIED = 0
POST = 1
GET = 2
HEAD = 3
PUT = 4
DELETE = 5
PATCH = 6
OPTIONS = 7class OAuthToken:
service_account_email: str
scope: str
class OidcToken:
service_account_email: str
audience: strclass Task:
name: str
app_engine_http_request: AppEngineHttpRequest # mutually exclusive with http_request
http_request: HttpRequest # mutually exclusive with app_engine_http_request
schedule_time: Timestamp
create_time: Timestamp
dispatch_deadline: Duration
dispatch_count: int
response_count: int
first_attempt: Attempt
last_attempt: Attempt
view: Task.View
class Attempt:
schedule_time: Timestamp
dispatch_time: Timestamp
response_time: Timestamp
response_status: Statusclass StackdriverLoggingConfig:
sampling_ratio: floatclass Queue.State:
STATE_UNSPECIFIED = 0
RUNNING = 1
PAUSED = 2
DISABLED = 3
class Task.View:
VIEW_UNSPECIFIED = 0
BASIC = 1
FULL = 2