npx @tessl/cli install tessl/maven-org-apache-pulsar--pulsar-io-debezium-mysql@4.0.00
# Pulsar IO Debezium MySQL

A Pulsar IO source connector that wraps the Debezium MySQL connector to capture change events (CDC, change data capture) from MySQL databases and stream them to Apache Pulsar topics. This enables real-time data replication and streaming from MySQL into the Pulsar ecosystem.

## Package Information

- **Package Name**: pulsar-io-debezium-mysql
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.pulsar
- **Artifact ID**: pulsar-io-debezium-mysql
- **Installation**: Add as a dependency in `pom.xml` or use the pre-built NAR file

## Maven Dependency

```xml
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-io-debezium-mysql</artifactId>
    <version>4.0.6</version>
</dependency>
```

## Core Imports

```java
import org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource;
import org.apache.pulsar.io.core.SourceContext;
import java.util.Map;
```

## Basic Usage

This connector is typically deployed as a Pulsar IO connector using a configuration file:

```yaml
# debezium-mysql-source-config.yaml
tenant: "public"
namespace: "default"
name: "debezium-mysql-source"
topicName: "mysql-cdc-events"
archive: "connectors/pulsar-io-debezium-mysql-4.0.6.nar"
parallelism: 1

configs:
  # MySQL connection settings
  database.hostname: "localhost"
  database.port: "3306"
  database.user: "debezium"
  database.password: "dbz"
  database.server.id: "184054"
  database.server.name: "dbserver1"
  database.whitelist: "inventory"

  # Pulsar integration settings
  database.history.pulsar.topic: "mysql-history-topic"
  database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
  offset.storage.topic: "mysql-offset-topic"
```

Deploy the connector using the Pulsar admin CLI:

```bash
bin/pulsar-admin sources create --source-config-file debezium-mysql-source-config.yaml
```
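
When the connector is exercised from Java (for example in a test), the `configs` section arrives as a plain `Map<String, Object>`. The following is a minimal sketch of building that map by hand; the class `MysqlSourceConfigExample` and the method `buildConfig` are hypothetical names, and the values simply mirror the YAML example above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: mirrors the `configs` block of the YAML example.
final class MysqlSourceConfigExample {

    static Map<String, Object> buildConfig() {
        // LinkedHashMap keeps insertion order, which makes printed output readable.
        Map<String, Object> config = new LinkedHashMap<>();

        // MySQL connection settings
        config.put("database.hostname", "localhost");
        config.put("database.port", "3306");
        config.put("database.user", "debezium");
        config.put("database.password", "dbz");
        config.put("database.server.id", "184054");
        config.put("database.server.name", "dbserver1");
        config.put("database.whitelist", "inventory");

        // Pulsar integration settings
        config.put("database.history.pulsar.topic", "mysql-history-topic");
        config.put("database.history.pulsar.service.url", "pulsar://127.0.0.1:6650");
        config.put("offset.storage.topic", "mysql-offset-topic");
        return config;
    }

    public static void main(String[] args) {
        // Print every key except the password.
        buildConfig().forEach((key, value) -> {
            if (!key.equals("database.password")) {
                System.out.println(key + " = " + value);
            }
        });
    }
}
```

All values are strings here because that is how they appear in the YAML file; whether the connector accepts other value types is not covered by this sketch.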

## Architecture

The connector sits at the bottom of the following Pulsar IO class hierarchy, listed from most general to most specific:

- **Source Interface**: Core Pulsar IO source contract with lifecycle management
- **KafkaConnectSource**: Base class providing Kafka Connect integration
- **DebeziumSource**: Abstract base for all Debezium CDC connectors
- **DebeziumMysqlSource**: MySQL-specific implementation

The connector uses Debezium's MySQL connector task (`io.debezium.connector.mysql.MySqlConnectorTask`) to capture binlog events and converts them into Pulsar messages through the Kafka Connect adaptation layer.

## Capabilities

### Source Connector Implementation

The main connector class, which extends `DebeziumSource` to provide MySQL-specific CDC functionality.

```java { .api }
public class DebeziumMysqlSource extends DebeziumSource {
    public void setDbConnectorTask(Map<String, Object> config) throws Exception;
}
```

### Base Source Interface

Core Pulsar IO source interface providing lifecycle management and message reading functionality.

```java { .api }
public interface Source<T> extends AutoCloseable {
    void open(Map<String, Object> config, SourceContext sourceContext) throws Exception;
    Record<T> read() throws Exception;
    void close() throws Exception;
}
```
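
The lifecycle implied by this interface is: call `open()` once, call `read()` repeatedly, and call `close()` on shutdown. The sketch below illustrates that order with local stand-in types rather than the real Pulsar IO classes. `SketchSource`, `SketchRecord`, `InMemorySource`, and `SourceLifecycleExample` are all hypothetical, checked exceptions and `SourceContext` are left out for brevity, and returning `null` from `read()` is only a device to end the demo loop.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;

final class SourceLifecycleExample {
    // open() once, read() repeatedly, close() even if reading fails.
    static List<String> drain(SketchSource<String> source, Map<String, Object> config) {
        List<String> values = new ArrayList<>();
        try (SketchSource<String> s = source) {
            s.open(config);
            for (SketchRecord<String> r = s.read(); r != null; r = s.read()) {
                values.add(r.getValue());
            }
        }
        return values;
    }

    public static void main(String[] args) {
        Map<String, Object> config = Map.of("database.server.name", "dbserver1");
        System.out.println(drain(new InMemorySource(), config));
    }
}

// Local stand-ins for the Pulsar IO types, trimmed to what this sketch needs.
interface SketchRecord<T> {
    T getValue();
}

interface SketchSource<T> extends AutoCloseable {
    void open(Map<String, Object> config);
    SketchRecord<T> read();
    @Override
    void close();
}

// Hypothetical in-memory source that emits two fixed events, then null.
final class InMemorySource implements SketchSource<String> {
    private final Queue<String> pending = new ArrayDeque<>();
    boolean closed;

    @Override
    public void open(Map<String, Object> config) {
        Object server = config.get("database.server.name");
        pending.add("insert:" + server);
        pending.add("update:" + server);
    }

    @Override
    public SketchRecord<String> read() {
        String next = pending.poll();
        // A real source would wait for more data; null simply ends this sketch.
        return next == null ? null : () -> next;
    }

    @Override
    public void close() {
        closed = true;
    }
}
```

Using try-with-resources works because the source type extends `AutoCloseable`, so cleanup runs even when `open()` or `read()` throws.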

### Configuration Management

`DebeziumSource` prepares the configuration in `open()`, delegates the choice of connector task to subclasses through `setDbConnectorTask()`, and exposes static utility methods for managing connector configuration and integration with Pulsar.

```java { .api }
public abstract class DebeziumSource extends KafkaConnectSource {
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception;
    public abstract void setDbConnectorTask(Map<String, Object> config) throws Exception;

    // Static utility methods
    public static void throwExceptionIfConfigNotMatch(
        Map<String, Object> config,
        String key,
        String value
    ) throws IllegalArgumentException;

    public static void setConfigIfNull(
        Map<String, Object> config,
        String key,
        String value
    );

    public static String topicNamespace(SourceContext sourceContext);

    public static void tryLoadingConfigSecret(
        String secretName,
        Map<String, Object> config,
        SourceContext context
    );
}
```
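
The names `setConfigIfNull` and `throwExceptionIfConfigNotMatch` suggest two complementary behaviors: filling in a default when a key is absent, and rejecting a value that conflicts with a required one. The sketch below is a hypothetical re-implementation written from the signatures alone, not the library's code. In particular, the choice to fill in a missing key inside `throwExceptionIfConfigNotMatch` is an assumption, and the key `task.class` is used only as an illustrative name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical re-implementation inferred from the method names and signatures.
final class ConfigHelpersSketch {

    // Assumed behavior: write the default only when the key is absent or null.
    static void setConfigIfNull(Map<String, Object> config, String key, String value) {
        config.putIfAbsent(key, value);
    }

    // Assumed behavior: fill in the required value if missing, reject any other value.
    static void throwExceptionIfConfigNotMatch(Map<String, Object> config, String key, String value) {
        Object current = config.get(key);
        if (current == null) {
            config.put(key, value);
            return;
        }
        if (!value.equals(current)) {
            throw new IllegalArgumentException(
                "Expected " + key + " to be '" + value + "' but found '" + current + "'");
        }
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("offset.storage.topic", "mysql-offset-topic");

        // The user-supplied value is kept; the default is ignored.
        setConfigIfNull(config, "offset.storage.topic", "debezium-offset-topic");
        // The missing key is filled in with the required value.
        throwExceptionIfConfigNotMatch(config, "task.class",
            "io.debezium.connector.mysql.MySqlConnectorTask");

        System.out.println(config);
    }
}
```

Under these assumptions, user-supplied values always win over defaults, while keys the connector depends on cannot be silently overridden.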

## Configuration Parameters

### Database Connection

- **database.hostname** (string): MySQL server hostname or IP address
- **database.port** (string): MySQL server port number (default: 3306)
- **database.user** (string): Username for MySQL connection
- **database.password** (string): Password for MySQL connection (can be loaded from secrets)
- **database.server.id** (string): Unique numeric identifier for this MySQL server within replication topology
- **database.server.name** (string): Logical name identifying the MySQL server/cluster
- **database.whitelist** (string): Comma-separated list of database names to monitor
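
Loading `database.password` from secrets can be pictured as a lookup that overrides the configured value when a secret with the same name exists. The sketch below is a hypothetical illustration, not the library's implementation: `SecretLoadingSketch` is an invented class, the `Function<String, String>` parameter stands in for `SourceContext.getSecret(String)`, and the best-effort handling of lookup failures is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch; secretLookup stands in for SourceContext.getSecret(String).
final class SecretLoadingSketch {

    static void tryLoadingConfigSecret(
            String secretName,
            Map<String, Object> config,
            Function<String, String> secretLookup) {
        try {
            String secret = secretLookup.apply(secretName);
            if (secret != null) {
                // The secret replaces whatever the config file supplied.
                config.put(secretName, secret);
            }
        } catch (RuntimeException e) {
            // Assumed best-effort: keep the existing config value on lookup failure.
        }
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("database.password", "placeholder");

        Map<String, String> secrets = Map.of("database.password", "s3cret");
        tryLoadingConfigSecret("database.password", config, secrets::get);

        // Report only whether the override happened, never the secret itself.
        System.out.println("s3cret".equals(config.get("database.password")));
    }
}
```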

### Pulsar Integration

- **database.history.pulsar.topic** (string): Pulsar topic name for storing database schema history
- **database.history.pulsar.service.url** (string): Pulsar service URL for history storage
- **offset.storage.topic** (string): Pulsar topic name for storing connector offset information

### Connector Management

- **tenant** (string): Pulsar tenant for the connector
- **namespace** (string): Pulsar namespace for the connector
- **name** (string): Unique name for this connector instance
- **topicName** (string): Pulsar topic name where CDC events will be published
- **parallelism** (integer): Number of parallel connector tasks to run
- **archive** (string): Path to the connector NAR file

## Types

```java { .api }
// Core Pulsar IO types
public interface SourceContext {
    String getTenant();
    String getNamespace();
    String getSourceName();
    String getSecret(String secretName);
    ClientBuilder getPulsarClientBuilder();
}

public interface Record<T> {
    T getValue();
    Optional<String> getTopicName();
    Optional<String> getKey();
    Map<String, String> getProperties();
    // Additional record metadata methods
}

// Configuration type
// Configuration parameters are passed as a standard java.util.Map<String, Object>.
```

## Constants

```java { .api }
// Default MySQL connector task class
public static final String DEFAULT_TASK = "io.debezium.connector.mysql.MySqlConnectorTask";

// Default converter configurations
public static final String DEFAULT_CONVERTER = "org.apache.kafka.connect.json.JsonConverter";
public static final String DEFAULT_HISTORY = "org.apache.pulsar.io.debezium.PulsarDatabaseHistory";
public static final String DEFAULT_OFFSET_TOPIC = "debezium-offset-topic";
public static final String DEFAULT_HISTORY_TOPIC = "debezium-history-topic";
```

## Error Handling

The connector can throw the following exceptions:

- **IllegalArgumentException**: When required configuration parameters are missing or invalid
- **Exception**: General exceptions during connector initialization, database connection, or message processing
- **Exceptions from `close()`**: Raised during connector shutdown and resource cleanup (`Source` extends `AutoCloseable`, so `close()` may throw)

Configuration is validated during the `open()` call. Runtime exceptions may surface from `read()` when database connectivity fails or binlog processing encounters errors.
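
To illustrate the kind of check that produces an `IllegalArgumentException` before any database connection is attempted, here is a minimal validation sketch. It is not the connector's own logic: the class `ConfigValidationSketch` is hypothetical, and the list of keys treated as required is an assumption chosen from the Database Connection parameters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check; the set of required keys is an assumption.
final class ConfigValidationSketch {

    static final List<String> REQUIRED_KEYS = List.of(
        "database.hostname",
        "database.port",
        "database.user",
        "database.password",
        "database.server.id",
        "database.server.name");

    // Collects every missing or blank key so that one error reports them all.
    static void validate(Map<String, Object> config) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            Object value = config.get(key);
            if (value == null || value.toString().isBlank()) {
                missing.add(key);
            }
        }
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException(
                "Missing required configuration: " + String.join(", ", missing));
        }
    }

    public static void main(String[] args) {
        try {
            validate(Map.of("database.hostname", "localhost"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running a check like this before `sources create` gives a single error message listing all gaps, instead of discovering them one at a time from connector logs.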

## Deployment

The connector is packaged as a NAR (NiFi Archive) file and deployed to Pulsar brokers. Use the Pulsar admin CLI to manage the connector lifecycle:

```bash
# Create/start the connector
bin/pulsar-admin sources create --source-config-file debezium-mysql-source-config.yaml

# Get connector information
bin/pulsar-admin sources get --tenant public --namespace default --name debezium-mysql-source

# Check connector status
bin/pulsar-admin sources status --tenant public --namespace default --name debezium-mysql-source

# Stop the connector
bin/pulsar-admin sources stop --tenant public --namespace default --name debezium-mysql-source

# Delete the connector
bin/pulsar-admin sources delete --tenant public --namespace default --name debezium-mysql-source
```