Apache Flink batch processing connectors for Avro, JDBC, Hadoop, HBase, and HCatalog data sources
—
JDBC database connectivity for Flink batch processing, supporting both reading from and writing to relational databases with parallel processing capabilities.
Reads data from JDBC databases with support for parallel processing and parameterized queries.
/**
 * InputFormat for reading {@link Row} records from a JDBC database in Flink
 * batch jobs.
 *
 * <p>Instances are configured through the fluent builder returned by
 * {@link #buildJDBCInputFormat()}. When a {@code ParameterValuesProvider} is
 * configured, each input split executes the query with one row of the
 * supplied parameter matrix, enabling parallel reads.
 */
public class JDBCInputFormat extends RichInputFormat<Row, InputSplit>
        implements ResultTypeQueryable<Row> {

    /**
     * Default constructor. Prefer {@link #buildJDBCInputFormat()} for
     * creating fully configured instances.
     */
    public JDBCInputFormat();

    /**
     * Returns the type information for rows produced by this format.
     *
     * @return RowTypeInfo describing the result-set schema
     */
    public RowTypeInfo getProducedType();

    /**
     * Configures the input format with parameters.
     *
     * @param parameters Configuration parameters
     */
    public void configure(Configuration parameters);

    /**
     * Opens the input format for reading; per the RichInputFormat lifecycle
     * this is invoked once per parallel task, before any split is opened.
     *
     * @throws IOException if the format cannot be opened
     */
    public void openInputFormat() throws IOException;

    /**
     * Closes the input format; counterpart of {@link #openInputFormat()}.
     *
     * @throws IOException if closing fails
     */
    public void closeInputFormat() throws IOException;

    /**
     * Opens an individual input split for reading.
     *
     * @param inputSplit The input split to open
     * @throws IOException if the split cannot be opened
     */
    public void open(InputSplit inputSplit) throws IOException;

    /**
     * Closes the current input split.
     *
     * @throws IOException if closing fails
     */
    public void close() throws IOException;

    /**
     * Checks whether the end of input has been reached.
     *
     * @return true if no more records are available
     * @throws IOException if the check fails
     */
    public boolean reachedEnd() throws IOException;

    /**
     * Reads the next record from the input.
     *
     * @param row Row instance to reuse (may be null)
     * @return The next record
     * @throws IOException if reading fails
     */
    public Row nextRecord(Row row) throws IOException;

    /**
     * Gets statistics about the input data.
     *
     * @param cachedStatistics Previously cached statistics
     * @return Statistics about the input
     * @throws IOException if statistics cannot be obtained
     */
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException;

    /**
     * Creates input splits for parallel processing.
     *
     * @param minNumSplits Minimum number of splits to create
     * @return Array of input splits
     * @throws IOException if split creation fails
     */
    public InputSplit[] createInputSplits(int minNumSplits) throws IOException;

    /**
     * Gets the input split assigner that distributes splits among tasks.
     *
     * @param inputSplits The input splits to assign
     * @return Input split assigner
     */
    public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits);

    /**
     * Creates a builder for configuring a JDBCInputFormat.
     *
     * @return JDBCInputFormatBuilder instance
     */
    public static JDBCInputFormatBuilder buildJDBCInputFormat();
}

Builder pattern for configuring JDBC input operations with a fluent API.
/**
 * Fluent builder for {@link JDBCInputFormat} instances; obtain one via
 * {@code JDBCInputFormat.buildJDBCInputFormat()} and finish with
 * {@code finish()}.
 * NOTE(review): which setters are mandatory is not visible here — confirm
 * against the validation performed by finish().
 */
public static class JDBCInputFormatBuilder {
/**
 * Creates a new JDBCInputFormatBuilder.
 */
public JDBCInputFormatBuilder();
/**
 * Sets the database username used when opening the connection.
 * @param username Database username
 * @return This builder instance for chaining
 */
public JDBCInputFormatBuilder setUsername(String username);
/**
 * Sets the database password used when opening the connection.
 * @param password Database password
 * @return This builder instance for chaining
 */
public JDBCInputFormatBuilder setPassword(String password);
/**
 * Sets the JDBC driver class name to load.
 * @param drivername Fully qualified driver class name
 * @return This builder instance for chaining
 */
public JDBCInputFormatBuilder setDrivername(String drivername);
/**
 * Sets the database connection URL.
 * @param dbURL JDBC connection URL
 * @return This builder instance for chaining
 */
public JDBCInputFormatBuilder setDBUrl(String dbURL);
/**
 * Sets the SQL query to execute. May contain {@code ?} placeholders that are
 * bound from the configured ParameterValuesProvider.
 * @param query SQL SELECT query
 * @return This builder instance for chaining
 */
public JDBCInputFormatBuilder setQuery(String query);
/**
 * Sets the ResultSet type for scrolling behavior.
 * @param resultSetType ResultSet type constant (e.g., ResultSet.TYPE_FORWARD_ONLY)
 * @return This builder instance for chaining
 */
public JDBCInputFormatBuilder setResultSetType(int resultSetType);
/**
 * Sets the ResultSet concurrency for update behavior.
 * @param resultSetConcurrency ResultSet concurrency constant (e.g., ResultSet.CONCUR_READ_ONLY)
 * @return This builder instance for chaining
 */
public JDBCInputFormatBuilder setResultSetConcurrency(int resultSetConcurrency);
/**
 * Sets the provider that yields one parameter row per parallel query,
 * enabling parallel reads with different parameter sets.
 * @param parameterValuesProvider Provider for query parameters
 * @return This builder instance for chaining
 */
public JDBCInputFormatBuilder setParametersProvider(ParameterValuesProvider parameterValuesProvider);
/**
 * Sets the row type information describing the schema of the query results.
 * @param rowTypeInfo Type information describing the result schema
 * @return This builder instance for chaining
 */
public JDBCInputFormatBuilder setRowTypeInfo(RowTypeInfo rowTypeInfo);
/**
 * Creates the configured JDBCInputFormat.
 * @return Configured JDBCInputFormat instance
 */
public JDBCInputFormat finish();
}

Usage Example:
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.types.Row;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Configure JDBC input
JDBCInputFormat jdbcInput = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost:3306/mydb")
.setUsername("user")
.setPassword("password")
.setQuery("SELECT id, name, age FROM users WHERE age > ?")
.setParametersProvider(new GenericParameterValuesProvider(new Serializable[][] {
{18}, {21}, {25} // Multiple parameter sets for parallel execution
}))
.setRowTypeInfo(new RowTypeInfo(
BasicTypeInfo.INT_TYPE_INFO, // id
BasicTypeInfo.STRING_TYPE_INFO, // name
BasicTypeInfo.INT_TYPE_INFO // age
))
.finish();
DataSet<Row> users = env.createInput(jdbcInput);
users.print();

Writes data to JDBC databases with batch processing support for high-performance inserts.
/**
 * OutputFormat for writing {@link Row} records to a JDBC database in Flink.
 * Records are written via a parameterized modification query configured
 * through {@code buildJDBCOutputFormat()}.
 */
public class JDBCOutputFormat extends RichOutputFormat<Row> {
/**
 * Default constructor. Prefer {@link #buildJDBCOutputFormat()} for creating
 * fully configured instances.
 */
public JDBCOutputFormat();
/**
 * Array of java.sql.Types constants, one per statement parameter.
 * NOTE(review): exposed as a public mutable field for configuration; callers
 * should prefer the builder's setSqlTypes(int[]).
 */
public int[] typesArray;
/**
 * Configures the output format with parameters.
 * @param parameters Configuration parameters
 */
public void configure(Configuration parameters);
/**
 * Opens the output format for writing.
 * @param taskNumber The number of the parallel task
 * @param numTasks The total number of parallel tasks
 * @throws IOException if the output cannot be opened
 */
public void open(int taskNumber, int numTasks) throws IOException;
/**
 * Writes a record to the output. Presumably buffered until the configured
 * batch interval is reached — confirm against the implementation.
 * @param row The row to write
 * @throws IOException if writing fails
 */
public void writeRecord(Row row) throws IOException;
/**
 * Closes the output format and flushes any remaining data.
 * @throws IOException if flushing or closing fails
 */
public void close() throws IOException;
/**
 * Creates a builder for configuring a JDBCOutputFormat.
 * @return JDBCOutputFormatBuilder instance
 */
public static JDBCOutputFormatBuilder buildJDBCOutputFormat();
}

Builder for configuring JDBC output operations with support for batch processing.
/**
 * Fluent builder for {@link JDBCOutputFormat} instances; obtain one via
 * {@code JDBCOutputFormat.buildJDBCOutputFormat()} and finish with
 * {@code finish()}.
 */
public static class JDBCOutputFormatBuilder {
/**
 * Creates a new JDBCOutputFormatBuilder. Protected: instances are obtained
 * through the static factory on JDBCOutputFormat.
 */
protected JDBCOutputFormatBuilder();
/**
 * Sets the database username used when opening the connection.
 * @param username Database username
 * @return This builder instance for chaining
 */
public JDBCOutputFormatBuilder setUsername(String username);
/**
 * Sets the database password used when opening the connection.
 * @param password Database password
 * @return This builder instance for chaining
 */
public JDBCOutputFormatBuilder setPassword(String password);
/**
 * Sets the JDBC driver class name to load.
 * @param drivername Fully qualified driver class name
 * @return This builder instance for chaining
 */
public JDBCOutputFormatBuilder setDrivername(String drivername);
/**
 * Sets the database connection URL.
 * @param dbURL JDBC connection URL
 * @return This builder instance for chaining
 */
public JDBCOutputFormatBuilder setDBUrl(String dbURL);
/**
 * Sets the SQL modification query (INSERT, UPDATE, or DELETE) whose
 * {@code ?} placeholders are bound from each written Row.
 * @param query SQL modification query with parameter placeholders
 * @return This builder instance for chaining
 */
public JDBCOutputFormatBuilder setQuery(String query);
/**
 * Sets the batch interval for bulk operations.
 * @param batchInterval Number of records to accumulate before executing a batch
 * @return This builder instance for chaining
 */
public JDBCOutputFormatBuilder setBatchInterval(int batchInterval);
/**
 * Sets the SQL types of the statement parameters, in placeholder order.
 * @param typesArray Array of java.sql.Types constants
 * @return This builder instance for chaining
 */
public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray);
/**
 * Creates the configured JDBCOutputFormat.
 * @return Configured JDBCOutputFormat instance
 */
public JDBCOutputFormat finish();
}

Usage Example:
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import java.sql.Types;
// Configure JDBC output
JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost:3306/mydb")
.setUsername("user")
.setPassword("password")
.setQuery("INSERT INTO users (name, age, email) VALUES (?, ?, ?)")
.setBatchInterval(1000) // Batch 1000 records at a time
.setSqlTypes(new int[] {
Types.VARCHAR, // name
Types.INTEGER, // age
Types.VARCHAR // email
})
.finish();
// Write data
DataSet<Row> users = // ... your data
users.output(jdbcOutput);

Interface for providing query parameters to enable parallel JDBC reads with different parameter sets.
/**
 * Interface for providing query parameter sets that enable parallel JDBC
 * reads, one parameter row per parallel query execution.
 */
public interface ParameterValuesProvider {
/**
 * Returns a matrix of parameter values for parallel queries.
 * Each row of the matrix contains the bind values for the query placeholders
 * of one parallel query execution.
 * @return 2D array where each row contains parameters for one query
 */
Serializable[][] getParameterValues();
}

Generic implementation that wraps pre-computed query parameters.
/**
 * {@link ParameterValuesProvider} implementation that simply wraps a
 * pre-computed parameter matrix supplied by the caller.
 */
public class GenericParameterValuesProvider implements ParameterValuesProvider {
/**
 * Creates a provider with pre-computed parameters.
 * @param parameters 2D array of parameter values; one row per parallel query
 */
public GenericParameterValuesProvider(Serializable[][] parameters);
/**
 * Returns the pre-computed parameters exactly as supplied at construction.
 * @return The parameter matrix
 */
public Serializable[][] getParameterValues();
}

Usage Example:
// Create parameters for parallel execution
Serializable[][] parameters = {
{"USA", 1000}, // Query 1: WHERE country = 'USA' AND salary > 1000
{"Canada", 1200}, // Query 2: WHERE country = 'Canada' AND salary > 1200
{"UK", 800} // Query 3: WHERE country = 'UK' AND salary > 800
};
ParameterValuesProvider provider = new GenericParameterValuesProvider(parameters);

Generates parameters for BETWEEN queries on numeric columns to enable range-based parallel processing.
/**
 * {@link ParameterValuesProvider} that generates {lower, upper} parameter
 * pairs for BETWEEN queries on a numeric column, partitioning [min, max]
 * into consecutive ranges of fetchSize values each (e.g. {1, 10000},
 * {10001, 20000}, ...) for range-based parallel processing.
 */
public class NumericBetweenParametersProvider implements ParameterValuesProvider {
/**
 * Creates a provider that generates BETWEEN parameters for numeric ranges.
 * @param fetchSize Number of records to fetch per parallel query (range width)
 * @param min Minimum value of the numeric range (inclusive, as SQL BETWEEN is inclusive)
 * @param max Maximum value of the numeric range (inclusive)
 */
public NumericBetweenParametersProvider(long fetchSize, long min, long max);
/**
 * Returns generated BETWEEN bounds for the parallel range queries, one
 * {lower, upper} pair per query.
 * @return Parameter matrix for BETWEEN queries
 */
public Serializable[][] getParameterValues();
}

Usage Example:
// Generate parameters for parallel range queries
// This will create multiple queries like: WHERE id BETWEEN ? AND ?
NumericBetweenParametersProvider provider =
new NumericBetweenParametersProvider(
10000, // Fetch 10,000 records per query
1, // Minimum ID value
100000 // Maximum ID value
);
// Results in queries like: WHERE id BETWEEN 1 AND 10000, WHERE id BETWEEN 10001 AND 20000, etc.
JDBCInputFormat jdbcInput = JDBCInputFormat.buildJDBCInputFormat()
.setQuery("SELECT * FROM large_table WHERE id BETWEEN ? AND ?")
.setParametersProvider(provider)
// ... other configuration
.finish();

// Flink types
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
// JDBC parameter providers
import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;
import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
// Java types
import java.io.Serializable;
import java.io.IOException;
import java.sql.Types;
import java.sql.ResultSet;