A bundled JDBC driver for Apache Flink SQL that packages the JDBC driver implementation along with its dependencies into a single JAR file.
—
Comprehensive result set processing capabilities with support for all Java primitive types, temporal data, decimal precision, and complex data structures including Maps. Provides both index-based and label-based data access with complete type conversion support.
FlinkResultSet provides forward-only cursor navigation through query results with proper resource management.
public class FlinkResultSet extends BaseResultSet {
public FlinkResultSet(Statement statement, StatementResult result);
public FlinkResultSet(Statement statement, CloseableResultIterator<RowData> iterator, ResolvedSchema schema);
// Navigation
public boolean next() throws SQLException;
public void close() throws SQLException;
public boolean isClosed() throws SQLException;
public boolean wasNull() throws SQLException;
// Metadata access
public ResultSetMetaData getMetaData() throws SQLException;
public Statement getStatement() throws SQLException;
public int findColumn(String columnLabel) throws SQLException;
}

Usage Example:
Statement statement = connection.createStatement();
ResultSet results = statement.executeQuery("SELECT id, name, balance FROM accounts");
try {
while (results.next()) {
int id = results.getInt(1);
String name = results.getString(2);
double balance = results.getDouble(3);
System.out.printf("Account %d: %s has balance $%.2f%n", id, name, balance);
}
} finally {
results.close();
}

Retrieve primitive Java data types by column index or label with automatic type conversion.
public class FlinkResultSet extends BaseResultSet {
// By column index (1-based)
public boolean getBoolean(int columnIndex) throws SQLException;
public byte getByte(int columnIndex) throws SQLException;
public short getShort(int columnIndex) throws SQLException;
public int getInt(int columnIndex) throws SQLException;
public long getLong(int columnIndex) throws SQLException;
public float getFloat(int columnIndex) throws SQLException;
public double getDouble(int columnIndex) throws SQLException;
// By column label
public boolean getBoolean(String columnLabel) throws SQLException;
public byte getByte(String columnLabel) throws SQLException;
public short getShort(String columnLabel) throws SQLException;
public int getInt(String columnLabel) throws SQLException;
public long getLong(String columnLabel) throws SQLException;
public float getFloat(String columnLabel) throws SQLException;
public double getDouble(String columnLabel) throws SQLException;
}

Usage Example:
ResultSet results = statement.executeQuery(
"SELECT order_id, quantity, price, is_shipped, tax_rate " +
"FROM orders WHERE status = 'completed'"
);
while (results.next()) {
// Access by index
int orderId = results.getInt(1);
short quantity = results.getShort(2);
double price = results.getDouble(3);
boolean isShipped = results.getBoolean(4);
float taxRate = results.getFloat(5);
// Or access by column name
int orderIdByName = results.getInt("order_id");
short quantityByName = results.getShort("quantity");
System.out.printf("Order %d: %d items at $%.2f (shipped: %b, tax: %.2f%%)%n",
orderId, quantity, price, isShipped, taxRate * 100);
}

Retrieve string and binary data with comprehensive character encoding support.
public class FlinkResultSet extends BaseResultSet {
// String data
public String getString(int columnIndex) throws SQLException;
public String getString(String columnLabel) throws SQLException;
// Binary data
public byte[] getBytes(int columnIndex) throws SQLException;
public byte[] getBytes(String columnLabel) throws SQLException;
}

Usage Example:
ResultSet results = statement.executeQuery(
"SELECT user_id, username, profile_data, avatar_image " +
"FROM user_profiles"
);
while (results.next()) {
int userId = results.getInt("user_id");
String username = results.getString("username");
String profileData = results.getString("profile_data"); // JSON string
byte[] avatarImage = results.getBytes("avatar_image"); // Binary data
System.out.printf("User %d (%s): %d bytes profile, %d bytes avatar%n",
userId, username, profileData.length(), avatarImage.length);
}

Handle high-precision decimal numbers with configurable scale and precision.
public class FlinkResultSet extends BaseResultSet {
public BigDecimal getBigDecimal(int columnIndex) throws SQLException;
public BigDecimal getBigDecimal(String columnLabel) throws SQLException;
// Deprecated method with scale parameter
public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException;
public BigDecimal getBigDecimal(String columnLabel, int scale) throws SQLException;
}

Usage Example:
ResultSet results = statement.executeQuery(
"SELECT product_id, price, discount_rate, tax_amount " +
"FROM financial_transactions " +
"WHERE amount > 1000.00"
);
while (results.next()) {
int productId = results.getInt("product_id");
BigDecimal price = results.getBigDecimal("price");
BigDecimal discountRate = results.getBigDecimal("discount_rate");
BigDecimal taxAmount = results.getBigDecimal("tax_amount");
// Calculate final amount with precision
BigDecimal discountAmount = price.multiply(discountRate);
BigDecimal finalAmount = price.subtract(discountAmount).add(taxAmount);
System.out.printf("Product %d: $%s (discount: $%s, tax: $%s, final: $%s)%n",
productId, price, discountAmount, taxAmount, finalAmount);
}

Retrieve date, time, and timestamp data with proper Java temporal type support.
public class FlinkResultSet extends BaseResultSet {
public Date getDate(int columnIndex) throws SQLException;
public Date getDate(String columnLabel) throws SQLException;
public Time getTime(int columnIndex) throws SQLException;
public Time getTime(String columnLabel) throws SQLException;
public Timestamp getTimestamp(int columnIndex) throws SQLException;
public Timestamp getTimestamp(String columnLabel) throws SQLException;
}

Usage Example:
ResultSet results = statement.executeQuery(
"SELECT event_id, event_date, start_time, created_timestamp " +
"FROM events " +
"WHERE event_date >= CURRENT_DATE"
);
while (results.next()) {
int eventId = results.getInt("event_id");
Date eventDate = results.getDate("event_date");
Time startTime = results.getTime("start_time");
Timestamp createdTimestamp = results.getTimestamp("created_timestamp");
System.out.printf("Event %d on %s at %s (created: %s)%n",
eventId, eventDate, startTime, createdTimestamp);
}

Retrieve any column as a generic Object for dynamic processing or complex data types.
public class FlinkResultSet extends BaseResultSet {
public Object getObject(int columnIndex) throws SQLException;
public Object getObject(String columnLabel) throws SQLException;
}

Usage Example:
ResultSet results = statement.executeQuery(
"SELECT id, data_column, map_column " +
"FROM mixed_data_table"
);
while (results.next()) {
int id = results.getInt("id");
Object data = results.getObject("data_column");
Object mapData = results.getObject("map_column"); // Returns Map for Flink MAP type
System.out.printf("ID %d: data type=%s, map type=%s%n",
id, data.getClass().getSimpleName(), mapData.getClass().getSimpleName());
// Handle Map data
if (mapData instanceof Map) {
Map<?, ?> map = (Map<?, ?>) mapData;
System.out.println("Map contents: " + map);
}
}

Proper null value detection and handling for all data types.
public class FlinkResultSet extends BaseResultSet {
public boolean wasNull() throws SQLException;
}

Usage Example:
ResultSet results = statement.executeQuery(
"SELECT customer_id, email, phone_number " +
"FROM customers"
);
while (results.next()) {
int customerId = results.getInt("customer_id");
String email = results.getString("email");
boolean emailWasNull = results.wasNull();
String phoneNumber = results.getString("phone_number");
boolean phoneWasNull = results.wasNull();
System.out.printf("Customer %d: email=%s, phone=%s%n",
customerId,
emailWasNull ? "NULL" : email,
phoneWasNull ? "NULL" : phoneNumber);
}

Access comprehensive metadata about result set structure and column information.
public class FlinkResultSetMetaData implements ResultSetMetaData {
public FlinkResultSetMetaData(List<String> columnNames, List<DataType> columnTypes);
// Column information
public int getColumnCount() throws SQLException;
public String getColumnName(int column) throws SQLException;
public String getColumnLabel(int column) throws SQLException;
public int getColumnType(int column) throws SQLException;
public String getColumnTypeName(int column) throws SQLException;
public String getColumnClassName(int column) throws SQLException;
// Numeric metadata
public int getPrecision(int column) throws SQLException;
public int getScale(int column) throws SQLException;
public int getColumnDisplaySize(int column) throws SQLException;
// Column properties
public int isNullable(int column) throws SQLException;
public boolean isSigned(int column) throws SQLException;
public boolean isAutoIncrement(int column) throws SQLException; // Always false
public boolean isCaseSensitive(int column) throws SQLException; // Always false
public boolean isSearchable(int column) throws SQLException; // Always true
public boolean isCurrency(int column) throws SQLException; // Always false
public boolean isReadOnly(int column) throws SQLException; // Always true
public boolean isWritable(int column) throws SQLException; // Always false
public boolean isDefinitelyWritable(int column) throws SQLException; // Always false
}

Usage Example:
ResultSet results = statement.executeQuery("SELECT * FROM sales_data");
ResultSetMetaData metadata = results.getMetaData();
int columnCount = metadata.getColumnCount();
System.out.println("Result set has " + columnCount + " columns:");
for (int i = 1; i <= columnCount; i++) {
String columnName = metadata.getColumnName(i);
String columnType = metadata.getColumnTypeName(i);
int precision = metadata.getPrecision(i);
int scale = metadata.getScale(i);
boolean nullable = metadata.isNullable(i) == ResultSetMetaData.columnNullable;
System.out.printf("Column %d: %s (%s) precision=%d scale=%d nullable=%b%n",
i, columnName, columnType, precision, scale, nullable);
}
// Process data
while (results.next()) {
for (int i = 1; i <= columnCount; i++) {
Object value = results.getObject(i);
boolean wasNull = results.wasNull();
System.out.printf("%s=%s ", metadata.getColumnName(i),
wasNull ? "NULL" : value.toString());
}
System.out.println();
}

Convert column labels to indices for efficient repeated access.
public class FlinkResultSet extends BaseResultSet {
public int findColumn(String columnLabel) throws SQLException;
}

Usage Example:
ResultSet results = statement.executeQuery(
"SELECT customer_id, first_name, last_name, email " +
"FROM customers ORDER BY last_name"
);
// Get column indices once for efficiency
int customerIdIndex = results.findColumn("customer_id");
int firstNameIndex = results.findColumn("first_name");
int lastNameIndex = results.findColumn("last_name");
int emailIndex = results.findColumn("email");
while (results.next()) {
// Use indices for faster access
int customerId = results.getInt(customerIdIndex);
String firstName = results.getString(firstNameIndex);
String lastName = results.getString(lastNameIndex);
String email = results.getString(emailIndex);
System.out.printf("Customer %d: %s %s <%s>%n",
customerId, firstName, lastName, email);
}

Understanding how Flink data types map to Java types:
Primitive Types:
- BOOLEAN → boolean
- TINYINT → byte
- SMALLINT → short
- INT → int
- BIGINT → long
- FLOAT → float
- DOUBLE → double

Text Types:
- CHAR, VARCHAR, STRING → String
- BINARY, VARBINARY → byte[]

Numeric Types:
- DECIMAL(p,s) → BigDecimal

Temporal Types:
- DATE → java.sql.Date
- TIME → java.sql.Time
- TIMESTAMP → java.sql.Timestamp

Complex Types:
- MAP<K,V> → java.util.Map<K,V>
- ARRAY<T> → Not supported (throws SQLFeatureNotSupportedException)
- ROW(...) → Object representation

The following JDBC ResultSet features are not supported:
- getArray() methods throw SQLFeatureNotSupportedException
- getAsciiStream(), getBinaryStream(), getCharacterStream()
- updateXXX() methods
- previous(), first(), last(), absolute(), relative()
- insertRow(), updateRow(), deleteRow(), refreshRow()

Column Access Patterns:
// Efficient: Access columns in order
while (results.next()) {
int col1 = results.getInt(1); // Good
String col2 = results.getString(2); // Good
double col3 = results.getDouble(3); // Good
}
// Less efficient: Random column access
while (results.next()) {
double col3 = results.getDouble(3); // OK but not optimal
int col1 = results.getInt(1); // OK but not optimal
String col2 = results.getString(2); // OK but not optimal
}

Resource Management:
// Always close result sets
try (Statement stmt = connection.createStatement();
ResultSet results = stmt.executeQuery("SELECT * FROM large_table")) {
while (results.next()) {
// Process rows efficiently
}
// Automatic cleanup with try-with-resources
}

Common result set processing errors:
Column Not Found:
try {
String value = results.getString("nonexistent_column");
} catch (SQLException e) {
System.err.println("Column not found: " + e.getMessage());
}

Type Conversion Error:
try {
int value = results.getInt("text_column"); // Contains "abc"
} catch (SQLException e) {
System.err.println("Type conversion failed: " + e.getMessage());
}

Result Set Closed:
results.close();
try {
results.next();
} catch (SQLException e) {
System.err.println("Result set is closed: " + e.getMessage());
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-jdbc-driver-bundle