or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

assertion-matching.md container-testing.md embedded-brokers.md index.md junit-integration.md test-utilities.md testing-annotations.md

docs/junit-integration.md

0

# JUnit Integration

1

2

JUnit rules and extensions for managing embedded Kafka broker lifecycle. Supports both JUnit 4 rules and JUnit 5 extensions with automatic broker setup and teardown.

3

4

## Capabilities

5

6

### JUnit 4 Rule Support

7

8

JUnit 4 rule wrapper for embedded Kafka broker with automatic lifecycle management.

9

10

```java { .api }

11

/**

12

* A JUnit 4 TestRule (org.junit.rules.TestRule) wrapper around an EmbeddedKafkaBroker

13

*/

14

public class EmbeddedKafkaRule extends ExternalResource {

15

/**

16

* Create embedded Kafka brokers

17

* @param count the number of brokers

18

*/

19

public EmbeddedKafkaRule(int count);

20

21

/**

22

* Create embedded Kafka brokers

23

* @param count the number of brokers

24

* @param controlledShutdown passed into TestUtils.createBrokerConfig

25

* @param topics the topics to create (2 partitions per topic)

26

*/

27

public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics);

28

29

/**

30

* Create embedded Kafka brokers listening on random ports

31

* @param count the number of brokers

32

* @param controlledShutdown passed into TestUtils.createBrokerConfig

33

* @param partitions partitions per topic

34

* @param topics the topics to create

35

*/

36

public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics);

37

38

/**

39

* Specify the properties to configure Kafka Broker before start

40

* @param brokerProperties the properties to use for configuring Kafka Broker(s)

41

* @return this for chaining configuration

42

*/

43

public EmbeddedKafkaRule brokerProperties(Map<String, String> brokerProperties);

44

45

/**

46

* Specify a broker property

47

* @param property the property name

48

* @param value the value

49

* @return the EmbeddedKafkaRule

50

*/

51

public EmbeddedKafkaRule brokerProperty(String property, Object value);

52

53

/**

54

* Set explicit ports on which the kafka brokers will listen

55

* @param kafkaPorts the ports

56

* @return the rule

57

*/

58

public EmbeddedKafkaRule kafkaPorts(int... kafkaPorts);

59

60

/**

61

* Set ZooKeeper port

62

* @param port the port

63

* @return the rule

64

*/

65

public EmbeddedKafkaRule zkPort(int port);

66

67

/**

68

* Return the underlying EmbeddedKafkaBroker instance that this rule delegates to

69

* @return the EmbeddedKafkaBroker instance

70

*/

71

public EmbeddedKafkaBroker getEmbeddedKafka();

72

}

73

```

74

75

### JUnit 5 Extension Support

76

77

JUnit 5 condition and extension for embedded broker setup with parameter injection support.

78

79

```java { .api }

80

/**

81

* JUnit5 condition for an embedded broker

82

*/

83

public class EmbeddedKafkaCondition implements ExecutionCondition, AfterAllCallback, ParameterResolver {

84

/**

85

* Check if parameter is supported for injection

86

* @param parameterContext the parameter context

87

* @param extensionContext the extension context

88

* @return true if EmbeddedKafkaBroker parameter is supported

89

*/

90

public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext);

91

92

/**

93

* Resolve parameter for injection

94

* @param parameterContext the parameter context

95

* @param context the extension context

96

* @return the EmbeddedKafkaBroker instance

97

*/

98

public Object resolveParameter(ParameterContext parameterContext, ExtensionContext context);

99

100

/**

101

* Clean up after all tests

102

* @param context the extension context

103

*/

104

public void afterAll(ExtensionContext context);

105

106

/**

107

* Evaluate execution condition

108

* @param context the extension context

109

* @return condition evaluation result

110

*/

111

public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context);

112

113

/**

114

* Get the current thread's broker instance

115

* @return the EmbeddedKafkaBroker

116

*/

117

public static EmbeddedKafkaBroker getBroker();

118

}

119

```

120

121

### Global Test Execution Listener

122

123

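JUnit Platform `TestExecutionListener` that starts a single, global embedded Kafka broker at the beginning of the test plan and stops it at the end, so that all tests in the plan share one broker.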

124

125

```java { .api }

126

/**

127

* Global test execution listener for embedded Kafka lifecycle

128

*/

129

public class GlobalEmbeddedKafkaTestExecutionListener implements TestExecutionListener {

130

// Automatic lifecycle management: broker starts when the JUnit Platform test plan starts and stops when it finishes

131

}

132

```

133

134

**Usage Examples:**

135

136

```java

137

// JUnit 4 Rule - Basic Setup

138

public class JUnit4KafkaTest {

139

@Rule

140

public EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, "test-topic");

141

142

@Test

143

public void testKafkaWithRule() throws Exception {

144

EmbeddedKafkaBroker broker = embeddedKafka.getEmbeddedKafka();

145

146

// Create consumer and producer

147

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", broker);

148

Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);

149

150

try (Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);

151

Producer<Integer, String> producer = new KafkaProducer<>(producerProps)) {

152

153

broker.consumeFromAnEmbeddedTopic(consumer, "test-topic");

154

producer.send(new ProducerRecord<>("test-topic", 1, "test-message"));

155

156

ConsumerRecord<Integer, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");

157

assertEquals("test-message", record.value());

158

}

159

}

160

}

161

162

// JUnit 4 Rule - Advanced Configuration

163

public class JUnit4AdvancedKafkaTest {

164

@Rule

165

public EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(2, true, 3, "orders", "payments")

166

.brokerProperties(Map.of(

167

"auto.create.topics.enable", "true",

168

"transaction.state.log.replication.factor", "1"

169

))

170

.brokerProperty("offsets.topic.replication.factor", "1")

171

.kafkaPorts(9092, 9093)

172

.zkPort(2181);

173

174

@Test

175

public void testMultiBrokerSetup() {

176

EmbeddedKafkaBroker broker = embeddedKafka.getEmbeddedKafka();

177

178

assertTrue(broker.getTopics().contains("orders"));

179

assertTrue(broker.getTopics().contains("payments"));

180

assertTrue(broker.getBrokersAsString().contains("9092"));

181

assertTrue(broker.getBrokersAsString().contains("9093"));

182

}

183

}

184

185

// JUnit 5 Extension - Parameter Injection

186

@ExtendWith(EmbeddedKafkaCondition.class)

187

@EmbeddedKafka(partitions = 1, topics = { "test-topic" })

188

public class JUnit5ParameterInjectionTest {

189

190

@Test

191

public void testWithInjectedBroker(EmbeddedKafkaBroker embeddedKafka) throws Exception {

192

// Broker is automatically injected

193

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);

194

Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);

195

196

try (Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

197

Producer<String, String> producer = new KafkaProducer<>(producerProps)) {

198

199

embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "test-topic");

200

producer.send(new ProducerRecord<>("test-topic", "key", "value"));

201

202

ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");

203

assertEquals("value", record.value());

204

}

205

}

206

207

@Test

208

public void testStaticBrokerAccess() {

209

// Alternative access via static method

210

EmbeddedKafkaBroker broker = EmbeddedKafkaCondition.getBroker();

211

assertNotNull(broker);

212

assertTrue(broker.getTopics().contains("test-topic"));

213

}

214

}

215

216

// JUnit 5 Extension - Without Spring

217

@EmbeddedKafka(count = 2, partitions = 2, topics = { "events", "commands" })

218

public class JUnit5NonSpringTest {

219

220

@Test

221

public void testNonSpringSetup() {

222

// Access broker via static method when not using parameter injection

223

EmbeddedKafkaBroker broker = EmbeddedKafkaCondition.getBroker();

224

225

assertEquals(2, broker.getPartitionsPerTopic());

226

assertTrue(broker.getTopics().contains("events"));

227

assertTrue(broker.getTopics().contains("commands"));

228

}

229

}

230

231

// JUnit 5 with Spring TestContext

232

@SpringBootTest

233

@EmbeddedKafka(partitions = 1, topics = { "integration-topic" })

234

public class JUnit5SpringIntegrationTest {

235

236

@Autowired

237

private EmbeddedKafkaBroker embeddedKafka;

238

239

@Test

240

public void testSpringIntegration() {

241

// Broker is automatically configured by Spring TestContext

242

assertNotNull(embeddedKafka);

243

assertTrue(embeddedKafka.getTopics().contains("integration-topic"));

244

}

245

}

246

247

// Complex JUnit 4 Test with Multiple Rules

248

public class ComplexJUnit4Test {

249

250

@Rule

251

public EmbeddedKafkaRule kafkaRule = new EmbeddedKafkaRule(1, false, 1, "source-topic", "sink-topic")

252

.brokerProperty("auto.create.topics.enable", "false");

253

254

@Rule

255

public TestRule chain = RuleChain

256

.outerRule(kafkaRule)

257

.around(new CustomTestRule());

258

259

@Test

260

public void testKafkaStreamsProcessing() throws Exception {

261

EmbeddedKafkaBroker broker = kafkaRule.getEmbeddedKafka();

262

263

// Setup Kafka Streams

264

Map<String, Object> streamsProps = KafkaTestUtils.streamsProps("test-streams", broker.getBrokersAsString());

265

266

// Build topology

267

StreamsBuilder builder = new StreamsBuilder();

268

builder.stream("source-topic")

269

.mapValues(value -> value.toString().toUpperCase())

270

.to("sink-topic");

271

272

Topology topology = builder.build();

273

274

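// java.util.Properties has no constructor that accepts a Map, so copy the entries explicitly
Properties streamsConfig = new Properties();
streamsConfig.putAll(streamsProps);
try (KafkaStreams streams = new KafkaStreams(topology, streamsConfig)) {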

275

streams.start();

276

277

// Send input message

278

Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);

279

try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {

280

producer.send(new ProducerRecord<>("source-topic", "key", "hello world"));

281

}

282

283

// Verify output

284

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", broker);

285

try (Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {

286

broker.consumeFromAnEmbeddedTopic(consumer, "sink-topic");

287

ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, "sink-topic");

288

assertEquals("HELLO WORLD", record.value());

289

}

290

}

291

}

292

}

293

294

// Nested Test Classes with JUnit 5

295

@EmbeddedKafka(partitions = 1, topics = { "parent-topic" })

296

public class NestedJUnit5Test {

297

298

@Nested

299

@EmbeddedKafka(partitions = 2, topics = { "nested-topic" })

300

class NestedTests {

301

302

@Test

303

public void testNestedBroker(EmbeddedKafkaBroker embeddedKafka) {

304

// This broker has the nested configuration

305

assertTrue(embeddedKafka.getTopics().contains("nested-topic"));

306

assertEquals(2, embeddedKafka.getPartitionsPerTopic());

307

}

308

}

309

310

@Test

311

public void testParentBroker(EmbeddedKafkaBroker embeddedKafka) {

312

// This broker has the parent configuration

313

assertTrue(embeddedKafka.getTopics().contains("parent-topic"));

314

assertEquals(1, embeddedKafka.getPartitionsPerTopic());

315

}

316

}

317

```