# Stream Operations

Stream reading, cursor management, and incremental synchronization functionality. Provides S3-specific file discovery, reading operations, and state management for efficient data synchronization.

## Capabilities

### Stream Reader

S3-specific stream reader that handles file discovery, reading, and upload operations with comprehensive S3 integration.

```python { .api }
class SourceS3StreamReader(AbstractFileBasedStreamReader):
    """
    Handles S3 file reading and streaming operations.
    Inherits from AbstractFileBasedStreamReader for file-based connector compatibility.
    """

    FILE_SIZE_LIMIT = 1_500_000_000
    """Maximum file size limit (1.5GB)"""

    @property
    def config(self) -> Config:
        """
        Configuration getter/setter for the stream reader.

        Returns:
            Current Config instance
        """

    @property
    def s3_client(self) -> BaseClient:
        """
        S3 client property with lazy loading.
        Creates and caches S3 client based on configuration.

        Returns:
            Configured S3 client instance
        """

    def get_matching_files(self, globs: List[str], prefix: Optional[str], logger) -> Iterable[RemoteFile]:
        """
        Finds S3 files matching the specified glob patterns.

        Args:
            globs: List of glob patterns to match files
            prefix: Optional path prefix to filter files
            logger: Logger instance for operation logging

        Returns:
            Iterable of RemoteFile objects matching the patterns
        """

    def open_file(self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger) -> IOBase:
        """
        Opens an S3 file for reading with the specified mode and encoding.

        Args:
            file: RemoteFile object representing the S3 file
            mode: File reading mode (text, binary, etc.)
            encoding: Optional text encoding for file reading
            logger: Logger instance for operation logging

        Returns:
            File-like object for reading the file content
        """

    def upload(self, file: RemoteFile, local_directory: str, logger) -> Tuple[FileRecordData, AirbyteRecordMessageFileReference]:
        """
        Downloads an S3 file to a local directory for processing.

        Args:
            file: RemoteFile object to download
            local_directory: Local directory path for file storage
            logger: Logger instance for operation logging

        Returns:
            Tuple of file record data and message reference
        """

    def file_size(self, file: RemoteFile) -> int:
        """
        Gets the size of an S3 file in bytes.

        Args:
            file: RemoteFile object

        Returns:
            File size in bytes
        """

    def is_modified_after_start_date(self, last_modified_date: Optional[datetime]) -> bool:
        """
        Checks if a file was modified after the configured start date.

        Args:
            last_modified_date: File's last modification timestamp

        Returns:
            True if the file should be included based on its modification date
        """

    def _get_iam_s3_client(self, client_kv_args: dict) -> BaseClient:
        """
        Creates an S3 client with IAM role assumption.

        Args:
            client_kv_args: Client configuration arguments

        Returns:
            Configured S3 client with IAM authentication
        """

    def _construct_s3_uri(self, file: RemoteFile) -> str:
        """
        Constructs the S3 URI for the given file.

        Args:
            file: RemoteFile object

        Returns:
            S3 URI string (s3://bucket/key)
        """

    def _page(self, s3, globs, bucket, prefix, seen, logger) -> Iterable[RemoteFile]:
        """
        Paginates through S3 objects matching the criteria.

        Args:
            s3: S3 client instance
            globs: Glob patterns for file matching
            bucket: S3 bucket name
            prefix: Optional key prefix
            seen: Set of already processed files
            logger: Logger instance

        Returns:
            Iterable of matching RemoteFile objects
        """

    def _handle_file(self, file):
        """
        Handles file processing for both regular and ZIP files.

        Args:
            file: File object to process
        """

    def _handle_zip_file(self, file):
        """
        Handles ZIP file extraction and processing.

        Args:
            file: ZIP file object to process
        """

    def _handle_regular_file(self, file):
        """
        Handles regular file processing.

        Args:
            file: Regular file object to process
        """

    @staticmethod
    def create_progress_handler(file_size: int, local_file_path: str, logger) -> Callable:
        """
        Creates a progress handler for file download operations.

        Args:
            file_size: Total file size in bytes
            local_file_path: Local path where the file is being saved
            logger: Logger instance for progress reporting

        Returns:
            Callable progress handler function
        """

    @staticmethod
    def _is_folder(file) -> bool:
        """
        Checks if an S3 object represents a folder.

        Args:
            file: S3 object to check

        Returns:
            True if the object is a folder, False otherwise
        """
```
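
To make the discovery flow concrete, the sketch below shows the general pattern behind `get_matching_files` and `_page`: a paginated `list_objects_v2` listing filtered by glob patterns, with a `seen` set guarding against duplicates. It is a minimal illustration built on plain `boto3` and stdlib `fnmatch` (whose `*` also crosses `/`, unlike full `**` glob semantics); `SimpleRemoteFile` is a stand-in for the connector's `RemoteFile`, not the real class.

```python
# Minimal sketch of paginated, glob-filtered S3 discovery. Assumes boto3
# credentials are configured; SimpleRemoteFile is illustrative only.
from dataclasses import dataclass
from datetime import datetime
from fnmatch import fnmatch
from typing import Iterable, List, Optional

import boto3


@dataclass
class SimpleRemoteFile:
    uri: str
    last_modified: datetime


def iter_matching_files(
    bucket: str, globs: List[str], prefix: Optional[str] = None
) -> Iterable[SimpleRemoteFile]:
    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")
    seen = set()  # guard against duplicates across overlapping prefixes
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix or ""):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if key.endswith("/") or key in seen:
                continue  # skip folder placeholders and already-seen keys
            if any(fnmatch(key, g) for g in globs):
                seen.add(key)
                yield SimpleRemoteFile(uri=key, last_modified=obj["LastModified"])
```

The real reader layers start-date filtering (`is_modified_after_start_date`) and ZIP handling (`_handle_zip_file`) on top of this loop.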

### Cursor Management

Manages incremental synchronization state and file tracking with support for legacy state migration.

```python { .api }
class Cursor(DefaultFileBasedCursor):
    """
    Manages incremental sync state and file tracking.
    Inherits from DefaultFileBasedCursor with S3-specific enhancements.
    """

    _DATE_FORMAT = "%Y-%m-%d"
    """Date format string for cursor timestamps"""

    _LEGACY_DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
    """Legacy datetime format for V3 compatibility"""

    _V4_MIGRATION_BUFFER = timedelta(hours=1)
    """Buffer time for V3 to V4 migration"""

    _V3_MIN_SYNC_DATE_FIELD = "v3_min_sync_date"
    """Field name for V3 minimum sync date"""

    def __init__(self, stream_config: FileBasedStreamConfig, **_):
        """
        Initialize the cursor with the stream configuration.

        Args:
            stream_config: Configuration for the file-based stream
        """

    def set_initial_state(self, value: StreamState) -> None:
        """
        Sets the initial cursor state, handling legacy format conversion.

        Args:
            value: Stream state to set as initial state
        """

    def get_state(self) -> StreamState:
        """
        Gets the current cursor state for incremental synchronization.

        Returns:
            Current stream state for persistence
        """

    def _should_sync_file(self, file: RemoteFile, logger) -> bool:
        """
        Determines if a file should be synchronized based on cursor state.

        Args:
            file: RemoteFile to evaluate
            logger: Logger instance for decision logging

        Returns:
            True if the file should be synced, False otherwise
        """

    @staticmethod
    def _is_legacy_state(value: StreamState) -> bool:
        """
        Checks if the provided state is in legacy V3 format.

        Args:
            value: Stream state to check

        Returns:
            True if the state is in legacy format, False otherwise
        """

    @staticmethod
    def _convert_legacy_state(legacy_state: StreamState) -> MutableMapping[str, Any]:
        """
        Converts legacy V3 state to V4 format.

        Args:
            legacy_state: V3 format stream state

        Returns:
            V4 format state dictionary
        """

    @staticmethod
    def _get_adjusted_date_timestamp(cursor_datetime: datetime, file_datetime: datetime) -> datetime:
        """
        Adjusts timestamps for proper cursor comparison.

        Args:
            cursor_datetime: Current cursor timestamp
            file_datetime: File modification timestamp

        Returns:
            Adjusted datetime for comparison
        """
```
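
The V3 and V4 state shapes are internal to the connector, but the documented constants are enough to sketch the timestamp handling involved in migration. The values below are hypothetical illustrations, assuming the migration buffer is subtracted so that files modified near the migration boundary are not skipped.

```python
# Hypothetical illustration of V3 -> V4 cursor timestamp handling; the
# cursor value and the subtraction-based buffer use are assumptions.
from datetime import datetime, timedelta

_LEGACY_DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # documented V3 format
_V4_MIGRATION_BUFFER = timedelta(hours=1)        # documented buffer

legacy_cursor = "2023-05-01T12:00:00Z"  # example V3-style cursor value

# Parse the legacy cursor with the V3 format string.
parsed = datetime.strptime(legacy_cursor, _LEGACY_DATE_TIME_FORMAT)

# Back the minimum sync date off by the buffer so files modified around
# the migration boundary still get picked up.
v3_min_sync_date = parsed - _V4_MIGRATION_BUFFER
print(v3_min_sync_date.isoformat())  # 2023-05-01T11:00:00
```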

### Legacy Stream Implementation

S3-specific stream implementation for backward compatibility with V3 configurations.

```python { .api }
class IncrementalFileStreamS3(IncrementalFileStream):
    """
    S3-specific incremental file stream implementation.
    Provides compatibility with legacy V3 stream operations.
    """

    @property
    def storagefile_class(self) -> type:
        """
        Returns the S3File class for file handling.

        Returns:
            S3File class type
        """

    def filepath_iterator(self, stream_state=None) -> Iterator[FileInfo]:
        """
        Iterates over S3 file paths for stream processing.

        Args:
            stream_state: Optional stream state for incremental sync

        Yields:
            FileInfo objects for each file to process
        """

    def _filter_by_last_modified_date(self, file=None, stream_state=None):
        """
        Filters files based on last modification date and stream state.

        Args:
            file: File object to filter
            stream_state: Current stream state for comparison
        """

    @staticmethod
    def is_not_folder(file) -> bool:
        """
        Checks if an S3 object is not a folder.

        Args:
            file: S3 object to check

        Returns:
            True if the object is not a folder, False otherwise
        """
```
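
S3 has no real directories: console-created "folders" are zero-byte placeholder objects whose keys end in `/`. A check like `is_not_folder` (or the reader's `_is_folder`) typically reduces to inspecting the key, as in this sketch of the convention rather than the connector's exact test:

```python
# Sketch of the common S3 folder-placeholder convention; the real
# is_not_folder/_is_folder checks may differ in detail.
def looks_like_folder(key: str, size: int) -> bool:
    """Console-created 'folders' are zero-byte objects with a trailing slash."""
    return key.endswith("/") and size == 0

assert looks_like_folder("data/2024/", 0)
assert not looks_like_folder("data/2024/orders.csv", 1024)
```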

### Helper Functions

Utility functions for S3 client configuration and file handling.

```python { .api }
def _get_s3_compatible_client_args(config: Config) -> dict:
    """
    Returns configuration for S3-compatible client creation.

    Args:
        config: Configuration object with S3 settings

    Returns:
        Dictionary with client configuration parameters
    """
```
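
Only the helper's signature is documented, but clients for S3-compatible services (MinIO, LocalStack, Ceph) conventionally need a custom endpoint and path-style addressing. The kwargs below are an assumed shape based on standard `boto3` usage, not the helper's verified output:

```python
# Plausible shape of S3-compatible client kwargs; the actual keys produced
# by _get_s3_compatible_client_args are an assumption based on common
# boto3 usage for MinIO-style endpoints.
import boto3
from botocore.client import Config as BotoConfig

client_args = {
    "endpoint_url": "http://localhost:9000",                 # custom S3-compatible endpoint
    "config": BotoConfig(s3={"addressing_style": "path"}),   # path-style URLs
}
s3 = boto3.client(
    "s3",
    aws_access_key_id="minioadmin",      # example MinIO credentials
    aws_secret_access_key="minioadmin",
    **client_args,
)
```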

## Usage Examples

### Basic Stream Reader Usage

```python
import logging

from airbyte_cdk.sources.file_based.file_based_stream_reader import FileReadMode
from source_s3.v4 import Config, SourceS3StreamReader

logger = logging.getLogger("airbyte")

# Create configuration (plus any stream/format settings your setup requires)
config = Config(
    bucket="my-bucket",
    region_name="us-east-1"
)

# Create the stream reader; config is assigned via the documented property setter
reader = SourceS3StreamReader()
reader.config = config

# Find matching files
files = reader.get_matching_files(
    globs=["*.csv", "*.json"],
    prefix="data/",
    logger=logger
)

# Process files
for file in files:
    with reader.open_file(file, mode=FileReadMode.READ, encoding="utf-8", logger=logger) as f:
        content = f.read()
        # Process file content
```

### Cursor State Management

```python
import logging

from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from source_s3.v4 import Cursor

logger = logging.getLogger("airbyte")

# Create stream configuration
stream_config = FileBasedStreamConfig(...)

# Initialize cursor
cursor = Cursor(stream_config)

# Set initial state (handles legacy V3 format conversion)
cursor.set_initial_state(previous_state)  # previous_state: persisted StreamState

# Check whether a file should be synced (internal helper)
should_sync = cursor._should_sync_file(file, logger)

# Get current state for persistence
current_state = cursor.get_state()
```

408

409

### File Upload and Processing

410

411

```python
from source_s3.v4 import SourceS3StreamReader

# "Upload" here means downloading the S3 file to the local filesystem.
# Continues from the basic usage example above (config, logger);
# remote_file is a RemoteFile from get_matching_files().
reader = SourceS3StreamReader()
reader.config = config
file_data, file_ref = reader.upload(
    file=remote_file,
    local_directory="/tmp/airbyte_local",
    logger=logger
)

# Check the file size against the reader's limit
size = reader.file_size(remote_file)
if size > SourceS3StreamReader.FILE_SIZE_LIMIT:
    # Handle large file
    pass
```
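
`create_progress_handler` produces a callback in the shape boto3's transfer `Callback` hook expects: boto3 invokes it with the byte count of each transferred chunk. A minimal sketch of such a handler, assuming a plain boto3 client rather than the reader's internals:

```python
# Minimal sketch of a download progress callback; boto3's Callback hook is
# real, but this handler is an illustration, not create_progress_handler itself.
import logging

import boto3

logger = logging.getLogger("airbyte")


def make_progress_handler(file_size: int, local_file_path: str):
    transferred = 0

    def on_bytes(chunk: int) -> None:
        # boto3 calls this with the bytes moved since the previous call.
        nonlocal transferred
        transferred += chunk
        percent = 100 * transferred / file_size if file_size else 100
        logger.info("%s: %.1f%% (%d/%d bytes)", local_file_path, percent, transferred, file_size)

    return on_bytes


s3 = boto3.client("s3")
size = s3.head_object(Bucket="my-bucket", Key="data.csv")["ContentLength"]
handler = make_progress_handler(file_size=size, local_file_path="/tmp/data.csv")
s3.download_file("my-bucket", "data.csv", "/tmp/data.csv", Callback=handler)
```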

428

429

### Legacy Stream Operations

430

431

```python
from source_s3.stream import IncrementalFileStreamS3

# Create legacy (V3) stream
stream = IncrementalFileStreamS3(...)

# Iterate over files, skipping S3 folder placeholders
for file_info in stream.filepath_iterator(stream_state=state):  # state: previously persisted stream state
    if IncrementalFileStreamS3.is_not_folder(file_info.file):
        # Process file
        pass
```