or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced-features.mdbasic-connectivity.mdcopy-operations.mddatasource.mdindex.mdlarge-objects.mdpostgresql-types.mdreplication.mdresultset.mdssl-security.mdstatement-execution.mdtransactions.md

replication.mddocs/

0

# Replication

1

2

This document covers PostgreSQL's replication protocol support for logical and physical replication, enabling change data capture and streaming replication.

3

4

## Capabilities

5

6

### PGReplicationConnection

7

8

Main interface for replication operations.

9

10

```java { .api }

11

package org.postgresql.replication;

12

13

import org.postgresql.replication.fluent.ChainedStreamBuilder;

14

import org.postgresql.replication.fluent.ChainedCreateReplicationSlotBuilder;

15

import java.sql.SQLException;

16

17

/**

18

* API for PostgreSQL replication protocol.

19

* Only available when connection is opened with replication=database or replication=true parameter.

20

*

21

* Access via: PGConnection.getReplicationAPI()

22

*/

23

public interface PGReplicationConnection {

24

/**

25

* Starts building a replication stream (logical or physical).

26

* Use the fluent API to configure stream parameters.

27

*

28

* @return Fluent builder for replication stream

29

*/

30

ChainedStreamBuilder replicationStream();

31

32

/**

33

* Starts building a create replication slot command.

34

* Replication slots ensure the server retains WAL segments until consumed.

35

*

36

* @return Fluent builder for creating replication slot

37

*/

38

ChainedCreateReplicationSlotBuilder createReplicationSlot();

39

40

/**

41

* Drops a replication slot.

42

* The slot must not be active.

43

*

44

* @param slotName Name of the slot to drop

45

* @throws SQLException if slot cannot be dropped

46

*/

47

void dropReplicationSlot(String slotName) throws SQLException;

48

}

49

```

50

51

**Connection Setup:**

52

53

```java

54

// Open replication connection

55

String url = "jdbc:postgresql://localhost/postgres?replication=database";

56

Connection conn = DriverManager.getConnection(url, "user", "password");

57

58

PGConnection pgConn = conn.unwrap(PGConnection.class);

59

PGReplicationConnection replConn = pgConn.getReplicationAPI();

60

```

61

62

### Logical Replication

63

64

Logical replication decodes WAL changes into structured format.

65

66

```java { .api }

67

package org.postgresql.replication.fluent.logical;

68

69

import org.postgresql.replication.PGReplicationStream;

70

import org.postgresql.replication.LogSequenceNumber;

71

import java.sql.SQLException;

72

73

/**

74

* Fluent builder for creating logical replication slots.

75

*/

76

public interface ChainedLogicalCreateSlotBuilder {

77

/**

78

* Sets the output plugin name.

79

* Common plugins: test_decoding, wal2json, pgoutput

80

*

81

* @param outputPlugin Plugin name

82

* @return Builder for chaining

83

*/

84

ChainedLogicalCreateSlotBuilder withOutputPlugin(String outputPlugin);

85

86

/**

87

* Sets a slot option.

88

*

89

* @param optionName Option name

90

* @param optionValue Option value

91

* @return Builder for chaining

92

*/

93

ChainedLogicalCreateSlotBuilder withSlotOption(String optionName, String optionValue);

94

95

/**

96

* Makes the slot temporary (dropped when connection closes).

97

*

98

* @param temporary true for temporary slot

99

* @return Builder for chaining

100

*/

101

ChainedLogicalCreateSlotBuilder temporary(boolean temporary);

102

103

/**

104

* Creates the replication slot.

105

*

106

* @throws SQLException if slot creation fails

107

*/

108

void make() throws SQLException;

109

}

110

111

/**

112

* Fluent builder for logical replication streams.

113

*/

114

public interface ChainedLogicalStreamBuilder {

115

/**

116

* Sets the replication slot name.

117

*

118

* @param slotName Slot name

119

* @return Builder for chaining

120

*/

121

ChainedLogicalStreamBuilder withSlotName(String slotName);

122

123

/**

124

* Sets the starting LSN position.

125

*

126

* @param lsn Log sequence number to start from

127

* @return Builder for chaining

128

*/

129

ChainedLogicalStreamBuilder withStartPosition(LogSequenceNumber lsn);

130

131

/**

132

* Sets a slot option for this stream.

133

*

134

* @param optionName Option name

135

* @param optionValue Option value

136

* @return Builder for chaining

137

*/

138

ChainedLogicalStreamBuilder withSlotOption(String optionName, String optionValue);

139

140

/**

141

* Sets the status interval for progress reporting.

142

*

143

* @param statusIntervalMs Interval in milliseconds

144

* @return Builder for chaining

145

*/

146

ChainedLogicalStreamBuilder withStatusInterval(int statusIntervalMs);

147

148

/**

149

* Starts the replication stream.

150

*

151

* @return PGReplicationStream for reading changes

152

* @throws SQLException if stream cannot be started

153

*/

154

PGReplicationStream start() throws SQLException;

155

}

156

```

157

158

**Logical Replication Example:**

159

160

```java

161

import org.postgresql.PGConnection;

162

import org.postgresql.replication.PGReplicationConnection;

163

import org.postgresql.replication.PGReplicationStream;

164

import org.postgresql.replication.LogSequenceNumber;

165

import java.nio.ByteBuffer;

166

import java.sql.*;

167

168

public class LogicalReplicationExample {

169

public static void startLogicalReplication() throws SQLException {

170

// Connect with replication parameter

171

String url = "jdbc:postgresql://localhost/mydb?replication=database";

172

Connection conn = DriverManager.getConnection(url, "replication_user", "password");

173

174

PGConnection pgConn = conn.unwrap(PGConnection.class);

175

PGReplicationConnection replConn = pgConn.getReplicationAPI();

176

177

// Create logical replication slot

178

replConn.createReplicationSlot()

179

.logical()

180

.withSlotName("my_slot")

181

.withOutputPlugin("test_decoding")

182

.make();

183

184

// Start streaming changes

185

PGReplicationStream stream = replConn.replicationStream()

186

.logical()

187

.withSlotName("my_slot")

188

.withStartPosition(LogSequenceNumber.valueOf("0/0"))

189

.withSlotOption("include-xids", "false")

190

.withSlotOption("skip-empty-xacts", "true")

191

.withStatusInterval(10000) // Report progress every 10 seconds

192

.start();

193

194

// Read changes

195

while (true) {

196

// Blocking read for next message

197

ByteBuffer message = stream.read();

198

199

if (message == null) {

200

continue;

201

}

202

203

// Process message

204

int offset = message.arrayOffset();

205

byte[] source = message.array();

206

int length = source.length - offset;

207

String changeData = new String(source, offset, length);

208

209

System.out.println("Change: " + changeData);

210

211

// Update progress (important for slot management)

212

stream.setAppliedLSN(stream.getLastReceiveLSN());

213

stream.setFlushedLSN(stream.getLastReceiveLSN());

214

}

215

216

// Clean up

217

// stream.close();

218

// replConn.dropReplicationSlot("my_slot");

219

// conn.close();

220

}

221

}

222

```

223

224

### Physical Replication

225

226

Physical replication streams raw WAL data.

227

228

```java { .api }

229

package org.postgresql.replication.fluent.physical;

230

231

import org.postgresql.replication.PGReplicationStream;

232

import org.postgresql.replication.LogSequenceNumber;

233

import java.sql.SQLException;

234

235

/**

236

* Fluent builder for creating physical replication slots.

237

*/

238

public interface ChainedPhysicalCreateSlotBuilder {

239

/**

240

* Makes the slot temporary.

241

*

242

* @param temporary true for temporary slot

243

* @return Builder for chaining

244

*/

245

ChainedPhysicalCreateSlotBuilder temporary(boolean temporary);

246

247

/**

248

* Creates the replication slot.

249

*

250

* @throws SQLException if slot creation fails

251

*/

252

void make() throws SQLException;

253

}

254

255

/**

256

* Fluent builder for physical replication streams.

257

*/

258

public interface ChainedPhysicalStreamBuilder {

259

/**

260

* Sets the replication slot name.

261

*

262

* @param slotName Slot name

263

* @return Builder for chaining

264

*/

265

ChainedPhysicalStreamBuilder withSlotName(String slotName);

266

267

/**

268

* Sets the starting LSN position.

269

*

270

* @param lsn Log sequence number to start from

271

* @return Builder for chaining

272

*/

273

ChainedPhysicalStreamBuilder withStartPosition(LogSequenceNumber lsn);

274

275

/**

276

* Sets the status interval.

277

*

278

* @param statusIntervalMs Interval in milliseconds

279

* @return Builder for chaining

280

*/

281

ChainedPhysicalStreamBuilder withStatusInterval(int statusIntervalMs);

282

283

/**

284

* Starts the replication stream.

285

*

286

* @return PGReplicationStream for reading WAL data

287

* @throws SQLException if stream cannot be started

288

*/

289

PGReplicationStream start() throws SQLException;

290

}

291

```

292

293

**Physical Replication Example:**

294

295

```java

296

public class PhysicalReplicationExample {

297

public static void startPhysicalReplication() throws SQLException {

298

String url = "jdbc:postgresql://localhost/postgres?replication=true";

299

Connection conn = DriverManager.getConnection(url, "replication_user", "password");

300

301

PGConnection pgConn = conn.unwrap(PGConnection.class);

302

PGReplicationConnection replConn = pgConn.getReplicationAPI();

303

304

// Create physical replication slot

305

replConn.createReplicationSlot()

306

.physical()

307

.withSlotName("physical_slot")

308

.make();

309

310

// Start streaming WAL

311

PGReplicationStream stream = replConn.replicationStream()

312

.physical()

313

.withSlotName("physical_slot")

314

.withStartPosition(LogSequenceNumber.valueOf("0/0"))

315

.withStatusInterval(10000)

316

.start();

317

318

// Read WAL data

319

while (true) {

320

ByteBuffer walData = stream.read();

321

if (walData != null) {

322

// Process WAL data

323

processWAL(walData);

324

325

// Update progress

326

stream.setFlushedLSN(stream.getLastReceiveLSN());

327

}

328

}

329

}

330

331

private static void processWAL(ByteBuffer walData) {

332

// Process raw WAL data

333

}

334

}

335

```

336

337

### PGReplicationStream

338

339

Interface for reading replication data.

340

341

```java { .api }

342

package org.postgresql.replication;

343

344

import java.nio.ByteBuffer;

345

import java.sql.SQLException;

346

347

/**

348

* Stream for receiving replication data from PostgreSQL.

349

*/

350

public interface PGReplicationStream {

351

/**

352

* Reads the next message from replication stream.

353

* Blocks until message is available.

354

*

355

* @return ByteBuffer containing message, or null if no message

356

* @throws SQLException if read fails

357

*/

358

ByteBuffer read() throws SQLException;

359

360

/**

361

* Reads pending message without blocking.

362

*

363

* @return ByteBuffer containing message, or null if no message available

364

* @throws SQLException if read fails

365

*/

366

ByteBuffer readPending() throws SQLException;

367

368

/**

369

* Sets the flushed LSN (data written to disk).

370

*

371

* @param lsn Log sequence number

372

* @throws SQLException if update fails

373

*/

374

void setFlushedLSN(LogSequenceNumber lsn) throws SQLException;

375

376

/**

377

* Sets the applied LSN (data applied/processed).

378

*

379

* @param lsn Log sequence number

380

* @throws SQLException if update fails

381

*/

382

void setAppliedLSN(LogSequenceNumber lsn) throws SQLException;

383

384

/**

385

* Forces status update to server immediately.

386

*

387

* @throws SQLException if update fails

388

*/

389

void forceUpdateStatus() throws SQLException;

390

391

/**

392

* Checks if stream is closed.

393

*

394

* @return true if stream is closed

395

*/

396

boolean isClosed();

397

398

/**

399

* Returns the last received LSN.

400

*

401

* @return Last received log sequence number

402

*/

403

LogSequenceNumber getLastReceiveLSN();

404

405

/**

406

* Closes the replication stream.

407

*

408

* @throws SQLException if close fails

409

*/

410

void close() throws SQLException;

411

}

412

```

413

414

### LogSequenceNumber

415

416

Represents a position in the WAL.

417

418

```java { .api }

419

package org.postgresql.replication;

420

421

/**

422

* Represents a PostgreSQL log sequence number (LSN).

423

* Format: X/Y where X and Y are hexadecimal numbers.

424

*/

425

public final class LogSequenceNumber implements Comparable<LogSequenceNumber> {

426

/**

427

* Invalid LSN constant.

428

*/

429

public static final LogSequenceNumber INVALID_LSN;

430

431

/**

432

* Parses LSN from string.

433

*

434

* @param value LSN string (e.g., "0/16B37A8")

435

* @return LogSequenceNumber instance

436

*/

437

public static LogSequenceNumber valueOf(String value);

438

439

/**

440

* Creates LSN from long value.

441

*

442

* @param value LSN as long

443

* @return LogSequenceNumber instance

444

*/

445

public static LogSequenceNumber valueOf(long value);

446

447

/**

448

* Returns LSN as long value.

449

*

450

* @return LSN as long

451

*/

452

public long asLong();

453

454

/**

455

* Returns LSN in string format.

456

*

457

* @return LSN string (e.g., "0/16B37A8")

458

*/

459

public String asString();

460

461

@Override

462

public int compareTo(LogSequenceNumber other);

463

464

@Override

465

public boolean equals(Object obj);

466

467

@Override

468

public int hashCode();

469

470

@Override

471

public String toString();

472

}

473

```

474

475

### Best Practices

476

477

1. **Use logical replication for:**

478

- Change data capture

479

- Selective replication

480

- Data transformation pipelines

481

482

2. **Use physical replication for:**

483

- Hot standby

484

- Full database replication

485

- Backup solutions

486

487

3. **Always update LSN positions:**

488

```java

489

stream.setAppliedLSN(stream.getLastReceiveLSN());

490

stream.setFlushedLSN(stream.getLastReceiveLSN());

491

```

492

493

4. **Handle replication lag:**

494

```java

495

// Monitor lag

496

LogSequenceNumber serverLSN = getServerLSN();

497

LogSequenceNumber clientLSN = stream.getLastReceiveLSN();

498

long lag = serverLSN.asLong() - clientLSN.asLong();

499

```

500

501

5. **Clean up slots when done:**

502

```java

503

try {

504

stream.close();

505

replConn.dropReplicationSlot("my_slot");

506

} finally {

507

conn.close();

508

}

509

```

510

511

6. **Use status intervals:**

512

```java

513

// Report progress every 10 seconds

514

.withStatusInterval(10000)

515

```

516

517

7. **Handle connection failures:**

518

```java

519

while (true) {

520

try {

521

ByteBuffer msg = stream.read();

522

// process

523

} catch (SQLException e) {

524

// Reconnect and resume from last LSN

525

break;

526

}

527

}

528

```

529