tessl/maven-org-apache-pulsar--pulsar-io-debezium-mysql

A Pulsar IO connector that integrates with the Debezium MySQL connector to capture change data (CDC) events from MySQL databases and stream them to Apache Pulsar topics

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.pulsar/pulsar-io-debezium-mysql@4.0.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-pulsar--pulsar-io-debezium-mysql@4.0.0


Pulsar IO Debezium MySQL

A Pulsar IO source connector that integrates with the Debezium MySQL connector to capture change data (CDC) events from MySQL databases and stream them to Apache Pulsar topics. It enables real-time replication and streaming of MySQL data into the Pulsar ecosystem.

Package Information

  • Package Name: pulsar-io-debezium-mysql
  • Package Type: maven
  • Language: Java
  • Group ID: org.apache.pulsar
  • Artifact ID: pulsar-io-debezium-mysql
  • Installation: Add as a dependency in pom.xml, or deploy the pre-built NAR file

Maven Dependency

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-io-debezium-mysql</artifactId>
    <version>4.0.6</version>
</dependency>

Core Imports

import org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource;
import org.apache.pulsar.io.core.SourceContext;
import java.util.Map;

Basic Usage

This connector is typically deployed as a Pulsar IO connector using configuration files:

# debezium-mysql-source-config.yaml
tenant: "public"
namespace: "default"
name: "debezium-mysql-source"
topicName: "mysql-cdc-events"
archive: "connectors/pulsar-io-debezium-mysql-4.0.6.nar"
parallelism: 1

configs:
  # MySQL connection settings
  database.hostname: "localhost"
  database.port: "3306"
  database.user: "debezium"
  database.password: "dbz"
  database.server.id: "184054"
  database.server.name: "dbserver1"
  database.whitelist: "inventory"
  
  # Pulsar integration settings
  database.history.pulsar.topic: "mysql-history-topic"
  database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
  offset.storage.topic: "mysql-offset-topic"

Deploy the connector using the Pulsar admin CLI:

bin/pulsar-admin sources create --source-config-file debezium-mysql-source-config.yaml

Architecture

The connector extends the Pulsar IO framework hierarchy:

  • Source Interface: Core Pulsar IO source contract with lifecycle management
  • KafkaConnectSource: Base class providing Kafka Connect integration
  • DebeziumSource: Abstract base for all Debezium CDC connectors
  • DebeziumMysqlSource: MySQL-specific implementation

The connector leverages Debezium's MySQL connector (io.debezium.connector.mysql.MySqlConnectorTask) to capture binlog events and transforms them into Pulsar messages through the Kafka Connect adaptation layer.

Capabilities

Source Connector Implementation

The main connector class that extends DebeziumSource to provide MySQL-specific CDC functionality.

public class DebeziumMysqlSource extends DebeziumSource {
    public void setDbConnectorTask(Map<String, Object> config) throws Exception;
}
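
As a rough illustration of what the MySQL-specific setup could amount to, the sketch below pins the Kafka Connect task class to MySqlConnectorTask and rejects a conflicting value. It is a self-contained stand-in, not the connector's source: the class TaskClassPinningSketch, the method pinTaskClass, and the use of the `task.class` key are assumptions made for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for illustration; not the connector's actual code.
final class TaskClassPinningSketch {
    // Assumed config key for the Kafka Connect task class.
    static final String TASK_CLASS_KEY = "task.class";
    static final String MYSQL_TASK = "io.debezium.connector.mysql.MySqlConnectorTask";

    // Sets the task class when absent; rejects a conflicting user-supplied value.
    static void pinTaskClass(Map<String, Object> config) {
        Object existing = config.get(TASK_CLASS_KEY);
        if (existing == null) {
            config.put(TASK_CLASS_KEY, MYSQL_TASK);
        } else if (!MYSQL_TASK.equals(existing)) {
            throw new IllegalArgumentException(
                "Expected " + MYSQL_TASK + " but found " + existing);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        pinTaskClass(config);
        System.out.println(config.get(TASK_CLASS_KEY));
    }
}
```

Under these assumptions, a user who leaves the task class unset gets the MySQL task automatically, while a config that names a different task fails fast with IllegalArgumentException.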

Base Source Interface

Core Pulsar IO source interface providing lifecycle management and message reading functionality.

public interface Source<T> extends AutoCloseable {
    void open(Map<String, Object> config, SourceContext sourceContext) throws Exception;
    Record<T> read() throws Exception;
    void close() throws Exception;
}
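
The open/read/close lifecycle can be sketched with a toy in-memory source. Everything here is hypothetical and simplified for illustration: MiniSource drops SourceContext and returns plain values instead of Record<T>, and the `events` config key exists only in this demo.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical, simplified stand-in for the Source contract:
// no SourceContext, and plain values instead of Record<T>.
interface MiniSource<T> extends AutoCloseable {
    void open(Map<String, Object> config) throws Exception;
    T read() throws Exception;
}

// Toy source that serves events from a comma-separated "events" config value.
final class InMemorySource implements MiniSource<String> {
    private final Queue<String> pending = new ArrayDeque<>();
    boolean closed = false;

    @Override
    public void open(Map<String, Object> config) {
        Object raw = config.get("events"); // hypothetical key, for this demo only
        if (raw == null) {
            throw new IllegalArgumentException("missing 'events'");
        }
        for (String e : raw.toString().split(",")) {
            pending.add(e.trim());
        }
    }

    @Override
    public String read() {
        return pending.poll(); // null once drained; a real source would block instead
    }

    @Override
    public void close() {
        closed = true;
    }
}

final class SourceLifecycleSketch {
    // open -> read loop -> close, with close guaranteed by try-with-resources.
    static List<String> drain(MiniSource<String> source, Map<String, Object> config) {
        List<String> out = new ArrayList<>();
        try (MiniSource<String> s = source) {
            s.open(config);
            String value;
            while ((value = s.read()) != null) {
                out.add(value);
            }
        } catch (Exception e) {
            throw new IllegalStateException("source failed", e);
        }
        return out;
    }
}
```

In a real deployment the Pulsar IO runtime drives this loop; the sketch only shows the order of calls and that close() runs even when open() or read() fails.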

Configuration Management

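DebeziumSource is the abstract base class for the connector. It declares the lifecycle entry point and provides static utility methods for validating and defaulting configuration and for integrating with Pulsar.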

public abstract class DebeziumSource extends KafkaConnectSource {
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception;
    public abstract void setDbConnectorTask(Map<String, Object> config) throws Exception;
    
    // Static utility methods
    public static void throwExceptionIfConfigNotMatch(
        Map<String, Object> config, 
        String key, 
        String value
    ) throws IllegalArgumentException;
    
    public static void setConfigIfNull(
        Map<String, Object> config, 
        String key, 
        String value
    );
    
    public static String topicNamespace(SourceContext sourceContext);
    
    public static void tryLoadingConfigSecret(
        String secretName, 
        Map<String, Object> config, 
        SourceContext context
    );
}
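
To make the intent of the static helpers concrete, here is a minimal sketch of how three of them might behave. These are hypothetical re-implementations using only the JDK: the fallback to `public/default`, the decision to swallow secret lookup failures, and the use of a Function in place of SourceContext are assumptions, not confirmed behaviour of the connector.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical re-implementations for illustration; behaviour is assumed.
final class DebeziumConfigHelpersSketch {
    // Assumed: only fills the key when no value is present.
    static void setConfigIfNull(Map<String, Object> config, String key, String value) {
        config.putIfAbsent(key, value);
    }

    // Assumed: "tenant/namespace", falling back to "public" and "default".
    static String topicNamespace(String tenant, String namespace) {
        String t = (tenant == null || tenant.isEmpty()) ? "public" : tenant;
        String n = (namespace == null || namespace.isEmpty()) ? "default" : namespace;
        return t + "/" + n;
    }

    // Assumed: copies the secret into the config under the same name and
    // leaves the config untouched when the lookup fails or returns null.
    static void tryLoadingConfigSecret(String secretName,
                                       Map<String, Object> config,
                                       Function<String, String> secretLookup) {
        try {
            String secret = secretLookup.apply(secretName);
            if (secret != null) {
                config.put(secretName, secret);
            }
        } catch (RuntimeException e) {
            // Best effort: fall back to whatever the config already holds.
        }
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        setConfigIfNull(config, "database.port", "3306");
        tryLoadingConfigSecret("database.password", config, name -> "from-secret-store");
        System.out.println(config + " in " + topicNamespace("", null));
    }
}
```

The secret lookup is modelled as a plain function so the sketch runs without a Pulsar runtime; in the connector that role is played by SourceContext.getSecret.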

Configuration Parameters

Database Connection

  • database.hostname (string): MySQL server hostname or IP address
  • database.port (string): MySQL server port number (default: 3306)
  • database.user (string): Username for MySQL connection
  • database.password (string): Password for MySQL connection (can be loaded from secrets)
  • database.server.id (string): Unique numeric ID under which the connector joins the MySQL replication topology as a client; it must not collide with the ID of any server or replica in the cluster
  • database.server.name (string): Logical name identifying the MySQL server/cluster
  • database.whitelist (string): Comma-separated list of database names to monitor

Pulsar Integration

  • database.history.pulsar.topic (string): Pulsar topic name for storing database schema history
  • database.history.pulsar.service.url (string): Pulsar service URL for history storage
  • offset.storage.topic (string): Pulsar topic name for storing connector offset information

Connector Management

  • tenant (string): Pulsar tenant for the connector
  • namespace (string): Pulsar namespace for the connector
  • name (string): Unique name for this connector instance
  • topicName (string): Pulsar topic name where CDC events will be published
  • parallelism (integer): Number of parallel connector tasks to run
  • archive (string): Path to the connector NAR file
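
The entries under Database Connection and Pulsar Integration end up in the `configs` map handed to the connector. The sketch below assembles that map in Java with the sample values from Basic Usage and reports which of the documented keys are absent. The class and method names are hypothetical, and the check does not claim that every key is mandatory; it only compares against the keys listed in this document.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of the connector.
final class MysqlSourceConfigSketch {
    // Keys documented under "Database Connection" and "Pulsar Integration".
    static final String[] DOCUMENTED_KEYS = {
        "database.hostname", "database.port", "database.user", "database.password",
        "database.server.id", "database.server.name", "database.whitelist",
        "database.history.pulsar.topic", "database.history.pulsar.service.url",
        "offset.storage.topic"
    };

    // Mirrors the sample values from the Basic Usage YAML.
    static Map<String, Object> sampleConfig() {
        Map<String, Object> configs = new LinkedHashMap<>();
        configs.put("database.hostname", "localhost");
        configs.put("database.port", "3306");
        configs.put("database.user", "debezium");
        configs.put("database.password", "dbz");
        configs.put("database.server.id", "184054");
        configs.put("database.server.name", "dbserver1");
        configs.put("database.whitelist", "inventory");
        configs.put("database.history.pulsar.topic", "mysql-history-topic");
        configs.put("database.history.pulsar.service.url", "pulsar://127.0.0.1:6650");
        configs.put("offset.storage.topic", "mysql-offset-topic");
        return configs;
    }

    // Lists documented keys that the given map does not contain.
    static List<String> missingKeys(Map<String, Object> configs) {
        List<String> missing = new ArrayList<>();
        for (String key : DOCUMENTED_KEYS) {
            if (!configs.containsKey(key)) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

A check like missingKeys can be handy in a unit test that guards a deployment config against accidental omissions before it reaches pulsar-admin.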

Types

// Core Pulsar IO types
public interface SourceContext {
    String getTenant();
    String getNamespace();
    String getSourceName();
    String getSecret(String secretName);
    ClientBuilder getPulsarClientBuilder();
}

public interface Record<T> {
    T getValue();
    Optional<String> getTopicName();
    Optional<String> getKey();
    Map<String, String> getProperties();
    // Additional record metadata methods
}

// Configuration type
// Connector settings are passed as a standard java.util.Map<String, Object>;
// the connector does not define its own Map class.

Constants

// Default MySQL connector task class
public static final String DEFAULT_TASK = "io.debezium.connector.mysql.MySqlConnectorTask";

// Default converter configurations
public static final String DEFAULT_CONVERTER = "org.apache.kafka.connect.json.JsonConverter";
public static final String DEFAULT_HISTORY = "org.apache.pulsar.io.debezium.PulsarDatabaseHistory";
public static final String DEFAULT_OFFSET_TOPIC = "debezium-offset-topic";
public static final String DEFAULT_HISTORY_TOPIC = "debezium-history-topic";

Error Handling

The connector can throw the following exceptions:

  • IllegalArgumentException: When required configuration parameters are missing or invalid
  • Exception: General exceptions during connector initialization, database connection, or message processing
  • Exceptions from close(): Raised during connector shutdown and resource cleanup (Source extends AutoCloseable)

Configuration validation occurs during the open() method call, and runtime exceptions may occur during read() operations when database connectivity issues arise or binlog processing encounters errors.
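
One way a caller might separate these failure modes is sketched below: configuration errors from the open step are reported distinctly from later runtime failures, and cleanup always runs. The Step interface, the Outcome enum, and the run method are hypothetical names introduced for this example; the steps stand in for open(), read(), and close().

```java
// Hypothetical harness for illustration; not part of Pulsar IO.
final class ConnectorErrorHandlingSketch {
    enum Outcome { OK, BAD_CONFIG, RUNTIME_FAILURE }

    // A lifecycle step that may throw any exception, like open() or read().
    interface Step {
        void run() throws Exception;
    }

    static Outcome run(Step openStep, Step readStep, Runnable closeStep) {
        try {
            try {
                openStep.run();
            } catch (IllegalArgumentException e) {
                // Configuration problems surface while opening: fail fast.
                return Outcome.BAD_CONFIG;
            }
            readStep.run();
            return Outcome.OK;
        } catch (Exception e) {
            // Connectivity or binlog-processing errors during open or read.
            return Outcome.RUNTIME_FAILURE;
        } finally {
            // Cleanup runs on every path, including early returns.
            closeStep.run();
        }
    }
}
```

Distinguishing the two cases matters operationally: a bad configuration will not fix itself on retry, whereas a transient connectivity failure might.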

Deployment

The connector is packaged as a NAR (NiFi Archive) file and deployed to Pulsar brokers. Use the Pulsar admin CLI to manage connector lifecycle:

# Create/start the connector
bin/pulsar-admin sources create --source-config-file config.yaml

# Get connector status
bin/pulsar-admin sources status --tenant public --namespace default --name debezium-mysql-source

# Stop the connector
bin/pulsar-admin sources stop --tenant public --namespace default --name debezium-mysql-source

# Delete the connector
bin/pulsar-admin sources delete --tenant public --namespace default --name debezium-mysql-source