# Destination Connectors

Framework for building data-loading connectors that write records to databases, data warehouses, files, and APIs. It provides structured support for batch processing, type mapping, error handling, and integration with Airbyte's standardized message protocol.

## Capabilities

### Base Destination Class

Core class for implementing destination connectors that load data into external systems.

```python { .api }
from airbyte_cdk import Destination
from airbyte_cdk.models import (
    AirbyteConnectionStatus,
    AirbyteMessage,
    ConfiguredAirbyteCatalog,
    ConnectorSpecification,
)
from typing import Any, Iterable, Mapping
import logging

class Destination:
    """
    Base class for Airbyte destination connectors.
    """

    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """
        Test connection validity with the given configuration.

        Args:
            logger: Logger instance for outputting messages
            config: Configuration dictionary containing connection parameters

        Returns:
            AirbyteConnectionStatus indicating success or failure with details
        """

    def write(
        self,
        config: Mapping[str, Any],
        configured_catalog: ConfiguredAirbyteCatalog,
        input_messages: Iterable[AirbyteMessage],
    ) -> Iterable[AirbyteMessage]:
        """
        Write data records to the destination.

        Args:
            config: Configuration dictionary
            configured_catalog: Catalog specifying destination streams and sync modes
            input_messages: Stream of AirbyteMessage instances containing records to write

        Yields:
            AirbyteMessage instances for status updates, state, or errors
        """

    def spec(self) -> ConnectorSpecification:
        """
        Return the specification for this destination's configuration.

        Returns:
            ConnectorSpecification defining required and optional configuration fields
        """
```
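
In a connector package, the destination is typically exposed through a small `main.py` that forwards the CLI sub-commands (`spec`, `check`, `write`) to `Destination.run`. A minimal sketch, assuming a hypothetical subclass `DestinationExample` in a hypothetical package `destination_example`:

```python
# main.py -- minimal entrypoint sketch; DestinationExample and the
# destination_example package are hypothetical names.
import sys

from destination_example import DestinationExample

if __name__ == "__main__":
    # run() parses the spec/check/write sub-command and, for write,
    # consumes Airbyte protocol messages from stdin.
    DestinationExample().run(sys.argv[1:])
```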

### Message Processing

Classes and utilities for processing Airbyte protocol messages.

```python { .api }
from airbyte_cdk.models import (
    AirbyteLogMessage,
    AirbyteMessage,
    AirbyteRecordMessage,
    AirbyteStateMessage,
    AirbyteStream,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
    Type,
)
from typing import Any, Dict, List, Optional

class AirbyteMessage:
    """
    Airbyte protocol message containing data records, state, or metadata.
    """

    type: Type  # RECORD, STATE, LOG, SPEC, CATALOG, etc.
    record: Optional[AirbyteRecordMessage]
    state: Optional[AirbyteStateMessage]
    log: Optional[AirbyteLogMessage]

class AirbyteRecordMessage:
    """
    Data record message containing the actual data to be written.
    """

    stream: str  # Name of the stream
    data: Dict[str, Any]  # Record data as key-value pairs
    emitted_at: int  # Timestamp when the record was emitted (milliseconds)
    namespace: Optional[str]  # Optional namespace for the stream

class ConfiguredAirbyteCatalog:
    """
    Catalog describing which streams to write and how.
    """

    streams: List[ConfiguredAirbyteStream]

class ConfiguredAirbyteStream:
    """
    Configuration for a single stream in the destination.
    """

    stream: AirbyteStream  # Stream schema and metadata
    sync_mode: SyncMode  # FULL_REFRESH or INCREMENTAL
    destination_sync_mode: DestinationSyncMode  # APPEND, OVERWRITE, or APPEND_DEDUP
    cursor_field: Optional[List[str]]  # Fields used for incremental sync
    primary_key: Optional[List[List[str]]]  # Fields used for deduplication
```
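
A destination's write loop dispatches on `message.type`. A short sketch constructing a record message and handling it the way a destination would:

```python
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, Type

# Wrap a data record in the protocol envelope.
message = AirbyteMessage(
    type=Type.RECORD,
    record=AirbyteRecordMessage(
        stream="users",
        data={"id": 1, "email": "a@example.com"},
        emitted_at=1700000000000,  # milliseconds since epoch
    ),
)

# Dispatch the way a destination's write loop does.
if message.type == Type.RECORD:
    print(message.record.stream, message.record.data)
elif message.type == Type.STATE:
    # Persist buffered records first, then re-emit the state message.
    pass
```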

### Configuration and Schema Handling

Utilities for handling destination configuration and data schemas.

```python { .api }
from airbyte_cdk.models import ConnectorSpecification, DestinationSyncMode
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader, check_config_against_spec_or_exit
from typing import Any, Dict, List, Optional

class ConnectorSpecification:
    """
    Specification defining the configuration schema for a destination.
    """

    connectionSpecification: Dict[str, Any]  # JSON Schema for configuration
    supportsIncremental: Optional[bool]  # Whether the destination supports incremental sync
    supportsNormalization: Optional[bool]  # Whether the destination supports normalization
    supportsDBT: Optional[bool]  # Whether the destination supports dbt transformations
    supported_destination_sync_modes: List[DestinationSyncMode]  # Supported sync modes

def check_config_against_spec_or_exit(config: Dict[str, Any], spec: ConnectorSpecification) -> None:
    """
    Validate a configuration against the connector specification.

    Args:
        config: Configuration dictionary to validate
        spec: Connector specification with schema

    Raises:
        SystemExit: If the configuration is invalid
    """

class ResourceSchemaLoader:
    """
    Load JSON schemas from package resources.
    """

    def __init__(self, package_name: str):
        """
        Initialize the schema loader.

        Args:
            package_name: Python package containing schema files
        """

    def get_schema(self, name: str) -> Dict[str, Any]:
        """
        Load a schema by name.

        Args:
            name: Schema file name (without the .json extension)

        Returns:
            Schema dictionary
        """
```
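
A sketch combining these utilities, for example inside `check`; the spec contents and the `destination_example` package name are illustrative assumptions:

```python
from airbyte_cdk.models import ConnectorSpecification
from airbyte_cdk.sources.utils.schema_helpers import (
    ResourceSchemaLoader,
    check_config_against_spec_or_exit,
)

# A minimal spec with one required field (illustrative).
spec = ConnectorSpecification(
    connectionSpecification={
        "type": "object",
        "required": ["database_path"],
        "properties": {"database_path": {"type": "string"}},
    }
)

# Fails loudly if the config does not satisfy the spec.
check_config_against_spec_or_exit({"database_path": "/tmp/dest.db"}, spec)

# Load schemas/users.json bundled with the (hypothetical) connector package.
loader = ResourceSchemaLoader("destination_example")
users_schema = loader.get_schema("users")
```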

### Type Mapping and Transformation

Classes for handling data type conversion and transformation.

```python { .api }
from airbyte_cdk.sources.utils.transform import TypeTransformer, TransformConfig
from enum import Flag
from typing import Any, Callable, Dict, Mapping

class TransformConfig(Flag):
    """
    Flag enum selecting which transformations to apply.
    """

    NoTransform: "TransformConfig"  # Pass records through unchanged
    DefaultSchemaNormalization: "TransformConfig"  # Cast values to the types declared in the JSON schema
    CustomSchemaNormalization: "TransformConfig"  # Also apply callbacks registered via registerCustomTransform

class TypeTransformer:
    """
    Normalize record values to match the stream's JSON schema.
    """

    def __init__(self, config: TransformConfig):
        """
        Initialize the type transformer.

        Args:
            config: Flags selecting which transformations to apply
        """

    def transform(self, record: Dict[str, Any], schema: Mapping[str, Any]) -> None:
        """
        Normalize a record in place according to the schema.

        Args:
            record: Record data dictionary (mutated in place)
            schema: JSON Schema for the record
        """

    def registerCustomTransform(self, normalization_callback: Callable[[Any, Dict[str, Any]], Any]) -> Callable:
        """
        Register a callback applied to each value when
        CustomSchemaNormalization is enabled.
        """
```
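
A short usage sketch with the default schema normalization; note that `transform` mutates the record in place:

```python
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)

schema = {
    "type": "object",
    "properties": {
        "id": {"type": "integer"},
        "name": {"type": "string"},
    },
}
record = {"id": "42", "name": 123}

# Values are cast to the types declared in the schema, in place.
transformer.transform(record, schema)
print(record)  # {'id': 42, 'name': '123'}
```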

## Usage Examples

### Basic Database Destination

```python
from airbyte_cdk import Destination
from airbyte_cdk.models import (
    AirbyteConnectionStatus,
    AirbyteMessage,
    AirbyteRecordMessage,
    ConfiguredAirbyteCatalog,
    Status,
    Type,
)
import logging
import sqlite3
from typing import Any, Iterable, Mapping

class SqliteDestination(Destination):
    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        try:
            # Test database connection
            conn = sqlite3.connect(config["database_path"])
            conn.execute("SELECT 1")
            conn.close()
            return AirbyteConnectionStatus(status=Status.SUCCEEDED)
        except Exception as e:
            return AirbyteConnectionStatus(status=Status.FAILED, message=str(e))

    def write(
        self,
        config: Mapping[str, Any],
        configured_catalog: ConfiguredAirbyteCatalog,
        input_messages: Iterable[AirbyteMessage],
    ) -> Iterable[AirbyteMessage]:
        conn = sqlite3.connect(config["database_path"])

        try:
            for message in input_messages:
                if message.type == Type.RECORD:
                    self._write_record(conn, message.record)
                elif message.type == Type.STATE:
                    # Commit before re-emitting state, so the checkpoint
                    # only covers records that were actually persisted
                    conn.commit()
                    yield message
        finally:
            conn.commit()
            conn.close()

    def _write_record(self, conn: sqlite3.Connection, record: AirbyteRecordMessage):
        table_name = record.stream
        data = record.data

        # Create table if it does not exist
        self._ensure_table_exists(conn, table_name, data)

        # Insert record; identifiers come from the configured catalog,
        # values are bound as query parameters
        columns = ",".join(f'"{key}"' for key in data)
        placeholders = ",".join("?" for _ in data)
        query = f'INSERT INTO "{table_name}" ({columns}) VALUES ({placeholders})'
        conn.execute(query, list(data.values()))

    def _ensure_table_exists(self, conn: sqlite3.Connection, table_name: str, sample_data: dict):
        # Simple table creation with column types inferred from sample data
        columns = []
        for key, value in sample_data.items():
            if isinstance(value, int):
                columns.append(f'"{key}" INTEGER')
            elif isinstance(value, float):
                columns.append(f'"{key}" REAL')
            else:
                columns.append(f'"{key}" TEXT')

        create_sql = f'CREATE TABLE IF NOT EXISTS "{table_name}" ({", ".join(columns)})'
        conn.execute(create_sql)

# Usage
destination = SqliteDestination()
config = {"database_path": "/path/to/database.db"}
status = destination.check(logging.getLogger(), config)
```

### Batch Processing Destination

```python
from airbyte_cdk import Destination
from airbyte_cdk.models import AirbyteMessage, Type
import json
from typing import Any, Dict, Iterable, List, Mapping

class BatchFileDestination(Destination):
    def __init__(self):
        self._buffer: Dict[str, List[Dict[str, Any]]] = {}  # stream_name -> list of records
        self._batch_size = 1000

    def write(
        self,
        config: Mapping[str, Any],
        configured_catalog,
        input_messages: Iterable[AirbyteMessage],
    ) -> Iterable[AirbyteMessage]:
        for message in input_messages:
            if message.type == Type.RECORD:
                self._buffer_record(message.record)

                # Flush when the batch for this stream is full
                if len(self._buffer[message.record.stream]) >= self._batch_size:
                    self._flush_batch(config, message.record.stream)
            elif message.type == Type.STATE:
                # Flush every buffer before re-emitting state, so the
                # checkpoint only covers records that were persisted
                for stream_name in self._buffer:
                    self._flush_batch(config, stream_name)
                yield message

        # Flush any remaining records
        for stream_name in self._buffer:
            self._flush_batch(config, stream_name)

    def _buffer_record(self, record):
        stream_name = record.stream
        if stream_name not in self._buffer:
            self._buffer[stream_name] = []

        self._buffer[stream_name].append({
            "data": record.data,
            "emitted_at": record.emitted_at,
        })

    def _flush_batch(self, config: Mapping[str, Any], stream_name: str):
        if not self._buffer[stream_name]:
            return

        output_path = f"{config['output_dir']}/{stream_name}.jsonl"

        with open(output_path, "a") as f:
            for record in self._buffer[stream_name]:
                f.write(json.dumps(record) + "\n")

        self._buffer[stream_name] = []
        print(f"Flushed batch for stream {stream_name}")
```

### Type-Safe Destination with Schema Validation

```python
from airbyte_cdk import Destination
from airbyte_cdk.sources.utils.transform import TypeTransformer, TransformConfig
from airbyte_cdk.models import AirbyteMessage, Type
import jsonschema
from typing import Any, Dict, Iterable, Mapping

class ValidatingDestination(Destination):
    def __init__(self):
        self._transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
        self._schemas: Dict[str, Dict[str, Any]] = {}  # stream_name -> schema

    def write(
        self,
        config: Mapping[str, Any],
        configured_catalog,
        input_messages: Iterable[AirbyteMessage],
    ) -> Iterable[AirbyteMessage]:
        # Load schemas from the configured catalog
        for stream_config in configured_catalog.streams:
            stream_name = stream_config.stream.name
            self._schemas[stream_name] = stream_config.stream.json_schema

        for message in input_messages:
            if message.type == Type.RECORD:
                try:
                    # Validate and transform the record
                    validated_record = self._validate_and_transform(message.record)
                    self._write_validated_record(config, validated_record)
                except Exception as e:
                    # Log the validation error but don't stop processing
                    print(f"Validation error for record in {message.record.stream}: {e}")
            elif message.type == Type.STATE:
                yield message

    def _validate_and_transform(self, record) -> Dict[str, Any]:
        stream_name = record.stream
        schema = self._schemas.get(stream_name)

        # Copy so the original message is not mutated
        data = dict(record.data)
        if schema:
            # Normalize data types in place, then validate against the schema
            self._transformer.transform(data, schema)
            jsonschema.validate(data, schema)

        return {
            "stream": stream_name,
            "data": data,
            "emitted_at": record.emitted_at,
        }

    def _write_validated_record(self, config: Mapping[str, Any], record: Dict[str, Any]):
        # Write the validated and transformed record
        pass
```

### API Destination with Error Handling

```python
from airbyte_cdk import Destination
from airbyte_cdk.models import AirbyteMessage, Type
import requests
import time
from typing import Any, Iterable, Mapping

class ApiDestination(Destination):
    def write(
        self,
        config: Mapping[str, Any],
        configured_catalog,
        input_messages: Iterable[AirbyteMessage],
    ) -> Iterable[AirbyteMessage]:
        session = requests.Session()
        session.headers.update({
            "Authorization": f"Bearer {config['api_token']}",
            "Content-Type": "application/json",
        })

        for message in input_messages:
            if message.type == Type.RECORD:
                success = self._write_record_with_retry(
                    session,
                    config,
                    message.record,
                )

                if not success:
                    print(f"Failed to write record to API: {message.record.stream}")
            elif message.type == Type.STATE:
                # Records before this state have been sent (or logged as
                # failed), so the checkpoint can be re-emitted
                yield message

    def _write_record_with_retry(self, session: requests.Session, config: Mapping[str, Any], record, max_retries: int = 3) -> bool:
        url = f"{config['api_base_url']}/data/{record.stream}"
        payload = {
            "data": record.data,
            "timestamp": record.emitted_at,
        }

        for attempt in range(max_retries + 1):
            try:
                response = session.post(url, json=payload, timeout=30)

                if 200 <= response.status_code < 300:
                    return True
                elif response.status_code == 429:  # Rate limited
                    wait_time = int(response.headers.get("Retry-After", 60))
                    time.sleep(wait_time)
                    continue
                elif response.status_code >= 500:  # Server error, retry
                    time.sleep(2 ** attempt)  # Exponential backoff
                    continue
                else:
                    print(f"API error {response.status_code}: {response.text}")
                    return False

            except requests.exceptions.RequestException as e:
                print(f"Request failed (attempt {attempt + 1}): {e}")
                if attempt < max_retries:
                    time.sleep(2 ** attempt)

        return False
```