# Stream Operations

Stream reading, cursor management, and incremental synchronization functionality. Provides S3-specific file discovery, reading operations, and state management for efficient data synchronization.

## Capabilities

### Stream Reader

S3-specific stream reader that handles file discovery, reading, and file transfer. Note that the `upload` method, despite its name, downloads S3 objects to a local directory for downstream processing.

```python { .api }
class SourceS3StreamReader(AbstractFileBasedStreamReader):
    """
    Handles S3 file reading and streaming operations.
    Inherits from AbstractFileBasedStreamReader for file-based connector compatibility.
    """

    FILE_SIZE_LIMIT = 1_500_000_000
    """Maximum file size limit (1.5 GB)"""

    @property
    def config(self) -> Config:
        """
        Configuration getter/setter for the stream reader.

        Returns:
            Current Config instance
        """

    @property
    def s3_client(self) -> BaseClient:
        """
        S3 client property with lazy loading.
        Creates and caches the S3 client based on configuration.

        Returns:
            Configured S3 client instance
        """

    def get_matching_files(self, globs: List[str], prefix: Optional[str], logger) -> Iterable[RemoteFile]:
        """
        Finds S3 files matching the specified glob patterns.

        Args:
            globs: List of glob patterns to match files
            prefix: Optional path prefix to filter files
            logger: Logger instance for operation logging

        Returns:
            Iterable of RemoteFile objects matching the patterns
        """

    def open_file(self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger) -> IOBase:
        """
        Opens an S3 file for reading with the specified mode and encoding.

        Args:
            file: RemoteFile object representing the S3 file
            mode: File reading mode (text or binary)
            encoding: Optional text encoding for file reading
            logger: Logger instance for operation logging

        Returns:
            File-like object for reading the file content
        """

    def upload(self, file: RemoteFile, local_directory: str, logger) -> Tuple[FileRecordData, AirbyteRecordMessageFileReference]:
        """
        Downloads an S3 file to a local directory for processing.

        Args:
            file: RemoteFile object to download
            local_directory: Local directory path for file storage
            logger: Logger instance for operation logging

        Returns:
            Tuple of file record data and message reference
        """

    def file_size(self, file: RemoteFile) -> int:
        """
        Gets the size of an S3 file in bytes.

        Args:
            file: RemoteFile object

        Returns:
            File size in bytes
        """

    def is_modified_after_start_date(self, last_modified_date: Optional[datetime]) -> bool:
        """
        Checks whether the file was modified after the configured start date.

        Args:
            last_modified_date: File's last modification timestamp

        Returns:
            True if the file should be included based on its modification date
        """

    def _get_iam_s3_client(self, client_kv_args: dict) -> BaseClient:
        """
        Creates an S3 client with IAM role assumption.

        Args:
            client_kv_args: Client configuration arguments

        Returns:
            Configured S3 client with IAM authentication
        """

    def _construct_s3_uri(self, file: RemoteFile) -> str:
        """
        Constructs the S3 URI for the given file.

        Args:
            file: RemoteFile object

        Returns:
            S3 URI string (s3://bucket/key)
        """

    def _page(self, s3, globs, bucket, prefix, seen, logger) -> Iterable[RemoteFile]:
        """
        Paginates through S3 objects matching the criteria.

        Args:
            s3: S3 client instance
            globs: Glob patterns for file matching
            bucket: S3 bucket name
            prefix: Optional key prefix
            seen: Set of already-processed files
            logger: Logger instance

        Returns:
            Iterable of matching RemoteFile objects
        """

    def _handle_file(self, file):
        """
        Dispatches file processing for both regular and ZIP files.

        Args:
            file: File object to process
        """

    def _handle_zip_file(self, file):
        """
        Handles ZIP file extraction and processing.

        Args:
            file: ZIP file object to process
        """

    def _handle_regular_file(self, file):
        """
        Handles regular (non-ZIP) file processing.

        Args:
            file: Regular file object to process
        """

    @staticmethod
    def create_progress_handler(file_size: int, local_file_path: str, logger) -> Callable:
        """
        Creates a progress handler for file download operations.

        Args:
            file_size: Total file size in bytes
            local_file_path: Local path where the file is being saved
            logger: Logger instance for progress reporting

        Returns:
            Callable progress handler function
        """

    @staticmethod
    def _is_folder(file) -> bool:
        """
        Checks whether an S3 object represents a folder.

        Args:
            file: S3 object to check

        Returns:
            True if the object is a folder, False otherwise
        """
```

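The callable returned by `create_progress_handler` fits boto3's download `Callback` protocol, which invokes the callback with the number of bytes transferred in each chunk. Below is a minimal sketch of that pattern; `make_progress_handler` and the bucket/key names are illustrative stand-ins, not the connector's actual implementation. The lock matters because boto3's transfer manager may invoke the callback from multiple threads:

```python
import logging
import threading

import boto3


def make_progress_handler(file_size: int, local_file_path: str, logger: logging.Logger):
    """Illustrative: return a callback that accumulates per-chunk byte counts
    from boto3 and logs overall download progress."""
    seen_so_far = 0
    lock = threading.Lock()

    def handler(bytes_amount: int) -> None:
        nonlocal seen_so_far
        with lock:
            seen_so_far += bytes_amount
            percent = (seen_so_far / file_size) * 100 if file_size else 100.0
            logger.info("%s: %d/%d bytes (%.1f%%)", local_file_path, seen_so_far, file_size, percent)

    return handler


logger = logging.getLogger("s3-download")
s3 = boto3.client("s3")

# Fetch the object size up front so the handler can compute a percentage.
size = s3.head_object(Bucket="my-bucket", Key="data/file.csv")["ContentLength"]

# boto3 calls the handler once per transferred chunk.
s3.download_file(
    Bucket="my-bucket",
    Key="data/file.csv",
    Filename="/tmp/file.csv",
    Callback=make_progress_handler(size, "/tmp/file.csv", logger),
)
```
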
### Cursor Management

Manages incremental synchronization state and file tracking, with support for migrating legacy (V3) state.

```python { .api }
class Cursor(DefaultFileBasedCursor):
    """
    Manages incremental sync state and file tracking.
    Inherits from DefaultFileBasedCursor with S3-specific enhancements.
    """

    _DATE_FORMAT = "%Y-%m-%d"
    """Date format string for cursor timestamps"""

    _LEGACY_DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
    """Legacy datetime format for V3 compatibility"""

    _V4_MIGRATION_BUFFER = timedelta(hours=1)
    """Buffer time for the V3-to-V4 migration"""

    _V3_MIN_SYNC_DATE_FIELD = "v3_min_sync_date"
    """Field name for the V3 minimum sync date"""

    def __init__(self, stream_config: FileBasedStreamConfig, **_):
        """
        Initializes the cursor with the stream configuration.

        Args:
            stream_config: Configuration for the file-based stream
        """

    def set_initial_state(self, value: StreamState) -> None:
        """
        Sets the initial cursor state, handling legacy format conversion.

        Args:
            value: Stream state to set as initial state
        """

    def get_state(self) -> StreamState:
        """
        Gets the current cursor state for incremental synchronization.

        Returns:
            Current stream state for persistence
        """

    def _should_sync_file(self, file: RemoteFile, logger) -> bool:
        """
        Determines whether a file should be synchronized based on cursor state.

        Args:
            file: RemoteFile to evaluate
            logger: Logger instance for decision logging

        Returns:
            True if the file should be synced, False otherwise
        """

    @staticmethod
    def _is_legacy_state(value: StreamState) -> bool:
        """
        Checks whether the provided state is in the legacy V3 format.

        Args:
            value: Stream state to check

        Returns:
            True if the state is in the legacy format, False otherwise
        """

    @staticmethod
    def _convert_legacy_state(legacy_state: StreamState) -> MutableMapping[str, Any]:
        """
        Converts legacy V3 state to the V4 format.

        Args:
            legacy_state: V3-format stream state

        Returns:
            V4-format state dictionary
        """

    @staticmethod
    def _get_adjusted_date_timestamp(cursor_datetime: datetime, file_datetime: datetime) -> datetime:
        """
        Adjusts timestamps for proper cursor comparison.

        Args:
            cursor_datetime: Current cursor timestamp
            file_datetime: File modification timestamp

        Returns:
            Adjusted datetime for comparison
        """
```

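The `_V4_MIGRATION_BUFFER` exists so that files modified shortly before the V3-to-V4 cutover are not skipped after migration. A minimal sketch of that idea, reusing the documented one-hour buffer and legacy datetime format; `migrated_cursor_floor` is a hypothetical helper for illustration, not the connector's actual method:

```python
from datetime import datetime, timedelta

_V4_MIGRATION_BUFFER = timedelta(hours=1)
_LEGACY_DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def migrated_cursor_floor(legacy_cursor: str) -> datetime:
    """Hypothetical: parse a V3 cursor value and back it off by the
    migration buffer, so files written around the cutover are re-synced."""
    legacy_datetime = datetime.strptime(legacy_cursor, _LEGACY_DATE_TIME_FORMAT)
    return legacy_datetime - _V4_MIGRATION_BUFFER


# A file modified 30 minutes before the legacy cursor still syncs,
# because it falls inside the one-hour buffer.
floor = migrated_cursor_floor("2024-01-15T12:00:00Z")
file_modified = datetime(2024, 1, 15, 11, 30)
assert file_modified >= floor
```
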
### Legacy Stream Implementation

S3-specific stream implementation for backward compatibility with V3 configurations.

```python { .api }
class IncrementalFileStreamS3(IncrementalFileStream):
    """
    S3-specific incremental file stream implementation.
    Provides compatibility with legacy V3 stream operations.
    """

    @property
    def storagefile_class(self) -> type:
        """
        Returns the S3File class used for file handling.

        Returns:
            S3File class type
        """

    def filepath_iterator(self, stream_state=None) -> Iterator[FileInfo]:
        """
        Iterates over S3 file paths for stream processing.

        Args:
            stream_state: Optional stream state for incremental sync

        Yields:
            FileInfo objects for each file to process
        """

    def _filter_by_last_modified_date(self, file=None, stream_state=None):
        """
        Filters files based on last modification date and stream state.

        Args:
            file: File object to filter
            stream_state: Current stream state for comparison
        """

    @staticmethod
    def is_not_folder(file) -> bool:
        """
        Checks whether an S3 object is not a folder.

        Args:
            file: S3 object to check

        Returns:
            True if the object is not a folder, False otherwise
        """
```

### Helper Functions

Utility functions for S3 client configuration and file handling.

```python { .api }
def _get_s3_compatible_client_args(config: Config) -> dict:
    """
    Returns configuration for creating an S3-compatible client.

    Args:
        config: Configuration object with S3 settings

    Returns:
        Dictionary with client configuration parameters
    """
```

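For S3-compatible backends (such as MinIO), client arguments typically include a custom endpoint and, often, path-style addressing. A sketch of the kind of dictionary such a helper might return; the exact keys the connector builds are not shown here, so treat these as illustrative boto3 parameters:

```python
import boto3
from botocore.client import Config as BotocoreConfig


def s3_compatible_client_args(endpoint_url: str, access_key: str, secret_key: str) -> dict:
    """Illustrative: keyword arguments for boto3.client() against an
    S3-compatible endpoint such as MinIO."""
    return {
        "endpoint_url": endpoint_url,
        "aws_access_key_id": access_key,
        "aws_secret_access_key": secret_key,
        # Many S3-compatible stores require path-style addressing
        # instead of virtual-hosted-style bucket URLs.
        "config": BotocoreConfig(s3={"addressing_style": "path"}),
    }


client = boto3.client("s3", **s3_compatible_client_args(
    "http://localhost:9000", "minio-access-key", "minio-secret-key"
))
```
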
## Usage Examples

### Basic Stream Reader Usage

```python
import logging

from airbyte_cdk.sources.file_based.file_based_stream_reader import FileReadMode
from source_s3.v4 import SourceS3StreamReader, Config

logger = logging.getLogger("airbyte")

# Create configuration (other required fields, such as the streams list,
# are omitted here for brevity)
config = Config(
    bucket="my-bucket",
    region_name="us-east-1",
)

# Create a stream reader and attach the configuration via the config setter
reader = SourceS3StreamReader()
reader.config = config

# Find matching files
files = reader.get_matching_files(
    globs=["*.csv", "*.json"],
    prefix="data/",
    logger=logger,
)

# Process files
for file in files:
    with reader.open_file(file, mode=FileReadMode.READ, encoding="utf-8", logger=logger) as f:
        content = f.read()
        # Process file content
```

### Cursor State Management

```python
import logging

from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from source_s3.v4 import Cursor

logger = logging.getLogger("airbyte")

# Create the stream configuration
stream_config = FileBasedStreamConfig(...)

# Initialize the cursor
cursor = Cursor(stream_config)

# Set the initial state (handles legacy-format conversion);
# `previous_state` is the state persisted from an earlier sync
cursor.set_initial_state(previous_state)

# Check whether a file should be synced (internal API, shown for illustration)
should_sync = cursor._should_sync_file(file, logger)

# Get the current state for persistence
current_state = cursor.get_state()
```

### File Upload and Processing

```python
from source_s3.v4 import SourceS3StreamReader

# Download the file to a local directory (the method is named `upload`,
# but it transfers the S3 object to local storage for processing).
# `config`, `logger`, and `remote_file` are as in the earlier examples;
# `remote_file` is a RemoteFile obtained from get_matching_files().
reader = SourceS3StreamReader()
reader.config = config
file_data, file_ref = reader.upload(
    file=remote_file,
    local_directory="/tmp/airbyte_local",
    logger=logger,
)

# Check the file size against the limit
size = reader.file_size(remote_file)
if size > SourceS3StreamReader.FILE_SIZE_LIMIT:
    # Handle the oversized file
    pass
```

### Legacy Stream Operations

```python
from source_s3.stream import IncrementalFileStreamS3

# Create the legacy stream
stream = IncrementalFileStreamS3(...)

# Iterate over files, skipping folder objects;
# `state` is the persisted stream state from a previous sync
for file_info in stream.filepath_iterator(stream_state=state):
    if IncrementalFileStreamS3.is_not_folder(file_info.file):
        # Process the file
        pass
```