# Job Monitoring

The `AirbyteJobSensor` provides Airflow sensor functionality for monitoring the status of Airbyte jobs. It supports both traditional polling and deferrable execution modes, making it suitable for monitoring long-running sync operations. The `conf` import below supplies the default for the `deferrable` parameter shown in the signature:

```python
from airflow.configuration import conf
```

## Capabilities

### Sensor Initialization

Creates a sensor that monitors a specific Airbyte job until it completes.

```python { .api }
class AirbyteJobSensor(BaseSensorOperator):
    def __init__(
        self,
        *,
        airbyte_job_id: int,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        airbyte_conn_id: str = "airbyte_default",
        api_version: str = "v1",
        **kwargs,
    ) -> None:
        """
        Initialize Airbyte job sensor.

        Args:
            airbyte_job_id: Required. Airbyte job ID to monitor
            deferrable: Use deferrable execution mode
            airbyte_conn_id: Airflow connection ID for Airbyte server
            api_version: Airbyte API version to use
            **kwargs: Additional BaseSensorOperator arguments (poke_interval, timeout, etc.)
        """
```

### Class Attributes

Template fields and UI configuration.

```python { .api }
template_fields: Sequence[str] = ("airbyte_job_id",)
ui_color: str = "#6C51FD"
```

### Monitoring Methods

Core sensor functionality for job status checking.

```python { .api }
def poke(self, context: Context) -> bool:
    """
    Check job status and determine if the sensor condition is satisfied.

    Args:
        context: Airflow task execution context

    Returns:
        True if the job completed successfully, False if still running

    Raises:
        AirflowException: If the job failed or was cancelled
    """

def execute(self, context: Context) -> Any:
    """
    Execute sensor logic with support for both polling and deferrable modes.

    Args:
        context: Airflow task execution context

    Returns:
        None when the job completes successfully

    Raises:
        AirflowException: If the job fails, is cancelled, or times out
    """

def execute_complete(self, context: Context, event: Any = None) -> None:
    """
    Callback method for deferrable mode completion.

    Args:
        context: Airflow task execution context
        event: Trigger event data

    Raises:
        AirflowException: If the job completed with an error status
    """
```

## Usage Examples

### Basic Job Monitoring

```python
from datetime import datetime
from airflow import DAG
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

dag = DAG(
    'monitor_example',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
)

# Monitor a specific job ID
monitor_job = AirbyteJobSensor(
    task_id='wait_for_sync',
    airbyte_job_id=12345,
    airbyte_conn_id='airbyte_default',
    poke_interval=30,  # Check every 30 seconds
    timeout=3600,      # 1 hour timeout
    dag=dag,
)
```

### Monitoring Async Jobs

```python
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

# Trigger an async job
trigger_sync = AirbyteTriggerSyncOperator(
    task_id='trigger_sync',
    connection_id='connection-uuid-123',
    asynchronous=True,  # Returns the job_id instead of waiting
    dag=dag,
)

# Monitor the triggered job
monitor_sync = AirbyteJobSensor(
    task_id='monitor_sync',
    airbyte_job_id="{{ task_instance.xcom_pull(task_ids='trigger_sync') }}",
    poke_interval=60,  # Check every minute
    timeout=7200,      # 2 hour timeout
    dag=dag,
)

trigger_sync >> monitor_sync
```

### Deferrable Monitoring

```python
# Deferrable sensor - releases the worker slot while waiting
deferrable_monitor = AirbyteJobSensor(
    task_id='deferrable_monitor',
    airbyte_job_id=67890,
    deferrable=True,  # Use an async trigger
    timeout=24*3600,  # 24 hour timeout
    dag=dag,
)
```

### Dynamic Job ID Monitoring

```python
# Monitor a job ID from the DAG run configuration
dynamic_monitor = AirbyteJobSensor(
    task_id='dynamic_monitor',
    airbyte_job_id="{{ dag_run.conf['job_id'] }}",
    poke_interval=45,
    dag=dag,
)

# Monitor a job ID from an Airflow Variable
variable_monitor = AirbyteJobSensor(
    task_id='variable_monitor',
    airbyte_job_id="{{ var.value.current_job_id }}",
    timeout=1800,
    dag=dag,
)
```

### Multiple Job Monitoring

```python
from airflow.utils.task_group import TaskGroup

# Monitor multiple jobs in parallel
with TaskGroup('monitor_jobs', dag=dag) as job_group:
    for i, job_id in enumerate([111, 222, 333]):
        AirbyteJobSensor(
            task_id=f'monitor_job_{i}',
            airbyte_job_id=job_id,
            poke_interval=30,
            timeout=3600,
        )
```

### Conditional Monitoring with Branching

```python
from airflow.operators.python import BranchPythonOperator
from airflow.operators.dummy import DummyOperator

def check_job_status(**context):
    """Decide whether to monitor or skip based on conditions."""
    # Custom logic to determine whether monitoring is needed
    if context['dag_run'].conf.get('monitor_job', True):
        return 'monitor_job'
    else:
        return 'skip_monitoring'

branch_task = BranchPythonOperator(
    task_id='check_monitoring_needed',
    python_callable=check_job_status,
    dag=dag,
)

monitor_job = AirbyteJobSensor(
    task_id='monitor_job',
    airbyte_job_id="{{ dag_run.conf['job_id'] }}",
    poke_interval=60,
    dag=dag,
)

skip_task = DummyOperator(
    task_id='skip_monitoring',
    dag=dag,
)

branch_task >> [monitor_job, skip_task]
```

## Configuration Options

### Sensor-Specific Configuration

```python
from datetime import timedelta

AirbyteJobSensor(
    task_id='configured_sensor',

    # Required parameters
    airbyte_job_id=12345,

    # Connection configuration
    airbyte_conn_id='my_airbyte_conn',
    api_version='v1',

    # Execution mode
    deferrable=True,  # Use async triggers

    # Timing configuration (inherited from BaseSensorOperator)
    poke_interval=30,          # Seconds between status checks
    timeout=3600,              # Maximum wait time
    exponential_backoff=True,  # Progressively lengthen the poke interval

    # Retry configuration (inherited from BaseOperator)
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(minutes=30),  # Cap on the delay between retries
)
```

### Deferrable Mode Configuration

When `deferrable=True`, the sensor automatically configures the following defaults:

```python
# Default deferrable settings (applied automatically)
poke_interval = 5     # Quick initial check
timeout = 60*60*24*7  # 7-day default timeout
```
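
Both values are ordinary `BaseSensorOperator` arguments, so they can still be set explicitly when constructing the sensor. A minimal sketch, reusing the `dag` object from the examples above:

```python
# Deferrable sensor with explicit overrides of the automatic defaults
long_wait = AirbyteJobSensor(
    task_id='long_wait',
    airbyte_job_id=12345,
    deferrable=True,
    poke_interval=120,  # Override the 5-second default
    timeout=60*60*48,   # Override the 7-day default with 48 hours
    dag=dag,
)
```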

### Template Fields

The `airbyte_job_id` field supports Jinja templating:

```python
# From XCom (previous task output)
airbyte_job_id="{{ task_instance.xcom_pull(task_ids='trigger_task') }}"

# From the DAG run configuration
airbyte_job_id="{{ dag_run.conf['job_id'] }}"

# From Airflow Variables
airbyte_job_id="{{ var.value.job_to_monitor }}"

# From the task instance context
airbyte_job_id="{{ ti.xcom_pull(key='job_id') }}"
```

## Execution Modes

### Polling Mode (Default)

- **deferrable=False**
- Continuously occupies a worker slot
- Suitable for short to medium duration jobs
- Uses the configurable `poke_interval` for status checks
- Traditional Airflow sensor behavior

### Deferrable Mode

- **deferrable=True**
- Releases the worker slot while waiting
- Uses async triggers for monitoring
- Optimal for long-running jobs
- Automatically resumes when the job completes
- Better resource utilization in large deployments (see the contrast sketch below)
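
Switching modes is a one-flag change. A minimal sketch contrasting the two, reusing the `dag` object and job ID from the examples above:

```python
# Polling mode: occupies a worker slot, checking every 30 seconds
polling_wait = AirbyteJobSensor(
    task_id='polling_wait',
    airbyte_job_id=12345,
    poke_interval=30,
    timeout=3600,
    dag=dag,
)

# Deferrable mode: frees the worker slot between checks via an async trigger
deferrable_wait = AirbyteJobSensor(
    task_id='deferrable_wait',
    airbyte_job_id=12345,
    deferrable=True,
    timeout=24*3600,
    dag=dag,
)
```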

## Job Status Handling

The sensor handles all Airbyte job statuses:

### Success States

- **SUCCEEDED**: Job completed successfully, sensor returns True

### Waiting States

- **RUNNING**: Job actively executing, sensor continues waiting
- **PENDING**: Job queued for execution, sensor continues waiting
- **INCOMPLETE**: Job partially completed, sensor continues waiting

### Error States

- **FAILED**: Job execution failed, raises AirflowException
- **CANCELLED**: Job was cancelled, raises AirflowException

### Unknown States

Any unexpected job status raises AirflowException with detailed information.
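
For illustration, here is a simplified sketch of how this status mapping behaves. The `get_job_status` helper and the status literals are assumptions for the example, not the provider's actual API; the real sensor resolves the status through its Airbyte hook:

```python
from airflow.exceptions import AirflowException

def get_job_status(job_id: int) -> str:
    """Hypothetical helper; the real sensor asks the Airbyte hook for the status."""
    ...

def poke_sketch(job_id: int) -> bool:
    """Map an Airbyte job status onto the sensor outcomes described above."""
    # Status literals are illustrative; match whatever casing the API returns.
    status = get_job_status(job_id)
    if status == "succeeded":
        return True   # Success state: sensor condition satisfied
    if status in ("running", "pending", "incomplete"):
        return False  # Waiting states: keep poking
    if status in ("failed", "cancelled"):
        raise AirflowException(f"Airbyte job {job_id} ended with status {status!r}")
    # Unknown states surface as errors, carrying the raw status for debugging
    raise AirflowException(f"Airbyte job {job_id} reported unexpected status {status!r}")
```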

## Error Handling

The sensor provides comprehensive error handling:

- **Connection errors**: Network issues, authentication failures
- **Job not found**: Invalid job_id or expired job
- **Timeout errors**: Job exceeds the specified timeout duration
- **API errors**: Airbyte server errors, rate limiting
- **State transition errors**: Unexpected job state changes

All errors include detailed logging for troubleshooting and monitoring purposes.
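
Because each of these conditions surfaces as a task failure, standard Airflow mechanisms apply. A minimal sketch using retries plus a failure callback, reusing the `dag` object from the examples above (the `notify_on_failure` function is illustrative, not part of the provider):

```python
from datetime import timedelta

def notify_on_failure(context):
    # Illustrative callback: record which sensor failed and why
    ti = context['task_instance']
    print(f"Sensor {ti.task_id} failed: {context.get('exception')}")

guarded_monitor = AirbyteJobSensor(
    task_id='guarded_monitor',
    airbyte_job_id=12345,
    retries=2,                         # Retry on transient connection/API errors
    retry_delay=timedelta(minutes=2),
    on_failure_callback=notify_on_failure,
    dag=dag,
)
```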