or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

builds.mdexecutions.mdindex.mdjobs.mdrevisions.mdservices.mdtasks.mdworker-pools.md
tile.json

jobs.mddocs/

Job Management

Management of Cloud Run jobs for batch workloads, scheduled tasks, and one-time executions. Jobs run containers to completion and can be triggered manually, on a schedule, or via events.

Capabilities

Job Client

Create and configure job clients for managing batch workloads.

class JobsClient:
    """Synchronous client for managing Cloud Run jobs."""
    
    def __init__(self, *, credentials=None, transport=None, client_options=None, client_info=None):
        """
        Initialize the Jobs client.
        
        Args:
            credentials: Optional authentication credentials
            transport: Transport to use for requests (grpc, grpc_asyncio, rest)
            client_options: Client configuration options
            client_info: Client information for user agent
        """

class JobsAsyncClient:
    """Asynchronous client for managing Cloud Run jobs."""

Job Creation

Create batch jobs with container specifications, parallelism, and retry policies.

def create_job(
    self, 
    request: CreateJobRequest = None, 
    *, 
    parent: str = None, 
    job: Job = None, 
    job_id: str = None, 
    **kwargs
) -> Operation:
    """
    Create a new Cloud Run job.
    
    Args:
        request: The request object containing all parameters
        parent: Required. The location to create the job. Format: projects/{project}/locations/{location}
        job: Required. The job configuration
        job_id: Required. The unique identifier for the job
        
    Returns:
        Operation: Long-running operation that resolves to the created Job
    """

Usage Example:

from google.cloud import run_v2

client = run_v2.JobsClient()

# Define job configuration
job = run_v2.Job()
job.template.template.containers = [
    run_v2.Container(
        image="gcr.io/my-project/batch-processor:latest",
        resources=run_v2.ResourceRequirements(
            limits={"cpu": "2", "memory": "4Gi"}
        ),
        env=[
            run_v2.EnvVar(name="BATCH_SIZE", value="1000"),
            run_v2.EnvVar(name="OUTPUT_BUCKET", value="gs://my-output-bucket")
        ]
    )
]

# Configure parallelism and retries
job.template.parallelism = 5
job.template.task_count = 10
job.template.template.max_retries = 3
job.template.template.timeout = "3600s"  # 1 hour

# Create the job
request = run_v2.CreateJobRequest(
    parent="projects/my-project/locations/us-central1",
    job=job,
    job_id="batch-processing-job"
)

operation = client.create_job(request=request)
job_response = operation.result()
print(f"Job created: {job_response.name}")

Job Execution

Execute jobs manually and monitor execution status.

def run_job(
    self, 
    request: RunJobRequest = None, 
    *, 
    name: str = None, 
    **kwargs
) -> Operation:
    """
    Execute a Cloud Run job.
    
    Args:
        request: The request object
        name: Required. The name of the job to execute. Format: projects/{project}/locations/{location}/jobs/{job}
        
    Returns:
        Operation: Long-running operation that resolves to an Execution
    """

Usage Example:

# Execute a job
run_request = run_v2.RunJobRequest(
    name="projects/my-project/locations/us-central1/jobs/batch-processing-job"
)

operation = client.run_job(request=run_request)
execution = operation.result()
print(f"Job execution started: {execution.name}")

# Monitor execution progress
executions_client = run_v2.ExecutionsClient()
while True:
    current_execution = executions_client.get_execution(name=execution.name)
    print(f"Execution status: {current_execution.status.completion_status}")
    
    if current_execution.status.completion_status in [
        run_v2.Execution.CompletionStatus.EXECUTION_SUCCEEDED,
        run_v2.Execution.CompletionStatus.EXECUTION_FAILED,
        run_v2.Execution.CompletionStatus.EXECUTION_CANCELLED
    ]:
        break
    
    time.sleep(10)

Job Retrieval

Get job configuration and status information.

def get_job(
    self, 
    request: GetJobRequest = None, 
    *, 
    name: str = None, 
    **kwargs
) -> Job:
    """
    Get a Cloud Run job.
    
    Args:
        request: The request object
        name: Required. The name of the job. Format: projects/{project}/locations/{location}/jobs/{job}
        
    Returns:
        Job: The job configuration and status
    """

Job Listing

List jobs with filtering and pagination.

def list_jobs(
    self, 
    request: ListJobsRequest = None, 
    *, 
    parent: str = None, 
    **kwargs
) -> ListJobsResponse:
    """
    List Cloud Run jobs in a location.
    
    Args:
        request: The request object
        parent: Required. The location to list jobs. Format: projects/{project}/locations/{location}
        
    Returns:
        ListJobsResponse: Paginated list of jobs
    """

Job Updates

Update job configuration, container images, and execution parameters.

def update_job(
    self, 
    request: UpdateJobRequest = None, 
    *, 
    job: Job = None, 
    **kwargs
) -> Operation:
    """
    Update a Cloud Run job.
    
    Args:
        request: The request object
        job: Required. The job configuration to update
        
    Returns:
        Operation: Long-running operation that resolves to the updated Job
    """

Job Deletion

Delete jobs and clean up associated resources.

def delete_job(
    self, 
    request: DeleteJobRequest = None, 
    *, 
    name: str = None, 
    **kwargs
) -> Operation:
    """
    Delete a Cloud Run job.
    
    Args:
        request: The request object
        name: Required. The name of the job to delete
        
    Returns:
        Operation: Long-running operation for the deletion
    """

IAM Management

Manage Identity and Access Management policies for jobs.

def get_iam_policy(self, request, *, resource: str = None, **kwargs):
    """Get the IAM access control policy for a job."""

def set_iam_policy(self, request, *, resource: str = None, policy = None, **kwargs):
    """Set the IAM access control policy for a job."""

def test_iam_permissions(self, request, *, resource: str = None, permissions: list[str] = None, **kwargs):
    """Test IAM permissions for a job."""

Request and Response Types

Job Resource

class Job:
    """
    A Cloud Run job configuration.
    
    Attributes:
        name (str): The unique name of the job
        uid (str): Unique identifier assigned by the system
        generation (int): A sequence number representing a specific generation
        labels (dict): User-defined labels
        annotations (dict): User-defined annotations
        create_time (Timestamp): The creation time
        update_time (Timestamp): The last modification time
        delete_time (Timestamp): The deletion time
        expire_time (Timestamp): When the job expires
        creator (str): Email address of the user who created the job
        last_modifier (str): Email address of the last user to modify the job
        client (str): Arbitrary identifier for the client
        client_version (str): Version identifier for the client
        launch_stage (LaunchStage): The launch stage of the job
        binary_authorization (BinaryAuthorization): Binary authorization policy
        template (ExecutionTemplate): The template describing job executions
        observed_generation (int): The generation observed by the controller
        terminal_condition (Condition): The terminal condition of the job
        conditions (list[Condition]): Detailed status conditions
        execution_count (int): Number of executions created for this job
        latest_created_execution (ExecutionReference): Reference to the latest execution
        reconciling (bool): Whether the job is currently being reconciled  
        satisfies_pzs (bool): Whether the job satisfies PZS requirements
        start_execution_token (str): A unique string used as a suffix creating a new execution
        run_execution_token (str): A unique string used as a suffix for triggering a new execution
        etag (str): A fingerprint used for optimistic concurrency control
    """

Execution Template

class ExecutionTemplate:
    """
    Template for job executions.
    
    Attributes:
        labels (dict): User-defined labels for executions
        annotations (dict): User-defined annotations for executions
        parallelism (int): Specifies the maximum desired number of tasks
        task_count (int): Specifies the desired number of tasks
        template (TaskTemplate): Required. The template for individual tasks
    """

Request Types

class CreateJobRequest:
    """
    Request message for creating a job.
    
    Attributes:
        parent (str): Required. The location to create the job
        job (Job): Required. The job configuration
        job_id (str): Required. The unique identifier for the job
        validate_only (bool): Indicates whether to validate only
    """

class RunJobRequest:
    """
    Request message for running a job.
    
    Attributes:
        name (str): Required. The name of the job to run
        validate_only (bool): Indicates whether to validate only
        etag (str): A fingerprint for optimistic concurrency control
        overrides (RunJobRequest.Overrides): Optional overrides for the execution
    """

class GetJobRequest:
    """
    Request message for getting a job.
    
    Attributes:
        name (str): Required. The name of the job to retrieve
    """

class ListJobsRequest:
    """
    Request message for listing jobs.
    
    Attributes:
        parent (str): Required. The location to list jobs in
        page_size (int): Maximum number of jobs to return
        page_token (str): Token for retrieving the next page
        show_deleted (bool): Whether to include deleted jobs
    """

class UpdateJobRequest:
    """
    Request message for updating a job.
    
    Attributes:
        job (Job): Required. The job configuration to update
        validate_only (bool): Indicates whether to validate only
        allow_missing (bool): Whether to allow creation if job doesn't exist
    """

class DeleteJobRequest:
    """
    Request message for deleting a job.
    
    Attributes:
        name (str): Required. The name of the job to delete
        validate_only (bool): Indicates whether to validate only
        etag (str): A fingerprint for optimistic concurrency control
    """

Response Types

class ListJobsResponse:
    """
    Response message for listing jobs.
    
    Attributes:
        jobs (list[Job]): The list of jobs
        next_page_token (str): Token for retrieving the next page
    """

class ExecutionReference:
    """
    Reference to a job execution.
    
    Attributes:
        name (str): Name of the execution
        create_time (Timestamp): Creation time of the execution
        completion_time (Timestamp): Completion time of the execution
        delete_time (Timestamp): Deletion time of the execution
    """

Common Usage Patterns

Batch Processing

# Create a job for processing files in batches
job = run_v2.Job()
job.template.template.containers = [
    run_v2.Container(
        image="gcr.io/my-project/file-processor:latest",
        env=[
            run_v2.EnvVar(name="INPUT_BUCKET", value="gs://input-files"),
            run_v2.EnvVar(name="OUTPUT_BUCKET", value="gs://processed-files")
        ]
    )
]

# Configure for parallel processing
job.template.parallelism = 10  # Run up to 10 tasks in parallel
job.template.task_count = 100  # Process 100 files total

Data Pipeline Jobs

# Configure a data pipeline job with retries and timeout
job.template.template.max_retries = 3
job.template.template.timeout = "1800s"  # 30 minutes

# Add environment variables for data processing
container.env.extend([
    run_v2.EnvVar(name="DATABASE_URL", 
                  value_source=run_v2.EnvVarSource(
                      secret_key_ref=run_v2.SecretKeySelector(
                          secret="db-credentials",
                          version="latest",
                          key="url"
                      )
                  )),
    run_v2.EnvVar(name="BATCH_SIZE", value="1000")
])

Scheduled Jobs

from google.cloud import scheduler_v1

# Create the job first
job_operation = jobs_client.create_job(request=create_job_request)
job = job_operation.result()

# Then create a Cloud Scheduler job to trigger it
scheduler_client = scheduler_v1.CloudSchedulerClient()
scheduled_job = scheduler_v1.Job(
    name="projects/my-project/locations/us-central1/jobs/daily-processor",
    description="Daily batch processing job",
    schedule="0 2 * * *",  # Run daily at 2 AM
    time_zone="America/New_York",
    http_target=scheduler_v1.HttpTarget(
        uri=f"https://run.googleapis.com/v2/{job.name}:run",
        http_method=scheduler_v1.HttpMethod.POST,
        oauth_token=scheduler_v1.OAuthToken(
            service_account_email="job-runner@my-project.iam.gserviceaccount.com"
        )
    )
)