or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

connection-config.mdindex.mdsink.mdsource.md
tile.json

sink.mddocs/

Message Sink

RabbitMQ sink for publishing messages to queues with configurable error handling, automatic queue setup, and robust connection management.

Capabilities

RMQ Sink Class

Main sink class that publishes messages to RabbitMQ queues with configurable error handling behavior.

/**
 * A Sink for publishing data into RabbitMQ
 * 
 * @param <IN> The type of the data to be published to RabbitMQ
 */
public class RMQSink<IN> extends RichSinkFunction<IN> {
    
    /**
     * Creates a new RabbitMQ sink for publishing messages to a queue
     * 
     * @param rmqConnectionConfig The RabbitMQ connection configuration
     * @param queueName The queue to publish messages to
     * @param schema A SerializationSchema for turning Java objects into bytes
     */
    public RMQSink(RMQConnectionConfig rmqConnectionConfig, 
                   String queueName, 
                   SerializationSchema<IN> schema);
    
    /** Initializes the RabbitMQ connection and channel, and sets up the queue */
    public void open(Configuration config) throws Exception;
    
    /**
     * Called when new data arrives to the sink, and forwards it to RMQ
     * 
     * @param value The incoming data to publish
     */
    public void invoke(IN value);
    
    /** Closes the RabbitMQ connection and channel */
    public void close();
    
    /**
     * Defines whether the producer should fail on errors, or only log them.
     * If set to true, exceptions will be only logged.
     * If set to false, exceptions will be thrown and cause the streaming program to fail.
     * 
     * @param logFailuresOnly The flag to indicate logging-only on exceptions
     */
    public void setLogFailuresOnly(boolean logFailuresOnly);
}

Customization Methods

Protected methods that can be overridden for custom queue setup.

/**
 * Protected methods for customizing RMQ sink behavior
 */
public class RMQSink<IN> {
    
    /**
     * Sets up the queue. The default implementation just declares the queue.
     * Override this method for custom queue setup (i.e. binding to an exchange
     * or defining custom queue parameters).
     */
    protected void setupQueue() throws IOException;
}

Usage Examples:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// Basic sink configuration
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5672)
    .setVirtualHost("/")
    .setUserName("guest")
    .setPassword("guest")
    .build();

RMQSink<String> basicSink = new RMQSink<>(
    connectionConfig,
    "output-queue",
    new SimpleStringSchema()
);

// Sink with error logging (doesn't fail on publish errors)
RMQSink<String> resilientSink = new RMQSink<>(
    connectionConfig,
    "output-queue",
    new SimpleStringSchema()
);
resilientSink.setLogFailuresOnly(true);

// Custom sink with exchange publishing
class CustomRMQSink extends RMQSink<String> {
    private final String exchangeName;
    private final String routingKey;
    
    public CustomRMQSink(RMQConnectionConfig config, String exchange, String routing) {
        super(config, "", new SimpleStringSchema()); // queue name not used
        this.exchangeName = exchange;
        this.routingKey = routing;
    }
    
    @Override
    protected void setupQueue() throws IOException {
        // Declare exchange instead of queue
        channel.exchangeDeclare(exchangeName, "topic", true);
    }
    
    @Override
    public void invoke(String value) {
        try {
            byte[] msg = schema.serialize(value);
            // Publish to exchange with routing key instead of queue
            channel.basicPublish(exchangeName, routingKey, null, msg);
        } catch (IOException e) {
            if (logFailuresOnly) {
                LOG.error("Cannot send message to exchange {}", exchangeName, e);
            } else {
                throw new RuntimeException("Cannot send message to exchange " + exchangeName, e);
            }
        }
    }
}

// Use in Flink pipeline
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.fromElements("Hello", "World", "from", "Flink")
   .map(s -> s.toUpperCase())
   .addSink(basicSink);

env.execute("RabbitMQ Sink Example");

Publishing Behavior

Default Publishing

By default, messages are published to the specified queue using the default exchange:

channel.basicPublish("", queueName, null, serializedMessage);
  • Exchange: "" (default exchange)
  • Routing Key: Queue name
  • Properties: null (no special message properties)
  • Body: Serialized message bytes

Custom Publishing

Override invoke() method for custom publishing behavior:

@Override
public void invoke(MyDataType value) {
    try {
        byte[] msg = schema.serialize(value);
        
        // Custom message properties
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
            .contentType("application/json")
            .deliveryMode(2) // persistent
            .timestamp(new Date())
            .build();
        
        // Publish to specific exchange with routing key
        channel.basicPublish("my-exchange", "routing.key", props, msg);
        
    } catch (IOException e) {
        handlePublishError(e);
    }
}

Queue Configuration

Default Queue Setup

By default, setupQueue() declares a non-durable, non-exclusive, non-auto-delete queue:

channel.queueDeclare(queueName, false, false, false, null);

Custom Queue Setup

Override setupQueue() for custom queue configuration:

@Override
protected void setupQueue() throws IOException {
    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", 300000); // 5 minute TTL
    args.put("x-max-length", 10000);   // Max 10,000 messages
    args.put("x-overflow", "reject-publish"); // Reject when full
    
    // Declare durable queue with custom arguments
    channel.queueDeclare(queueName, true, false, false, args);
    
    // Bind queue to exchange
    channel.queueBind(queueName, "events-exchange", "output.*");
}

Exchange Setup

For exchange-based publishing, override setupQueue() to declare exchanges:

@Override
protected void setupQueue() throws IOException {
    // Declare topic exchange
    channel.exchangeDeclare("events-exchange", "topic", true);
    
    // Optionally declare queues bound to the exchange
    channel.queueDeclare("event-queue", true, false, false, null);
    channel.queueBind("event-queue", "events-exchange", "event.*");
}

Error Handling

Configuration Options

The sink supports two error handling modes via setLogFailuresOnly():

Fail-Fast Mode (default)

RMQSink<String> sink = new RMQSink<>(config, "queue", schema);
sink.setLogFailuresOnly(false); // default behavior

// Publishing errors throw RuntimeException, causing job failure

Resilient Mode

RMQSink<String> sink = new RMQSink<>(config, "queue", schema);
sink.setLogFailuresOnly(true);

// Publishing errors are logged but don't cause job failure

Error Types

Connection Errors

Thrown during open() when connection setup fails:

try {
    sink.open(config);
} catch (RuntimeException e) {
    // Handle connection setup failures
    logger.error("Failed to connect to RabbitMQ: " + e.getMessage());
}

Publishing Errors

Thrown during invoke() when message publishing fails:

  • Network issues: Connection lost during publish
  • Queue full: When queue reaches maximum capacity
  • Authentication: Invalid credentials or permissions
  • Serialization: Schema serialization failures
// Error handling in custom sink
@Override
public void invoke(MyType value) {
    try {
        byte[] msg = schema.serialize(value);
        channel.basicPublish("", queueName, null, msg);
    } catch (IOException e) {
        if (logFailuresOnly) {
            LOG.error("Failed to publish message", e);
            // Continue processing other messages
        } else {
            throw new RuntimeException("Publishing failed", e);
            // Job will restart from last checkpoint
        }
    }
}

Connection Cleanup Errors

Thrown during close() when connection cleanup fails. The implementation attempts to close both channel and connection, logging the first error and throwing the second if both fail:

@Override
public void close() {
    IOException t = null;
    try {
        channel.close();
    } catch (IOException e) {
        t = e;
    }

    try {
        connection.close();
    } catch (IOException e) {
        if(t != null) {
            LOG.warn("Both channel and connection closing failed. Logging channel exception and failing with connection exception", t);
        }
        t = e;
    }
    if(t != null) {
        throw new RuntimeException("Error while closing RMQ connection with " + queueName
                + " at " + rmqConnectionConfig.getHost(), t);
    }
}

Performance Considerations

Batching

For high-throughput scenarios, consider implementing custom batching:

class BatchingRMQSink extends RMQSink<String> {
    private final List<String> batch = new ArrayList<>();
    private final int batchSize = 100;
    
    @Override
    public void invoke(String value) {
        synchronized (batch) {
            batch.add(value);
            if (batch.size() >= batchSize) {
                flushBatch();
            }
        }
    }
    
    private void flushBatch() {
        // Publish entire batch in one operation
        for (String item : batch) {
            super.invoke(item);
        }
        batch.clear();
    }
}

Connection Pooling

For multiple sinks, consider sharing connection configuration:

// Shared connection config
RMQConnectionConfig sharedConfig = new RMQConnectionConfig.Builder()
    .setHost("rabbitmq-cluster")
    .setAutomaticRecovery(true)
    .setNetworkRecoveryInterval(5000)
    .build();

// Multiple sinks with shared config
RMQSink<String> sink1 = new RMQSink<>(sharedConfig, "queue1", schema);
RMQSink<String> sink2 = new RMQSink<>(sharedConfig, "queue2", schema);