or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

assertion-matching.md, container-testing.md, embedded-brokers.md, index.md, junit-integration.md, test-utilities.md, testing-annotations.md

docs/embedded-brokers.md

0

# Embedded Kafka Brokers

1

2

Core embedded Kafka broker functionality providing both ZooKeeper-based and KRaft-based implementations for running Kafka instances in test environments without external dependencies.

3

4

## Capabilities

5

6

### EmbeddedKafkaBroker Interface

7

8

Core interface defining embedded Kafka broker functionality with lifecycle management and topic operations.

9

10

```java { .api }

11

/**

12

* Core interface for embedded Kafka broker functionality

13

*/

14

public interface EmbeddedKafkaBroker extends InitializingBean, DisposableBean {

15

int DEFAULT_ADMIN_TIMEOUT = 10;

16

String BEAN_NAME = "embeddedKafka";

17

String BROKER_LIST_PROPERTY = "spring.embedded.kafka.brokers.property";

18

String SPRING_EMBEDDED_KAFKA_BROKERS = "spring.embedded.kafka.brokers";

19

String LOOPBACK = "127.0.0.1";

20

21

/**

22

* Set explicit ports on which the kafka brokers will listen

23

* @param ports the ports

24

* @return the EmbeddedKafkaBroker

25

*/

26

EmbeddedKafkaBroker kafkaPorts(int... ports);

27

28

/**

29

* Specify properties to configure Kafka Broker before start

30

* @param properties the properties to use for configuring Kafka Broker(s)

31

* @return this for chaining configuration

32

*/

33

EmbeddedKafkaBroker brokerProperties(Map<String, String> properties);

34

35

/**

36

* Set the system property with this name to the list of broker addresses

37

* @param brokerListProperty the brokerListProperty to set

38

* @return this broker

39

*/

40

EmbeddedKafkaBroker brokerListProperty(String brokerListProperty);

41

42

/**

43

* Set the timeout in seconds for admin operations

44

* @param adminTimeout the timeout

45

* @return the EmbeddedKafkaBroker

46

*/

47

EmbeddedKafkaBroker adminTimeout(int adminTimeout);

48

49

/**

50

* Get the bootstrap server addresses as a String

51

* @return the bootstrap servers

52

*/

53

String getBrokersAsString();

54

55

/**

56

* Add topics to the existing broker(s) using the configured number of partitions

57

* @param topicsToAdd the topics

58

*/

59

void addTopics(String... topicsToAdd);

60

61

/**

62

* Add topics to the existing broker(s)

63

* @param topicsToAdd the topics

64

*/

65

void addTopics(NewTopic... topicsToAdd);

66

67

/**

68

* Add topics to the existing broker(s) and return results

69

* @param topicsToAdd the topics

70

* @return the results; null values indicate success

71

*/

72

Map<String, Exception> addTopicsWithResults(String... topicsToAdd);

73

74

/**

75

* Add topics to the existing broker(s) and return results

76

* @param topicsToAdd the topics

77

* @return the results; null values indicate success

78

*/

79

Map<String, Exception> addTopicsWithResults(NewTopic... topicsToAdd);

80

81

/**

82

* Subscribe a consumer to one or more of the embedded topics

83

* @param consumer the consumer

84

* @param seekToEnd true to seek to the end instead of the beginning

85

* @param topicsToConsume the topics

86

*/

87

void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd, String... topicsToConsume);

88

89

/**

90

* Subscribe a consumer to one or more of the embedded topics

91

* @param consumer the consumer

92

* @param topicsToConsume the topics

93

*/

94

void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, String... topicsToConsume);

95

96

/**

97

* Subscribe a consumer to one of the embedded topics

98

* @param consumer the consumer

99

* @param seekToEnd true to seek to the end instead of the beginning

100

* @param topic the topic

101

*/

102

void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, boolean seekToEnd, String topic);

103

104

/**

105

* Subscribe a consumer to one of the embedded topics

106

* @param consumer the consumer

107

* @param topic the topic

108

*/

109

void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, String topic);

110

111

/**

112

* Subscribe a consumer to all the embedded topics

113

* @param consumer the consumer

114

* @param seekToEnd true to seek to the end instead of the beginning

115

*/

116

void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd);

117

118

/**

119

* Subscribe a consumer to all the embedded topics

120

* @param consumer the consumer

121

*/

122

void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer);

123

124

/**

125

* Get the topics

126

* @return the topics

127

*/

128

Set<String> getTopics();

129

130

/**

131

* Get the configured number of partitions per topic

132

* @return the partition count

133

*/

134

int getPartitionsPerTopic();

135

}

136

```

137

138

### ZooKeeper-based Embedded Broker

139

140

Implementation that runs an embedded ZooKeeper instance for cluster coordination, providing full Kafka functionality in the traditional ZooKeeper-based mode.

141

142

```java { .api }

143

/**

144

* ZooKeeper-based embedded Kafka broker implementation

145

*/

146

public class EmbeddedKafkaZKBroker implements EmbeddedKafkaBroker {

147

public static final String SPRING_EMBEDDED_ZOOKEEPER_CONNECT = "spring.embedded.zookeeper.connect";

148

public static final int DEFAULT_ZK_SESSION_TIMEOUT = 18000;

149

public static final int DEFAULT_ZK_CONNECTION_TIMEOUT = DEFAULT_ZK_SESSION_TIMEOUT;

150

151

/**

152

* Create embedded Kafka brokers

153

* @param count the number of brokers

154

*/

155

public EmbeddedKafkaZKBroker(int count);

156

157

/**

158

* Create embedded Kafka brokers

159

* @param count the number of brokers

160

* @param controlledShutdown passed into TestUtils.createBrokerConfig

161

* @param topics the topics to create (2 partitions per topic)

162

*/

163

public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, String... topics);

164

165

/**

166

* Create embedded Kafka brokers listening on random ports

167

* @param count the number of brokers

168

* @param controlledShutdown passed into TestUtils.createBrokerConfig

169

* @param partitions partitions per topic

170

* @param topics the topics to create

171

*/

172

public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, int partitions, String... topics);

173

174

/**

175

* Specify a broker property

176

* @param property the property name

177

* @param value the value

178

* @return the EmbeddedKafkaBroker

179

*/

180

public EmbeddedKafkaBroker brokerProperty(String property, Object value);

181

182

/**

183

* Set an explicit port for the embedded Zookeeper

184

* @param port the port

185

* @return the EmbeddedKafkaBroker

186

*/

187

public EmbeddedKafkaZKBroker zkPort(int port);

188

189

/**

190

* Get the port that the embedded Zookeeper is running on or will run on

191

* @return the port

192

*/

193

public int getZkPort();

194

195

/**

196

* Set connection timeout for the client to the embedded Zookeeper

197

* @param zkConnectionTimeout the connection timeout

198

* @return the EmbeddedKafkaBroker

199

*/

200

public synchronized EmbeddedKafkaZKBroker zkConnectionTimeout(int zkConnectionTimeout);

201

202

/**

203

* Set session timeout for the client to the embedded Zookeeper

204

* @param zkSessionTimeout the session timeout

205

* @return the EmbeddedKafkaBroker

206

*/

207

public synchronized EmbeddedKafkaZKBroker zkSessionTimeout(int zkSessionTimeout);

208

209

/**

210

* Create an AdminClient; invoke the callback and reliably close the admin

211

* @param callback the callback

212

*/

213

public void doWithAdmin(Consumer<AdminClient> callback);

214

215

/**

216

* Create an AdminClient; invoke the callback and reliably close the admin

217

* @param callback the callback

218

* @param <T> the function return type

219

* @return the value returned by the callback function

220

*/

221

public <T> T doWithAdminFunction(Function<AdminClient, T> callback);

222

223

/**

224

* Get the underlying Kafka servers

225

* @return the Kafka servers

226

*/

227

public List<KafkaServer> getKafkaServers();

228

229

/**

230

* Get specific Kafka server

231

* @param id the server id

232

* @return the Kafka server

233

*/

234

public KafkaServer getKafkaServer(int id);

235

236

/**

237

* Get embedded ZooKeeper

238

* @return the ZooKeeper instance

239

*/

240

public EmbeddedZookeeper getZookeeper();

241

242

/**

243

* Return the ZooKeeperClient

244

* @return the client

245

*/

246

public synchronized ZooKeeperClient getZooKeeperClient();

247

248

/**

249

* Get ZooKeeper connection string

250

* @return the connection string

251

*/

252

public String getZookeeperConnectionString();

253

254

/**

255

* Get broker address by index

256

* @param i the index

257

* @return the broker address

258

*/

259

public BrokerAddress getBrokerAddress(int i);

260

261

/**

262

* Get all broker addresses

263

* @return array of broker addresses

264

*/

265

public BrokerAddress[] getBrokerAddresses();

266

267

/**

268

* Restart a broker

269

* @param brokerAddress the broker to restart

270

*/

271

public void bounce(BrokerAddress brokerAddress);

272

273

/**

274

* Restart broker by index

275

* @param index the broker index

276

* @throws Exception if restart fails

277

*/

278

public void restart(int index) throws Exception;

279

}

280

```

281

282

### KRaft-based Embedded Broker

283

284

Implementation using KRaft (Kafka Raft) for coordination, providing ZooKeeper-free Kafka functionality.

285

286

```java { .api }

287

/**

288

* KRaft-based embedded Kafka broker implementation (ZooKeeper-free)

289

*/

290

public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker {

291

public static final String BROKER_LIST_PROPERTY = "spring.embedded.kafka.brokers.property";

292

public static final int DEFAULT_ADMIN_TIMEOUT = 10;

293

294

/**

295

* Create embedded Kafka brokers listening on random ports

296

* @param count the number of brokers

297

* @param partitions partitions per topic

298

* @param topics the topics to create

299

*/

300

public EmbeddedKafkaKraftBroker(int count, int partitions, String... topics);

301

302

/**

303

* Specify a broker property

304

* @param property the property name

305

* @param value the value

306

* @return the EmbeddedKafkaBroker

307

*/

308

public EmbeddedKafkaBroker brokerProperty(String property, Object value);

309

310

/**

311

* Set the timeout in seconds for admin operations

312

* @param adminTimeout the timeout

313

*/

314

public void setAdminTimeout(int adminTimeout);

315

316

/**

317

* Create an AdminClient; invoke the callback and reliably close the admin

318

* @param callback the callback

319

*/

320

public void doWithAdmin(Consumer<AdminClient> callback);

321

322

/**

323

* Create an AdminClient; invoke the callback and reliably close the admin

324

* @param callback the callback

325

* @param <T> the function return type

326

* @return the value returned by the callback function

327

*/

328

public <T> T doWithAdminFunction(Function<AdminClient, T> callback);

329

330

/**

331

* Get underlying test cluster

332

* @return the cluster

333

*/

334

public KafkaClusterTestKit getCluster();

335

}

336

```

337

338

### Broker Factory

339

340

Factory for creating embedded Kafka brokers from annotation configuration.

341

342

```java { .api }

343

/**

344

* Factory to encapsulate EmbeddedKafkaBroker creation logic

345

*/

346

public final class EmbeddedKafkaBrokerFactory {

347

/**

348

* Create an EmbeddedKafkaBroker based on the EmbeddedKafka annotation

349

* @param embeddedKafka the EmbeddedKafka annotation

350

* @return a new EmbeddedKafkaBroker instance

351

*/

352

public static EmbeddedKafkaBroker create(EmbeddedKafka embeddedKafka);

353

354

/**

355

* Create an EmbeddedKafkaBroker based on the EmbeddedKafka annotation

356

* @param embeddedKafka the EmbeddedKafka annotation

357

* @param propertyResolver the Function for placeholders in the annotation attributes

358

* @return a new EmbeddedKafkaBroker instance

359

*/

360

public static EmbeddedKafkaBroker create(EmbeddedKafka embeddedKafka, Function<String, String> propertyResolver);

361

}

362

```

363

364

**Usage Examples:**

365

366

```java

367

// ZooKeeper-based broker

368

EmbeddedKafkaBroker zkBroker = new EmbeddedKafkaZKBroker(1, false, 2, "test-topic")

369

.zkPort(2181)

370

.brokerProperty("auto.create.topics.enable", "true")

371

.kafkaPorts(9092);

372

zkBroker.afterPropertiesSet();

373

374

// KRaft-based broker

375

EmbeddedKafkaBroker kraftBroker = new EmbeddedKafkaKraftBroker(1, 2, "test-topic")

376

.brokerProperty("auto.create.topics.enable", "true")

377

.adminTimeout(30);

378

kraftBroker.afterPropertiesSet();

379

380

// Using factory

381

// embeddedKafkaAnnotation is an @EmbeddedKafka(kraft = true, topics = "test-topic") annotation instance, e.g. obtained from an annotated test class

382

EmbeddedKafkaBroker broker = EmbeddedKafkaBrokerFactory.create(embeddedKafkaAnnotation);

383

384

// Topic management

385

broker.addTopics("new-topic");

386

broker.addTopics(new NewTopic("configured-topic", 3, (short) 1));

387

388

// Consumer subscription

389

Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

390

broker.consumeFromAnEmbeddedTopic(consumer, "test-topic");

391

```

392

393

## Types

394

395

```java { .api }

396

public class BrokerAddress {

397

public static final int DEFAULT_PORT = 9092;

398

399

public BrokerAddress(String host, int port);

400

public BrokerAddress(String host);

401

public BrokerAddress(BrokerEndPoint broker);

402

public static BrokerAddress fromAddress(String address);

403

public String getHost();

404

public int getPort();

405

}

406

```