# Data Sources and Sinks

Input and output operations for reading from and writing to various data sources. These operations provide the interface between Flink programs and external data systems, supporting files, collections, and streaming outputs.

## Capabilities
### Data Sources

Data sources create DataSets from external data systems or in-memory collections.

#### CSV File Sources

Reads structured data from CSV files with configurable parsing options.
```python { .api }
def read_csv(self, path, types, line_delimiter="\n", field_delimiter=','):
    """
    Create a DataSet that represents the tuples produced by reading the given CSV file.

    Automatically parses CSV fields according to the specified types and handles
    configurable delimiters for different CSV formats.

    Parameters:
        path (str): The path of the CSV file (local file system or HDFS)
        types (list): List specifying the types of the CSV fields (e.g., [str, int, float])
        line_delimiter (str): Line delimiter, default "\n"
        field_delimiter (str): Field delimiter, default ","

    Returns:
        DataSet: A DataSet where each element is a tuple representing one CSV row
    """
```
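Conceptually, the `types` list maps each CSV field through a Python type constructor. The following plain-Python sketch illustrates that per-row parsing; it is an illustration only, not Flink's actual parser (which also handles quoting and escaping):

```python
# Hypothetical sketch of how a `types` list maps onto one CSV row.
types = [str, str, int, float]
row = "C042,widget,3,19.99"
parsed = tuple(t(field) for t, field in zip(types, row.split(",")))
# parsed == ("C042", "widget", 3, 19.99)
```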
#### Text File Sources

Reads unstructured text data line by line.
```python { .api }
def read_text(self, path):
    """
    Creates a DataSet that represents the strings produced by reading the given file line by line.

    The file is read with the system's default character set. Each line becomes
    a separate element in the DataSet.

    Parameters:
        path (str): The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")

    Returns:
        DataSet: A DataSet where each element is a string representing one line
    """
```
#### Collection Sources

Creates DataSets from in-memory Python collections.
```python { .api }
def from_elements(self, *elements):
    """
    Creates a new DataSet that contains the given elements.

    The elements must all be of the same type, for example, all strings or all integers.
    The sequence of elements must not be empty. Useful for testing and small datasets.

    Parameters:
        *elements: The elements that make up the data set (must all be of the same type)

    Returns:
        DataSet: A DataSet representing the given elements
    """
```
#### Sequence Sources

Generates sequences of numbers for testing and synthetic data.
```python { .api }
def generate_sequence(self, frm, to):
    """
    Creates a new DataSet that contains the given sequence of numbers.

    Generates consecutive integers from `frm` to `to` (both inclusive).
    Useful for testing and creating synthetic datasets.

    Parameters:
        frm (int): The first number of the sequence
        to (int): The last number of the sequence (inclusive)

    Returns:
        DataSet: A DataSet representing the given sequence of numbers
    """
```
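Because both endpoints are inclusive, `generate_sequence(frm, to)` corresponds to Python's `range(frm, to + 1)`. A quick local check of that equivalence, with no Flink involved:

```python
# Local illustration of the inclusive endpoints of generate_sequence.
frm, to = 1, 5
seq = list(range(frm, to + 1))
# seq == [1, 2, 3, 4, 5]
```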
### Data Sinks

Data sinks write DataSet contents to external systems or output streams.
#### Text File Sinks

Writes DataSet elements as text files.
```python { .api }
def write_text(self, path, write_mode=WriteMode.NO_OVERWRITE):
    """
    Writes a DataSet as a text file to the specified location.

    Each element is converted to its string representation and written as a separate line.
    Supports both the local file system and distributed file systems such as HDFS.

    Parameters:
        path (str): The path pointing to the location where the text file is written
        write_mode (WriteMode): Behavior when the output file exists (NO_OVERWRITE or OVERWRITE)

    Returns:
        DataSink: The sink operation, which can be configured further
    """
```
#### CSV File Sinks

Writes structured data as CSV files.
```python { .api }
def write_csv(self, path, line_delimiter="\n", field_delimiter=',', write_mode=WriteMode.NO_OVERWRITE):
    """
    Writes a Tuple DataSet as a CSV file to the specified location.

    Only Tuple DataSets can be written as CSV files. Each tuple becomes a CSV row,
    with fields separated by the specified delimiter.

    Parameters:
        path (str): The path pointing to the location where the CSV file is written
        line_delimiter (str): Line delimiter, default "\n"
        field_delimiter (str): Field delimiter, default ","
        write_mode (WriteMode): Behavior when the output file exists

    Returns:
        DataSink: The sink operation, which can be configured further
    """
```
#### Standard Output Sinks

Writes DataSet contents to standard output for debugging and monitoring.
```python { .api }
def output(self, to_error=False):
    """
    Writes a DataSet to the standard output stream (stdout).

    Each element is converted to a string and printed. Useful for debugging
    and small result sets; not recommended for large datasets in production.

    Parameters:
        to_error (bool): If True, write to stderr instead of stdout

    Returns:
        DataSink: The sink operation, which can be configured further
    """
```
### Write Modes

Configuration for file output behavior when target files exist.
```python { .api }
class WriteMode:
    NO_OVERWRITE = 0  # Fail if the output file already exists
    OVERWRITE = 1     # Overwrite existing files
```
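The intended behavior of the two modes can be emulated locally. This is a conceptual sketch only: the `check_target` helper is hypothetical and not part of the Flink API, but it mirrors the precondition a sink enforces before writing:

```python
import os
import tempfile

class WriteMode:
    NO_OVERWRITE = 0  # Fail if the output file already exists
    OVERWRITE = 1     # Replace an existing file

def check_target(path, write_mode):
    # Hypothetical helper: refuse to write over an existing file
    # unless OVERWRITE was requested.
    if write_mode == WriteMode.NO_OVERWRITE and os.path.exists(path):
        raise IOError("output file already exists: " + path)

with tempfile.NamedTemporaryFile() as existing:
    check_target(existing.name, WriteMode.OVERWRITE)  # accepted
    try:
        check_target(existing.name, WriteMode.NO_OVERWRITE)
    except IOError:
        pass  # NO_OVERWRITE refuses the existing file
```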
### Data Sink Configuration

#### Sink Naming

Sets descriptive names for sink operations.
```python { .api }
def name(self, name):
    """
    Sets a name for the sink operation, used for debugging and monitoring.

    Parameters:
        name (str): A descriptive name for the sink operation

    Returns:
        DataSink: Self, for method chaining
    """
```
#### Sink Parallelism

Controls parallelism for sink operations.
```python { .api }
def set_parallelism(self, parallelism):
    """
    Sets the parallelism for this sink operation.

    Controls how many parallel writers are used for the output operation.
    Higher parallelism can improve write throughput for large datasets.

    Parameters:
        parallelism (int): The degree of parallelism for this sink

    Returns:
        DataSink: Self, for method chaining
    """
```
## Usage Examples

### Reading Various Data Sources
```python
from flink.plan.Environment import get_environment

env = get_environment()

# Read CSV file with mixed types
# Expected format: customer_id, product_name, quantity, price
sales_data = env.read_csv("sales.csv", [str, str, int, float])

# Read text file for unstructured data
log_lines = env.read_text("application.log")

# Create test data from a collection
test_data = env.from_elements(
    ("Alice", 25, "Engineer"),
    ("Bob", 30, "Manager"),
    ("Charlie", 35, "Analyst")
)

# Generate a sequence for testing
numbers = env.generate_sequence(1, 1000)
```
### Writing to Different Output Formats

```python
# In the legacy Python API, WriteMode is defined in flink.plan.Constants
from flink.plan.Constants import WriteMode

# Process some data
processed_data = sales_data.filter(lambda x: x[2] > 0)  # Keep positive quantities

# Write as a text file
processed_data.write_text("output/results.txt", WriteMode.OVERWRITE)

# Write as CSV with custom delimiters
processed_data.write_csv(
    "output/results.csv",
    line_delimiter="\n",
    field_delimiter="|",
    write_mode=WriteMode.OVERWRITE
)

# Print to console for debugging
processed_data.output()

# Print suspect records to stderr
error_data = sales_data.filter(lambda x: x[2] < 0)
error_data.output(to_error=True)
```
### Complex File Processing Pipeline

```python
from flink.functions.FlatMapFunction import FlatMapFunction
from flink.functions.GroupReduceFunction import GroupReduceFunction

# Read multiple text files
input_files = [
    "logs/2023-01-01.log",
    "logs/2023-01-02.log",
    "logs/2023-01-03.log"
]

# Read each file and union the results
all_logs = None
for file_path in input_files:
    file_data = env.read_text(file_path)
    if all_logs is None:
        all_logs = file_data
    else:
        all_logs = all_logs.union(file_data)

# Extract error messages as (timestamp, message) tuples
class ErrorExtractor(FlatMapFunction):
    def flat_map(self, log_line, collector):
        if "ERROR" in log_line:
            parts = log_line.split(" ", 3)
            if len(parts) >= 4:
                timestamp = parts[0] + " " + parts[1]
                error_msg = parts[3]
                collector.collect((timestamp, error_msg))

errors = all_logs.flat_map(ErrorExtractor())

# Count errors by hour. Key by the hour prefix (date plus hour) so each group
# holds exactly one hour's records; grouping by the full timestamp would only
# count records with identical timestamps.
class HourlyErrorCounter(GroupReduceFunction):
    def reduce(self, iterator, collector):
        hour = None
        count = 0
        for h, _error_msg in iterator:
            hour = h
            count += 1
        collector.collect((hour, count))

hourly_errors = errors \
    .map(lambda t: (t[0][:13], t[1])) \
    .group_by(0) \
    .reduce_group(HourlyErrorCounter())

# Write results to multiple outputs
hourly_errors.write_csv("output/hourly_error_counts.csv")
errors.write_text("output/all_errors.txt")
```
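The hourly key relies on fixed-width, ISO-style timestamps: slicing the first 13 characters yields the date plus the hour. That slicing can be checked locally without Flink:

```python
# Plain-Python check of the hour-prefix trick used in the pipeline.
timestamp = "2023-01-01 12:34:56"
hour_key = timestamp[:13]
# hour_key == "2023-01-01 12"
```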
### Configurable Data Processing

```python
# Read configuration as key/value pairs from CSV
config_data = env.read_csv("config/processing_config.csv", [str, str])

# Read the main dataset
main_data = env.read_csv("data/input.csv", [str, int, float, str])

# Process the data
result = main_data.filter(lambda x: x[1] > 0)

# Configure outputs with descriptive names and parallelism
text_sink = result.write_text("output/processed_data.txt", WriteMode.OVERWRITE) \
    .name("Processed Data Output") \
    .set_parallelism(4)

csv_sink = result.write_csv("output/processed_data.csv", write_mode=WriteMode.OVERWRITE) \
    .name("CSV Export") \
    .set_parallelism(2)

# Also output to the console for monitoring
monitoring_output = result.output().name("Console Monitor")
```
### Handling Different File Formats

```python
# Read data with different delimiters
pipe_delimited = env.read_csv("data/pipe_separated.txt", [str, int, str], field_delimiter='|')
tab_delimited = env.read_csv("data/tab_separated.tsv", [str, int, str], field_delimiter='\t')
semicolon_delimited = env.read_csv("data/semicolon.csv", [str, int, str], field_delimiter=';')

# Union all data sources
combined = pipe_delimited.union(tab_delimited).union(semicolon_delimited)

# Write with a consistent format
combined.write_csv(
    "output/normalized.csv",
    field_delimiter=',',
    write_mode=WriteMode.OVERWRITE
)
```
### Error Handling and Validation

```python
from flink.functions.FilterFunction import FilterFunction

class DataValidator(FilterFunction):
    def filter(self, record):
        # Validate that the record has the required fields and sane values
        if len(record) < 3:
            return False
        if not isinstance(record[1], int) or record[1] < 0:
            return False
        return True

# Read and validate data
raw_data = env.read_csv("input.csv", [str, int, float])
valid_data = raw_data.filter(DataValidator())
invalid_data = raw_data.filter(lambda x: not DataValidator().filter(x))

# Write valid and invalid data to separate outputs
valid_data.write_csv("output/valid_records.csv")
invalid_data.write_text("output/invalid_records.txt")

# Print summary counts to the console; without a sink, the counts
# would never be computed
valid_count = valid_data.map(lambda x: 1).reduce(lambda a, b: a + b)
invalid_count = invalid_data.map(lambda x: 1).reduce(lambda a, b: a + b)
valid_count.output()
invalid_count.output()

env.from_elements("Data validation complete").output()
```
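The validation rule inside `DataValidator` is plain Python, so it can be exercised locally before being wired into a `FilterFunction`. The standalone `is_valid` helper below is an illustration, not part of the Flink API:

```python
# The same predicate as DataValidator.filter, as a standalone function.
def is_valid(record):
    return len(record) >= 3 and isinstance(record[1], int) and record[1] >= 0

print(is_valid(("A", 3, 1.5)))   # True: three fields, non-negative int
print(is_valid(("B",)))          # False: too few fields
print(is_valid(("C", -1, 0.0)))  # False: negative quantity
```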
### Performance Considerations

```python
# For large files, tune the parallelism of downstream operators
large_dataset = env.read_csv("very_large_file.csv", [str, int, float, str])

# Set an appropriate parallelism for processing
processed = large_dataset.map(lambda x: x).set_parallelism(8)

# Use an appropriate parallelism for the output
processed.write_text("output/large_output.txt") \
    .set_parallelism(4) \
    .name("Large File Output")

# For small files, limit parallelism to avoid overhead
small_dataset = env.read_csv("small_file.csv", [str, int])
small_result = small_dataset.map(lambda x: x).set_parallelism(1)

small_result.write_csv("output/small_output.csv") \
    .set_parallelism(1) \
    .name("Small File Output")
```