Message Source

RabbitMQ source for consuming messages from queues with configurable delivery guarantees, automatic message acknowledgment when checkpoints complete, and support for exactly-once processing semantics.

Capabilities

RMQ Source Class

Main source class that reads messages from RabbitMQ queues with configurable processing guarantees.

/**
 * RabbitMQ source (consumer) which reads from a queue and acknowledges messages on checkpoints.
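 * When checkpointing is enabled and messages carry unique correlation IDs (usesCorrelationId = true),
 * it guarantees exactly-once processing semantics; without correlation IDs it guarantees at-least-once.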
 * 
 * @param <OUT> The type of the data read from RabbitMQ
 */
public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, Long>
    implements ResultTypeQueryable<OUT> {
    
    /**
     * Creates a new RabbitMQ source with at-least-once message processing guarantee when
     * checkpointing is enabled. No strong delivery guarantees when checkpointing is disabled.
     * 
     * @param rmqConnectionConfig The RabbitMQ connection configuration
     * @param queueName The queue to receive messages from
     * @param deserializationSchema A DeserializationSchema for turning bytes into Java objects
     */
    public RMQSource(RMQConnectionConfig rmqConnectionConfig, 
                     String queueName,
                     DeserializationSchema<OUT> deserializationSchema);
    
    /**
     * Creates a new RabbitMQ source with configurable correlation ID usage.
     * For exactly-once processing, set usesCorrelationId to true and enable checkpointing.
     * 
     * @param rmqConnectionConfig The RabbitMQ connection configuration
     * @param queueName The queue to receive messages from
     * @param usesCorrelationId Whether messages have unique correlation IDs for deduplication
     * @param deserializationSchema A DeserializationSchema for turning bytes into Java objects
     */
    public RMQSource(RMQConnectionConfig rmqConnectionConfig,
                     String queueName, 
                     boolean usesCorrelationId,
                     DeserializationSchema<OUT> deserializationSchema);
    
    /** Initializes the connection to RMQ and sets up the queue */
    public void open(Configuration config) throws Exception;
    
    /** Closes the RMQ connection */
    public void close() throws Exception;
    
    /** Main processing loop that consumes messages from the queue */
    public void run(SourceContext<OUT> ctx) throws Exception;
    
    /** Cancels the source operation */
    public void cancel();
    
    /** Returns the type information for the produced output type */
    public TypeInformation<OUT> getProducedType();
}
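The two constructors and the checkpointing setting together determine which of the three processing modes the source runs in. The following is a minimal, JDK-only sketch of that decision. `ProcessingMode` and `ModeSelector` are hypothetical names used for illustration and are not part of the connector's API.

```java
// Hypothetical enum naming the three processing modes described for RMQSource.
enum ProcessingMode { EXACTLY_ONCE, AT_LEAST_ONCE, NO_GUARANTEE }

// Hypothetical helper, not part of the connector: derives the mode from the two settings.
final class ModeSelector {
    private ModeSelector() {}

    static ProcessingMode select(boolean checkpointingEnabled, boolean usesCorrelationId) {
        if (!checkpointingEnabled) {
            // Without checkpointing there are no strong guarantees, whatever usesCorrelationId says.
            return ProcessingMode.NO_GUARANTEE;
        }
        return usesCorrelationId ? ProcessingMode.EXACTLY_ONCE : ProcessingMode.AT_LEAST_ONCE;
    }

    // Auto-acknowledgment is used exactly when checkpointing is disabled.
    static boolean autoAck(boolean checkpointingEnabled) {
        return !checkpointingEnabled;
    }
}
```

For example, the three-argument constructor with checkpointing enabled corresponds to select(true, false), which yields AT_LEAST_ONCE.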

Customization Methods

Protected methods that can be overridden for custom queue and connection setup.

/**
 * Protected methods for customizing RMQ source behavior
 */
public class RMQSource<OUT> {
    
    /**
     * Initializes the connection to RMQ with a default connection factory.
     * Override this method to setup and configure a custom ConnectionFactory.
     */
    protected ConnectionFactory setupConnectionFactory() throws Exception;
    
    /**
     * Sets up the queue. The default implementation just declares the queue.
     * Override this method for custom queue setup (e.g. binding to an exchange
     * or defining custom queue parameters).
     */
    protected void setupQueue() throws IOException;
    
    /**
     * Acknowledges the given session IDs (RabbitMQ delivery tags) once a checkpoint completes.
     * Called automatically by the framework on checkpoint completion.
     */
    protected void acknowledgeSessionIDs(List<Long> sessionIds);
}
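To make checkpoint-driven acknowledgment concrete, here is a simplified, JDK-only model. Delivery tags are buffered per checkpoint and acknowledged only once that checkpoint, or a later one, completes. The model assumes acknowledgeSessionIDs acknowledges each delivery tag individually and then commits the RabbitMQ transaction. `RecordingChannel` and `CheckpointAcker` are hypothetical stand-ins, not connector classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a RabbitMQ channel: records acknowledged tags and commits.
final class RecordingChannel {
    final List<Long> acked = new ArrayList<>();
    int commits = 0;

    void basicAck(long deliveryTag, boolean multiple) { acked.add(deliveryTag); }
    void txCommit() { commits++; }
}

// Simplified model of checkpoint-scoped acknowledgment (not the connector's implementation).
final class CheckpointAcker {
    private final RecordingChannel channel;
    private List<Long> currentTags = new ArrayList<>();
    private final TreeMap<Long, List<Long>> pendingByCheckpoint = new TreeMap<>();

    CheckpointAcker(RecordingChannel channel) { this.channel = channel; }

    // Called for each message emitted downstream.
    void onMessageEmitted(long deliveryTag) { currentTags.add(deliveryTag); }

    // Called when a checkpoint is taken: freeze the tags seen since the previous one.
    void snapshot(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, currentTags);
        currentTags = new ArrayList<>();
    }

    // Called when a checkpoint completes: ack all tags of this and earlier checkpoints.
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<Long>> done = pendingByCheckpoint.headMap(checkpointId, true);
        List<Long> toAck = new ArrayList<>();
        done.values().forEach(toAck::addAll);
        done.clear(); // the view is backed by pendingByCheckpoint, so this removes them there
        acknowledgeSessionIDs(toAck);
    }

    // Assumed behavior: ack each delivery tag individually, then commit the transaction.
    private void acknowledgeSessionIDs(List<Long> sessionIds) {
        for (long id : sessionIds) {
            channel.basicAck(id, false);
        }
        channel.txCommit();
    }
}
```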

Usage Examples:

import java.io.IOException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// Basic at-least-once source
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5672)
    .setVirtualHost("/")
    .setUserName("guest")
    .setPassword("guest")
    .build();

RMQSource<String> basicSource = new RMQSource<>(
    connectionConfig,
    "input-queue",
    new SimpleStringSchema()
);

// Exactly-once source with correlation IDs
RMQSource<String> exactlyOnceSource = new RMQSource<>(
    connectionConfig,
    "input-queue",
    true, // use correlation IDs
    new SimpleStringSchema()
);

// Custom source with queue binding
class CustomRMQSource extends RMQSource<String> {
    public CustomRMQSource(RMQConnectionConfig config, String queueName) {
        super(config, queueName, new SimpleStringSchema());
    }
    
    @Override
    protected void setupQueue() throws IOException {
        // Declare queue with custom parameters
        channel.queueDeclare(queueName, true, false, false, null);
        
        // Bind queue to exchange
        channel.queueBind(queueName, "events-exchange", "user.*");
    }
}

// Use in Flink pipeline
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // Enable checkpointing for delivery guarantees

env.addSource(exactlyOnceSource)
   .map(s -> s.toUpperCase()) // placeholder transformation; substitute your own processing
   .print();

env.execute("RabbitMQ Source Example");

Processing Semantics

The RMQSource supports three different processing modes:

Exactly-Once Processing

  • Requirements: Checkpointing enabled + correlation IDs + RabbitMQ transactions
  • Usage: Set usesCorrelationId to true and enable Flink checkpointing
  • Behavior: Messages are acknowledged only when a checkpoint completes; correlation IDs let the source discard messages that RabbitMQ redelivers after a failure
  • Producer Requirement: The producer must set a unique correlation ID on every message
// Enable checkpointing in Flink
env.enableCheckpointing(5000);

// Create source with correlation ID support
RMQSource<String> source = new RMQSource<>(
    connectionConfig,
    "queue-name",
    true, // enables exactly-once with correlation IDs
    new SimpleStringSchema()
);
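The deduplication behind exactly-once mode can be sketched as follows. The sketch assumes the source remembers the correlation IDs it has emitted but not yet had acknowledged, and drops any redelivery that carries one of those IDs. This is a simplified, JDK-only model. `RmqMessage` and `DedupingConsumer` are hypothetical names. Flink's real implementation also keeps these IDs in checkpointed state so they survive a restart.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Hypothetical message type carrying the producer-assigned correlation ID.
final class RmqMessage {
    final String correlationId;
    final String body;

    RmqMessage(String correlationId, String body) {
        this.correlationId = correlationId;
        this.body = body;
    }
}

// Simplified model of correlation-ID deduplication (not the connector's actual class).
final class DedupingConsumer {
    // Correlation IDs already emitted downstream whose messages are not yet acknowledged.
    private final Set<String> processedButNotAcked = new HashSet<>();
    final List<String> emitted = new ArrayList<>();

    void onDelivery(RmqMessage m) {
        // With usesCorrelationId = true, a missing ID is treated as an error.
        Objects.requireNonNull(m.correlationId, "usesCorrelationId is true but the message has no correlation ID");
        if (processedButNotAcked.add(m.correlationId)) {
            emitted.add(m.body); // first delivery of this ID: emit it
        }
        // Otherwise this is a redelivery of an already-emitted message, so it is dropped.
    }

    // Once messages are acknowledged, RabbitMQ will not redeliver them, so their IDs can be forgotten.
    void onAcknowledged(Set<String> ackedIds) {
        processedButNotAcked.removeAll(ackedIds);
    }
}
```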

At-Least-Once Processing

  • Requirements: Checkpointing enabled + RabbitMQ transactions (no correlation IDs)
  • Usage: Enable Flink checkpointing and either set usesCorrelationId to false or use the three-argument constructor
  • Behavior: Messages are acknowledged when a checkpoint completes; after a failure, messages that were processed but not yet acknowledged are redelivered and processed again
// Enable checkpointing in Flink
env.enableCheckpointing(5000);

// Create source without correlation IDs
RMQSource<String> source = new RMQSource<>(
    connectionConfig,
    "queue-name",
    new SimpleStringSchema() // at-least-once processing
);
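At-least-once mode can produce duplicates, and this is easiest to see with a toy broker that requeues unacknowledged messages when the consumer fails. `ToyBroker` and `AtLeastOnceDemo` are hypothetical, JDK-only stand-ins. They do not model RabbitMQ's exact requeue ordering.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy broker (hypothetical, not the RabbitMQ client) that requeues unacked messages on failure.
final class ToyBroker {
    private final Deque<String> ready = new ArrayDeque<>();
    private final List<String> unacked = new ArrayList<>();

    void publish(String msg) { ready.addLast(msg); }

    // Hands out the next ready message and tracks it as unacknowledged.
    String deliver() {
        String msg = ready.pollFirst();
        if (msg != null) unacked.add(msg);
        return msg;
    }

    // Stands for the acknowledgment issued when a checkpoint completes.
    void ackAll() { unacked.clear(); }

    // On consumer failure, unacknowledged messages go back to the front of the queue in order.
    void consumerFailed() {
        for (int i = unacked.size() - 1; i >= 0; i--) ready.addFirst(unacked.get(i));
        unacked.clear();
    }
}

final class AtLeastOnceDemo {
    static List<String> run() {
        ToyBroker broker = new ToyBroker();
        for (String m : new String[] {"m1", "m2", "m3"}) broker.publish(m);
        List<String> processed = new ArrayList<>();
        processed.add(broker.deliver()); // m1
        broker.ackAll();                 // checkpoint 1 completes: m1 acknowledged
        processed.add(broker.deliver()); // m2, processed but not yet acknowledged
        processed.add(broker.deliver()); // m3, processed but not yet acknowledged
        broker.consumerFailed();         // failure before checkpoint 2 completes
        String m;
        while ((m = broker.deliver()) != null) processed.add(m); // redelivered after restart
        return processed;
    }
}
```

In this run m2 and m3 are processed twice because they were emitted after the last completed checkpoint. With correlation IDs enabled, the source would drop those redeliveries.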

No Delivery Guarantees

  • Requirements: No checkpointing
  • Usage: Leave checkpointing disabled (it is off by default)
  • Behavior: Auto-acknowledgment mode; messages may be lost on failure, but there is no transaction overhead
// No checkpointing enabled
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

RMQSource<String> source = new RMQSource<>(
    connectionConfig,
    "queue-name",
    new SimpleStringSchema()
);
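In auto-acknowledgment mode, a message counts as acknowledged as soon as it is delivered. A failure between delivery and processing therefore loses that message. The toy model below shows this case. `AutoAckDemo` is a hypothetical, JDK-only name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy model of auto-ack consumption (hypothetical, not the RabbitMQ client):
// removing a message from the queue stands for delivery plus immediate acknowledgment.
final class AutoAckDemo {
    static List<String> run() {
        Deque<String> queue = new ArrayDeque<>(Arrays.asList("m1", "m2", "m3"));
        List<String> processed = new ArrayList<>();

        processed.add(queue.pollFirst()); // m1: delivered, auto-acked, processed
        queue.pollFirst();                // m2: delivered and auto-acked, but the job fails before processing it

        // After the restart the broker has nothing left to redeliver for m2.
        while (!queue.isEmpty()) {
            processed.add(queue.pollFirst());
        }
        return processed;
    }
}
```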

Queue Configuration

Default Queue Setup

By default, setupQueue() declares a durable, non-exclusive, non-auto-delete queue:

channel.queueDeclare(queueName, true, false, false, null);

Custom Queue Setup

Override setupQueue() for custom queue configuration:

@Override
protected void setupQueue() throws IOException {
    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", 60000); // 60 second TTL
    args.put("x-max-length", 1000);   // Max 1000 messages
    
    // Declare queue with custom arguments
    channel.queueDeclare(queueName, true, false, false, args);
    
    // Bind to exchange with routing key
    channel.queueBind(queueName, "my-exchange", "routing.key");
}
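If several jobs or tools declare the same queue, it helps to keep the arguments in one place. RabbitMQ rejects a redeclaration of an existing queue with different arguments, raising a PRECONDITION_FAILED channel error. Below is a hedged sketch of a hypothetical `QueueArgs` helper that builds the map passed as the last queueDeclare argument. It uses the same two arguments as the example above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of the connector) for building queueDeclare arguments.
final class QueueArgs {
    private QueueArgs() {}

    static Map<String, Object> of(int messageTtlMillis, int maxLength) {
        if (messageTtlMillis < 0 || maxLength < 0) {
            throw new IllegalArgumentException("TTL and max length must be non-negative");
        }
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", messageTtlMillis); // per-message TTL in milliseconds
        args.put("x-max-length", maxLength);         // maximum queue length in messages
        return Collections.unmodifiableMap(args);
    }
}
```

Inside an overridden setupQueue(), the result would be passed as channel.queueDeclare(queueName, true, false, false, QueueArgs.of(60000, 1000)).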

Error Handling

Connection Failures

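Connection failures during open() are rethrown as a RuntimeException with a descriptive message, wrapping the underlying exception as its cause. In a running job Flink calls open() itself, so the exception fails the task, and the configured restart strategy decides whether to retry. Catching it directly is only relevant when you call open() yourself, for example in a test:

try {
    source.open(config);
} catch (RuntimeException e) {
    // Connection setup failed; the original exception is available via e.getCause()
    logger.error("Failed to connect to RabbitMQ", e);
}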
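The wrap-and-rethrow pattern described above can be sketched without any RabbitMQ dependency. `ConnectionOpener` and its `Connector` interface are hypothetical stand-ins for the connector's connection setup. The message format is illustrative and not the connector's exact text.

```java
import java.io.IOException;

// Hypothetical sketch of wrapping a checked connection failure in a RuntimeException.
final class ConnectionOpener {
    // Stand-in for the RabbitMQ client calls that open the connection and channel.
    interface Connector {
        void connect() throws IOException;
    }

    private ConnectionOpener() {}

    static void open(Connector connector, String queueName, String host) {
        try {
            connector.connect();
        } catch (IOException e) {
            // Keep the original exception as the cause so the root error stays visible in logs.
            throw new RuntimeException("Cannot connect to queue '" + queueName + "' at " + host, e);
        }
    }
}
```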

Message Processing Failures

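  • Checkpointed modes: if acknowledging messages after a checkpoint completes fails, a RuntimeException is thrown
  • Correlation ID violations: a message without a correlation ID while usesCorrelationId=true causes an exception that fails the job
  • Deserialization failures: the provided DeserializationSchema determines the behavior; exceptions it throws propagate out of the source and fail the job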

Recovery Behavior

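When automatic recovery is enabled in RMQConnectionConfig (Builder.setAutomaticRecovery(true)):

  • Network failures trigger automatic reconnection
  • Topology recovery (setTopologyRecoveryEnabled) re-declares queues and bindings
  • The network recovery interval (setNetworkRecoveryInterval, in milliseconds) controls the delay between reconnection attempts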
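Fixed-interval reconnection can be modeled in a few lines. This is a toy, JDK-only sketch using a hypothetical `FixedIntervalRecovery` class, not how the RabbitMQ Java client implements automatic recovery. It records each wait instead of sleeping and assumes the wait precedes every attempt. It also adds a `maxAttempts` bound only so the sketch terminates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Toy model of fixed-interval reconnection (hypothetical; not the RabbitMQ client's code).
final class FixedIntervalRecovery {
    // Waits this model would have slept, in milliseconds, recorded instead of sleeping.
    final List<Long> waits = new ArrayList<>();

    boolean recover(BooleanSupplier tryConnect, long networkRecoveryIntervalMs, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            waits.add(networkRecoveryIntervalMs); // assumed: wait before every attempt
            if (tryConnect.getAsBoolean()) {
                return true; // reconnected
            }
        }
        return false; // bound reached; added only so the sketch terminates
    }
}
```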