or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/pypi-airbyte-cdk

A framework for building Airbyte Source and Destination connectors with Python, supporting both programmatic and low-code declarative approaches.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/airbyte-cdk@3.1.x

To install, run

npx @tessl/cli install tessl/pypi-airbyte-cdk@3.1.0

0

# Airbyte CDK

1

2

The Airbyte CDK (Connector Development Kit) is a Python framework for building Airbyte Source and Destination connectors. It provides both programmatic and low-code declarative approaches to creating data integration connectors, enabling developers to build reliable data extraction and loading solutions for various APIs, databases, and data sources.

3

4

The CDK offers three main approaches: traditional programmatic connector development using Python classes, low-code declarative manifests using YAML configuration, and destination connector development for data loading scenarios.

5

6

## Package Information

7

8

- **Package Name**: airbyte-cdk

9

- **Version**: 3.1.0

10

- **Language**: Python

11

- **Installation**: `pip install airbyte-cdk`

12

- **Python Version**: 3.9+

13

- **Framework Type**: Connector Development Kit

14

15

## Core Imports

16

17

Main connector classes:

18

19

```python

20

from airbyte_cdk import Source, Destination

21

from airbyte_cdk.entrypoint import launch

22

```

23

24

Low-code declarative sources:

25

26

```python

27

from airbyte_cdk import YamlDeclarativeSource, ManifestDeclarativeSource

28

```

29

30

HTTP streams and utilities:

31

32

```python

33

from airbyte_cdk import HttpStream, HttpSubStream

34

from airbyte_cdk import TokenAuthenticator, Oauth2Authenticator

35

```

36

37

Protocol models:

38

39

```python

40

from airbyte_cdk import AirbyteMessage, ConfiguredAirbyteCatalog, AirbyteStream

41

```

42

43

## Basic Usage

44

45

### Creating a Source Connector

46

47

```python

48

from typing import Any, Iterable, List, Mapping, Optional, Tuple

49

from airbyte_cdk import Source

50

from airbyte_cdk.models import AirbyteCatalog, AirbyteMessage, ConfiguredAirbyteCatalog

51

from airbyte_cdk.sources.streams import Stream

52

import logging

53

54

class MySource(Source):

55

def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:

56

"""

57

Test connection to the source.

58

Returns (True, None) if successful, (False, error_message) otherwise.

59

"""

60

try:

61

# Test connection logic here

62

return True, None

63

except Exception as e:

64

return False, str(e)

65

66

def streams(self, config: Mapping[str, Any]) -> List[Stream]:

67

"""

68

Return list of streams for this source.

69

"""

70

return [MyStream(config=config)]

71

72

def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:

73

"""

74

Discover available streams and their schemas.

75

"""

76

return AirbyteCatalog(streams=[stream.as_airbyte_stream() for stream in self.streams(config)])

77

78

# Launch the connector

79

if __name__ == "__main__":

80

from airbyte_cdk.entrypoint import launch

81

launch(MySource(), sys.argv[1:])

82

```

83

84

### Using Low-Code Declarative Sources

85

86

```python

87

from airbyte_cdk import YamlDeclarativeSource

88

89

class MyDeclarativeSource(YamlDeclarativeSource):

90

def __init__(self):

91

super().__init__(path_to_yaml="manifest.yaml")

92

93

# The manifest.yaml file defines streams, authentication, and data transformation

94

```

95

96

### Creating a Destination Connector

97

98

```python

99

from typing import Any, Iterable, Mapping

100

from airbyte_cdk import Destination

101

from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog

102

103

class MyDestination(Destination):

104

def write(

105

self,

106

config: Mapping[str, Any],

107

configured_catalog: ConfiguredAirbyteCatalog,

108

input_messages: Iterable[AirbyteMessage]

109

) -> Iterable[AirbyteMessage]:

110

"""

111

Write data records to the destination.

112

"""

113

for message in input_messages:

114

if message.type == Type.RECORD:

115

# Process and write the record

116

self._write_record(message.record)

117

yield message

118

```

119

120

## Architecture

121

122

The Airbyte CDK follows a layered architecture designed for extensibility and reusability:

123

124

### Core Components

125

126

1. **Base Connectors**: Abstract `Source` and `Destination` classes providing the foundational interface

127

2. **Stream Framework**: `HttpStream` and `Stream` classes for data extraction with built-in state management

128

3. **Authentication Layer**: Various authenticators for OAuth2, API tokens, and custom authentication schemes

129

4. **Declarative Framework**: YAML-based low-code approach for building connectors without Python code

130

5. **Protocol Models**: Pydantic models implementing the Airbyte Protocol for message exchange

131

6. **Utilities**: Helper functions for configuration, schema handling, and data transformation

132

133

### Stream Types

134

135

- **HTTP Streams**: For REST API data sources with pagination, authentication, and error handling

136

- **Incremental Streams**: Support for state-based incremental synchronization

137

- **Sub-streams**: Nested data extraction from parent-child relationships

138

- **Concurrent Streams**: High-throughput data extraction with parallel processing

139

140

## Capabilities

141

142

### [Source Connectors](./source-connectors.md)

143

144

Framework for building data extraction connectors with support for HTTP APIs, databases, and files. Includes stream management, incremental sync, authentication, error handling, and state management.

145

146

### [Destination Connectors](./destination-connectors.md)

147

148

Framework for building data loading connectors that write records to databases, data warehouses, files, and APIs. Provides batch processing, type mapping, and error handling capabilities.

149

150

### [Declarative Low-Code CDK](./declarative-cdk.md)

151

152

YAML-based framework for building connectors without writing Python code. Supports most common connector patterns through declarative configuration with authentication, pagination, transformations, and incremental sync.

153

154

## Quick Start Example

155

156

Complete example of building and running a connector:

157

158

```python

159

import sys

160

from airbyte_cdk import Source, HttpStream, launch

161

from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator

162

163

class UsersStream(HttpStream):

164

url_base = "https://api.example.com/"

165

primary_key = "id"

166

167

def __init__(self, config: dict):

168

super().__init__()

169

self._config = config

170

171

def path(self, **kwargs) -> str:

172

return "users"

173

174

def request_headers(self, **kwargs) -> Mapping[str, Any]:

175

return {"Authorization": f"Bearer {self._config['api_token']}"}

176

177

def parse_response(self, response, **kwargs) -> Iterable[Mapping]:

178

data = response.json()

179

for record in data.get("users", []):

180

yield record

181

182

class ExampleSource(Source):

183

def check_connection(self, logger, config):

184

return True, None

185

186

def streams(self, config):

187

return [UsersStream(config)]

188

189

if __name__ == "__main__":

190

launch(ExampleSource(), sys.argv[1:])

191

```

192

193

This example demonstrates a basic HTTP source connector that extracts user data from an API with token authentication.