# Table API Integration

The Flink Kafka 0.8 connector provides Table API integration for using Kafka topics as table sources and sinks in SQL queries and Table API operations.

## Capabilities

### Kafka08TableSource

Generic table source for consuming Kafka topics with custom deserialization schemas.

```java { .api }
/**
 * Table source for Kafka 0.8.x topics with custom deserialization
 */
public class Kafka08TableSource extends KafkaTableSource {
    /**
     * Creates a Kafka table source with custom deserialization schema
     * @param topic Kafka topic name
     * @param properties Kafka consumer properties
     * @param deserializationSchema Schema for deserializing Row objects
     * @param typeInfo Type information for the resulting table
     */
    public Kafka08TableSource(
        String topic,
        Properties properties,
        DeserializationSchema<Row> deserializationSchema,
        TypeInformation<Row> typeInfo
    );
}
```

**Usage Example:**

```java
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.types.Row;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

import java.util.Properties;

// Configure Kafka properties
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "table-consumer");

// Define table schema
TypeInformation<?>[] fieldTypes = {Types.STRING, Types.INT, Types.DOUBLE};
String[] fieldNames = {"name", "age", "salary"};
RowTypeInfo typeInfo = new RowTypeInfo(fieldTypes, fieldNames);

// Create custom deserialization schema
DeserializationSchema<Row> deserializer = new MyCustomRowDeserializer(typeInfo);

// Create table source
Kafka08TableSource tableSource = new Kafka08TableSource(
    "employees",
    props,
    deserializer,
    typeInfo
);

// Register as table
TableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
tableEnv.registerTableSource("employee_stream", tableSource);

// Use in SQL
Table result = tableEnv.sqlQuery("SELECT name, age FROM employee_stream WHERE salary > 50000");
```

### Kafka08JsonTableSource

Specialized table source for consuming JSON-formatted Kafka messages.

```java { .api }
/**
 * Table source for JSON-formatted Kafka 0.8.x topics
 */
public class Kafka08JsonTableSource extends KafkaJsonTableSource {
    /**
     * Creates a JSON table source for Kafka topics
     * @param topic Kafka topic name
     * @param properties Kafka consumer properties
     * @param typeInfo Type information describing the JSON structure
     */
    public Kafka08JsonTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo);
}
```

**Usage Example:**

```java
import org.apache.flink.streaming.connectors.kafka.Kafka08JsonTableSource;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

// Define JSON schema
TypeInformation<?>[] fieldTypes = {
    Types.STRING, // user_id
    Types.STRING, // event_type
    Types.LONG,   // timestamp
    Types.DOUBLE  // value
};
String[] fieldNames = {"user_id", "event_type", "timestamp", "value"};
RowTypeInfo jsonTypeInfo = new RowTypeInfo(fieldTypes, fieldNames);

// Create JSON table source
Kafka08JsonTableSource jsonSource = new Kafka08JsonTableSource(
    "user-events",
    props,
    jsonTypeInfo
);

tableEnv.registerTableSource("events", jsonSource);

// Query JSON data
Table eventStats = tableEnv.sqlQuery(
    "SELECT event_type, COUNT(*) as event_count, AVG(value) as avg_value " +
    "FROM events " +
    "GROUP BY event_type"
);
```

### Kafka08AvroTableSource

Specialized table source for consuming Avro-formatted Kafka messages.

```java { .api }
/**
 * Table source for Avro-formatted Kafka 0.8.x topics
 */
public class Kafka08AvroTableSource extends KafkaAvroTableSource {
    /**
     * Creates an Avro table source for Kafka topics
     * @param topic Kafka topic name
     * @param properties Kafka consumer properties
     * @param record Avro record class extending SpecificRecordBase
     */
    public Kafka08AvroTableSource(
        String topic,
        Properties properties,
        Class<? extends SpecificRecordBase> record
    );
}
```

**Usage Example:**

```java
import org.apache.flink.streaming.connectors.kafka.Kafka08AvroTableSource;
import org.apache.avro.specific.SpecificRecordBase;

// Assume you have an Avro-generated class
public class UserEvent extends SpecificRecordBase {
    // Avro-generated fields and methods
}

// Create Avro table source
Kafka08AvroTableSource avroSource = new Kafka08AvroTableSource(
    "user-events-avro",
    props,
    UserEvent.class
);

tableEnv.registerTableSource("avro_events", avroSource);

// Query Avro data (field names from Avro schema)
Table avroResults = tableEnv.sqlQuery(
    "SELECT userId, eventType, COUNT(*) " +
    "FROM avro_events " +
    "WHERE timestamp > UNIX_TIMESTAMP() - 3600 " +
    "GROUP BY userId, eventType"
);
```

### Kafka08JsonTableSink

Table sink for writing JSON-formatted data to Kafka topics.

```java { .api }
/**
 * Table sink for JSON-formatted Kafka 0.8.x topics
 */
public class Kafka08JsonTableSink extends KafkaJsonTableSink {
    /**
     * Creates a JSON table sink for Kafka topics
     * @param topic Kafka topic name
     * @param properties Kafka producer properties
     * @param partitioner Custom partitioner for message distribution
     */
    public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner);

    /**
     * @deprecated Use FlinkKafkaPartitioner instead of KafkaPartitioner
     */
    @Deprecated
    public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner);
}
```

**Usage Example:**

```java
import org.apache.flink.streaming.connectors.kafka.Kafka08JsonTableSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;

// Configure producer properties
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");

// Create JSON table sink
Kafka08JsonTableSink jsonSink = new Kafka08JsonTableSink(
    "output-events",
    producerProps,
    new FlinkFixedPartitioner<>()
);

// Register as sink
tableEnv.registerTableSink("json_output", jsonSink);

// Write query results to Kafka
Table processedData = tableEnv.sqlQuery(
    "SELECT user_id, event_type, COUNT(*) as count " +
    "FROM events " +
    "GROUP BY user_id, event_type"
);

processedData.insertInto("json_output");
```

## Schema Definition

Define table schemas for different data formats:

### JSON Schema Definition

```java
// For JSON data like: {"id": "123", "name": "John", "score": 95.5}
TypeInformation<?>[] types = {Types.STRING, Types.STRING, Types.DOUBLE};
String[] names = {"id", "name", "score"};
RowTypeInfo jsonSchema = new RowTypeInfo(types, names);
```

### Complex Schema Definition

```java
// For nested JSON structures
TypeInformation<?>[] outerTypes = {
    Types.STRING,                       // user_id
    Types.ROW(Types.STRING, Types.INT), // profile (name, age)
    Types.OBJECT_ARRAY(Types.STRING)    // tags array
};
String[] outerNames = {"user_id", "profile", "tags"};
RowTypeInfo complexSchema = new RowTypeInfo(outerTypes, outerNames);
```

## SQL Integration

Use Kafka tables in SQL queries:

```sql
-- Create a view from Kafka source
CREATE VIEW user_events AS
SELECT
    user_id,
    event_type,
    CAST(event_timestamp AS TIMESTAMP) as event_time,
    value
FROM kafka_source;

-- Window aggregation
SELECT
    TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start,
    event_type,
    COUNT(*) as event_count,
    AVG(value) as avg_value
FROM user_events
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR), event_type;

-- Insert results into output topic
INSERT INTO kafka_sink
SELECT user_id, event_type, COUNT(*) as count
FROM user_events
WHERE event_time > CURRENT_TIMESTAMP - INTERVAL '1' DAY
GROUP BY user_id, event_type;
```

## Configuration

### Consumer Properties for Table Sources

- **bootstrap.servers**: Kafka broker addresses
- **zookeeper.connect**: ZooKeeper connection string
- **group.id**: Consumer group for table source
- **auto.offset.reset**: Starting position for new consumer groups

### Producer Properties for Table Sinks

- **bootstrap.servers**: Kafka broker addresses
- **batch.size**: Batching configuration for performance
- **compression.type**: Message compression algorithm

### Format-Specific Configuration

```java
// JSON format properties
Properties jsonProps = new Properties();
jsonProps.setProperty("bootstrap.servers", "localhost:9092");
jsonProps.setProperty("group.id", "json-table-consumer");
jsonProps.setProperty("json.fail-on-missing-field", "false"); // Ignore missing fields

// Avro format properties
Properties avroProps = new Properties();
avroProps.setProperty("bootstrap.servers", "localhost:9092");
avroProps.setProperty("group.id", "avro-table-consumer");
avroProps.setProperty("schema.registry.url", "http://localhost:8081"); // If using schema registry
```

## Error Handling

Handle errors in table operations:

- **Deserialization errors**: Configure fail-on-error behavior
- **Schema evolution**: Handle missing or additional fields
- **Connection failures**: Implement retry logic at application level

```java
// Error handling configuration
Properties errorHandlingProps = new Properties();
errorHandlingProps.setProperty("bootstrap.servers", "localhost:9092");
errorHandlingProps.setProperty("group.id", "error-handling-consumer");
errorHandlingProps.setProperty("json.ignore-parse-errors", "true"); // Skip malformed JSON
errorHandlingProps.setProperty("consumer.max.poll.records", "100"); // Limit batch size
```