or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connection-management.mddatabase-metadata.mdindex.mdresult-set-processing.mdstatement-execution.md

statement-execution.mddocs/

0

# SQL Statement Execution

1

2

Comprehensive SQL statement execution capabilities supporting both DDL and DML operations against Flink SQL Gateway, with special handling for batch mode queries and INSERT statements that return job IDs.

3

4

## Capabilities

5

6

### Statement Creation and Lifecycle

7

8

FlinkStatement provides SQL execution capabilities with proper resource management and cancellation support.

9

10

```java { .api }

11

public class FlinkStatement extends BaseStatement {

12

public FlinkStatement(FlinkConnection connection);

13

14

// Statement lifecycle

15

public void close() throws SQLException;

16

public boolean isClosed() throws SQLException;

17

public void cancel() throws SQLException;

18

public Connection getConnection() throws SQLException;

19

20

// Warning management (limited support)

21

public SQLWarning getWarnings() throws SQLException; // Returns null

22

public void clearWarnings() throws SQLException; // No-op

23

}

24

```

25

26

**Important**: FlinkStatement is NOT thread-safe. Use separate statements for each thread.

27

28

**Usage Example:**

29

```java

30

Connection connection = DriverManager.getConnection("jdbc:flink://localhost:8083");

31

Statement statement = connection.createStatement();

32

33

try {

34

// Use statement for queries

35

ResultSet results = statement.executeQuery("SELECT * FROM my_table");

36

// Process results...

37

} finally {

38

statement.close(); // Always close statements

39

}

40

```

41

42

### Query Execution

43

44

Execute SELECT queries and retrieve result sets for data analysis and reporting.

45

46

```java { .api }

47

public class FlinkStatement extends BaseStatement {

48

public ResultSet executeQuery(String sql) throws SQLException;

49

public ResultSet getResultSet() throws SQLException;

50

public boolean getMoreResults() throws SQLException; // Not supported for multiple results

51

}

52

```

53

54

**Usage Example:**

55

```java

56

Statement statement = connection.createStatement();

57

58

// Execute a SELECT query

59

String query = "SELECT customer_id, order_total, order_date " +

60

"FROM orders " +

61

"WHERE order_date >= '2024-01-01' " +

62

"ORDER BY order_total DESC " +

63

"LIMIT 100";

64

65

ResultSet results = statement.executeQuery(query);

66

67

// Process results

68

while (results.next()) {

69

int customerId = results.getInt("customer_id");

70

double orderTotal = results.getDouble("order_total");

71

Date orderDate = results.getDate("order_date");

72

73

System.out.printf("Customer %d: $%.2f on %s%n",

74

customerId, orderTotal, orderDate);

75

}

76

77

results.close();

78

```

79

80

### General Statement Execution

81

82

Execute any SQL statement (DDL, DML, or queries) with automatic result handling.

83

84

```java { .api }

85

public class FlinkStatement extends BaseStatement {

86

public boolean execute(String sql) throws SQLException;

87

public int getUpdateCount() throws SQLException;

88

}

89

```

90

91

The `execute()` method returns:

92

- `true` if the statement produces a result set (SELECT queries or INSERT with job ID)

93

- `false` if the statement produces an update count or no results (most DDL/DML)

94

95

**Usage Examples:**

96

97

**DDL Operations:**

98

```java

99

Statement statement = connection.createStatement();

100

101

// Create a table

102

boolean hasResults = statement.execute(

103

"CREATE TABLE sales (" +

104

" id BIGINT PRIMARY KEY," +

105

" product_name STRING," +

106

" amount DECIMAL(10,2)," +

107

" sale_time TIMESTAMP(3)" +

108

")"

109

);

110

// hasResults = false for DDL

111

112

// Create a view

113

statement.execute(

114

"CREATE VIEW high_value_sales AS " +

115

"SELECT * FROM sales WHERE amount > 1000.00"

116

);

117

```

118

119

**DML Operations:**

120

```java

121

// INSERT statement - returns job ID as result set in Flink

122

boolean hasResults = statement.execute(

123

"INSERT INTO sales " +

124

"SELECT id, product_name, amount, sale_time " +

125

"FROM staging_sales " +

126

"WHERE processed = false"

127

);

128

129

if (hasResults) {

130

// INSERT statements return job ID as result set

131

ResultSet jobResult = statement.getResultSet();

132

if (jobResult.next()) {

133

String jobId = jobResult.getString(1);

134

System.out.println("Started job: " + jobId);

135

}

136

jobResult.close();

137

}

138

```

139

140

**Query Operations:**

141

```java

142

// SELECT query

143

boolean hasResults = statement.execute("SELECT COUNT(*) FROM sales");

144

145

if (hasResults) {

146

ResultSet results = statement.getResultSet();

147

if (results.next()) {

148

long count = results.getLong(1);

149

System.out.println("Total sales records: " + count);

150

}

151

results.close();

152

}

153

```

154

155

### Statement Configuration and Limits

156

157

Control statement behavior and resource usage through standard JDBC mechanisms.

158

159

```java { .api }

160

// Standard JDBC statement configuration (inherited from BaseStatement)

161

public void setMaxRows(int max) throws SQLException;

162

public int getMaxRows() throws SQLException;

163

public void setQueryTimeout(int seconds) throws SQLException;

164

public int getQueryTimeout() throws SQLException;

165

public void setFetchSize(int rows) throws SQLException;

166

public int getFetchSize() throws SQLException;

167

public void setEscapeProcessing(boolean enable) throws SQLException;

168

```

169

170

**Usage Example:**

171

```java

172

Statement statement = connection.createStatement();

173

174

// Configure statement limits

175

statement.setQueryTimeout(300); // 5 minute timeout

176

statement.setMaxRows(10000); // Limit to 10K rows

177

statement.setFetchSize(1000); // Fetch 1000 rows at a time

178

179

ResultSet results = statement.executeQuery(

180

"SELECT * FROM large_table WHERE category = 'electronics'"

181

);

182

```

183

184

### Batch Operations

185

186

Execute multiple SQL statements efficiently using JDBC batch processing.

187

188

```java { .api }

189

public void addBatch(String sql) throws SQLException;

190

public void clearBatch() throws SQLException;

191

public int[] executeBatch() throws SQLException;

192

```

193

194

**Usage Example:**

195

```java

196

Statement statement = connection.createStatement();

197

198

// Add multiple statements to batch

199

statement.addBatch("INSERT INTO products VALUES (1, 'Laptop', 999.99)");

200

statement.addBatch("INSERT INTO products VALUES (2, 'Mouse', 29.99)");

201

statement.addBatch("INSERT INTO products VALUES (3, 'Keyboard', 79.99)");

202

203

// Execute all statements in batch

204

int[] updateCounts = statement.executeBatch();

205

206

for (int i = 0; i < updateCounts.length; i++) {

207

System.out.println("Statement " + i + " affected " + updateCounts[i] + " rows");

208

}

209

210

statement.clearBatch(); // Clear for next batch

211

```

212

213

### Supported SQL Operations

214

215

**DDL (Data Definition Language):**

216

- `CREATE TABLE` - Create new tables

217

- `DROP TABLE` - Delete tables

218

- `ALTER TABLE` - Modify table structure

219

- `CREATE VIEW` - Create views

220

- `DROP VIEW` - Delete views

221

- Catalog and schema operations

222

223

**DML (Data Manipulation Language):**

224

- `INSERT` - Insert data (returns job ID as result set)

225

- `UPDATE` - Update existing data

226

- `DELETE` - Delete data

227

- `UPSERT` - Insert or update data

228

229

**Queries:**

230

- `SELECT` - Data retrieval with full SQL support

231

- Joins, aggregations, window functions

232

- Complex analytical queries

233

234

**Example Complex Query:**

235

```java

236

String analyticsQuery =

237

"SELECT " +

238

" DATE_FORMAT(order_date, 'yyyy-MM') as month, " +

239

" product_category, " +

240

" COUNT(*) as order_count, " +

241

" SUM(order_total) as total_revenue, " +

242

" AVG(order_total) as avg_order_value " +

243

"FROM orders o " +

244

"JOIN products p ON o.product_id = p.id " +

245

"WHERE order_date >= DATE '2024-01-01' " +

246

"GROUP BY DATE_FORMAT(order_date, 'yyyy-MM'), product_category " +

247

"HAVING COUNT(*) > 100 " +

248

"ORDER BY month DESC, total_revenue DESC";

249

250

ResultSet results = statement.executeQuery(analyticsQuery);

251

```

252

253

### Unsupported Statement Features

254

255

The following JDBC statement features are not supported:

256

257

- **Generated Keys**: `getGeneratedKeys()`, execute methods with generated key parameters

258

- **Multiple Open Results**: `getMoreResults()` with multiple result sets

259

- **Result Set Type/Concurrency**: Only default forward-only, read-only result sets

260

- **Cursor Names**: `setCursorName()`, `getCursorName()`

261

262

### Error Handling

263

264

Statement execution may encounter various error conditions:

265

266

**SQL Syntax Errors:**

267

```java

268

try {

269

statement.executeQuery("SELCT * FROM table"); // Typo in SELECT

270

} catch (SQLException e) {

271

System.err.println("SQL syntax error: " + e.getMessage());

272

}

273

```

274

275

**Table/Column Not Found:**

276

```java

277

try {

278

statement.executeQuery("SELECT nonexistent_column FROM my_table");

279

} catch (SQLException e) {

280

System.err.println("Column not found: " + e.getMessage());

281

}

282

```

283

284

**Connection Issues:**

285

```java

286

try {

287

statement.executeQuery("SELECT * FROM remote_table");

288

} catch (SQLException e) {

289

System.err.println("Execution failed: " + e.getMessage());

290

// May need to retry or reconnect

291

}

292

```

293

294

**Statement Closed:**

295

```java

296

statement.close();

297

try {

298

statement.executeQuery("SELECT * FROM my_table");

299

} catch (SQLException e) {

300

System.err.println("Statement is closed: " + e.getMessage());

301

}

302

```

303

304

### Performance Considerations

305

306

**Batch Mode Only**: The driver only supports batch mode queries. Streaming queries may produce unexpected results.

307

308

**Connection Reuse**: Reuse connections when possible, but remember that connections are not thread-safe.

309

310

**Resource Cleanup**: Always close ResultSets and Statements to prevent resource leaks:

311

312

```java

313

Statement statement = null;

314

ResultSet results = null;

315

try {

316

statement = connection.createStatement();

317

results = statement.executeQuery("SELECT * FROM my_table");

318

// Process results...

319

} finally {

320

if (results != null) {

321

try { results.close(); } catch (SQLException e) { /* log */ }

322

}

323

if (statement != null) {

324

try { statement.close(); } catch (SQLException e) { /* log */ }

325

}

326

}

327

```

328

329

Or using try-with-resources:

330

331

```java

332

try (Statement statement = connection.createStatement();

333

ResultSet results = statement.executeQuery("SELECT * FROM my_table")) {

334

335

// Process results - automatic cleanup

336

while (results.next()) {

337

// Process each row

338

}

339

}

340

```