Input and output operations for reading from and writing to various data sources. These operations provide the interface between Flink programs and external data systems, supporting files, in-memory collections, and standard output streams.
Data sources create DataSets from external data systems or in-memory collections.
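All of the source methods below are called on an execution environment; a minimal setup, mirroring the usage examples later in this section:
from flink.plan.Environment import get_environment
env = get_environment()  # entry point for defining sources, transformations, and sinks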
Reads structured data from CSV files with configurable parsing options.
def read_csv(self, path, types, line_delimiter="\n", field_delimiter=','):
"""
Create a DataSet that represents the tuples produced by reading the given CSV file.
Automatically parses CSV fields according to specified types and handles
configurable delimiters for different CSV formats.
Parameters:
path (str): The path of the CSV file (local file system or HDFS)
types (list): List specifying the types for CSV fields (e.g., [str, int, float])
line_delimiter (str): Line delimiter, default "\n"
field_delimiter (str): Field delimiter, default ","
Returns:
DataSet: A DataSet where each element is a tuple representing one CSV row
"""Reads unstructured text data line by line.
Reads unstructured text data line by line.
def read_text(self, path):
"""
Creates a DataSet that represents the strings produced by reading the given file line by line.
The file will be read with the system's default character set. Each line becomes
a separate element in the DataSet.
Parameters:
path (str): The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
Returns:
DataSet: A DataSet where each element is a string representing one line
"""Creates DataSets from in-memory Python collections.
Creates DataSets from in-memory Python collections.
def from_elements(self, *elements):
"""
Creates a new data set that contains the given elements.
The elements must all be of the same type, for example, all strings or all integers.
The sequence of elements must not be empty. Useful for testing and small datasets.
Parameters:
*elements: The elements to make up the data set (must be same type)
Returns:
DataSet: A DataSet representing the given list of elements
"""Generates sequences of numbers for testing and synthetic data.
Generates sequences of numbers for testing and synthetic data.
def generate_sequence(self, frm, to):
"""
Creates a new data set that contains the given sequence of numbers.
Generates consecutive integers from start to end (inclusive).
Useful for testing and creating synthetic datasets.
Parameters:
frm (int): The start number for the sequence
to (int): The end number for the sequence (inclusive)
Returns:
DataSet: A DataSet representing the given sequence of numbers
"""Data sinks write DataSet contents to external systems or output streams.
Data sinks write DataSet contents to external systems or output streams.
Writes DataSet elements as text files.
def write_text(self, path, write_mode=WriteMode.NO_OVERWRITE):
"""
Writes a DataSet as a text file to the specified location.
Each element is converted to its string representation and written as a separate line.
Supports both local file system and distributed file systems like HDFS.
Parameters:
path (str): The path pointing to the location where the text file is written
write_mode (WriteMode): Behavior when output file exists (NO_OVERWRITE or OVERWRITE)
Returns:
DataSink: Sink operation that can be configured further
"""Writes structured data as CSV files.
Writes structured data as CSV files.
def write_csv(self, path, line_delimiter="\n", field_delimiter=',', write_mode=WriteMode.NO_OVERWRITE):
"""
Writes a Tuple DataSet as a CSV file to the specified location.
Only Tuple DataSets can be written as CSV files. Each tuple becomes a CSV row
with fields separated by the specified delimiter.
Parameters:
path (str): The path pointing to the location where the CSV file is written
line_delimiter (str): Line delimiter, default "\n"
field_delimiter (str): Field delimiter, default ","
write_mode (WriteMode): Behavior when output file exists
Returns:
DataSink: Sink operation that can be configured further
"""Writes DataSet contents to standard output for debugging and monitoring.
Writes DataSet contents to standard output for debugging and monitoring.
def output(self, to_error=False):
"""
Writes a DataSet to the standard output stream (stdout).
Each element is converted to string and printed. Useful for debugging
and small result sets. Not recommended for large datasets in production.
Parameters:
to_error (bool): Whether to write to stderr instead of stdout
Returns:
DataSink: Sink operation that can be configured further
"""Configuration for file output behavior when target files exist.
Configuration for file output behavior when target files exist.
class WriteMode:
NO_OVERWRITE = 0 # Fail if output file already exists
OVERWRITE = 1 # Overwrite existing files
Sets descriptive names for sink operations.
def name(self, name):
"""
Sets a descriptive name for the sink operation, used for debugging and monitoring.
Parameters:
name (str): Descriptive name for the sink operation
Returns:
DataSink: Self for method chaining
"""Controls parallelism for sink operations.
def set_parallelism(self, parallelism):
"""
Sets parallelism for this sink operation.
Controls how many parallel writers are used for the output operation.
Higher parallelism can improve write throughput for large datasets.
Parameters:
parallelism (int): Degree of parallelism for this sink
Returns:
DataSink: Self for method chaining
"""from flink.plan.Environment import get_environment

from flink.plan.Environment import get_environment
from flink.plan.Constants import WriteMode  # WriteMode constants (module path assumed)
env = get_environment()
# Read CSV file with mixed types
sales_data = env.read_csv("sales.csv", [str, str, int, float])
# Expected format: customer_id, product_name, quantity, price
# Read text file for unstructured data
log_lines = env.read_text("application.log")
# Create test data from collection
test_data = env.from_elements(
("Alice", 25, "Engineer"),
("Bob", 30, "Manager"),
("Charlie", 35, "Analyst")
)
# Generate sequence for testing
numbers = env.generate_sequence(1, 1000)
# Process some data
processed_data = sales_data.filter(lambda x: x[2] > 0) # Filter positive quantities
# Write as text file
processed_data.write_text("output/results.txt", WriteMode.OVERWRITE)
# Write as CSV with custom delimiters
processed_data.write_csv(
"output/results.csv",
line_delimiter="\n",
field_delimiter="|",
write_mode=WriteMode.OVERWRITE
)
# Print to console for debugging
processed_data.output()
# Print errors to stderr
error_data = sales_data.filter(lambda x: x[2] < 0)
error_data.output(to_error=True)

from flink.functions.FlatMapFunction import FlatMapFunction
from flink.functions.GroupReduceFunction import GroupReduceFunction
# Read multiple text files
input_files = [
"logs/2023-01-01.log",
"logs/2023-01-02.log",
"logs/2023-01-03.log"
]
# Process each file and union results
all_logs = None
for file_path in input_files:
file_data = env.read_text(file_path)
if all_logs is None:
all_logs = file_data
else:
all_logs = all_logs.union(file_data)
# Extract error messages
class ErrorExtractor(FlatMapFunction):
def flat_map(self, log_line, collector):
if "ERROR" in log_line:
parts = log_line.split(" ", 3)
if len(parts) >= 4:
timestamp = parts[0] + " " + parts[1]
error_msg = parts[3]
collector.collect((timestamp, error_msg))
errors = all_logs.flat_map(ErrorExtractor())
# Count errors by hour
class HourlyErrorCounter(GroupReduceFunction):
def reduce(self, iterator, collector):
hour_counts = {}
for timestamp, error_msg in iterator:
hour = timestamp[:13] # Extract date and hour
hour_counts[hour] = hour_counts.get(hour, 0) + 1
for hour, count in hour_counts.items():
collector.collect((hour, count))
hourly_errors = errors.group_by(0).reduce_group(HourlyErrorCounter())
# Write results to multiple outputs
hourly_errors.write_csv("output/hourly_error_counts.csv")
errors.write_text("output/all_errors.txt")

# Read configuration from CSV
config_data = env.read_csv("config/processing_config.csv", [str, str])
# Read main dataset
main_data = env.read_csv("data/input.csv", [str, int, float, str])
# Process the main dataset (config_data is available for lookups if needed)
result = main_data.filter(lambda x: x[1] > 0)
# Configure output with descriptive names and parallelism
text_sink = result.write_text("output/processed_data.txt", WriteMode.OVERWRITE) \
.name("Processed Data Output") \
.set_parallelism(4)
csv_sink = result.write_csv("output/processed_data.csv", write_mode=WriteMode.OVERWRITE) \
.name("CSV Export") \
.set_parallelism(2)
# Also output to console for monitoring
monitoring_output = result.output().name("Console Monitor")

# Read data with different delimiters
pipe_delimited = env.read_csv("data/pipe_separated.txt", [str, int, str], field_delimiter='|')
tab_delimited = env.read_csv("data/tab_separated.tsv", [str, int, str], field_delimiter='\t')
semicolon_delimited = env.read_csv("data/semicolon.csv", [str, int, str], field_delimiter=';')
# Union all data sources
combined = pipe_delimited.union(tab_delimited).union(semicolon_delimited)
# Write with consistent format
combined.write_csv(
"output/normalized.csv",
field_delimiter=',',
write_mode=WriteMode.OVERWRITE
)

from flink.functions.FilterFunction import FilterFunction
class DataValidator(FilterFunction):
def filter(self, record):
# Validate record has required fields and valid data
if len(record) < 3:
return False
if not isinstance(record[1], int) or record[1] < 0:
return False
return True
# Read and validate data
raw_data = env.read_csv("input.csv", [str, int, float])
valid_data = raw_data.filter(DataValidator())
invalid_data = raw_data.filter(lambda x: not DataValidator().filter(x))
# Write valid and invalid data to separate outputs
valid_data.write_csv("output/valid_records.csv")
invalid_data.write_text("output/invalid_records.txt")
# Print summary statistics
valid_count = valid_data.map(lambda x: 1).reduce(lambda a, b: a + b)
invalid_count = invalid_data.map(lambda x: 1).reduce(lambda a, b: a + b)
valid_count.output()
invalid_count.output()
env.from_elements("Data validation complete").output()# For large files, consider parallelism settings
large_dataset = env.read_csv("very_large_file.csv", [str, int, float, str])
# Set appropriate parallelism for processing
processed = large_dataset.map(lambda x: x).set_parallelism(8)
# Use appropriate parallelism for output
processed.write_text("output/large_output.txt") \
.set_parallelism(4) \
.name("Large File Output")
# For small files, limit parallelism to avoid overhead
small_dataset = env.read_csv("small_file.csv", [str, int])
small_result = small_dataset.map(lambda x: x).set_parallelism(1)
small_result.write_csv("output/small_output.csv") \
.set_parallelism(1) \
.name("Small File Output")