or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

assertion-matching.md, container-testing.md, embedded-brokers.md, index.md, junit-integration.md, test-utilities.md, testing-annotations.md

docs/container-testing.md

0

# Container Testing

1

2

Utilities for testing Spring Kafka listener containers. Provides methods for waiting on partition assignments and container lifecycle management without hard dependencies on container classes.

3

4

## Capabilities

5

6

### Container Utilities

7

8

Utilities for testing listener containers with partition assignment waiting and lifecycle management.

9

10

```java { .api }

11

/**

12

* Utilities for testing listener containers

13

* No hard references to container classes are used to avoid circular project dependencies

14

*/

15

public final class ContainerTestUtils {

16

/**

17

* Wait until the container has the required number of assigned partitions

18

* @param container the container

19

* @param partitions the number of partitions

20

* @throws IllegalStateException if the operation cannot be completed as expected

21

* @throws ContainerTestUtilsException if the call to the container's getAssignedPartitions() method fails

22

*/

23

public static void waitForAssignment(Object container, int partitions);

24

}

25

26

/**

27

* Exception thrown when container test utilities fail

28

*/

29

public static class ContainerTestUtilsException extends RuntimeException {

30

ContainerTestUtilsException(String message, Throwable cause);

31

}

32

```

33

34

**Usage Examples:**

35

36

```java

37

// Basic container assignment waiting

38

@SpringBootTest

39

@EmbeddedKafka(partitions = 3, topics = { "test-topic" })

40

public class ContainerAssignmentTest {

41

42

@Autowired

43

private EmbeddedKafkaBroker embeddedKafka;

44

45

@Test

46

public void testSingleContainerAssignment() throws Exception {

47

// Configure consumer properties

48

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);

49

50

// Create container factory

51

ConcurrentKafkaListenerContainerFactory<Integer, String> factory =

52

new ConcurrentKafkaListenerContainerFactory<>();

53

factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));

54

55

// Create container

56

ContainerProperties containerProperties = new ContainerProperties("test-topic");

57

containerProperties.setGroupId("test-group");

58

59

KafkaMessageListenerContainer<Integer, String> container =

60

new KafkaMessageListenerContainer<>(factory.getConsumerFactory(), containerProperties);

61

62

container.start();

63

try {

64

// Wait for all 3 partitions to be assigned

65

ContainerTestUtils.waitForAssignment(container, 3);

66

67

// Container is now ready for testing

68

assertTrue(container.isRunning());

69

70

} finally {

71

container.stop();

72

}

73

}

74

}

75

76

// Multi-container assignment waiting

77

@SpringBootTest

78

@EmbeddedKafka(partitions = 6, topics = { "multi-partition-topic" })

79

public class MultiContainerAssignmentTest {

80

81

@Autowired

82

private EmbeddedKafkaBroker embeddedKafka;

83

84

@Test

85

public void testConcurrentContainerAssignment() throws Exception {

86

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("multi-group", "false", embeddedKafka);

87

88

// Create concurrent container factory

89

ConcurrentKafkaListenerContainerFactory<String, String> factory =

90

new ConcurrentKafkaListenerContainerFactory<>();

91

factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));

92

factory.setConcurrency(3); // 3 consumer threads

93

94

// Create concurrent container

95

ContainerProperties containerProperties = new ContainerProperties("multi-partition-topic");

96

containerProperties.setGroupId("multi-group");

97

98

ConcurrentMessageListenerContainer<String, String> container =

99

new ConcurrentMessageListenerContainer<>(factory.getConsumerFactory(), containerProperties);

100

container.setConcurrency(3);

101

102

container.start();

103

try {

104

// Wait for all 6 partitions to be assigned across the 3 concurrent containers

105

ContainerTestUtils.waitForAssignment(container, 6);

106

107

// All partitions are now assigned

108

assertTrue(container.isRunning());

109

110

} finally {

111

container.stop();

112

}

113

}

114

}

115

116

// Integration test with message processing

117

@SpringBootTest

118

@EmbeddedKafka(partitions = 2, topics = { "processing-topic" })

119

public class MessageProcessingContainerTest {

120

121

@Autowired

122

private EmbeddedKafkaBroker embeddedKafka;

123

124

private final CountDownLatch latch = new CountDownLatch(3);

125

private final List<String> receivedMessages = new ArrayList<>();

126

127

@Test

128

public void testMessageProcessingWithContainer() throws Exception {

129

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("processing-group", "false", embeddedKafka);

130

131

// Create container with message listener

132

ContainerProperties containerProperties = new ContainerProperties("processing-topic");

133

containerProperties.setGroupId("processing-group");

134

containerProperties.setMessageListener(new MessageListener<String, String>() {

135

@Override

136

public void onMessage(ConsumerRecord<String, String> record) {

137

receivedMessages.add(record.value());

138

latch.countDown();

139

}

140

});

141

142

DefaultKafkaConsumerFactory<String, String> consumerFactory =

143

new DefaultKafkaConsumerFactory<>(consumerProps);

144

145

KafkaMessageListenerContainer<String, String> container =

146

new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

147

148

container.start();

149

try {

150

// Wait for partition assignment

151

ContainerTestUtils.waitForAssignment(container, 2);

152

153

// Send test messages

154

Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);

155

try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {

156

producer.send(new ProducerRecord<>("processing-topic", "key1", "message1"));

157

producer.send(new ProducerRecord<>("processing-topic", "key2", "message2"));

158

producer.send(new ProducerRecord<>("processing-topic", "key3", "message3"));

159

}

160

161

// Wait for messages to be processed

162

assertTrue(latch.await(30, TimeUnit.SECONDS));

163

164

// Verify messages were received

165

assertEquals(3, receivedMessages.size());

166

assertTrue(receivedMessages.contains("message1"));

167

assertTrue(receivedMessages.contains("message2"));

168

assertTrue(receivedMessages.contains("message3"));

169

170

} finally {

171

container.stop();

172

}

173

}

174

}

175

176

// Error handling and timeout scenarios

177

@SpringBootTest

178

@EmbeddedKafka(partitions = 1, topics = { "error-topic" })

179

public class ContainerErrorHandlingTest {

180

181

@Autowired

182

private EmbeddedKafkaBroker embeddedKafka;

183

184

@Test

185

public void testAssignmentTimeout() {

186

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("error-group", "false", embeddedKafka);

187

188

// Create container with invalid topic (should not exist)

189

ContainerProperties containerProperties = new ContainerProperties("non-existent-topic");

190

containerProperties.setGroupId("error-group");

191

192

DefaultKafkaConsumerFactory<String, String> consumerFactory =

193

new DefaultKafkaConsumerFactory<>(consumerProps);

194

195

KafkaMessageListenerContainer<String, String> container =

196

new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

197

198

container.start();

199

try {

200

// This should throw IllegalStateException due to no partitions being assigned

201

assertThrows(IllegalStateException.class, () -> {

202

ContainerTestUtils.waitForAssignment(container, 1);

203

});

204

205

} finally {

206

container.stop();

207

}

208

}

209

210

@Test

211

public void testPartialAssignment() throws Exception {

212

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("partial-group", "false", embeddedKafka);

213

214

ContainerProperties containerProperties = new ContainerProperties("error-topic");

215

containerProperties.setGroupId("partial-group");

216

217

DefaultKafkaConsumerFactory<String, String> consumerFactory =

218

new DefaultKafkaConsumerFactory<>(consumerProps);

219

220

KafkaMessageListenerContainer<String, String> container =

221

new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

222

223

container.start();

224

try {

225

// Wait for actual partition count (1), not more

226

ContainerTestUtils.waitForAssignment(container, 1);

227

228

// This should throw because we expect more partitions than available

229

assertThrows(IllegalStateException.class, () -> {

230

ContainerTestUtils.waitForAssignment(container, 5);

231

});

232

233

} finally {

234

container.stop();

235

}

236

}

237

}

238

239

// Custom container implementations

240

@SpringBootTest

241

@EmbeddedKafka(partitions = 4, topics = { "custom-topic" })

242

public class CustomContainerTest {

243

244

@Autowired

245

private EmbeddedKafkaBroker embeddedKafka;

246

247

@Test

248

public void testCustomContainerImplementation() throws Exception {

249

// Create a custom container that implements getAssignedPartitions()

250

CustomKafkaContainer customContainer = new CustomKafkaContainer(embeddedKafka, "custom-topic");

251

252

customContainer.start();

253

try {

254

// ContainerTestUtils uses reflection to call getAssignedPartitions()

255

ContainerTestUtils.waitForAssignment(customContainer, 4);

256

257

// Verify custom container is working

258

assertTrue(customContainer.isRunning());

259

assertEquals(4, customContainer.getAssignedPartitions().size());

260

261

} finally {

262

customContainer.stop();

263

}

264

}

265

266

private static class CustomKafkaContainer {

267

private Consumer<String, String> consumer;

268

private boolean running = false;

269

private Collection<TopicPartition> assignedPartitions = new HashSet<>();

270

271

public CustomKafkaContainer(EmbeddedKafkaBroker broker, String topic) {

272

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("custom-group", "false", broker);

273

this.consumer = new KafkaConsumer<>(consumerProps);

274

}

275

276

public void start() {

277

consumer.subscribe(Arrays.asList("custom-topic"), new ConsumerRebalanceListener() {

278

@Override

279

public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

280

assignedPartitions.clear();

281

}

282

283

@Override

284

public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

285

assignedPartitions = new HashSet<>(partitions);

286

}

287

});

288

289

// Trigger initial assignment

290

consumer.poll(Duration.ofMillis(100));

291

running = true;

292

}

293

294

public void stop() {

295

running = false;

296

if (consumer != null) {

297

consumer.close();

298

}

299

}

300

301

public boolean isRunning() {

302

return running;

303

}

304

305

// This method will be called by ContainerTestUtils via reflection

306

public Collection<TopicPartition> getAssignedPartitions() {

307

if (!running) {

308

return Collections.emptyList();

309

}

310

311

// Poll to trigger rebalance if needed

312

consumer.poll(Duration.ofMillis(100));

313

return new HashSet<>(assignedPartitions);

314

}

315

}

316

}

317

318

// Kafka Streams container testing

319

@SpringBootTest

320

@EmbeddedKafka(partitions = 2, topics = { "input-topic", "output-topic" })

321

public class KafkaStreamsContainerTest {

322

323

@Autowired

324

private EmbeddedKafkaBroker embeddedKafka;

325

326

@Test

327

public void testKafkaStreamsWithContainerUtils() throws Exception {

328

// Setup Kafka Streams

329

Map<String, Object> streamsProps = KafkaTestUtils.streamsProps("streams-app", embeddedKafka.getBrokersAsString());

330

331

StreamsBuilder builder = new StreamsBuilder();

332

builder.stream("input-topic")

333

.mapValues(value -> value.toString().toUpperCase())

334

.to("output-topic");

335

336

Topology topology = builder.build();

337

KafkaStreams streams = new KafkaStreams(topology, new Properties(streamsProps));

338

339

// Create output consumer

340

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("output-group", "false", embeddedKafka);

341

342

ContainerProperties containerProperties = new ContainerProperties("output-topic");

343

containerProperties.setGroupId("output-group");

344

345

DefaultKafkaConsumerFactory<String, String> consumerFactory =

346

new DefaultKafkaConsumerFactory<>(consumerProps);

347

348

KafkaMessageListenerContainer<String, String> outputContainer =

349

new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

350

351

try {

352

streams.start();

353

outputContainer.start();

354

355

// Wait for output topic partition assignment

356

ContainerTestUtils.waitForAssignment(outputContainer, 2);

357

358

// Send input message

359

Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);

360

try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {

361

producer.send(new ProducerRecord<>("input-topic", "key", "hello streams"));

362

}

363

364

// Verify output

365

Map<String, Object> outputConsumerProps = KafkaTestUtils.consumerProps("verify-group", "false", embeddedKafka);

366

try (Consumer<String, String> verifyConsumer = new KafkaConsumer<>(outputConsumerProps)) {

367

embeddedKafka.consumeFromAnEmbeddedTopic(verifyConsumer, "output-topic");

368

ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(verifyConsumer, "output-topic");

369

assertEquals("HELLO STREAMS", record.value());

370

}

371

372

} finally {

373

streams.close();

374

outputContainer.stop();

375

}

376

}

377

}

378

```