tessl/maven-org-apache-pulsar--pulsar-io-debezium-mysql

Describes: `mavenpkg:maven/org.apache.pulsar/pulsar-io-debezium-mysql@4.0.x`

To install, run

npx @tessl/cli install tessl/maven-org-apache-pulsar--pulsar-io-debezium-mysql@4.0.0

# Pulsar IO Debezium MySQL

A Pulsar IO source connector that uses the Debezium MySQL connector to capture row-level changes (change data capture, CDC) from MySQL databases and stream them to Apache Pulsar topics. This enables real-time replication of MySQL data into the Pulsar ecosystem.

## Package Information

- **Package Name**: pulsar-io-debezium-mysql
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.pulsar
- **Artifact ID**: pulsar-io-debezium-mysql
- **Installation**: Add as a dependency in `pom.xml`, or use the pre-built NAR file

## Maven Dependency

```xml
<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-io-debezium-mysql</artifactId>
  <version>4.0.6</version>
</dependency>
```

## Core Imports

```java
import org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource;
import org.apache.pulsar.io.core.SourceContext;
import java.util.Map;
```

## Basic Usage

This connector is typically deployed through Pulsar IO using a YAML configuration file:

```yaml
# debezium-mysql-source-config.yaml
tenant: "public"
namespace: "default"
name: "debezium-mysql-source"
topicName: "mysql-cdc-events"
archive: "connectors/pulsar-io-debezium-mysql-4.0.6.nar"
parallelism: 1

configs:
  # MySQL connection settings
  database.hostname: "localhost"
  database.port: "3306"
  database.user: "debezium"
  database.password: "dbz"
  database.server.id: "184054"
  database.server.name: "dbserver1"
  database.whitelist: "inventory"

  # Pulsar integration settings
  database.history.pulsar.topic: "mysql-history-topic"
  database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
  offset.storage.topic: "mysql-offset-topic"
```
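
Pulsar passes the `configs` section to the source's `open()` method as a `Map<String, Object>`. The sketch below builds the same map in plain Java and runs a simple pre-flight check for missing values before deployment. `ConfigPreflight`, `exampleConfigs`, `missingKeys`, and the choice of required keys are hypothetical; the connector and Debezium perform their own validation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight helper; not part of the connector's API.
final class ConfigPreflight {
    // Keys this sketch treats as required (an assumption, not the connector's own rules)
    static final List<String> REQUIRED_KEYS = List.of(
        "database.hostname", "database.port", "database.user",
        "database.password", "database.server.id", "database.server.name");

    private ConfigPreflight() {}

    // Same key/value pairs as the `configs` section of the YAML example
    static Map<String, Object> exampleConfigs() {
        Map<String, Object> configs = new LinkedHashMap<>();
        configs.put("database.hostname", "localhost");
        configs.put("database.port", "3306");
        configs.put("database.user", "debezium");
        configs.put("database.password", "dbz");
        configs.put("database.server.id", "184054");
        configs.put("database.server.name", "dbserver1");
        configs.put("database.whitelist", "inventory");
        configs.put("database.history.pulsar.topic", "mysql-history-topic");
        configs.put("database.history.pulsar.service.url", "pulsar://127.0.0.1:6650");
        configs.put("offset.storage.topic", "mysql-offset-topic");
        return configs;
    }

    // Returns required keys that are absent or blank, in REQUIRED_KEYS order
    static List<String> missingKeys(Map<String, Object> configs) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            Object value = configs.get(key);
            if (value == null || value.toString().isBlank()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

For example, removing `database.user` from the map makes `missingKeys` return a list containing only `database.user`.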

Deploy the connector with the Pulsar admin CLI:

```bash
bin/pulsar-admin sources create --source-config-file debezium-mysql-source-config.yaml
```

## Architecture

The connector sits at the bottom of a class hierarchy in the Pulsar IO framework:

- **Source interface**: Core Pulsar IO source contract with lifecycle management (`open`, `read`, `close`)
- **KafkaConnectSource**: Base class that runs a Kafka Connect source task inside Pulsar
- **DebeziumSource**: Abstract base class for all Debezium CDC connectors
- **DebeziumMysqlSource**: MySQL-specific implementation

The connector runs Debezium's MySQL connector task (`io.debezium.connector.mysql.MySqlConnectorTask`) to read the MySQL binlog and converts the resulting change events into Pulsar messages through the Kafka Connect adaptation layer.
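
As a rough model, the hierarchy follows a template-method pattern: the abstract `DebeziumSource` calls the subclass hook `setDbConnectorTask` during `open()`, applies shared defaults, and then hands off to the Kafka Connect layer. The sketch below models that flow with simplified stand-in classes. All names ending in `Sketch` are hypothetical, not Pulsar classes, and the sketch assumes the task is selected through the Kafka Connect `task.class` key.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the Source contract (hypothetical, no Pulsar dependency)
interface SourceSketch {
    void open(Map<String, Object> config);
}

// Stand-in for KafkaConnectSource: the real class starts a Kafka Connect task;
// this sketch only records the final configuration.
class KafkaConnectSourceSketch implements SourceSketch {
    protected Map<String, Object> effectiveConfig = Map.of();

    @Override
    public void open(Map<String, Object> config) {
        effectiveConfig = new HashMap<>(config);
    }
}

// Stand-in for DebeziumSource: the subclass hook runs first, then shared defaults
abstract class DebeziumSourceSketch extends KafkaConnectSourceSketch {
    public abstract void setDbConnectorTask(Map<String, Object> config);

    @Override
    public void open(Map<String, Object> config) {
        setDbConnectorTask(config);
        // Shared default, applied only if the user has not set one
        config.putIfAbsent("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        super.open(config);
    }
}

// Stand-in for DebeziumMysqlSource: only the task class is MySQL-specific
class DebeziumMysqlSourceSketch extends DebeziumSourceSketch {
    static final String MYSQL_TASK = "io.debezium.connector.mysql.MySqlConnectorTask";

    @Override
    public void setDbConnectorTask(Map<String, Object> config) {
        config.putIfAbsent("task.class", MYSQL_TASK);
    }
}
```

In this model, supporting another database only requires a new `setDbConnectorTask` override, which matches the single method listed for `DebeziumMysqlSource`.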

## Capabilities

### Source Connector Implementation

The main connector class, which extends `DebeziumSource` to provide MySQL-specific CDC functionality.

```java { .api }
public class DebeziumMysqlSource extends DebeziumSource {
    // Pins the Kafka Connect task class to Debezium's MySQL connector task
    @Override
    public void setDbConnectorTask(Map<String, Object> config) throws Exception;
}
```

88

89

### Base Source Interface

90

91

Core Pulsar IO source interface providing lifecycle management and message reading functionality.

92

93

```java { .api }

94

public interface Source<T> extends AutoCloseable {

95

void open(Map<String, Object> config, SourceContext sourceContext) throws Exception;

96

Record<T> read() throws Exception;

97

void close() throws Exception;

98

}

99

```
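
To illustrate the `open` -> `read` -> `close` lifecycle, the sketch below implements a toy in-memory source against local, simplified copies of the interfaces. `RecordSketch`, `LifecycleSourceSketch`, `InMemorySource`, and `LifecycleDemo` are hypothetical. Unlike a real `Source`, whose `read()` blocks until data arrives, this sketch returns `null` when it runs out of events so the loop can terminate.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Local, simplified copy of Record so the sketch compiles on its own
interface RecordSketch<T> {
    T getValue();

    // Real records expose keys as Optional; this sketch never sets one
    default Optional<String> getKey() {
        return Optional.empty();
    }
}

// Local, simplified copy of Source; close() throws no checked exception here
interface LifecycleSourceSketch<T> extends AutoCloseable {
    void open(Map<String, Object> config);

    RecordSketch<T> read();

    @Override
    void close();
}

// Hypothetical source that replays a fixed list of change events from memory
class InMemorySource implements LifecycleSourceSketch<String> {
    private final Deque<String> pending = new ArrayDeque<>();
    private boolean opened;

    InMemorySource(List<String> events) {
        pending.addAll(events);
    }

    @Override
    public void open(Map<String, Object> config) {
        opened = true;
    }

    @Override
    public RecordSketch<String> read() {
        if (!opened) {
            throw new IllegalStateException("open() must be called before read()");
        }
        String next = pending.poll();
        // A real Source blocks here when no data is available; the sketch returns null instead
        return next == null ? null : () -> next;
    }

    @Override
    public void close() {
        opened = false;
    }
}

final class LifecycleDemo {
    private LifecycleDemo() {}

    // Runs open -> read* -> close and collects the record values in order
    static List<String> drain(InMemorySource source) {
        List<String> values = new ArrayList<>();
        try (source) {
            source.open(Map.of());
            RecordSketch<String> rec;
            while ((rec = source.read()) != null) {
                values.add(rec.getValue());
            }
        }
        return values;
    }
}
```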

### Configuration Management

The abstract base class for Debezium connectors. Its `open()` method prepares the connector configuration before delegating to `KafkaConnectSource`, and it exposes static helpers for managing configuration and integrating with Pulsar.

```java { .api }
public abstract class DebeziumSource extends KafkaConnectSource {
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception;

    // Implemented by each database-specific subclass to set the Debezium task class
    public abstract void setDbConnectorTask(Map<String, Object> config) throws Exception;

    // Static utility methods

    // Sets `key` to `value` if absent; throws if a different value is already present
    public static void throwExceptionIfConfigNotMatch(
        Map<String, Object> config,
        String key,
        String value
    ) throws IllegalArgumentException;

    // Sets `key` to `value` only if the key is absent
    public static void setConfigIfNull(
        Map<String, Object> config,
        String key,
        String value
    );

    // Returns "<tenant>/<namespace>" for the running source
    public static String topicNamespace(SourceContext sourceContext);

    // Copies the secret named `secretName` into `config` if the context provides it
    public static void tryLoadingConfigSecret(
        String secretName,
        Map<String, Object> config,
        SourceContext context
    );
}
```
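
The helper semantics can be approximated in plain Java. This is a hypothetical re-implementation for illustration, not the Pulsar source: it assumes `throwExceptionIfConfigNotMatch` fills in a missing value and rejects a conflicting one, that `setConfigIfNull` never overwrites a user-supplied value, and that `topicNamespace` joins tenant and namespace. Here tenant and namespace are passed as strings rather than read from a `SourceContext`, and the `public/default` fallback for empty values is an assumption.

```java
import java.util.Map;

// Hypothetical re-implementation of the DebeziumSource helpers; illustration only
final class DebeziumConfigHelpers {
    private DebeziumConfigHelpers() {}

    // Fill in the value if absent; reject a conflicting value that is already set
    static void throwExceptionIfConfigNotMatch(Map<String, Object> config, String key, String value) {
        Object current = config.get(key);
        if (current == null) {
            config.put(key, value);
        } else if (!current.equals(value)) {
            throw new IllegalArgumentException("Expected " + value + " but has " + current);
        }
    }

    // Never overrides a value the user already supplied
    static void setConfigIfNull(Map<String, Object> config, String key, String value) {
        config.putIfAbsent(key, value);
    }

    // Joins tenant and namespace, falling back to public/default for empty inputs (assumed)
    static String topicNamespace(String tenant, String namespace) {
        String t = (tenant == null || tenant.isEmpty()) ? "public" : tenant;
        String ns = (namespace == null || namespace.isEmpty()) ? "default" : namespace;
        return t + "/" + ns;
    }
}
```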

## Configuration Parameters

### Database Connection

- **database.hostname** (string): MySQL server hostname or IP address
- **database.port** (string): MySQL server port number (default: 3306)
- **database.user** (string): Username for the MySQL connection (can be loaded from secrets)
- **database.password** (string): Password for the MySQL connection (can be loaded from secrets)
- **database.server.id** (string): Numeric ID for this connector's database client; it must be unique among all server IDs in the MySQL replication topology
- **database.server.name** (string): Logical name identifying the MySQL server/cluster; also used as the prefix of output topic names
- **database.whitelist** (string): Comma-separated list of regular expressions matching the names of the databases to monitor
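
Because `database.whitelist` holds regular expressions rather than literal names, a pattern such as `sales_.*` can select several databases. The sketch below shows one way to evaluate such a list. `DatabaseFilterSketch` is hypothetical, and its case-sensitive, whole-name matching is an assumption that may differ from Debezium's own rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical filter showing how a comma-separated regex list can select databases
final class DatabaseFilterSketch {
    private final List<Pattern> patterns = new ArrayList<>();

    DatabaseFilterSketch(String whitelist) {
        for (String part : whitelist.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                patterns.add(Pattern.compile(trimmed));
            }
        }
    }

    // A database is captured if its whole name matches any pattern
    boolean isIncluded(String databaseName) {
        for (Pattern p : patterns) {
            if (p.matcher(databaseName).matches()) {
                return true;
            }
        }
        return false;
    }
}
```

With `"inventory, sales_.*"`, the sketch includes `inventory` and `sales_2024` but not `inventory_archive`, since each pattern must match the entire name.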

### Pulsar Integration

- **database.history.pulsar.topic** (string): Pulsar topic that stores the database schema history
- **database.history.pulsar.service.url** (string): Pulsar service URL used for history storage
- **offset.storage.topic** (string): Pulsar topic that stores the connector's offset information

### Connector Management

- **tenant** (string): Pulsar tenant for the connector
- **namespace** (string): Pulsar namespace for the connector
- **name** (string): Unique name for this connector instance
- **topicName** (string): Pulsar output topic for the source; note that Debezium change events are typically routed to per-table topics (for example, `dbserver1.inventory.products`) in the connector's tenant and namespace
- **parallelism** (integer): Number of source instances to run; Debezium's MySQL connector uses a single task, so this is normally 1
- **archive** (string): Path to the connector NAR file
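
Debezium names each table's change stream `<database.server.name>.<database>.<table>`, and the Pulsar Debezium examples consume such streams from topics like `public/default/dbserver1.inventory.products`. Assuming that scheme, a consumer can derive the full topic name for a table as sketched below; `CdcTopicNames.tableTopic` is a hypothetical helper, and the `persistent://` prefix is an assumption.

```java
// Hypothetical helper; the naming scheme is an assumption based on Debezium's
// <server.name>.<database>.<table> convention placed under the connector's namespace.
final class CdcTopicNames {
    private CdcTopicNames() {}

    static String tableTopic(String tenant, String namespace,
                             String serverName, String database, String table) {
        return "persistent://" + tenant + "/" + namespace + "/"
            + serverName + "." + database + "." + table;
    }
}
```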

## Types

```java { .api }
// Core Pulsar IO types (simplified)
public interface SourceContext {
    String getTenant();
    String getNamespace();
    String getSourceName();
    String getSecret(String secretName);
    ClientBuilder getPulsarClientBuilder(); // org.apache.pulsar.client.api.ClientBuilder
}

public interface Record<T> {
    T getValue();
    Optional<String> getTopicName();
    Optional<String> getKey();
    Map<String, String> getProperties();
    // Additional record metadata methods
}

// Configuration is passed as a java.util.Map<String, Object> of parameter names to values
```

## Constants

```java { .api }
// Default MySQL connector task class (DebeziumMysqlSource)
public static final String DEFAULT_TASK = "io.debezium.connector.mysql.MySqlConnectorTask";

// Default key/value converter (DebeziumSource)
public static final String DEFAULT_CONVERTER = "org.apache.kafka.connect.json.JsonConverter";

// Default database history implementation (DebeziumSource)
public static final String DEFAULT_HISTORY = "org.apache.pulsar.io.debezium.PulsarDatabaseHistory";

// Default name components for the offset and history topics (DebeziumSource)
public static final String DEFAULT_OFFSET_TOPIC = "debezium-offset-topic";
public static final String DEFAULT_HISTORY_TOPIC = "debezium-history-topic";
```
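
The offset and history constants are short names rather than fully qualified topics. The sketch below assumes that, when `offset.storage.topic` or `database.history.pulsar.topic` is not set, the connector derives `<tenant>/<namespace>/<source name>-<constant>`, and that an explicitly configured topic always wins. `DefaultTopicSketch` and its methods are hypothetical.

```java
import java.util.Map;

// Hypothetical helper reproducing an assumed default topic-name scheme:
// <tenant>/<namespace>/<source name>-<suffix>
final class DefaultTopicSketch {
    static final String DEFAULT_OFFSET_TOPIC = "debezium-offset-topic";
    static final String DEFAULT_HISTORY_TOPIC = "debezium-history-topic";

    private DefaultTopicSketch() {}

    static String defaultTopic(String topicNamespace, String sourceName, String suffix) {
        return topicNamespace + "/" + sourceName + "-" + suffix;
    }

    // Returns the user-supplied topic if present, otherwise the derived default
    static String resolve(Map<String, Object> config, String key,
                          String topicNamespace, String sourceName, String suffix) {
        Object explicit = config.get(key);
        return explicit != null ? explicit.toString() : defaultTopic(topicNamespace, sourceName, suffix);
    }
}
```

With the YAML example above, the explicit `mysql-offset-topic` is kept, while an unset history topic would resolve to `public/default/debezium-mysql-source-debezium-history-topic` under this assumption.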

## Error Handling

The connector can throw the following exceptions:

- **IllegalArgumentException**: When required configuration parameters are missing or invalid (for example, a `task.class` other than `io.debezium.connector.mysql.MySqlConnectorTask`)
- **Exception**: General failures during connector initialization, database connection, or message processing
- **Exception from `close()`**: Failures while shutting down the connector and releasing resources

Configuration is validated when `open()` is called. Runtime exceptions may be thrown from `read()` when the database connection fails or binlog processing encounters an error.

## Deployment

The connector is packaged as a NAR (NiFi Archive) file and runs on Pulsar Functions workers, which can run inside the brokers. Use the Pulsar admin CLI to manage the connector lifecycle:

```bash
# Create and start the connector
bin/pulsar-admin sources create --source-config-file debezium-mysql-source-config.yaml

# Show the connector's configuration
bin/pulsar-admin sources get --tenant public --namespace default --name debezium-mysql-source

# Check the connector's runtime status
bin/pulsar-admin sources status --tenant public --namespace default --name debezium-mysql-source

# Stop the connector
bin/pulsar-admin sources stop --tenant public --namespace default --name debezium-mysql-source

# Delete the connector
bin/pulsar-admin sources delete --tenant public --namespace default --name debezium-mysql-source
```