A Pulsar IO source connector that uses the Debezium MySQL connector to capture CDC (change data capture) events from MySQL databases and stream them to Apache Pulsar topics.
npx @tessl/cli install tessl/maven-org-apache-pulsar--pulsar-io-debezium-mysql@4.0.0
This connector enables real-time data replication and streaming from MySQL databases into the Pulsar ecosystem.
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-io-debezium-mysql</artifactId>
<version>4.0.6</version>
</dependency>
import org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource;
import org.apache.pulsar.io.core.SourceContext;
import java.util.Map;
This connector is typically deployed as a Pulsar IO connector using configuration files:
# debezium-mysql-source-config.yaml
tenant: "public"
namespace: "default"
name: "debezium-mysql-source"
topicName: "mysql-cdc-events"
archive: "connectors/pulsar-io-debezium-mysql-4.0.6.nar"
parallelism: 1
configs:
  # MySQL connection settings
  database.hostname: "localhost"
  database.port: "3306"
  database.user: "debezium"
  database.password: "dbz"
  database.server.id: "184054"
  database.server.name: "dbserver1"
  database.whitelist: "inventory"
  # Pulsar integration settings
  database.history.pulsar.topic: "mysql-history-topic"
  database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
  offset.storage.topic: "mysql-offset-topic"
Deploy the connector using the Pulsar admin CLI:
bin/pulsar-admin sources create --source-config-file debezium-mysql-source-config.yaml
The connector extends the Pulsar IO framework hierarchy: DebeziumMysqlSource extends DebeziumSource, which in turn extends KafkaConnectSource.
The connector leverages Debezium's MySQL connector (io.debezium.connector.mysql.MySqlConnectorTask) to capture binlog events and transforms them into Pulsar messages through the Kafka Connect adaptation layer.
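The layering described above can be pictured as a template-method pattern: the base class drives `open()`, and the MySQL subclass only pins the Debezium task class. The following is a minimal, self-contained sketch under stated assumptions, not the connector's actual code. `SketchDebeziumSource`, `SketchMysqlSource`, and `TaskSelectionDemo` are hypothetical stand-ins, the `task.class` key name is an assumption, the helper semantics are inferred from the method names, and the offset topic default is simplified to the bare constant value.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for DebeziumSource; not the real Pulsar class.
abstract class SketchDebeziumSource {
    // Assumed name of the config key that selects the Kafka Connect task class.
    static final String TASK_CLASS_KEY = "task.class";

    // Assumed semantics: set the value when absent, reject a conflicting one.
    static void throwExceptionIfConfigNotMatch(Map<String, Object> config, String key, String value) {
        Object existing = config.get(key);
        if (existing == null) {
            config.put(key, value);
        } else if (!existing.equals(value)) {
            throw new IllegalArgumentException(
                "Expected " + value + " for " + key + " but found " + existing);
        }
    }

    // Assumed semantics: fill in a default without overriding user input.
    static void setConfigIfNull(Map<String, Object> config, String key, String value) {
        config.putIfAbsent(key, value);
    }

    // Database-specific hook implemented by each concrete source.
    abstract void setDbConnectorTask(Map<String, Object> config);

    // Template method: database-specific step first, shared defaults after.
    void open(Map<String, Object> config) {
        setDbConnectorTask(config);
        setConfigIfNull(config, "offset.storage.topic", "debezium-offset-topic");
    }
}

// Hypothetical stand-in for DebeziumMysqlSource.
class SketchMysqlSource extends SketchDebeziumSource {
    static final String DEFAULT_TASK = "io.debezium.connector.mysql.MySqlConnectorTask";

    @Override
    void setDbConnectorTask(Map<String, Object> config) {
        throwExceptionIfConfigNotMatch(config, TASK_CLASS_KEY, DEFAULT_TASK);
    }
}

class TaskSelectionDemo {
    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("database.hostname", "localhost");
        new SketchMysqlSource().open(config);
        System.out.println(config.get(SketchDebeziumSource.TASK_CLASS_KEY));
    }
}
```

In this sketch a user-supplied task class that differs from the MySQL default is rejected rather than silently overwritten, which is one plausible reading of the `throwExceptionIfConfigNotMatch` name.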
The main connector class that extends DebeziumSource to provide MySQL-specific CDC functionality.
public class DebeziumMysqlSource extends DebeziumSource {
    public void setDbConnectorTask(Map<String, Object> config) throws Exception;
}
Core Pulsar IO source interface providing lifecycle management and message reading functionality.
public interface Source<T> extends AutoCloseable {
    void open(Map<String, Object> config, SourceContext sourceContext) throws Exception;
    Record<T> read() throws Exception;
    void close() throws Exception;
}
Abstract base class for Debezium sources, including static utility methods for managing connector configuration and integration with Pulsar.
public abstract class DebeziumSource extends KafkaConnectSource {
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception;
    public abstract void setDbConnectorTask(Map<String, Object> config) throws Exception;
    // Static utility methods
    public static void throwExceptionIfConfigNotMatch(
        Map<String, Object> config,
        String key,
        String value
    ) throws IllegalArgumentException;
    public static void setConfigIfNull(
        Map<String, Object> config,
        String key,
        String value
    );
    public static String topicNamespace(SourceContext sourceContext);
    public static void tryLoadingConfigSecret(
        String secretName,
        Map<String, Object> config,
        SourceContext context
    );
}
// Core Pulsar IO types
public interface SourceContext {
    String getTenant();
    String getNamespace();
    String getSourceName();
    String getSecret(String secretName);
    PulsarClientBuilder getPulsarClientBuilder();
}
public interface Record<T> {
    T getValue();
    String getTopicName();
    String getKey();
    Map<String, String> getProperties();
    // Additional record metadata methods
}
// Configuration types
// Configuration parameters are passed as a standard java.util.Map<String, Object>
// Default MySQL connector task class
public static final String DEFAULT_TASK = "io.debezium.connector.mysql.MySqlConnectorTask";
// Default converter configurations
public static final String DEFAULT_CONVERTER = "org.apache.kafka.connect.json.JsonConverter";
public static final String DEFAULT_HISTORY = "org.apache.pulsar.io.debezium.PulsarDatabaseHistory";
public static final String DEFAULT_OFFSET_TOPIC = "debezium-offset-topic";
public static final String DEFAULT_HISTORY_TOPIC = "debezium-history-topic";
The connector can throw the following exceptions:
Configuration validation occurs during the open() method call, and runtime exceptions may occur during read() operations when database connectivity issues arise or binlog processing encounters errors.
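Because configuration problems only surface when `open()` runs, it can help to check a configuration map before deploying. The sketch below is a hypothetical pre-flight helper, not part of the connector. `ConfigPreflight` and `missingKeys` are illustrative names, and treating the connection keys from the example configuration as required is an assumption, not the connector's documented validation rules.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check; the real validation happens inside open().
class ConfigPreflight {
    // Keys taken from the example configuration in this document.
    // Treating all of them as mandatory is an assumption.
    static final List<String> EXPECTED_KEYS = Arrays.asList(
        "database.hostname",
        "database.port",
        "database.user",
        "database.password",
        "database.server.id",
        "database.server.name");

    // Returns the expected keys that are absent or blank, in declaration order.
    static List<String> missingKeys(Map<String, Object> config) {
        List<String> missing = new ArrayList<>();
        for (String key : EXPECTED_KEYS) {
            Object value = config.get(key);
            if (value == null || value.toString().trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, Object> config = new LinkedHashMap<>();
        config.put("database.hostname", "localhost");
        config.put("database.port", "3306");
        List<String> missing = missingKeys(config);
        if (!missing.isEmpty()) {
            System.err.println("Missing configuration keys: " + missing);
        }
    }
}
```

A check like this only catches absent or blank values. Connectivity and binlog errors would still appear at runtime during `read()`.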
The connector is packaged as a NAR (NiFi Archive) file and deployed to Pulsar brokers. Use the Pulsar admin CLI to manage connector lifecycle:
# Create/start the connector
bin/pulsar-admin sources create --source-config-file config.yaml
# Get connector configuration details (runtime status is available via: sources status)
bin/pulsar-admin sources get --tenant public --namespace default --name debezium-mysql-source
# Stop the connector
bin/pulsar-admin sources stop --tenant public --namespace default --name debezium-mysql-source
# Delete the connector
bin/pulsar-admin sources delete --tenant public --namespace default --name debezium-mysql-source