or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

assertion-matching.mdcontainer-testing.mdembedded-brokers.mdindex.mdjunit-integration.mdtest-utilities.mdtesting-annotations.md

index.mddocs/

0

# Spring Kafka Test

1

2

Spring Kafka Test provides comprehensive testing support for Spring Kafka applications through embedded Kafka broker functionality, testing utilities, assertion helpers, and container testing support. It enables reliable integration testing that validates message production, consumption, and processing workflows in isolated test environments.

3

4

## Package Information

5

6

- **Package Name**: spring-kafka-test

7

- **Package Type**: maven

8

- **Language**: Java

9

- **Installation**:

10

```xml

11

<dependency>

12

<groupId>org.springframework.kafka</groupId>

13

<artifactId>spring-kafka-test</artifactId>

14

<version>3.3.7</version>

15

<scope>test</scope>

16

</dependency>

17

```

18

19

## Core Imports

20

21

```java

22

import org.springframework.kafka.test.EmbeddedKafkaBroker;

23

import org.springframework.kafka.test.EmbeddedKafkaZKBroker;

24

import org.springframework.kafka.test.EmbeddedKafkaKraftBroker;

25

import org.springframework.kafka.test.context.EmbeddedKafka;

26

import org.springframework.kafka.test.utils.KafkaTestUtils;

27

```

28

29

## Basic Usage

30

31

```java

32

@SpringBootTest

33

@EmbeddedKafka(partitions = 1, topics = { "test-topic" })

34

public class KafkaIntegrationTest {

35

36

@Autowired

37

private EmbeddedKafkaBroker embeddedKafka;

38

39

@Test

40

public void testKafkaProducerConsumer() throws Exception {

41

// Create consumer properties

42

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);

43

Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);

44

45

// Subscribe to embedded topic

46

embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "test-topic");

47

48

// Create producer properties

49

Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);

50

Producer<Integer, String> producer = new KafkaProducer<>(producerProps);

51

52

// Send message

53

producer.send(new ProducerRecord<>("test-topic", 1, "test-message"));

54

55

// Consume and verify

56

ConsumerRecord<Integer, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");

57

assertThat(record.value()).isEqualTo("test-message");

58

}

59

}

60

```

61

62

## Architecture

63

64

Spring Kafka Test is built around several key components:

65

66

- **Embedded Brokers**: Both ZooKeeper-based (`EmbeddedKafkaZKBroker`) and KRaft-based (`EmbeddedKafkaKraftBroker`) implementations for running Kafka in test environments

67

- **Testing Annotations**: `@EmbeddedKafka` annotation for declarative test setup with Spring TestContext

68

- **Utility Classes**: Helper methods for creating test configurations, polling records, and common testing operations

69

- **Assertion Support**: Both Hamcrest matchers and AssertJ conditions for validating Kafka message behavior

70

- **JUnit Integration**: Rules for JUnit 4 and extensions for JUnit 5 to manage broker lifecycle

71

- **Container Testing**: Utilities for testing Spring Kafka listener containers

72

73

## Capabilities

74

75

### Embedded Kafka Brokers

76

77

Core embedded Kafka broker functionality providing both ZooKeeper-based and KRaft-based implementations. Essential for running integration tests with real Kafka instances without external dependencies.

78

79

```java { .api }

80

public interface EmbeddedKafkaBroker extends InitializingBean, DisposableBean {

81

int DEFAULT_ADMIN_TIMEOUT = 10;

82

String BEAN_NAME = "embeddedKafka";

83

String BROKER_LIST_PROPERTY = "spring.embedded.kafka.brokers.property";

84

String SPRING_EMBEDDED_KAFKA_BROKERS = "spring.embedded.kafka.brokers";

85

86

EmbeddedKafkaBroker kafkaPorts(int... ports);

87

EmbeddedKafkaBroker brokerProperties(Map<String, String> properties);

88

EmbeddedKafkaBroker brokerListProperty(String brokerListProperty);

89

EmbeddedKafkaBroker adminTimeout(int adminTimeout);

90

String getBrokersAsString();

91

void addTopics(String... topicsToAdd);

92

void addTopics(NewTopic... topicsToAdd);

93

Map<String, Exception> addTopicsWithResults(String... topicsToAdd);

94

Map<String, Exception> addTopicsWithResults(NewTopic... topicsToAdd);

95

void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd, String... topicsToConsume);

96

void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, String... topicsToConsume);

97

void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, boolean seekToEnd, String topic);

98

void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, String topic);

99

void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd);

100

void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer);

101

Set<String> getTopics();

102

int getPartitionsPerTopic();

103

}

104

```

105

106

[Embedded Brokers](./embedded-brokers.md)

107

108

### Testing Annotations

109

110

Annotation-based configuration for embedded Kafka in Spring tests. Provides declarative setup with automatic broker lifecycle management.

111

112

```java { .api }

113

@Target(ElementType.TYPE)

114

@Retention(RetentionPolicy.RUNTIME)

115

@ExtendWith(EmbeddedKafkaCondition.class)

116

public @interface EmbeddedKafka {

117

int value() default 1;

118

int count() default 1;

119

boolean controlledShutdown() default false;

120

int[] ports() default { 0 };

121

int zookeeperPort() default 0;

122

int partitions() default 2;

123

String[] topics() default {};

124

String[] brokerProperties() default {};

125

String brokerPropertiesLocation() default "";

126

String bootstrapServersProperty() default "spring.kafka.bootstrap-servers";

127

int zkConnectionTimeout() default EmbeddedKafkaZKBroker.DEFAULT_ZK_CONNECTION_TIMEOUT;

128

int zkSessionTimeout() default EmbeddedKafkaZKBroker.DEFAULT_ZK_SESSION_TIMEOUT;

129

int adminTimeout() default EmbeddedKafkaBroker.DEFAULT_ADMIN_TIMEOUT;

130

boolean kraft() default false;

131

}

132

```

133

134

[Testing Annotations](./testing-annotations.md)

135

136

### Test Utilities

137

138

Comprehensive testing utilities for creating configurations, polling messages, and common test operations. Simplifies Kafka test setup and message verification.

139

140

```java { .api }

141

public final class KafkaTestUtils {

142

public static Map<String, Object> consumerProps(String group, String autoCommit, EmbeddedKafkaBroker embeddedKafka);

143

public static Map<String, Object> consumerProps(String brokers, String group);

144

public static Map<String, Object> consumerProps(String brokers, String group, String autoCommit);

145

public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka);

146

public static Map<String, Object> producerProps(String brokers);

147

public static Map<String, Object> streamsProps(String applicationId, String brokers);

148

public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic);

149

public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic, Duration timeout);

150

@Nullable

151

public static ConsumerRecord<?, ?> getOneRecord(String brokerAddresses, String group, String topic, int partition, boolean seekToLast, boolean commit, Duration timeout);

152

public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer);

153

public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, Duration timeout);

154

public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, Duration timeout, int minRecords);

155

public static OffsetAndMetadata getCurrentOffset(String brokerAddresses, String group, String topic, int partition) throws Exception;

156

public static OffsetAndMetadata getCurrentOffset(AdminClient adminClient, String group, String topic, int partition) throws Exception;

157

public static Map<TopicPartition, Long> getEndOffsets(Consumer<?, ?> consumer, String topic, Integer... partitions);

158

public static Object getPropertyValue(Object root, String propertyPath);

159

public static <T> T getPropertyValue(Object root, String propertyPath, Class<T> type);

160

public static Properties defaultPropertyOverrides();

161

}

162

```

163

164

[Test Utilities](./test-utilities.md)

165

166

### Assertion and Matching

167

168

Hamcrest matchers and AssertJ conditions for validating Kafka message behavior. Provides fluent assertions for record keys, values, partitions, and timestamps.

169

170

```java { .api }

171

public final class KafkaMatchers {

172

public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key);

173

public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value);

174

public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition);

175

public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts);

176

}

177

178

public final class KafkaConditions {

179

public static <K> Condition<ConsumerRecord<K, ?>> key(K key);

180

public static <V> Condition<ConsumerRecord<?, V>> value(V value);

181

public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value);

182

public static Condition<ConsumerRecord<?, ?>> partition(int partition);

183

}

184

```

185

186

[Assertion and Matching](./assertion-matching.md)

187

188

### JUnit Integration

189

190

JUnit rules and extensions for managing embedded Kafka broker lifecycle. Supports both JUnit 4 rules and JUnit 5 extensions.

191

192

```java { .api }

193

public class EmbeddedKafkaRule extends ExternalResource {

194

public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics);

195

public EmbeddedKafkaRule brokerProperties(Map<String, String> brokerProperties);

196

public EmbeddedKafkaRule kafkaPorts(int... kafkaPorts);

197

public EmbeddedKafkaBroker getEmbeddedKafka();

198

}

199

200

public class EmbeddedKafkaCondition implements ExecutionCondition, AfterAllCallback, ParameterResolver {

201

public static EmbeddedKafkaBroker getBroker();

202

}

203

```

204

205

[JUnit Integration](./junit-integration.md)

206

207

### Container Testing

208

209

Utilities for testing Spring Kafka listener containers. Provides methods for waiting on partition assignments and container lifecycle management.

210

211

```java { .api }

212

public final class ContainerTestUtils {

213

public static void waitForAssignment(Object container, int partitions);

214

}

215

```

216

217

[Container Testing](./container-testing.md)

218

219

## Types

220

221

```java { .api }

222

public class BrokerAddress {

223

public static final int DEFAULT_PORT = 9092;

224

225

public BrokerAddress(String host, int port);

226

public BrokerAddress(String host);

227

public static BrokerAddress fromAddress(String address);

228

public String getHost();

229

public int getPort();

230

}

231

232

public class EmbeddedKafkaZKBroker implements EmbeddedKafkaBroker {

233

public static final int DEFAULT_ZK_SESSION_TIMEOUT = 18000;

234

public static final int DEFAULT_ZK_CONNECTION_TIMEOUT = DEFAULT_ZK_SESSION_TIMEOUT;

235

}

236

```