tessl/maven-org-apache-flink--flink-connector-rabbitmq-2-10

Apache Flink RabbitMQ connector that provides source and sink functionality for streaming data to and from RabbitMQ message queues, with exactly-once processing semantics when checkpointing is enabled and correlation IDs are used


Flink Connector RabbitMQ

Apache Flink RabbitMQ connector that provides source and sink functionality for streaming data to and from RabbitMQ message queues. The source provides exactly-once processing semantics when checkpointing is enabled and correlation IDs are used, at-least-once semantics when checkpointing is enabled without correlation IDs, and no delivery guarantees when checkpointing is disabled. Exactly-once additionally requires running the source with a parallelism of 1.

Package Information

  • Package Name: flink-connector-rabbitmq_2.10
  • Package Type: maven
  • Language: Java
  • Group ID: org.apache.flink
  • Artifact ID: flink-connector-rabbitmq_2.10
  • Version: 1.3.3
  • Installation: Add dependency to your Maven POM file
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-rabbitmq_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

Core Imports

import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

Basic Usage

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

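// Set up execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing; without it the source gives no delivery guarantees
env.enableCheckpointing(5000); // checkpoint interval in milliseconds (example value)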

// Configure RabbitMQ connection
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5672)
    .setVirtualHost("/")
    .setUserName("guest")
    .setPassword("guest")
    .build();

// Create RabbitMQ source
RMQSource<String> rmqSource = new RMQSource<>(
    connectionConfig,
    "input-queue",
    true, // use correlation IDs for exactly-once processing
    new SimpleStringSchema()
);

// Create RabbitMQ sink
RMQSink<String> rmqSink = new RMQSink<>(
    connectionConfig,
    "output-queue",
    new SimpleStringSchema()
);

// Build streaming pipeline
env.addSource(rmqSource)
   .setParallelism(1) // exactly-once requires a non-parallel source
   .map(s -> s.toUpperCase())
   .addSink(rmqSink);

env.execute("RabbitMQ Pipeline");

Architecture

The Flink RabbitMQ connector is built around three key components:

  • Connection Management: RMQConnectionConfig handles all connection parameters, timeouts, and recovery settings
  • Source Component: RMQSource provides message consumption with configurable delivery guarantees
  • Sink Component: RMQSink handles message publishing with error handling and queue setup

The connector supports different processing semantics based on configuration:

  • Exactly-once: Enabled via checkpointing + correlation IDs + transactions
  • At-least-once: Enabled via checkpointing + transactions (without correlation IDs)
  • No guarantees: Auto-acknowledge mode when checkpointing is disabled (messages are acknowledged on delivery)
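
The three cases above can be read as a small decision table. The sketch below is only an illustration of that table; `DeliveryGuarantee` and `GuaranteeTable` are hypothetical names and are not part of the connector. It also assumes a non-parallel source, which the exactly-once case relies on.

```java
// Hypothetical illustration of the semantics list above; not connector API.
enum DeliveryGuarantee { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

class GuaranteeTable {

    // Maps the two configuration switches to the guarantee described in the list.
    static DeliveryGuarantee resolve(boolean checkpointingEnabled, boolean usesCorrelationId) {
        if (!checkpointingEnabled) {
            // Messages are acknowledged on delivery, so a failure can lose them.
            return DeliveryGuarantee.NONE;
        }
        // With checkpointing, acknowledgments wait for a completed checkpoint.
        // Correlation IDs additionally allow redelivered messages to be dropped.
        return usesCorrelationId
                ? DeliveryGuarantee.EXACTLY_ONCE
                : DeliveryGuarantee.AT_LEAST_ONCE;
    }

    public static void main(String[] args) {
        System.out.println(resolve(true, true));
        System.out.println(resolve(true, false));
        System.out.println(resolve(false, true));
    }
}
```

Note that setting the correlation-ID flag without enabling checkpointing does not improve the guarantee in this table.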

Capabilities

Connection Configuration

RMQConnectionConfig holds the connection settings. It can be built from individual parameters (host, port, virtual host, credentials) or from a single URI, and it also carries timeout, recovery, and SSL options.

public class RMQConnectionConfig implements Serializable {
    public String getHost();
    public int getPort();
    public String getVirtualHost();
    public String getUsername();
    public String getPassword();
    public String getUri();
    public ConnectionFactory getConnectionFactory();
}

// Nested class: RMQConnectionConfig.Builder
public static class Builder {
    public Builder setHost(String host);
    public Builder setPort(int port);
    public Builder setVirtualHost(String virtualHost);
    public Builder setUserName(String username);
    public Builder setPassword(String password);
    public Builder setUri(String uri);
    public RMQConnectionConfig build();
}
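
To make the relationship between the URI form and the individual setters concrete, here is a minimal sketch that splits an AMQP URI into the same five fields using only `java.net.URI`. `AmqpUriParts` is a hypothetical helper written for this page, not the connector's or the RabbitMQ client's parser; the default ports (5672, 5671 for `amqps`) and the `guest` default credentials are assumptions based on common RabbitMQ defaults.

```java
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLDecoder;

// Hypothetical helper: shows which URI component corresponds to which setter.
class AmqpUriParts {
    final String host;
    final int port;
    final String virtualHost;
    final String userName;
    final String password;

    AmqpUriParts(String host, int port, String virtualHost, String userName, String password) {
        this.host = host;
        this.port = port;
        this.virtualHost = virtualHost;
        this.userName = userName;
        this.password = password;
    }

    static AmqpUriParts parse(String uriString) {
        URI uri = URI.create(uriString);

        // Assumed defaults: 5672 for amqp, 5671 for amqps.
        boolean tls = "amqps".equalsIgnoreCase(uri.getScheme());
        int port = uri.getPort() != -1 ? uri.getPort() : (tls ? 5671 : 5672);

        // Assumed default credentials when the URI carries no user info.
        String userName = "guest";
        String password = "guest";
        String userInfo = uri.getRawUserInfo();
        if (userInfo != null) {
            String[] parts = userInfo.split(":", 2);
            userName = decode(parts[0]);
            if (parts.length == 2) {
                password = decode(parts[1]);
            }
        }

        // The path after the leading slash is the percent-encoded virtual host.
        String rawPath = uri.getRawPath();
        String virtualHost = (rawPath == null || rawPath.isEmpty())
                ? "/"
                : decode(rawPath.substring(1));

        return new AmqpUriParts(uri.getHost(), port, virtualHost, userName, password);
    }

    private static String decode(String s) {
        try {
            // Protect literal '+' so URLDecoder does not turn it into a space.
            return URLDecoder.decode(s.replace("+", "%2B"), "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        AmqpUriParts parts = parse("amqp://guest:guest@localhost:5672/%2F");
        System.out.println(parts.host + ":" + parts.port + " vhost=" + parts.virtualHost);
    }
}
```

In the URI form the default virtual host `/` has to be written as `%2F`, which is easy to miss when moving from `setVirtualHost("/")` to `setUri(...)`.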

Message Source

RabbitMQ source for consuming messages from a queue with configurable delivery guarantees. When checkpointing is enabled, messages are acknowledged only after the checkpoint that covers them completes.

public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, Long>
    implements ResultTypeQueryable<OUT> {
    
    public RMQSource(RMQConnectionConfig rmqConnectionConfig, 
                     String queueName,
                     DeserializationSchema<OUT> deserializationSchema);
    
    public RMQSource(RMQConnectionConfig rmqConnectionConfig,
                     String queueName, 
                     boolean usesCorrelationId,
                     DeserializationSchema<OUT> deserializationSchema);
}
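
The interplay of the `usesCorrelationId` flag and checkpoint-time acknowledgment can be sketched with a small in-memory model. This is a simplified illustration under stated assumptions, not the connector's code: `CheckpointedAckModel` and its method names are hypothetical, there is no broker or Flink runtime involved, and "acknowledging" just records the delivery tag in a list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of deferred acknowledgment with deduplication.
class CheckpointedAckModel {
    private final boolean usesCorrelationId;
    private final Set<String> seenCorrelationIds = new HashSet<>();
    private final List<Long> tagsSinceLastSnapshot = new ArrayList<>();
    private final Map<Long, List<Long>> tagsByCheckpoint = new HashMap<>();

    final List<String> emitted = new ArrayList<>();      // records passed downstream
    final List<Long> acknowledged = new ArrayList<>();   // delivery tags acked so far

    CheckpointedAckModel(boolean usesCorrelationId) {
        this.usesCorrelationId = usesCorrelationId;
    }

    // Called for every message delivered by the broker.
    void onDelivery(long deliveryTag, String correlationId, String body) {
        if (usesCorrelationId && !seenCorrelationIds.add(correlationId)) {
            return; // correlation ID already seen: treat as a redelivery and drop it
        }
        tagsSinceLastSnapshot.add(deliveryTag);
        emitted.add(body);
    }

    // Called when a checkpoint is taken: remember which tags it covers.
    void snapshot(long checkpointId) {
        tagsByCheckpoint.put(checkpointId, new ArrayList<>(tagsSinceLastSnapshot));
        tagsSinceLastSnapshot.clear();
    }

    // Called once the checkpoint has completed: only now are messages acked.
    void checkpointComplete(long checkpointId) {
        List<Long> tags = tagsByCheckpoint.remove(checkpointId);
        if (tags != null) {
            acknowledged.addAll(tags);
        }
    }

    public static void main(String[] args) {
        CheckpointedAckModel source = new CheckpointedAckModel(true);
        source.onDelivery(1L, "order-1", "a");
        source.onDelivery(2L, "order-2", "b");
        source.onDelivery(3L, "order-1", "a"); // redelivery of the first message
        source.snapshot(1L);
        source.checkpointComplete(1L);
        System.out.println("emitted=" + source.emitted + " acked=" + source.acknowledged);
    }
}
```

With the flag set to `false`, the same model would emit the redelivered message again, which corresponds to the at-least-once case.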

Message Sink

RabbitMQ sink for publishing messages to queues with configurable error handling and automatic queue setup.

public class RMQSink<IN> extends RichSinkFunction<IN> {
    public RMQSink(RMQConnectionConfig rmqConnectionConfig, 
                   String queueName, 
                   SerializationSchema<IN> schema);
    
    public void setLogFailuresOnly(boolean logFailuresOnly);
}
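
The effect of `setLogFailuresOnly` can be illustrated with a small stand-alone model. This is a sketch under assumptions, not the sink's implementation: `MessagePublisher` and `FailureHandlingSinkModel` are hypothetical stand-ins so the example compiles without Flink or the RabbitMQ client, and "logging" is modeled as appending to a list.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the channel operation that publishes a message.
interface MessagePublisher {
    void publish(byte[] body) throws IOException;
}

// Hypothetical model of a sink with a "log failures only" switch.
class FailureHandlingSinkModel {
    private final MessagePublisher publisher;
    private boolean logFailuresOnly = false; // assumed default: failures propagate

    final List<String> loggedErrors = new ArrayList<>();

    FailureHandlingSinkModel(MessagePublisher publisher) {
        this.publisher = publisher;
    }

    void setLogFailuresOnly(boolean logFailuresOnly) {
        this.logFailuresOnly = logFailuresOnly;
    }

    // Publishes one record; a failure is either recorded or rethrown.
    void invoke(String value) {
        try {
            publisher.publish(value.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            if (logFailuresOnly) {
                loggedErrors.add("Cannot send message: " + e.getMessage());
            } else {
                throw new RuntimeException("Cannot send message", e);
            }
        }
    }

    public static void main(String[] args) {
        MessagePublisher failing = body -> { throw new IOException("broker unavailable"); };
        FailureHandlingSinkModel sink = new FailureHandlingSinkModel(failing);
        sink.setLogFailuresOnly(true);
        sink.invoke("hello"); // recorded instead of failing the job
        System.out.println(sink.loggedErrors.size());
    }
}
```

The trade-off is that logging only keeps the job running but silently drops the affected record, so it fits pipelines where occasional loss on the output side is acceptable.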

Core Types

// Required Flink interfaces for serialization
interface DeserializationSchema<T> {
    T deserialize(byte[] message);
    boolean isEndOfStream(T nextElement);
    TypeInformation<T> getProducedType();
}

interface SerializationSchema<T> {
    byte[] serialize(T element);
}

// RabbitMQ client types (from com.rabbitmq.client package)
class ConnectionFactory {
    Connection newConnection() throws IOException, TimeoutException;
}

interface Connection {
    Channel createChannel() throws IOException;
    void close() throws IOException;
}

interface Channel {
    AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
    void basicAck(long deliveryTag, boolean multiple) throws IOException;
    String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
    AMQP.Tx.SelectOk txSelect() throws IOException;
    AMQP.Tx.CommitOk txCommit() throws IOException;
    void close() throws IOException, TimeoutException;
}

// RabbitMQ consumer classes (from com.rabbitmq.client package)
class QueueingConsumer {
    QueueingConsumer(Channel channel);
    Delivery nextDelivery() throws InterruptedException;
    
    static class Delivery {
        byte[] getBody();
        Envelope getEnvelope();
        BasicProperties getProperties();
    }
}

class Envelope {
    long getDeliveryTag();
    String getExchange();
    String getRoutingKey();
}

class BasicProperties {
    String getCorrelationId();
    Integer getDeliveryMode();
    String getContentType();
}
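
As an illustration of the two schema interfaces listed above, the following sketch implements a comma-separated encoding for a small record type. To stay compilable without Flink on the classpath it declares local stand-ins (`SerializationSchemaLike`, `DeserializationSchemaLike`) that mirror only the `serialize`, `deserialize`, and `isEndOfStream` methods; `getProducedType` is left out. `SensorReading` and `SensorReadingSchema` are hypothetical names. In a real job the class would implement the Flink interfaces instead.

```java
import java.nio.charset.StandardCharsets;

// Local stand-ins mirroring the method shapes of the Flink interfaces above.
interface SerializationSchemaLike<T> {
    byte[] serialize(T element);
}

interface DeserializationSchemaLike<T> {
    T deserialize(byte[] message);
    boolean isEndOfStream(T nextElement);
}

// Hypothetical record type used only for this example.
class SensorReading {
    final String sensorId;
    final double value;

    SensorReading(String sensorId, double value) {
        this.sensorId = sensorId;
        this.value = value;
    }
}

// Encodes a reading as "sensorId,value" in UTF-8 and decodes it again.
class SensorReadingSchema
        implements SerializationSchemaLike<SensorReading>, DeserializationSchemaLike<SensorReading> {

    @Override
    public byte[] serialize(SensorReading element) {
        return (element.sensorId + "," + element.value).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public SensorReading deserialize(byte[] message) {
        // Split on the first comma only; the sensor ID must not contain one.
        String[] parts = new String(message, StandardCharsets.UTF_8).split(",", 2);
        return new SensorReading(parts[0], Double.parseDouble(parts[1]));
    }

    @Override
    public boolean isEndOfStream(SensorReading nextElement) {
        return false; // a queue is treated as an unbounded stream
    }

    public static void main(String[] args) {
        SensorReadingSchema schema = new SensorReadingSchema();
        byte[] bytes = schema.serialize(new SensorReading("s-1", 21.5));
        SensorReading back = schema.deserialize(bytes);
        System.out.println(back.sensorId + " " + back.value);
    }
}
```

Using one class for both directions keeps the wire format in a single place, which helps when the same record type is read by `RMQSource` and written by `RMQSink`.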

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-rabbitmq-2-10

  • Workspace: tessl
  • Visibility: Public
  • Describes: mavenpkg:maven/org.apache.flink/flink-connector-rabbitmq_2.10@1.3.x