Describes: mavenpkg:maven/org.apache.kafka/kafka_2.13@4.1.x

tessl/maven-org-apache-kafka--kafka-2-13

tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0

Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.

docs/streams/processor-api.md

Processor API

The low-level Processor API provides fine-grained control over stream processing logic.

Core Interfaces

Processor<KIn, VIn, KOut, VOut>

Custom processor for transforming records.

package org.apache.kafka.streams.processor.api;

public interface Processor<KIn, VIn, KOut, VOut> {
    /**
     * Initialize the processor with context.
     */
    default void init(ProcessorContext<KOut, VOut> context) {}

    /**
     * Process a single record.
     */
    void process(Record<KIn, VIn> record);

    /**
     * Close the processor and cleanup resources.
     */
    default void close() {}
}

FixedKeyProcessor<KIn, VIn, VOut>

Processor that doesn't change the record key.

package org.apache.kafka.streams.processor.api;

public interface FixedKeyProcessor<KIn, VIn, VOut> {
    default void init(FixedKeyProcessorContext<KIn, VOut> context) {}

    void process(FixedKeyRecord<KIn, VIn> record);

    default void close() {}
}

ProcessorSupplier<KIn, VIn, KOut, VOut>

Factory for creating processor instances.

package org.apache.kafka.streams.processor.api;

@FunctionalInterface
public interface ProcessorSupplier<KIn, VIn, KOut, VOut> {
    /**
     * Return a new processor instance.
     */
    Processor<KIn, VIn, KOut, VOut> get();

    /**
     * Return the state store builders that should be created and connected to the processor.
     */
    default Set<StoreBuilder<?>> stores() {
        return Collections.emptySet();
    }
}
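
A supplier can also bundle the state stores it needs via stores(); when such a supplier is passed to process() or addProcessor(), Kafka Streams creates and connects the stores automatically, so no separate addStateStore() call is needed. A minimal sketch reusing the CountingProcessor shown later on this page:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.api.*;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

import java.util.Collections;
import java.util.Set;

public class CountingSupplier implements ProcessorSupplier<String, String, String, Long> {
    @Override
    public Processor<String, String, String, Long> get() {
        return new CountingProcessor(); // defined in the examples below
    }

    @Override
    public Set<StoreBuilder<?>> stores() {
        // The store is created and connected automatically when this supplier is used
        return Collections.singleton(
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("counts-store"),
                Serdes.String(),
                Serdes.Long()));
    }
}

// Usage: no explicit addStateStore() call is needed
// builder.stream("input").process(new CountingSupplier()).to("output");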

FixedKeyProcessorSupplier<KIn, VIn, VOut>

Factory for fixed-key processors.

package org.apache.kafka.streams.processor.api;

@FunctionalInterface
public interface FixedKeyProcessorSupplier<KIn, VIn, VOut> {
    FixedKeyProcessor<KIn, VIn, VOut> get();

    default Set<StoreBuilder<?>> stores() {
        return Collections.emptySet();
    }
}

Processor Context

ProcessorContext<KForward, VForward>

Runtime context for processors.

package org.apache.kafka.streams.processor.api;

public interface ProcessorContext<KForward, VForward> {
    // Application metadata
    String applicationId();
    TaskId taskId();
    Map<String, Object> appConfigs();
    Map<String, Object> appConfigsWithPrefix(String prefix);

    // State store access
    <S extends StateStore> S getStateStore(String name);

    // Scheduling
    Cancellable schedule(Duration interval,
                        PunctuationType type,
                        Punctuator callback);

    // Forwarding records
    void forward(Record<KForward, VForward> record);
    <K extends KForward, V extends VForward> void forward(Record<K, V> record,
                                                           String childName);

    // Commit
    void commit();

    // Record metadata
    Optional<RecordMetadata> recordMetadata();

    // Timestamps
    long currentStreamTimeMs();
    long currentSystemTimeMs();
}
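
A rough sketch of how a processor might combine these methods; the class name and the log output are illustrative, not part of the API:

import org.apache.kafka.streams.processor.api.*;

public class ContextAwareProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;

        // Application metadata is available once init() is called
        System.out.println("Running as " + context.applicationId()
            + ", task " + context.taskId());
    }

    @Override
    public void process(Record<String, String> record) {
        // Compare event time (stream time) with wall-clock time
        long lagMs = context.currentSystemTimeMs() - context.currentStreamTimeMs();

        context.forward(record.withValue(record.value() + " (lag=" + lagMs + "ms)"));

        // context.commit() could be called here to request an early commit;
        // Streams treats it as a hint and commits according to commit.interval.ms
    }
}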

FixedKeyProcessorContext<KForward, VForward>

Context for fixed-key processors.

package org.apache.kafka.streams.processor.api;

public interface FixedKeyProcessorContext<KForward, VForward> {
    // Same as ProcessorContext but forward methods use FixedKeyRecord
    void forward(FixedKeyRecord<KForward, VForward> record);
    <K extends KForward, V extends VForward> void forward(FixedKeyRecord<K, V> record,
                                                           String childName);

    // All other ProcessorContext methods
    String applicationId();
    TaskId taskId();
    <S extends StateStore> S getStateStore(String name);
    Cancellable schedule(Duration interval, PunctuationType type, Punctuator callback);
    // ... etc
}

Record Types

Record<K, V>

Immutable record carrying key, value, timestamp, and headers; the with* methods return modified copies.

package org.apache.kafka.streams.processor.api;

public class Record<K, V> {
    // Constructors
    public Record(K key, V value, long timestamp);
    public Record(K key, V value, long timestamp, Headers headers);

    // Accessors and mutators
    public K key();
    public Record<K, V> withKey(K key);

    public V value();
    public Record<K, V> withValue(V value);

    public long timestamp();
    public Record<K, V> withTimestamp(long timestamp);

    public Headers headers();
    public Record<K, V> withHeaders(Headers headers);
}
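
Because the with* methods return copies, transforming a record leaves the original unchanged; a small illustrative sketch:

import org.apache.kafka.streams.processor.api.Record;

Record<String, String> original =
    new Record<>("user-1", "hello", System.currentTimeMillis());

// Each with* call returns a new Record; the original is left untouched
Record<String, String> transformed = original
    .withValue(original.value().toUpperCase())
    .withTimestamp(original.timestamp() + 1000);

System.out.println(original.value());     // hello
System.out.println(transformed.value());  // HELLO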

FixedKeyRecord<K, V>

Record where key cannot be changed.

package org.apache.kafka.streams.processor.api;

public class FixedKeyRecord<K, V> {
    public K key();

    public V value();
    public FixedKeyRecord<K, V> withValue(V value);

    public long timestamp();
    public FixedKeyRecord<K, V> withTimestamp(long timestamp);

    public Headers headers();
    public FixedKeyRecord<K, V> withHeaders(Headers headers);
}

Punctuation

Punctuator

Scheduled callback for periodic operations.

package org.apache.kafka.streams.processor;

@FunctionalInterface
public interface Punctuator {
    /**
     * Called periodically based on schedule.
     * @param timestamp The timestamp when punctuation occurs
     */
    void punctuate(long timestamp);
}

PunctuationType

Type of punctuation scheduling.

package org.apache.kafka.streams.processor;

public enum PunctuationType {
    /**
     * Schedule based on stream-time (event time).
     */
    STREAM_TIME,

    /**
     * Schedule based on wall-clock time (system time).
     */
    WALL_CLOCK_TIME
}

Cancellable

Handle for cancelling scheduled punctuation.

package org.apache.kafka.streams.processor;

public interface Cancellable {
    /**
     * Cancel this scheduled operation.
     */
    void cancel();
}

Scheduling Examples:

import org.apache.kafka.streams.processor.api.*;
import org.apache.kafka.streams.processor.*;
import java.time.Duration;

public class MyProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;
    private Cancellable streamTimePunctuator;
    private Cancellable wallClockPunctuator;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;

        // Schedule based on stream time (every 10 seconds of event time)
        streamTimePunctuator = context.schedule(
            Duration.ofSeconds(10),
            PunctuationType.STREAM_TIME,
            timestamp -> {
                System.out.println("Stream-time punctuation at: " + timestamp);
                // Perform periodic operation based on event time
            }
        );

        // Schedule based on wall-clock time (every 1 minute)
        wallClockPunctuator = context.schedule(
            Duration.ofMinutes(1),
            PunctuationType.WALL_CLOCK_TIME,
            timestamp -> {
                System.out.println("Wall-clock punctuation at: " + timestamp);
                // Perform periodic operation based on system time
            }
        );
    }

    @Override
    public void process(Record<String, String> record) {
        // Process record
        context.forward(record.withValue(record.value().toUpperCase()));
    }

    @Override
    public void close() {
        // Cancel scheduled operations
        if (streamTimePunctuator != null) {
            streamTimePunctuator.cancel();
        }
        if (wallClockPunctuator != null) {
            wallClockPunctuator.cancel();
        }
    }
}

Timestamp Extraction

TimestampExtractor

Extract timestamp from records.

package org.apache.kafka.streams.processor;

public interface TimestampExtractor {
    /**
     * Extract timestamp from consumer record.
     * @param record The consumer record
     * @param partitionTime The highest timestamp of previous records in partition
     * @return The timestamp to use for this record
     */
    long extract(ConsumerRecord<Object, Object> record, long partitionTime);
}

Built-in Extractors:

// Use the record's embedded timestamp; fail if it is invalid (default)
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;

// Use the record's embedded timestamp; log a warning and drop the record if invalid
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;

// Use the record's embedded timestamp, falling back to partition time if invalid
import org.apache.kafka.streams.processor.UsePartitionTimeOnInvalidTimestamp;

// Use wall-clock time
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
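
Any of these can be set as the application-wide default, using the same configuration key shown for custom extractors below; for example:

// Use processing-time (wall-clock) timestamps for all input topics unless overridden per stream
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
    WallclockTimestampExtractor.class);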

Custom Extractor:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class CustomTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        // Extract timestamp from record value
        if (record.value() instanceof MyMessage) {
            MyMessage message = (MyMessage) record.value();
            return message.getTimestamp();
        }

        // Fall back to record timestamp
        return record.timestamp();
    }
}

// Configure in StreamsConfig
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
    CustomTimestampExtractor.class);

// Or configure per stream
builder.stream("topic",
    Consumed.with(Serdes.String(), Serdes.String())
        .withTimestampExtractor(new CustomTimestampExtractor()));

Complete Examples

Stateless Processor

import org.apache.kafka.streams.processor.api.*;

public class UpperCaseProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, String> record) {
        // Transform value to uppercase
        String upperValue = record.value().toUpperCase();

        // Forward transformed record
        context.forward(record.withValue(upperValue));
    }
}

// Usage
builder.stream("input")
       .process(() -> new UpperCaseProcessor())
       .to("output");

Stateful Processor

import org.apache.kafka.streams.processor.api.*;
import org.apache.kafka.streams.state.KeyValueStore;

public class CountingProcessor implements Processor<String, String, String, Long> {
    private ProcessorContext<String, Long> context;
    private KeyValueStore<String, Long> store;

    @Override
    public void init(ProcessorContext<String, Long> context) {
        this.context = context;
        this.store = context.getStateStore("counts-store");
    }

    @Override
    public void process(Record<String, String> record) {
        String key = record.key();

        // Get current count
        Long count = store.get(key);
        if (count == null) {
            count = 0L;
        }

        // Increment and update
        count++;
        store.put(key, count);

        // Forward updated count
        context.forward(new Record<>(key, count, record.timestamp()));
    }
}

// Usage with state store
StreamsBuilder builder = new StreamsBuilder();

StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("counts-store"),
        Serdes.String(),
        Serdes.Long()
    );
builder.addStateStore(storeBuilder);

builder.stream("input")
       .process(() -> new CountingProcessor(), "counts-store")
       .to("output");

Processor with Punctuation

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.api.*;
import org.apache.kafka.streams.processor.*;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import java.time.Duration;

public class AggregatingProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;
    private KeyValueStore<String, String> store;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
        this.store = context.getStateStore("agg-store");

        // Schedule periodic flush every 60 seconds
        context.schedule(
            Duration.ofSeconds(60),
            PunctuationType.WALL_CLOCK_TIME,
            this::forwardAll
        );
    }

    @Override
    public void process(Record<String, String> record) {
        // Accumulate values
        String key = record.key();
        String existing = store.get(key);
        String newValue = existing == null ? record.value() : existing + "," + record.value();
        store.put(key, newValue);
    }

    private void forwardAll(long timestamp) {
        // Forward all accumulated values
        try (KeyValueIterator<String, String> iterator = store.all()) {
            while (iterator.hasNext()) {
                KeyValue<String, String> entry = iterator.next();
                context.forward(new Record<>(entry.key, entry.value, timestamp));

                // Clear entry after forwarding
                store.delete(entry.key);
            }
        }
    }
}

Multi-Output Processor

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.processor.api.*;

public class RoutingProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, String> record) {
        String value = record.value();

        // Route to a downstream sink by its node name (must match the names in the topology)
        if (value.startsWith("ERROR")) {
            context.forward(record, "ErrorSink");
        } else if (value.startsWith("WARN")) {
            context.forward(record, "WarningSink");
        } else {
            context.forward(record, "InfoSink");
        }
    }
}

// Topology with named children
Topology topology = new Topology();

topology.addSource("Source", "input-topic")
        .addProcessor("Router", () -> new RoutingProcessor(), "Source")
        .addSink("ErrorSink", "error-topic", "Router")
        .addSink("WarningSink", "warning-topic", "Router")
        .addSink("InfoSink", "info-topic", "Router");

FixedKeyProcessor Example

import org.apache.kafka.streams.processor.api.*;

public class EnrichmentProcessor implements FixedKeyProcessor<String, String, String> {
    private FixedKeyProcessorContext<String, String> context;

    @Override
    public void init(FixedKeyProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(FixedKeyRecord<String, String> record) {
        // Enrich value without changing key
        String enriched = record.value() + " [processed at " +
            System.currentTimeMillis() + "]";

        // Forward with same key
        context.forward(record.withValue(enriched));
    }
}

// Usage
builder.stream("input")
       .processValues(() -> new EnrichmentProcessor())
       .to("output");

Record Metadata

Access metadata about current record being processed:

import org.apache.kafka.streams.processor.api.*;
import org.apache.kafka.common.TopicPartition;

public class MetadataProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, String> record) {
        // Access record metadata
        context.recordMetadata().ifPresent(metadata -> {
            String topic = metadata.topic();
            int partition = metadata.partition();
            long offset = metadata.offset();

            System.out.println("Processing record from " + topic +
                "-" + partition + " at offset " + offset);
        });

        // Access headers
        record.headers().forEach(header -> {
            System.out.println("Header: " + header.key() + " = " +
                new String(header.value()));
        });

        context.forward(record);
    }
}
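
Headers are also writable. A brief sketch of tagging a record with an extra header before forwarding, where the header name is illustrative:

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import java.nio.charset.StandardCharsets;

// Inside process(): copy the existing headers, add a tag, and forward
Headers headers = new RecordHeaders(record.headers().toArray());
headers.add("processed-by", "metadata-processor".getBytes(StandardCharsets.UTF_8));
context.forward(record.withHeaders(headers));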

Best Practices

Error Handling

import org.apache.kafka.streams.processor.api.*;
import org.apache.kafka.streams.errors.StreamsException;

public class RobustProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, String> record) {
        try {
            // Process record
            String result = processValue(record.value());
            context.forward(record.withValue(result));
        } catch (Exception e) {
            // Log error
            System.err.println("Error processing record: " + e.getMessage());

            // Option 1: Skip record
            // (do nothing)

            // Option 2: Forward to DLQ
            context.forward(record, "error-sink");

            // Option 3: Throw exception to fail task
            // throw new StreamsException("Processing failed", e);
        }
    }

    private String processValue(String value) throws Exception {
        // Processing logic that may throw
        return value.toUpperCase();
    }
}
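
For the dead-letter option above to work, a child sink named "error-sink" must be attached to the processor in the topology. A sketch of such wiring, with assumed topic and node names; note that with two children attached, the success path should also forward to its child by name (for example context.forward(record.withValue(result), "output-sink")):

Topology topology = new Topology();
topology.addSource("Source", "input-topic");
topology.addProcessor("Robust", () -> new RobustProcessor(), "Source");

// Normal output path
topology.addSink("output-sink", "output-topic", "Robust");

// Dead-letter queue for records that fail processing
topology.addSink("error-sink", "error-topic", "Robust");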

Resource Management

import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.api.*;
import java.sql.Connection;

public class ResourceProcessor implements Processor<String, String, String, String> {
    private Connection connection;
    private ProcessorContext<String, String> context;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;

        try {
            // Initialize external resources
            this.connection = createDatabaseConnection();
        } catch (Exception e) {
            throw new StreamsException("Failed to initialize processor", e);
        }
    }

    @Override
    public void process(Record<String, String> record) {
        // Use resource
        try {
            String result = queryDatabase(connection, record.value());
            context.forward(record.withValue(result));
        } catch (Exception e) {
            System.err.println("Database query failed: " + e.getMessage());
        }
    }

    @Override
    public void close() {
        // Clean up resources
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception e) {
                System.err.println("Failed to close connection: " + e.getMessage());
            }
        }
    }

    private Connection createDatabaseConnection() {
        // Database connection logic
        return null;
    }

    private String queryDatabase(Connection conn, String value) {
        // Database query logic
        return value;
    }
}

Troubleshooting Processor API

Common Issues

Issue: State Store Access Errors

Symptoms:

  • NullPointerException when accessing state store
  • InvalidStateStoreException
  • Store returns null unexpectedly

Causes:

  • Store not connected to processor
  • Store accessed before init()
  • Store name mismatch
  • Store not added to topology

Solutions:

import org.apache.kafka.streams.processor.api.*;
import org.apache.kafka.streams.state.*;

public class SafeProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;
    private KeyValueStore<String, Long> store;
    private final String storeName = "my-store";
    
    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
        
        try {
            this.store = context.getStateStore(storeName);
            if (store == null) {
                throw new IllegalStateException("Store is null: " + storeName);
            }
            System.out.println("Successfully initialized store: " + storeName);
        } catch (Exception e) {
            System.err.println("Failed to get state store: " + e.getMessage());
            throw e;
        }
    }
    
    @Override
    public void process(Record<String, String> record) {
        if (store == null) {
            System.err.println("Store not initialized, skipping record");
            return;
        }
        
        try {
            Long count = store.get(record.key());
            store.put(record.key(), count == null ? 1L : count + 1);
        } catch (Exception e) {
            System.err.println("Store operation failed: " + e.getMessage());
        }
    }
}

// Ensure store is connected in topology
Topology topology = new Topology();
topology.addSource("Source", "input-topic");
topology.addProcessor("Process", () -> new SafeProcessor(), "Source");

// CRITICAL: Connect state store to processor
StoreBuilder<KeyValueStore<String, Long>> storeBuilder = 
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("my-store"),
        Serdes.String(),
        Serdes.Long()
    );
topology.addStateStore(storeBuilder, "Process"); // Must specify processor name

topology.addSink("Sink", "output-topic", "Process");

Prevention:

  • Always connect stores to processors (see the sketch after this list)
  • Validate store initialization
  • Check store != null before use
  • Use consistent store names
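
One way to verify the wiring is to print the topology description before starting the application; every processor node lists its connected stores. A quick sketch:

// Print the topology to confirm each processor lists its connected stores,
// e.g. "Processor: Process (stores: [my-store])"
System.out.println(topology.describe());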

Issue: Punctuation Not Firing

Symptoms:

  • Scheduled punctuation never executes
  • Periodic operations not happening
  • Time-based logic not working

Causes:

  • No records flowing through topology
  • Wrong PunctuationType
  • Application not processing
  • Punctuator cancelled before it fires

Solutions:

public class PunctuationProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;
    private Cancellable punctuator;
    
    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
        
        // STREAM_TIME: Advances only when records processed
        this.punctuator = context.schedule(
            Duration.ofSeconds(10),
            PunctuationType.STREAM_TIME,
            timestamp -> {
                System.out.println("Stream time punctuation at: " + timestamp);
                // Only fires when records are processed
            }
        );
        
        // WALL_CLOCK_TIME: Advances with real time
        context.schedule(
            Duration.ofSeconds(10),
            PunctuationType.WALL_CLOCK_TIME,
            timestamp -> {
                System.out.println("Wall clock punctuation at: " + timestamp);
                // Fires every 10 seconds regardless of records
            }
        );
        
        // NOTE: Keep the Cancellable reference so the punctuation
        // can be cancelled cleanly in close()
    }
    
    @Override
    public void process(Record<String, String> record) {
        // Regular processing
        context.forward(record);
    }
    
    @Override
    public void close() {
        // Cancel punctuation
        if (punctuator != null) {
            punctuator.cancel();
        }
    }
}

Prevention:

  • Use WALL_CLOCK_TIME for time-based operations
  • Use STREAM_TIME for record-based operations
  • Store Cancellable reference
  • Verify records are flowing

Edge Cases

Forwarding to Multiple Children

// Processor with multiple downstream processors
Topology topology = new Topology();
topology.addSource("Source", "input-topic");
topology.addProcessor("Process", () -> new MultiForwardProcessor(), "Source");
topology.addSink("Sink1", "output-topic-1", "Process");
topology.addSink("Sink2", "output-topic-2", "Process");

public class MultiForwardProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;
    
    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
    }
    
    @Override
    public void process(Record<String, String> record) {
        // Forward to specific child by name
        if (record.value().startsWith("A")) {
            context.forward(record, "Sink1");
        } else if (record.value().startsWith("B")) {
            context.forward(record, "Sink2");
        } else {
            // Forward to all children (default)
            context.forward(record);
        }
    }
}

Accessing Multiple State Stores

public class MultiStoreProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;
    private KeyValueStore<String, Long> countsStore;
    private KeyValueStore<String, String> metadataStore;
    
    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
        
        // Access multiple stores
        this.countsStore = context.getStateStore("counts-store");
        this.metadataStore = context.getStateStore("metadata-store");
        
        if (countsStore == null || metadataStore == null) {
            throw new IllegalStateException("Required stores not available");
        }
    }
    
    @Override
    public void process(Record<String, String> record) {
        // Use both stores
        Long count = countsStore.get(record.key());
        String metadata = metadataStore.get(record.key());
        
        countsStore.put(record.key(), count == null ? 1L : count + 1);
        metadataStore.put(record.key(), record.value());
        
        context.forward(record);
    }
}

// Connect both stores in topology
Topology topology = new Topology();
topology.addSource("Source", "input-topic");
topology.addProcessor("Process", () -> new MultiStoreProcessor(), "Source");

// Add both stores
StoreBuilder<KeyValueStore<String, Long>> countsBuilder = 
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("counts-store"),
        Serdes.String(), Serdes.Long());

StoreBuilder<KeyValueStore<String, String>> metadataBuilder = 
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("metadata-store"),
        Serdes.String(), Serdes.String());

topology.addStateStore(countsBuilder, "Process");
topology.addStateStore(metadataBuilder, "Process");

topology.addSink("Sink", "output-topic", "Process");

Best Practices

Resource Management in Processors

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.*;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ResourceManagedProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;
    private KeyValueStore<String, Long> store;
    private ExecutorService executor;
    private ScheduledExecutorService scheduler;
    
    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
        this.store = context.getStateStore("my-store");
        
        // Initialize resources for work that does not touch Kafka Streams state
        this.executor = Executors.newFixedThreadPool(5);
        this.scheduler = Executors.newScheduledThreadPool(1);

        // Schedule store cleanup via punctuation rather than the scheduler:
        // state stores are not thread-safe and must only be accessed from
        // the stream thread that owns this processor
        context.schedule(
            Duration.ofHours(1),
            PunctuationType.WALL_CLOCK_TIME,
            timestamp -> cleanupOldEntries()
        );
    }
    
    @Override
    public void process(Record<String, String> record) {
        // Process record
        store.put(record.key(), System.currentTimeMillis());
        context.forward(record);
    }
    
    @Override
    public void close() {
        // Clean up all resources
        System.out.println("Closing processor resources");
        
        if (scheduler != null) {
            scheduler.shutdown();
            try {
                if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
                    scheduler.shutdownNow();
                }
            } catch (InterruptedException e) {
                scheduler.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
        
        if (executor != null) {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }
    
    private void cleanupOldEntries() {
        // Cleanup logic
        long cutoffTime = System.currentTimeMillis() - 86400000; // 24 hours
        
        try (KeyValueIterator<String, Long> iterator = store.all()) {
            while (iterator.hasNext()) {
                KeyValue<String, Long> entry = iterator.next();
                if (entry.value < cutoffTime) {
                    store.delete(entry.key);
                }
            }
        }
    }
}