
# Apache Flink Batch Connectors

The Apache Flink Batch Connectors package connects Flink batch programs to external data sources and sinks. It bundles connectors for Avro files, JDBC databases, Hadoop MapReduce jobs, HBase tables, and HCatalog-managed Hive tables.

## Package Information

- **Package Name**: org.apache.flink:flink-batch-connectors
- **Package Type**: Maven
- **Language**: Java
- **Installation**: Add the dependency to your Maven `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-batch-connectors</artifactId>
    <version>1.1.5</version>
</dependency>
```

## Core Imports

The batch connectors are organized into separate modules. Import only the connector classes you need:

```java
// Avro connectors
import org.apache.flink.api.java.io.AvroInputFormat;
import org.apache.flink.api.java.io.AvroOutputFormat;

// JDBC connectors
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;

// Hadoop compatibility
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;

// HBase connector
import org.apache.flink.addons.hbase.TableInputFormat;

// HCatalog connector
import org.apache.flink.hcatalog.java.HCatInputFormat;
```

## Basic Usage

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

// Create the batch execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Configure a JDBC source via the fluent builder
JDBCInputFormat jdbcInput = JDBCInputFormat.buildJDBCInputFormat()
    .setDrivername("com.mysql.jdbc.Driver")
    .setDBUrl("jdbc:mysql://localhost:3306/mydb")
    .setUsername("user")
    .setPassword("password")
    .setQuery("SELECT id, name, age FROM users")
    .setRowTypeInfo(new RowTypeInfo(/* type info */))
    .finish();

// Read from the JDBC database
DataSet<Row> users = env.createInput(jdbcInput);

// Process and write to another sink
users.print();
```

## Architecture

The batch connectors are split across five modules (Avro, JDBC, Hadoop compatibility, HBase, and HCatalog) that share a common design:

- **Input/Output Formats**: Primary abstraction for reading from and writing to external systems
- **Type Safety**: Full integration with Flink's type system for compile-time type checking
- **Parallel Processing**: Built-in support for distributed data processing across Flink clusters
- **Fault Tolerance**: Integration with Flink's checkpointing and recovery mechanisms
- **Configuration Builders**: Fluent APIs for connector configuration

## Capabilities

### Avro File Processing

Read and write Apache Avro files with full schema support and type safety. Supports both generic records and specific record types with automatic serialization.

```java { .api }
public class AvroInputFormat<E> extends FileInputFormat<E>
        implements ResultTypeQueryable<E>, CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
    public AvroInputFormat(Path filePath, Class<E> type);
    public void setReuseAvroValue(boolean reuseAvroValue);
    public TypeInformation<E> getProducedType();
}

public class AvroOutputFormat<E> extends FileOutputFormat<E> {
    public AvroOutputFormat(Path filePath, Class<E> type);
    public void setSchema(Schema schema);
}
```

[Avro Connectors](./avro.md)
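As a sketch of typical usage, an Avro read/write round trip might look like the following. The `User` class is a hypothetical Avro-generated specific record, and the HDFS paths are placeholders:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.AvroInputFormat;
import org.apache.flink.api.java.io.AvroOutputFormat;
import org.apache.flink.core.fs.Path;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read specific records (User is a hypothetical Avro-generated class)
AvroInputFormat<User> avroInput =
    new AvroInputFormat<>(new Path("hdfs:///data/users.avro"), User.class);
DataSet<User> users = env.createInput(avroInput);

// Write the records back out as Avro
users.output(new AvroOutputFormat<>(new Path("hdfs:///data/users-copy.avro"), User.class));

env.execute("Avro round trip");
```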

### JDBC Database Connectivity

Connect to relational databases via JDBC with support for parallel reading, prepared statements, and batch writing. Includes parameter providers for efficient parallel data access.

```java { .api }
public class JDBCInputFormat extends RichInputFormat<Row, InputSplit>
        implements ResultTypeQueryable {
    public static JDBCInputFormatBuilder buildJDBCInputFormat();
    public RowTypeInfo getProducedType();
}

public class JDBCOutputFormat extends RichOutputFormat<Row> {
    public static JDBCOutputFormatBuilder buildJDBCOutputFormat();
}
```

[JDBC Connectors](./jdbc.md)
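Writing rows back to a database follows the same builder pattern as reading. A sketch of a `Row`-based insert sink, where the URL, credentials, and target table are placeholders:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.types.Row;

// users is a DataSet<Row> with (id, name, age) columns,
// e.g. produced by the JDBC read in Basic Usage above
JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
    .setDrivername("com.mysql.jdbc.Driver")
    .setDBUrl("jdbc:mysql://localhost:3306/mydb")
    .setUsername("user")
    .setPassword("password")
    .setQuery("INSERT INTO users_copy (id, name, age) VALUES (?, ?, ?)")
    .finish();

users.output(jdbcOutput);
```

Each `Row` field is bound to the corresponding `?` placeholder of the prepared statement, so the field order must match the column order of the `INSERT` statement.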

### Hadoop MapReduce Compatibility

Seamless integration with existing Hadoop MapReduce jobs, allowing reuse of Hadoop `Mapper` and `Reducer` implementations within Flink batch programs.

```java { .api }
@Public
public class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>>
        implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf);
}
```

[Hadoop Compatibility](./hadoop.md)
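For example, an existing Hadoop `Mapper` can be dropped into a Flink pipeline as a flatMap over key/value tuples. `MyTokenizerMapper` below is a hypothetical Hadoop mapper emitting `<Text, LongWritable>` pairs:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// lines is a DataSet<Tuple2<LongWritable, Text>>,
// e.g. read with a wrapped Hadoop TextInputFormat
DataSet<Tuple2<Text, LongWritable>> tokens = lines.flatMap(
    new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
        new MyTokenizerMapper())); // hypothetical Hadoop Mapper implementation
```

The mapper runs unchanged; Flink feeds each `Tuple2` to it as a Hadoop key/value pair and collects whatever the mapper emits.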

### HBase Table Access

Read from Apache HBase tables with region-aware splitting for optimal distributed processing. Provides abstract base classes for custom HBase integration.

```java { .api }
public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<T, TableInputSplit> {
    protected abstract Scan getScanner();
    protected abstract String getTableName();
    protected abstract T mapResultToTuple(Result r);
}
```

[HBase Connector](./hbase.md)
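A minimal concrete subclass supplies the table name, a `Scan`, and the result-to-tuple mapping. The table, column family, and qualifier names below are illustrative:

```java
import org.apache.flink.addons.hbase.TableInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class UserTableInput extends TableInputFormat<Tuple2<String, String>> {

    @Override
    protected Scan getScanner() {
        // Restrict the scan to one column (family/qualifier are illustrative)
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"));
        return scan;
    }

    @Override
    protected String getTableName() {
        return "users";
    }

    @Override
    protected Tuple2<String, String> mapResultToTuple(Result r) {
        // Emit (row key, name) for each scanned row
        String key = Bytes.toString(r.getRow());
        String name = Bytes.toString(r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("name")));
        return new Tuple2<>(key, name);
    }
}
```

The resulting format can then be passed to `env.createInput(new UserTableInput())`, and Flink creates one input split per table region.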

### HCatalog Metadata Integration

Access Hive tables through HCatalog with support for partition filtering, field selection, and automatic schema mapping to Flink tuples.

```java { .api }
public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopInputSplit>
        implements ResultTypeQueryable<T> {
    public HCatInputFormatBase(String database, String table);
    public HCatInputFormatBase<T> getFields(String... fields);
    public HCatInputFormatBase<T> withFilter(String filter);
}
```

[HCatalog Connector](./hcatalog.md)
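A sketch of reading selected fields from one partition of a Hive table. The database, table, field, and partition names are illustrative, and the `HCatRecord` import path may differ depending on your HCatalog version:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.hcatalog.java.HCatInputFormat;
import org.apache.hive.hcatalog.data.HCatRecord;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Project two fields and read only one partition (names are illustrative)
HCatInputFormat<HCatRecord> hcatInput = new HCatInputFormat<>("mydb", "users");
hcatInput.getFields("id", "name")
         .withFilter("ds=\"2016-01-01\"");

DataSet<HCatRecord> records = env.createInput(hcatInput);
records.print();
```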

165

166

## Common Types

167

168

```java { .api }

169

// Flink core types used across connectors

170

import org.apache.flink.api.common.typeinfo.TypeInformation;

171

import org.apache.flink.api.java.tuple.Tuple2;

172

import org.apache.flink.types.Row;

173

import org.apache.flink.core.fs.Path;

174

175

// Hadoop integration types

176

import org.apache.hadoop.mapred.Mapper;

177

import org.apache.hadoop.mapred.Reducer;

178

import org.apache.hadoop.mapred.JobConf;

179

180

// Avro types

181

import org.apache.avro.Schema;

182

183

// HBase types

184

import org.apache.hadoop.hbase.client.Scan;

185

import org.apache.hadoop.hbase.client.Result;

186

```