# Data Sources and Sinks

Input and output operations for reading from and writing to various data sources. These operations provide the interface between Flink programs and external data systems, supporting files, collections, and streaming outputs.

## Capabilities

### Data Sources

Data sources create DataSets from external data systems or in-memory collections.

#### CSV File Sources

Reads structured data from CSV files with configurable parsing options.

```python { .api }
def read_csv(self, path, types, line_delimiter="\n", field_delimiter=','):
    """
    Creates a DataSet that represents the tuples produced by reading the given CSV file.

    Automatically parses CSV fields according to the specified types and handles
    configurable delimiters for different CSV formats.

    Parameters:
        path (str): The path of the CSV file (local file system or HDFS)
        types (list): List specifying the types for the CSV fields (e.g., [str, int, float])
        line_delimiter (str): Line delimiter, default "\n"
        field_delimiter (str): Field delimiter, default ","

    Returns:
        DataSet: A DataSet where each element is a tuple representing one CSV row
    """
```
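
For example, a product file with an integer id, a name, and a price could be read as follows. This is a minimal sketch; the file name and column layout are illustrative:

```python
from flink.plan.Environment import get_environment

env = get_environment()

# Each row becomes a tuple whose fields follow the declared types
products = env.read_csv("products.csv", [int, str, float])

# Fields are accessed positionally, e.g. the price is field 2
expensive = products.filter(lambda row: row[2] > 100.0)
```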

#### Text File Sources

Reads unstructured text data line by line.

```python { .api }
def read_text(self, path):
    """
    Creates a DataSet that represents the strings produced by reading the given file line by line.

    The file will be read with the system's default character set. Each line becomes
    a separate element in the DataSet.

    Parameters:
        path (str): The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")

    Returns:
        DataSet: A DataSet where each element is a string representing one line
    """
```
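
As a minimal sketch, a log file can be read by URI and stripped of blank lines before further processing; the path here is illustrative:

```python
from flink.plan.Environment import get_environment

env = get_environment()

# Paths may be plain local paths or URIs such as file:// and hdfs://
lines = env.read_text("file:///var/log/app/application.log")

# Drop blank lines before further processing
non_empty = lines.filter(lambda line: len(line.strip()) > 0)
```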

#### Collection Sources

Creates DataSets from in-memory Python collections.

```python { .api }
def from_elements(self, *elements):
    """
    Creates a new data set that contains the given elements.

    The elements must all be of the same type, for example, all strings or all integers.
    The sequence of elements must not be empty. Useful for testing and small datasets.

    Parameters:
        *elements: The elements to make up the data set (must all be of the same type)

    Returns:
        DataSet: A DataSet representing the given list of elements
    """
```
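
A small sketch of building a test DataSet from literal values; the names and scores are illustrative, and all elements share the same (str, int) shape:

```python
from flink.plan.Environment import get_environment

env = get_environment()

# All elements must be of the same type; here, (str, int) pairs
scores = env.from_elements(("alice", 10), ("bob", 7), ("carol", 12))

high_scores = scores.filter(lambda pair: pair[1] >= 10)
```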

#### Sequence Sources

Generates sequences of numbers for testing and synthetic data.

```python { .api }
def generate_sequence(self, frm, to):
    """
    Creates a new data set that contains the given sequence of numbers.

    Generates consecutive integers from start to end (inclusive).
    Useful for testing and creating synthetic datasets.

    Parameters:
        frm (int): The start number for the sequence
        to (int): The end number for the sequence (inclusive)

    Returns:
        DataSet: A DataSet representing the given sequence of numbers
    """
```
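
A sketch of generating synthetic input and deriving records from it; the range and the derived tuple are illustrative:

```python
from flink.plan.Environment import get_environment

env = get_environment()

# Produces the integers 1, 2, ..., 100 (both endpoints included)
ids = env.generate_sequence(1, 100)

# Derive a synthetic (id, id squared) tuple for each number
squares = ids.map(lambda n: (n, n * n))
```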

### Data Sinks

Data sinks write DataSet contents to external systems or output streams.

#### Text File Sinks

Writes DataSet elements as text files.

```python { .api }
def write_text(self, path, write_mode=WriteMode.NO_OVERWRITE):
    """
    Writes a DataSet as a text file to the specified location.

    Each element is converted to its string representation and written as a separate line.
    Supports both the local file system and distributed file systems like HDFS.

    Parameters:
        path (str): The path pointing to the location where the text file is written
        write_mode (WriteMode): Behavior when the output file already exists (NO_OVERWRITE or OVERWRITE)

    Returns:
        DataSink: Sink operation that can be configured further
    """
```
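
Unlike write_csv below, write_text accepts any DataSet, since each element is simply converted to its string representation. A minimal sketch with an illustrative path:

```python
from flink.plan.Environment import get_environment

env = get_environment()

greetings = env.from_elements("hello", "world")

# NO_OVERWRITE (the default) fails if the file exists; OVERWRITE replaces it
greetings.write_text("output/greetings.txt", write_mode=WriteMode.OVERWRITE)
```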

#### CSV File Sinks

Writes structured data as CSV files.

```python { .api }
def write_csv(self, path, line_delimiter="\n", field_delimiter=',', write_mode=WriteMode.NO_OVERWRITE):
    """
    Writes a Tuple DataSet as a CSV file to the specified location.

    Only Tuple DataSets can be written as CSV files. Each tuple becomes a CSV row
    with fields separated by the specified delimiter.

    Parameters:
        path (str): The path pointing to the location where the CSV file is written
        line_delimiter (str): Line delimiter, default "\n"
        field_delimiter (str): Field delimiter, default ","
        write_mode (WriteMode): Behavior when output file exists

    Returns:
        DataSink: Sink operation that can be configured further
    """
```
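
Because only Tuple DataSets can be written as CSV, other DataSets have to be mapped into tuples first. A minimal sketch with illustrative values and paths:

```python
from flink.plan.Environment import get_environment

env = get_environment()

# A plain string DataSet cannot be written with write_csv directly
lines = env.from_elements("alpha", "beta", "gamma")

# Wrap each element in a tuple so the DataSet becomes CSV-writable
rows = lines.map(lambda line: (line, len(line)))

rows.write_csv("output/line_lengths.csv", write_mode=WriteMode.OVERWRITE)
```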

#### Standard Output Sinks

Writes DataSet contents to standard output for debugging and monitoring.

```python { .api }
def output(self, to_error=False):
    """
    Writes a DataSet to the standard output stream (stdout).

    Each element is converted to its string representation and printed. Useful for
    debugging and small result sets. Not recommended for large datasets in production.

    Parameters:
        to_error (bool): Whether to write to stderr instead of stdout

    Returns:
        DataSink: Sink operation that can be configured further
    """
```

### Write Modes

Configuration for file output behavior when target files exist.

```python { .api }
class WriteMode:
    NO_OVERWRITE = 0  # Fail if output file already exists
    OVERWRITE = 1     # Overwrite existing files
```
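
The usage examples below reference WriteMode directly. In this API the constants typically live in the plan constants module, but the exact import path may vary by version; the import below is an assumption:

```python
# Import path is an assumption for this API version
from flink.plan.Constants import WriteMode

# OVERWRITE replaces existing output on re-runs instead of failing
mode = WriteMode.OVERWRITE
```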

### Data Sink Configuration

#### Sink Naming

Sets descriptive names for sink operations.

```python { .api }
def name(self, name):
    """
    Sets a name for the sink operation, used for debugging and monitoring.

    Parameters:
        name (str): Descriptive name for the sink operation

    Returns:
        DataSink: Self for method chaining
    """
```

#### Sink Parallelism

192

193

Controls parallelism for sink operations.

194

195

```python { .api }

196

def set_parallelism(self, parallelism):

197

"""

198

Sets parallelism for this sink operation.

199

200

Controls how many parallel writers are used for the output operation.

201

Higher parallelism can improve write throughput for large datasets.

202

203

Parameters:

204

parallelism (int): Degree of parallelism for this sink

205

206

Returns:

207

DataSink: Self for method chaining

208

"""

209

```

210

211

## Usage Examples

### Reading Various Data Sources

```python
from flink.plan.Environment import get_environment

env = get_environment()

# Read CSV file with mixed types
sales_data = env.read_csv("sales.csv", [str, str, int, float])
# Expected format: customer_id, product_name, quantity, price

# Read text file for unstructured data
log_lines = env.read_text("application.log")

# Create test data from collection
test_data = env.from_elements(
    ("Alice", 25, "Engineer"),
    ("Bob", 30, "Manager"),
    ("Charlie", 35, "Analyst")
)

# Generate sequence for testing
numbers = env.generate_sequence(1, 1000)
```

### Writing to Different Output Formats

```python
# Process some data
processed_data = sales_data.filter(lambda x: x[2] > 0)  # Keep rows with positive quantities

# Write as text file
processed_data.write_text("output/results.txt", WriteMode.OVERWRITE)

# Write as CSV with custom delimiters
processed_data.write_csv(
    "output/results.csv",
    line_delimiter="\n",
    field_delimiter="|",
    write_mode=WriteMode.OVERWRITE
)

# Print to console for debugging
processed_data.output()

# Print errors to stderr
error_data = sales_data.filter(lambda x: x[2] < 0)
error_data.output(to_error=True)
```
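
Note that sources, transformations, and sinks only build an execution plan; nothing is read or written until the plan is submitted. A minimal sketch, assuming the `env` object from the first example and that this API version exposes an `execute()` method on the environment (the `local` flag is an assumption):

```python
# Trigger the job once all sources, transformations, and sinks are defined;
# local=True runs the plan in a local mini-cluster for testing
env.execute(local=True)
```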

### Complex File Processing Pipeline

```python
from flink.functions.FlatMapFunction import FlatMapFunction
from flink.functions.GroupReduceFunction import GroupReduceFunction

# Read multiple text files
input_files = [
    "logs/2023-01-01.log",
    "logs/2023-01-02.log",
    "logs/2023-01-03.log"
]

# Process each file and union results
all_logs = None
for file_path in input_files:
    file_data = env.read_text(file_path)
    if all_logs is None:
        all_logs = file_data
    else:
        all_logs = all_logs.union(file_data)

# Extract error messages
class ErrorExtractor(FlatMapFunction):
    def flat_map(self, log_line, collector):
        if "ERROR" in log_line:
            parts = log_line.split(" ", 3)
            if len(parts) >= 4:
                timestamp = parts[0] + " " + parts[1]
                error_msg = parts[3]
                collector.collect((timestamp, error_msg))

errors = all_logs.flat_map(ErrorExtractor())

# Count errors by hour
class HourlyErrorCounter(GroupReduceFunction):
    def reduce(self, iterator, collector):
        hour_counts = {}
        for hour, error_msg in iterator:
            hour_counts[hour] = hour_counts.get(hour, 0) + 1

        for hour, count in hour_counts.items():
            collector.collect((hour, count))

# Key each error by the hour portion of its timestamp ("YYYY-MM-DD HH")
# so that grouping collects all errors for the same hour before counting
hourly_errors = errors \
    .map(lambda pair: (pair[0][:13], pair[1])) \
    .group_by(0) \
    .reduce_group(HourlyErrorCounter())

# Write results to multiple outputs
hourly_errors.write_csv("output/hourly_error_counts.csv")
errors.write_text("output/all_errors.txt")
```

### Configurable Data Processing

```python
# Read configuration from CSV
config_data = env.read_csv("config/processing_config.csv", [str, str])

# Read main dataset
main_data = env.read_csv("data/input.csv", [str, int, float, str])

# Process and write with configuration
result = main_data.filter(lambda x: x[1] > 0)

# Configure output with descriptive names and parallelism
text_sink = result.write_text("output/processed_data.txt", WriteMode.OVERWRITE) \
    .name("Processed Data Output") \
    .set_parallelism(4)

csv_sink = result.write_csv("output/processed_data.csv", write_mode=WriteMode.OVERWRITE) \
    .name("CSV Export") \
    .set_parallelism(2)

# Also output to console for monitoring
monitoring_output = result.output().name("Console Monitor")
```

### Handling Different File Formats

```python
# Read data with different delimiters
pipe_delimited = env.read_csv("data/pipe_separated.txt", [str, int, str], field_delimiter='|')
tab_delimited = env.read_csv("data/tab_separated.tsv", [str, int, str], field_delimiter='\t')
semicolon_delimited = env.read_csv("data/semicolon.csv", [str, int, str], field_delimiter=';')

# Union all data sources
combined = pipe_delimited.union(tab_delimited).union(semicolon_delimited)

# Write with consistent format
combined.write_csv(
    "output/normalized.csv",
    field_delimiter=',',
    write_mode=WriteMode.OVERWRITE
)
```

### Error Handling and Validation

```python
from flink.functions.FilterFunction import FilterFunction

class DataValidator(FilterFunction):
    def filter(self, record):
        # Validate record has required fields and valid data
        if len(record) < 3:
            return False
        if not isinstance(record[1], int) or record[1] < 0:
            return False
        return True

# Read and validate data
raw_data = env.read_csv("input.csv", [str, int, float])
valid_data = raw_data.filter(DataValidator())
invalid_data = raw_data.filter(lambda x: not DataValidator().filter(x))

# Write valid and invalid data to separate outputs
valid_data.write_csv("output/valid_records.csv")
invalid_data.write_text("output/invalid_records.txt")

# Print summary statistics
valid_count = valid_data.map(lambda x: 1).reduce(lambda a, b: a + b)
invalid_count = invalid_data.map(lambda x: 1).reduce(lambda a, b: a + b)
valid_count.output()
invalid_count.output()

env.from_elements("Data validation complete").output()
```

### Performance Considerations

```python
# For large files, consider parallelism settings
large_dataset = env.read_csv("very_large_file.csv", [str, int, float, str])

# Set appropriate parallelism for processing
processed = large_dataset.map(lambda x: x).set_parallelism(8)

# Use appropriate parallelism for output
processed.write_text("output/large_output.txt") \
    .set_parallelism(4) \
    .name("Large File Output")

# For small files, limit parallelism to avoid overhead
small_dataset = env.read_csv("small_file.csv", [str, int])
small_result = small_dataset.map(lambda x: x).set_parallelism(1)

small_result.write_csv("output/small_output.csv") \
    .set_parallelism(1) \
    .name("Small File Output")
```