tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.
Get started with Apache Kafka 4.1.1 in minutes.
Maven:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>4.1.1</version>
</dependency>Gradle:
implementation 'org.apache.kafka:kafka-clients:4.1.1'Send messages to a Kafka topic:
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", "key", "value");
producer.send(record, (metadata, exception) -> {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println("Sent to partition " + metadata.partition() +
" at offset " + metadata.offset());
}
});
producer.close();Read messages from a Kafka topic:
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
}
}Process data in real-time:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic");
source.mapValues(value -> value.toUpperCase())
.to("output-topic");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));Manage Kafka resources:
import org.apache.kafka.clients.admin.*;
import java.util.*;
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (Admin admin = Admin.create(props)) {
// Create topic
NewTopic newTopic = new NewTopic("my-topic", 3, (short) 2);
admin.createTopics(Collections.singletonList(newTopic)).all().get();
// List topics
Set<String> topics = admin.listTopics().names().get();
System.out.println("Topics: " + topics);
}