Google Cloud Tasks API client library for managing distributed task queues.
—
Task lifecycle operations for creating, monitoring, and executing distributed work units. Tasks can target HTTP endpoints or App Engine applications with configurable scheduling, authentication, and retry behavior.
Retrieve tasks within a queue with optional filtering, view control, and pagination support.
def list_tasks(
self,
request: Union[ListTasksRequest, dict] = None,
*,
parent: str = None,
retry: OptionalRetry = DEFAULT,
timeout: float = DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
) -> pagers.ListTasksPager:
"""List the tasks in a queue.
Args:
request: The request object or dictionary.
parent: Required. The queue name (projects/PROJECT_ID/locations/LOCATION_ID/queues/QUEUE_ID).
retry: Designation of what errors should be retried.
timeout: The timeout for this request.
metadata: Strings which should be sent along with the request as metadata.
Returns:
An iterable of Task resources.
"""

Get detailed information about a specific task including execution status and attempt history.
def get_task(
self,
request: Union[GetTaskRequest, dict] = None,
*,
name: str = None,
retry: OptionalRetry = DEFAULT,
timeout: float = DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
) -> Task:
"""Get a task.
Args:
request: The request object or dictionary.
name: Required. The task name.
retry: Designation of what errors should be retried.
timeout: The timeout for this request.
metadata: Strings which should be sent along with the request as metadata.
Returns:
The task object.
"""

Create new tasks with HTTP or App Engine targets, optional scheduling, and authentication configuration.
def create_task(
self,
request: Union[CreateTaskRequest, dict] = None,
*,
parent: str = None,
task: Task = None,
retry: OptionalRetry = DEFAULT,
timeout: float = DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
) -> Task:
"""Create a task and add it to a queue.
Args:
request: The request object or dictionary.
parent: Required. The queue name where the task will be created.
task: Required. The task to add.
retry: Designation of what errors should be retried.
timeout: The timeout for this request.
metadata: Strings which should be sent along with the request as metadata.
Returns:
The created task.
"""

Remove tasks from queues, canceling their execution.
def delete_task(
self,
request: Union[DeleteTaskRequest, dict] = None,
*,
name: str = None,
retry: OptionalRetry = DEFAULT,
timeout: float = DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
) -> None:
"""Delete a task.
Args:
request: The request object or dictionary.
name: Required. The task name to delete.
retry: Designation of what errors should be retried.
timeout: The timeout for this request.
metadata: Strings which should be sent along with the request as metadata.
"""

Force immediate execution of queued tasks for testing or manual processing.
def run_task(
self,
request: Union[RunTaskRequest, dict] = None,
*,
name: str = None,
retry: OptionalRetry = DEFAULT,
timeout: float = DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
) -> Task:
"""Force a task to run immediately.
Args:
request: The request object or dictionary.
name: Required. The task name to run.
retry: Designation of what errors should be retried.
timeout: The timeout for this request.
metadata: Strings which should be sent along with the request as metadata.
Returns:
The task after execution attempt.
"""

class ListTasksRequest:
parent: str # Required. The queue name
response_view: Task.View # Specifies which subset of Task to return
page_size: int # Maximum page size (max 1000)
page_token: str # Token for pagination

class GetTaskRequest:
name: str # Required. The task name
response_view: Task.View # Specifies which subset of Task to return

class CreateTaskRequest:
parent: str # Required. The queue name where task will be created
task: Task # Required. The task to add
response_view: Task.View # Specifies which subset of Task to return

class DeleteTaskRequest:
name: str # Required. The task name to delete

class RunTaskRequest:
name: str # Required. The task name to run
response_view: Task.View # Specifies which subset of Task to return

class ListTasksResponse:
tasks: MutableSequence[Task] # List of tasks
next_page_token: str # Token for next page

class Task:
name: str # Optionally caller-specified task name
app_engine_http_request: AppEngineHttpRequest # App Engine HTTP request (oneof)
http_request: HttpRequest # HTTP request to worker (oneof)
schedule_time: timestamp_pb2.Timestamp # When task is scheduled to be attempted
create_time: timestamp_pb2.Timestamp # Output only creation time
dispatch_deadline: duration_pb2.Duration # Deadline for requests sent to worker
dispatch_count: int # Output only number of attempts dispatched
response_count: int # Output only number of attempts that received response
first_attempt: Attempt # Output only status of first attempt
last_attempt: Attempt # Output only status of last attempt
view: Task.View # Output only view specifying returned subset
class View:
VIEW_UNSPECIFIED = 0 # Unspecified, defaults to BASIC
BASIC = 1 # Basic view omits large/sensitive fields
FULL = 2 # All information returned (requires IAM permission)

class Attempt:
schedule_time: timestamp_pb2.Timestamp # Output only time attempt was scheduled
dispatch_time: timestamp_pb2.Timestamp # Output only time attempt was dispatched
response_time: timestamp_pb2.Timestamp # Output only time response was received
response_status: status_pb2.Status # Output only response from worker

from google.cloud import tasks
import json

# Instantiate the Cloud Tasks client.
client = tasks.CloudTasksClient()

# Build the fully qualified path of the queue that will receive the task.
project = 'my-project-id'
location = 'us-central1'
queue_name = 'my-queue'
queue_path = client.queue_path(project, location, queue_name)

# Describe the HTTP call the task performs; the payload is sent as
# UTF-8 encoded JSON in the request body.
task_data = {'user_id': 123, 'action': 'process_order'}
http_request = tasks.HttpRequest(
    url='https://myapp.com/process',
    http_method=tasks.HttpMethod.POST,
    headers={'Content-Type': 'application/json'},
    body=json.dumps(task_data).encode('utf-8'),
)
task = tasks.Task(http_request=http_request)

# Enqueue the task.
created_task = client.create_task(parent=queue_path, task=task)
print(f'Created task: {created_task.name}')

from google.cloud import tasks
from google.protobuf import timestamp_pb2
import datetime

client = tasks.CloudTasksClient()
queue_path = client.queue_path('my-project-id', 'us-central1', 'my-queue')

# Schedule the task for 10 minutes from now.
# A timezone-aware UTC datetime is used on purpose: datetime.utcnow() is
# deprecated since Python 3.12 and returns a naive value that is easy to
# misread as local time.
schedule_time = (
    datetime.datetime.now(tz=datetime.timezone.utc)
    + datetime.timedelta(minutes=10)
)
timestamp = timestamp_pb2.Timestamp()
timestamp.FromDatetime(schedule_time)

task = tasks.Task(
    http_request=tasks.HttpRequest(
        url='https://myapp.com/scheduled-job',
        http_method=tasks.HttpMethod.POST,
    ),
    schedule_time=timestamp,
)
created_task = client.create_task(parent=queue_path, task=task)
print(f'Scheduled task: {created_task.name} for {schedule_time}')

from google.cloud import tasks
client = tasks.CloudTasksClient()
queue_path = client.queue_path('my-project-id', 'us-central1', 'my-queue')

# Target version "v1" of the "worker" App Engine service.
routing = tasks.AppEngineRouting(service='worker', version='v1')

# Describe the App Engine request the task performs.
app_engine_request = tasks.AppEngineHttpRequest(
    http_method=tasks.HttpMethod.POST,
    relative_uri='/task-handler',
    app_engine_routing=routing,
    body=b'{"data": "example"}',
)
task = tasks.Task(app_engine_http_request=app_engine_request)

# Enqueue the task.
created_task = client.create_task(parent=queue_path, task=task)
print(f'Created App Engine task: {created_task.name}')

from google.cloud import tasks
client = tasks.CloudTasksClient()
queue_path = client.queue_path('my-project-id', 'us-central1', 'my-queue')

# List tasks in queue; list_tasks returns an iterable pager of Task objects.
print("Tasks in queue:")
for task in client.list_tasks(parent=queue_path):
    print(f'- {task.name}: {task.dispatch_count} dispatches')

# Get task details with full view.
# response_view is a field of GetTaskRequest, not a keyword argument of
# get_task(), so it must be supplied through the `request` parameter.
task_name = client.task_path('my-project-id', 'us-central1', 'my-queue', 'my-task')
task = client.get_task(
    request=tasks.GetTaskRequest(
        name=task_name,
        response_view=tasks.Task.View.FULL,
    )
)
print(f'Task details: {task.create_time}, {task.schedule_time}')

# Force task execution
run_result = client.run_task(name=task_name)
print(f'Task run result: {run_result.dispatch_count}')

# Delete task
client.delete_task(name=task_name)
print(f'Deleted task: {task_name}')

from google.cloud import tasks
client = tasks.CloudTasksClient()
queue_path = client.queue_path('my-project-id', 'us-central1', 'my-queue')

# --- Task authenticated with an OAuth token ---
oauth_token = tasks.OAuthToken(
    service_account_email='worker@my-project.iam.gserviceaccount.com',
    scope='https://www.googleapis.com/auth/cloud-platform',
)
oauth_request = tasks.HttpRequest(
    url='https://myapp.com/authenticated-endpoint',
    http_method=tasks.HttpMethod.POST,
    oauth_token=oauth_token,
    body=b'{"secure": "data"}',
)
task = tasks.Task(http_request=oauth_request)
created_task = client.create_task(parent=queue_path, task=task)
print(f'Created authenticated task: {created_task.name}')

# --- Task authenticated with an OIDC token ---
oidc_token = tasks.OidcToken(
    service_account_email='worker@my-project.iam.gserviceaccount.com',
    audience='https://myapp.com',
)
oidc_request = tasks.HttpRequest(
    url='https://myapp.com/oidc-endpoint',
    http_method=tasks.HttpMethod.POST,
    oidc_token=oidc_token,
)
oidc_task = tasks.Task(http_request=oidc_request)
created_oidc_task = client.create_task(parent=queue_path, task=oidc_task)
print(f'Created OIDC task: {created_oidc_task.name}')

Install with Tessl CLI
npx tessl i tessl/pypi-google-cloud-tasks