# Execution Control

A concurrent-execution prevention system that ensures only one instance of a data job runs at a time. This prevents the data consistency issues, resource conflicts, and duplicate processing that can occur when multiple instances of the same job execute simultaneously.

## Types

```python { .api }
from vdk.internal.builtin_plugins.run.job_context import JobContext
from vdk.internal.core.config import Configuration, ConfigurationBuilder
```

## Capabilities

### Concurrent Execution Checker

A class that checks for running executions of a data job to prevent concurrent execution conflicts.

```python { .api }
class ConcurrentExecutionChecker:
    def __init__(self, rest_api_url: str) -> None:
        """
        Initialize execution checker with Control Service API.

        Parameters:
        - rest_api_url: str - Base URL for Control Service REST API
        """

    def is_job_execution_running(self, job_name, job_team, job_execution_attempt_id) -> bool:
        """
        Check if another execution of the data job is currently running.

        Parameters:
        - job_name: str - Name of the data job to check
        - job_team: str - Team owning the data job
        - job_execution_attempt_id: str - Current job execution attempt ID

        Returns:
        bool: True if another execution with different ID is running, False otherwise

        The method queries the Control Service for submitted and running executions.
        Returns True only if there's a running execution with a different ID,
        indicating that the current execution should be skipped.
        """
```

### Execution Skip Functions

Utility functions that handle job execution skipping logic.

```python { .api }
def _skip_job_run(job_name) -> None:
    """
    Skip job execution and exit the process.

    Parameters:
    - job_name: str - Name of the job being skipped

    Logs skip message and calls os._exit(0) to terminate execution.
    """

def _skip_job_if_necessary(
    log_config: str,
    job_name: str,
    execution_id: str,
    job_team: str,
    configuration: Configuration,
):
    """
    Conditionally skip job execution based on concurrent execution check.

    Parameters:
    - log_config: str - Log configuration type ("CLOUD" enables checking)
    - job_name: str - Name of the data job
    - execution_id: str - Current execution ID
    - job_team: str - Team owning the job
    - configuration: Configuration - VDK configuration instance

    Returns:
    None: Continues execution normally
    int: Returns 1 if execution was skipped (though os._exit(0) is called)

    Only performs checking for cloud executions (log_config == "CLOUD").
    Local executions skip the concurrent execution check.
    """
```

### Hook Implementations

VDK hook implementations that integrate execution control into the job lifecycle.

```python { .api }
@hookimpl(tryfirst=True)
def vdk_configure(config_builder: ConfigurationBuilder):
    """
    Add execution skip configuration option.

    Parameters:
    - config_builder: ConfigurationBuilder - Builder for configuration options

    Adds EXECUTION_SKIP_CHECKER_ENABLED configuration with default value True.
    """

@hookimpl(tryfirst=True)
def run_job(context: JobContext) -> None:
    """
    Pre-execution hook that checks for concurrent executions.

    Parameters:
    - context: JobContext - Job execution context

    Returns:
    None: Normal execution continues

    Performs concurrent execution check before job runs.
    If another execution is detected, terminates with os._exit(0).
    """
```

### Configuration

```python { .api }
EXECUTION_SKIP_CHECKER_ENABLED = "EXECUTION_SKIP_CHECKER_ENABLED"
```

## Execution Logic

### Detection Algorithm

Concurrent execution detection follows this logic:

1. **Cloud Check**: Performs the check only for cloud executions (`log_config == "CLOUD"`)
2. **API Query**: Queries the Control Service for executions with status "submitted" or "running"
3. **ID Comparison**: Compares execution IDs, allowing for ID variations (VDK IDs may have a `-xxxxx` suffix)
4. **Decision**: Skips the current execution if another execution with a different base ID is found
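
The ID comparison in step 3 can be sketched roughly as follows. This is a minimal illustration, not the plugin's actual code: the helper names (`base_execution_id`, `is_same_execution`, `has_concurrent_execution`) are hypothetical, and the suffix is assumed to be a hyphen followed by five lowercase letters or digits, which the text above only hints at with `-xxxxx`.

```python
import re
from typing import Iterable

# Assumption: the attempt suffix is "-" plus five lowercase letters or digits.
_ATTEMPT_SUFFIX = re.compile(r"-[a-z0-9]{5}$")


def base_execution_id(execution_id: str) -> str:
    """Strip one trailing "-xxxxx" suffix, if present (hypothetical helper)."""
    return _ATTEMPT_SUFFIX.sub("", execution_id)


def is_same_execution(reported_id: str, current_attempt_id: str) -> bool:
    """True if the reported ID matches the current attempt ID exactly
    or matches it once the attempt suffix has been stripped."""
    return reported_id in (current_attempt_id, base_execution_id(current_attempt_id))


def has_concurrent_execution(active_ids: Iterable[str], current_attempt_id: str) -> bool:
    """True if any active execution is not the current one."""
    return any(not is_same_execution(i, current_attempt_id) for i in active_ids)
```

Comparing against both the full and the stripped form avoids treating the current execution as its own competitor when the Control Service reports the ID without the suffix.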

### Execution Flow

```python
import os


# Illustrative pseudocode: write_termination_message and the parameters
# below stand in for values the plugin obtains from the job context.
def check_execution_flow(config, log_config, api_url, job_name, team, execution_id):
    # 1. Check if skip checker is enabled
    if not config.get_value(EXECUTION_SKIP_CHECKER_ENABLED):
        return  # Continue normal execution

    # 2. Check execution environment
    if log_config != "CLOUD":
        return  # Skip check for local executions

    # 3. Query for running executions
    checker = ConcurrentExecutionChecker(api_url)
    is_running = checker.is_job_execution_running(job_name, team, execution_id)

    # 4. Skip if concurrent execution found
    if is_running:
        write_termination_message(execution_skipped=True)
        os._exit(0)  # Terminate immediately
```

## Usage Examples

### Configuration Usage

```bash
# Enable execution skip checking (default)
export VDK_EXECUTION_SKIP_CHECKER_ENABLED=true

# Disable execution skip checking
export VDK_EXECUTION_SKIP_CHECKER_ENABLED=false
```
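
As a rough illustration of how a boolean flag like this could be read from the environment with a default of `True`, consider the sketch below. The helper `read_bool_env` and its accepted spellings are assumptions made for this example; VDK resolves configuration through its own mechanism, which may parse values differently.

```python
import os
from typing import Mapping


def read_bool_env(name: str, default: bool, environ: Mapping[str, str] = os.environ) -> bool:
    """Read a boolean flag from the environment (hypothetical helper).

    An unset or empty variable yields the default; otherwise only a few
    common "true" spellings count as True.
    """
    raw = environ.get(name)
    if raw is None or raw.strip() == "":
        return default
    return raw.strip().lower() in ("true", "1", "yes", "y")


# Example: the checker stays enabled unless explicitly switched off.
enabled = read_bool_env("VDK_EXECUTION_SKIP_CHECKER_ENABLED", default=True)
```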

### Programmatic Usage

```python
from vdk.plugin.control_cli_plugin.execution_skip import ConcurrentExecutionChecker

# Initialize checker
checker = ConcurrentExecutionChecker("https://api.example.com")

# Check for concurrent execution
is_running = checker.is_job_execution_running(
    job_name="my-data-job",
    job_team="analytics-team",
    job_execution_attempt_id="exec-12345-67890",
)

if is_running:
    print("Another execution is running, skipping current execution")
else:
    print("No concurrent execution detected, proceeding")
```

### Integration with Job Execution

The execution control integrates automatically through VDK hooks:

```python
from vdk.api.job_input import IJobInput


# Automatic integration - no user code required
def run(job_input: IJobInput):
    # This function only runs if no concurrent execution is detected
    # The execution skip check happens automatically before this point

    print("Job execution starting - no concurrent execution detected")

    # Normal job logic here (process_data and generate_reports are placeholders)
    process_data()
    generate_reports()

    print("Job execution completed successfully")
```

## Error Handling

### Exception Management

If the concurrent execution check itself fails, the error is logged and the job proceeds:

```python
import logging

log = logging.getLogger(__name__)

try:
    # Perform concurrent execution check
    job_running = checker.is_job_execution_running(job_name, job_team, execution_id)

    if job_running:
        # Write termination message for monitoring
        writer_plugin.write_termination_message(
            configuration=configuration,
            execution_skipped=True,
        )
        _skip_job_run(job_name)  # Exits with os._exit(0)

except Exception as exc:
    # Log error but continue execution
    log.warning(f"Error while checking for concurrent execution: {exc}")
    log.warning("Proceeding with execution despite check failure")
    # Execution continues normally
```
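
The snippet above depends on objects from the running plugin. To show the same fail-open behaviour in isolation, here is a small self-contained sketch; `should_skip_execution` is a hypothetical wrapper, not part of the plugin, and the check is passed in as a plain callable so it can be exercised without a Control Service.

```python
import logging
from typing import Callable

log = logging.getLogger(__name__)


def should_skip_execution(check: Callable[[], bool]) -> bool:
    """Return True only if the check succeeds and reports a concurrent run.

    Any exception raised by the check is logged and treated as
    "no concurrent execution", so a failing check never blocks the job.
    """
    try:
        return bool(check())
    except Exception as exc:
        log.warning("Error while checking for concurrent execution: %s", exc)
        log.warning("Proceeding with execution despite check failure")
        return False
```

Failing open favours availability: an unreachable API may let two executions overlap, but it never prevents a job from running at all.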

### Termination Message

When execution is skipped, a termination message is written for monitoring systems:

- **execution_skipped**: Set to `True` to indicate that the execution was skipped
- **Monitoring integration**: Allows downstream systems to detect skipped executions
- **Clean termination**: Uses `os._exit(0)` to end the process immediately with exit status 0
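
Because `os._exit` ends the process without running `atexit` handlers or `finally` blocks, anything that must be recorded, such as the termination message, has to be written before the call. The sketch below demonstrates this standard Python behaviour in a child process; the child script and the `run_child_that_skips` helper are illustrative only and are not taken from the plugin.

```python
import subprocess
import sys

# Child script: registers cleanup code, then exits via os._exit(0).
CHILD = """
import atexit, os
atexit.register(lambda: print("atexit handler ran", flush=True))
try:
    print("skipping execution", flush=True)
    os._exit(0)
finally:
    print("finally block ran", flush=True)
"""


def run_child_that_skips() -> subprocess.CompletedProcess:
    """Run the child script and capture its output and exit status."""
    return subprocess.run(
        [sys.executable, "-c", CHILD],
        capture_output=True,
        text=True,
        check=False,
    )


if __name__ == "__main__":
    result = run_child_that_skips()
    print(result.returncode, result.stdout.strip())
```

Only the message printed and flushed before `os._exit(0)` appears in the output; neither cleanup path runs.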

## Configuration Requirements

The execution control system requires:

```python
# Required for API access
CONTROL_SERVICE_REST_API_URL = "https://api.example.com"

# Authentication for API calls
API_TOKEN = "your-api-token"

# Optional: Enable/disable checking
EXECUTION_SKIP_CHECKER_ENABLED = True  # Default: True
```

## Use Cases

### Data Consistency

Prevents duplicate data processing in scenarios like:

- **Incremental ETL**: Jobs that process data since the last run would duplicate data if run concurrently
- **State Management**: Jobs that maintain state files or checkpoints
- **Resource Locking**: Jobs that require exclusive access to shared resources

### Resource Management

Prevents resource conflicts for:

- **Database Connections**: Avoiding connection pool exhaustion
- **File System Access**: Preventing concurrent file modifications
- **External API Limits**: Respecting rate limits and quotas

### Operational Safety

Supports operational stability in terms of:

- **Memory Usage**: Preventing memory exhaustion from multiple instances
- **CPU Usage**: Avoiding CPU contention between concurrent executions
- **Network Bandwidth**: Managing network resource consumption