or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

connection-config.md
index.md
sink.md
source.md
tile.json

tessl/maven-org-apache-flink--flink-connector-rabbitmq_2-10

Apache Flink RabbitMQ connector that provides source and sink functionality for streaming data to and from RabbitMQ message queues, with exactly-once processing semantics when checkpointing is enabled and correlation IDs are used

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
maven
pkg:maven/org.apache.flink/flink-connector-rabbitmq_2.10@1.3.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-rabbitmq_2-10@1.3.0

docs/index.md

Flink Connector RabbitMQ

Apache Flink RabbitMQ connector that provides source and sink functionality for streaming data to and from RabbitMQ message queues. The source supports exactly-once processing semantics when checkpointing is enabled and correlation IDs are used, at-least-once semantics when checkpointing is enabled without correlation IDs, and no delivery guarantees when checkpointing is disabled.

Package Information

  • Package Name: flink-connector-rabbitmq_2.10
  • Package Type: maven
  • Language: Java
  • Group ID: org.apache.flink
  • Artifact ID: flink-connector-rabbitmq_2.10
  • Version: 1.3.3
  • Installation: Add dependency to your Maven POM file
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-rabbitmq_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

Core Imports

import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

Basic Usage

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// Set up execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure RabbitMQ connection
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5672)
    .setVirtualHost("/")
    .setUserName("guest")
    .setPassword("guest")
    .build();

// Create RabbitMQ source
RMQSource<String> rmqSource = new RMQSource<>(
    connectionConfig,
    "input-queue",
    true, // use correlation IDs for exactly-once processing
    new SimpleStringSchema()
);

// Create RabbitMQ sink
RMQSink<String> rmqSink = new RMQSink<>(
    connectionConfig,
    "output-queue",
    new SimpleStringSchema()
);

// Build streaming pipeline
env.addSource(rmqSource)
   .map(s -> s.toUpperCase())
   .addSink(rmqSink);

env.execute("RabbitMQ Pipeline");

Architecture

The Flink RabbitMQ connector is built around three key components:

  • Connection Management: RMQConnectionConfig handles all connection parameters, timeouts, and recovery settings
  • Source Component: RMQSource provides message consumption with configurable delivery guarantees
  • Sink Component: RMQSink handles message publishing with error handling and queue setup

The connector supports different processing semantics based on configuration:

  • Exactly-once: Enabled via checkpointing + correlation IDs + transactions
  • At-least-once: Enabled via checkpointing + transactions (without correlation IDs)
  • No guarantees: Auto-acknowledge (autoAck) mode when checkpointing is disabled

Capabilities

Connection Configuration

Comprehensive connection configuration supporting both individual parameters and URI-based setup, with full control over timeouts, recovery settings, and SSL options.

public class RMQConnectionConfig implements Serializable {
    public String getHost();
    public int getPort();
    public String getVirtualHost();
    public String getUsername();
    public String getPassword();
    public String getUri();
    public ConnectionFactory getConnectionFactory();
}

public static class Builder {
    public Builder setHost(String host);
    public Builder setPort(int port);
    public Builder setVirtualHost(String virtualHost);
    public Builder setUserName(String username);
    public Builder setPassword(String password);
    public Builder setUri(String uri);
    public RMQConnectionConfig build();
}

See connection-config.md for the full Connection Configuration reference.

Message Source

RabbitMQ source for consuming messages from queues with configurable delivery guarantees and automatic message acknowledgment during checkpoints.

public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, Long>
    implements ResultTypeQueryable<OUT> {
    
    public RMQSource(RMQConnectionConfig rmqConnectionConfig, 
                     String queueName,
                     DeserializationSchema<OUT> deserializationSchema);
    
    public RMQSource(RMQConnectionConfig rmqConnectionConfig,
                     String queueName, 
                     boolean usesCorrelationId,
                     DeserializationSchema<OUT> deserializationSchema);
}

See source.md for the full Message Source reference.

Message Sink

RabbitMQ sink for publishing messages to queues with configurable error handling and automatic queue setup.

public class RMQSink<IN> extends RichSinkFunction<IN> {
    public RMQSink(RMQConnectionConfig rmqConnectionConfig, 
                   String queueName, 
                   SerializationSchema<IN> schema);
    
    public void setLogFailuresOnly(boolean logFailuresOnly);
}

See sink.md for the full Message Sink reference.

Core Types

// Required Flink interfaces for serialization
interface DeserializationSchema<T> {
    T deserialize(byte[] message);
    boolean isEndOfStream(T nextElement);
    TypeInformation<T> getProducedType();
}

interface SerializationSchema<T> {
    byte[] serialize(T element);
}

// RabbitMQ client types (from com.rabbitmq.client package)
class ConnectionFactory {
    Connection newConnection() throws IOException, TimeoutException;
}

class Connection {
    Channel createChannel() throws IOException;
    void close() throws IOException;
}

class Channel {
    void queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
    void basicAck(long deliveryTag, boolean multiple) throws IOException;
    void basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
    void txSelect() throws IOException;
    void txCommit() throws IOException;
    void close() throws IOException;
}

// RabbitMQ consumer classes (from com.rabbitmq.client package)
class QueueingConsumer {
    QueueingConsumer(Channel channel);
    Delivery nextDelivery() throws InterruptedException;
    
    static class Delivery {
        byte[] getBody();
        Envelope getEnvelope();
        BasicProperties getProperties();
    }
}

class Envelope {
    long getDeliveryTag();
    String getExchange();
    String getRoutingKey();
}

class BasicProperties {
    String getCorrelationId();
    Integer getDeliveryMode();
    String getContentType();
}