or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

declarative-cdk.md · destination-connectors.md · index.md · source-connectors.md

docs/source-connectors.md

0

# Source Connectors

1

2

The Source connector framework is used to build data-extraction connectors for HTTP APIs, databases, and files. It provides a structured approach to data ingestion, with built-in stream management, incremental synchronization, authentication, error handling, and state management.

3

4

## Capabilities

5

6

### Base Source Classes

7

8

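Core classes for implementing source connectors that extract data from external systems. The `Stream` type referenced by `streams()` below is importable from `airbyte_cdk.sources.streams.core`.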

9

10

```python { .api }

11

from airbyte_cdk import Source

12

from airbyte_cdk.models import AirbyteCatalog, AirbyteMessage, ConfiguredAirbyteCatalog

13

from typing import Any, Iterable, List, Mapping, Optional, Tuple

14

import logging

15

16

class Source:

17

"""

18

Base class for Airbyte source connectors.

19

"""

20

def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:

21

"""

22

Test connection validity with given configuration.

23

24

Args:

25

logger: Logger instance for outputting messages

26

config: Configuration dictionary containing connection parameters

27

28

Returns:

29

Tuple of (success_boolean, error_message_or_none)

30

"""

31

32

def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:

33

"""

34

Discover available streams and their schemas.

35

36

Args:

37

logger: Logger instance

38

config: Configuration dictionary

39

40

Returns:

41

AirbyteCatalog containing available streams and their schemas

42

"""

43

44

def read(

45

self,

46

logger: logging.Logger,

47

config: Mapping[str, Any],

48

catalog: ConfiguredAirbyteCatalog,

49

state: Optional[Mapping[str, Any]] = None

50

) -> Iterable[AirbyteMessage]:

51

"""

52

Read data from the source.

53

54

Args:

55

logger: Logger instance

56

config: Configuration dictionary

57

catalog: Configured catalog specifying which streams to read

58

state: Optional state for incremental reads

59

60

Yields:

61

AirbyteMessage instances containing records, state, or logs

62

"""

63

64

def streams(self, config: Mapping[str, Any]) -> List[Stream]:

65

"""

66

Return list of streams for this source.

67

68

Args:

69

config: Configuration dictionary

70

71

Returns:

72

List of Stream instances available in this source

73

"""

74

```

75

76

### HTTP Stream Classes

77

78

Classes for building HTTP-based source connectors with built-in pagination, authentication, and error handling.

79

80

```python { .api }

81

from airbyte_cdk import HttpStream, HttpSubStream

82

from airbyte_cdk.sources.streams.core import Stream

83

from requests.auth import AuthBase

84

from typing import Any, Iterable, Mapping, Optional

85

86

class HttpStream(Stream):

87

"""

88

Base class for HTTP API data extraction streams.

89

"""

90

91

def __init__(self, authenticator: Optional[AuthBase] = None):

92

"""

93

Initialize HTTP stream.

94

95

Args:

96

authenticator: Authentication handler for HTTP requests

97

"""

98

99

@property

100

def url_base(self) -> str:

101

"""

102

Base URL for the API endpoint.

103

Example: "https://api.example.com/v1/"

104

"""

105

106

def path(self, **kwargs) -> str:

107

"""

108

Return the API endpoint path for this stream.

109

110

Returns:

111

Path component of the URL (e.g., "users", "posts")

112

"""

113

114

def request_params(self, **kwargs) -> Mapping[str, Any]:

115

"""

116

Return query parameters for the request.

117

118

Returns:

119

Dictionary of query parameters

120

"""

121

122

def request_headers(self, **kwargs) -> Mapping[str, Any]:

123

"""

124

Return headers for the request.

125

126

Returns:

127

Dictionary of HTTP headers

128

"""

129

130

def parse_response(self, response, **kwargs) -> Iterable[Mapping]:

131

"""

132

Parse HTTP response into records.

133

134

Args:

135

response: HTTP response object

136

137

Yields:

138

Dictionary records extracted from the response

139

"""

140

141

def next_page_token(self, response) -> Optional[Mapping[str, Any]]:

142

"""

143

Extract next page token for pagination.

144

145

Args:

146

response: HTTP response object

147

148

Returns:

149

Token for next page or None if no more pages

150

"""

151

152

class HttpSubStream(HttpStream):

153

"""

154

HTTP stream that depends on data from a parent stream.

155

"""

156

157

def __init__(self, parent: HttpStream, **kwargs):

158

"""

159

Initialize sub-stream with parent dependency.

160

161

Args:

162

parent: Parent stream that provides data for this sub-stream

163

"""

164

```

165

166

### Authentication

167

168

Authentication handlers for various HTTP authentication schemes. The signatures below also use `List` and `Optional` from `typing`.

169

170

```python { .api }

171

from airbyte_cdk import TokenAuthenticator, Oauth2Authenticator, BasicHttpAuthenticator

172

from requests.auth import AuthBase

173

174

class TokenAuthenticator(AuthBase):

175

"""

176

Authentication using API tokens in headers.

177

"""

178

179

def __init__(self, token: str, auth_method: str = "Bearer", auth_header: str = "Authorization"):

180

"""

181

Initialize token authenticator.

182

183

Args:

184

token: API token value

185

auth_method: Authentication method (e.g., "Bearer", "Token")

186

auth_header: Header name for authentication

187

"""

188

189

class Oauth2Authenticator(AuthBase):

190

"""

191

OAuth 2.0 authentication with automatic token refresh.

192

"""

193

194

def __init__(

195

self,

196

token_refresh_endpoint: str,

197

client_id: str,

198

client_secret: str,

199

refresh_token: str,

200

scopes: Optional[List[str]] = None,

201

token_expiry_date: Optional[str] = None,

202

access_token: Optional[str] = None

203

):

204

"""

205

Initialize OAuth2 authenticator.

206

207

Args:

208

token_refresh_endpoint: URL for token refresh

209

client_id: OAuth client ID

210

client_secret: OAuth client secret

211

refresh_token: Refresh token for obtaining access tokens

212

scopes: Optional list of OAuth scopes

213

token_expiry_date: When current access token expires

214

access_token: Current access token

215

"""

216

217

class BasicHttpAuthenticator(AuthBase):

218

"""

219

HTTP Basic authentication.

220

"""

221

222

def __init__(self, username: str, password: str):

223

"""

224

Initialize basic authentication.

225

226

Args:

227

username: Username for basic auth

228

password: Password for basic auth

229

"""

230

```

231

232

### Stream State Management

233

234

Classes for managing incremental synchronization state.

235

236

```python { .api }

237

from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager

238

from airbyte_cdk.sources.streams.core import IncrementalMixin

239

from typing import Any, Mapping, Optional

240

241

class IncrementalMixin:

242

"""

243

Mixin for streams that support incremental synchronization.

244

"""

245

246

@property

247

def cursor_field(self) -> str:

248

"""

249

Field name used as cursor for incremental sync.

250

251

Returns:

252

Name of the cursor field (e.g., "updated_at", "id")

253

"""

254

255

def get_updated_state(

256

self,

257

current_stream_state: Mapping[str, Any],

258

latest_record: Mapping[str, Any]

259

) -> Mapping[str, Any]:

260

"""

261

Update stream state based on the latest record.

262

263

Args:

264

current_stream_state: Current state for this stream

265

latest_record: Latest record processed

266

267

Returns:

268

Updated state dictionary

269

"""

270

271

class ConnectorStateManager:

272

"""

273

Manages state across all streams in a connector.

274

"""

275

276

def get_stream_state(self, stream_name: str) -> Mapping[str, Any]:

277

"""

278

Get state for a specific stream.

279

280

Args:

281

stream_name: Name of the stream

282

283

Returns:

284

State dictionary for the stream

285

"""

286

287

def update_state_for_stream(

288

self,

289

stream_name: str,

290

state: Mapping[str, Any]

291

) -> None:

292

"""

293

Update state for a specific stream.

294

295

Args:

296

stream_name: Name of the stream

297

state: New state dictionary

298

"""

299

```

300

301

## Usage Examples

302

303

### Basic HTTP Source

304

305

```python

306

from airbyte_cdk import Source, HttpStream

307

from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator

308

import logging

309

from typing import Any, Mapping

310

311

class UsersStream(HttpStream):

312

url_base = "https://api.example.com/v1/"

313

primary_key = "id"

314

315

def __init__(self, config: Mapping[str, Any]):

316

authenticator = TokenAuthenticator(token=config["api_token"])

317

super().__init__(authenticator=authenticator)

318

self._config = config

319

320

def path(self, **kwargs) -> str:

321

return "users"

322

323

def parse_response(self, response, **kwargs):

324

data = response.json()

325

for user in data.get("users", []):

326

yield user

327

328

class ExampleSource(Source):

329

def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]):

330

try:

331

# Test API connection

332

stream = UsersStream(config)

333

# Perform test request

334

return True, None

335

except Exception as e:

336

return False, str(e)

337

338

def streams(self, config: Mapping[str, Any]):

339

return [UsersStream(config)]

340

341

# Usage

342

source = ExampleSource()

343

config = {"api_token": "your_token_here"}

344

success, error = source.check_connection(logging.getLogger(), config)

345

```

346

347

### Incremental Stream with Pagination

348

349

```python

350

from airbyte_cdk import HttpStream

351

from airbyte_cdk.sources.streams.core import IncrementalMixin

352

from datetime import datetime

353

from typing import Any, Mapping, Optional

354

355

class OrdersStream(HttpStream, IncrementalMixin):

356

url_base = "https://api.example.com/v1/"

357

primary_key = "id"

358

cursor_field = "updated_at"

359

360

def path(self, **kwargs) -> str:

361

return "orders"

362

363

def request_params(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Mapping[str, Any]:

364

params = {"limit": 100}

365

366

# Add cursor for incremental sync

367

if stream_state and self.cursor_field in stream_state:

368

params["updated_since"] = stream_state[self.cursor_field]

369

370

# Add pagination token

371

if kwargs.get("next_page_token"):

372

params["page_token"] = kwargs["next_page_token"]

373

374

return params

375

376

def next_page_token(self, response) -> Optional[str]:

377

data = response.json()

378

return data.get("next_page_token")

379

380

def parse_response(self, response, **kwargs):

381

data = response.json()

382

for order in data.get("orders", []):

383

yield order

384

385

def get_updated_state(self, current_stream_state: Mapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:

386

current_cursor = current_stream_state.get(self.cursor_field, "")

387

latest_cursor = latest_record.get(self.cursor_field, "")

388

389

return {self.cursor_field: max(current_cursor, latest_cursor)}

390

```

391

392

### OAuth2 Authentication

This example assumes `Any` and `Mapping` are imported from `typing`.

393

394

```python

395

from airbyte_cdk import HttpStream

396

from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator

397

398

class AuthenticatedStream(HttpStream):

399

url_base = "https://api.example.com/v1/"

400

401

def __init__(self, config: Mapping[str, Any]):

402

authenticator = Oauth2Authenticator(

403

token_refresh_endpoint="https://api.example.com/oauth/token",

404

client_id=config["client_id"],

405

client_secret=config["client_secret"],

406

refresh_token=config["refresh_token"]

407

)

408

super().__init__(authenticator=authenticator)

409

```

410

411

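### Sub-stream Implementation

This example uses the `UsersStream` class defined in the Basic HTTP Source example as the parent stream, and assumes `Any` and `Mapping` are imported from `typing`.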

412

413

```python

414

from airbyte_cdk import HttpSubStream

415

416

class UserPostsStream(HttpSubStream):

417

def __init__(self, parent: UsersStream, **kwargs):

418

super().__init__(parent=parent, **kwargs)

419

420

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:

421

user_id = stream_slice["parent"]["id"]

422

return f"users/{user_id}/posts"

423

424

def stream_slices(self, **kwargs):

425

# Use parent stream records as slices

426

for user in self.parent.read_records(**kwargs):

427

yield {"parent": user}

428

429

def parse_response(self, response, **kwargs):

430

data = response.json()

431

for post in data.get("posts", []):

432

yield post

433

```