The PostgreSQL JDBC Driver allows Java programs to connect to a PostgreSQL database using standard, database-independent Java code.
This document covers PostgreSQL's replication protocol support for logical and physical replication, enabling change data capture and streaming replication.
Main interface for replication operations.
package org.postgresql.replication;
import org.postgresql.replication.fluent.ChainedStreamBuilder;
import org.postgresql.replication.fluent.ChainedCreateReplicationSlotBuilder;
import java.sql.SQLException;
/**
 * Entry point to the PostgreSQL replication protocol.
 *
 * <p>Usable only on a connection that was opened with the {@code replication=database}
 * or {@code replication=true} parameter. Obtain an instance through
 * {@code PGConnection.getReplicationAPI()}.
 */
public interface PGReplicationConnection {

  /**
   * Begins configuring a replication stream (logical or physical) through a
   * fluent builder.
   *
   * @return fluent builder for the replication stream
   */
  ChainedStreamBuilder replicationStream();

  /**
   * Begins configuring a command that creates a replication slot. A replication
   * slot makes the server retain WAL segments until they have been consumed.
   *
   * @return fluent builder for the create-slot command
   */
  ChainedCreateReplicationSlotBuilder createReplicationSlot();

  /**
   * Removes a replication slot. The slot must not be active.
   *
   * @param slotName name of the slot to drop
   * @throws SQLException if the slot cannot be dropped
   */
  void dropReplicationSlot(String slotName) throws SQLException;
}

Connection Setup:
// Open replication connection
// (the replication=database URL parameter is what makes the replication API available)
String url = "jdbc:postgresql://localhost/postgres?replication=database";
Connection conn = DriverManager.getConnection(url, "user", "password");
// Unwrap the driver-specific connection type to reach the replication API
PGConnection pgConn = conn.unwrap(PGConnection.class);
PGReplicationConnection replConn = pgConn.getReplicationAPI();

Logical replication decodes WAL changes into structured format.
package org.postgresql.replication.fluent.logical;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.LogSequenceNumber;
import java.sql.SQLException;
/**
 * Fluent builder that configures and creates a logical replication slot.
 */
public interface ChainedLogicalCreateSlotBuilder {

  /**
   * Chooses the output plugin for the slot. Commonly used plugins are
   * {@code test_decoding}, {@code wal2json} and {@code pgoutput}.
   *
   * @param outputPlugin name of the plugin
   * @return builder for further chaining
   */
  ChainedLogicalCreateSlotBuilder withOutputPlugin(String outputPlugin);

  /**
   * Adds a single slot option.
   *
   * @param optionName name of the option
   * @param optionValue value of the option
   * @return builder for further chaining
   */
  ChainedLogicalCreateSlotBuilder withSlotOption(String optionName, String optionValue);

  /**
   * Marks the slot as temporary. A temporary slot is dropped when the
   * connection closes.
   *
   * @param temporary {@code true} to create a temporary slot
   * @return builder for further chaining
   */
  ChainedLogicalCreateSlotBuilder temporary(boolean temporary);

  /**
   * Executes the command and creates the replication slot.
   *
   * @throws SQLException if the slot cannot be created
   */
  void make() throws SQLException;
}
/**
 * Fluent builder that configures and starts a logical replication stream.
 */
public interface ChainedLogicalStreamBuilder {

  /**
   * Selects the replication slot to stream from.
   *
   * @param slotName name of the slot
   * @return builder for further chaining
   */
  ChainedLogicalStreamBuilder withSlotName(String slotName);

  /**
   * Chooses the LSN position at which streaming starts.
   *
   * @param lsn log sequence number to start from
   * @return builder for further chaining
   */
  ChainedLogicalStreamBuilder withStartPosition(LogSequenceNumber lsn);

  /**
   * Adds a single slot option that applies to this stream.
   *
   * @param optionName name of the option
   * @param optionValue value of the option
   * @return builder for further chaining
   */
  ChainedLogicalStreamBuilder withSlotOption(String optionName, String optionValue);

  /**
   * Configures how often progress is reported.
   *
   * @param statusIntervalMs reporting interval, in milliseconds
   * @return builder for further chaining
   */
  ChainedLogicalStreamBuilder withStatusInterval(int statusIntervalMs);

  /**
   * Opens the replication stream with the configured settings.
   *
   * @return stream from which changes are read
   * @throws SQLException if the stream cannot be started
   */
  PGReplicationStream start() throws SQLException;
}

Logical Replication Example:
import org.postgresql.PGConnection;
import org.postgresql.replication.PGReplicationConnection;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.LogSequenceNumber;
import java.nio.ByteBuffer;
import java.sql.*;
/**
 * Example: create a logical replication slot and print every change received from it.
 */
public class LogicalReplicationExample {

  /**
   * Creates the slot {@code my_slot} with the {@code test_decoding} plugin, streams
   * changes from it and prints each one to standard output.
   *
   * <p>The read loop has no exit condition of its own; it ends only when an exception
   * is thrown. The stream and the connection are closed on the way out. The slot is
   * left in place; call {@code replConn.dropReplicationSlot("my_slot")} once it is no
   * longer needed, because a slot makes the server retain WAL until it is consumed.
   *
   * @throws SQLException if connecting, creating the slot, or streaming fails
   */
  public static void startLogicalReplication() throws SQLException {
    // Connect with replication parameter
    String url = "jdbc:postgresql://localhost/mydb?replication=database";

    // try-with-resources guarantees the connection is closed even when streaming fails
    try (Connection conn = DriverManager.getConnection(url, "replication_user", "password")) {
      PGConnection pgConn = conn.unwrap(PGConnection.class);
      PGReplicationConnection replConn = pgConn.getReplicationAPI();

      // Create logical replication slot
      replConn.createReplicationSlot()
          .logical()
          .withSlotName("my_slot")
          .withOutputPlugin("test_decoding")
          .make();

      // Start streaming changes
      PGReplicationStream stream = replConn.replicationStream()
          .logical()
          .withSlotName("my_slot")
          .withStartPosition(LogSequenceNumber.valueOf("0/0"))
          .withSlotOption("include-xids", "false")
          .withSlotOption("skip-empty-xacts", "true")
          .withStatusInterval(10000) // Report progress every 10 seconds
          .start();

      try {
        // Read changes
        while (true) {
          // Blocking read for next message
          ByteBuffer message = stream.read();
          if (message == null) {
            continue;
          }

          // Decode only the readable region of the buffer (position..limit), and name
          // the charset explicitly instead of relying on the platform default.
          int offset = message.arrayOffset() + message.position();
          int length = message.remaining();
          String changeData = new String(
              message.array(), offset, length, java.nio.charset.StandardCharsets.UTF_8);
          System.out.println("Change: " + changeData);

          // Update progress (important for slot management); read the position once
          // so applied and flushed are reported as the same LSN.
          LogSequenceNumber received = stream.getLastReceiveLSN();
          stream.setAppliedLSN(received);
          stream.setFlushedLSN(received);
        }
      } finally {
        // Clean up: runs when the loop is left through an exception
        stream.close();
      }
    }
  }
}

Physical replication streams raw WAL data.
package org.postgresql.replication.fluent.physical;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.LogSequenceNumber;
import java.sql.SQLException;
/**
 * Fluent builder that configures and creates a physical replication slot.
 */
public interface ChainedPhysicalCreateSlotBuilder {

  /**
   * Marks the slot as temporary.
   *
   * @param temporary {@code true} to create a temporary slot
   * @return builder for further chaining
   */
  ChainedPhysicalCreateSlotBuilder temporary(boolean temporary);

  /**
   * Executes the command and creates the replication slot.
   *
   * @throws SQLException if the slot cannot be created
   */
  void make() throws SQLException;
}
/**
 * Fluent builder that configures and starts a physical replication stream.
 */
public interface ChainedPhysicalStreamBuilder {

  /**
   * Selects the replication slot to stream from.
   *
   * @param slotName name of the slot
   * @return builder for further chaining
   */
  ChainedPhysicalStreamBuilder withSlotName(String slotName);

  /**
   * Chooses the LSN position at which streaming starts.
   *
   * @param lsn log sequence number to start from
   * @return builder for further chaining
   */
  ChainedPhysicalStreamBuilder withStartPosition(LogSequenceNumber lsn);

  /**
   * Configures the status interval.
   *
   * @param statusIntervalMs interval, in milliseconds
   * @return builder for further chaining
   */
  ChainedPhysicalStreamBuilder withStatusInterval(int statusIntervalMs);

  /**
   * Opens the replication stream with the configured settings.
   *
   * @return stream from which WAL data is read
   * @throws SQLException if the stream cannot be started
   */
  PGReplicationStream start() throws SQLException;
}

Physical Replication Example:
/**
 * Example: create a physical replication slot and consume raw WAL data from it.
 */
public class PhysicalReplicationExample {

  /**
   * Creates the slot {@code physical_slot}, streams WAL data from it and hands each
   * received buffer to {@link #processWAL(ByteBuffer)}.
   *
   * <p>The read loop has no exit condition of its own; it ends only when an exception
   * is thrown. The stream and the connection are closed on the way out.
   *
   * @throws SQLException if connecting, creating the slot, or streaming fails
   */
  public static void startPhysicalReplication() throws SQLException {
    String url = "jdbc:postgresql://localhost/postgres?replication=true";

    // try-with-resources guarantees the connection is closed even when streaming fails
    try (Connection conn = DriverManager.getConnection(url, "replication_user", "password")) {
      PGConnection pgConn = conn.unwrap(PGConnection.class);
      PGReplicationConnection replConn = pgConn.getReplicationAPI();

      // Create physical replication slot
      replConn.createReplicationSlot()
          .physical()
          .withSlotName("physical_slot")
          .make();

      // Start streaming WAL
      PGReplicationStream stream = replConn.replicationStream()
          .physical()
          .withSlotName("physical_slot")
          .withStartPosition(LogSequenceNumber.valueOf("0/0"))
          .withStatusInterval(10000)
          .start();

      try {
        // Read WAL data
        while (true) {
          ByteBuffer walData = stream.read();
          if (walData != null) {
            // Process WAL data
            processWAL(walData);
            // Update progress
            stream.setFlushedLSN(stream.getLastReceiveLSN());
          }
        }
      } finally {
        // Clean up: runs when the loop is left through an exception
        stream.close();
      }
    }
  }

  /**
   * Placeholder for application-specific handling of one buffer of WAL data.
   *
   * @param walData buffer returned by the replication stream
   */
  private static void processWAL(ByteBuffer walData) {
    // Process raw WAL data
  }
}

Interface for reading replication data.
package org.postgresql.replication;
import java.nio.ByteBuffer;
import java.sql.SQLException;
/**
 * Stream over which replication data is received from PostgreSQL.
 */
public interface PGReplicationStream {

  /**
   * Reads the next message from the replication stream, blocking until a message
   * is available.
   *
   * @return buffer containing the message, or {@code null} if no message
   * @throws SQLException if the read fails
   */
  ByteBuffer read() throws SQLException;

  /**
   * Reads a pending message without blocking.
   *
   * @return buffer containing the message, or {@code null} if no message is available
   * @throws SQLException if the read fails
   */
  ByteBuffer readPending() throws SQLException;

  /**
   * Records the flushed LSN, i.e. the position up to which data has been written
   * to disk.
   *
   * @param lsn log sequence number
   * @throws SQLException if the update fails
   */
  void setFlushedLSN(LogSequenceNumber lsn) throws SQLException;

  /**
   * Records the applied LSN, i.e. the position up to which data has been
   * applied/processed.
   *
   * @param lsn log sequence number
   * @throws SQLException if the update fails
   */
  void setAppliedLSN(LogSequenceNumber lsn) throws SQLException;

  /**
   * Sends a status update to the server immediately.
   *
   * @throws SQLException if the update fails
   */
  void forceUpdateStatus() throws SQLException;

  /**
   * Reports whether the stream has been closed.
   *
   * @return {@code true} if the stream is closed
   */
  boolean isClosed();

  /**
   * Gives the LSN that was received last.
   *
   * @return last received log sequence number
   */
  LogSequenceNumber getLastReceiveLSN();

  /**
   * Closes the replication stream.
   *
   * @throws SQLException if closing fails
   */
  void close() throws SQLException;
}

Represents a position in the WAL.
package org.postgresql.replication;
/**
 * A PostgreSQL log sequence number (LSN).
 *
 * <p>The textual form is {@code X/Y}, where {@code X} and {@code Y} are
 * hexadecimal numbers.
 */
public final class LogSequenceNumber implements Comparable<LogSequenceNumber> {

  /**
   * Constant that stands for an invalid LSN.
   */
  public static final LogSequenceNumber INVALID_LSN;

  /**
   * Builds an LSN from its textual form.
   *
   * @param value LSN string, for example {@code "0/16B37A8"}
   * @return the corresponding LogSequenceNumber
   */
  public static LogSequenceNumber valueOf(String value);

  /**
   * Builds an LSN from its numeric form.
   *
   * @param value LSN as a long
   * @return the corresponding LogSequenceNumber
   */
  public static LogSequenceNumber valueOf(long value);

  /**
   * Gives the numeric form of this LSN.
   *
   * @return LSN as a long
   */
  public long asLong();

  /**
   * Gives the textual form of this LSN.
   *
   * @return LSN string, for example {@code "0/16B37A8"}
   */
  public String asString();

  /** Orders this LSN relative to {@code other}. */
  @Override
  public int compareTo(LogSequenceNumber other);

  @Override
  public boolean equals(Object obj);

  @Override
  public int hashCode();

  @Override
  public String toString();
}

Use logical replication for:
Use physical replication for:
Always update LSN positions:
// Report the last received position as both the applied and the flushed position
stream.setAppliedLSN(stream.getLastReceiveLSN());
stream.setFlushedLSN(stream.getLastReceiveLSN());

Handle replication lag:
// Monitor lag
// NOTE(review): getServerLSN() is not defined anywhere in this document; presumably an
// application-supplied helper that returns the server's current WAL position. Confirm.
LogSequenceNumber serverLSN = getServerLSN();
LogSequenceNumber clientLSN = stream.getLastReceiveLSN();
// Lag is the difference between the two positions in their long form
long lag = serverLSN.asLong() - clientLSN.asLong();

Clean up slots when done:
// dropReplicationSlot requires the slot to be inactive, so the stream is closed first
// (presumably closing the stream is what releases the slot; confirm)
try {
stream.close();
replConn.dropReplicationSlot("my_slot");
} finally {
// The connection is closed even if closing the stream or dropping the slot fails
conn.close();
}

Use status intervals:
// Report progress every 10 seconds
// (the argument is in milliseconds; this call is part of a stream builder chain)
.withStatusInterval(10000)

Handle connection failures:
while (true) {
try {
// Blocking read of the next replication message
ByteBuffer msg = stream.read();
// process
} catch (SQLException e) {
// Reconnect and resume from last LSN
// NOTE(review): as written, the exception is discarded and the loop simply exits;
// the reconnect/resume logic is left for the application to implement.
break;
}
}

Install with Tessl CLI
npx tessl i tessl/maven-org-postgresql--postgresql