# Sync Operations

The `AirbyteTriggerSyncOperator` triggers Airbyte data synchronization jobs from Airflow tasks. It supports both synchronous and asynchronous execution modes, with optional deferrable execution for long-running jobs.

```python
from airflow.configuration import conf
```

This import supplies the default for the `deferrable` argument in the operator signature below.

## Capabilities

### Operator Initialization

Creates a sync operator task with comprehensive configuration options.

```python { .api }
class AirbyteTriggerSyncOperator(BaseOperator):
    def __init__(
        self,
        connection_id: str,
        airbyte_conn_id: str = "airbyte_default",
        asynchronous: bool = False,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        api_version: str = "v1",
        wait_seconds: float = 3,
        timeout: float = 3600,
        **kwargs,
    ) -> None:
        """
        Initialize Airbyte sync operator.

        Args:
            connection_id: Required. Airbyte connection UUID to sync
            airbyte_conn_id: Airflow connection ID for Airbyte server
            asynchronous: Return job_id immediately without waiting for completion
            deferrable: Use deferrable execution mode for long-running jobs
            api_version: Airbyte API version to use
            wait_seconds: Polling interval (in seconds) for synchronous mode
            timeout: Maximum wait time (in seconds) for job completion
            **kwargs: Additional BaseOperator arguments
        """
```
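
A minimal sketch of a default instantiation: only `connection_id` (plus the standard `task_id`) is required, and the defaults give synchronous execution. The UUID below is a hypothetical placeholder.

```python
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator

# Minimal sketch: the UUID stands in for a real Airbyte connection.
# Defaults give synchronous mode, polling every 3 seconds for up to 3600 seconds.
sync = AirbyteTriggerSyncOperator(
    task_id='airbyte_sync',
    connection_id='00000000-0000-0000-0000-000000000000',  # hypothetical UUID
)
```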

### Class Attributes

Template fields and UI configuration.

```python { .api }
template_fields: Sequence[str] = ("connection_id",)
ui_color: str = "#6C51FD"
```

### Execution Methods

Core execution functionality for different modes.

```python { .api }
def execute(self, context: Context) -> int:
    """
    Execute the sync operation.

    Args:
        context: Airflow task execution context

    Returns:
        Job ID (int) for the submitted Airbyte job

    Raises:
        AirflowException: If job submission fails or job completes with error
    """

def execute_complete(self, context: Context, event: Any = None) -> None:
    """
    Callback method for deferrable mode completion.

    Args:
        context: Airflow task execution context
        event: Trigger event data

    Raises:
        AirflowException: If job completed with error status
    """

def on_kill(self) -> None:
    """
    Cancel the Airbyte job when the task is killed.

    Called automatically by Airflow when the task is cancelled or times out.
    """
```

## Usage Examples

### Synchronous Execution

```python
from datetime import datetime
from airflow import DAG
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator

dag = DAG(
    'sync_example',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily'
)

# Synchronous sync - waits for completion
sync_task = AirbyteTriggerSyncOperator(
    task_id='sync_customer_data',
    connection_id='{{ var.value.customer_connection_id }}',
    airbyte_conn_id='airbyte_default',
    timeout=1800,     # 30 minutes
    wait_seconds=10,  # Check every 10 seconds
    dag=dag
)
```

### Asynchronous Execution

```python
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

# Async sync - returns job_id immediately
trigger_sync = AirbyteTriggerSyncOperator(
    task_id='trigger_sync',
    connection_id='connection-uuid-123',
    asynchronous=True,  # Return job_id without waiting
    dag=dag
)

# Monitor completion with sensor
monitor_sync = AirbyteJobSensor(
    task_id='monitor_sync',
    airbyte_job_id="{{ task_instance.xcom_pull(task_ids='trigger_sync') }}",
    timeout=3600,
    dag=dag
)

trigger_sync >> monitor_sync
```

### Deferrable Execution

```python
# Deferrable mode - uses async triggers
deferrable_sync = AirbyteTriggerSyncOperator(
    task_id='deferrable_sync',
    connection_id='connection-uuid-123',
    deferrable=True,  # Use async trigger mechanism
    timeout=7200,     # 2 hours
    dag=dag
)
```

### Dynamic Connection IDs

```python
# Using a templated connection_id (the operator's only template field)
dynamic_sync = AirbyteTriggerSyncOperator(
    task_id='dynamic_sync',
    connection_id='{{ dag_run.conf["connection_id"] }}',  # From DAG run config
    airbyte_conn_id='airbyte_default',  # Not templated: only connection_id supports Jinja
    dag=dag
)
```

### Error Handling and Retries

```python
from datetime import timedelta
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule

# Sync with custom retry configuration
robust_sync = AirbyteTriggerSyncOperator(
    task_id='robust_sync',
    connection_id='connection-uuid-123',
    timeout=3600,
    retries=3,
    retry_delay=timedelta(minutes=5),
    dag=dag
)

# Cleanup task that runs even if sync fails
cleanup_task = BashOperator(
    task_id='cleanup',
    bash_command='echo "Cleaning up after sync"',
    trigger_rule=TriggerRule.ALL_DONE,  # Run regardless of upstream status
    dag=dag
)

robust_sync >> cleanup_task
```

## Execution Modes

### Synchronous Mode (Default)

- **asynchronous=False, deferrable=False**
- Task blocks until job completion
- Uses polling with configurable intervals
- Suitable for short to medium duration jobs
- Consumes worker slot during entire execution

### Asynchronous Mode

- **asynchronous=True**
- Returns job_id immediately
- Requires separate monitoring (sensor/operator)
- Frees worker slot immediately
- Best for fire-and-forget scenarios

### Deferrable Mode

- **deferrable=True**
- Uses async triggers for monitoring
- Requires a running Airflow triggerer process
- Frees worker slot during job execution
- Automatically resumes when job completes
- Optimal for long-running jobs in resource-constrained environments
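
Because the operator reads its `deferrable` default from `conf.getboolean("operators", "default_deferrable", fallback=False)` (see the initialization signature above), the mode can also be flipped deployment-wide. A minimal sketch using Airflow's standard `AIRFLOW__<SECTION>__<KEY>` environment-variable override, equivalent to setting `default_deferrable = true` under `[operators]` in `airflow.cfg`:

```python
import os

# Sketch: Airflow maps AIRFLOW__<SECTION>__<KEY> environment variables onto
# airflow.cfg settings, so this makes operators that honor the
# [operators] default_deferrable setting start in deferrable mode.
os.environ["AIRFLOW__OPERATORS__DEFAULT_DEFERRABLE"] = "true"
```

The variable must be set in the environment of the Airflow components (scheduler and workers) before they start, not from inside a running task.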

## Configuration

### Connection Requirements

The operator requires an Airflow connection of type "airbyte" with:

```python
{
    "conn_type": "airbyte",
    "host": "https://api.airbyte.com",  # Airbyte server URL
    "login": "client_id",               # OAuth client ID
    "password": "client_secret",        # OAuth client secret
    "schema": "v1/applications/token",  # Token endpoint
    "extra": {
        "proxies": {                    # Optional proxy configuration
            "http": "http://proxy:8080",
            "https": "https://proxy:8080"
        }
    }
}
```
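
One way to supply such a connection is Airflow's environment-variable mechanism; a minimal sketch, assuming Airflow 2.3+ (which accepts JSON-serialized connections in `AIRFLOW_CONN_<CONN_ID>` variables):

```python
import json
import os

# Sketch: Airflow resolves the connection id "airbyte_default" from the
# AIRFLOW_CONN_AIRBYTE_DEFAULT environment variable; the JSON payload
# mirrors the fields shown above.
os.environ["AIRFLOW_CONN_AIRBYTE_DEFAULT"] = json.dumps({
    "conn_type": "airbyte",
    "host": "https://api.airbyte.com",
    "login": "client_id",
    "password": "client_secret",
    "schema": "v1/applications/token",
})
```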

### Template Fields

The `connection_id` field supports Jinja templating for dynamic values:

```python
# From DAG run configuration
connection_id='{{ dag_run.conf["connection_id"] }}'

# From Airflow variables
connection_id='{{ var.value.my_connection_id }}'

# From task context
connection_id='{{ ds }}_connection'  # Date-based connection
```

## Error Handling

The operator handles various error scenarios:

- **Job submission failures**: Invalid connection_id, authentication errors
- **Job execution failures**: Data sync errors, connection timeouts
- **Task cancellation**: Automatically cancels Airbyte job via on_kill()
- **Timeout scenarios**: Cancels job and raises AirflowException
- **Unexpected job states**: Handles CANCELLED and unknown states appropriately

All errors are logged with appropriate detail levels for debugging and monitoring.
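
To react to these failures from a DAG, the standard `BaseOperator` hooks apply. A minimal sketch using `on_failure_callback`; the callback name and log message are illustrative:

```python
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator

def notify_sync_failure(context):
    # Airflow passes the task context, including the raised exception,
    # when the task enters the failed state.
    ti = context['task_instance']
    print(f"Airbyte sync failed: task={ti.task_id}, try={ti.try_number}, "
          f"error={context.get('exception')}")

guarded_sync = AirbyteTriggerSyncOperator(
    task_id='guarded_sync',
    connection_id='connection-uuid-123',
    on_failure_callback=notify_sync_failure,  # standard BaseOperator argument
    dag=dag,
)
```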