CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-postgresql--postgresql

PostgreSQL JDBC Driver allows Java programs to connect to a PostgreSQL database using standard, database independent Java code.

Overview
Eval results
Files

replication.mddocs/

Replication

This document covers PostgreSQL's replication protocol support for logical and physical replication, enabling change data capture and streaming replication.

Capabilities

PGReplicationConnection

Main interface for replication operations.

package org.postgresql.replication;

import org.postgresql.replication.fluent.ChainedStreamBuilder;
import org.postgresql.replication.fluent.ChainedCreateReplicationSlotBuilder;
import java.sql.SQLException;

/**
 * API for PostgreSQL replication protocol.
 * Only available when connection is opened with replication=database or replication=true parameter.
 *
 * Access via: PGConnection.getReplicationAPI()
 */
public interface PGReplicationConnection {
    /**
     * Starts building a replication stream (logical or physical).
     * Use the fluent API to configure stream parameters.
     *
     * @return Fluent builder for replication stream
     */
    ChainedStreamBuilder replicationStream();

    /**
     * Starts building a create replication slot command.
     * Replication slots ensure the server retains WAL segments until consumed.
     *
     * @return Fluent builder for creating replication slot
     */
    ChainedCreateReplicationSlotBuilder createReplicationSlot();

    /**
     * Drops a replication slot.
     * The slot must not be active.
     *
     * @param slotName Name of the slot to drop
     * @throws SQLException if slot cannot be dropped
     */
    void dropReplicationSlot(String slotName) throws SQLException;
}

Connection Setup:

// Open replication connection
String url = "jdbc:postgresql://localhost/postgres?replication=database";
Connection conn = DriverManager.getConnection(url, "user", "password");

PGConnection pgConn = conn.unwrap(PGConnection.class);
PGReplicationConnection replConn = pgConn.getReplicationAPI();

Logical Replication

Logical replication decodes WAL changes into structured format.

package org.postgresql.replication.fluent.logical;

import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.LogSequenceNumber;
import java.sql.SQLException;

/**
 * Fluent builder for creating logical replication slots.
 */
public interface ChainedLogicalCreateSlotBuilder {
    /**
     * Sets the output plugin name.
     * Common plugins: test_decoding, wal2json, pgoutput
     *
     * @param outputPlugin Plugin name
     * @return Builder for chaining
     */
    ChainedLogicalCreateSlotBuilder withOutputPlugin(String outputPlugin);

    /**
     * Sets a slot option.
     *
     * @param optionName Option name
     * @param optionValue Option value
     * @return Builder for chaining
     */
    ChainedLogicalCreateSlotBuilder withSlotOption(String optionName, String optionValue);

    /**
     * Makes the slot temporary (dropped when connection closes).
     *
     * @param temporary true for temporary slot
     * @return Builder for chaining
     */
    ChainedLogicalCreateSlotBuilder temporary(boolean temporary);

    /**
     * Creates the replication slot.
     *
     * @throws SQLException if slot creation fails
     */
    void make() throws SQLException;
}

/**
 * Fluent builder for logical replication streams.
 */
public interface ChainedLogicalStreamBuilder {
    /**
     * Sets the replication slot name.
     *
     * @param slotName Slot name
     * @return Builder for chaining
     */
    ChainedLogicalStreamBuilder withSlotName(String slotName);

    /**
     * Sets the starting LSN position.
     *
     * @param lsn Log sequence number to start from
     * @return Builder for chaining
     */
    ChainedLogicalStreamBuilder withStartPosition(LogSequenceNumber lsn);

    /**
     * Sets a slot option for this stream.
     *
     * @param optionName Option name
     * @param optionValue Option value
     * @return Builder for chaining
     */
    ChainedLogicalStreamBuilder withSlotOption(String optionName, String optionValue);

    /**
     * Sets the status interval for progress reporting.
     *
     * @param statusIntervalMs Interval in milliseconds
     * @return Builder for chaining
     */
    ChainedLogicalStreamBuilder withStatusInterval(int statusIntervalMs);

    /**
     * Starts the replication stream.
     *
     * @return PGReplicationStream for reading changes
     * @throws SQLException if stream cannot be started
     */
    PGReplicationStream start() throws SQLException;
}

Logical Replication Example:

import org.postgresql.PGConnection;
import org.postgresql.replication.PGReplicationConnection;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.LogSequenceNumber;
import java.nio.ByteBuffer;
import java.sql.*;

public class LogicalReplicationExample {
    public static void startLogicalReplication() throws SQLException {
        // Connect with replication parameter
        String url = "jdbc:postgresql://localhost/mydb?replication=database";
        Connection conn = DriverManager.getConnection(url, "replication_user", "password");

        PGConnection pgConn = conn.unwrap(PGConnection.class);
        PGReplicationConnection replConn = pgConn.getReplicationAPI();

        // Create logical replication slot
        replConn.createReplicationSlot()
                .logical()
                .withSlotName("my_slot")
                .withOutputPlugin("test_decoding")
                .make();

        // Start streaming changes
        PGReplicationStream stream = replConn.replicationStream()
                .logical()
                .withSlotName("my_slot")
                .withStartPosition(LogSequenceNumber.valueOf("0/0"))
                .withSlotOption("include-xids", "false")
                .withSlotOption("skip-empty-xacts", "true")
                .withStatusInterval(10000) // Report progress every 10 seconds
                .start();

        // Read changes
        while (true) {
            // Blocking read for next message
            ByteBuffer message = stream.read();

            if (message == null) {
                continue;
            }

            // Process message
            int offset = message.arrayOffset();
            byte[] source = message.array();
            int length = source.length - offset;
            String changeData = new String(source, offset, length);

            System.out.println("Change: " + changeData);

            // Update progress (important for slot management)
            stream.setAppliedLSN(stream.getLastReceiveLSN());
            stream.setFlushedLSN(stream.getLastReceiveLSN());
        }

        // Clean up
        // stream.close();
        // replConn.dropReplicationSlot("my_slot");
        // conn.close();
    }
}

Physical Replication

Physical replication streams raw WAL data.

package org.postgresql.replication.fluent.physical;

import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.LogSequenceNumber;
import java.sql.SQLException;

/**
 * Fluent builder for creating physical replication slots.
 */
public interface ChainedPhysicalCreateSlotBuilder {
    /**
     * Makes the slot temporary.
     *
     * @param temporary true for temporary slot
     * @return Builder for chaining
     */
    ChainedPhysicalCreateSlotBuilder temporary(boolean temporary);

    /**
     * Creates the replication slot.
     *
     * @throws SQLException if slot creation fails
     */
    void make() throws SQLException;
}

/**
 * Fluent builder for physical replication streams.
 */
public interface ChainedPhysicalStreamBuilder {
    /**
     * Sets the replication slot name.
     *
     * @param slotName Slot name
     * @return Builder for chaining
     */
    ChainedPhysicalStreamBuilder withSlotName(String slotName);

    /**
     * Sets the starting LSN position.
     *
     * @param lsn Log sequence number to start from
     * @return Builder for chaining
     */
    ChainedPhysicalStreamBuilder withStartPosition(LogSequenceNumber lsn);

    /**
     * Sets the status interval.
     *
     * @param statusIntervalMs Interval in milliseconds
     * @return Builder for chaining
     */
    ChainedPhysicalStreamBuilder withStatusInterval(int statusIntervalMs);

    /**
     * Starts the replication stream.
     *
     * @return PGReplicationStream for reading WAL data
     * @throws SQLException if stream cannot be started
     */
    PGReplicationStream start() throws SQLException;
}

Physical Replication Example:

public class PhysicalReplicationExample {
    public static void startPhysicalReplication() throws SQLException {
        String url = "jdbc:postgresql://localhost/postgres?replication=true";
        Connection conn = DriverManager.getConnection(url, "replication_user", "password");

        PGConnection pgConn = conn.unwrap(PGConnection.class);
        PGReplicationConnection replConn = pgConn.getReplicationAPI();

        // Create physical replication slot
        replConn.createReplicationSlot()
                .physical()
                .withSlotName("physical_slot")
                .make();

        // Start streaming WAL
        PGReplicationStream stream = replConn.replicationStream()
                .physical()
                .withSlotName("physical_slot")
                .withStartPosition(LogSequenceNumber.valueOf("0/0"))
                .withStatusInterval(10000)
                .start();

        // Read WAL data
        while (true) {
            ByteBuffer walData = stream.read();
            if (walData != null) {
                // Process WAL data
                processWAL(walData);

                // Update progress
                stream.setFlushedLSN(stream.getLastReceiveLSN());
            }
        }
    }

    private static void processWAL(ByteBuffer walData) {
        // Process raw WAL data
    }
}

PGReplicationStream

Interface for reading replication data.

package org.postgresql.replication;

import java.nio.ByteBuffer;
import java.sql.SQLException;

/**
 * Stream for receiving replication data from PostgreSQL.
 */
public interface PGReplicationStream {
    /**
     * Reads the next message from replication stream.
     * Blocks until message is available.
     *
     * @return ByteBuffer containing message, or null if no message
     * @throws SQLException if read fails
     */
    ByteBuffer read() throws SQLException;

    /**
     * Reads pending message without blocking.
     *
     * @return ByteBuffer containing message, or null if no message available
     * @throws SQLException if read fails
     */
    ByteBuffer readPending() throws SQLException;

    /**
     * Sets the flushed LSN (data written to disk).
     *
     * @param lsn Log sequence number
     * @throws SQLException if update fails
     */
    void setFlushedLSN(LogSequenceNumber lsn) throws SQLException;

    /**
     * Sets the applied LSN (data applied/processed).
     *
     * @param lsn Log sequence number
     * @throws SQLException if update fails
     */
    void setAppliedLSN(LogSequenceNumber lsn) throws SQLException;

    /**
     * Forces status update to server immediately.
     *
     * @throws SQLException if update fails
     */
    void forceUpdateStatus() throws SQLException;

    /**
     * Checks if stream is closed.
     *
     * @return true if stream is closed
     */
    boolean isClosed();

    /**
     * Returns the last received LSN.
     *
     * @return Last received log sequence number
     */
    LogSequenceNumber getLastReceiveLSN();

    /**
     * Closes the replication stream.
     *
     * @throws SQLException if close fails
     */
    void close() throws SQLException;
}

LogSequenceNumber

Represents a position in the WAL.

package org.postgresql.replication;

/**
 * Represents a PostgreSQL log sequence number (LSN).
 * Format: X/Y where X and Y are hexadecimal numbers.
 */
public final class LogSequenceNumber implements Comparable<LogSequenceNumber> {
    /**
     * Invalid LSN constant.
     */
    public static final LogSequenceNumber INVALID_LSN;

    /**
     * Parses LSN from string.
     *
     * @param value LSN string (e.g., "0/16B37A8")
     * @return LogSequenceNumber instance
     */
    public static LogSequenceNumber valueOf(String value);

    /**
     * Creates LSN from long value.
     *
     * @param value LSN as long
     * @return LogSequenceNumber instance
     */
    public static LogSequenceNumber valueOf(long value);

    /**
     * Returns LSN as long value.
     *
     * @return LSN as long
     */
    public long asLong();

    /**
     * Returns LSN in string format.
     *
     * @return LSN string (e.g., "0/16B37A8")
     */
    public String asString();

    @Override
    public int compareTo(LogSequenceNumber other);

    @Override
    public boolean equals(Object obj);

    @Override
    public int hashCode();

    @Override
    public String toString();
}

Best Practices

  1. Use logical replication for:

    • Change data capture
    • Selective replication
    • Data transformation pipelines
  2. Use physical replication for:

    • Hot standby
    • Full database replication
    • Backup solutions
  3. Always update LSN positions:

    stream.setAppliedLSN(stream.getLastReceiveLSN());
    stream.setFlushedLSN(stream.getLastReceiveLSN());
  4. Handle replication lag:

    // Monitor lag
    LogSequenceNumber serverLSN = getServerLSN();
    LogSequenceNumber clientLSN = stream.getLastReceiveLSN();
    long lag = serverLSN.asLong() - clientLSN.asLong();
  5. Clean up slots when done:

    try {
        stream.close();
        replConn.dropReplicationSlot("my_slot");
    } finally {
        conn.close();
    }
  6. Use status intervals:

    // Report progress every 10 seconds
    .withStatusInterval(10000)
  7. Handle connection failures:

    while (true) {
        try {
            ByteBuffer msg = stream.read();
            // process
        } catch (SQLException e) {
            // Reconnect and resume from last LSN
            break;
        }
    }

Install with Tessl CLI

npx tessl i tessl/maven-org-postgresql--postgresql

docs

advanced-features.md

basic-connectivity.md

copy-operations.md

datasource.md

index.md

large-objects.md

postgresql-types.md

replication.md

resultset.md

ssl-security.md

statement-execution.md

transactions.md

tile.json