# Destination Connectors

Framework for building data-loading connectors that write records to databases, data warehouses, files, and APIs. It provides structured support for batch processing, type mapping, error handling, and integration with Airbyte's standardized message protocol.

## Capabilities

### Base Destination Class

Core class for implementing destination connectors that load data into external systems.

```python { .api }
from airbyte_cdk import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, ConnectorSpecification
from typing import Any, Iterable, Mapping
import logging

class Destination:
    """
    Base class for Airbyte destination connectors.
    """

    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """
        Test connection validity with the given configuration.

        Args:
            logger: Logger instance for outputting messages
            config: Configuration dictionary containing connection parameters

        Returns:
            AirbyteConnectionStatus indicating success or failure with details
        """

    def write(
        self,
        config: Mapping[str, Any],
        configured_catalog: ConfiguredAirbyteCatalog,
        input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:
        """
        Write data records to the destination.

        Args:
            config: Configuration dictionary
            configured_catalog: Catalog specifying destination streams and sync modes
            input_messages: Stream of AirbyteMessage instances containing records to write

        Yields:
            AirbyteMessage instances for status updates, state, or errors
        """

    def spec(self) -> ConnectorSpecification:
        """
        Return the specification for this destination's configuration.

        Returns:
            ConnectorSpecification defining required and optional configuration fields
        """
```
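
For context, a connector built on this class is typically exposed as a CLI entrypoint that dispatches `spec`/`check`/`write` based on command-line arguments. A minimal sketch, assuming the CDK's `run()` helper on `Destination`; the module and subclass names are hypothetical:

```python
# main.py -- hypothetical entrypoint for a destination connector
import sys

from destination_example import MyDestination  # hypothetical module/class

if __name__ == "__main__":
    # run() is assumed to parse argv (spec / check / write) and exchange
    # Airbyte protocol messages over stdin/stdout
    MyDestination().run(sys.argv[1:])
```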

### Message Processing

Classes and utilities for processing Airbyte protocol messages.

```python { .api }
from airbyte_cdk.models import (
    AirbyteLogMessage,
    AirbyteMessage,
    AirbyteRecordMessage,
    AirbyteStateMessage,
    AirbyteStream,
    DestinationSyncMode,
    SyncMode,
    Type,
)
from typing import Any, Dict, List, Optional

class AirbyteMessage:
    """
    Airbyte protocol message containing data records, state, or metadata.
    """

    type: Type  # RECORD, STATE, LOG, CATALOG, etc.
    record: Optional[AirbyteRecordMessage]
    state: Optional[AirbyteStateMessage]
    log: Optional[AirbyteLogMessage]

class AirbyteRecordMessage:
    """
    Data record message containing actual data to be written.
    """

    stream: str  # Name of the stream
    data: Dict[str, Any]  # Record data as key-value pairs
    emitted_at: int  # Timestamp when record was emitted (milliseconds)
    namespace: Optional[str]  # Optional namespace for the stream

class ConfiguredAirbyteCatalog:
    """
    Catalog describing which streams to write and how.
    """

    streams: List["ConfiguredAirbyteStream"]

class ConfiguredAirbyteStream:
    """
    Configuration for a single stream in the destination.
    """

    stream: AirbyteStream  # Stream schema and metadata
    sync_mode: SyncMode  # FULL_REFRESH or INCREMENTAL
    destination_sync_mode: DestinationSyncMode  # APPEND, OVERWRITE, APPEND_DEDUP
    cursor_field: Optional[List[str]]  # Fields used for incremental sync
    primary_key: Optional[List[List[str]]]  # Fields used for deduplication
```
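
As a quick illustration of these models, a single data record travels inside an `AirbyteMessage` envelope. A minimal sketch (the stream name and payload are made up):

```python
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, Type

# Wrap one record in the protocol envelope; emitted_at is in
# milliseconds since the Unix epoch, per the field comment above
message = AirbyteMessage(
    type=Type.RECORD,
    record=AirbyteRecordMessage(
        stream="users",  # hypothetical stream name
        data={"id": 1, "email": "a@example.com"},
        emitted_at=1700000000000,
    ),
)
```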

### Configuration and Schema Handling

Utilities for handling destination configuration and data schemas.

```python { .api }
from airbyte_cdk.models import ConnectorSpecification, AirbyteStream, DestinationSyncMode
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
from typing import Any, Dict, List, Optional

class ConnectorSpecification:
    """
    Specification defining the configuration schema for a destination.
    """

    connectionSpecification: Dict[str, Any]  # JSONSchema for configuration
    supportsIncremental: Optional[bool]  # Whether destination supports incremental sync
    supportsNormalization: Optional[bool]  # Whether destination supports normalization
    supportsDBT: Optional[bool]  # Whether destination supports DBT transformations
    supported_destination_sync_modes: List[DestinationSyncMode]  # Supported sync modes

def check_config_against_spec_or_exit(config: Dict[str, Any], spec: ConnectorSpecification) -> None:
    """
    Validate configuration against the connector specification.

    Args:
        config: Configuration dictionary to validate
        spec: Connector specification with schema

    Raises:
        SystemExit: If configuration is invalid
    """

class ResourceSchemaLoader:
    """
    Load JSON schemas from package resources.
    """

    def __init__(self, package_name: str):
        """
        Initialize schema loader.

        Args:
            package_name: Python package containing schema files
        """

    def get_schema(self, name: str) -> Dict[str, Any]:
        """
        Load schema by name.

        Args:
            name: Schema file name (without .json extension)

        Returns:
            Schema dictionary
        """
```
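
A short usage sketch, assuming the conventional connector layout where `ResourceSchemaLoader` resolves `schemas/<name>.json` inside the named package (the package and schema names here are hypothetical):

```python
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader

loader = ResourceSchemaLoader("destination_example")  # hypothetical package
users_schema = loader.get_schema("users")  # expects schemas/users.json in the package
print(sorted(users_schema.get("properties", {})))  # field names declared by the schema
```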

### Type Mapping and Transformation

Classes for handling data type conversion and transformation.

```python { .api }
from airbyte_cdk.sources.utils.transform import TypeTransformer, TransformConfig
from typing import Any, Dict, Mapping, Optional

class TypeTransformer:
    """
    Transform data types between Airbyte and destination formats.
    """

    def __init__(self, config: TransformConfig):
        """
        Initialize type transformer.

        Args:
            config: Configuration for type transformations
        """

    def transform(self, data: Dict[str, Any], schema: Dict[str, Any]) -> Dict[str, Any]:
        """
        Transform record data according to schema and configuration.

        Args:
            data: Record data dictionary
            schema: JSONSchema for the record

        Returns:
            Transformed data dictionary
        """

class TransformConfig:
    """
    Configuration for data transformations.
    """

    def __init__(
        self,
        date_format: Optional[str] = None,
        datetime_format: Optional[str] = None,
        time_format: Optional[str] = None,
        normalization: Optional[Mapping[str, str]] = None
    ):
        """
        Initialize transformation configuration.

        Args:
            date_format: Format string for date fields
            datetime_format: Format string for datetime fields
            time_format: Format string for time fields
            normalization: Field name normalization mappings
        """
```
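
A minimal sketch of the transformation flow, following this document's `TransformConfig` signature (the schema and record are made up, and the actual CDK API may differ):

```python
from airbyte_cdk.sources.utils.transform import TypeTransformer, TransformConfig

transformer = TypeTransformer(TransformConfig(datetime_format="%Y-%m-%dT%H:%M:%S"))

schema = {
    "type": "object",
    "properties": {"age": {"type": "integer"}, "name": {"type": "string"}},
}
# Coerce the raw record toward the types declared in the schema
clean = transformer.transform({"age": "42", "name": "Ada"}, schema)
```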

## Usage Examples

### Basic Database Destination

```python
from airbyte_cdk import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, Status, Type
import logging
import sqlite3
from typing import Any, Iterable, Mapping

class SqliteDestination(Destination):
    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        try:
            # Test database connection
            conn = sqlite3.connect(config["database_path"])
            conn.execute("SELECT 1")
            conn.close()
            return AirbyteConnectionStatus(status=Status.SUCCEEDED)
        except Exception as e:
            return AirbyteConnectionStatus(status=Status.FAILED, message=str(e))

    def write(
        self,
        config: Mapping[str, Any],
        configured_catalog: ConfiguredAirbyteCatalog,
        input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:

        conn = sqlite3.connect(config["database_path"])

        try:
            for message in input_messages:
                if message.type == Type.RECORD:
                    self._write_record(conn, message.record)
                elif message.type == Type.STATE:
                    # Echo state only after the preceding records are committed,
                    # so a checkpoint never claims data that was not written
                    yield message
        finally:
            conn.close()

    def _write_record(self, conn: sqlite3.Connection, record):
        table_name = record.stream
        data = record.data

        # Create table if not exists
        self._ensure_table_exists(conn, table_name, data)

        # Insert record (identifiers come from the configured catalog;
        # quoting/escaping is omitted for brevity)
        columns = list(data.keys())
        values = list(data.values())
        placeholders = ",".join(["?" for _ in values])

        query = f"INSERT INTO {table_name} ({','.join(columns)}) VALUES ({placeholders})"
        conn.execute(query, values)
        conn.commit()

    def _ensure_table_exists(self, conn: sqlite3.Connection, table_name: str, sample_data: dict):
        # Simple table creation based on sample data
        columns = []
        for key, value in sample_data.items():
            if isinstance(value, int):
                columns.append(f"{key} INTEGER")
            elif isinstance(value, float):
                columns.append(f"{key} REAL")
            else:
                columns.append(f"{key} TEXT")

        create_sql = f"CREATE TABLE IF NOT EXISTS {table_name} ({', '.join(columns)})"
        conn.execute(create_sql)
        conn.commit()

# Usage
destination = SqliteDestination()
config = {"database_path": "/path/to/database.db"}
status = destination.check(logging.getLogger(), config)
```

### Batch Processing Destination

```python
from airbyte_cdk import Destination
from airbyte_cdk.models import AirbyteMessage, Type
import json
from typing import Any, Dict, Iterable, List, Mapping

class BatchFileDestination(Destination):
    def __init__(self):
        self._buffer: Dict[str, List[dict]] = {}  # stream_name -> list of records
        self._batch_size = 1000

    def write(
        self,
        config: Mapping[str, Any],
        configured_catalog,
        input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:

        for message in input_messages:
            if message.type == Type.RECORD:
                self._buffer_record(message.record)

                # Flush when this stream's batch is full
                if len(self._buffer[message.record.stream]) >= self._batch_size:
                    self._flush_batch(config, message.record.stream)
            elif message.type == Type.STATE:
                # Flush all buffers before emitting state, so the checkpoint
                # never precedes the records it covers
                for stream_name in self._buffer:
                    self._flush_batch(config, stream_name)
                yield message

        # Flush remaining records
        for stream_name in self._buffer:
            if self._buffer[stream_name]:
                self._flush_batch(config, stream_name)

    def _buffer_record(self, record):
        stream_name = record.stream
        if stream_name not in self._buffer:
            self._buffer[stream_name] = []

        self._buffer[stream_name].append({
            "data": record.data,
            "emitted_at": record.emitted_at
        })

    def _flush_batch(self, config: Mapping[str, Any], stream_name: str):
        if not self._buffer[stream_name]:
            return

        output_path = f"{config['output_dir']}/{stream_name}.jsonl"

        with open(output_path, "a") as f:
            for record in self._buffer[stream_name]:
                f.write(json.dumps(record) + "\n")

        self._buffer[stream_name] = []
        print(f"Flushed batch for stream {stream_name}")
```
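
Note the ordering in the state branch: a state message is a checkpoint meaning "everything before this point has been persisted," so all buffers must be flushed before the state message is emitted. Otherwise a failed sync could resume past records that never reached the destination.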

### Type-Safe Destination with Schema Validation

```python
from airbyte_cdk import Destination
from airbyte_cdk.sources.utils.transform import TypeTransformer, TransformConfig
from airbyte_cdk.models import AirbyteMessage, Type
import jsonschema
from typing import Any, Dict, Iterable, Mapping

class ValidatingDestination(Destination):
    def __init__(self):
        self._transformer = TypeTransformer(TransformConfig(
            date_format="%Y-%m-%d",
            datetime_format="%Y-%m-%dT%H:%M:%S",
        ))
        self._schemas = {}  # stream_name -> schema

    def write(
        self,
        config: Mapping[str, Any],
        configured_catalog,
        input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:

        # Load schemas from catalog
        for stream_config in configured_catalog.streams:
            stream_name = stream_config.stream.name
            self._schemas[stream_name] = stream_config.stream.json_schema

        for message in input_messages:
            if message.type == Type.RECORD:
                try:
                    # Validate and transform record
                    validated_record = self._validate_and_transform(message.record)
                    self._write_validated_record(config, validated_record)
                except Exception as e:
                    # Log validation error but don't stop processing
                    print(f"Validation error for record in {message.record.stream}: {e}")
            elif message.type == Type.STATE:
                yield message

    def _validate_and_transform(self, record) -> Dict[str, Any]:
        stream_name = record.stream
        schema = self._schemas.get(stream_name)

        if schema:
            # Validate against schema
            jsonschema.validate(record.data, schema)

            # Transform data types
            transformed_data = self._transformer.transform(record.data, schema)
        else:
            # No schema available; pass the data through unchanged
            transformed_data = record.data

        return {
            "stream": stream_name,
            "data": transformed_data,
            "emitted_at": record.emitted_at
        }

    def _write_validated_record(self, config: Mapping[str, Any], record: Dict[str, Any]):
        # Write the validated and transformed record (left as a stub)
        pass
```

### API Destination with Error Handling

```python
from airbyte_cdk import Destination
from airbyte_cdk.models import AirbyteMessage, Type
import requests
import time
from typing import Any, Iterable, Mapping

class ApiDestination(Destination):
    def write(
        self,
        config: Mapping[str, Any],
        configured_catalog,
        input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:

        session = requests.Session()
        session.headers.update({
            "Authorization": f"Bearer {config['api_token']}",
            "Content-Type": "application/json"
        })

        for message in input_messages:
            if message.type == Type.RECORD:
                success = self._write_record_with_retry(
                    session,
                    config,
                    message.record
                )

                if not success:
                    print(f"Failed to write record to API: {message.record.stream}")
            elif message.type == Type.STATE:
                yield message

    def _write_record_with_retry(self, session: requests.Session, config: Mapping[str, Any], record, max_retries: int = 3) -> bool:
        url = f"{config['api_base_url']}/data/{record.stream}"
        payload = {
            "data": record.data,
            "timestamp": record.emitted_at
        }

        for attempt in range(max_retries + 1):
            try:
                response = session.post(url, json=payload, timeout=30)

                if 200 <= response.status_code < 300:
                    return True
                elif response.status_code == 429:  # Rate limited
                    wait_time = int(response.headers.get("Retry-After", 60))
                    time.sleep(wait_time)
                    continue
                elif response.status_code >= 500:  # Server error, retry
                    time.sleep(2 ** attempt)  # Exponential backoff
                    continue
                else:
                    print(f"API error {response.status_code}: {response.text}")
                    return False

            except requests.exceptions.RequestException as e:
                print(f"Request failed (attempt {attempt + 1}): {e}")
                if attempt < max_retries:
                    time.sleep(2 ** attempt)

        return False
```