PostgreSQL Channels

PostgreSQL-specific channel support provides push-based message delivery using PostgreSQL's LISTEN/NOTIFY mechanism. Unlike polling-based channels, PostgreSQL channels deliver messages immediately when they arrive, reducing latency and database load for queue-based messaging patterns.

Key Information for Agents

Required Dependencies:

  • spring-integration-jdbc (this package)
  • spring-integration-core
  • PostgreSQL database with LISTEN/NOTIFY support
  • PostgreSQL JDBC driver
  • Database trigger must be configured for NOTIFY

Default Behaviors:

  • Default table prefix: "INT_" (table: INT_CHANNEL_MESSAGE)
  • Default notification timeout: Duration.ofMinutes(1)
  • Push-based delivery (no polling overhead)
  • Requires un-pooled connection for LISTEN (connection must be dedicated)
  • Messages delivered immediately when inserted (via database trigger)
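
To make the notification timeout concrete, the following is a minimal, driver-free sketch of the blocking wait it bounds. A BlockingQueue stands in for the LISTEN connection, and the names NotificationWaitSketch, notifyInsert, and awaitNotification are hypothetical; the real subscriber's internals may differ.

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a LISTEN connection: notifications arrive on a queue.
final class NotificationWaitSketch {

    private final BlockingQueue<String> notifications = new LinkedBlockingQueue<>();

    // Simulates the database trigger issuing NOTIFY with a payload.
    void notifyInsert(String payload) {
        notifications.add(payload);
    }

    // Blocks until a notification arrives or the timeout elapses.
    // Returns null on timeout (or interruption), which is the cue to
    // check the connection and start waiting again.
    String awaitNotification(Duration timeout) {
        try {
            return notifications.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

In this sketch a shorter timeout means more frequent wake-ups of an idle listener, while a longer one means a broken connection may go unnoticed for longer.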

Threading Model:

  • Subscriber maintains long-lived connection for LISTEN
  • Message dispatch uses async executor (configurable)
  • Transactional message retrieval supported
  • Multiple subscribers supported (publish-subscribe pattern)

Lifecycle:

  • PostgresChannelMessageTableSubscriber implements SmartLifecycle (auto-starts by default)
  • Starts LISTEN connection on startup
  • Stops LISTEN connection on shutdown
  • Automatic reconnection on connection failure
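
The reconnection behavior can be pictured as asking the connection supplier for a fresh connection until one succeeds. The sketch below is an illustration under assumptions, not the library's code: ReconnectSketch, ConnectionSupplier, and connectWithRetry are hypothetical names, and the attempt limit and back-off are parameters invented for the example.

```java
import java.sql.SQLException;

final class ReconnectSketch {

    // Same shape as PgConnectionSupplier, but generic so the sketch needs no JDBC driver.
    @FunctionalInterface
    interface ConnectionSupplier<C> {
        C get() throws SQLException;
    }

    // Asks the supplier for a connection up to maxAttempts times,
    // pausing backOffMillis between attempts; rethrows the last failure.
    static <C> C connectWithRetry(ConnectionSupplier<C> supplier, int maxAttempts, long backOffMillis)
            throws SQLException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        SQLException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return supplier.get();
            } catch (SQLException ex) {
                last = ex; // remember the failure and try again
            }
            if (attempt < maxAttempts && backOffMillis > 0) {
                try {
                    Thread.sleep(backOffMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break; // stop retrying when the thread is interrupted
                }
            }
        }
        throw last;
    }
}
```

Because the supplier is called again on every attempt, it should create a new dedicated connection each time rather than return a cached one.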

Exceptions:

  • DataAccessException - Database access failures
  • SQLException - Connection failures (triggers reconnection)
  • MessagingException - Message handling failures

Edge Cases:

  • PostgreSQL Only: Requires PostgreSQL database (not compatible with other databases)
  • Database Triggers: Must configure trigger for NOTIFY (see setup section)
  • Un-pooled Connection: LISTEN requires dedicated, un-pooled connection
  • Connection Durability: Subscriber maintains long-lived connection for LISTEN
  • Immediate Delivery: Messages delivered as soon as they arrive (push vs poll)
  • Latency Reduction: Sub-second latency vs. polling interval latency
  • Resource Efficiency: No empty polling cycles
  • Transaction Support: Message retrieval can be transactional
  • Multiple Subscribers: Supports publish-subscribe pattern with multiple handlers
  • Region Support: Subscriptions are scoped by region and group ID, so channels in different regions are notified independently
  • Table Prefix: Must match between store, subscriber, and trigger
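
The table prefix rule can be checked mechanically. The sketch below assumes only what is stated above (prefix "INT_" yields table INT_CHANNEL_MESSAGE) plus the fact that PostgreSQL folds unquoted identifiers to lower case, which is why a trigger written against int_channel_message still targets the same table. TablePrefixSketch and its methods are hypothetical helpers, not part of the library.

```java
import java.util.Locale;

final class TablePrefixSketch {

    // Table name derived from a prefix, e.g. "INT_" -> "INT_CHANNEL_MESSAGE".
    static String channelMessageTable(String prefix) {
        return prefix + "CHANNEL_MESSAGE";
    }

    // True when the store prefix, the subscriber prefix, and the table named in the
    // trigger all resolve to the same table. The comparison ignores case because
    // PostgreSQL folds unquoted identifiers to lower case.
    static boolean sameTable(String storePrefix, String subscriberPrefix, String triggerTable) {
        String expected = channelMessageTable(storePrefix).toLowerCase(Locale.ROOT);
        return expected.equals(channelMessageTable(subscriberPrefix).toLowerCase(Locale.ROOT))
            && expected.equals(triggerTable.toLowerCase(Locale.ROOT));
    }
}
```

A check like this could run at startup to catch a mismatched prefix before messages are silently left undelivered.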

Core Classes

PostgresSubscribableChannel

package org.springframework.integration.jdbc.channel;

public class PostgresSubscribableChannel extends AbstractSubscribableChannel
    implements PostgresChannelMessageTableSubscriber.Subscription {

    public PostgresSubscribableChannel(
        JdbcChannelMessageStore store,
        Object groupId,
        PostgresChannelMessageTableSubscriber subscriber
    );

    public void setDispatcherExecutor(Executor executor);
    public void setTransactionManager(PlatformTransactionManager transactionManager);
    public void setRetryTemplate(RetryTemplate retryTemplate);
    public void setErrorHandler(ErrorHandler errorHandler);

    public void notifyUpdate();
    public String getRegion();
    public Object getGroupId();
}

PostgresChannelMessageTableSubscriber

package org.springframework.integration.jdbc.channel;

public final class PostgresChannelMessageTableSubscriber implements SmartLifecycle {
    public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier);
    public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier, String tablePrefix);

    public void setTaskExecutor(AsyncTaskExecutor taskExecutor);
    public void setNotificationTimeout(Duration notificationTimeout);

    public boolean subscribe(Subscription subscription);
    public boolean unsubscribe(Subscription subscription);
}

PgConnectionSupplier

package org.springframework.integration.jdbc.channel;

@FunctionalInterface
public interface PgConnectionSupplier {
    PgConnection get() throws SQLException;
}

Usage Examples

Basic Setup

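import org.postgresql.jdbc.PgConnection;
import org.springframework.integration.jdbc.channel.PgConnectionSupplier;
import org.springframework.integration.jdbc.channel.PostgresChannelMessageTableSubscriber;
import org.springframework.integration.jdbc.channel.PostgresSubscribableChannel;
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
import org.springframework.integration.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider;
import org.springframework.integration.support.MessageBuilder;

// Setup channel message store (the store needs a query provider for the target database)
JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
messageStore.setRegion("PRODUCTION");

// Setup PostgreSQL subscriber. The dataSource must hand out dedicated (un-pooled)
// connections, because the LISTEN connection stays open for the subscriber's lifetime.
PgConnectionSupplier connectionSupplier =
    () -> dataSource.getConnection().unwrap(PgConnection.class);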
PostgresChannelMessageTableSubscriber subscriber =
    new PostgresChannelMessageTableSubscriber(connectionSupplier);

// Create PostgreSQL subscribable channel
PostgresSubscribableChannel channel = new PostgresSubscribableChannel(
    messageStore,
    "orderNotifications",
    subscriber
);

// Configure async dispatch and transactional message retrieval
channel.setDispatcherExecutor(taskExecutor);
channel.setTransactionManager(transactionManager);

// Subscribe message handler
channel.subscribe(message -> {
    System.out.println("Received immediate notification: " + message.getPayload());
    processOrder(message);
});

// Send message - subscribers notified immediately
channel.send(MessageBuilder.withPayload(new Order("ORD-123")).build());

Channel with Retry and Error Handling

// Channel with retry and error handling
PostgresSubscribableChannel resilientChannel = new PostgresSubscribableChannel(
    messageStore,
    "resilientChannel",
    subscriber
);

// Configure retry template
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3)); // At most 3 attempts, including the first
retryTemplate.setBackOffPolicy(new FixedBackOffPolicy()); // Fixed pause between attempts

resilientChannel.setRetryTemplate(retryTemplate);
resilientChannel.setErrorHandler(error -> {
    System.err.println("Failed to process message: " + error.getMessage());
    // Send to dead letter queue, alert, etc.
});
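
The interplay of the retry policy and the error handler can be sketched without Spring. This is an illustration under assumptions: RetryThenErrorHandlerSketch and run are hypothetical names, and the ordering shown (retry first, hand the last failure to the error handler only once attempts are exhausted) is a simplified model rather than the channel's actual implementation.

```java
import java.util.function.Consumer;

final class RetryThenErrorHandlerSketch {

    // Runs the task up to maxAttempts times in total (the first call counts),
    // pausing backOffMillis between attempts. If every attempt fails, the last
    // exception is passed to the error handler. Returns the number of attempts used.
    static int run(Runnable task, int maxAttempts, long backOffMillis, Consumer<Throwable> errorHandler) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                task.run();
                return attempt; // success: no error handler call
            } catch (RuntimeException ex) {
                last = ex;
            }
            if (attempt < maxAttempts && backOffMillis > 0) {
                try {
                    Thread.sleep(backOffMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break; // give up early when interrupted
                }
            }
        }
        errorHandler.accept(last);
        return maxAttempts;
    }
}
```

With a limit of 3, a handler that always fails is invoked three times and the error handler once; a handler that succeeds on the second attempt never reaches the error handler.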

Database Trigger Setup

PostgreSQL channels require a database trigger that issues NOTIFY whenever a message row is inserted:

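-- Create function to notify on insert.
-- The subscriber listens on '<table prefix in lower case>channel_message_notify'
-- and expects the payload '<region> <group key>'.
CREATE OR REPLACE FUNCTION int_channel_message_notify()
RETURNS TRIGGER AS $$
BEGIN
    PERFORM pg_notify(
        'int_channel_message_notify',
        NEW.region || ' ' || NEW.group_key
    );
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;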

-- Create trigger on insert
CREATE TRIGGER int_channel_message_trigger
AFTER INSERT ON int_channel_message
FOR EACH ROW
EXECUTE FUNCTION int_channel_message_notify();
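
On the receiving side, each notification payload has to be mapped back to the channels that subscribed for it. The following is a minimal sketch of that routing, assuming the payload is used verbatim as the lookup key. NotificationRouterSketch and its methods are hypothetical; only the idea of calling a notifyUpdate-style callback per subscription comes from the API listed above.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class NotificationRouterSketch {

    // Callbacks grouped by the key carried in the notification payload.
    private final Map<String, Set<Runnable>> subscriptions = new ConcurrentHashMap<>();

    // Registers a callback for a key; returns false if it was already registered.
    boolean subscribe(String key, Runnable onUpdate) {
        return subscriptions.computeIfAbsent(key, k -> ConcurrentHashMap.newKeySet()).add(onUpdate);
    }

    // Invokes every callback registered for the payload and returns how many ran.
    // Payloads without subscribers are ignored.
    int onNotification(String payload) {
        Set<Runnable> matching = subscriptions.getOrDefault(payload, Set.of());
        matching.forEach(Runnable::run);
        return matching.size();
    }
}
```

The notification only signals that something was inserted. In this model the callback is still responsible for reading the message from the store.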

Integration with Spring Integration

PostgreSQL channels integrate with Spring Integration flows for real-time messaging:

import org.springframework.integration.dsl.IntegrationFlow;

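@Bean
public JdbcChannelMessageStore channelMessageStore(DataSource dataSource) {
    JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
    // The store needs a query provider for the target database
    store.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
    store.setRegion("PRODUCTION");
    return store;
}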

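@Bean
public PgConnectionSupplier pgConnectionSupplier(DataSource dataSource) {
    // Unwrap to the driver's PgConnection class, which is the type the supplier returns;
    // the DataSource must provide dedicated (un-pooled) connections
    return () -> dataSource.getConnection().unwrap(PgConnection.class);
}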

@Bean
public PostgresChannelMessageTableSubscriber subscriber(PgConnectionSupplier connectionSupplier) {
    PostgresChannelMessageTableSubscriber subscriber =
        new PostgresChannelMessageTableSubscriber(connectionSupplier);
    subscriber.setTaskExecutor(new SimpleAsyncTaskExecutor("pg-notify-"));
    subscriber.setNotificationTimeout(Duration.ofSeconds(30));
    return subscriber;
}

@Bean
public PostgresSubscribableChannel orderNotificationChannel(
    JdbcChannelMessageStore store,
    PostgresChannelMessageTableSubscriber subscriber,
    PlatformTransactionManager transactionManager
) {
    PostgresSubscribableChannel channel = new PostgresSubscribableChannel(
        store,
        "orders",
        subscriber
    );
    channel.setDispatcherExecutor(new SimpleAsyncTaskExecutor("order-dispatch-"));
    channel.setTransactionManager(transactionManager);
    return channel;
}

@Bean
public IntegrationFlow notificationFlow(PostgresSubscribableChannel orderNotificationChannel) {
    return IntegrationFlow
        .from(orderNotificationChannel)
        .handle(msg -> {
            // Immediate notification when order message arrives
            System.out.println("Order notification: " + msg.getPayload());
            processOrderImmediately(msg);
        })
        .get();
}

Key Considerations

  • PostgreSQL Only: Requires PostgreSQL database with LISTEN/NOTIFY support
  • Database Triggers: Must configure trigger for notifications (see setup section)
  • Error Handling: Configure retry and error handlers for resilience
  • Async Dispatch: Use task executor for non-blocking message dispatch
  • Connection Recovery: Subscriber automatically reconnects on connection failure
  • Notification Timeout: Configure timeout for connection health checks
  • Spring Lifecycle: Subscriber starts/stops with application context