Management of Cloud Run jobs for batch workloads, scheduled tasks, and one-time executions. Jobs run containers to completion and can be triggered manually, on a schedule, or via events.
Create and configure job clients for managing batch workloads.
class JobsClient:
"""Synchronous client for managing Cloud Run jobs."""
def __init__(self, *, credentials=None, transport=None, client_options=None, client_info=None):
"""
Initialize the Jobs client.
Args:
credentials: Optional authentication credentials
transport: Transport to use for requests (grpc, grpc_asyncio, rest)
client_options: Client configuration options
client_info: Client information for user agent
"""
class JobsAsyncClient:
"""Asynchronous client for managing Cloud Run jobs."""Create batch jobs with container specifications, parallelism, and retry policies.
def create_job(
self,
request: CreateJobRequest = None,
*,
parent: str = None,
job: Job = None,
job_id: str = None,
**kwargs
) -> Operation:
"""
Create a new Cloud Run job.
Args:
request: The request object containing all parameters
parent: Required. The location to create the job. Format: projects/{project}/locations/{location}
job: Required. The job configuration
job_id: Required. The unique identifier for the job
Returns:
Operation: Long-running operation that resolves to the created Job
"""Usage Example:
from google.cloud import run_v2
from google.protobuf import duration_pb2
client = run_v2.JobsClient()
# Define job configuration
job = run_v2.Job()
job.template.template.containers = [
    run_v2.Container(
        image="gcr.io/my-project/batch-processor:latest",
        resources=run_v2.ResourceRequirements(
            limits={"cpu": "2", "memory": "4Gi"}
        ),
        env=[
            run_v2.EnvVar(name="BATCH_SIZE", value="1000"),
            run_v2.EnvVar(name="OUTPUT_BUCKET", value="gs://my-output-bucket")
        ]
    )
]
# Configure parallelism and retries
job.template.parallelism = 5
job.template.task_count = 10
job.template.template.max_retries = 3
job.template.template.timeout = "3600s" # 1 hour
# Create the job
request = run_v2.CreateJobRequest(
    parent="projects/my-project/locations/us-central1",
    job=job,
    job_id="batch-processing-job"
)
operation = client.create_job(request=request)
job_response = operation.result()
print(f"Job created: {job_response.name}")Execute jobs manually and monitor execution status.
def run_job(
self,
request: RunJobRequest = None,
*,
name: str = None,
**kwargs
) -> Operation:
"""
Execute a Cloud Run job.
Args:
request: The request object
name: Required. The name of the job to execute. Format: projects/{project}/locations/{location}/jobs/{job}
Returns:
Operation: Long-running operation that resolves to an Execution
"""Usage Example:
# Execute a job
run_request = run_v2.RunJobRequest(
name="projects/my-project/locations/us-central1/jobs/batch-processing-job"
)
operation = client.run_job(request=run_request)
execution = operation.result()
print(f"Job execution started: {execution.name}")
# Monitor execution progress
executions_client = run_v2.ExecutionsClient()
while True:
current_execution = executions_client.get_execution(name=execution.name)
print(f"Execution status: {current_execution.status.completion_status}")
if current_execution.status.completion_status in [
run_v2.Execution.CompletionStatus.EXECUTION_SUCCEEDED,
run_v2.Execution.CompletionStatus.EXECUTION_FAILED,
run_v2.Execution.CompletionStatus.EXECUTION_CANCELLED
]:
break
time.sleep(10)Get job configuration and status information.
def get_job(
self,
request: GetJobRequest = None,
*,
name: str = None,
**kwargs
) -> Job:
"""
Get a Cloud Run job.
Args:
request: The request object
name: Required. The name of the job. Format: projects/{project}/locations/{location}/jobs/{job}
Returns:
Job: The job configuration and status
"""List jobs with filtering and pagination.

List jobs with filtering and pagination.

def list_jobs(
self,
request: ListJobsRequest = None,
*,
parent: str = None,
**kwargs
) -> ListJobsResponse:
"""
List Cloud Run jobs in a location.
Args:
request: The request object
parent: Required. The location to list jobs. Format: projects/{project}/locations/{location}
Returns:
ListJobsResponse: Paginated list of jobs
"""Update job configuration, container images, and execution parameters.

Update job configuration, container images, and execution parameters.

def update_job(
self,
request: UpdateJobRequest = None,
*,
job: Job = None,
**kwargs
) -> Operation:
"""
Update a Cloud Run job.
Args:
request: The request object
job: Required. The job configuration to update
Returns:
Operation: Long-running operation that resolves to the updated Job
"""Delete jobs and clean up associated resources.

Delete jobs and clean up associated resources.

def delete_job(
self,
request: DeleteJobRequest = None,
*,
name: str = None,
**kwargs
) -> Operation:
"""
Delete a Cloud Run job.
Args:
request: The request object
name: Required. The name of the job to delete
Returns:
Operation: Long-running operation for the deletion
"""Manage Identity and Access Management policies for jobs.

Manage Identity and Access Management policies for jobs.

def get_iam_policy(self, request, *, resource: str = None, **kwargs):
"""Get the IAM access control policy for a job."""
def set_iam_policy(self, request, *, resource: str = None, policy = None, **kwargs):
"""Set the IAM access control policy for a job."""
def test_iam_permissions(self, request, *, resource: str = None, permissions: list[str] = None, **kwargs):
"""Test IAM permissions for a job."""class Job:
"""
A Cloud Run job configuration.
Attributes:
name (str): The unique name of the job
uid (str): Unique identifier assigned by the system
generation (int): A sequence number representing a specific generation
labels (dict): User-defined labels
annotations (dict): User-defined annotations
create_time (Timestamp): The creation time
update_time (Timestamp): The last modification time
delete_time (Timestamp): The deletion time
expire_time (Timestamp): When the job expires
creator (str): Email address of the user who created the job
last_modifier (str): Email address of the last user to modify the job
client (str): Arbitrary identifier for the client
client_version (str): Version identifier for the client
launch_stage (LaunchStage): The launch stage of the job
binary_authorization (BinaryAuthorization): Binary authorization policy
template (ExecutionTemplate): The template describing job executions
observed_generation (int): The generation observed by the controller
terminal_condition (Condition): The terminal condition of the job
conditions (list[Condition]): Detailed status conditions
execution_count (int): Number of executions created for this job
latest_created_execution (ExecutionReference): Reference to the latest execution
reconciling (bool): Whether the job is currently being reconciled
satisfies_pzs (bool): Whether the job satisfies PZS requirements
start_execution_token (str): A unique string used as a suffix for creating a new execution
run_execution_token (str): A unique string used as a suffix for triggering a new execution
etag (str): A fingerprint used for optimistic concurrency control
"""class ExecutionTemplate:
"""
Template for job executions.
Attributes:
labels (dict): User-defined labels for executions
annotations (dict): User-defined annotations for executions
parallelism (int): Specifies the maximum desired number of tasks the execution should run at any given time
task_count (int): Specifies the desired number of tasks the execution should run
template (TaskTemplate): Required. The template for individual tasks
"""class CreateJobRequest:
"""
Request message for creating a job.
Attributes:
parent (str): Required. The location to create the job
job (Job): Required. The job configuration
job_id (str): Required. The unique identifier for the job
validate_only (bool): Indicates whether to validate only
"""
class RunJobRequest:
"""
Request message for running a job.
Attributes:
name (str): Required. The name of the job to run
validate_only (bool): Indicates whether to validate only
etag (str): A fingerprint for optimistic concurrency control
overrides (RunJobRequest.Overrides): Optional overrides for the execution
"""
class GetJobRequest:
"""
Request message for getting a job.
Attributes:
name (str): Required. The name of the job to retrieve
"""
class ListJobsRequest:
"""
Request message for listing jobs.
Attributes:
parent (str): Required. The location to list jobs in
page_size (int): Maximum number of jobs to return
page_token (str): Token for retrieving the next page
show_deleted (bool): Whether to include deleted jobs
"""
class UpdateJobRequest:
"""
Request message for updating a job.
Attributes:
job (Job): Required. The job configuration to update
validate_only (bool): Indicates whether to validate only
allow_missing (bool): Whether to create the job if it does not exist
"""
class DeleteJobRequest:
"""
Request message for deleting a job.
Attributes:
name (str): Required. The name of the job to delete
validate_only (bool): Indicates whether to validate only
etag (str): A fingerprint for optimistic concurrency control
"""class ListJobsResponse:
"""
Response message for listing jobs.
Attributes:
jobs (list[Job]): The list of jobs
next_page_token (str): Token for retrieving the next page
"""
class ExecutionReference:
"""
Reference to a job execution.
Attributes:
name (str): Name of the execution
create_time (Timestamp): Creation time of the execution
completion_time (Timestamp): Completion time of the execution
delete_time (Timestamp): Deletion time of the execution
"""# Create a job for processing files in batches
job = run_v2.Job()
job.template.template.containers = [
    run_v2.Container(
        image="gcr.io/my-project/file-processor:latest",
        env=[
            run_v2.EnvVar(name="INPUT_BUCKET", value="gs://input-files"),
            run_v2.EnvVar(name="OUTPUT_BUCKET", value="gs://processed-files")
        ]
    )
]
# Configure for parallel processing
job.template.parallelism = 10 # Run up to 10 tasks in parallel
job.template.task_count = 100  # Process 100 files total

# Configure a data pipeline job with retries and timeout
from google.protobuf import duration_pb2

job.template.template.max_retries = 3
job.template.template.timeout = duration_pb2.Duration(seconds=1800)  # 30 minutes

# Add environment variables for data processing
container = job.template.template.containers[0]
container.env.extend([
    run_v2.EnvVar(
        name="DATABASE_URL",
        value_source=run_v2.EnvVarSource(
            secret_key_ref=run_v2.SecretKeySelector(
                secret="db-credentials",
                version="latest"
            )
        )
    ),
    run_v2.EnvVar(name="BATCH_SIZE", value="1000")
])

from google.cloud import scheduler_v1
# Create the job first
job_operation = jobs_client.create_job(request=create_job_request)
job = job_operation.result()
# Then create a Cloud Scheduler job to trigger it
scheduler_client = scheduler_v1.CloudSchedulerClient()
scheduled_job = scheduler_v1.Job(
    name="projects/my-project/locations/us-central1/jobs/daily-processor",
    description="Daily batch processing job",
    schedule="0 2 * * *",  # Run daily at 2 AM
    time_zone="America/New_York",
    http_target=scheduler_v1.HttpTarget(
        uri=f"https://run.googleapis.com/v2/{job.name}:run",
        http_method=scheduler_v1.HttpMethod.POST,
        oauth_token=scheduler_v1.OAuthToken(
            service_account_email="job-runner@my-project.iam.gserviceaccount.com"
        )
    )
)

# Submit the Cloud Scheduler job
scheduler_client.create_job(
    parent="projects/my-project/locations/us-central1",
    job=scheduled_job
)