tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0
Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.
The low-level Processor API provides fine-grained control over stream processing logic.
Custom processor for transforming records.
package org.apache.kafka.streams.processor.api;
public interface Processor<KIn, VIn, KOut, VOut> {
/**
* Initialize the processor with context.
*/
default void init(ProcessorContext<KOut, VOut> context) {}
/**
* Process a single record.
*/
void process(Record<KIn, VIn> record);
/**
* Close the processor and cleanup resources.
*/
default void close() {}
}
Processor that doesn't change the record key.
package org.apache.kafka.streams.processor.api;
public interface FixedKeyProcessor<KIn, VIn, VOut> {
default void init(FixedKeyProcessorContext<KIn, VOut> context) {}
void process(FixedKeyRecord<KIn, VIn> record);
default void close() {}
}
Factory for creating processor instances.
package org.apache.kafka.streams.processor.api;
@FunctionalInterface
public interface ProcessorSupplier<KIn, VIn, KOut, VOut> {
/**
* Return a new processor instance.
*/
Processor<KIn, VIn, KOut, VOut> get();
/**
* Return the names of state stores to connect.
*/
default Set<StoreBuilder<?>> stores() {
return Collections.emptySet();
}
}
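Because ProcessorSupplier extends ConnectedStoreProvider, a supplier can bundle its own state store by overriding stores(); Kafka Streams then adds and connects the returned store builders automatically, with no separate addStateStore() call. A minimal sketch, reusing the "counts-store" name and the CountingProcessor shown later in this document:
import java.util.Collections;
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
public class CountingProcessorSupplier implements ProcessorSupplier<String, String, String, Long> {
    @Override
    public Processor<String, String, String, Long> get() {
        // Must return a fresh processor instance on every call
        return new CountingProcessor();
    }
    @Override
    public Set<StoreBuilder<?>> stores() {
        // Store builders returned here are added to the topology and connected to this processor
        return Collections.singleton(
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("counts-store"),
                Serdes.String(), Serdes.Long()));
    }
}
// Usage: builder.stream("input").process(new CountingProcessorSupplier()).to("output");
Factory for fixed-key processors.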
package org.apache.kafka.streams.processor.api;
@FunctionalInterface
public interface FixedKeyProcessorSupplier<KIn, VIn, VOut> {
FixedKeyProcessor<KIn, VIn, VOut> get();
default Set<StoreBuilder<?>> stores() {
return Collections.emptySet();
}
}
Runtime context for processors.
package org.apache.kafka.streams.processor.api;
public interface ProcessorContext<KForward, VForward> {
// Application metadata
String applicationId();
TaskId taskId();
Map<String, Object> appConfigs();
Map<String, Object> appConfigsWithPrefix(String prefix);
// State store access
<S extends StateStore> S getStateStore(String name);
// Scheduling
Cancellable schedule(Duration interval,
PunctuationType type,
Punctuator callback);
// Forwarding records
void forward(Record<KForward, VForward> record);
<K extends KForward, V extends VForward> void forward(Record<K, V> record,
String childName);
// Commit
void commit();
// Record metadata
Optional<RecordMetadata> recordMetadata();
// Timestamps
long currentStreamTimeMs();
long currentSystemTimeMs();
}
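Beyond forwarding and scheduling, the context exposes the application configuration and the current stream-time and system-time. A brief sketch of how these accessors can be used together; the "custom.threshold" key is a hypothetical property added to the StreamsConfig properties:
import org.apache.kafka.streams.processor.api.*;
public class ConfigAwareProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;
    private long thresholdMs;
    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
        // appConfigs() exposes the full StreamsConfig map, including custom entries
        Object value = context.appConfigs().getOrDefault("custom.threshold", "60000"); // hypothetical property
        this.thresholdMs = Long.parseLong(value.toString());
    }
    @Override
    public void process(Record<String, String> record) {
        // currentStreamTimeMs() is the task's event-time high-water mark,
        // currentSystemTimeMs() is the cached wall-clock time
        long lagMs = context.currentSystemTimeMs() - context.currentStreamTimeMs();
        if (lagMs > thresholdMs) {
            context.commit(); // request an early commit; Streams also commits on its regular interval
        }
        context.forward(record);
    }
}
Context for fixed-key processors.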
package org.apache.kafka.streams.processor.api;
public interface FixedKeyProcessorContext<KForward, VForward> {
// Same as ProcessorContext but forward methods use FixedKeyRecord
void forward(FixedKeyRecord<KForward, VForward> record);
<K extends KForward, V extends VForward> void forward(FixedKeyRecord<K, V> record,
String childName);
// All other ProcessorContext methods
String applicationId();
TaskId taskId();
<S extends StateStore> S getStateStore(String name);
Cancellable schedule(Duration interval, PunctuationType type, Punctuator callback);
// ... etc
}
Record with key, value, timestamp, and headers; the with* methods return modified copies rather than changing the record in place.
package org.apache.kafka.streams.processor.api;
public class Record<K, V> {
// Constructors
public Record(K key, V value, long timestamp);
public Record(K key, V value, long timestamp, Headers headers);
// Accessors and mutators
public K key();
public Record<K, V> withKey(K key);
public V value();
public Record<K, V> withValue(V value);
public long timestamp();
public Record<K, V> withTimestamp(long timestamp);
public Headers headers();
public Record<K, V> withHeaders(Headers headers);
}
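The with* methods derive a modified copy of a record before it is forwarded, which is useful for tagging records with headers. A small sketch; the "trace-id" header name and value are illustrative:
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.processor.api.Record;
// Derive a new record carrying an extra header; the original record is left unchanged
Record<String, String> original = new Record<>("order-1", "payload", System.currentTimeMillis());
RecordHeaders headers = new RecordHeaders();
headers.add("trace-id", "abc-123".getBytes(StandardCharsets.UTF_8));
Record<String, String> tagged = original.withHeaders(headers);
Record where key cannot be changed.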
package org.apache.kafka.streams.processor.api;
public class FixedKeyRecord<K, V> {
public K key();
public V value();
public FixedKeyRecord<K, V> withValue(V value);
public long timestamp();
public FixedKeyRecord<K, V> withTimestamp(long timestamp);
public Headers headers();
public FixedKeyRecord<K, V> withHeaders(Headers headers);
}
Scheduled callback for periodic operations.
package org.apache.kafka.streams.processor;
@FunctionalInterface
public interface Punctuator {
/**
* Called periodically based on schedule.
* @param timestamp The timestamp when punctuation occurs
*/
void punctuate(long timestamp);
}
Type of punctuation scheduling.
package org.apache.kafka.streams.processor;
public enum PunctuationType {
/**
* Schedule based on stream-time (event time).
*/
STREAM_TIME,
/**
* Schedule based on wall-clock time (system time).
*/
WALL_CLOCK_TIME
}
Handle for cancelling scheduled punctuation.
package org.apache.kafka.streams.processor;
public interface Cancellable {
/**
* Cancel this scheduled operation.
*/
void cancel();
}
Scheduling Examples:
import org.apache.kafka.streams.processor.api.*;
import org.apache.kafka.streams.processor.*;
import java.time.Duration;
public class MyProcessor implements Processor<String, String, String, String> {
private ProcessorContext<String, String> context;
private Cancellable streamTimePunctuator;
private Cancellable wallClockPunctuator;
@Override
public void init(ProcessorContext<String, String> context) {
this.context = context;
// Schedule based on stream time (every 10 seconds of event time)
streamTimePunctuator = context.schedule(
Duration.ofSeconds(10),
PunctuationType.STREAM_TIME,
timestamp -> {
System.out.println("Stream-time punctuation at: " + timestamp);
// Perform periodic operation based on event time
}
);
// Schedule based on wall-clock time (every 1 minute)
wallClockPunctuator = context.schedule(
Duration.ofMinutes(1),
PunctuationType.WALL_CLOCK_TIME,
timestamp -> {
System.out.println("Wall-clock punctuation at: " + timestamp);
// Perform periodic operation based on system time
}
);
}
@Override
public void process(Record<String, String> record) {
// Process record
context.forward(record.withValue(record.value().toUpperCase()));
}
@Override
public void close() {
// Cancel scheduled operations
if (streamTimePunctuator != null) {
streamTimePunctuator.cancel();
}
if (wallClockPunctuator != null) {
wallClockPunctuator.cancel();
}
}
}
Extract timestamp from records.
package org.apache.kafka.streams.processor;
public interface TimestampExtractor {
/**
* Extract timestamp from consumer record.
* @param record The consumer record
* @param partitionTime The highest timestamp of previous records in partition
* @return The timestamp to use for this record
*/
long extract(ConsumerRecord<Object, Object> record, long partitionTime);
}
Built-in Extractors:
// Use the record's embedded timestamp; fail on an invalid timestamp (default)
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
// Use the record's embedded timestamp; log a warning and skip the record if the timestamp is invalid
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
// Use the record's embedded timestamp; fall back to the partition time if the timestamp is invalid
import org.apache.kafka.streams.processor.UsePartitionTimeOnInvalidTimestamp;
// Use wall-clock time, ignoring the record timestamp
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
Custom Extractor:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;
public class CustomTimestampExtractor implements TimestampExtractor {
@Override
public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
// Extract timestamp from record value
if (record.value() instanceof MyMessage) {
MyMessage message = (MyMessage) record.value();
return message.getTimestamp();
}
// Fall back to record timestamp
return record.timestamp();
}
}
// Configure in StreamsConfig
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
CustomTimestampExtractor.class);
// Or configure per stream
builder.stream("topic",
Consumed.with(Serdes.String(), Serdes.String())
.withTimestampExtractor(new CustomTimestampExtractor()));
import org.apache.kafka.streams.processor.api.*;
public class UpperCaseProcessor implements Processor<String, String, String, String> {
private ProcessorContext<String, String> context;
@Override
public void init(ProcessorContext<String, String> context) {
this.context = context;
}
@Override
public void process(Record<String, String> record) {
// Transform value to uppercase
String upperValue = record.value().toUpperCase();
// Forward transformed record
context.forward(record.withValue(upperValue));
}
}
// Usage
builder.stream("input")
.process(() -> new UpperCaseProcessor())
.to("output");import org.apache.kafka.streams.processor.api.*;
import org.apache.kafka.streams.state.KeyValueStore;
public class CountingProcessor implements Processor<String, String, String, Long> {
private ProcessorContext<String, Long> context;
private KeyValueStore<String, Long> store;
@Override
public void init(ProcessorContext<String, Long> context) {
this.context = context;
this.store = context.getStateStore("counts-store");
}
@Override
public void process(Record<String, String> record) {
String key = record.key();
// Get current count
Long count = store.get(key);
if (count == null) {
count = 0L;
}
// Increment and update
count++;
store.put(key, count);
// Forward updated count
context.forward(new Record<>(key, count, record.timestamp()));
}
}
// Usage with state store
StreamsBuilder builder = new StreamsBuilder();
StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("counts-store"),
Serdes.String(),
Serdes.Long()
);
builder.addStateStore(storeBuilder);
builder.stream("input")
.process(() -> new CountingProcessor(), "counts-store")
.to("output");import org.apache.kafka.streams.processor.api.*;
import org.apache.kafka.streams.processor.*;
import org.apache.kafka.streams.state.KeyValueStore;
import java.time.Duration;
public class AggregatingProcessor implements Processor<String, String, String, String> {
private ProcessorContext<String, String> context;
private KeyValueStore<String, String> store;
@Override
public void init(ProcessorContext<String, String> context) {
this.context = context;
this.store = context.getStateStore("agg-store");
// Schedule periodic flush every 60 seconds
context.schedule(
Duration.ofSeconds(60),
PunctuationType.WALL_CLOCK_TIME,
this::forwardAll
);
}
@Override
public void process(Record<String, String> record) {
// Accumulate values
String key = record.key();
String existing = store.get(key);
String newValue = existing == null ? record.value() : existing + "," + record.value();
store.put(key, newValue);
}
private void forwardAll(long timestamp) {
// Forward all accumulated values
try (KeyValueIterator<String, String> iterator = store.all()) {
while (iterator.hasNext()) {
KeyValue<String, String> entry = iterator.next();
context.forward(new Record<>(entry.key, entry.value, timestamp));
// Clear entry after forwarding
store.delete(entry.key);
}
}
}
}
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.processor.api.*;
public class RoutingProcessor implements Processor<String, String, String, String> {
private ProcessorContext<String, String> context;
@Override
public void init(ProcessorContext<String, String> context) {
this.context = context;
}
@Override
public void process(Record<String, String> record) {
String value = record.value();
// Route based on value; the child name must match a sink node name in the topology below
if (value.startsWith("ERROR")) {
context.forward(record, "ErrorSink");
} else if (value.startsWith("WARN")) {
context.forward(record, "WarningSink");
} else {
context.forward(record, "InfoSink");
}
}
}
// Topology with named children
Topology topology = new Topology();
topology.addSource("Source", "input-topic")
.addProcessor("Router", () -> new RoutingProcessor(), "Source")
.addSink("ErrorSink", "error-topic", "Router")
.addSink("WarningSink", "warning-topic", "Router")
.addSink("InfoSink", "info-topic", "Router");import org.apache.kafka.streams.processor.api.*;
public class EnrichmentProcessor implements FixedKeyProcessor<String, String, String> {
private FixedKeyProcessorContext<String, String> context;
@Override
public void init(FixedKeyProcessorContext<String, String> context) {
this.context = context;
}
@Override
public void process(FixedKeyRecord<String, String> record) {
// Enrich value without changing key
String enriched = record.value() + " [processed at " +
System.currentTimeMillis() + "]";
// Forward with same key
context.forward(record.withValue(enriched));
}
}
// Usage
builder.stream("input")
.processValues(() -> new EnrichmentProcessor())
.to("output");Access metadata about current record being processed:
import org.apache.kafka.streams.processor.api.*;
import org.apache.kafka.common.TopicPartition;
public class MetadataProcessor implements Processor<String, String, String, String> {
private ProcessorContext<String, String> context;
@Override
public void init(ProcessorContext<String, String> context) {
this.context = context;
}
@Override
public void process(Record<String, String> record) {
// Access record metadata
context.recordMetadata().ifPresent(metadata -> {
String topic = metadata.topic();
int partition = metadata.partition();
long offset = metadata.offset();
System.out.println("Processing record from " + topic +
"-" + partition + " at offset " + offset);
});
// Access headers
record.headers().forEach(header -> {
System.out.println("Header: " + header.key() + " = " +
new String(header.value()));
});
context.forward(record);
}
}
import org.apache.kafka.streams.processor.api.*;
import org.apache.kafka.streams.errors.StreamsException;
public class RobustProcessor implements Processor<String, String, String, String> {
private ProcessorContext<String, String> context;
@Override
public void init(ProcessorContext<String, String> context) {
this.context = context;
}
@Override
public void process(Record<String, String> record) {
try {
// Process record
String result = processValue(record.value());
context.forward(record.withValue(result));
} catch (Exception e) {
// Log error
System.err.println("Error processing record: " + e.getMessage());
// Option 1: Skip record
// (do nothing)
// Option 2: Forward to DLQ
context.forward(record, "error-sink");
// Option 3: Throw exception to fail task
// throw new StreamsException("Processing failed", e);
}
}
private String processValue(String value) throws Exception {
// Processing logic that may throw
return value.toUpperCase();
}
}
import org.apache.kafka.streams.processor.api.*;
import org.apache.kafka.streams.errors.StreamsException;
import java.sql.Connection;
public class ResourceProcessor implements Processor<String, String, String, String> {
private Connection connection;
private ProcessorContext<String, String> context;
@Override
public void init(ProcessorContext<String, String> context) {
this.context = context;
try {
// Initialize external resources
this.connection = createDatabaseConnection();
} catch (Exception e) {
throw new StreamsException("Failed to initialize processor", e);
}
}
@Override
public void process(Record<String, String> record) {
// Use resource
try {
String result = queryDatabase(connection, record.value());
context.forward(record.withValue(result));
} catch (Exception e) {
System.err.println("Database query failed: " + e.getMessage());
}
}
@Override
public void close() {
// Clean up resources
if (connection != null) {
try {
connection.close();
} catch (Exception e) {
System.err.println("Failed to close connection: " + e.getMessage());
}
}
}
private Connection createDatabaseConnection() {
// Database connection logic
return null;
}
private String queryDatabase(Connection conn, String value) {
// Database query logic
return value;
}
}
Symptoms: the processor cannot obtain its state store; getStateStore() fails in init(), or store operations throw at runtime.
Causes: the store was never added to the topology, or it was added without being connected to this processor.
Solutions:
import org.apache.kafka.streams.processor.api.*;
import org.apache.kafka.streams.state.*;
public class SafeProcessor implements Processor<String, String, String, String> {
private ProcessorContext<String, String> context;
private KeyValueStore<String, Long> store;
private final String storeName = "my-store";
@Override
public void init(ProcessorContext<String, String> context) {
this.context = context;
try {
this.store = context.getStateStore(storeName);
if (store == null) {
throw new IllegalStateException("Store is null: " + storeName);
}
System.out.println("Successfully initialized store: " + storeName);
} catch (Exception e) {
System.err.println("Failed to get state store: " + e.getMessage());
throw e;
}
}
@Override
public void process(Record<String, String> record) {
if (store == null) {
System.err.println("Store not initialized, skipping record");
return;
}
try {
Long count = store.get(record.key());
store.put(record.key(), count == null ? 1L : count + 1);
} catch (Exception e) {
System.err.println("Store operation failed: " + e.getMessage());
}
}
}
// Ensure store is connected in topology
Topology topology = new Topology();
topology.addSource("Source", "input-topic");
topology.addProcessor("Process", () -> new SafeProcessor(), "Source");
// CRITICAL: Connect state store to processor
StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"),
Serdes.String(),
Serdes.Long()
);
topology.addStateStore(storeBuilder, "Process"); // Must specify processor name
topology.addSink("Sink", "output-topic", "Process");Prevention:
Symptoms: scheduled punctuation never fires, or fires less often than expected.
Causes: STREAM_TIME punctuation only advances as new records arrive, so it will not fire while the input topics are idle.
Solutions:
import org.apache.kafka.streams.processor.api.*;
import org.apache.kafka.streams.processor.*;
import java.time.Duration;
public class PunctuationProcessor implements Processor<String, String, String, String> {
private ProcessorContext<String, String> context;
private Cancellable punctuator;
@Override
public void init(ProcessorContext<String, String> context) {
this.context = context;
// STREAM_TIME: Advances only when records processed
this.punctuator = context.schedule(
Duration.ofSeconds(10),
PunctuationType.STREAM_TIME,
timestamp -> {
System.out.println("Stream time punctuation at: " + timestamp);
// Only fires when records are processed
}
);
// WALL_CLOCK_TIME: Advances with real time
context.schedule(
Duration.ofSeconds(10),
PunctuationType.WALL_CLOCK_TIME,
timestamp -> {
System.out.println("Wall clock punctuation at: " + timestamp);
// Fires every 10 seconds regardless of records
}
);
// Keep the returned Cancellable in a field so the punctuation can be cancelled in close()
}
@Override
public void process(Record<String, String> record) {
// Regular processing
context.forward(record);
}
@Override
public void close() {
// Cancel punctuation
if (punctuator != null) {
punctuator.cancel();
}
}
}
Prevention: prefer WALL_CLOCK_TIME when punctuation must fire even without input traffic, and cancel punctuators in close().
// Processor with multiple downstream processors
Topology topology = new Topology();
topology.addSource("Source", "input-topic");
topology.addProcessor("Process", () -> new MultiForwardProcessor(), "Source");
topology.addSink("Sink1", "output-topic-1", "Process");
topology.addSink("Sink2", "output-topic-2", "Process");
public class MultiForwardProcessor implements Processor<String, String, String, String> {
private ProcessorContext<String, String> context;
@Override
public void init(ProcessorContext<String, String> context) {
this.context = context;
}
@Override
public void process(Record<String, String> record) {
// Forward to specific child by name
if (record.value().startsWith("A")) {
context.forward(record, "Sink1");
} else if (record.value().startsWith("B")) {
context.forward(record, "Sink2");
} else {
// Forward to all children (default)
context.forward(record);
}
}
}
import org.apache.kafka.streams.processor.api.*;
import org.apache.kafka.streams.state.*;
public class MultiStoreProcessor implements Processor<String, String, String, String> {
private ProcessorContext<String, String> context;
private KeyValueStore<String, Long> countsStore;
private KeyValueStore<String, String> metadataStore;
@Override
public void init(ProcessorContext<String, String> context) {
this.context = context;
// Access multiple stores
this.countsStore = context.getStateStore("counts-store");
this.metadataStore = context.getStateStore("metadata-store");
if (countsStore == null || metadataStore == null) {
throw new IllegalStateException("Required stores not available");
}
}
@Override
public void process(Record<String, String> record) {
// Use both stores
Long count = countsStore.get(record.key());
String metadata = metadataStore.get(record.key());
countsStore.put(record.key(), count == null ? 1L : count + 1);
metadataStore.put(record.key(), record.value());
context.forward(record);
}
}
// Connect both stores in topology
Topology topology = new Topology();
topology.addSource("Source", "input-topic");
topology.addProcessor("Process", () -> new MultiStoreProcessor(), "Source");
// Add both stores
StoreBuilder<KeyValueStore<String, Long>> countsBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("counts-store"),
Serdes.String(), Serdes.Long());
StoreBuilder<KeyValueStore<String, String>> metadataBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("metadata-store"),
Serdes.String(), Serdes.String());
topology.addStateStore(countsBuilder, "Process");
topology.addStateStore(metadataBuilder, "Process");
topology.addSink("Sink", "output-topic", "Process");public class ResourceManagedProcessor implements Processor<String, String, String, String> {
private ProcessorContext<String, String> context;
private KeyValueStore<String, Long> store;
private ExecutorService executor;
private ScheduledExecutorService scheduler;
@Override
public void init(ProcessorContext<String, String> context) {
this.context = context;
this.store = context.getStateStore("my-store");
// Initialize resources
this.executor = Executors.newFixedThreadPool(5);
this.scheduler = Executors.newScheduledThreadPool(1);
// Schedule cleanup
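// Caution: state stores are not thread-safe and should only be accessed from the stream thread;
// a context.schedule() punctuation is the safer way to run this kind of periodic cleanup.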
scheduler.scheduleAtFixedRate(
this::cleanupOldEntries,
1, 1, TimeUnit.HOURS
);
}
@Override
public void process(Record<String, String> record) {
// Process record
store.put(record.key(), System.currentTimeMillis());
context.forward(record);
}
@Override
public void close() {
// Clean up all resources
System.out.println("Closing processor resources");
if (scheduler != null) {
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
if (executor != null) {
executor.shutdown();
try {
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
private void cleanupOldEntries() {
// Cleanup logic
long cutoffTime = System.currentTimeMillis() - 86400000; // 24 hours
try (KeyValueIterator<String, Long> iterator = store.all()) {
while (iterator.hasNext()) {
KeyValue<String, Long> entry = iterator.next();
if (entry.value < cutoffTime) {
store.delete(entry.key);
}
}
}
}
}