Management of individual tasks within executions. Tasks are the smallest unit of work in Cloud Run jobs and represent individual container instances processing part of the workload.
Create and configure task clients for monitoring individual work units.
class TasksClient:
    """Synchronous client for managing Cloud Run tasks."""

    def __init__(self, *, credentials=None, transport=None, client_options=None, client_info=None):
        """Initialize the Tasks client.

        Every parameter is keyword-only and defaults to ``None``. This is a
        signature stub: the body holds documentation only.

        Args:
            credentials: Optional authentication credentials.
            transport: Transport to use for requests (grpc, grpc_asyncio, rest).
            client_options: Client configuration options.
            client_info: Client information for the user agent.
        """
class TasksAsyncClient:
    """Asynchronous client for managing Cloud Run tasks."""


# Get task details, status, and execution information.
def get_task(
    self,
    request: "GetTaskRequest | None" = None,
    *,
    name: "str | None" = None,
    **kwargs
) -> "Task":
    """Get a Cloud Run task.

    Signature stub: the body holds documentation only. The annotations are
    written as strings because ``GetTaskRequest`` and ``Task`` are defined
    further down, so evaluating them here would raise ``NameError``.

    Args:
        request: The request object.
        name: Required. The name of the task. Format:
            projects/{project}/locations/{location}/jobs/{job}/executions/{execution}/tasks/{task}
        **kwargs: Additional keyword arguments (not described by this stub).

    Returns:
        Task: The task details and status.
    """


# Usage Example:
from google.cloud import run_v2

client = run_v2.TasksClient()

# Get task details
task = client.get_task(
    name="projects/my-project/locations/us-central1/jobs/my-job/executions/exec-123/tasks/task-456"
)

# Task exposes its timing fields directly and reports the outcome of the most
# recent attempt under ``last_attempt_result``; it has no ``status`` attribute.
last_attempt = task.last_attempt_result
print(f"Task: {task.name}")
print(f"Status: {last_attempt.status}")
print(f"Exit code: {last_attempt.exit_code}")
print(f"Started: {task.start_time}")
if task.completion_time:
    print(f"Completed: {task.completion_time}")
print(f"Log URI: {task.log_uri}")


# List tasks for an execution with filtering and pagination.
def list_tasks(
    self,
    request: "ListTasksRequest | None" = None,
    *,
    parent: "str | None" = None,
    **kwargs
) -> "ListTasksResponse":
    """List Cloud Run tasks for an execution.

    Signature stub: the body holds documentation only. The annotations are
    written as strings because ``ListTasksRequest`` and ``ListTasksResponse``
    are defined further down, so evaluating them here would raise
    ``NameError``.

    Args:
        request: The request object.
        parent: Required. The execution to list tasks for. Format:
            projects/{project}/locations/{location}/jobs/{job}/executions/{execution}
        **kwargs: Additional keyword arguments (not described by this stub).

    Returns:
        ListTasksResponse: Paginated list of tasks.
    """


# Usage Example:
# List all tasks for an execution
request = run_v2.ListTasksRequest(
    parent="projects/my-project/locations/us-central1/jobs/my-job/executions/exec-123"
)
page_result = client.list_tasks(request=request)

failed_tasks = []
succeeded_tasks = []
for task in page_result:
    # The outcome of the most recent attempt lives on last_attempt_result and
    # the retry counter on ``retried``; Task has no ``status`` attribute.
    attempt = task.last_attempt_result
    print(f"Task: {task.name}")
    print(f" Status: {attempt.status}")
    print(f" Exit code: {attempt.exit_code}")
    print(f" Retries: {task.retried}")

    # Tasks without a completion time are still pending or running, so they
    # are left out of both lists.
    if not task.completion_time:
        continue
    # A status code of 0 (OK) means the last attempt succeeded.
    # NOTE(review): cancelled tasks may also end up in failed_tasks - confirm
    # that grouping is wanted.
    if attempt.status.code == 0:
        succeeded_tasks.append(task)
    else:
        failed_tasks.append(task)

print(f"Summary: {len(succeeded_tasks)} succeeded, {len(failed_tasks)} failed")


class Task:
    """A Cloud Run task instance.

    Attributes:
        name (str): The unique name of the task
        uid (str): Unique identifier assigned by the system
        generation (int): A sequence number representing a specific generation
        labels (dict): User-defined labels
        annotations (dict): User-defined annotations
        create_time (Timestamp): The creation time
        scheduled_time (Timestamp): When the task was scheduled to run
        start_time (Timestamp): When the task started running
        completion_time (Timestamp): When the task completed
        update_time (Timestamp): The last modification time
        delete_time (Timestamp): The deletion time
        expire_time (Timestamp): When the task expires
        job (str): The name of the job that owns this task
        execution (str): The name of the execution that owns this task
        containers (list[Container]): The containers that define the task
        volumes (list[Volume]): Volumes to make available to containers
        max_retries (int): Maximum number of retries for this task
        timeout (str): Maximum duration the task may be active before it's killed
        service_account (str): Email address of the IAM service account
        execution_environment (ExecutionEnvironment): The execution environment
        reconciling (bool): Whether the task is currently being reconciled
        conditions (list[Condition]): Detailed status conditions
        observed_generation (int): The generation observed by the controller
        index (int): Index of the task, unique per execution, and beginning at 0
        retried (int): Number of times this task has been retried
        last_attempt_result (TaskAttemptResult): Result of the last attempt
        encryption_key (str): A reference to a customer managed encryption key
        vpc_access (VpcAccess): VPC access configuration
        log_uri (str): URI where logs for this task can be found
        satisfies_pzs (bool): Whether the task satisfies PZS requirements
        etag (str): A fingerprint used for optimistic concurrency control
    """


class TaskAttemptResult:
    """Result of a task attempt.

    Attributes:
        status (google.rpc.Status): Status of the attempt. An OK status
            code (0) means the attempt succeeded.
        exit_code (int): The exit code of the task attempt.
            NOTE(review): the public reference says this may be unset when
            the container could not exit cleanly - prefer ``status`` when
            deciding whether an attempt failed.
    """
class TaskAttemptStatus:
    """Status of a task attempt.

    NOTE(review): the public run_v2 reference appears to describe
    ``TaskAttemptResult.status`` as a ``google.rpc.Status`` rather than an
    enum of this name - confirm this type is still exposed before relying
    on it.

    Values:
        TASK_ATTEMPT_STATUS_UNSPECIFIED: The default value. This value is
            used if the status is omitted.
        TASK_ATTEMPT_SUCCEEDED: Task attempt succeeded.
        TASK_ATTEMPT_FAILED: Task attempt failed.
    """

    TASK_ATTEMPT_STATUS_UNSPECIFIED = 0
    TASK_ATTEMPT_SUCCEEDED = 1
    TASK_ATTEMPT_FAILED = 2
class CompletionStatus:
    """Completion status of a task.

    NOTE(review): none of the attributes documented on ``Task`` is typed
    with this enum - confirm where (and whether) it is exposed before
    comparing task fields against it.

    Values:
        COMPLETION_STATUS_UNSPECIFIED: The default value. This value is used
            if the status is omitted.
        TASK_SUCCEEDED: Task has succeeded.
        TASK_FAILED: Task has failed.
        TASK_RUNNING: Task is running normally.
        TASK_PENDING: Task is pending, waiting for resources.
        TASK_CANCELLED: Task has been cancelled.
    """

    COMPLETION_STATUS_UNSPECIFIED = 0
    TASK_SUCCEEDED = 1
    TASK_FAILED = 2
    TASK_RUNNING = 3
    TASK_PENDING = 4
    TASK_CANCELLED = 5


class GetTaskRequest:
    """Request message for getting a task.

    Attributes:
        name (str): Required. The name of the task to retrieve
    """
class ListTasksRequest:
    """Request message for listing tasks.

    Attributes:
        parent (str): Required. The execution to list tasks for
        page_size (int): Maximum number of tasks to return
        page_token (str): Token for retrieving the next page
        show_deleted (bool): Whether to include deleted tasks
    """


class ListTasksResponse:
    """Response message for listing tasks.

    Attributes:
        tasks (list[Task]): The list of tasks
        next_page_token (str): Token for retrieving the next page
    """


class TaskTemplate:
    """Template for creating tasks within an execution.

    Attributes:
        containers (list[Container]): The containers that will run in the task
        volumes (list[Volume]): Volumes to make available to containers
        max_retries (int): Maximum number of retries for failed tasks
        timeout (str): Maximum duration a task may be active before being killed
        service_account (str): Email address of the IAM service account
        execution_environment (ExecutionEnvironment): The execution environment
        encryption_key (str): Reference to a customer managed encryption key
        vpc_access (VpcAccess): VPC access configuration
    """


def analyze_task_failures(execution_name, client=None):
    """Collect the failed tasks of an execution and print an exit-code summary.

    A task is counted as failed when it has a completion time and the status
    code of its last attempt is not 0 (OK). Tasks without a completion time
    are ignored.
    NOTE(review): cancelled tasks may also be counted as failed - confirm
    that is acceptable for callers.

    Args:
        execution_name: Name of the execution whose tasks are inspected, e.g.
            projects/{project}/locations/{location}/jobs/{job}/executions/{execution}
        client: Optional object exposing ``list_tasks(parent=...)``. When
            omitted, a new ``run_v2.TasksClient()`` is created; passing one
            lets callers reuse an existing client.

    Returns:
        list: The failed tasks, in listing order.
    """
    if client is None:
        client = run_v2.TasksClient()

    failed_tasks = []
    exit_codes = {}
    for task in client.list_tasks(parent=execution_name):
        attempt = task.last_attempt_result
        if task.completion_time and attempt.status.code != 0:
            failed_tasks.append(task)
            exit_codes[attempt.exit_code] = exit_codes.get(attempt.exit_code, 0) + 1

    print(f"Failed tasks: {len(failed_tasks)}")
    print("Exit code distribution:")
    for code, count in sorted(exit_codes.items()):
        print(f" Exit code {code}: {count} tasks")
    return failed_tasks
# Usage
# Guarded so that importing this code does not trigger a service call.
if __name__ == "__main__":
    failed_tasks = analyze_task_failures(
        "projects/my-project/locations/us-central1/jobs/my-job/executions/exec-123"
    )


def get_task_logs(task_name, client=None):
    """Print where the logs of a specific task can be found.

    Only the log location is reported; fetching the log entries themselves
    is not implemented here.

    Args:
        task_name: Full resource name of the task to look up.
        client: Optional object exposing ``get_task(name=...)``. When
            omitted, a new ``run_v2.TasksClient()`` is created.

    Returns:
        None
    """
    if client is None:
        client = run_v2.TasksClient()
    task = client.get_task(name=task_name)

    if not task.log_uri:
        print("No log URI available for task")
        return

    print(f"Task logs available at: {task.log_uri}")
    if "logging.cloud.google.com" in task.log_uri:
        # Placeholder: the actual retrieval depends on the log URI format.
        print("Fetching logs using Cloud Logging...")
    else:
        print(f"Log URI: {task.log_uri}")


def analyze_task_retries(execution_name, client=None):
    """Count the tasks of an execution by how often they were retried.

    Args:
        execution_name: Name of the execution whose tasks are inspected.
        client: Optional object exposing ``list_tasks(parent=...)``. When
            omitted, a new ``run_v2.TasksClient()`` is created.

    Returns:
        dict: Task counts keyed by ``0``, ``1``, ``2`` and ``"3+"``.
    """
    if client is None:
        client = run_v2.TasksClient()

    retry_stats = {
        0: 0,  # No retries
        1: 0,  # 1 retry
        2: 0,  # 2 retries
        "3+": 0,  # 3 or more retries
    }
    for task in client.list_tasks(parent=execution_name):
        # The retry counter is the ``retried`` attribute of the task.
        bucket = task.retried if task.retried <= 2 else "3+"
        retry_stats[bucket] += 1

    print("Task retry distribution:")
    for retries, count in retry_stats.items():
        print(f" {retries} retries: {count} tasks")
    return retry_stats


def monitor_task_performance(execution_name, client=None):
    """Report how long the finished tasks of an execution ran.

    Only tasks that have both a start time and a completion time are
    measured; all others are skipped.

    Args:
        execution_name: Name of the execution whose tasks are inspected.
        client: Optional object exposing ``list_tasks(parent=...)``. When
            omitted, a new ``run_v2.TasksClient()`` is created.

    Returns:
        list[float]: Duration in seconds of each measured task, in listing
        order. Empty when no task has both timestamps.
    """
    if client is None:
        client = run_v2.TasksClient()

    execution_times = []
    for task in client.list_tasks(parent=execution_name):
        # Start and completion times are attributes of the task itself.
        if not (task.start_time and task.completion_time):
            continue
        duration = (task.completion_time - task.start_time).total_seconds()
        execution_times.append(duration)
        print(f"Task {task.index}: {duration:.2f} seconds")

    if execution_times:
        avg_time = sum(execution_times) / len(execution_times)
        print("\nPerformance Summary:")
        print(f" Average execution time: {avg_time:.2f} seconds")
        print(f" Fastest task: {min(execution_times):.2f} seconds")
        print(f" Slowest task: {max(execution_times):.2f} seconds")
        print(f" Total tasks: {len(execution_times)}")
    return execution_times