tessl/pypi-airbyte-source-s3

S3 connector for Airbyte that syncs data from Amazon S3 and S3-compatible services

Describes: pypipkg:pypi/airbyte-source-s3@4.14.x

To install, run

npx @tessl/cli install tessl/pypi-airbyte-source-s3@4.14.0

# Airbyte Source S3

A production-ready Airbyte connector that syncs data from Amazon S3 and S3-compatible storage services. Built on the Airbyte file-based connector framework, it supports multiple file formats, authentication methods, and incremental synchronization with comprehensive error handling and backward compatibility.

## Package Information

- **Package Name**: airbyte-source-s3
- **Package Type**: pypi
- **Language**: Python
- **Installation**: `pip install airbyte-source-s3`
- **Version**: 4.14.2

## Core Imports

```python
from source_s3.run import run
from source_s3.v4 import SourceS3, Config, SourceS3StreamReader, Cursor
```

Entry point usage:

```python
from source_s3.run import run

# Launch the connector
run()
```

## Basic Usage

```python
from source_s3.v4 import SourceS3, Config

# Create configuration
config = Config(
    bucket="my-bucket",
    aws_access_key_id="AKIAIOSFODNN7EXAMPLE",
    aws_secret_access_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    region_name="us-east-1"
)

# Create and launch source
source = SourceS3.create()
source.launch()
```

## Architecture

The airbyte-source-s3 connector follows a modular architecture built on Airbyte's file-based connector framework with dual-version support:

### Version Support

- **V4 (Current)**: File-based CDK implementation with `Config`, `SourceS3StreamReader`, and `Cursor` classes
- **V3 (Deprecated)**: Legacy implementation with `SourceS3Spec` and `IncrementalFileStreamS3`, retained for backward compatibility

### Core Components

- **SourceS3**: Main source class inheriting from FileBasedSource with V3-to-V4 config transformation
- **SourceS3StreamReader**: Handles S3-specific file discovery, streaming, and ZIP file extraction
- **Config**: V4 configuration specification with comprehensive S3 authentication support
- **Cursor**: Manages incremental sync state with legacy state migration capabilities
- **LegacyConfigTransformer**: Provides seamless V3-to-V4 configuration transformation
- **ZIP Support**: Full ZIP file extraction with `ZipFileHandler`, `DecompressedStream`, and `ZipContentReader`

### File Format Support

The connector supports multiple file formats with configurable options:

- **CSV**: Delimiter, encoding, quote characters, escape characters
- **JSON Lines**: Newline handling, unexpected field behavior, block size configuration
- **Parquet**: Column selection, batch size, buffer size optimization
- **Avro**: Native Avro format support

### Authentication Methods

- **AWS Access Keys**: Traditional access key ID and secret access key authentication
- **IAM Role Assumption**: Role-based authentication with external ID support
- **Anonymous Access**: Public bucket access without credentials
- **S3-Compatible Services**: Custom endpoint support for MinIO, DigitalOcean Spaces, etc. (sketched below)
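The role-assumption and custom-endpoint paths both flow through the same `Config` class documented under Configuration Management below. A minimal sketch, assuming only the `Config` fields listed in this document (a real config typically also carries the `streams` list required by the file-based spec); the ARN, endpoint, and credentials are placeholders:

```python
from source_s3.v4 import Config

# IAM role assumption: no static keys; the external ID is read from the
# AWS_ASSUME_ROLE_EXTERNAL_ID environment variable (see Constants).
role_config = Config(
    bucket="my-bucket",
    role_arn="arn:aws:iam::123456789012:role/airbyte-s3-reader",  # placeholder
    region_name="us-east-1",
)

# S3-compatible service: point `endpoint` at the service URL (MinIO here).
minio_config = Config(
    bucket="my-bucket",
    endpoint="http://localhost:9000",          # placeholder MinIO endpoint
    aws_access_key_id="minio-access-key",      # placeholder
    aws_secret_access_key="minio-secret-key",  # placeholder
)
```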

The connector integrates seamlessly with AWS S3 and S3-compatible services, providing comprehensive file processing capabilities including compressed archives, incremental synchronization, and robust error handling.


## Capabilities

### Core Source Operations

Main source functionality including connector launching, configuration reading, and specification generation. These functions provide the primary entry points for running the S3 connector.

```python { .api }
def run() -> None: ...

class SourceS3(FileBasedSource):
    @classmethod
    def read_config(cls, config_path: str) -> Mapping[str, Any]: ...
    @classmethod
    def launch(cls, args: list[str] | None = None) -> None: ...
    @classmethod
    def create(cls, *, configured_catalog_path: Path | str | None = None) -> SourceS3: ...
```
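The connector can also be driven programmatically the same way Airbyte's CLI drives it. A sketch, assuming the standard Airbyte argument convention (`spec`, `check`, `discover`, `read`); the file paths are placeholders:

```python
from source_s3.v4 import SourceS3

# Validate a config, then run a sync bound to a configured catalog.
# `launch` parses the argument list just as the module-level `run()`
# entry point parses sys.argv.
SourceS3.launch(["check", "--config", "secrets/config.json"])

source = SourceS3.create(configured_catalog_path="sample_files/configured_catalog.json")
source.launch(["read", "--config", "secrets/config.json",
               "--catalog", "sample_files/configured_catalog.json"])
```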

[Core Source Operations](./core-source.md)

### Configuration Management

Configuration classes and specifications for both V4 and legacy V3 formats. Handles S3 authentication, bucket configuration, and file format specifications with full validation and schema generation.

```python { .api }
class Config(AbstractFileBasedSpec):
    bucket: str
    aws_access_key_id: Optional[str]
    aws_secret_access_key: Optional[str]
    role_arn: Optional[str]
    endpoint: Optional[str]
    region_name: Optional[str]

    @classmethod
    def schema(cls, *args, **kwargs) -> Dict[str, Any]: ...
```
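The generated schema is what Airbyte presents as the connector's connection specification. A quick sketch for inspecting it, assuming the usual pydantic-style layout with a top-level `properties` map:

```python
import json
from source_s3.v4 import Config

# Dump the connection specification and list its top-level fields.
spec = Config.schema()
print(sorted(spec.get("properties", {})))  # e.g. ['aws_access_key_id', 'bucket', ...]
print(json.dumps(spec, indent=2)[:400])    # preview the raw JSON schema
```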

[Configuration Management](./configuration.md)

### Stream Operations

Stream reading, cursor management, and incremental synchronization functionality. Provides S3-specific file discovery, reading operations, and state management for efficient data synchronization.

```python { .api }
class SourceS3StreamReader(AbstractFileBasedStreamReader):
    @property
    def s3_client(self) -> BaseClient: ...

    def get_matching_files(self, globs: List[str], prefix: Optional[str], logger) -> Iterable[RemoteFile]: ...
    def open_file(self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger) -> IOBase: ...

class Cursor(DefaultFileBasedCursor):
    def set_initial_state(self, value: StreamState) -> None: ...
    def get_state(self) -> StreamState: ...
```
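A sketch of direct reader usage for listing and opening files. It assumes the reader receives its `Config` through a CDK-style `config` attribute and that `FileReadMode` is importable from the Airbyte CDK path shown; both are assumptions, and the credentials are placeholders:

```python
import logging

from airbyte_cdk.sources.file_based.file_based_stream_reader import FileReadMode  # assumed path
from source_s3.v4 import Config, SourceS3StreamReader

logger = logging.getLogger("airbyte")

config = Config(
    bucket="my-bucket",
    aws_access_key_id="AKIAIOSFODNN7EXAMPLE",                          # placeholder
    aws_secret_access_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",  # placeholder
    region_name="us-east-1",
)

reader = SourceS3StreamReader()
reader.config = config  # assumption: CDK-style config injection

# Enumerate CSV objects matching a glob, then stream the first match.
for remote_file in reader.get_matching_files(globs=["data/*.csv"], prefix=None, logger=logger):
    with reader.open_file(remote_file, FileReadMode.READ, encoding="utf-8", logger=logger) as fp:
        print(remote_file.uri, fp.readline())
    break
```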

[Stream Operations](./stream-operations.md)

### File Format Specifications

Configuration classes for the supported file formats: CSV, JSON Lines, Parquet, and Avro. Each format provides specific configuration options for parsing and processing.

```python { .api }
class CsvFormat:
    filetype: str = "csv"
    delimiter: str
    quote_char: str
    encoding: Optional[str]
    infer_datatypes: Optional[bool]

class JsonlFormat:
    filetype: str = "jsonl"
    newlines_in_values: bool
    unexpected_field_behavior: UnexpectedFieldBehaviorEnum
    block_size: int

class ParquetFormat:
    filetype: str = "parquet"
    columns: Optional[List[str]]
    batch_size: int
    buffer_size: int

class AvroFormat:
    filetype: str = "avro"
```
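In a connector config these formats appear per stream. An illustrative sketch (plain JSON-style dicts, not validated against the real spec) of a CSV stream wired into a bucket config; the `streams` key follows the file-based spec's stream list (see `FileBasedStreamConfig` under Types):

```python
# Illustrative only: field names mirror the classes above, but treat this
# as a sketch rather than a verified connection spec.
csv_stream = {
    "name": "orders",
    "globs": ["exports/orders/*.csv"],
    "format": {
        "filetype": "csv",
        "delimiter": ",",
        "quote_char": '"',
        "encoding": "utf-8",
    },
}

connection_config = {
    "bucket": "my-bucket",  # placeholder
    "region_name": "us-east-1",
    "streams": [csv_stream],
}
```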

[File Format Specifications](./file-formats.md)

### ZIP File Support

Comprehensive ZIP file extraction and streaming support for processing compressed S3 files. Handles large ZIP files efficiently with streaming decompression and individual file access.

```python { .api }
class ZipFileHandler:
    def __init__(self, s3_client: BaseClient, config: Config): ...
    def get_zip_files(self, filename: str) -> Tuple[List[zipfile.ZipInfo], int]: ...

class DecompressedStream(io.IOBase):
    def __init__(self, file_obj: IO[bytes], file_info: RemoteFileInsideArchive, buffer_size: int = BUFFER_SIZE_DEFAULT): ...
    def read(self, size: int = -1) -> bytes: ...

class ZipContentReader:
    def __init__(self, decompressed_stream: DecompressedStream, encoding: Optional[str] = None, buffer_size: int = BUFFER_SIZE_DEFAULT): ...
    def read(self, size: int = -1) -> Union[str, bytes]: ...

class RemoteFileInsideArchive(RemoteFile):
    start_offset: int
    compressed_size: int
    uncompressed_size: int
    compression_method: int
```
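A sketch of listing a ZIP archive's entries straight from S3 without downloading the whole object. The `source_s3.v4.zip_reader` module path and the minimal `Config` are assumptions; the bucket and key are placeholders:

```python
import boto3

from source_s3.v4 import Config
from source_s3.v4.zip_reader import ZipFileHandler  # assumed module path

config = Config(bucket="my-bucket", region_name="us-east-1")  # placeholder values
s3_client = boto3.client("s3", region_name="us-east-1")

# get_zip_files reads only the central directory, returning each entry's
# ZipInfo plus an offset used to locate files inside the archive.
handler = ZipFileHandler(s3_client=s3_client, config=config)
zip_infos, offset = handler.get_zip_files("archives/batch.zip")  # placeholder key
for info in zip_infos:
    print(info.filename, info.compress_size, info.file_size)
```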

[ZIP File Support](./zip-support.md)

### Utilities and Error Handling

Utility functions for process management, data serialization, and custom exception handling. Includes multiprocessing support, Airbyte message handling, and S3-specific error management.

```python { .api }
def run_in_external_process(fn: Callable, timeout: int, max_timeout: int, logger, args: List[Any]) -> Mapping[str, Any]: ...

def _get_s3_compatible_client_args(config: Config) -> dict: ...

class S3Exception(AirbyteTracedException):
    def __init__(self, file_info: Union[List[FileInfo], FileInfo], internal_message: Optional[str] = None, ...): ...
```
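A sketch of guarding a potentially hanging step with `run_in_external_process`. The `source_s3.utils` import path and the retry-with-growing-timeout behaviour are assumptions; the callable must be picklable (i.e., a module-level function):

```python
import logging

from source_s3.utils import run_in_external_process  # assumed module path

logger = logging.getLogger("airbyte")

def infer_columns(sample: bytes) -> dict:
    # Stand-in for an expensive parse/schema-inference step.
    header = sample.splitlines()[0].decode("utf-8")
    return {"columns": header.split(",")}

# Run infer_columns in a child process; if it exceeds `timeout` seconds the
# call is retried with a larger timeout, up to `max_timeout` (assumed semantics).
result = run_in_external_process(fn=infer_columns, timeout=4, max_timeout=60,
                                 logger=logger, args=[b"id,name\n1,a\n"])
print(result)  # {'columns': ['id', 'name']}
```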

[Utilities and Error Handling](./utilities.md)

## Types

```python { .api }
# Required imports
from typing import Optional, Union, List, Dict, Any, Callable, Tuple, Iterable, IO
from datetime import datetime, timedelta
from pathlib import Path
from io import IOBase
import zipfile
import multiprocessing as mp
from enum import Enum

# Configuration types
class DeliverRecords:
    """Configuration for delivering records as structured data"""
    delivery_type: str = "use_records_transfer"

class DeliverRawFiles:
    """Configuration for delivering raw files"""
    delivery_type: str = "use_raw_files"

# Stream types
class RemoteFile:
    """Represents a remote file with metadata"""
    uri: str
    last_modified: Optional[datetime]
    size: Optional[int]

class RemoteFileInsideArchive(RemoteFile):
    """Represents a file inside a ZIP archive"""
    start_offset: int
    compressed_size: int
    uncompressed_size: int
    compression_method: int

class FileInfo:
    """File information for error context"""
    key: str
    size: Optional[int]
    last_modified: Optional[datetime]

class StreamState:
    """Stream synchronization state"""
    pass

class FileBasedStreamConfig:
    """Configuration for file-based streams"""
    name: str
    globs: List[str]
    format: Union['CsvFormat', 'ParquetFormat', 'AvroFormat', 'JsonlFormat']
    input_schema: Optional[str]

# File format types
class UnexpectedFieldBehaviorEnum(str, Enum):
    ignore = "ignore"
    infer = "infer"
    error = "error"

class CsvFormat:
    filetype: str = "csv"
    delimiter: str
    quote_char: str
    encoding: Optional[str]
    infer_datatypes: Optional[bool]
    escape_char: Optional[str]
    double_quote: bool
    newlines_in_values: bool
    additional_reader_options: Optional[str]
    advanced_options: Optional[str]
    block_size: int

class JsonlFormat:
    filetype: str = "jsonl"
    newlines_in_values: bool
    unexpected_field_behavior: UnexpectedFieldBehaviorEnum
    block_size: int

class ParquetFormat:
    filetype: str = "parquet"
    columns: Optional[List[str]]
    batch_size: int
    buffer_size: int

class AvroFormat:
    filetype: str = "avro"

# Authentication types
class BaseClient:
    """S3 client interface from botocore"""
    def get_object(self, **kwargs) -> Dict[str, Any]: ...
    def list_objects_v2(self, **kwargs) -> Dict[str, Any]: ...
    def head_object(self, **kwargs) -> Dict[str, Any]: ...

# Airbyte types
class AirbyteMessage:
    """Airbyte message object for data synchronization"""
    type: str
    record: Optional['AirbyteRecordMessage']
    state: Optional['AirbyteStateMessage']
    log: Optional['AirbyteLogMessage']

class AirbyteRecordMessage:
    """Airbyte record message"""
    stream: str
    data: Dict[str, Any]
    emitted_at: int

class AirbyteRecordMessageFileReference:
    """Reference to a file in Airbyte record message"""
    pass

class FileRecordData:
    """File record data container"""
    pass

class ConnectorSpecification:
    """Airbyte connector specification"""
    documentationUrl: str
    connectionSpecification: Dict[str, Any]

# Error handling types
class FailureType:
    """Enumeration of failure types for error classification"""
    system_error: str = "system_error"
    config_error: str = "config_error"
    transient_error: str = "transient_error"

class AirbyteTracedException(Exception):
    """Base exception class for Airbyte connectors"""
    def __init__(self, internal_message: Optional[str] = None, message: Optional[str] = None,
                 failure_type: FailureType = FailureType.system_error,
                 exception: Optional[BaseException] = None): ...

# File reading types
class FileReadMode:
    """File reading mode enumeration"""
    pass
```

## Constants

```python { .api }
from datetime import timedelta
from os import getenv

# File size limits
FILE_SIZE_LIMIT = 1_500_000_000  # 1.5 GB maximum file size

# Default concurrency
DEFAULT_CONCURRENCY = 10

# Date formats
_DATE_FORMAT = "%Y-%m-%d"
_LEGACY_DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

# Migration settings
_V4_MIGRATION_BUFFER = timedelta(hours=1)
_V3_MIN_SYNC_DATE_FIELD = "v3_min_sync_date"

# ZIP file handling constants
BUFFER_SIZE_DEFAULT = 1024 * 1024  # 1 MB default buffer size
MAX_BUFFER_SIZE_DEFAULT = 16 * BUFFER_SIZE_DEFAULT  # 16 MB max buffer
EOCD_SIGNATURE = b"\x50\x4b\x05\x06"  # ZIP End of Central Directory signature
ZIP64_LOCATOR_SIGNATURE = b"\x50\x4b\x06\x07"  # ZIP64 locator signature

# Legacy config transformation
SECONDS_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
MICROS_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

# AWS Environment
AWS_EXTERNAL_ID = getenv("AWS_ASSUME_ROLE_EXTERNAL_ID")
```