
tessl/pypi-vdk-plugin-control-cli

Versatile Data Kit SDK plugin exposing CLI commands for managing the lifecycle of Data Jobs.


Execution Control

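Concurrent execution prevention system that ensures only one instance of a data job runs at a time: when a cloud execution starts while another execution of the same job is already submitted or running, the new execution is skipped. This prevents data consistency issues, resource conflicts, and duplicate processing that can occur when multiple instances of the same job execute simultaneously.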

Types

from vdk.internal.builtin_plugins.run.job_context import JobContext
from vdk.internal.core.config import Configuration, ConfigurationBuilder

Capabilities

Concurrent Execution Checker

Class that checks for running executions of a data job to prevent concurrent execution conflicts.

class ConcurrentExecutionChecker:
    def __init__(self, rest_api_url: str) -> None:
        """
        Initialize execution checker with Control Service API.
        
        Parameters:
        - rest_api_url: str - Base URL for Control Service REST API
        """
        
    def is_job_execution_running(self, job_name, job_team, job_execution_attempt_id) -> bool:
        """
        Check if another execution of the data job is currently running.
        
        Parameters:
        - job_name: str - Name of the data job to check
        - job_team: str - Team owning the data job
        - job_execution_attempt_id: str - Current job execution attempt ID
        
        Returns:
        bool: True if another execution with a different ID is submitted or running, False otherwise
        
        The method queries the Control Service for submitted and running executions.
        Returns True only if one of them has a different (base) ID from the current
        attempt, indicating that the current execution should be skipped.
        """

Execution Skip Functions

Utility functions that handle job execution skipping logic.

def _skip_job_run(job_name) -> None:
    """
    Skip job execution and exit the process.
    
    Parameters:
    - job_name: str - Name of the job being skipped
    
    Logs skip message and calls os._exit(0) to terminate execution.
    """
    
def _skip_job_if_necessary(
    log_config: str,
    job_name: str,
    execution_id: str,
    job_team: str,
    configuration: Configuration,
):
    """
    Conditionally skip job execution based on concurrent execution check.
    
    Parameters:
    - log_config: str - Log configuration type ("CLOUD" enables checking)
    - job_name: str - Name of the data job
    - execution_id: str - Current execution ID
    - job_team: str - Team owning the job
    - configuration: Configuration - VDK configuration instance
    
    Returns:
    None: Continues execution normally
    int: 1 if the execution was skipped; in practice this value is not observed,
         because _skip_job_run() calls os._exit(0) before the function returns
    
    Only performs checking for cloud executions (log_config == "CLOUD").
    Local executions bypass the concurrent execution check.
    """

Hook Implementations

VDK hook implementations that integrate execution control into the job lifecycle.

@hookimpl(tryfirst=True)
def vdk_configure(config_builder: ConfigurationBuilder):
    """
    Add execution skip configuration option.
    
    Parameters:
    - config_builder: ConfigurationBuilder - Builder for configuration options
    
    Adds EXECUTION_SKIP_CHECKER_ENABLED configuration with default value True.
    """
    
@hookimpl(tryfirst=True)
def run_job(context: JobContext) -> None:
    """
    Pre-execution hook that checks for concurrent executions.
    
    Parameters:
    - context: JobContext - Job execution context
    
    Returns:
    None: Normal execution continues
    
    Performs concurrent execution check before job runs.
    If another execution is detected, terminates with os._exit(0).
    """

Configuration

EXECUTION_SKIP_CHECKER_ENABLED = "EXECUTION_SKIP_CHECKER_ENABLED"

Execution Logic

Detection Algorithm

The concurrent execution detection follows this logic:

  1. Cloud Check: Only performs checking for cloud executions (log_config == "CLOUD")
  2. API Query: Queries Control Service for executions with status "submitted" or "running"
  3. ID Comparison: Compares each returned execution ID with the current attempt ID, tolerating the -xxxxx suffix that VDK attempt IDs may carry
  4. Decision: Skips execution if another execution with different base ID is found
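Steps 2 to 4 can be illustrated with plain data. This is a sketch under stated assumptions: `Execution`, `is_same_execution`, and `another_execution_active` are hypothetical names, the attempt-ID format (execution ID optionally followed by `-xxxxx`) is inferred from the description above, and the plugin itself asks the Control Service to filter by status rather than filtering locally.

```python
from dataclasses import dataclass
from typing import Iterable

ACTIVE_STATUSES = {"submitted", "running"}


@dataclass
class Execution:
    """Hypothetical minimal execution record (id and status only)."""

    id: str
    status: str


def is_same_execution(execution_id: str, attempt_id: str) -> bool:
    # Assumption: an attempt ID is either the execution ID itself or the
    # execution ID followed by a "-xxxxx" suffix.
    return attempt_id == execution_id or attempt_id.startswith(execution_id + "-")


def another_execution_active(executions: Iterable[Execution], attempt_id: str) -> bool:
    """True if some submitted/running execution is not the current one."""
    for execution in executions:
        if execution.status not in ACTIVE_STATUSES:
            continue  # only submitted/running executions can block a new run
        if not is_same_execution(execution.id, attempt_id):
            return True  # a different execution is active: skip this run
    return False
```

For example, with one running execution `my-job-1700000000`, the attempt ID `my-job-1700000000-abcde` yields False (same base ID), while `my-job-1700000999-fghij` yields True.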

Execution Flow

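import os

from vdk.plugin.control_cli_plugin.execution_skip import (
    EXECUTION_SKIP_CHECKER_ENABLED,
    ConcurrentExecutionChecker,
)


def check_execution_flow(configuration, writer_plugin, log_config, api_url, job_name, job_team, execution_id):
    # 1. Check if the skip checker is enabled (default: True)
    if not configuration.get_value(EXECUTION_SKIP_CHECKER_ENABLED):
        return  # Continue normal execution

    # 2. Check execution environment
    if log_config != "CLOUD":
        return  # Local executions bypass the check

    # 3. Query the Control Service for submitted/running executions
    checker = ConcurrentExecutionChecker(api_url)
    is_running = checker.is_job_execution_running(job_name, job_team, execution_id)

    # 4. Skip if a concurrent execution was found
    if is_running:
        # writer_plugin: VDK's termination-message writer (see Error Handling)
        writer_plugin.write_termination_message(configuration=configuration, execution_skipped=True)
        os._exit(0)  # Terminate immediately with a success exit status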

Usage Examples

Configuration Usage

# Enable execution skip checking (default)
export VDK_EXECUTION_SKIP_CHECKER_ENABLED=true

# Disable execution skip checking
export VDK_EXECUTION_SKIP_CHECKER_ENABLED=false
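How the environment variable maps to the effective setting can be sketched as follows. This is an illustrative sketch only: `skip_checker_enabled` is a hypothetical helper, and the accepted spellings of true/false are assumptions; VDK's own configuration loader performs the real conversion and may accept a different set of values.

```python
import os
from typing import Mapping

# Hypothetical spellings; VDK's own boolean conversion may accept a different set.
TRUE_VALUES = {"true", "1", "yes", "on"}
FALSE_VALUES = {"false", "0", "no", "off"}


def skip_checker_enabled(environ: Mapping[str, str] = os.environ) -> bool:
    """Return the effective value of VDK_EXECUTION_SKIP_CHECKER_ENABLED (default True)."""
    raw = environ.get("VDK_EXECUTION_SKIP_CHECKER_ENABLED")
    if raw is None:
        return True  # unset: the documented default applies
    value = raw.strip().lower()
    if value in TRUE_VALUES:
        return True
    if value in FALSE_VALUES:
        return False
    raise ValueError(f"Not a boolean value: {raw!r}")
```

Raising on an unrecognized value, instead of silently falling back to the default, makes a typo such as `fasle` visible rather than leaving the checker unexpectedly enabled.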

Programmatic Usage

from vdk.plugin.control_cli_plugin.execution_skip import ConcurrentExecutionChecker

# Initialize checker
checker = ConcurrentExecutionChecker("https://api.example.com")

# Check for concurrent execution
is_running = checker.is_job_execution_running(
    job_name="my-data-job",
    job_team="analytics-team", 
    job_execution_attempt_id="exec-12345-67890"
)

if is_running:
    print("Another execution is running, skipping current execution")
else:
    print("No concurrent execution detected, proceeding")

Integration with Job Execution

The execution control integrates automatically through VDK hooks:

# Automatic integration - no user code required
def run(job_input: IJobInput):
    # This function only runs if no concurrent execution is detected
    # The execution skip check happens automatically before this point
    
    print("Job execution starting - no concurrent execution detected")
    
    # Normal job logic here
    process_data()
    generate_reports()
    
    print("Job execution completed successfully")

Error Handling

Exception Management

The execution control system handles various error scenarios:

import logging

log = logging.getLogger(__name__)

# checker, writer_plugin, configuration, job_name, job_team and execution_id
# come from the surrounding _skip_job_if_necessary call
try:
    # Perform concurrent execution check
    job_running = checker.is_job_execution_running(job_name, job_team, execution_id)

    if job_running:
        # Write termination message for monitoring
        writer_plugin.write_termination_message(
            configuration=configuration,
            execution_skipped=True,
        )
        _skip_job_run(job_name)  # Exits with os._exit(0); never returns

except Exception as exc:
    # Log error but continue execution (fail open)
    log.warning(f"Error while checking for concurrent execution: {exc}")
    log.warning("Proceeding with execution despite check failure")
    # Execution continues normally

Termination Message

When execution is skipped, a termination message is written for monitoring systems:

  • execution_skipped: Set to True to indicate skip reason
  • Monitoring integration: Allows downstream systems to detect skipped executions
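  • Immediate termination: Uses os._exit(0), so the skipped run ends with a success exit status rather than a failure; note that os._exit bypasses finally blocks, atexit handlers, and stdio buffer flushing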
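The practical consequences of `os._exit(0)` can be checked in a child process. This small, self-contained demonstration is not plugin code: the child prints a line, then exits from inside a `try`/`finally` with an `atexit` handler registered; only the explicitly flushed line reaches the parent, and the exit status is 0.

```python
import subprocess
import sys
import textwrap

# Child program: registers an atexit handler, then exits from inside try/finally.
CHILD = textwrap.dedent(
    """
    import atexit, os
    atexit.register(lambda: print("atexit ran"))
    try:
        print("skipping job", flush=True)  # flush: os._exit does not flush buffers
        os._exit(0)
    finally:
        print("finally ran")
    """
)


def run_child() -> subprocess.CompletedProcess:
    """Run CHILD in a fresh interpreter and capture its output."""
    return subprocess.run([sys.executable, "-c", CHILD], capture_output=True, text=True)


if __name__ == "__main__":
    result = run_child()
    print(result.returncode, repr(result.stdout))  # 0 'skipping job\n'
```

Without `flush=True` the child's output would typically be lost, because its stdout is a pipe and therefore buffered; this is why anything the skip path must persist, such as the termination message, has to be written before `os._exit` is called.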

Configuration Requirements

The execution control system requires:

# Required for API access
CONTROL_SERVICE_REST_API_URL = "https://api.example.com"

# Authentication for API calls
API_TOKEN = "your-api-token"

# Optional: Enable/disable checking
EXECUTION_SKIP_CHECKER_ENABLED = True  # Default: True

Use Cases

Data Consistency

Prevents duplicate data processing in scenarios like:

  • Incremental ETL: Jobs that process data since last run would duplicate data if run concurrently
  • State Management: Jobs that maintain state files or checkpoints
  • Resource Locking: Jobs that require exclusive access to shared resources

Resource Management

Prevents resource conflicts for:

  • Database Connections: Avoiding connection pool exhaustion
  • File System Access: Preventing concurrent file modifications
  • External API Limits: Respecting rate limits and quotas

Operational Safety

Ensures operational stability by:

  • Memory Usage: Preventing memory exhaustion from multiple instances
  • CPU Usage: Avoiding CPU contention between concurrent executions
  • Network Bandwidth: Managing network resource consumption

Install with Tessl CLI

npx tessl i tessl/pypi-vdk-plugin-control-cli
