or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

avro.md · hadoop.md · hbase.md · hcatalog.md · index.md · jdbc.md

jdbc.mddocs/

0

# JDBC Connectors

1

2

JDBC connectors for Flink batch (DataSet API) jobs, supporting both reading from and writing to relational databases, with parallel reads driven by parameterized queries.

3

4

## Capabilities

5

6

### JDBCInputFormat

7

8

Reads data from JDBC databases with support for parallel processing and parameterized queries.

9

10

```java { .api }

11

/**

12

* InputFormat for reading from JDBC databases in Flink

13

*/

14

public class JDBCInputFormat extends RichInputFormat<Row, InputSplit>

15

implements ResultTypeQueryable<Row> {

16

17

/**

18

* Default constructor for JDBCInputFormat

19

*/

20

public JDBCInputFormat();

21

22

/**

23

* Returns the type information for rows produced by this format

24

* @return RowTypeInfo describing the database schema

25

*/

26

public RowTypeInfo getProducedType();

27

28

/**

29

* Configures the input format with parameters

30

* @param parameters Configuration parameters

31

*/

32

public void configure(Configuration parameters);

33

34

/**

35

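* Opens the input format: loads the JDBC driver, connects to the database and prepares the query statement. Called once per parallel instance, before any split is opened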

36

*/

37

public void openInputFormat() throws IOException;

38

39

/**

40

* Closes the input format

41

*/

42

public void closeInputFormat() throws IOException;

43

44

/**

45

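* Opens an individual input split: binds the split's parameter values (when a parameter provider is set) to the query placeholders and executes the query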

46

* @param inputSplit The input split to open

47

*/

48

public void open(InputSplit inputSplit) throws IOException;

49

50

/**

51

* Closes the current input split

52

*/

53

public void close() throws IOException;

54

55

/**

56

* Checks if the end of input has been reached

57

* @return true if no more records are available

58

*/

59

public boolean reachedEnd() throws IOException;

60

61

/**

62

* Reads the next record from the input

63

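* @param row Row instance to reuse; must not be null (its fields are overwritten in place)

64

* @return The reused row filled with the next record, or null if no more records are available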

65

*/

66

public Row nextRecord(Row row) throws IOException;

67

68

/**

69

* Gets statistics about the input data

70

* @param cachedStatistics Previously cached statistics

71

* @return Statistics about the input

72

*/

73

public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException;

74

75

/**

76

* Creates input splits for parallel processing

77

* @param minNumSplits Minimum number of splits to create

78

* @return Array of input splits

79

*/

80

public InputSplit[] createInputSplits(int minNumSplits) throws IOException;

81

82

/**

83

* Gets the input split assigner for distributing splits

84

* @param inputSplits The input splits to assign

85

* @return Input split assigner

86

*/

87

public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits);

88

89

/**

90

* Creates a builder for configuring JDBCInputFormat

91

* @return JDBCInputFormatBuilder instance

92

*/

93

public static JDBCInputFormatBuilder buildJDBCInputFormat();

94

}

95

```

96

97

### JDBCInputFormatBuilder

98

99

Builder pattern for configuring JDBC input operations with fluent API.

100

101

```java { .api }

102

/**

103

* Builder for configuring JDBCInputFormat instances

104

*/

105

public static class JDBCInputFormatBuilder {

106

107

/**

108

* Creates a new JDBCInputFormatBuilder

109

*/

110

public JDBCInputFormatBuilder();

111

112

/**

113

* Sets the database username

114

* @param username Database username

115

* @return This builder instance for chaining

116

*/

117

public JDBCInputFormatBuilder setUsername(String username);

118

119

/**

120

* Sets the database password

121

* @param password Database password

122

* @return This builder instance for chaining

123

*/

124

public JDBCInputFormatBuilder setPassword(String password);

125

126

/**

127

* Sets the JDBC driver class name

128

* @param drivername Fully qualified driver class name

129

* @return This builder instance for chaining

130

*/

131

public JDBCInputFormatBuilder setDrivername(String drivername);

132

133

/**

134

* Sets the database connection URL

135

* @param dbURL JDBC connection URL

136

* @return This builder instance for chaining

137

*/

138

public JDBCInputFormatBuilder setDBUrl(String dbURL);

139

140

/**

141

* Sets the SQL query to execute

142

* @param query SQL SELECT query

143

* @return This builder instance for chaining

144

*/

145

public JDBCInputFormatBuilder setQuery(String query);

146

147

/**

148

* Sets the ResultSet type for scrolling behavior

149

* @param resultSetType ResultSet type constant (e.g., ResultSet.TYPE_FORWARD_ONLY)

150

* @return This builder instance for chaining

151

*/

152

public JDBCInputFormatBuilder setResultSetType(int resultSetType);

153

154

/**

155

* Sets the ResultSet concurrency for update behavior

156

* @param resultSetConcurrency ResultSet concurrency constant

157

* @return This builder instance for chaining

158

*/

159

public JDBCInputFormatBuilder setResultSetConcurrency(int resultSetConcurrency);

160

161

/**

162

* Sets parameter provider for parallel queries with different parameters

163

* @param parameterValuesProvider Provider for query parameters

164

* @return This builder instance for chaining

165

*/

166

public JDBCInputFormatBuilder setParametersProvider(ParameterValuesProvider parameterValuesProvider);

167

168

/**

169

* Sets the row type information for the query results

170

* @param rowTypeInfo Type information describing the result schema

171

* @return This builder instance for chaining

172

*/

173

public JDBCInputFormatBuilder setRowTypeInfo(RowTypeInfo rowTypeInfo);

174

175

/**

176

* Creates the configured JDBCInputFormat

177

* @return Configured JDBCInputFormat instance

178

*/

179

public JDBCInputFormat finish();

180

}

181

```

182

183

**Usage Example:**

184

185

```java

186

import org.apache.flink.api.java.ExecutionEnvironment;

187

import org.apache.flink.api.java.DataSet;

188

import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;

import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;

189

import org.apache.flink.api.java.typeutils.RowTypeInfo;

190

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

191

import org.apache.flink.types.Row;

import java.io.Serializable;

192

193

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

194

195

// Configure JDBC input

196

JDBCInputFormat jdbcInput = JDBCInputFormat.buildJDBCInputFormat()

197

.setDrivername("com.mysql.jdbc.Driver")

198

.setDBUrl("jdbc:mysql://localhost:3306/mydb")

199

.setUsername("user")

200

.setPassword("password")

201

.setQuery("SELECT id, name, age FROM users WHERE age >= ? AND age < ?")

202

.setParametersProvider(new GenericParameterValuesProvider(new Serializable[][] {

203

{18, 21}, {21, 25}, {25, Integer.MAX_VALUE} // Disjoint age ranges, one input split each, so no row is read twice

204

}))

205

.setRowTypeInfo(new RowTypeInfo(

206

BasicTypeInfo.INT_TYPE_INFO, // id

207

BasicTypeInfo.STRING_TYPE_INFO, // name

208

BasicTypeInfo.INT_TYPE_INFO // age

209

))

210

.finish();

211

212

DataSet<Row> users = env.createInput(jdbcInput);

213

users.print();

214

```

215

216

### JDBCOutputFormat

217

218

Writes data to JDBC databases with batch processing support for high-performance inserts.

219

220

```java { .api }

221

/**

222

* OutputFormat for writing to JDBC databases in Flink

223

*/

224

public class JDBCOutputFormat extends RichOutputFormat<Row> {

225

226

/**

227

* Default constructor for JDBCOutputFormat

228

*/

229

public JDBCOutputFormat();

230

231

/**

232

* Array of SQL types for the columns (public for configuration)

233

*/

234

public int[] typesArray;

235

236

/**

237

* Configures the output format with parameters

238

* @param parameters Configuration parameters

239

*/

240

public void configure(Configuration parameters);

241

242

/**

243

* Opens the output format for writing

244

* @param taskNumber The number of the parallel task

245

* @param numTasks The total number of parallel tasks

246

*/

247

public void open(int taskNumber, int numTasks) throws IOException;

248

249

/**

250

* Writes a record to the output

251

* @param row The row to write

252

*/

253

public void writeRecord(Row row) throws IOException;

254

255

/**

256

* Closes the output format and flushes any remaining data

257

*/

258

public void close() throws IOException;

259

260

/**

261

* Creates a builder for configuring JDBCOutputFormat

262

* @return JDBCOutputFormatBuilder instance

263

*/

264

public static JDBCOutputFormatBuilder buildJDBCOutputFormat();

265

}

266

```

267

268

### JDBCOutputFormatBuilder

269

270

Builder for configuring JDBC output operations with support for batch processing.

271

272

```java { .api }

273

/**

274

* Builder for configuring JDBCOutputFormat instances

275

*/

276

public static class JDBCOutputFormatBuilder {

277

278

/**

279

* Creates a new JDBCOutputFormatBuilder (protected constructor)

280

*/

281

protected JDBCOutputFormatBuilder();

282

283

/**

284

* Sets the database username

285

* @param username Database username

286

* @return This builder instance for chaining

287

*/

288

public JDBCOutputFormatBuilder setUsername(String username);

289

290

/**

291

* Sets the database password

292

* @param password Database password

293

* @return This builder instance for chaining

294

*/

295

public JDBCOutputFormatBuilder setPassword(String password);

296

297

/**

298

* Sets the JDBC driver class name

299

* @param drivername Fully qualified driver class name

300

* @return This builder instance for chaining

301

*/

302

public JDBCOutputFormatBuilder setDrivername(String drivername);

303

304

/**

305

* Sets the database connection URL

306

* @param dbURL JDBC connection URL

307

* @return This builder instance for chaining

308

*/

309

public JDBCOutputFormatBuilder setDBUrl(String dbURL);

310

311

/**

312

* Sets the SQL query for writing (INSERT, UPDATE, or DELETE)

313

* @param query SQL modification query with parameter placeholders

314

* @return This builder instance for chaining

315

*/

316

public JDBCOutputFormatBuilder setQuery(String query);

317

318

/**

319

* Sets the batch interval for bulk operations

320

* @param batchInterval Number of records to batch before executing

321

* @return This builder instance for chaining

322

*/

323

public JDBCOutputFormatBuilder setBatchInterval(int batchInterval);

324

325

/**

326

* Sets the SQL types for the columns

327

* @param typesArray Array of java.sql.Types constants, one per Row field / ? placeholder in the query, in the same order

328

* @return This builder instance for chaining

329

*/

330

public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray);

331

332

/**

333

* Creates the configured JDBCOutputFormat

334

* @return Configured JDBCOutputFormat instance

335

*/

336

public JDBCOutputFormat finish();

337

}

338

```

339

340

**Usage Example:**

341

342

```java

343

import org.apache.flink.api.java.DataSet;

import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;

import org.apache.flink.types.Row;

344

import java.sql.Types;

345

346

// Configure JDBC output

347

JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()

348

.setDrivername("com.mysql.jdbc.Driver")

349

.setDBUrl("jdbc:mysql://localhost:3306/mydb")

350

.setUsername("user")

351

.setPassword("password")

352

.setQuery("INSERT INTO users (name, age, email) VALUES (?, ?, ?)")

353

.setBatchInterval(1000) // Batch 1000 records at a time

354

.setSqlTypes(new int[] {

355

Types.VARCHAR, // name

356

Types.INTEGER, // age

357

Types.VARCHAR // email

358

})

359

.finish();

360

361

// Write data

362

DataSet<Row> users = ...; // your data: Rows of (name, age, email), in the same order as the ? placeholders and SQL types

363

users.output(jdbcOutput);

364

```

365

366

### ParameterValuesProvider

367

368

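Interface for providing query parameters to enable parallel JDBC reads with different parameter sets. Each row of the returned matrix becomes one input split and is bound, in order, to the query's `?` placeholders. The parameter sets should therefore select disjoint rows; overlapping sets cause the same rows to be read more than once.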

369

370

```java { .api }

371

/**

372

* Interface for providing query parameters for parallel JDBC reads

373

*/

374

public interface ParameterValuesProvider {

375

376

/**

377

* Returns a matrix of parameter values for parallel queries

378

* Each row represents parameters for one parallel query execution

379

* @return 2D array where each row contains parameters for one query

380

*/

381

Serializable[][] getParameterValues();

382

}

383

```

384

385

### GenericParameterValuesProvider

386

387

Generic implementation that wraps pre-computed query parameters.

388

389

```java { .api }

390

/**

391

* Generic implementation that wraps pre-computed query parameters

392

*/

393

public class GenericParameterValuesProvider implements ParameterValuesProvider {

394

395

/**

396

* Creates a provider with pre-computed parameters

397

* @param parameters 2D array of parameter values

398

*/

399

public GenericParameterValuesProvider(Serializable[][] parameters);

400

401

/**

402

* Returns the pre-computed parameters

403

* @return The parameter matrix

404

*/

405

public Serializable[][] getParameterValues();

406

}

407

```

408

409

**Usage Example:**

410

411

```java

412

// Create parameters for parallel execution

413

Serializable[][] parameters = {

414

{"USA", 1000}, // Query 1: WHERE country = 'USA' AND salary > 1000

415

{"Canada", 1200}, // Query 2: WHERE country = 'Canada' AND salary > 1200

416

{"UK", 800} // Query 3: WHERE country = 'UK' AND salary > 800

417

};

418

419

ParameterValuesProvider provider = new GenericParameterValuesProvider(parameters);

420

```

421

422

### NumericBetweenParametersProvider

423

424

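Generates parameters for `BETWEEN ? AND ?` queries on a numeric column. The inclusive range [min, max] is cut into consecutive sub-ranges of `fetchSize` values, one per input split, enabling range-based parallel reads.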

425

426

```java { .api }

427

/**

428

* Generates parameters for BETWEEN queries on numeric columns for parallel processing

429

*/

430

public class NumericBetweenParametersProvider implements ParameterValuesProvider {

431

432

/**

433

* Creates a provider that generates BETWEEN parameters for numeric ranges

434

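* @param fetchSize Width of each sub-range, i.e. the number of consecutive column values per split (not a row count); must be greater than 0

435

* @param min Minimum value of the numeric range (inclusive)

436

* @param max Maximum value of the numeric range (inclusive); must not be less than min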

437

*/

438

public NumericBetweenParametersProvider(long fetchSize, long min, long max);

439

440

/**

441

* Returns generated BETWEEN parameters for parallel range queries

442

* @return Parameter matrix for BETWEEN queries

443

*/

444

public Serializable[][] getParameterValues();

445

}

446

```

447

448

**Usage Example:**

449

450

```java

451

// Generate parameters for parallel range queries

452

// This will create multiple queries like: WHERE id BETWEEN ? AND ?

453

NumericBetweenParametersProvider provider =

454

new NumericBetweenParametersProvider(

455

10000, // Range width: 10,000 consecutive id values per split (not a row count)

456

1, // Minimum ID value (inclusive)

457

100000 // Maximum ID value (inclusive)

458

);

459

// Results in queries like: WHERE id BETWEEN 1 AND 10000, WHERE id BETWEEN 10001 AND 20000, etc.

460

461

JDBCInputFormat jdbcInput = JDBCInputFormat.buildJDBCInputFormat()

462

.setQuery("SELECT * FROM large_table WHERE id BETWEEN ? AND ?")

463

.setParametersProvider(provider)

464

// ... other configuration

465

.finish();

466

```

467

468

## Common Types

469

470

```java { .api }

471

// Flink types

472

import org.apache.flink.api.common.io.RichInputFormat;

473

import org.apache.flink.api.common.io.RichOutputFormat;

474

import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

475

import org.apache.flink.api.java.typeutils.RowTypeInfo;

476

import org.apache.flink.api.common.io.statistics.BaseStatistics;

477

import org.apache.flink.core.io.InputSplit;

478

import org.apache.flink.core.io.InputSplitAssigner;

479

import org.apache.flink.configuration.Configuration;

480

import org.apache.flink.types.Row;

481

482

// JDBC parameter providers

483

import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;

484

import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;

485

import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;

486

487

// Java types

488

import java.io.Serializable;

489

import java.io.IOException;

490

import java.sql.Types;

491

import java.sql.ResultSet;

492

```