
# Confluent Kafka Container


The `org.testcontainers.kafka.ConfluentKafkaContainer` provides support for Confluent Platform images (`confluentinc/cp-kafka`) version 7.4.0 and later with KRaft mode enabled by default. This is the recommended approach for testing with Confluent Platform.


## Constructors


```java { .api }
public ConfluentKafkaContainer(String imageName);
public ConfluentKafkaContainer(DockerImageName dockerImageName);
```


### Parameters


- `imageName` (String): Docker image name as string (e.g., "confluentinc/cp-kafka:7.4.0")
- `dockerImageName` (DockerImageName): Validated Docker image name object


### Supported Images


- `confluentinc/cp-kafka` - Confluent Platform Kafka (version 7.4.0+)


## Core Methods


### getBootstrapServers()


Returns the bootstrap servers connection string for Kafka clients.


```java { .api }
public String getBootstrapServers();
```


**Returns**: String in format `"host:port"` (e.g., "localhost:32768")
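To make the return format concrete, here is a minimal sketch that turns a bootstrap string into consumer settings using only `java.util.Properties` and Kafka's standard string configuration keys. The class `BootstrapPropertiesSketch`, the method `consumerProperties`, and the group id are hypothetical names chosen for illustration, and the literal `localhost:32768` stands in for whatever `getBootstrapServers()` returns at runtime.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of Testcontainers or the Kafka client library.
final class BootstrapPropertiesSketch {

    // Builds consumer settings from a "host:port" bootstrap string.
    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        // Plain string keys; these are the values behind the Kafka client constants
        // (for example, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG is "bootstrap.servers").
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // In a real test the first argument would be kafka.getBootstrapServers().
        Properties props = consumerProperties("localhost:32768", "example-group");
        System.out.println(props.getProperty("bootstrap.servers")); // prints localhost:32768
    }
}
```

The resulting `Properties` object could then be passed to a Kafka consumer constructor, assuming the Kafka client library is on the classpath.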


### Usage Example


```java
try (ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0")) {
    kafka.start();

    String bootstrapServers = kafka.getBootstrapServers();
    // Use with Kafka clients:
    // props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
}
```


## Listener Configuration


### withListener(String listener)


Adds a custom listener for connections within the same container network.


```java { .api }
public ConfluentKafkaContainer withListener(String listener);
```


**Parameters**:
- `listener` (String): Listener in format `"host:port"` (e.g., "kafka:19092")


**Returns**: ConfluentKafkaContainer instance for method chaining


The host will be added as a network alias, allowing other containers in the same network to connect using this listener.
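As an illustration of the `"host:port"` format, the following sketch splits a listener string into the part that would serve as the network alias and the port. `ListenerSpec` is a hypothetical value type written for this example; it does not show how Testcontainers parses the argument internally.

```java
// Hypothetical value type for illustration only; not a Testcontainers class.
final class ListenerSpec {
    final String host; // would become the network alias, e.g. "kafka"
    final int port;    // port other containers connect to, e.g. 19092

    private ListenerSpec(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Parses "host:port" and rejects strings without a host or a valid port.
    static ListenerSpec parse(String listener) {
        int colon = listener.lastIndexOf(':');
        if (colon <= 0 || colon == listener.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + listener);
        }
        String host = listener.substring(0, colon);
        int port;
        try {
            port = Integer.parseInt(listener.substring(colon + 1));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Port is not a number: " + listener, e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        return new ListenerSpec(host, port);
    }

    public static void main(String[] args) {
        ListenerSpec spec = parse("kafka:19092");
        System.out.println(spec.host + " -> " + spec.port); // prints kafka -> 19092
    }
}
```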


### withListener(String listener, Supplier<String> advertisedListener)


Adds a custom listener with separate advertised listener for external connections.


```java { .api }
public ConfluentKafkaContainer withListener(String listener, Supplier<String> advertisedListener);
```


**Parameters**:
- `listener` (String): Internal listener in format `"host:port"`
- `advertisedListener` (Supplier<String>): Supplier providing external advertised listener


**Returns**: ConfluentKafkaContainer instance for method chaining
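A plausible reason for taking a `Supplier<String>` rather than a plain `String` is that the external address may not be known when the container is configured, for example a mapped port that is assigned only once another container has started. The sketch below demonstrates that deferred evaluation with the standard library alone. `FakeProxy` and `advertisedListener` are hypothetical stand-ins, and the port `32770` is an arbitrary sample value.

```java
import java.util.function.Supplier;

final class LazyAdvertisedListenerSketch {

    // Hypothetical stand-in for a proxy whose mapped port is only known after start.
    static final class FakeProxy {
        private Integer mappedPort; // null until started

        void start(int assignedPort) {
            this.mappedPort = assignedPort;
        }

        String getHost() {
            return "localhost";
        }

        int getMappedPort() {
            if (mappedPort == null) {
                throw new IllegalStateException("Mapped port is not available before start");
            }
            return mappedPort;
        }
    }

    // Captures the proxy now, but reads host and port only when get() is called.
    static Supplier<String> advertisedListener(FakeProxy proxy) {
        return () -> proxy.getHost() + ":" + proxy.getMappedPort();
    }

    public static void main(String[] args) {
        FakeProxy proxy = new FakeProxy();
        Supplier<String> advertised = advertisedListener(proxy); // nothing is evaluated yet
        proxy.start(32770);                                       // simulated port assignment
        System.out.println(advertised.get());                     // prints localhost:32770
    }
}
```

Calling `advertised.get()` before `proxy.start(...)` would throw in this sketch, which is why the supplier is resolved only after the proxy is running.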


### Network Usage Example


```java
Network network = Network.newNetwork();

// KCatContainer is assumed to be a helper class defined in your own test code
// (a GenericContainer subclass running a kcat image).
try (ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0")
         .withListener("kafka:19092") // Internal network listener
         .withNetwork(network);
     // Other containers in the same network can connect to "kafka:19092"
     KCatContainer kcat = new KCatContainer()
         .withNetwork(network)) {

    kafka.start();
    kcat.start();

    // Create the input file that the producer command reads
    kcat.copyFileToContainer(Transferable.of("Message produced by kcat"), "/data/msgs.txt");

    // Send message via internal listener
    kcat.execInContainer("kcat", "-b", "kafka:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt");

    // Consume message via internal listener
    String output = kcat.execInContainer("kcat", "-b", "kafka:19092", "-C", "-t", "msgs", "-c", "1")
        .getStdout();
}
```


### External Proxy Example


```java
Network network = Network.newNetwork();

try (SocatContainer proxy = new SocatContainer()
         .withNetwork(network)
         .withTarget(2000, "kafka", 19092);
     ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0")
         .withListener("kafka:19092", () -> proxy.getHost() + ":" + proxy.getMappedPort(2000))
         .withNetwork(network)) {

    proxy.start();
    kafka.start();

    // External clients can connect via proxy
    String externalBootstrap = proxy.getHost() + ":" + proxy.getMappedPort(2000);
}
```


## Default Configuration


The Confluent Kafka container automatically configures:


- **KRaft Mode**: Enabled by default (no Zookeeper required)
- **Default Listeners**:
  - `PLAINTEXT://0.0.0.0:9092` - External connections
  - `BROKER://0.0.0.0:9093` - Inter-broker communication
  - `CONTROLLER://0.0.0.0:9094` - KRaft controller
- **Default Advertised Listeners**:
  - `PLAINTEXT://host:mappedPort` - For external connections
  - `BROKER://containerHostname:9093` - For inter-broker communication
- **Single Node Setup**: Configured for single-node testing scenarios
- **Confluent Platform Features**: Access to Confluent Platform specific features and tools
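
Kafka's `listeners` and `advertised.listeners` broker settings are comma-separated lists, so the defaults above can be pictured as two joined strings. The sketch below only assembles those strings from the documented values; `ListenerConfigSketch` and its methods are hypothetical, the sample host, port, and container hostname are made up, and how the container actually passes the values to the broker is not shown.

```java
// Hypothetical helper for illustration; it only formats the documented defaults.
final class ListenerConfigSketch {

    // Joins the three default listeners into the comma-separated form Kafka expects.
    static String listeners() {
        return String.join(",",
            "PLAINTEXT://0.0.0.0:9092",   // external connections
            "BROKER://0.0.0.0:9093",      // inter-broker communication
            "CONTROLLER://0.0.0.0:9094"); // KRaft controller
    }

    // Builds the advertised listeners from values that are only known at runtime.
    static String advertisedListeners(String host, int mappedPort, String containerHostname) {
        return String.join(",",
            "PLAINTEXT://" + host + ":" + mappedPort,
            "BROKER://" + containerHostname + ":9093");
    }

    public static void main(String[] args) {
        System.out.println(listeners());
        // Sample values; real ones come from the running container.
        System.out.println(advertisedListeners("localhost", 32768, "a1b2c3d4e5f6"));
    }
}
```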


## Constants


The container uses shared constants from `KafkaHelper`:


```java { .api }
static final int KAFKA_PORT = 9092;
static final String STARTER_SCRIPT = "/tmp/testcontainers_start.sh";
```


The container exposes port 9092 and automatically maps it to a random host port accessible via `getMappedPort(9092)` or `getBootstrapServers()`.


## Confluent Platform Integration


This container runs the Confluent Platform startup script at `/etc/confluent/docker/run` and includes Confluent-specific environment variable configurations. It provides the same core Kafka functionality as the Apache container but with Confluent Platform optimizations and compatibility.