CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-google-cloud-tasks

Google Cloud Tasks API client library for managing distributed task queues.

Pending
Overview
Eval results
Files

docs/task-management.md

Task Management

Task lifecycle operations for creating, monitoring, and executing distributed work units. Tasks can target HTTP endpoints or App Engine applications with configurable scheduling, authentication, and retry behavior.

Capabilities

Task Listing

Retrieve tasks within a queue with optional filtering, view control, and pagination support.

def list_tasks(
    self,
    request: Union[ListTasksRequest, dict] = None,
    *,
    parent: str = None,
    retry: OptionalRetry = DEFAULT,
    timeout: float = DEFAULT,
    metadata: Sequence[Tuple[str, str]] = ()
) -> pagers.ListTasksPager:
    """List the tasks in a queue.

    Only ``parent`` is exposed as a keyword argument. The remaining
    ``ListTasksRequest`` fields (``response_view``, ``page_size`` and
    ``page_token``) have to be supplied through ``request``.

    Args:
        request: A ``ListTasksRequest`` or an equivalent dictionary.
        parent: Required. The queue name, in the form
            projects/PROJECT_ID/locations/LOCATION_ID/queues/QUEUE_ID.
        retry: Designation of what errors should be retried.
        timeout: The timeout for this request.
        metadata: Strings which should be sent along with the request as metadata.

    Returns:
        A ``pagers.ListTasksPager``; iterating over it yields ``Task`` resources.
    """

Task Retrieval

Get detailed information about a specific task including execution status and attempt history.

def get_task(
    self,
    request: Union[GetTaskRequest, dict] = None,
    *,
    name: str = None,
    retry: OptionalRetry = DEFAULT,
    timeout: float = DEFAULT,
    metadata: Sequence[Tuple[str, str]] = ()
) -> Task:
    """Get a task.

    ``response_view`` is a ``GetTaskRequest`` field and is not accepted as a
    keyword argument here; to select a view, pass it through ``request``.

    Args:
        request: A ``GetTaskRequest`` or an equivalent dictionary.
        name: Required. The task name.
        retry: Designation of what errors should be retried.
        timeout: The timeout for this request.
        metadata: Strings which should be sent along with the request as metadata.

    Returns:
        The task object.
    """

Task Creation

Create new tasks with HTTP or App Engine targets, optional scheduling, and authentication configuration.

def create_task(
    self,
    request: Union[CreateTaskRequest, dict] = None,
    *,
    parent: str = None,
    task: Task = None,
    retry: OptionalRetry = DEFAULT,
    timeout: float = DEFAULT,
    metadata: Sequence[Tuple[str, str]] = ()
) -> Task:
    """Create a task and add it to a queue.

    ``response_view`` is a ``CreateTaskRequest`` field and is not accepted as
    a keyword argument here; to select a view, pass it through ``request``.

    Args:
        request: A ``CreateTaskRequest`` or an equivalent dictionary.
        parent: Required. The queue name where the task will be created.
        task: Required. The task to add. Its target is given by either
            ``http_request`` or ``app_engine_http_request`` (a oneof).
        retry: Designation of what errors should be retried.
        timeout: The timeout for this request.
        metadata: Strings which should be sent along with the request as metadata.

    Returns:
        The created task.
    """

Task Deletion

Remove tasks from queues, canceling their execution.

def delete_task(
    self,
    request: Union[DeleteTaskRequest, dict] = None,
    *,
    name: str = None,
    retry: OptionalRetry = DEFAULT,
    timeout: float = DEFAULT,
    metadata: Sequence[Tuple[str, str]] = ()
) -> None:
    """Delete a task.

    Args:
        request: A ``DeleteTaskRequest`` or an equivalent dictionary.
        name: Required. The task name to delete.
        retry: Designation of what errors should be retried.
        timeout: The timeout for this request.
        metadata: Strings which should be sent along with the request as metadata.

    Returns:
        None.
    """

Task Execution

Force immediate execution of queued tasks for testing or manual processing.

def run_task(
    self,
    request: Union[RunTaskRequest, dict] = None,
    *,
    name: str = None,
    retry: OptionalRetry = DEFAULT,
    timeout: float = DEFAULT,
    metadata: Sequence[Tuple[str, str]] = ()
) -> Task:
    """Force a task to run immediately.

    ``response_view`` is a ``RunTaskRequest`` field and is not accepted as a
    keyword argument here; to select a view, pass it through ``request``.

    Args:
        request: A ``RunTaskRequest`` or an equivalent dictionary.
        name: Required. The task name to run.
        retry: Designation of what errors should be retried.
        timeout: The timeout for this request.
        metadata: Strings which should be sent along with the request as metadata.

    Returns:
        The task after execution attempt.
    """

Request Types

ListTasksRequest

class ListTasksRequest:
    """Request message accepted by ``list_tasks``."""

    parent: str  # Required. The queue name (projects/PROJECT_ID/locations/LOCATION_ID/queues/QUEUE_ID)
    response_view: Task.View  # Specifies which subset of each Task to return (unspecified defaults to BASIC)
    page_size: int  # Maximum page size (max 1000)
    page_token: str  # Token for pagination; presumably the next_page_token of a previous ListTasksResponse — confirm

GetTaskRequest

class GetTaskRequest:
    """Request message accepted by ``get_task``."""

    name: str  # Required. The task name
    response_view: Task.View  # Specifies which subset of Task to return (unspecified defaults to BASIC)

CreateTaskRequest

class CreateTaskRequest:
    """Request message accepted by ``create_task``."""

    parent: str  # Required. The queue name where task will be created
    task: Task  # Required. The task to add
    response_view: Task.View  # Specifies which subset of the created Task to return (unspecified defaults to BASIC)

DeleteTaskRequest

class DeleteTaskRequest:
    """Request message accepted by ``delete_task``."""

    name: str  # Required. The task name to delete

RunTaskRequest

class RunTaskRequest:
    """Request message accepted by ``run_task``."""

    name: str  # Required. The task name to run
    response_view: Task.View  # Specifies which subset of Task to return (unspecified defaults to BASIC)

ListTasksResponse

class ListTasksResponse:
    """One page of results for a task listing."""

    tasks: MutableSequence[Task]  # List of tasks
    next_page_token: str  # Token for next page; presumably passed back as ListTasksRequest.page_token — confirm

Task Structure

Task

class Task:
    """A unit of work in a queue, targeting an HTTP endpoint or an App Engine application."""

    name: str  # Optionally caller-specified task name
    app_engine_http_request: AppEngineHttpRequest  # App Engine HTTP request (oneof with http_request)
    http_request: HttpRequest  # HTTP request to worker (oneof with app_engine_http_request)
    schedule_time: timestamp_pb2.Timestamp  # When task is scheduled to be attempted
    create_time: timestamp_pb2.Timestamp  # Output only creation time
    dispatch_deadline: duration_pb2.Duration  # Deadline for requests sent to worker
    dispatch_count: int  # Output only number of attempts dispatched
    response_count: int  # Output only number of attempts that received response
    first_attempt: Attempt  # Output only status of first attempt
    last_attempt: Attempt  # Output only status of last attempt
    view: Task.View  # Output only view specifying returned subset

    class View:
        """Selects which subset of a Task is returned by the API."""

        VIEW_UNSPECIFIED = 0  # Unspecified, defaults to BASIC
        BASIC = 1  # Basic view omits large/sensitive fields
        FULL = 2  # All information returned (requires IAM permission)

Attempt

class Attempt:
    """Status of a single attempt, as reported in Task.first_attempt and Task.last_attempt."""

    schedule_time: timestamp_pb2.Timestamp  # Output only time attempt was scheduled
    dispatch_time: timestamp_pb2.Timestamp  # Output only time attempt was dispatched
    response_time: timestamp_pb2.Timestamp  # Output only time response was received
    response_status: status_pb2.Status  # Output only response from worker

Usage Examples

Creating HTTP Tasks

import json

from google.cloud import tasks

client = tasks.CloudTasksClient()

# Identify the target queue by project, location and queue ID.
project = 'my-project-id'
location = 'us-central1'
queue_name = 'my-queue'
queue_path = client.queue_path(project, location, queue_name)

# Serialize the JSON payload once; it is sent as UTF-8 encoded bytes.
payload = json.dumps({'user_id': 123, 'action': 'process_order'}).encode('utf-8')

# Describe the HTTP call to make, then wrap it in a task.
http_request = tasks.HttpRequest(
    url='https://myapp.com/process',
    http_method=tasks.HttpMethod.POST,
    headers={'Content-Type': 'application/json'},
    body=payload,
)
task = tasks.Task(http_request=http_request)

# Enqueue the task and report its name.
created_task = client.create_task(parent=queue_path, task=task)
print(f'Created task: {created_task.name}')

Creating Scheduled Tasks

import datetime

from google.cloud import tasks
from google.protobuf import timestamp_pb2

client = tasks.CloudTasksClient()
queue_path = client.queue_path('my-project-id', 'us-central1', 'my-queue')

# Schedule task for 10 minutes from now.
# Use a timezone-aware UTC datetime: datetime.utcnow() is deprecated since
# Python 3.12 and returns a naive value that is easy to misread as local time.
schedule_time = (
    datetime.datetime.now(datetime.timezone.utc)
    + datetime.timedelta(minutes=10)
)
timestamp = timestamp_pb2.Timestamp()
timestamp.FromDatetime(schedule_time)

task = tasks.Task(
    http_request=tasks.HttpRequest(
        url='https://myapp.com/scheduled-job',
        http_method=tasks.HttpMethod.POST
    ),
    schedule_time=timestamp
)

created_task = client.create_task(parent=queue_path, task=task)
print(f'Scheduled task: {created_task.name} for {schedule_time}')

Creating App Engine Tasks

from google.cloud import tasks

client = tasks.CloudTasksClient()
queue_path = client.queue_path('my-project-id', 'us-central1', 'my-queue')

# Routing selects the App Engine service and version that handle the task.
routing = tasks.AppEngineRouting(service='worker', version='v1')

# Describe the App Engine request, then wrap it in a task.
app_engine_request = tasks.AppEngineHttpRequest(
    http_method=tasks.HttpMethod.POST,
    relative_uri='/task-handler',
    app_engine_routing=routing,
    body=b'{"data": "example"}',
)
task = tasks.Task(app_engine_http_request=app_engine_request)

created_task = client.create_task(parent=queue_path, task=task)
print(f'Created App Engine task: {created_task.name}')

Managing Task Lifecycle

from google.cloud import tasks

client = tasks.CloudTasksClient()
queue_path = client.queue_path('my-project-id', 'us-central1', 'my-queue')

# List tasks in queue
print("Tasks in queue:")
for task in client.list_tasks(parent=queue_path):
    print(f'- {task.name}: {task.dispatch_count} dispatches')

# Get task details with full view.
# get_task() exposes only `name` as a keyword argument; response_view is a
# GetTaskRequest field, so it has to be supplied through `request`.
task_name = client.task_path('my-project-id', 'us-central1', 'my-queue', 'my-task')
task = client.get_task(
    request=tasks.GetTaskRequest(
        name=task_name,
        response_view=tasks.Task.View.FULL,
    )
)
print(f'Task details: {task.create_time}, {task.schedule_time}')

# Force task execution
run_result = client.run_task(name=task_name)
print(f'Task run result: {run_result.dispatch_count}')

# Delete task
client.delete_task(name=task_name)
print(f'Deleted task: {task_name}')

Authenticated HTTP Tasks

from google.cloud import tasks

client = tasks.CloudTasksClient()
queue_path = client.queue_path('my-project-id', 'us-central1', 'my-queue')

# Both tasks below use the same service account.
service_account = 'worker@my-project.iam.gserviceaccount.com'

# Create task with OAuth authentication
# NOTE(review): the Cloud Tasks docs suggest OAuth tokens are mainly meant for
# *.googleapis.com targets and OIDC for your own endpoints — confirm.
oauth_request = tasks.HttpRequest(
    url='https://myapp.com/authenticated-endpoint',
    http_method=tasks.HttpMethod.POST,
    oauth_token=tasks.OAuthToken(
        service_account_email=service_account,
        scope='https://www.googleapis.com/auth/cloud-platform',
    ),
    body=b'{"secure": "data"}',
)
task = tasks.Task(http_request=oauth_request)

created_task = client.create_task(parent=queue_path, task=task)
print(f'Created authenticated task: {created_task.name}')

# Create task with OIDC authentication
oidc_request = tasks.HttpRequest(
    url='https://myapp.com/oidc-endpoint',
    http_method=tasks.HttpMethod.POST,
    oidc_token=tasks.OidcToken(
        service_account_email=service_account,
        audience='https://myapp.com',
    ),
)
oidc_task = tasks.Task(http_request=oidc_request)

created_oidc_task = client.create_task(parent=queue_path, task=oidc_task)
print(f'Created OIDC task: {created_oidc_task.name}')

Install with Tessl CLI

npx tessl i tessl/pypi-google-cloud-tasks

docs

client-operations.md

iam-security.md

index.md

queue-configuration.md

queue-management.md

task-management.md

task-targeting.md

tile.json