# SQL Statement Execution

The driver executes DDL, DML, and query statements against the Flink SQL Gateway. Only batch mode queries are supported, and `INSERT` statements return a job ID as a result set.

## Capabilities

### Statement Creation and Lifecycle

`FlinkStatement` executes SQL against the gateway and supports explicit cleanup via `close()` and cancellation via `cancel()`.

```java { .api }
public class FlinkStatement extends BaseStatement {
    public FlinkStatement(FlinkConnection connection);

    // Statement lifecycle
    public void close() throws SQLException;
    public boolean isClosed() throws SQLException;
    public void cancel() throws SQLException;
    public Connection getConnection() throws SQLException;

    // Warning management (limited support)
    public SQLWarning getWarnings() throws SQLException; // Returns null
    public void clearWarnings() throws SQLException; // No-op
}
```

**Important**: `FlinkStatement` is NOT thread-safe. Use a separate statement for each thread.

**Usage Example:**
```java
Connection connection = DriverManager.getConnection("jdbc:flink://localhost:8083");
Statement statement = connection.createStatement();

try {
    // Use statement for queries
    ResultSet results = statement.executeQuery("SELECT * FROM my_table");
    // Process results...
} finally {
    statement.close(); // Always close statements
}
```

### Query Execution

Execute SELECT queries and retrieve result sets for data analysis and reporting.

```java { .api }
public class FlinkStatement extends BaseStatement {
    public ResultSet executeQuery(String sql) throws SQLException;
    public ResultSet getResultSet() throws SQLException;
    public boolean getMoreResults() throws SQLException; // Not supported for multiple results
}
```

**Usage Example:**
```java
Statement statement = connection.createStatement();

// Execute a SELECT query
String query = "SELECT customer_id, order_total, order_date " +
               "FROM orders " +
               "WHERE order_date >= '2024-01-01' " +
               "ORDER BY order_total DESC " +
               "LIMIT 100";

ResultSet results = statement.executeQuery(query);

// Process results
while (results.next()) {
    int customerId = results.getInt("customer_id");
    double orderTotal = results.getDouble("order_total");
    Date orderDate = results.getDate("order_date");

    System.out.printf("Customer %d: $%.2f on %s%n",
                      customerId, orderTotal, orderDate);
}

results.close();
```

### General Statement Execution

Execute any SQL statement (DDL, DML, or queries) with automatic result handling.

```java { .api }
public class FlinkStatement extends BaseStatement {
    public boolean execute(String sql) throws SQLException;
    public int getUpdateCount() throws SQLException;
}
```

The `execute()` method returns:
- `true` if the statement produces a result set (`SELECT` queries, and `INSERT` statements, which return a job ID)
- `false` if the statement produces an update count or no results (most DDL/DML)

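Because `execute()` can report either kind of outcome, callers often branch on its return value in one place. The sketch below shows one way to do that using only standard JDBC calls. `ExecuteOutcome` and its methods are hypothetical helpers, not part of the driver, and treating a negative update count as "no result" follows the general JDBC convention, which this driver may or may not populate in the same way.

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/** Hypothetical helper (not part of the driver) that summarizes what execute() produced. */
public final class ExecuteOutcome {
    private ExecuteOutcome() {}

    /**
     * Pure formatting step, kept separate so it can be checked without a gateway.
     * firstValue is null when the result set had no rows.
     */
    public static String describe(boolean hasResultSet, String firstValue, int updateCount) {
        if (hasResultSet) {
            return firstValue == null
                    ? "result set (no rows)"
                    : "result set, first value: " + firstValue;
        }
        return updateCount >= 0 ? "update count: " + updateCount : "no result";
    }

    /** Runs any statement and reports the outcome. */
    public static String run(Statement statement, String sql) throws SQLException {
        boolean hasResultSet = statement.execute(sql);
        if (!hasResultSet) {
            return describe(false, null, statement.getUpdateCount());
        }
        // For INSERT this first value would be the job ID; for SELECT, the first column.
        try (ResultSet rs = statement.getResultSet()) {
            String first = rs.next() ? String.valueOf(rs.getObject(1)) : null;
            return describe(true, first, -1);
        }
    }
}
```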
**Usage Examples:**

**DDL Operations:**
```java
Statement statement = connection.createStatement();

// Create a table. Flink only accepts primary keys declared NOT ENFORCED;
// a usable table typically also needs a WITH (...) clause naming its connector.
boolean hasResults = statement.execute(
    "CREATE TABLE sales (" +
    "  id BIGINT PRIMARY KEY NOT ENFORCED," +
    "  product_name STRING," +
    "  amount DECIMAL(10,2)," +
    "  sale_time TIMESTAMP(3)" +
    ")"
);
// hasResults = false for DDL

// Create a view
statement.execute(
    "CREATE VIEW high_value_sales AS " +
    "SELECT * FROM sales WHERE amount > 1000.00"
);
```

**DML Operations:**
```java
// INSERT statement - returns job ID as result set in Flink
boolean hasResults = statement.execute(
    "INSERT INTO sales " +
    "SELECT id, product_name, amount, sale_time " +
    "FROM staging_sales " +
    "WHERE processed = false"
);

if (hasResults) {
    // INSERT statements return job ID as result set
    ResultSet jobResult = statement.getResultSet();
    if (jobResult.next()) {
        String jobId = jobResult.getString(1);
        System.out.println("Started job: " + jobId);
    }
    jobResult.close();
}
```

**Query Operations:**
```java
// SELECT query
boolean hasResults = statement.execute("SELECT COUNT(*) FROM sales");

if (hasResults) {
    ResultSet results = statement.getResultSet();
    if (results.next()) {
        long count = results.getLong(1);
        System.out.println("Total sales records: " + count);
    }
    results.close();
}
```

### Statement Configuration and Limits

Control statement behavior and resource usage through standard JDBC mechanisms.

```java { .api }
// Standard JDBC statement configuration (inherited from BaseStatement)
public void setMaxRows(int max) throws SQLException;
public int getMaxRows() throws SQLException;
public void setQueryTimeout(int seconds) throws SQLException;
public int getQueryTimeout() throws SQLException;
public void setFetchSize(int rows) throws SQLException;
public int getFetchSize() throws SQLException;
public void setEscapeProcessing(boolean enable) throws SQLException;
```

**Usage Example:**
```java
Statement statement = connection.createStatement();

// Configure statement limits
statement.setQueryTimeout(300); // 5 minute timeout
statement.setMaxRows(10000);    // Limit to 10K rows
statement.setFetchSize(1000);   // Fetch 1000 rows at a time

ResultSet results = statement.executeQuery(
    "SELECT * FROM large_table WHERE category = 'electronics'"
);
```

### Batch Operations

Execute multiple SQL statements efficiently using JDBC batch processing.

```java { .api }
public void addBatch(String sql) throws SQLException;
public void clearBatch() throws SQLException;
public int[] executeBatch() throws SQLException;
```

**Usage Example:**
```java
Statement statement = connection.createStatement();

// Add multiple statements to batch
statement.addBatch("INSERT INTO products VALUES (1, 'Laptop', 999.99)");
statement.addBatch("INSERT INTO products VALUES (2, 'Mouse', 29.99)");
statement.addBatch("INSERT INTO products VALUES (3, 'Keyboard', 79.99)");

// Execute all statements in batch
int[] updateCounts = statement.executeBatch();

for (int i = 0; i < updateCounts.length; i++) {
    System.out.println("Statement " + i + " affected " + updateCounts[i] + " rows");
}

statement.clearBatch(); // Clear for next batch
```

### Supported SQL Operations

**DDL (Data Definition Language):**
- `CREATE TABLE` - Create new tables
- `DROP TABLE` - Delete tables
- `ALTER TABLE` - Modify table structure
- `CREATE VIEW` - Create views
- `DROP VIEW` - Delete views
- Catalog and schema operations

**DML (Data Manipulation Language):**
- `INSERT` - Insert data (returns job ID as result set)
- `UPDATE` - Update existing data
- `DELETE` - Delete data
- `UPSERT` - Insert or update data

**Queries:**
- `SELECT` - Data retrieval with full SQL support
- Joins, aggregations, window functions
- Complex analytical queries

**Example Complex Query:**
```java
// The alias is order_month rather than month, because MONTH is a reserved keyword in Flink SQL
String analyticsQuery =
    "SELECT " +
    "  DATE_FORMAT(order_date, 'yyyy-MM') as order_month, " +
    "  product_category, " +
    "  COUNT(*) as order_count, " +
    "  SUM(order_total) as total_revenue, " +
    "  AVG(order_total) as avg_order_value " +
    "FROM orders o " +
    "JOIN products p ON o.product_id = p.id " +
    "WHERE order_date >= DATE '2024-01-01' " +
    "GROUP BY DATE_FORMAT(order_date, 'yyyy-MM'), product_category " +
    "HAVING COUNT(*) > 100 " +
    "ORDER BY order_month DESC, total_revenue DESC";

ResultSet results = statement.executeQuery(analyticsQuery);
```

### Unsupported Statement Features

The following JDBC statement features are not supported:

- **Generated Keys**: `getGeneratedKeys()`, execute methods with generated key parameters
- **Multiple Open Results**: `getMoreResults()` with multiple result sets
- **Result Set Type/Concurrency**: Only default forward-only, read-only result sets
- **Cursor Names**: `setCursorName()`, `getCursorName()`

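Code that has to run against several JDBC drivers may prefer to treat these gaps as a capability check rather than a hard failure. The sketch below assumes the driver signals an unsupported feature with `java.sql.SQLFeatureNotSupportedException`, or with an SQLSTATE in class `0A` (the standard "feature not supported" class). That is the usual JDBC convention, but it is an assumption here, not something this section confirms. `FeatureProbe` and `SqlAction` are hypothetical names.

```java
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;

/** Hypothetical helper (not part of the driver) for probing optional JDBC features. */
public final class FeatureProbe {
    private FeatureProbe() {}

    /** A JDBC call that returns nothing and may throw SQLException. */
    @FunctionalInterface
    public interface SqlAction {
        void run() throws SQLException;
    }

    /** True if the exception indicates a missing feature rather than a real failure. */
    public static boolean isUnsupported(SQLException e) {
        if (e instanceof SQLFeatureNotSupportedException) {
            return true;
        }
        String state = e.getSQLState();
        return state != null && state.startsWith("0A");
    }

    /**
     * Runs an optional call. Returns true if it succeeded, false if the driver
     * reported it as unsupported; any other SQLException is rethrown.
     */
    public static boolean tryOptional(SqlAction action) throws SQLException {
        try {
            action.run();
            return true;
        } catch (SQLException e) {
            if (isUnsupported(e)) {
                return false;
            }
            throw e;
        }
    }
}
```

Under that assumption, a call such as `FeatureProbe.tryOptional(() -> statement.setCursorName("c1"))` would be expected to return `false` with this driver instead of propagating an exception.
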
### Error Handling

Statement execution may encounter various error conditions:

**SQL Syntax Errors:**
```java
try {
    statement.executeQuery("SELCT * FROM table"); // Typo in SELECT
} catch (SQLException e) {
    System.err.println("SQL syntax error: " + e.getMessage());
}
```

**Table/Column Not Found:**
```java
try {
    statement.executeQuery("SELECT nonexistent_column FROM my_table");
} catch (SQLException e) {
    System.err.println("Column not found: " + e.getMessage());
}
```

**Connection Issues:**
```java
try {
    statement.executeQuery("SELECT * FROM remote_table");
} catch (SQLException e) {
    System.err.println("Execution failed: " + e.getMessage());
    // May need to retry or reconnect
}
```

**Statement Closed:**
```java
statement.close();
try {
    statement.executeQuery("SELECT * FROM my_table");
} catch (SQLException e) {
    System.err.println("Statement is closed: " + e.getMessage());
}
```

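The "retry or reconnect" comment in the connection example can be made concrete with a small bounded-retry wrapper. This is a minimal sketch, not driver behavior: `SqlRetry`, `SqlCall`, and the choice to retry only `SQLTransientException` and `SQLRecoverableException` are assumptions. The Flink driver may report connection failures as a plain `SQLException`, in which case `isRetryable` would need to be adjusted.

```java
import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.sql.SQLTransientException;

/** Hypothetical helper (not part of the driver) that retries a JDBC call with backoff. */
public final class SqlRetry {
    private SqlRetry() {}

    /** A JDBC call that produces a value and may throw SQLException. */
    @FunctionalInterface
    public interface SqlCall<T> {
        T call() throws SQLException;
    }

    /** Assumption: only these standard JDBC exception types are worth retrying. */
    public static boolean isRetryable(SQLException e) {
        return e instanceof SQLTransientException || e instanceof SQLRecoverableException;
    }

    /**
     * Invokes the call up to maxAttempts times, doubling the delay after each
     * retryable failure. Non-retryable errors and the final failure are rethrown.
     */
    public static <T> T withRetry(SqlCall<T> call, int maxAttempts, long initialDelayMillis)
            throws SQLException, InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return call.call();
            } catch (SQLException e) {
                if (attempt >= maxAttempts || !isRetryable(e)) {
                    throw e;
                }
                Thread.sleep(delay);
                delay *= 2;
            }
        }
    }
}
```

Since a failed statement may not be reusable, the lambda passed to `withRetry` would normally create a fresh `Statement` (or a fresh `Connection`) on each attempt and close it before returning.
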
### Performance Considerations

**Batch Mode Only**: The driver only supports batch mode queries. Streaming queries may produce unexpected results.

**Connection Reuse**: Reuse connections when possible, but remember that connections are not thread-safe.

**Resource Cleanup**: Always close ResultSets and Statements to prevent resource leaks:

```java
Statement statement = null;
ResultSet results = null;
try {
    statement = connection.createStatement();
    results = statement.executeQuery("SELECT * FROM my_table");
    // Process results...
} finally {
    if (results != null) {
        try { results.close(); } catch (SQLException e) { /* log */ }
    }
    if (statement != null) {
        try { statement.close(); } catch (SQLException e) { /* log */ }
    }
}
```

Or using try-with-resources:

```java
try (Statement statement = connection.createStatement();
     ResultSet results = statement.executeQuery("SELECT * FROM my_table")) {

    // Process results - automatic cleanup
    while (results.next()) {
        // Process each row
    }
}
```