CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-sql-avro

Apache Flink SQL Avro format library that provides bundled and shaded Apache Avro dependencies for SQL usage in Flink applications.

Pending
Overview
Eval results
Files

filesystem.mddocs/

File System Operations

Capabilities

Avro Input Format

File input format for reading Avro files in batch processing scenarios.

/**
 * File input format for reading Avro files in batch processing scenarios.
 *
 * <p>One constructor targets generated SpecificRecord classes, the other
 * targets GenericRecord instances described by an explicit Avro schema.
 *
 * @param <T> Type of records to read
 */
public class AvroInputFormat<T> extends FileInputFormat<T> {
    
    /**
     * Creates an AvroInputFormat that deserializes into a SpecificRecord class.
     *
     * @param filePath Path to the Avro file(s) to read
     * @param recordClazz SpecificRecord class to deserialize to
     */
    public AvroInputFormat(Path filePath, Class<T> recordClazz);
    
    /**
     * Creates an AvroInputFormat that deserializes into GenericRecord instances.
     *
     * <p>NOTE(review): confirm this overload against the Flink version in use;
     * upstream Flink appears to expose only the (Path, Class) constructor, with
     * GenericRecord.class passed for generic reads.
     *
     * @param filePath Path to the Avro file(s) to read
     * @param schema Avro schema for GenericRecord deserialization
     */
    public AvroInputFormat(Path filePath, Schema schema);
    
    /**
     * Reads the next record from the input.
     *
     * @param reuse Object to reuse for the next record (may be null)
     * @return Next record, or null if end of input
     * @throws IOException If reading fails
     */
    public T nextRecord(T reuse) throws IOException;
    
    /**
     * Checks whether the input has been exhausted.
     *
     * @return true if no more records are available
     * @throws IOException If the check fails
     */
    public boolean reachedEnd() throws IOException;
}

Avro Output Format

File output format for writing Avro files in batch processing scenarios.

/**
 * File output format for writing Avro files in batch processing scenarios.
 *
 * <p>One constructor targets generated SpecificRecord classes, the other
 * targets GenericRecord instances described by an explicit Avro schema.
 *
 * @param <T> Type of records to write
 */
public class AvroOutputFormat<T> extends FileOutputFormat<T> {
    
    /**
     * Creates an AvroOutputFormat that serializes a SpecificRecord class.
     *
     * @param outputFilePath Path where the Avro file is written
     * @param recordClazz SpecificRecord class to serialize from
     */
    public AvroOutputFormat(Path outputFilePath, Class<T> recordClazz);
    
    /**
     * Creates an AvroOutputFormat that serializes GenericRecord instances.
     *
     * <p>NOTE(review): confirm this overload against the Flink version in use;
     * upstream Flink appears to take the schema through a separate setSchema
     * call rather than a constructor argument.
     *
     * @param outputFilePath Path where the Avro file is written
     * @param schema Avro schema for GenericRecord serialization
     */
    public AvroOutputFormat(Path outputFilePath, Schema schema);
    
    /**
     * Writes a single record to the output.
     *
     * @param record Record to write
     * @throws IOException If writing fails
     */
    public void writeRecord(T record) throws IOException;
    
    /**
     * Sets the compression codec for the output file.
     *
     * <p>NOTE(review): upstream Flink appears to declare this method with an
     * AvroOutputFormat.Codec enum parameter rather than a String, and "gzip"
     * is presumably not among the supported codecs - verify before relying on
     * the names listed here.
     *
     * @param codecName Name of compression codec (snappy, gzip, etc.)
     */
    public void setCodec(String codecName);
}

Bulk Writer Support

High-performance bulk writing for streaming scenarios.

/**
 * Bulk writer for Avro files; a thin wrapper around an Avro DataFileWriter
 * that adapts it to the BulkWriter interface used for streaming file output.
 *
 * @param <T> Type of records to write
 */
public class AvroBulkWriter<T> implements BulkWriter<T> {
    
    /**
     * Creates a new AvroBulkWriter wrapping the given Avro DataFileWriter.
     *
     * @param dataFileWriter The underlying Avro DataFileWriter that records are written to
     */
    public AvroBulkWriter(DataFileWriter<T> dataFileWriter);
    
    /**
     * Adds an element to the writer.
     *
     * @param element Element to write
     * @throws IOException If writing fails
     */
    public void addElement(T element) throws IOException;
    
    /**
     * Flushes pending writes.
     *
     * @throws IOException If the flush fails
     */
    public void flush() throws IOException;
    
    /**
     * Finishes writing and closes the writer.
     *
     * @throws IOException If finishing fails
     */
    public void finish() throws IOException;
}

/**
 * Factory for creating Avro bulk writers.
 *
 * <p>The factory is configured with an AvroBuilder, which supplies the
 * DataFileWriter for each output stream handed to {@code create}.
 *
 * @param <T> Type of records to write
 */
public class AvroWriterFactory<T> implements BulkWriter.Factory<T> {
    
    /**
     * Creates an AvroWriterFactory backed by the given AvroBuilder.
     *
     * @param avroBuilder Builder for creating DataFileWriter instances
     */
    public AvroWriterFactory(AvroBuilder<T> avroBuilder);
    
    /**
     * Creates a bulk writer for the given output stream.
     *
     * @param out Output stream to write to
     * @return Bulk writer instance
     * @throws IOException If creation fails
     */
    public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}

/**
 * Builder interface for creating Avro DataFileWriter instances.
 *
 * <p>This is a functional interface, so it can be implemented with a lambda,
 * and it extends Serializable, so implementations (and anything they capture)
 * must be serializable.
 *
 * @param <T> Type of records the writer will handle
 */
@FunctionalInterface
public interface AvroBuilder<T> extends Serializable {
    
    /**
     * Creates and configures an Avro writer for the given output stream.
     *
     * @param outputStream Output stream to write to
     * @return Configured DataFileWriter
     * @throws IOException If creation fails
     */
    DataFileWriter<T> createWriter(OutputStream outputStream) throws IOException;
}

Usage Examples

Reading Avro Files - Batch Processing

// Batch entry point shared by both reads below
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// --- SpecificRecord: read straight into the generated User class ---
Path userFiles = new Path("/path/to/user/files/*.avro");
AvroInputFormat<User> inputFormat = new AvroInputFormat<>(userFiles, User.class);
DataSet<User> users = env.createInput(inputFormat);

// Keep users older than 18, then project each one to its username
DataSet<User> adults = users.filter(u -> u.getAge() > 18);
DataSet<String> usernames = adults.map(u -> u.getUsername().toString());

// --- GenericRecord: read using a schema parsed from an .avsc file ---
File schemaFile = new File("user-schema.avsc");
Schema schema = new Schema.Parser().parse(schemaFile);
Path genericFiles = new Path("/path/to/generic/files/*.avro");
AvroInputFormat<GenericRecord> genericInputFormat =
    new AvroInputFormat<>(genericFiles, schema);
DataSet<GenericRecord> records = env.createInput(genericInputFormat);

// Extract the user_id field from every generic record
DataSet<Long> userIds = records.map(r -> (Long) r.get("user_id"));

Writing Avro Files - Batch Processing

// Writing SpecificRecord files
DataSet<User> users = // ... your user dataset

AvroOutputFormat<User> outputFormat = 
    new AvroOutputFormat<>(new Path("/output/path/users.avro"), User.class);

// Set compression codec
outputFormat.setCodec("snappy");

users.output(outputFormat);

// Writing GenericRecord files with custom schema
Schema schema = SchemaBuilder.record("ProcessedUser")
    .fields()
    .name("id").type().longType().noDefault()
    .name("processed_name").type().stringType().noDefault()
    .name("score").type().doubleType().noDefault()
    .endRecord();

DataSet<GenericRecord> processedUsers = users
    .map(new MapFunction<User, GenericRecord>() {
        @Override
        public GenericRecord map(User user) throws Exception {
            GenericRecord record = new GenericData.Record(schema);
            record.put("id", user.getId());
            record.put("processed_name", user.getUsername().toString().toUpperCase());
            record.put("score", calculateScore(user));
            return record;
        }
    });

AvroOutputFormat<GenericRecord> genericOutputFormat = 
    new AvroOutputFormat<>(new Path("/output/path/processed.avro"), schema);

processedUsers.output(genericOutputFormat);

env.execute("Process Avro Files");

Bulk Writing - Streaming to Files

// Streaming entry point
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// AvroBuilder is a functional interface, so the writer setup is expressed as a lambda.
// It builds a snappy-compressed DataFileWriter for the generated User class.
AvroBuilder<User> avroBuilder = out -> {
    DataFileWriter<User> writer = new DataFileWriter<>(new SpecificDatumWriter<>(User.class));
    writer.setCodec(CodecFactory.snappyCodec());
    // The schema comes from the SpecificRecord class itself
    writer.create(User.getClassSchema(), out);
    return writer;
};

// Factory that hands out one bulk writer per part file
AvroWriterFactory<User> writerFactory = new AvroWriterFactory<>(avroBuilder);

// Source of User records
DataStream<User> userStream = // ... your user stream

// File sink that rolls part files on every checkpoint
StreamingFileSink<User> userSink = StreamingFileSink
    .forBulkFormat(new Path("/streaming/output/path"), writerFactory)
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();
userStream.addSink(userSink);

env.execute("Stream to Avro Files");

Custom Avro Builder with Compression

// Custom builder for GenericRecord with custom configuration
AvroBuilder<GenericRecord> customBuilder = new AvroBuilder<GenericRecord>() {
    private final Schema schema;
    
    public CustomAvroBuilder(Schema schema) {
        this.schema = schema;
    }
    
    @Override
    public DataFileWriter<GenericRecord> createWriter(OutputStream out) throws IOException {
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
        
        // Configure compression
        dataFileWriter.setCodec(CodecFactory.deflateCodec(6)); // Compression level 6
        
        // Set custom metadata
        dataFileWriter.setMeta("created_by", "flink-sql-avro");
        dataFileWriter.setMeta("created_at", System.currentTimeMillis());
        
        dataFileWriter.create(schema, out);
        return dataFileWriter;
    }
};

// Use custom builder
AvroWriterFactory<GenericRecord> customWriterFactory = 
    new AvroWriterFactory<>(customBuilder);

File System Table Integration

// Register a partitioned filesystem table stored as snappy-compressed Avro
tableEnv.executeSql("""
    CREATE TABLE user_files (
        user_id BIGINT,
        username STRING,
        email STRING,
        registration_date DATE,
        last_login TIMESTAMP(3)
    ) PARTITIONED BY (registration_date) 
    WITH (
        'connector' = 'filesystem',
        'path' = '/data/warehouse/users',
        'format' = 'avro',
        'avro.codec' = 'snappy',
        'sink.partition-commit.policy.kind' = 'success-file'
    )
    """);

// Write rows from user_stream into the partitioned Avro files
tableEnv.executeSql("""
    INSERT INTO user_files
    SELECT 
        user_id,
        username,
        email,
        CAST(created_at AS DATE) as registration_date,
        last_login
    FROM user_stream
    """);

// Read back: registrations per day over the last 30 days
String dailyRegistrationsSQL = """
    SELECT 
        COUNT(*) as daily_registrations,
        registration_date
    FROM user_files
    WHERE registration_date >= CURRENT_DATE - INTERVAL '30' DAY
    GROUP BY registration_date
    ORDER BY registration_date
    """;
Table result = tableEnv.sqlQuery(dailyRegistrationsSQL);

Performance Optimization

// Optimized settings for large file processing
AvroInputFormat<GenericRecord> optimizedInput = 
    new AvroInputFormat<>(new Path("/large/files/*.avro"), schema);

// Configure parallel reading
env.getConfig().setParallelism(16);

// For output, use appropriate compression
AvroOutputFormat<GenericRecord> optimizedOutput = 
    new AvroOutputFormat<>(new Path("/output/compressed.avro"), schema);

// Balance between compression ratio and speed
// NOTE(review): verify the codec names below against the codecs the Flink/Avro
// version in use actually supports before enabling an alternative.
optimizedOutput.setCodec("snappy"); // Fast compression
// optimizedOutput.setCodec("gzip");  // Better compression ratio
// optimizedOutput.setCodec("zstd");  // Good balance

// For streaming, configure checkpointing for fault tolerance
env.enableCheckpointing(60000); // Checkpoint every minute

// Bulk writing: bulk formats roll part files on checkpoints, so the sink uses
// OnCheckpointRollingPolicy; with the interval above a new part file starts
// every minute. Time- or size-based DefaultRollingPolicy settings are not
// accepted by forBulkFormat.
// The factory must produce GenericRecord writers to match the sink's element
// type, hence customWriterFactory rather than the User-typed writerFactory.
StreamingFileSink<GenericRecord> sink = StreamingFileSink
    .forBulkFormat(new Path("/streaming/output"), customWriterFactory)
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd/HH")) // One bucket per hour
    .build();

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-avro

docs

configuration.md

filesystem.md

index.md

registry.md

rowdata.md

schemas.md

utilities.md

tile.json