Apache Kafka 0.8.x connector for Apache Flink streaming data processing
The Flink Kafka 0.8 connector provides Table API integration for using Kafka topics as table sources and sinks in SQL queries and Table API operations.
Generic table source for consuming Kafka topics with custom deserialization schemas.
/**
* Table source for Kafka 0.8.x topics with custom deserialization
*/
public class Kafka08TableSource extends KafkaTableSource {
/**
* Creates a Kafka table source with custom deserialization schema
* @param topic Kafka topic name
* @param properties Kafka consumer properties
* @param deserializationSchema Schema for deserializing Row objects
* @param typeInfo Type information for the resulting table
*/
public Kafka08TableSource(
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
TypeInformation<Row> typeInfo
);
}

Usage Example:
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.types.Row;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import java.util.Properties;
// Configure Kafka properties
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "table-consumer");
// Define table schema
TypeInformation<?>[] fieldTypes = {Types.STRING, Types.INT, Types.DOUBLE};
String[] fieldNames = {"name", "age", "salary"};
RowTypeInfo typeInfo = new RowTypeInfo(fieldTypes, fieldNames);
// Create custom deserialization schema
DeserializationSchema<Row> deserializer = new MyCustomRowDeserializer(typeInfo);
// Create table source
Kafka08TableSource tableSource = new Kafka08TableSource(
"employees",
props,
deserializer,
typeInfo
);
// Register as table
TableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
tableEnv.registerTableSource("employee_stream", tableSource);
// Use in SQL
Table result = tableEnv.sqlQuery("SELECT name, age FROM employee_stream WHERE salary > 50000");

Specialized table source for consuming JSON-formatted Kafka messages.
/**
* Table source for JSON-formatted Kafka 0.8.x topics
*/
public class Kafka08JsonTableSource extends KafkaJsonTableSource {
/**
* Creates a JSON table source for Kafka topics
* @param topic Kafka topic name
* @param properties Kafka consumer properties
* @param typeInfo Type information describing the JSON structure
*/
public Kafka08JsonTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo);
}

Usage Example:
import org.apache.flink.streaming.connectors.kafka.Kafka08JsonTableSource;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
// Define JSON schema
TypeInformation<?>[] fieldTypes = {
Types.STRING, // user_id
Types.STRING, // event_type
Types.LONG, // timestamp
Types.DOUBLE // value
};
String[] fieldNames = {"user_id", "event_type", "timestamp", "value"};
RowTypeInfo jsonTypeInfo = new RowTypeInfo(fieldTypes, fieldNames);
// Create JSON table source
Kafka08JsonTableSource jsonSource = new Kafka08JsonTableSource(
"user-events",
props,
jsonTypeInfo
);
tableEnv.registerTableSource("events", jsonSource);
// Query JSON data
Table eventStats = tableEnv.sqlQuery(
"SELECT event_type, COUNT(*) as event_count, AVG(value) as avg_value " +
"FROM events " +
"GROUP BY event_type"
);

Specialized table source for consuming Avro-formatted Kafka messages.
/**
* Table source for Avro-formatted Kafka 0.8.x topics
*/
public class Kafka08AvroTableSource extends KafkaAvroTableSource {
/**
* Creates an Avro table source for Kafka topics
* @param topic Kafka topic name
* @param properties Kafka consumer properties
* @param record Avro record class extending SpecificRecordBase
*/
public Kafka08AvroTableSource(
String topic,
Properties properties,
Class<? extends SpecificRecordBase> record
);
}

Usage Example:
import org.apache.flink.streaming.connectors.kafka.Kafka08AvroTableSource;
import org.apache.avro.specific.SpecificRecordBase;
// Assume you have an Avro-generated class
public class UserEvent extends SpecificRecordBase {
// Avro-generated fields and methods
}
// Create Avro table source
Kafka08AvroTableSource avroSource = new Kafka08AvroTableSource(
"user-events-avro",
props,
UserEvent.class
);
tableEnv.registerTableSource("avro_events", avroSource);
// Query Avro data (field names from Avro schema)
Table avroResults = tableEnv.sqlQuery(
"SELECT userId, eventType, COUNT(*) " +
"FROM avro_events " +
"WHERE timestamp > UNIX_TIMESTAMP() - 3600 " +
"GROUP BY userId, eventType"
);

Table sink for writing JSON-formatted data to Kafka topics.
/**
* Table sink for JSON-formatted Kafka 0.8.x topics
*/
public class Kafka08JsonTableSink extends KafkaJsonTableSink {
/**
* Creates a JSON table sink for Kafka topics
* @param topic Kafka topic name
* @param properties Kafka producer properties
* @param partitioner Custom partitioner for message distribution
*/
public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner);
/**
* @deprecated Use FlinkKafkaPartitioner instead of KafkaPartitioner
*/
@Deprecated
public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner);
}

Usage Example:
import org.apache.flink.streaming.connectors.kafka.Kafka08JsonTableSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
// Configure producer properties
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
// Create JSON table sink
Kafka08JsonTableSink jsonSink = new Kafka08JsonTableSink(
"output-events",
producerProps,
new FlinkFixedPartitioner<>()
);
// Register as sink
tableEnv.registerTableSink("json_output", jsonSink);
// Write query results to Kafka
Table processedData = tableEnv.sqlQuery(
"SELECT user_id, event_type, COUNT(*) as count " +
"FROM events " +
"GROUP BY user_id, event_type"
);
processedData.insertInto("json_output");

Define table schemas for different data formats:
// For JSON data like: {"id": "123", "name": "John", "score": 95.5}
TypeInformation<?>[] types = {Types.STRING, Types.STRING, Types.DOUBLE};
String[] names = {"id", "name", "score"};
RowTypeInfo jsonSchema = new RowTypeInfo(types, names);
// For nested JSON structures
TypeInformation<?>[] outerTypes = {
Types.STRING, // user_id
Types.ROW(Types.STRING, Types.INT), // profile (name, age)
Types.OBJECT_ARRAY(Types.STRING) // tags array
};
String[] outerNames = {"user_id", "profile", "tags"};
RowTypeInfo complexSchema = new RowTypeInfo(outerTypes, outerNames);

Use Kafka tables in SQL queries:
-- Create a view from Kafka source
CREATE VIEW user_events AS
SELECT
user_id,
event_type,
CAST(event_timestamp AS TIMESTAMP) as event_time,
value
FROM kafka_source;
-- Window aggregation
SELECT
TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start,
event_type,
COUNT(*) as event_count,
AVG(value) as avg_value
FROM user_events
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR), event_type;
-- Insert results into output topic
INSERT INTO kafka_sink
SELECT user_id, event_type, COUNT(*) as count
FROM user_events
WHERE event_time > CURRENT_TIMESTAMP - INTERVAL '1' DAY
GROUP BY user_id, event_type;

// JSON format properties
Properties jsonProps = new Properties();
jsonProps.setProperty("bootstrap.servers", "localhost:9092");
jsonProps.setProperty("group.id", "json-table-consumer");
jsonProps.setProperty("json.fail-on-missing-field", "false"); // Ignore missing fields
// Avro format properties
Properties avroProps = new Properties();
avroProps.setProperty("bootstrap.servers", "localhost:9092");
avroProps.setProperty("group.id", "avro-table-consumer");
avroProps.setProperty("schema.registry.url", "http://localhost:8081"); // If using schema registry

Handle errors in table operations:
// Error handling configuration
Properties errorHandlingProps = new Properties();
errorHandlingProps.setProperty("bootstrap.servers", "localhost:9092");
errorHandlingProps.setProperty("group.id", "error-handling-consumer");
errorHandlingProps.setProperty("json.ignore-parse-errors", "true"); // Skip malformed JSON
errorHandlingProps.setProperty("consumer.max.poll.records", "100"); // Limit batch size

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-kafka-0-8-2-10