A bundled JDBC driver for Apache Flink SQL that packages the JDBC driver implementation along with its dependencies into a single JAR file
—
Comprehensive SQL statement execution capabilities supporting both DDL and DML operations against Flink SQL Gateway, with special handling for batch mode queries and INSERT statements that return job IDs.
FlinkStatement provides SQL execution capabilities with proper resource management and cancellation support.
public class FlinkStatement extends BaseStatement {
public FlinkStatement(FlinkConnection connection);
// Statement lifecycle
public void close() throws SQLException;
public boolean isClosed() throws SQLException;
public void cancel() throws SQLException;
public Connection getConnection() throws SQLException;
// Warning management (limited support)
public SQLWarning getWarnings() throws SQLException; // Returns null
public void clearWarnings() throws SQLException; // No-op
}

Important: FlinkStatement is NOT thread-safe. Use a separate statement for each thread.
Usage Example:
Connection connection = DriverManager.getConnection("jdbc:flink://localhost:8083");
Statement statement = connection.createStatement();
try {
// Use statement for queries
ResultSet results = statement.executeQuery("SELECT * FROM my_table");
// Process results...
} finally {
statement.close(); // Always close statements
}

Execute SELECT queries and retrieve result sets for data analysis and reporting.
public class FlinkStatement extends BaseStatement {
public ResultSet executeQuery(String sql) throws SQLException;
public ResultSet getResultSet() throws SQLException;
public boolean getMoreResults() throws SQLException; // Not supported for multiple results
}

Usage Example:
Statement statement = connection.createStatement();
// Execute a SELECT query
String query = "SELECT customer_id, order_total, order_date " +
"FROM orders " +
"WHERE order_date >= '2024-01-01' " +
"ORDER BY order_total DESC " +
"LIMIT 100";
ResultSet results = statement.executeQuery(query);
// Process results
while (results.next()) {
int customerId = results.getInt("customer_id");
double orderTotal = results.getDouble("order_total");
Date orderDate = results.getDate("order_date");
System.out.printf("Customer %d: $%.2f on %s%n",
customerId, orderTotal, orderDate);
}
results.close();

Execute any SQL statement (DDL, DML, or queries) with automatic result handling.
public class FlinkStatement extends BaseStatement {
public boolean execute(String sql) throws SQLException;
public int getUpdateCount() throws SQLException;
}

The execute() method returns:
- true if the statement produces a result set (SELECT queries or INSERT with job ID)
- false if the statement produces an update count or no results (most DDL/DML)

Usage Examples:
DDL Operations:
Statement statement = connection.createStatement();
// Create a table
boolean hasResults = statement.execute(
"CREATE TABLE sales (" +
" id BIGINT PRIMARY KEY," +
" product_name STRING," +
" amount DECIMAL(10,2)," +
" sale_time TIMESTAMP(3)" +
")"
);
// hasResults = false for DDL
// Create a view
statement.execute(
"CREATE VIEW high_value_sales AS " +
"SELECT * FROM sales WHERE amount > 1000.00"
);

DML Operations:
// INSERT statement - returns job ID as result set in Flink
boolean hasResults = statement.execute(
"INSERT INTO sales " +
"SELECT id, product_name, amount, sale_time " +
"FROM staging_sales " +
"WHERE processed = false"
);
if (hasResults) {
// INSERT statements return job ID as result set
ResultSet jobResult = statement.getResultSet();
if (jobResult.next()) {
String jobId = jobResult.getString(1);
System.out.println("Started job: " + jobId);
}
jobResult.close();
}

Query Operations:
// SELECT query
boolean hasResults = statement.execute("SELECT COUNT(*) FROM sales");
if (hasResults) {
ResultSet results = statement.getResultSet();
if (results.next()) {
long count = results.getLong(1);
System.out.println("Total sales records: " + count);
}
results.close();
}

Control statement behavior and resource usage through standard JDBC mechanisms.
// Standard JDBC statement configuration (inherited from BaseStatement)
public void setMaxRows(int max) throws SQLException;
public int getMaxRows() throws SQLException;
public void setQueryTimeout(int seconds) throws SQLException;
public int getQueryTimeout() throws SQLException;
public void setFetchSize(int rows) throws SQLException;
public int getFetchSize() throws SQLException;
public void setEscapeProcessing(boolean enable) throws SQLException;

Usage Example:
Statement statement = connection.createStatement();
// Configure statement limits
statement.setQueryTimeout(300); // 5 minute timeout
statement.setMaxRows(10000); // Limit to 10K rows
statement.setFetchSize(1000); // Fetch 1000 rows at a time
ResultSet results = statement.executeQuery(
"SELECT * FROM large_table WHERE category = 'electronics'"
);

Execute multiple SQL statements efficiently using JDBC batch processing.
public void addBatch(String sql) throws SQLException;
public void clearBatch() throws SQLException;
public int[] executeBatch() throws SQLException;

Usage Example:
Statement statement = connection.createStatement();
// Add multiple statements to batch
statement.addBatch("INSERT INTO products VALUES (1, 'Laptop', 999.99)");
statement.addBatch("INSERT INTO products VALUES (2, 'Mouse', 29.99)");
statement.addBatch("INSERT INTO products VALUES (3, 'Keyboard', 79.99)");
// Execute all statements in batch
int[] updateCounts = statement.executeBatch();
for (int i = 0; i < updateCounts.length; i++) {
System.out.println("Statement " + i + " affected " + updateCounts[i] + " rows");
}
statement.clearBatch(); // Clear for next batch

DDL (Data Definition Language):
- CREATE TABLE - Create new tables
- DROP TABLE - Delete tables
- ALTER TABLE - Modify table structure
- CREATE VIEW - Create views
- DROP VIEW - Delete views

DML (Data Manipulation Language):
- INSERT - Insert data (returns job ID as result set)
- UPDATE - Update existing data
- DELETE - Delete data
- UPSERT - Insert or update data

Queries:
- SELECT - Data retrieval with full SQL support

Example Complex Query:
String analyticsQuery =
"SELECT " +
" DATE_FORMAT(order_date, 'yyyy-MM') as month, " +
" product_category, " +
" COUNT(*) as order_count, " +
" SUM(order_total) as total_revenue, " +
" AVG(order_total) as avg_order_value " +
"FROM orders o " +
"JOIN products p ON o.product_id = p.id " +
"WHERE order_date >= DATE '2024-01-01' " +
"GROUP BY DATE_FORMAT(order_date, 'yyyy-MM'), product_category " +
"HAVING COUNT(*) > 100 " +
"ORDER BY month DESC, total_revenue DESC";
ResultSet results = statement.executeQuery(analyticsQuery);

The following JDBC statement features are not supported:
- getGeneratedKeys() and execute methods with generated-key parameters
- getMoreResults() with multiple result sets
- setCursorName(), getCursorName()

Statement execution may encounter various error conditions:
SQL Syntax Errors:
try {
statement.executeQuery("SELCT * FROM table"); // Typo in SELECT
} catch (SQLException e) {
System.err.println("SQL syntax error: " + e.getMessage());
}

Table/Column Not Found:
try {
statement.executeQuery("SELECT nonexistent_column FROM my_table");
} catch (SQLException e) {
System.err.println("Column not found: " + e.getMessage());
}

Connection Issues:
try {
statement.executeQuery("SELECT * FROM remote_table");
} catch (SQLException e) {
System.err.println("Execution failed: " + e.getMessage());
// May need to retry or reconnect
}

Statement Closed:
statement.close();
try {
statement.executeQuery("SELECT * FROM my_table");
} catch (SQLException e) {
System.err.println("Statement is closed: " + e.getMessage());
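}

Batch Mode Only: The driver only supports batch mode queries (that is, Flink's batch execution mode, which is unrelated to JDBC statement batching via addBatch()). Streaming queries may produce unexpected results.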
Connection Reuse: Reuse connections when possible, but remember that connections are not thread-safe.
Resource Cleanup: Always close ResultSets and Statements to prevent resource leaks:
Statement statement = null;
ResultSet results = null;
try {
statement = connection.createStatement();
results = statement.executeQuery("SELECT * FROM my_table");
// Process results...
} finally {
if (results != null) {
try { results.close(); } catch (SQLException e) { /* log */ }
}
if (statement != null) {
try { statement.close(); } catch (SQLException e) { /* log */ }
}
}

Or using try-with-resources:
try (Statement statement = connection.createStatement();
ResultSet results = statement.executeQuery("SELECT * FROM my_table")) {
// Process results - automatic cleanup
while (results.next()) {
// Process each row
}
}

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-sql-jdbc-driver-bundle