Data Sources and Sinks

Input and output operations for reading from and writing to various data sources. These operations provide the interface between Flink programs and external data systems, supporting files, in-memory collections, and standard output streams.

Capabilities

Data Sources

Data sources create DataSets from external data systems or in-memory collections.

CSV File Sources

Reads structured data from CSV files with configurable parsing options.

def read_csv(self, path, types, line_delimiter="\n", field_delimiter=','):
    """
    Create a DataSet that represents the tuples produced by reading the given CSV file.
    
    Automatically parses CSV fields according to specified types and handles
    configurable delimiters for different CSV formats.
    
    Parameters:
        path (str): The path of the CSV file (local file system or HDFS)
        types (list): List specifying the types for CSV fields (e.g., [str, int, float])
        line_delimiter (str): Line delimiter, default "\n"
        field_delimiter (str): Field delimiter, default ","
        
    Returns:
        DataSet: A DataSet where each element is a tuple representing one CSV row
    """

Text File Sources

Reads unstructured text data line by line.

def read_text(self, path):
    """
    Creates a DataSet that represents the strings produced by reading the given file line by line.
    
    The file will be read with the system's default character set. Each line becomes
    a separate element in the DataSet.
    
    Parameters:
        path (str): The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
        
    Returns:
        DataSet: A DataSet where each element is a string representing one line
    """

Collection Sources

Creates DataSets from in-memory Python collections.

def from_elements(self, *elements):
    """
    Creates a new data set that contains the given elements.
    
    The elements must all be of the same type, for example, all strings or all integers.
    The sequence of elements must not be empty. Useful for testing and small datasets.
    
    Parameters:
        *elements: The elements to make up the data set (must be same type)
        
    Returns:
        DataSet: A DataSet representing the given list of elements
    """

Sequence Sources

Generates sequences of numbers for testing and synthetic data.

def generate_sequence(self, frm, to):
    """
    Creates a new data set that contains the given sequence of numbers.
    
    Generates consecutive integers from start to end (inclusive).
    Useful for testing and creating synthetic datasets.
    
    Parameters:
        frm (int): The start number for the sequence
        to (int): The end number for the sequence (inclusive)
        
    Returns:
        DataSet: A DataSet representing the given sequence of numbers
    """

Data Sinks

Data sinks write DataSet contents to external systems or output streams.

Text File Sinks

Writes DataSet elements as text files.

def write_text(self, path, write_mode=WriteMode.NO_OVERWRITE):
    """
    Writes a DataSet as a text file to the specified location.
    
    Each element is converted to its string representation and written as a separate line.
    Supports both local file system and distributed file systems like HDFS.
    
    Parameters:
        path (str): The path pointing to the location where the text file is written
        write_mode (WriteMode): Behavior when output file exists (NO_OVERWRITE or OVERWRITE)
        
    Returns:
        DataSink: Sink operation that can be configured further
    """

CSV File Sinks

Writes structured data as CSV files.

def write_csv(self, path, line_delimiter="\n", field_delimiter=',', write_mode=WriteMode.NO_OVERWRITE):
    """
    Writes a Tuple DataSet as a CSV file to the specified location.
    
    Only Tuple DataSets can be written as CSV files. Each tuple becomes a CSV row
    with fields separated by the specified delimiter.
    
    Parameters:
        path (str): The path pointing to the location where the CSV file is written
        line_delimiter (str): Line delimiter, default "\n"
        field_delimiter (str): Field delimiter, default ","
        write_mode (WriteMode): Behavior when output file exists
        
    Returns:
        DataSink: Sink operation that can be configured further
    """

Standard Output Sinks

Writes DataSet contents to standard output for debugging and monitoring.

def output(self, to_error=False):
    """
    Writes a DataSet to the standard output stream (stdout).
    
    Each element is converted to string and printed. Useful for debugging
    and small result sets. Not recommended for large datasets in production.
    
    Parameters:
        to_error (bool): Whether to write to stderr instead of stdout
        
    Returns:
        DataSink: Sink operation that can be configured further
    """

Write Modes

Configuration for file output behavior when target files exist.

class WriteMode:
    NO_OVERWRITE = 0  # Fail if output file already exists
    OVERWRITE = 1     # Overwrite existing files
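
The sketch below shows both behaviors; the import path flink.plan.Constants is assumed from the legacy Flink Python API, and the output paths are illustrative.

from flink.plan.Constants import WriteMode  # assumed import location

data = env.from_elements("a", "b")

# Fails at execution time if the target file already exists
data.write_text("output/strict.txt", WriteMode.NO_OVERWRITE)

# Replaces any existing file at the target path
data.write_text("output/replace.txt", WriteMode.OVERWRITE)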

Data Sink Configuration

Sink Naming

Sets descriptive names for sink operations.

def name(self, name):
    """
    Sets a descriptive name for the sink operation, used for debugging and monitoring.
    
    Parameters:
        name (str): Descriptive name for the sink operation
        
    Returns:
        DataSink: Self for method chaining
    """

Sink Parallelism

Controls parallelism for sink operations.

def set_parallelism(self, parallelism):
    """
    Sets parallelism for this sink operation.
    
    Controls how many parallel writers are used for the output operation.
    Higher parallelism can improve write throughput for large datasets.
    
    Parameters:
        parallelism (int): Degree of parallelism for this sink
        
    Returns:
        DataSink: Self for method chaining
    """

Usage Examples

Reading Various Data Sources

from flink.plan.Environment import get_environment

env = get_environment()

# Read CSV file with mixed types
sales_data = env.read_csv("sales.csv", [str, str, int, float])
# Expected format: customer_id, product_name, quantity, price

# Read text file for unstructured data
log_lines = env.read_text("application.log")

# Create test data from collection
test_data = env.from_elements(
    ("Alice", 25, "Engineer"),
    ("Bob", 30, "Manager"),
    ("Charlie", 35, "Analyst")
)

# Generate sequence for testing
numbers = env.generate_sequence(1, 1000)

Writing to Different Output Formats

# WriteMode import path assumed from the legacy Flink Python API
from flink.plan.Constants import WriteMode

# Process some data: keep rows with a positive quantity
processed_data = sales_data.filter(lambda x: x[2] > 0)

# Write as text file
processed_data.write_text("output/results.txt", WriteMode.OVERWRITE)

# Write as CSV with custom delimiters
processed_data.write_csv(
    "output/results.csv",
    line_delimiter="\n",
    field_delimiter="|",
    write_mode=WriteMode.OVERWRITE
)

# Print to console for debugging
processed_data.output()

# Print errors to stderr
error_data = sales_data.filter(lambda x: x[2] < 0)
error_data.output(to_error=True)

Complex File Processing Pipeline

from flink.functions.FlatMapFunction import FlatMapFunction
from flink.functions.GroupReduceFunction import GroupReduceFunction

# Read multiple text files
input_files = [
    "logs/2023-01-01.log",
    "logs/2023-01-02.log", 
    "logs/2023-01-03.log"
]

# Process each file and union results
all_logs = None
for file_path in input_files:
    file_data = env.read_text(file_path)
    if all_logs is None:
        all_logs = file_data
    else:
        all_logs = all_logs.union(file_data)

# Extract error messages
class ErrorExtractor(FlatMapFunction):
    def flat_map(self, log_line, collector):
        if "ERROR" in log_line:
            parts = log_line.split(" ", 3)
            if len(parts) >= 4:
                timestamp = parts[0] + " " + parts[1]
                error_msg = parts[3]
                collector.collect((timestamp, error_msg))

errors = all_logs.flat_map(ErrorExtractor())

# Count errors by hour
class HourlyErrorCounter(GroupReduceFunction):
    def reduce(self, iterator, collector):
        hour_counts = {}
        for timestamp, error_msg in iterator:
            hour = timestamp[:13]  # Extract date and hour
            hour_counts[hour] = hour_counts.get(hour, 0) + 1
        
        for hour, count in hour_counts.items():
            collector.collect((hour, count))

hourly_errors = errors.group_by(0).reduce_group(HourlyErrorCounter())

# Write results to multiple outputs
hourly_errors.write_csv("output/hourly_error_counts.csv")
errors.write_text("output/all_errors.txt")

# Sinks only produce output once the plan is executed
env.execute()

Configurable Data Processing

# Read configuration from CSV
config_data = env.read_csv("config/processing_config.csv", [str, str])

# Read main dataset
main_data = env.read_csv("data/input.csv", [str, int, float, str])

# Process and write with configuration
result = main_data.filter(lambda x: x[1] > 0)

# Configure output with descriptive names and parallelism
text_sink = result.write_text("output/processed_data.txt", WriteMode.OVERWRITE) \
    .name("Processed Data Output") \
    .set_parallelism(4)

csv_sink = result.write_csv("output/processed_data.csv", write_mode=WriteMode.OVERWRITE) \
    .name("CSV Export") \
    .set_parallelism(2)

# Also output to console for monitoring
monitoring_output = result.output().name("Console Monitor")

Handling Different File Formats

# Read data with different delimiters
pipe_delimited = env.read_csv("data/pipe_separated.txt", [str, int, str], field_delimiter='|')
tab_delimited = env.read_csv("data/tab_separated.tsv", [str, int, str], field_delimiter='\t')
semicolon_delimited = env.read_csv("data/semicolon.csv", [str, int, str], field_delimiter=';')

# Union all data sources
combined = pipe_delimited.union(tab_delimited).union(semicolon_delimited)

# Write with consistent format
combined.write_csv(
    "output/normalized.csv",
    field_delimiter=',',
    write_mode=WriteMode.OVERWRITE
)

Error Handling and Validation

from flink.functions.FilterFunction import FilterFunction

class DataValidator(FilterFunction):
    def filter(self, record):
        # Validate record has required fields and valid data
        if len(record) < 3:
            return False
        if not isinstance(record[1], int) or record[1] < 0:
            return False
        return True

# Read and validate data
raw_data = env.read_csv("input.csv", [str, int, float])
valid_data = raw_data.filter(DataValidator())
invalid_data = raw_data.filter(lambda x: not DataValidator().filter(x))

# Write valid and invalid data to separate outputs
valid_data.write_csv("output/valid_records.csv")
invalid_data.write_text("output/invalid_records.txt")

# Print summary statistics (each reduce yields a single-element DataSet)
valid_count = valid_data.map(lambda x: 1).reduce(lambda a, b: a + b)
invalid_count = invalid_data.map(lambda x: 1).reduce(lambda a, b: a + b)

valid_count.output()
invalid_count.output()
env.from_elements("Data validation complete").output()

Performance Considerations

# For large files, consider parallelism settings
large_dataset = env.read_csv("very_large_file.csv", [str, int, float, str])

# Set appropriate parallelism for processing
processed = large_dataset.map(lambda x: x).set_parallelism(8)

# Use appropriate parallelism for output
processed.write_text("output/large_output.txt") \
    .set_parallelism(4) \
    .name("Large File Output")

# For small files, limit parallelism to avoid overhead
small_dataset = env.read_csv("small_file.csv", [str, int])
small_result = small_dataset.map(lambda x: x).set_parallelism(1)

small_result.write_csv("output/small_output.csv") \
    .set_parallelism(1) \
    .name("Small File Output")