# Execution Control

Concurrent execution prevention for data jobs. When a new execution of a job starts while another execution of the same job is already in progress, the new execution is skipped, so only one instance runs at a time. This prevents the data consistency issues, resource conflicts, and duplicate processing that can occur when multiple instances of the same job execute simultaneously. The check is best-effort: if it cannot be performed, the execution proceeds (see Error Handling).

## Types

```python { .api }
from vdk.internal.builtin_plugins.run.job_context import JobContext
from vdk.internal.core.config import Configuration, ConfigurationBuilder
```

## Capabilities

### Concurrent Execution Checker

Class that checks whether another execution of a data job is already in progress, so that concurrent executions can be avoided.

```python { .api }
class ConcurrentExecutionChecker:
    def __init__(self, rest_api_url: str) -> None:
        """
        Initialize the execution checker with the Control Service API.

        Parameters:
        - rest_api_url: str - Base URL of the Control Service REST API
        """

    def is_job_execution_running(self, job_name, job_team, job_execution_attempt_id) -> bool:
        """
        Check whether another execution of the data job is currently running.

        Parameters:
        - job_name: str - Name of the data job to check
        - job_team: str - Team that owns the data job
        - job_execution_attempt_id: str - ID of the current job execution attempt

        Returns:
        bool: True if another execution with a different ID is running, False otherwise

        The method queries the Control Service for executions with status
        "submitted" or "running". It returns True only if one of them has a
        different ID, indicating that the current execution should be skipped.
        """
```

### Execution Skip Functions

Utility functions that implement the job-skipping logic.

```python { .api }
def _skip_job_run(job_name) -> None:
    """
    Skip the job execution and exit the process.

    Parameters:
    - job_name: str - Name of the job being skipped

    Logs a skip message and calls os._exit(0) to terminate the process.
    """

def _skip_job_if_necessary(
    log_config: str,
    job_name: str,
    execution_id: str,
    job_team: str,
    configuration: Configuration,
):
    """
    Conditionally skip the job execution based on the concurrent execution check.

    Parameters:
    - log_config: str - Log configuration type ("CLOUD" enables the check)
    - job_name: str - Name of the data job
    - execution_id: str - Current execution ID
    - job_team: str - Team that owns the job
    - configuration: Configuration - VDK configuration instance

    Returns:
    None: The execution continues normally
    int: 1 if the execution was skipped; in practice this value is not
         observed, because _skip_job_run terminates the process with os._exit(0)

    The check runs only for cloud executions (log_config == "CLOUD");
    local executions bypass it.
    """
```
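
The decision made by `_skip_job_if_necessary` can be sketched as a small function whose side effects are injected, which makes the control flow testable without a Control Service. This is a hypothetical sketch that mirrors the documented steps, not the plugin's implementation: `skip_if_necessary` and the `is_running`, `write_skip_message`, and `exit_process` callables are assumptions standing in for the checker, the termination message writer, and `os._exit`.

```python
from typing import Callable, List, Optional


def skip_if_necessary(
    log_config: str,
    is_running: Callable[[], bool],
    write_skip_message: Callable[[], None],
    exit_process: Callable[[int], None],
) -> Optional[int]:
    """Hypothetical sketch of the skip decision with injected dependencies."""
    # Only cloud executions are checked; local runs proceed unconditionally.
    if log_config != "CLOUD":
        return None
    if is_running():
        # Record why the execution ended, then stop the process with code 0.
        write_skip_message()
        exit_process(0)
        # Reached only when exit_process does not terminate (e.g. in tests).
        return 1
    return None


# Example with stand-in callables that record events instead of exiting.
events: List[str] = []
result = skip_if_necessary(
    "CLOUD",
    is_running=lambda: True,
    write_skip_message=lambda: events.append("termination message"),
    exit_process=lambda code: events.append(f"exit({code})"),
)
print(events, result)  # ['termination message', 'exit(0)'] 1
```

Injecting `exit_process` is the design choice that keeps the sketch testable: in production it would be `os._exit`, in tests a recorder.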

### Hook Implementations

VDK hook implementations that integrate execution control into the job lifecycle. Both are registered with `tryfirst=True`, so they run before other plugins' implementations of the same hook.

```python { .api }
@hookimpl(tryfirst=True)
def vdk_configure(config_builder: ConfigurationBuilder):
    """
    Add the execution skip configuration option.

    Parameters:
    - config_builder: ConfigurationBuilder - Builder for configuration options

    Adds the EXECUTION_SKIP_CHECKER_ENABLED configuration option with default value True.
    """

@hookimpl(tryfirst=True)
def run_job(context: JobContext) -> None:
    """
    Pre-execution hook that checks for concurrent executions.

    Parameters:
    - context: JobContext - Job execution context

    Returns:
    None: Normal execution continues

    Performs the concurrent execution check before the job runs.
    If another execution is detected, terminates the process with os._exit(0).
    """
```

### Configuration

Name of the configuration option that enables or disables the check (default `True`):

```python { .api }
EXECUTION_SKIP_CHECKER_ENABLED = "EXECUTION_SKIP_CHECKER_ENABLED"
```
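
The option can be set through the `VDK_EXECUTION_SKIP_CHECKER_ENABLED` environment variable, as shown under Configuration Usage. How VDK itself converts the string value to a boolean is not described here, so the following is a hypothetical parser: it assumes the set of accepted spellings, and `read_skip_checker_enabled`, `_TRUE_VALUES`, and `_FALSE_VALUES` are illustrative names, not VDK APIs.

```python
import os
from typing import Mapping, Optional

ENV_VAR = "VDK_EXECUTION_SKIP_CHECKER_ENABLED"

# Hypothetical: accepted spellings are an assumption, not VDK's actual rules.
_TRUE_VALUES = {"true", "1", "yes", "y", "on"}
_FALSE_VALUES = {"false", "0", "no", "n", "off"}


def read_skip_checker_enabled(
    env: Optional[Mapping[str, str]] = None, default: bool = True
) -> bool:
    """Return the checker flag from the environment, falling back to the default."""
    env = os.environ if env is None else env
    raw = env.get(ENV_VAR)
    # Unset or empty: use the documented default (True).
    if raw is None or raw.strip() == "":
        return default
    value = raw.strip().lower()
    if value in _TRUE_VALUES:
        return True
    if value in _FALSE_VALUES:
        return False
    # Reject unrecognized values instead of silently guessing.
    raise ValueError(f"{ENV_VAR} must be a boolean, got {raw!r}")
```

Raising on unrecognized values, rather than falling back to the default, avoids silently running with the check in an unintended state.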

## Execution Logic

### Detection Algorithm

Concurrent execution detection follows these steps:

1. **Cloud Check**: The check runs only for cloud executions (`log_config == "CLOUD"`).
2. **API Query**: The Control Service is queried for executions of the job with status "submitted" or "running".
3. **ID Comparison**: Execution IDs are compared while tolerating ID variations: VDK IDs may carry an additional `-xxxxx` suffix, so IDs that differ only by this suffix are treated as the same execution.
4. **Decision**: If any returned execution has a different base ID, the current execution is skipped.
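
The ID comparison in steps 3 and 4 can be sketched as a pure function. This is a hypothetical illustration, not the plugin's code. It assumes that the current attempt ID is the base execution ID plus one `-xxxxx` suffix of five lowercase letters or digits, and that the Control Service result can be reduced to `(execution_id, status)` pairs. The names `base_execution_id` and `other_execution_in_progress` are illustrative.

```python
import re
from typing import Iterable, Tuple

# Assumption: an attempt ID is "<execution-id>-xxxxx", where xxxxx is five
# lowercase letters or digits (e.g. "exec-12345-67890" -> "exec-12345").
_ATTEMPT_SUFFIX = re.compile(r"^(?P<base>.+)-[a-z0-9]{5}$")

IN_PROGRESS = {"submitted", "running"}


def base_execution_id(attempt_id: str) -> str:
    """Strip one trailing -xxxxx suffix, if present (apply to attempt IDs only)."""
    match = _ATTEMPT_SUFFIX.match(attempt_id)
    return match.group("base") if match else attempt_id


def other_execution_in_progress(
    executions: Iterable[Tuple[str, str]], attempt_id: str
) -> bool:
    """True if any submitted/running execution belongs to a different execution."""
    # The current execution may be reported with or without the suffix.
    own_ids = {attempt_id, base_execution_id(attempt_id)}
    return any(
        status in IN_PROGRESS and execution_id not in own_ids
        for execution_id, status in executions
    )
```

With the attempt ID `exec-12345-67890`, an in-progress execution `exec-12345` is recognized as the current one, while `exec-99999` would trigger a skip.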

### Execution Flow

```python
import os

from vdk.plugin.control_cli_plugin.execution_skip import (
    EXECUTION_SKIP_CHECKER_ENABLED,
    ConcurrentExecutionChecker,
)


def check_execution_flow(config, writer_plugin, log_config, api_url, job_name, team, execution_id):
    # 1. Check whether the skip checker is enabled
    if not config.get_value(EXECUTION_SKIP_CHECKER_ENABLED):
        return  # Continue normal execution

    # 2. Check the execution environment
    if log_config != "CLOUD":
        return  # Skip the check for local executions

    # 3. Query for in-progress executions of the same job
    checker = ConcurrentExecutionChecker(api_url)
    is_running = checker.is_job_execution_running(job_name, team, execution_id)

    # 4. Skip if a concurrent execution was found
    if is_running:
        writer_plugin.write_termination_message(configuration=config, execution_skipped=True)
        os._exit(0)  # Terminate immediately with exit code 0
```

## Usage Examples

### Configuration Usage

```bash
# Enable execution skip checking (default)
export VDK_EXECUTION_SKIP_CHECKER_ENABLED=true

# Disable execution skip checking
export VDK_EXECUTION_SKIP_CHECKER_ENABLED=false
```

### Programmatic Usage

```python
from vdk.plugin.control_cli_plugin.execution_skip import ConcurrentExecutionChecker

# Initialize checker
checker = ConcurrentExecutionChecker("https://api.example.com")

# Check for concurrent execution
is_running = checker.is_job_execution_running(
    job_name="my-data-job",
    job_team="analytics-team",
    job_execution_attempt_id="exec-12345-67890",
)

if is_running:
    print("Another execution is running, skipping current execution")
else:
    print("No concurrent execution detected, proceeding")
```

### Integration with Job Execution

Execution control is integrated automatically through VDK hooks; no user code is required:

```python
from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
    # This function runs only if no concurrent execution was detected;
    # the execution skip check happens automatically before this point.
    print("Job execution starting - no concurrent execution detected")

    # Normal job logic goes here, e.g. calls to your own
    # process_data() and generate_reports() helpers

    print("Job execution completed successfully")
```

## Error Handling

### Exception Management

If the concurrent execution check itself raises an error, the error is logged and the job proceeds:

```python
import logging

log = logging.getLogger(__name__)

# checker, writer_plugin, configuration, job_name, job_team and execution_id
# are provided by the surrounding hook
try:
    # Perform the concurrent execution check
    job_running = checker.is_job_execution_running(job_name, job_team, execution_id)

    if job_running:
        # Write a termination message for monitoring
        writer_plugin.write_termination_message(
            configuration=configuration,
            execution_skipped=True,
        )
        _skip_job_run(job_name)  # Exits the process with os._exit(0)

except Exception as exc:
    # Log the error but let the execution continue
    log.warning(f"Error while checking for concurrent execution: {exc}")
    log.warning("Proceeding with execution despite check failure")
    # Execution continues normally
```
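
This fail-open behaviour can be isolated into a small helper that treats any error during the check as "not running". The sketch below is hypothetical: `is_running_or_fail_open` is an illustrative name, and the check is passed in as a callable so the helper does not depend on the Control Service.

```python
import logging
from typing import Callable

log = logging.getLogger(__name__)


def is_running_or_fail_open(check: Callable[[], bool]) -> bool:
    """Run the concurrency check; on any error, log it and report 'not running'."""
    try:
        return bool(check())
    except Exception as exc:  # fail open: a broken check must not block the job
        log.warning("Error while checking for concurrent execution: %s", exc)
        log.warning("Proceeding with execution despite check failure")
        return False


def unreachable_service() -> bool:
    # Stand-in for a check that cannot reach the Control Service.
    raise ConnectionError("Control Service unreachable")


print(is_running_or_fail_open(unreachable_service))  # False
print(is_running_or_fail_open(lambda: True))  # True
```

Failing open favours availability over strict exclusivity: while the Control Service is unreachable, two executions of the same job can overlap.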

### Termination Message

When an execution is skipped, a termination message is written for monitoring systems:

- **execution_skipped**: Set to `True` to record why the execution ended
- **Monitoring integration**: Lets downstream systems detect skipped executions
- **Immediate termination**: The process exits via `os._exit(0)`, so it reports success (exit code 0) but bypasses normal interpreter shutdown: `finally` blocks, `atexit` handlers, and unflushed stdio buffers are not processed
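
The difference from a normal exit can be observed directly. The sketch below runs a child Python process that registers an `atexit` handler and calls `os._exit(0)` inside a `try`/`finally`. Neither the handler nor the `finally` block runs, yet the exit code is still 0. Only the standard library is used; `run_child` is an illustrative name.

```python
import subprocess
import sys
import textwrap

# Child script: output is flushed explicitly, so only skipped cleanup is missing.
CHILD = textwrap.dedent(
    """
    import atexit
    import os

    atexit.register(print, "atexit handler ran", flush=True)
    try:
        print("skipping execution", flush=True)
        os._exit(0)
    finally:
        print("finally block ran", flush=True)
    """
)


def run_child() -> subprocess.CompletedProcess:
    """Run the child script in a fresh interpreter and capture its output."""
    return subprocess.run(
        [sys.executable, "-c", CHILD], capture_output=True, text=True, check=False
    )


result = run_child()
print(result.returncode)  # 0
print(result.stdout)  # only "skipping execution"
```

Code that must run before a skipped execution ends, such as writing the termination message, therefore has to complete before `os._exit` is called.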

## Configuration Requirements

The execution control system uses the following settings:

```python
# Required for API access
CONTROL_SERVICE_REST_API_URL = "https://api.example.com"

# Authentication for API calls
API_TOKEN = "your-api-token"

# Optional: enable/disable checking
EXECUTION_SKIP_CHECKER_ENABLED = True  # Default: True
```

## Use Cases

### Data Consistency

Prevents duplicate data processing in scenarios such as:

- **Incremental ETL**: Jobs that process data added since the last run would process the same data twice if two runs overlap
- **State Management**: Jobs that maintain state files or checkpoints
- **Resource Locking**: Jobs that require exclusive access to shared resources
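
Why overlapping runs duplicate data in incremental ETL can be shown with a toy model. In this hypothetical sketch, each run reads a shared checkpoint, selects every source row newer than it, and advances the checkpoint only when it finishes. Two runs that both start before either finishes read the same checkpoint and load the same rows. `Store`, `start_run`, and `finish_run` are illustrative names, not VDK APIs.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Store:
    source: List[int]  # source rows, identified by increasing IDs
    checkpoint: int = 0  # highest row ID already loaded
    target: List[int] = field(default_factory=list)


def start_run(store: Store) -> List[int]:
    """Read the checkpoint and select the rows this run will load."""
    return [row for row in store.source if row > store.checkpoint]


def finish_run(store: Store, rows: List[int]) -> None:
    """Load the selected rows and advance the checkpoint."""
    store.target.extend(rows)
    if rows:
        store.checkpoint = max(store.checkpoint, max(rows))


# Sequential runs: the second run sees the advanced checkpoint and loads nothing.
sequential = Store(source=[1, 2, 3])
finish_run(sequential, start_run(sequential))
finish_run(sequential, start_run(sequential))
print(sequential.target)  # [1, 2, 3]

# Overlapping runs: both read checkpoint 0 before either finishes.
overlapping = Store(source=[1, 2, 3])
run_a = start_run(overlapping)
run_b = start_run(overlapping)
finish_run(overlapping, run_a)
finish_run(overlapping, run_b)
print(overlapping.target)  # [1, 2, 3, 1, 2, 3]
```

Skipping the second execution while the first is in progress removes the overlapping case.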

### Resource Management

Prevents resource conflicts involving:

- **Database Connections**: Avoiding connection pool exhaustion
- **File System Access**: Preventing concurrent file modifications
- **External API Limits**: Respecting rate limits and quotas

### Operational Safety

Supports operational stability by avoiding contention between instances of the same job:

- **Memory Usage**: Preventing memory exhaustion from multiple instances
- **CPU Usage**: Avoiding CPU contention between concurrent executions
- **Network Bandwidth**: Limiting network resource consumption