or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

assertion-matching.md container-testing.md embedded-brokers.md index.md junit-integration.md test-utilities.md testing-annotations.md

test-utilities.mddocs/

0

# Test Utilities

1

2

Comprehensive testing utilities for creating configurations, polling messages, and performing common test operations. They simplify Kafka test setup and message verification patterns.

3

4

## Capabilities

5

6

### Configuration Utilities

7

8

Helper methods for creating standard test configurations for consumers, producers, and Kafka Streams.

9

10

```java { .api }

11

/**

12

* Kafka testing utilities

13

*/

14

public final class KafkaTestUtils {

15

/**

16

* Set up test properties for an <Integer, String> consumer

17

* @param group the group id

18

* @param autoCommit the auto commit

19

* @param embeddedKafka an EmbeddedKafkaBroker instance

20

* @return the properties

21

*/

22

public static Map<String, Object> consumerProps(String group, String autoCommit, EmbeddedKafkaBroker embeddedKafka);

23

24

/**

25

* Set up test properties for an <Integer, String> consumer

26

* @param brokers the bootstrapServers property

27

* @param group the group id

28

* @return the properties

29

*/

30

public static Map<String, Object> consumerProps(String brokers, String group);

31

32

/**

33

* Set up test properties for an <Integer, String> consumer

34

* @param brokers the bootstrapServers property

35

* @param group the group id

36

* @param autoCommit the auto commit

37

* @return the properties

38

*/

39

public static Map<String, Object> consumerProps(String brokers, String group, String autoCommit);

40

41

/**

42

* Set up test properties for an <Integer, String> producer

43

* @param embeddedKafka an EmbeddedKafkaBroker instance

44

* @return the properties

45

*/

46

public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka);

47

48

/**

49

* Set up test properties for an <Integer, String> producer

50

* @param brokers the bootstrapServers property

51

* @return the properties

52

*/

53

public static Map<String, Object> producerProps(String brokers);

54

55

/**

56

* Set up test properties for Kafka Streams

57

* @param applicationId the applicationId for the Kafka Streams

58

* @param brokers the bootstrapServers property

59

* @return the properties

60

*/

61

public static Map<String, Object> streamsProps(String applicationId, String brokers);

62

}

63

```

64

65

### Message Polling Utilities

66

67

Methods for polling and retrieving messages from Kafka topics with various timeout and filtering options.

68

69

```java { .api }

70

public final class KafkaTestUtils {

71

/**

72

* Poll the consumer, expecting a single record for the specified topic

73

* @param consumer the consumer

74

* @param topic the topic

75

* @param <K> the key type

76

* @param <V> the value type

77

* @return the record

78

* @throws IllegalStateException if exactly one record is not received

79

*/

80

public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic);

81

82

/**

83

* Poll the consumer, expecting a single record for the specified topic

84

* @param consumer the consumer

85

* @param topic the topic

86

* @param timeout max duration to wait for records

87

* @param <K> the key type

88

* @param <V> the value type

89

* @return the record

90

* @throws IllegalStateException if exactly one record is not received

91

*/

92

public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic, Duration timeout);

93

94

/**

95

* Get a single record for the group from the topic/partition

96

* @param brokerAddresses the broker address(es)

97

* @param group the group

98

* @param topic the topic

99

* @param partition the partition

100

* @param seekToLast true to fetch an existing last record, if present

101

* @param commit whether to commit the offset after polling

102

* @param timeout the timeout

103

* @return the record or null if no record received

104

*/

105

@Nullable

106

public static ConsumerRecord<?, ?> getOneRecord(String brokerAddresses, String group, String topic, int partition,

107

boolean seekToLast, boolean commit, Duration timeout);

108

109

/**

110

* Poll the consumer for records

111

* @param consumer the consumer

112

* @param <K> the key type

113

* @param <V> the value type

114

* @return the records

115

*/

116

public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer);

117

118

/**

119

* Poll the consumer for records

120

* @param consumer the consumer

121

* @param timeout max time to wait for records

122

* @param <K> the key type

123

* @param <V> the value type

124

* @return the records

125

* @throws IllegalStateException if the poll returns null

126

*/

127

public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, Duration timeout);

128

129

/**

130

* Poll the consumer for records

131

* @param consumer the consumer

132

* @param timeout max time to wait for records

133

* @param minRecords wait until the timeout or at least this number of records are received

134

* @param <K> the key type

135

* @param <V> the value type

136

* @return the records

137

* @throws IllegalStateException if the poll returns null

138

*/

139

public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, Duration timeout, int minRecords);

140

}

141

```

142

143

### Offset and Metadata Utilities

144

145

Methods for retrieving consumer group offsets and topic partition metadata.

146

147

```java { .api }

148

public final class KafkaTestUtils {

149

/**

150

* Get the current offset and metadata for the provided group/topic/partition

151

* @param brokerAddresses the broker address(es)

152

* @param group the group

153

* @param topic the topic

154

* @param partition the partition

155

* @return the offset and metadata

156

* @throws Exception if an exception occurs

157

*/

158

public static OffsetAndMetadata getCurrentOffset(String brokerAddresses, String group, String topic, int partition)

159

throws Exception;

160

161

/**

162

* Get the current offset and metadata for the provided group/topic/partition

163

* @param adminClient the AdminClient instance

164

* @param group the group

165

* @param topic the topic

166

* @param partition the partition

167

* @return the offset and metadata

168

* @throws Exception if an exception occurs

169

*/

170

public static OffsetAndMetadata getCurrentOffset(AdminClient adminClient, String group, String topic, int partition)

171

throws Exception;

172

173

/**

174

* Return the end offsets of the requested topic/partitions

175

* @param consumer the consumer

176

* @param topic the topic

177

* @param partitions the partitions, or null for all partitions

178

* @return the map of end offsets

179

*/

180

public static Map<TopicPartition, Long> getEndOffsets(Consumer<?, ?> consumer, String topic, Integer... partitions);

181

}

182

```

183

184

### Property Access Utilities

185

186

Utilities for accessing nested properties in objects using dotted notation.

187

188

```java { .api }

189

public final class KafkaTestUtils {

190

/**

191

* Uses nested DirectFieldAccessors to obtain a property using dotted notation to traverse fields

192

* @param root The object

193

* @param propertyPath The path

194

* @return The field

195

*/

196

public static Object getPropertyValue(Object root, String propertyPath);

197

198

/**

199

* A typed version of getPropertyValue(Object, String)

200

* @param root the object

201

* @param propertyPath the path

202

* @param type the type to cast the object to

203

* @param <T> the type

204

* @return the field value

205

*/

206

public static <T> T getPropertyValue(Object root, String propertyPath, Class<T> type);

207

208

/**

209

* Return a Properties object equal to the default consumer property overrides.

210

* Useful when matching arguments in Mockito tests

211

* @return the default properties

212

*/

213

public static Properties defaultPropertyOverrides();

214

}

215

```

216

217

### JUnit Utilities

218

219

Additional utilities for JUnit testing scenarios.

220

221

```java { .api }

222

/**

223

* JUnit testing utilities

224

*/

225

public final class JUnitUtils {

226

// Utility methods for JUnit integration

227

}

228

```

229

230

**Usage Examples:**

231

232

```java

233

// Basic consumer and producer setup

234

@Test

235

public void testBasicProducerConsumer() throws Exception {

236

// Create consumer

237

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);

238

Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);

239

embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "test-topic");

240

241

// Create producer

242

Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);

243

Producer<Integer, String> producer = new KafkaProducer<>(producerProps);

244

245

// Send message

246

producer.send(new ProducerRecord<>("test-topic", 1, "test-message"));

247

248

// Consume and verify

249

ConsumerRecord<Integer, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");

250

assertEquals("test-message", record.value());

251

assertEquals(Integer.valueOf(1), record.key());

252

}

253

254

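// Multiple records polling (NOTE: consumerProps/producerProps configure <Integer, String> clients; the String keys used here require overriding the key serializer/deserializer)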

255

@Test

256

public void testMultipleRecords() throws Exception {

257

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);

258

Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

259

embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "test-topic");

260

261

Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);

262

Producer<String, String> producer = new KafkaProducer<>(producerProps);

263

264

// Send multiple messages

265

for (int i = 0; i < 5; i++) {

266

producer.send(new ProducerRecord<>("test-topic", "key-" + i, "message-" + i));

267

}

268

269

// Poll for multiple records with timeout

270

ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(10), 5);

271

assertEquals(5, records.count());

272

273

// Verify each record

274

for (ConsumerRecord<String, String> record : records) {

275

assertTrue(record.key().startsWith("key-"));

276

assertTrue(record.value().startsWith("message-"));

277

}

278

}

279

280

// Offset management

281

@Test

282

public void testOffsetManagement() throws Exception {

283

String brokerAddresses = embeddedKafka.getBrokersAsString();

284

String group = "offset-test-group";

285

String topic = "offset-topic";

286

int partition = 0;

287

288

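// Send a message (NOTE: producerProps configures an <Integer, String> producer; the String key used here requires overriding the key serializer)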

289

Map<String, Object> producerProps = KafkaTestUtils.producerProps(brokerAddresses);

290

Producer<String, String> producer = new KafkaProducer<>(producerProps);

291

producer.send(new ProducerRecord<>(topic, partition, "key", "value"));

292

293

// Consume the message

294

ConsumerRecord<?, ?> record = KafkaTestUtils.getOneRecord(

295

brokerAddresses, group, topic, partition, false, true, Duration.ofSeconds(10)

296

);

297

assertNotNull(record);

298

299

// Check committed offset

300

OffsetAndMetadata offset = KafkaTestUtils.getCurrentOffset(brokerAddresses, group, topic, partition);

301

assertNotNull(offset);

302

assertEquals(1L, offset.offset());

303

}

304

305

// End offsets

306

@Test

307

public void testEndOffsets() throws Exception {

308

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("end-offset-group", "false", embeddedKafka);

309

Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

310

311

// Get end offsets for all partitions

312

Map<TopicPartition, Long> endOffsets = KafkaTestUtils.getEndOffsets(consumer, "test-topic");

313

assertFalse(endOffsets.isEmpty());

314

315

// Get end offsets for specific partitions

316

Map<TopicPartition, Long> specificOffsets = KafkaTestUtils.getEndOffsets(consumer, "test-topic", 0, 1);

317

assertEquals(2, specificOffsets.size());

318

}

319

320

// Kafka Streams configuration

321

@Test

322

public void testStreamsConfiguration() {

323

String applicationId = "test-streams-app";

324

String brokers = embeddedKafka.getBrokersAsString();

325

326

Map<String, Object> streamsProps = KafkaTestUtils.streamsProps(applicationId, brokers);

327

328

assertEquals(applicationId, streamsProps.get(StreamsConfig.APPLICATION_ID_CONFIG));

329

assertEquals(brokers, streamsProps.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));

330

}

331

332

// Property access utilities

333

@Test

334

public void testPropertyAccess() {

335

SomeComplexObject obj = new SomeComplexObject();

336

337

// Access nested properties

338

String nestedValue = KafkaTestUtils.getPropertyValue(obj, "nested.property.value", String.class);

339

Object rawValue = KafkaTestUtils.getPropertyValue(obj, "nested.property.value");

340

341

// Default properties

342

Properties defaults = KafkaTestUtils.defaultPropertyOverrides();

343

assertEquals("false", defaults.getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));

344

}

345

```