Apache Flink batch processing connectors for Avro, JDBC, Hadoop, HBase, and HCatalog data sources
npx @tessl/cli install tessl/maven-org-apache-flink--flink-batch-connectors@1.1.0

Apache Flink Batch Connectors provides connectivity between Apache Flink batch programs and external data sources and sinks. The collection covers Avro files, JDBC databases, Hadoop MapReduce jobs, HBase tables, and HCatalog-managed Hive tables.
pom.xml:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-batch-connectors</artifactId>
<version>1.1.5</version>
</dependency>

The batch connectors are organized into separate modules. Import the specific connector classes you need:
// Avro connectors
import org.apache.flink.api.java.io.AvroInputFormat;
import org.apache.flink.api.java.io.AvroOutputFormat;
// JDBC connectors
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
// Hadoop compatibility
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
// HBase connector
import org.apache.flink.addons.hbase.TableInputFormat;
// HCatalog connector
import org.apache.flink.hcatalog.java.HCatInputFormat;

The following quick-start example reads rows from a JDBC source and prints them:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.types.Row;
// Create execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Read from JDBC database
JDBCInputFormat jdbcInput = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost:3306/mydb")
.setUsername("user")
.setPassword("password")
.setQuery("SELECT id, name, age FROM users")
.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)) // one TypeInformation per selected column: id, name, age
.finish();
DataSet<Row> users = env.createInput(jdbcInput);
// Print the results (print() triggers execution of the batch job)
users.print();

The Apache Flink Batch Connectors architecture consists of five specialized modules:
Read and write Apache Avro files with full schema support and type safety. Supports both generic records and specific record types with automatic serialization.
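
For example, a minimal sketch that copies an Avro file, assuming a hypothetical Avro-generated record class User and placeholder HDFS paths (for GenericRecord output you would also call setSchema on the output format):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.AvroInputFormat;
import org.apache.flink.api.java.io.AvroOutputFormat;
import org.apache.flink.core.fs.Path;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read specific records of the (hypothetical) Avro-generated class User
AvroInputFormat<User> avroInput =
    new AvroInputFormat<User>(new Path("hdfs:///data/users.avro"), User.class);
DataSet<User> users = env.createInput(avroInput);

// Write the same records back out as Avro
AvroOutputFormat<User> avroOutput =
    new AvroOutputFormat<User>(new Path("hdfs:///data/users-copy.avro"), User.class);
users.output(avroOutput);

env.execute("Avro copy");

The module's main classes: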
public class AvroInputFormat<E> extends FileInputFormat<E>
implements ResultTypeQueryable<E>, CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
public AvroInputFormat(Path filePath, Class<E> type);
public void setReuseAvroValue(boolean reuseAvroValue);
public TypeInformation<E> getProducedType();
}
public class AvroOutputFormat<E> extends FileOutputFormat<E> {
public AvroOutputFormat(Path filePath, Class<E> type);
public void setSchema(Schema schema);
}

Connect to relational databases via JDBC with support for parallel reading, prepared statements, and batch writing. Includes parameter providers for efficient parallel data access.
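
The read side is shown in the quickstart above; a minimal write sketch with JDBCOutputFormat could look like the following (the target table users_copy is hypothetical, and users and env refer to the quickstart):

import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;

// One "?" placeholder per Row field, in the same order as the Row
JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
    .setDrivername("com.mysql.jdbc.Driver")
    .setDBUrl("jdbc:mysql://localhost:3306/mydb")
    .setUsername("user")
    .setPassword("password")
    .setQuery("INSERT INTO users_copy (id, name, age) VALUES (?, ?, ?)")
    .finish();

users.output(jdbcOutput);
env.execute("JDBC write");

Key classes of the JDBC module: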
public class JDBCInputFormat extends RichInputFormat<Row, InputSplit>
implements ResultTypeQueryable<Row> {
public static JDBCInputFormatBuilder buildJDBCInputFormat();
public RowTypeInfo getProducedType();
}
public class JDBCOutputFormat extends RichOutputFormat<Row> {
public static JDBCOutputFormatBuilder buildJDBCOutputFormat();
}

Integrate seamlessly with existing Hadoop MapReduce jobs by reusing Hadoop Mapper and Reducer implementations inside Flink batch programs.
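
For example, a sketch that wraps an ordinary Hadoop (mapred API) word-splitting Mapper; the Tokenizer class below is a stand-in for whatever Mapper you already have, and HadoopReduceFunction wraps a Reducer the same way:

import java.io.IOException;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class HadoopMapExample {

    // An unmodified Hadoop Mapper that splits lines into (word, 1) pairs
    public static class Tokenizer implements Mapper<LongWritable, Text, Text, LongWritable> {
        public void map(LongWritable key, Text value,
                        OutputCollector<Text, LongWritable> out, Reporter reporter) throws IOException {
            for (String word : value.toString().toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    out.collect(new Text(word), new LongWritable(1L));
                }
            }
        }

        public void configure(JobConf conf) { }

        public void close() { }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Hadoop-style (key, value) input represented as Flink Tuple2s
        DataSet<Tuple2<LongWritable, Text>> lines = env.fromElements(
            new Tuple2<LongWritable, Text>(new LongWritable(0L), new Text("to be or not to be")));

        // The wrapped Mapper runs as a regular Flink flatMap
        DataSet<Tuple2<Text, LongWritable>> words =
            lines.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()));

        words.print();
    }
}

The wrapper's signature: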
@Public
public class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
extends RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>>
implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);
public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf);
}

Read from Apache HBase tables with region-aware splitting for optimal distributed processing. Provides abstract base classes for custom HBase integration.
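
A minimal concrete subclass might look like this; the users table and the info:name column are hypothetical, and the HBase client configuration is assumed to be available on the classpath:

import org.apache.flink.addons.hbase.TableInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class UserTableInputFormat extends TableInputFormat<Tuple2<String, String>> {

    @Override
    protected Scan getScanner() {
        // Only fetch the single column that is mapped below
        return new Scan().addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
    }

    @Override
    protected String getTableName() {
        return "users";
    }

    @Override
    protected Tuple2<String, String> mapResultToTuple(Result r) {
        String rowKey = Bytes.toString(r.getRow());
        String name = Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
        return new Tuple2<String, String>(rowKey, name);
    }
}

Passing an instance to env.createInput(new UserTableInputFormat()) yields a DataSet<Tuple2<String, String>> whose input splits follow the table's regions. The abstract base class: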
public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<T, TableInputSplit> {
protected abstract Scan getScanner();
protected abstract String getTableName();
protected abstract T mapResultToTuple(Result r);
}

Access Hive tables through HCatalog with support for partition filtering, field selection, and automatic schema mapping to Flink tuples.
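
For example, a read sketch with placeholder database, table, field, and partition names (exception handling omitted; by default records arrive as HCatRecord, which comes from the HCatalog dependency on the classpath):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.hcatalog.java.HCatInputFormat;
import org.apache.hive.hcatalog.data.HCatRecord;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Hypothetical Hive database "weblogs" and table "clicks"
HCatInputFormat<HCatRecord> hcatInput = new HCatInputFormat<HCatRecord>("weblogs", "clicks");
hcatInput.getFields("user_id", "url");       // read only the fields we need
hcatInput.withFilter("dt=\"2016-01-01\"");   // push a partition filter down to HCatalog

DataSet<HCatRecord> clicks = env.createInput(hcatInput);
clicks.print();

The base class API: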
public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopInputSplit>
implements ResultTypeQueryable<T> {
public HCatInputFormatBase(String database, String table);
public HCatInputFormatBase<T> getFields(String... fields);
public HCatInputFormatBase<T> withFilter(String filter);
}

// Flink core types used across connectors
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;
import org.apache.flink.core.fs.Path;
// Hadoop integration types
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.JobConf;
// Avro types
import org.apache.avro.Schema;
// HBase types
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Result;