0
# JDBC Connectors
1
2
JDBC database connectivity for Flink batch processing. Supports both reading from and writing to relational databases, with parallel processing capabilities.
3
4
## Capabilities
5
6
### JDBCInputFormat
7
8
Reads data from JDBC databases with support for parallel processing and parameterized queries.
9
10
```java { .api }
11
/**
12
* InputFormat for reading from JDBC databases in Flink
13
*/
14
public class JDBCInputFormat extends RichInputFormat<Row, InputSplit>
15
implements ResultTypeQueryable {
16
17
/**
18
* Default constructor for JDBCInputFormat
19
*/
20
public JDBCInputFormat();
21
22
/**
23
* Returns the type information for rows produced by this format
24
* @return RowTypeInfo describing the database schema
25
*/
26
public RowTypeInfo getProducedType();
27
28
/**
29
* Configures the input format with parameters
30
* @param parameters Configuration parameters
31
*/
32
public void configure(Configuration parameters);
33
34
/**
35
* Opens the input format for reading
36
*/
37
public void openInputFormat() throws IOException;
38
39
/**
40
* Closes the input format
41
*/
42
public void closeInputFormat() throws IOException;
43
44
/**
45
* Opens an individual input split
46
* @param inputSplit The input split to open
47
*/
48
public void open(InputSplit inputSplit) throws IOException;
49
50
/**
51
* Closes the current input split
52
*/
53
public void close() throws IOException;
54
55
/**
56
* Checks if the end of input has been reached
57
* @return true if no more records are available
58
*/
59
public boolean reachedEnd() throws IOException;
60
61
/**
62
* Reads the next record from the input
63
* @param row Row instance to reuse (may be null)
64
* @return The next record
65
*/
66
public Row nextRecord(Row row) throws IOException;
67
68
/**
69
* Gets statistics about the input data
70
* @param cachedStatistics Previously cached statistics
71
* @return Statistics about the input
72
*/
73
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException;
74
75
/**
76
* Creates input splits for parallel processing
77
* @param minNumSplits Minimum number of splits to create
78
* @return Array of input splits
79
*/
80
public InputSplit[] createInputSplits(int minNumSplits) throws IOException;
81
82
/**
83
* Gets the input split assigner for distributing splits
84
* @param inputSplits The input splits to assign
85
* @return Input split assigner
86
*/
87
public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits);
88
89
/**
90
* Creates a builder for configuring JDBCInputFormat
91
* @return JDBCInputFormatBuilder instance
92
*/
93
public static JDBCInputFormatBuilder buildJDBCInputFormat();
94
}
95
```
96
97
### JDBCInputFormatBuilder
98
99
Builder pattern for configuring JDBC input operations with a fluent API.
100
101
```java { .api }
102
/**
103
* Builder for configuring JDBCInputFormat instances
104
*/
105
public static class JDBCInputFormatBuilder {
106
107
/**
108
* Creates a new JDBCInputFormatBuilder
109
*/
110
public JDBCInputFormatBuilder();
111
112
/**
113
* Sets the database username
114
* @param username Database username
115
* @return This builder instance for chaining
116
*/
117
public JDBCInputFormatBuilder setUsername(String username);
118
119
/**
120
* Sets the database password
121
* @param password Database password
122
* @return This builder instance for chaining
123
*/
124
public JDBCInputFormatBuilder setPassword(String password);
125
126
/**
127
* Sets the JDBC driver class name
128
* @param drivername Fully qualified driver class name
129
* @return This builder instance for chaining
130
*/
131
public JDBCInputFormatBuilder setDrivername(String drivername);
132
133
/**
134
* Sets the database connection URL
135
* @param dbURL JDBC connection URL
136
* @return This builder instance for chaining
137
*/
138
public JDBCInputFormatBuilder setDBUrl(String dbURL);
139
140
/**
141
* Sets the SQL query to execute
142
* @param query SQL SELECT query
143
* @return This builder instance for chaining
144
*/
145
public JDBCInputFormatBuilder setQuery(String query);
146
147
/**
148
* Sets the ResultSet type for scrolling behavior
149
* @param resultSetType ResultSet type constant (e.g., ResultSet.TYPE_FORWARD_ONLY)
150
* @return This builder instance for chaining
151
*/
152
public JDBCInputFormatBuilder setResultSetType(int resultSetType);
153
154
/**
155
* Sets the ResultSet concurrency for update behavior
156
* @param resultSetConcurrency ResultSet concurrency constant
157
* @return This builder instance for chaining
158
*/
159
public JDBCInputFormatBuilder setResultSetConcurrency(int resultSetConcurrency);
160
161
/**
162
* Sets parameter provider for parallel queries with different parameters
163
* @param parameterValuesProvider Provider for query parameters
164
* @return This builder instance for chaining
165
*/
166
public JDBCInputFormatBuilder setParametersProvider(ParameterValuesProvider parameterValuesProvider);
167
168
/**
169
* Sets the row type information for the query results
170
* @param rowTypeInfo Type information describing the result schema
171
* @return This builder instance for chaining
172
*/
173
public JDBCInputFormatBuilder setRowTypeInfo(RowTypeInfo rowTypeInfo);
174
175
/**
176
* Creates the configured JDBCInputFormat
177
* @return Configured JDBCInputFormat instance
178
*/
179
public JDBCInputFormat finish();
180
}
181
```
182
183
**Usage Example:**
184
185
```java
186
import org.apache.flink.api.java.ExecutionEnvironment;
187
import org.apache.flink.api.java.DataSet;
188
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
189
import org.apache.flink.api.java.typeutils.RowTypeInfo;
190
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
191
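import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;
import org.apache.flink.types.Row;
import java.io.Serializable;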
192
193
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
194
195
// Configure JDBC input
196
JDBCInputFormat jdbcInput = JDBCInputFormat.buildJDBCInputFormat()
197
.setDrivername("com.mysql.jdbc.Driver")
198
.setDBUrl("jdbc:mysql://localhost:3306/mydb")
199
.setUsername("user")
200
.setPassword("password")
201
.setQuery("SELECT id, name, age FROM users WHERE age > ?")
202
.setParametersProvider(new GenericParameterValuesProvider(new Serializable[][] {
203
{18}, {21}, {25} // One parameter set per parallel query; these "age > ?" ranges overlap, so matching rows are returned more than once
204
}))
205
.setRowTypeInfo(new RowTypeInfo(
206
BasicTypeInfo.INT_TYPE_INFO, // id
207
BasicTypeInfo.STRING_TYPE_INFO, // name
208
BasicTypeInfo.INT_TYPE_INFO // age
209
))
210
.finish();
211
212
DataSet<Row> users = env.createInput(jdbcInput);
213
users.print();
214
```
215
216
### JDBCOutputFormat
217
218
Writes data to JDBC databases with batch processing support for high-performance inserts.
219
220
```java { .api }
221
/**
222
* OutputFormat for writing to JDBC databases in Flink
223
*/
224
public class JDBCOutputFormat extends RichOutputFormat<Row> {
225
226
/**
227
* Default constructor for JDBCOutputFormat
228
*/
229
public JDBCOutputFormat();
230
231
/**
232
* Array of SQL types for the columns (public for configuration)
233
*/
234
public int[] typesArray;
235
236
/**
237
* Configures the output format with parameters
238
* @param parameters Configuration parameters
239
*/
240
public void configure(Configuration parameters);
241
242
/**
243
* Opens the output format for writing
244
* @param taskNumber The number of the parallel task
245
* @param numTasks The total number of parallel tasks
246
*/
247
public void open(int taskNumber, int numTasks) throws IOException;
248
249
/**
250
* Writes a record to the output
251
* @param row The row to write
252
*/
253
public void writeRecord(Row row) throws IOException;
254
255
/**
256
* Closes the output format and flushes any remaining data
257
*/
258
public void close() throws IOException;
259
260
/**
261
* Creates a builder for configuring JDBCOutputFormat
262
* @return JDBCOutputFormatBuilder instance
263
*/
264
public static JDBCOutputFormatBuilder buildJDBCOutputFormat();
265
}
266
```
267
268
### JDBCOutputFormatBuilder
269
270
Builder for configuring JDBC output operations with support for batch processing.
271
272
```java { .api }
273
/**
274
* Builder for configuring JDBCOutputFormat instances
275
*/
276
public static class JDBCOutputFormatBuilder {
277
278
/**
279
* Creates a new JDBCOutputFormatBuilder (protected constructor)
280
*/
281
protected JDBCOutputFormatBuilder();
282
283
/**
284
* Sets the database username
285
* @param username Database username
286
* @return This builder instance for chaining
287
*/
288
public JDBCOutputFormatBuilder setUsername(String username);
289
290
/**
291
* Sets the database password
292
* @param password Database password
293
* @return This builder instance for chaining
294
*/
295
public JDBCOutputFormatBuilder setPassword(String password);
296
297
/**
298
* Sets the JDBC driver class name
299
* @param drivername Fully qualified driver class name
300
* @return This builder instance for chaining
301
*/
302
public JDBCOutputFormatBuilder setDrivername(String drivername);
303
304
/**
305
* Sets the database connection URL
306
* @param dbURL JDBC connection URL
307
* @return This builder instance for chaining
308
*/
309
public JDBCOutputFormatBuilder setDBUrl(String dbURL);
310
311
/**
312
* Sets the SQL query for writing (INSERT, UPDATE, or DELETE)
313
* @param query SQL modification query with parameter placeholders
314
* @return This builder instance for chaining
315
*/
316
public JDBCOutputFormatBuilder setQuery(String query);
317
318
/**
319
* Sets the batch interval for bulk operations
320
* @param batchInterval Number of records to batch before executing
321
* @return This builder instance for chaining
322
*/
323
public JDBCOutputFormatBuilder setBatchInterval(int batchInterval);
324
325
/**
326
* Sets the SQL types for the columns
327
* @param typesArray Array of java.sql.Types constants
328
* @return This builder instance for chaining
329
*/
330
public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray);
331
332
/**
333
* Creates the configured JDBCOutputFormat
334
* @return Configured JDBCOutputFormat instance
335
*/
336
public JDBCOutputFormat finish();
337
}
338
```
339
340
**Usage Example:**
341
342
```java
343
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.types.Row;
344
import java.sql.Types;
345
346
// Configure JDBC output
347
JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
348
.setDrivername("com.mysql.jdbc.Driver")
349
.setDBUrl("jdbc:mysql://localhost:3306/mydb")
350
.setUsername("user")
351
.setPassword("password")
352
.setQuery("INSERT INTO users (name, age, email) VALUES (?, ?, ?)")
353
.setBatchInterval(1000) // Batch 1000 records at a time
354
.setSqlTypes(new int[] {
355
Types.VARCHAR, // name
356
Types.INTEGER, // age
357
Types.VARCHAR // email
358
})
359
.finish();
360
361
// Write data
362
DataSet<Row> users = // ... your data
363
users.output(jdbcOutput);
364
```
365
366
### ParameterValuesProvider
367
368
Interface for providing query parameters to enable parallel JDBC reads with different parameter sets.
369
370
```java { .api }
371
/**
372
* Interface for providing query parameters for parallel JDBC reads
373
*/
374
public interface ParameterValuesProvider {
375
376
/**
377
* Returns a matrix of parameter values for parallel queries
378
* Each row represents parameters for one parallel query execution
379
* @return 2D array where each row contains parameters for one query
380
*/
381
Serializable[][] getParameterValues();
382
}
383
```
384
385
### GenericParameterValuesProvider
386
387
Generic implementation that wraps pre-computed query parameters.
388
389
```java { .api }
390
/**
391
* Generic implementation that wraps pre-computed query parameters
392
*/
393
public class GenericParameterValuesProvider implements ParameterValuesProvider {
394
395
/**
396
* Creates a provider with pre-computed parameters
397
* @param parameters 2D array of parameter values
398
*/
399
public GenericParameterValuesProvider(Serializable[][] parameters);
400
401
/**
402
* Returns the pre-computed parameters
403
* @return The parameter matrix
404
*/
405
public Serializable[][] getParameterValues();
406
}
407
```
408
409
**Usage Example:**
410
411
```java
412
// Create parameters for parallel execution
413
Serializable[][] parameters = {
414
{"USA", 1000}, // Query 1: WHERE country = 'USA' AND salary > 1000
415
{"Canada", 1200}, // Query 2: WHERE country = 'Canada' AND salary > 1200
416
{"UK", 800} // Query 3: WHERE country = 'UK' AND salary > 800
417
};
418
419
ParameterValuesProvider provider = new GenericParameterValuesProvider(parameters);
420
```
421
422
### NumericBetweenParametersProvider
423
424
Generates parameters for BETWEEN queries on numeric columns to enable range-based parallel processing.
425
426
```java { .api }
427
/**
428
* Generates parameters for BETWEEN queries on numeric columns for parallel processing
429
*/
430
public class NumericBetweenParametersProvider implements ParameterValuesProvider {
431
432
/**
433
* Creates a provider that generates BETWEEN parameters for numeric ranges
434
* @param fetchSize Number of records to fetch per parallel query
435
* @param min Minimum value of the numeric range
436
* @param max Maximum value of the numeric range
437
*/
438
public NumericBetweenParametersProvider(long fetchSize, long min, long max);
439
440
/**
441
* Returns generated BETWEEN parameters for parallel range queries
442
* @return Parameter matrix for BETWEEN queries
443
*/
444
public Serializable[][] getParameterValues();
445
}
446
```
447
448
**Usage Example:**
449
450
```java
451
// Generate parameters for parallel range queries
452
// Produces one (start, end) parameter pair per range, to be bound to a query such as: WHERE id BETWEEN ? AND ?
453
NumericBetweenParametersProvider provider =
454
new NumericBetweenParametersProvider(
455
10000, // Fetch 10,000 records per query
456
1, // Minimum ID value
457
100000 // Maximum ID value
458
);
459
// Results in queries like: WHERE id BETWEEN 1 AND 10000, WHERE id BETWEEN 10001 AND 20000, etc.
460
461
JDBCInputFormat jdbcInput = JDBCInputFormat.buildJDBCInputFormat()
462
.setQuery("SELECT * FROM large_table WHERE id BETWEEN ? AND ?")
463
.setParametersProvider(provider)
464
// ... other configuration
465
.finish();
466
```
467
468
## Common Types
469
470
```java { .api }
471
// Flink types
472
import org.apache.flink.api.common.io.RichInputFormat;
473
import org.apache.flink.api.common.io.RichOutputFormat;
474
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
475
import org.apache.flink.api.java.typeutils.RowTypeInfo;
476
import org.apache.flink.api.common.io.statistics.BaseStatistics;
477
import org.apache.flink.core.io.InputSplit;
478
import org.apache.flink.core.io.InputSplitAssigner;
479
import org.apache.flink.configuration.Configuration;
480
import org.apache.flink.types.Row;
481
482
// JDBC parameter providers
483
import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
484
import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;
485
import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
486
487
// Java types
488
import java.io.Serializable;
489
import java.io.IOException;
490
import java.sql.Types;
491
import java.sql.ResultSet;
492
```