PostgreSQL-specific channel support provides push-based message delivery using PostgreSQL's LISTEN/NOTIFY mechanism. Unlike polling-based channels, PostgreSQL channels deliver messages immediately when they arrive, reducing latency and database load for queue-based messaging patterns.
Required Dependencies:
spring-integration-jdbc (this package)spring-integration-core is requiredDefault Behaviors:
"INT_" (table: INT_CHANNEL_MESSAGE)Duration.ofMinutes(1)Threading Model:
Lifecycle:
PostgresChannelMessageTableSubscriber implements SmartLifecycle (auto-starts by default)Exceptions:
DataAccessException - Database access failuresSQLException - Connection failures (triggers reconnection)MessagingException - Message handling failuresEdge Cases:
package org.springframework.integration.jdbc.channel;
public class PostgresSubscribableChannel extends AbstractSubscribableChannel
implements PostgresChannelMessageTableSubscriber.Subscription {
public PostgresSubscribableChannel(
JdbcChannelMessageStore store,
Object groupId,
PostgresChannelMessageTableSubscriber subscriber
);
public void setDispatcherExecutor(Executor executor);
public void setTransactionManager(PlatformTransactionManager transactionManager);
public void setRetryTemplate(RetryTemplate retryTemplate);
public void setErrorHandler(ErrorHandler errorHandler);
public void notifyUpdate();
public String getRegion();
public Object getGroupId();
}package org.springframework.integration.jdbc.channel;
public final class PostgresChannelMessageTableSubscriber implements SmartLifecycle {
public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier);
public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupplier, String tablePrefix);
public void setTaskExecutor(AsyncTaskExecutor taskExecutor);
public void setNotificationTimeout(Duration notificationTimeout);
public boolean subscribe(Subscription subscription);
public boolean unsubscribe(Subscription subscription);
}package org.springframework.integration.jdbc.channel;
@FunctionalInterface
public interface PgConnectionSupplier {
PgConnection get() throws SQLException;
}import org.springframework.integration.jdbc.channel.PostgresSubscribableChannel;
import org.springframework.integration.jdbc.channel.PostgresChannelMessageTableSubscriber;
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
import org.postgresql.PGConnection;
// Setup channel message store
JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
messageStore.setRegion("PRODUCTION");
// Setup PostgreSQL subscriber
PgConnectionSupplier connectionSupplier = () -> {
PgConnection conn = (PgConnection) dataSource.getConnection().unwrap(PgConnection.class);
return conn;
};
PostgresChannelMessageTableSubscriber subscriber =
new PostgresChannelMessageTableSubscriber(connectionSupplier);
// Create PostgreSQL subscribable channel
PostgresSubscribableChannel channel = new PostgresSubscribableChannel(
messageStore,
"orderNotifications",
subscriber
);
// Configure async dispatch
channel.setDispatcherExecutor(taskExecutor);
channel.setTransactionManager(transactionManager);
// Subscribe message handler
channel.subscribe(message -> {
System.out.println("Received immediate notification: " + message.getPayload());
processOrder(message);
});
// Send message - subscribers notified immediately
channel.send(MessageBuilder.withPayload(new Order("ORD-123")).build());// Channel with retry and error handling
PostgresSubscribableChannel resilientChannel = new PostgresSubscribableChannel(
messageStore,
"resilientChannel",
subscriber
);
// Configure retry template
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3)); // Retry 3 times
retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
resilientChannel.setRetryTemplate(retryTemplate);
resilientChannel.setErrorHandler(error -> {
System.err.println("Failed to process message: " + error.getMessage());
// Send to dead letter queue, alert, etc.
});PostgreSQL channels require database trigger for NOTIFY:
-- Create function to notify on insert
CREATE OR REPLACE FUNCTION int_channel_message_notify()
RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify(
'int_channel_message_' || NEW.region,
NEW.group_key::text
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Create trigger on insert
CREATE TRIGGER int_channel_message_trigger
AFTER INSERT ON int_channel_message
FOR EACH ROW
EXECUTE FUNCTION int_channel_message_notify();PostgreSQL channels integrate with Spring Integration flows for real-time messaging:
import org.springframework.integration.dsl.IntegrationFlow;
@Bean
public JdbcChannelMessageStore channelMessageStore(DataSource dataSource) {
JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
store.setRegion("PRODUCTION");
return store;
}
@Bean
public PgConnectionSupplier pgConnectionSupplier(DataSource dataSource) {
return () -> dataSource.getConnection().unwrap(PGConnection.class);
}
@Bean
public PostgresChannelMessageTableSubscriber subscriber(PgConnectionSupplier connectionSupplier) {
PostgresChannelMessageTableSubscriber subscriber =
new PostgresChannelMessageTableSubscriber(connectionSupplier);
subscriber.setTaskExecutor(new SimpleAsyncTaskExecutor("pg-notify-"));
subscriber.setNotificationTimeout(Duration.ofSeconds(30));
return subscriber;
}
@Bean
public PostgresSubscribableChannel orderNotificationChannel(
JdbcChannelMessageStore store,
PostgresChannelMessageTableSubscriber subscriber
) {
PostgresSubscribableChannel channel = new PostgresSubscribableChannel(
store,
"orders",
subscriber
);
channel.setDispatcherExecutor(new SimpleAsyncTaskExecutor("order-dispatch-"));
channel.setTransactionManager(transactionManager);
return channel;
}
@Bean
public IntegrationFlow notificationFlow(PostgresSubscribableChannel orderNotificationChannel) {
return IntegrationFlow
.from(orderNotificationChannel)
.handle(msg -> {
// Immediate notification when order message arrives
System.out.println("Order notification: " + msg.getPayload());
processOrderImmediately(msg);
})
.get();
}