S3 connector for Airbyte that syncs data from Amazon S3 and S3-compatible services
npx @tessl/cli install tessl/pypi-airbyte-source-s3@4.14.2
# Airbyte Source S3

A production-ready Airbyte connector that syncs data from Amazon S3 and S3-compatible storage services. Built on the Airbyte file-based connector framework, it supports multiple file formats, authentication methods, and incremental synchronization with comprehensive error handling and backward compatibility.

## Package Information

- **Package Name**: airbyte-source-s3
- **Package Type**: pypi
- **Language**: Python
- **Installation**: `pip install airbyte-source-s3`
- **Version**: 4.14.2

## Core Imports

```python
from source_s3.run import run
from source_s3.v4 import SourceS3, Config, SourceS3StreamReader, Cursor
```

Entry point usage:

```python
from source_s3.run import run

# Launch the connector
run()
```

## Basic Usage

```python
from source_s3.v4 import SourceS3, Config

# Create configuration
config = Config(
    bucket="my-bucket",
    aws_access_key_id="AKIAIOSFODNN7EXAMPLE",
    aws_secret_access_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    region_name="us-east-1"
)

# Create and launch the source; when run as a connector, the config and
# configured catalog are supplied via the standard Airbyte CLI arguments
source = SourceS3.create()
source.launch()
```

## Architecture

The airbyte-source-s3 connector follows a modular architecture built on Airbyte's file-based connector framework with dual-version support:

### Version Support
- **V4 (Current)**: File-based CDK implementation with `Config`, `SourceS3StreamReader`, and `Cursor` classes
- **V3 (Legacy/Deprecated)**: Legacy implementation with `SourceS3Spec` and `IncrementalFileStreamS3`, still supported for backward compatibility

### Core Components
- **SourceS3**: Main source class inheriting from FileBasedSource with V3-to-V4 config transformation
- **SourceS3StreamReader**: Handles S3-specific file discovery, streaming, and ZIP file extraction
- **Config**: V4 configuration specification with comprehensive S3 authentication support
- **Cursor**: Manages incremental sync state with legacy state migration capabilities
- **LegacyConfigTransformer**: Provides seamless V3-to-V4 configuration transformation
- **ZIP Support**: Full ZIP file extraction with `ZipFileHandler`, `DecompressedStream`, and `ZipContentReader`

### File Format Support
The connector supports multiple file formats with configurable options:
- **CSV**: Delimiter, encoding, quote characters, escape characters
- **JSON Lines**: Newline handling, unexpected field behavior, block size configuration
- **Parquet**: Column selection, batch size, buffer size optimization
- **Avro**: Native Avro format support

### Authentication Methods
- **AWS Access Keys**: Traditional access key ID and secret access key authentication
- **IAM Role Assumption**: Role-based authentication with external ID support
- **Anonymous Access**: Public bucket access without credentials
- **S3-Compatible Services**: Custom endpoint support for MinIO, DigitalOcean Spaces, etc.

The connector works with AWS S3 and any S3-compatible service, and supports compressed archives, incremental synchronization, and robust error handling; typical configurations for each authentication method are sketched below.

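To illustrate the authentication options above, the sketch below shows connector configuration dictionaries built from the field names in the `Config` spec documented in this package (`bucket`, `aws_access_key_id`, `aws_secret_access_key`, `role_arn`, `endpoint`, `region_name`). Bucket names, keys, and endpoints are placeholders, and a complete connector config also declares its streams.

```python
# Illustrative connector configs (placeholder values); field names follow the
# Config spec documented below. A full config additionally defines `streams`.

# 1. AWS access keys
access_key_config = {
    "bucket": "my-bucket",
    "aws_access_key_id": "AKIAIOSFODNN7EXAMPLE",
    "aws_secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    "region_name": "us-east-1",
}

# 2. IAM role assumption (the external ID is read from AWS_ASSUME_ROLE_EXTERNAL_ID)
role_config = {
    "bucket": "my-bucket",
    "role_arn": "arn:aws:iam::123456789012:role/airbyte-s3-reader",
    "region_name": "us-east-1",
}

# 3. Anonymous access to a public bucket
anonymous_config = {
    "bucket": "public-dataset-bucket",
}

# 4. S3-compatible service (e.g. MinIO) via a custom endpoint
minio_config = {
    "bucket": "my-bucket",
    "endpoint": "https://minio.example.com:9000",
    "aws_access_key_id": "minio-access-key",
    "aws_secret_access_key": "minio-secret-key",
}
```
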
## Capabilities

### Core Source Operations

Main source functionality including connector launching, configuration reading, and specification generation. These functions provide the primary entry points for running the S3 connector.

```python { .api }
def run() -> None: ...

class SourceS3(FileBasedSource):
    @classmethod
    def read_config(cls, config_path: str) -> Mapping[str, Any]: ...
    @classmethod
    def launch(cls, args: list[str] | None = None) -> None: ...
    @classmethod
    def create(cls, *, configured_catalog_path: Path | str | None = None) -> SourceS3: ...
```

[Core Source Operations](./core-source.md)

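These entry points can also be driven directly, for example from a test harness. The sketch below assumes local `config.json` and `configured_catalog.json` files (placeholder paths) and passes the standard Airbyte connector arguments; in normal operation `run()` handles this wiring.

```python
from source_s3.v4 import SourceS3

# Placeholder paths to a connector config and configured catalog
config_path = "secrets/config.json"
catalog_path = "integration_tests/configured_catalog.json"

# Load and inspect the raw connector config (a plain mapping)
config = SourceS3.read_config(config_path)
print(config.get("bucket"))

# Build the source against the configured catalog and run a read,
# using the standard Airbyte connector argument list
source = SourceS3.create(configured_catalog_path=catalog_path)
source.launch(args=["read", "--config", config_path, "--catalog", catalog_path])
```
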
### Configuration Management

Configuration classes and specifications for both V4 and legacy V3 formats. Handles S3 authentication, bucket configuration, and file format specifications with full validation and schema generation.

```python { .api }
class Config(AbstractFileBasedSpec):
    bucket: str
    aws_access_key_id: Optional[str]
    aws_secret_access_key: Optional[str]
    role_arn: Optional[str]
    endpoint: Optional[str]
    region_name: Optional[str]

    @classmethod
    def schema(cls, *args, **kwargs) -> Dict[str, Any]: ...
```

[Configuration Management](./configuration.md)

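A minimal sketch of working with the spec programmatically, assuming `schema()` returns the connector's JSON-schema dictionary as shown above:

```python
from source_s3.v4 import Config

# Generate the connector specification schema and list its top-level properties
spec_schema = Config.schema()
print(sorted(spec_schema.get("properties", {})))
```
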
### Stream Operations

Stream reading, cursor management, and incremental synchronization functionality. Provides S3-specific file discovery, reading operations, and state management for efficient data synchronization.

```python { .api }
class SourceS3StreamReader(AbstractFileBasedStreamReader):
    @property
    def s3_client(self) -> BaseClient: ...

    def get_matching_files(self, globs: List[str], prefix: Optional[str], logger) -> Iterable[RemoteFile]: ...
    def open_file(self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger) -> IOBase: ...

class Cursor(DefaultFileBasedCursor):
    def set_initial_state(self, value: StreamState) -> None: ...
    def get_state(self) -> StreamState: ...
```

[Stream Operations](./stream-operations.md)

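A sketch of using the stream reader directly for file discovery and reading. This wiring is normally done internally by `SourceS3`; the writable `config` attribute, the empty `streams` list, and the `FileReadMode` import path are assumptions about the file-based CDK integration, and the bucket, glob, and prefix values are placeholders.

```python
import logging

from airbyte_cdk.sources.file_based.file_based_stream_reader import FileReadMode
from source_s3.v4 import Config, SourceS3StreamReader

logger = logging.getLogger("airbyte")

reader = SourceS3StreamReader()
# Assumption: the reader accepts the parsed Config via a `config` attribute;
# `streams` is left empty here because only discovery and reading are exercised.
reader.config = Config(bucket="my-bucket", region_name="us-east-1", streams=[])

# Discover CSV objects under a prefix and stream the first match line by line
for remote_file in reader.get_matching_files(globs=["data/*.csv"], prefix="data/", logger=logger):
    with reader.open_file(remote_file, FileReadMode.READ, "utf-8", logger) as fp:
        for line in fp:
            print(line.rstrip())
    break
```
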
### File Format Specifications

Configuration classes for supported file formats including CSV, JSON Lines, Parquet, and Avro formats. Each format provides specific configuration options for parsing and processing.

```python { .api }
class CsvFormat:
    filetype: str = "csv"
    delimiter: str
    quote_char: str
    encoding: Optional[str]
    infer_datatypes: Optional[bool]

class JsonlFormat:
    filetype: str = "jsonl"
    newlines_in_values: bool
    unexpected_field_behavior: UnexpectedFieldBehaviorEnum
    block_size: int

class ParquetFormat:
    filetype: str = "parquet"
    columns: Optional[List[str]]
    batch_size: int
    buffer_size: int

class AvroFormat:
    filetype: str = "avro"
```

[File Format Specifications](./file-formats.md)

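To show how these options are typically expressed, the snippet below builds stream definitions as plain dictionaries using the field names documented above; the surrounding keys (`name`, `globs`, `format`) follow the `FileBasedStreamConfig` type in the Types section, and all concrete values are placeholders.

```python
# Illustrative stream definitions with per-format options (placeholder values)
csv_stream = {
    "name": "orders_csv",
    "globs": ["exports/*.csv"],
    "format": {
        "filetype": "csv",
        "delimiter": ",",
        "quote_char": '"',
        "encoding": "utf-8",
        "infer_datatypes": True,
    },
}

jsonl_stream = {
    "name": "events_jsonl",
    "globs": ["events/*.jsonl"],
    "format": {
        "filetype": "jsonl",
        "newlines_in_values": False,
        "unexpected_field_behavior": "infer",
        "block_size": 10000,
    },
}

parquet_stream = {
    "name": "metrics_parquet",
    "globs": ["metrics/*.parquet"],
    "format": {"filetype": "parquet", "columns": None, "batch_size": 65536, "buffer_size": 2},
}
```
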
### ZIP File Support

Comprehensive ZIP file extraction and streaming support for processing compressed S3 files. Handles large ZIP files efficiently with streaming decompression and individual file access.

```python { .api }
class ZipFileHandler:
    def __init__(self, s3_client: BaseClient, config: Config): ...
    def get_zip_files(self, filename: str) -> Tuple[List[zipfile.ZipInfo], int]: ...

class DecompressedStream(io.IOBase):
    def __init__(self, file_obj: IO[bytes], file_info: RemoteFileInsideArchive, buffer_size: int = BUFFER_SIZE_DEFAULT): ...
    def read(self, size: int = -1) -> bytes: ...

class ZipContentReader:
    def __init__(self, decompressed_stream: DecompressedStream, encoding: Optional[str] = None, buffer_size: int = BUFFER_SIZE_DEFAULT): ...
    def read(self, size: int = -1) -> Union[str, bytes]: ...

class RemoteFileInsideArchive(RemoteFile):
    start_offset: int
    compressed_size: int
    uncompressed_size: int
    compression_method: int
```

[ZIP File Support](./zip-support.md)

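A sketch of listing the entries of a ZIP archive stored in S3. The `source_s3.v4.zip_reader` module path and the empty `streams` list are assumptions, and bucket and key names are placeholders; during a sync, `SourceS3StreamReader.open_file` wraps individual entries in `DecompressedStream`/`ZipContentReader` for you.

```python
import boto3

from source_s3.v4 import Config
from source_s3.v4.zip_reader import ZipFileHandler  # module path is an assumption

config = Config(bucket="my-bucket", streams=[])
s3_client = boto3.client("s3")  # credentials resolved from the environment

# Read the ZIP central directory from S3 without downloading the whole archive
handler = ZipFileHandler(s3_client, config)
zip_infos, _offset = handler.get_zip_files("archives/export.zip")  # second element per the documented Tuple[..., int]

for info in zip_infos:
    print(info.filename, info.compress_size, info.file_size)
```
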
### Utilities and Error Handling

Utility functions for process management, data serialization, and custom exception handling. Includes multiprocessing support, Airbyte message handling, and S3-specific error management.

```python { .api }
def run_in_external_process(fn: Callable, timeout: int, max_timeout: int, logger, args: List[Any]) -> Mapping[str, Any]: ...

def _get_s3_compatible_client_args(config: Config) -> dict: ...

class S3Exception(AirbyteTracedException):
    def __init__(self, file_info: Union[List[FileInfo], FileInfo], internal_message: Optional[str] = None, ...): ...
```

[Utilities and Error Handling](./utilities.md)

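A sketch of raising `S3Exception` with per-file context, as a parser might when a specific object cannot be processed. The import paths below are assumptions about the package layout, and the file key and message are placeholders.

```python
import logging
from datetime import datetime

# Import paths are assumptions about the package layout
from source_s3.exceptions import S3Exception
from source_s3.source_files_abstract.file_info import FileInfo

logger = logging.getLogger("airbyte")

file_info = FileInfo(key="data/broken.csv", size=1024, last_modified=datetime.now())

try:
    raise S3Exception(file_info, internal_message="Failed to parse data/broken.csv")
except S3Exception as exc:
    # AirbyteTracedException subclasses carry the failure context back to the platform
    logger.error(str(exc))
```
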
## Types

```python { .api }
# Required imports
from typing import Optional, Union, List, Dict, Any, Callable, Tuple, Iterable, IO
from datetime import datetime, timedelta
from pathlib import Path
from io import IOBase
import zipfile
import multiprocessing as mp
from enum import Enum

# Configuration types
class DeliverRecords:
    """Configuration for delivering records as structured data"""
    delivery_type: str = "use_records_transfer"

class DeliverRawFiles:
    """Configuration for delivering raw files"""
    delivery_type: str = "use_raw_files"

# Stream types
class RemoteFile:
    """Represents a remote file with metadata"""
    uri: str
    last_modified: Optional[datetime]
    size: Optional[int]

class RemoteFileInsideArchive(RemoteFile):
    """Represents a file inside a ZIP archive"""
    start_offset: int
    compressed_size: int
    uncompressed_size: int
    compression_method: int

class FileInfo:
    """File information for error context"""
    key: str
    size: Optional[int]
    last_modified: Optional[datetime]

class StreamState:
    """Stream synchronization state"""
    pass

class FileBasedStreamConfig:
    """Configuration for file-based streams"""
    name: str
    globs: List[str]
    format: Union['CsvFormat', 'ParquetFormat', 'AvroFormat', 'JsonlFormat']
    input_schema: Optional[str]

# File format types
class UnexpectedFieldBehaviorEnum(str, Enum):
    ignore = "ignore"
    infer = "infer"
    error = "error"

class CsvFormat:
    filetype: str = "csv"
    delimiter: str
    quote_char: str
    encoding: Optional[str]
    infer_datatypes: Optional[bool]
    escape_char: Optional[str]
    double_quote: bool
    newlines_in_values: bool
    additional_reader_options: Optional[str]
    advanced_options: Optional[str]
    block_size: int

class JsonlFormat:
    filetype: str = "jsonl"
    newlines_in_values: bool
    unexpected_field_behavior: UnexpectedFieldBehaviorEnum
    block_size: int

class ParquetFormat:
    filetype: str = "parquet"
    columns: Optional[List[str]]
    batch_size: int
    buffer_size: int

class AvroFormat:
    filetype: str = "avro"

# Authentication types
class BaseClient:
    """S3 client interface from botocore"""
    def get_object(self, **kwargs) -> Dict[str, Any]: ...
    def list_objects_v2(self, **kwargs) -> Dict[str, Any]: ...
    def head_object(self, **kwargs) -> Dict[str, Any]: ...

# Airbyte types
class AirbyteMessage:
    """Airbyte message object for data synchronization"""
    type: str
    record: Optional['AirbyteRecordMessage']
    state: Optional['AirbyteStateMessage']
    log: Optional['AirbyteLogMessage']

class AirbyteRecordMessage:
    """Airbyte record message"""
    stream: str
    data: Dict[str, Any]
    emitted_at: int

class AirbyteRecordMessageFileReference:
    """Reference to a file in Airbyte record message"""
    pass

class FileRecordData:
    """File record data container"""
    pass

class ConnectorSpecification:
    """Airbyte connector specification"""
    documentationUrl: str
    connectionSpecification: Dict[str, Any]

# Error handling types
class FailureType:
    """Enumeration of failure types for error classification"""
    system_error: str = "system_error"
    config_error: str = "config_error"
    transient_error: str = "transient_error"

class AirbyteTracedException(Exception):
    """Base exception class for Airbyte connectors"""
    def __init__(self, internal_message: Optional[str] = None, message: Optional[str] = None,
                 failure_type: FailureType = FailureType.system_error, exception: Optional[BaseException] = None): ...

# File reading types
class FileReadMode:
    """File reading mode enumeration"""
    pass
```

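The `DeliverRecords` and `DeliverRawFiles` types above correspond to the connector's two delivery options. The sketch below shows how the documented `delivery_type` values might appear in a connector config; the wrapping `delivery_method` key is an assumption about the spec layout, and bucket names are placeholders.

```python
# Parse objects into structured Airbyte records
records_delivery_config = {
    "bucket": "my-bucket",
    "delivery_method": {"delivery_type": "use_records_transfer"},  # wrapper key is an assumption
}

# Copy files as-is instead of parsing them
raw_files_delivery_config = {
    "bucket": "my-bucket",
    "delivery_method": {"delivery_type": "use_raw_files"},
}
```
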
## Constants

```python { .api }
from datetime import timedelta
from os import getenv

# File size limits
FILE_SIZE_LIMIT = 1_500_000_000 # 1.5GB maximum file size

# Default concurrency
DEFAULT_CONCURRENCY = 10

# Date formats
_DATE_FORMAT = "%Y-%m-%d"
_LEGACY_DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

# Migration settings
_V4_MIGRATION_BUFFER = timedelta(hours=1)
_V3_MIN_SYNC_DATE_FIELD = "v3_min_sync_date"

# ZIP file handling constants
BUFFER_SIZE_DEFAULT = 1024 * 1024 # 1MB default buffer size
MAX_BUFFER_SIZE_DEFAULT = 16 * BUFFER_SIZE_DEFAULT # 16MB max buffer
EOCD_SIGNATURE = b"\x50\x4b\x05\x06" # ZIP End of Central Directory signature
ZIP64_LOCATOR_SIGNATURE = b"\x50\x4b\x06\x07" # ZIP64 locator signature

# Legacy config transformation
SECONDS_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
MICROS_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

# AWS Environment
AWS_EXTERNAL_ID = getenv("AWS_ASSUME_ROLE_EXTERNAL_ID")
```
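
As an illustration of how the ZIP constants are typically used, the sketch below scans the tail of a local archive for the End of Central Directory signature. This is a generic ZIP technique under an assumed local file path, not the connector's exact implementation; the signature matches the documented `EOCD_SIGNATURE` constant.

```python
def find_eocd_offset(data: bytes) -> int:
    """Return the offset of the ZIP End of Central Directory record within data, or -1 if absent."""
    EOCD_SIGNATURE = b"\x50\x4b\x05\x06"  # same value as the documented constant
    # The EOCD record sits near the end of the archive; its variable-length comment
    # means it can appear anywhere in the last ~64KB + 22 bytes.
    return data.rfind(EOCD_SIGNATURE)

# Placeholder path to a local ZIP archive
with open("archives/export.zip", "rb") as fp:
    fp.seek(0, 2)                        # seek to the end to learn the file size
    tail_size = min(fp.tell(), 65_536 + 22)
    fp.seek(-tail_size, 2)               # read only the tail of the archive
    tail = fp.read()

print("EOCD found at tail offset:", find_eocd_offset(tail))
```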