Versatile Data Kit SDK plugin exposing CLI commands for managing the lifecycle of Data Jobs.
Concurrent execution prevention system that ensures only one instance of a data job runs at a time. This prevents data consistency issues, resource conflicts, and duplicate processing that can occur when multiple instances of the same job execute simultaneously.
from vdk.internal.builtin_plugins.run.job_context import JobContext
from vdk.internal.core.config import Configuration, ConfigurationBuilder

Class that checks for running executions of a data job to prevent concurrent execution conflicts.
class ConcurrentExecutionChecker:
    """
    Ask the Control Service whether another execution of a data job is running.

    Used to prevent two executions of the same data job from running at the
    same time. The methods below are shown as signatures with docstrings only;
    their implementation is not part of this document.
    """

    def __init__(self, rest_api_url: str) -> None:
        """
        Initialize the execution checker for a Control Service REST API.

        Parameters:
        - rest_api_url: str - Base URL for Control Service REST API
        """

    def is_job_execution_running(
        self, job_name: str, job_team: str, job_execution_attempt_id: str
    ) -> bool:
        """
        Check whether another execution of the data job is currently running.

        Parameters:
        - job_name: str - Name of the data job to check
        - job_team: str - Team owning the data job
        - job_execution_attempt_id: str - Current job execution attempt ID

        Returns:
        bool: True if another execution with a different ID is running,
        False otherwise.

        The method queries the Control Service for submitted and running
        executions. It returns True only if such an execution has a different
        ID from the current one, meaning the current execution should be
        skipped.

        NOTE(review): the query covers both *submitted* and *running*
        executions, but the return value is described in terms of a *running*
        execution only. Confirm whether a merely submitted execution also
        yields True. It is also not shown here how an *attempt* ID is matched
        against execution IDs when deciding "different ID".
        """


# Utility functions that handle job execution skipping logic.
def _skip_job_run(job_name: str) -> None:
    """
    Log that the job is being skipped, then terminate the process.

    Parameters:
    - job_name: str - Name of the job being skipped

    Logs a skip message and calls os._exit(0) to terminate execution, so in
    practice this function does not return to its caller.

    NOTE(review): os._exit exits without running cleanup handlers or
    flushing stdio buffers. Any buffered log output written just before the
    exit may be lost unless it is flushed first.
    """
def _skip_job_if_necessary(
    log_config: str,
    job_name: str,
    execution_id: str,
    job_team: str,
    configuration: Configuration,
):
    """
    Skip this job execution if another execution of the same job is running.

    Parameters:
    - log_config: str - Log configuration type; only "CLOUD" enables the check
    - job_name: str - Name of the data job
    - execution_id: str - Current execution ID
    - job_team: str - Team owning the job
    - configuration: Configuration - VDK configuration instance

    Returns:
    None: execution continues normally.
    int: documented as 1 when the execution was skipped.
        NOTE(review): the skip path calls os._exit(0), which terminates the
        process, so this return value can only be observed if the exit is
        bypassed (e.g. patched out in tests). Confirm against the
        implementation.

    The concurrent execution check runs only when log_config == "CLOUD".
    For any other value (e.g. local executions) the check is not performed.
    """


# VDK hook implementations that integrate execution control into the job lifecycle.
# NOTE(review): `tryfirst=True` presumably asks the plugin manager to run this
# implementation ahead of other implementations of the same hook. Confirm
# against the hook framework's documentation.
@hookimpl(tryfirst=True)
def vdk_configure(config_builder: ConfigurationBuilder):
    """
    Register the execution skip configuration option.

    Parameters:
    - config_builder: ConfigurationBuilder - Builder for configuration options

    Adds the EXECUTION_SKIP_CHECKER_ENABLED configuration option with default
    value True. When the option is disabled, the concurrent execution check is
    not performed and the job runs normally.
    """
@hookimpl(tryfirst=True)
def run_job(context: JobContext) -> None:
    """
    Check for a concurrent execution before the data job runs.

    Parameters:
    - context: JobContext - Job execution context

    Returns:
    None: normal execution continues.

    If another execution of the same job is detected, the process is
    terminated with os._exit(0), so the hook does not return in that case.
    """


# Configuration key, read via config.get_value(...), that enables or disables
# the concurrent execution check (default: True).
EXECUTION_SKIP_CHECKER_ENABLED = "EXECUTION_SKIP_CHECKER_ENABLED"
# The concurrent execution detection follows this logic:
# NOTE(review): the list summarising the detection rules was garbled during
# extraction; the only surviving fragment was:
#   log_config == "CLOUD")-xxxxx suffix)
def check_execution_flow():
    """
    Illustrate the decision sequence of the concurrent execution check.

    This is pseudocode. The names config, log_config, api_url, job_name, team,
    execution_id, write_termination_message, log and os stand for values and
    helpers provided by the surrounding plugin code (not shown here).

    The query is best-effort. If contacting the Control Service fails, the
    error is logged and the job proceeds instead of crashing, matching the
    plugin's documented error handling.
    """
    # 1. Check if skip checker is enabled
    if not config.get_value(EXECUTION_SKIP_CHECKER_ENABLED):
        return  # Continue normal execution
    # 2. Check execution environment: only CLOUD executions are checked
    if log_config != "CLOUD":
        return  # Skip check for local executions
    try:
        # 3. Query for running executions
        checker = ConcurrentExecutionChecker(api_url)
        is_running = checker.is_job_execution_running(job_name, team, execution_id)
        # 4. Skip if concurrent execution found
        if is_running:
            write_termination_message(execution_skipped=True)
            # os._exit raises nothing, so the handler below never sees it;
            # it also skips cleanup handlers and stdio flushing.
            os._exit(0)  # Terminate immediately
    except Exception as exc:
        # A failed check (e.g. Control Service unreachable) must not block the job.
        log.warning(f"Error while checking for concurrent execution: {str(exc)}")
        log.warning("Proceeding with execution despite check failure")
    # No concurrent execution (or the check failed): the job runs normally.


# Enable execution skip checking (default)
export VDK_EXECUTION_SKIP_CHECKER_ENABLED=true
# Disable execution skip checking
export VDK_EXECUTION_SKIP_CHECKER_ENABLED=falsefrom vdk.plugin.control_cli_plugin.execution_skip import ConcurrentExecutionChecker
# Point the checker at the Control Service REST API (example URL).
control_service_url = "https://api.example.com"
checker = ConcurrentExecutionChecker(control_service_url)

# Ask whether a *different* execution of this job is already running;
# the current attempt itself does not count.
is_running = checker.is_job_execution_running(
    job_name="my-data-job",
    job_team="analytics-team",
    job_execution_attempt_id="exec-12345-67890",
)

if not is_running:
    print("No concurrent execution detected, proceeding")
else:
    print("Another execution is running, skipping current execution")

# The execution control integrates automatically through VDK hooks:
# Automatic integration - no user code required
def run(job_input: IJobInput):
    """
    Example data job step; no execution-control code is needed here.

    The concurrent execution check runs in a VDK hook before this function is
    called. If another execution is detected, the process exits and this
    function is never reached. It still runs when the check is disabled, when
    the execution is not a CLOUD execution, or when the check itself fails.
    """
    print("Job execution starting - no concurrent execution detected")

    # Regular job logic goes here.
    process_data()
    generate_reports()

    print("Job execution completed successfully")


# The execution control system handles various error scenarios:
try:
    # Perform concurrent execution check
    job_running = checker.is_job_execution_running(job_name, job_team, execution_id)
    if job_running:
        # Write termination message for monitoring
        writer_plugin.write_termination_message(
            configuration=configuration,
            execution_skipped=True
        )
        # os._exit raises no exception, so the handler below cannot intercept the skip.
        _skip_job_run(job_name) # Exits with os._exit(0)
except Exception as exc:
    # Log error but continue execution: the check is best-effort.
    # NOTE(review): this handler also covers write_termination_message. If
    # writing the termination message raises after a concurrent execution was
    # detected, the job is NOT skipped, and the warning below reports it as a
    # check failure. Consider narrowing the try if that matters.
    log.warning(f"Error while checking for concurrent execution: {str(exc)}")
    log.warning("Proceeding with execution despite check failure")
    # Execution continues normally
# When execution is skipped, a termination message is written for monitoring systems:
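- The termination message is written with `execution_skipped` set to `True` to indicate the skip reason.
- The process then exits via `os._exit(0)`, i.e. immediately with exit status 0. `os._exit` does not call cleanup handlers or flush stdio buffers, so the termination is immediate rather than a clean shutdown.

The execution control system requires: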
# Required for API access: base URL of the Control Service REST API.
CONTROL_SERVICE_REST_API_URL: str = "https://api.example.com"

# Authentication for API calls. Placeholder value only: supply the real token
# from a secret store or the environment, never from committed source.
API_TOKEN: str = "your-api-token"

# Optional: enable/disable the concurrent execution check (default: True).
EXECUTION_SKIP_CHECKER_ENABLED: bool = True

# Prevents duplicate data processing in scenarios like:
Prevents resource conflicts for:
Ensures operational stability by:
Install with Tessl CLI
# Install this package's spec with the Tessl CLI (run from a shell).
npx tessl i tessl/pypi-vdk-plugin-control-cli