0
# Replication
1
2
This document covers the PostgreSQL JDBC driver's support for the PostgreSQL replication protocol, covering both logical and physical replication, which enables change data capture and streaming replication.
3
4
## Capabilities
5
6
### PGReplicationConnection
7
8
Main interface for replication operations.
9
10
```java { .api }
11
package org.postgresql.replication;
12
13
import org.postgresql.replication.fluent.ChainedStreamBuilder;
14
import org.postgresql.replication.fluent.ChainedCreateReplicationSlotBuilder;
15
import java.sql.SQLException;
16
17
/**
18
* API for PostgreSQL replication protocol.
19
* Only available when the connection is opened with the replication=database or replication=true parameter.
20
*
21
* Access via: PGConnection.getReplicationAPI()
22
*/
23
public interface PGReplicationConnection {
24
/**
25
* Starts building a replication stream (logical or physical).
26
* Use the fluent API to configure stream parameters.
27
*
28
* @return Fluent builder for replication stream
29
*/
30
ChainedStreamBuilder replicationStream();
31
32
/**
33
* Starts building a create replication slot command.
34
* Replication slots ensure the server retains WAL segments until consumed.
35
*
36
* @return Fluent builder for creating replication slot
37
*/
38
ChainedCreateReplicationSlotBuilder createReplicationSlot();
39
40
/**
41
* Drops a replication slot.
42
* The slot must not be active.
43
*
44
* @param slotName Name of the slot to drop
45
* @throws SQLException if slot cannot be dropped
46
*/
47
void dropReplicationSlot(String slotName) throws SQLException;
48
}
49
```
50
51
**Connection Setup:**
52
53
```java
54
// Open replication connection
55
String url = "jdbc:postgresql://localhost/postgres?replication=database";
56
Connection conn = DriverManager.getConnection(url, "user", "password");
57
58
PGConnection pgConn = conn.unwrap(PGConnection.class);
59
PGReplicationConnection replConn = pgConn.getReplicationAPI();
60
```
61
62
### Logical Replication
63
64
Logical replication decodes write-ahead log (WAL) changes into a structured format produced by an output plugin.
65
66
```java { .api }
67
package org.postgresql.replication.fluent.logical;
68
69
import org.postgresql.replication.PGReplicationStream;
70
import org.postgresql.replication.LogSequenceNumber;
71
import java.sql.SQLException;
72
73
/**
74
* Fluent builder for creating logical replication slots.
75
*/
76
public interface ChainedLogicalCreateSlotBuilder {
77
/**
78
* Sets the output plugin name.
79
* Common plugins: test_decoding, wal2json, pgoutput
80
*
81
* @param outputPlugin Plugin name
82
* @return Builder for chaining
83
*/
84
ChainedLogicalCreateSlotBuilder withOutputPlugin(String outputPlugin);
85
86
/**
87
* Sets a slot option.
88
*
89
* @param optionName Option name
90
* @param optionValue Option value
91
* @return Builder for chaining
92
*/
93
ChainedLogicalCreateSlotBuilder withSlotOption(String optionName, String optionValue);
94
95
/**
96
* Makes the slot temporary (dropped when the connection closes).
97
*
98
* @param temporary true for temporary slot
99
* @return Builder for chaining
100
*/
101
ChainedLogicalCreateSlotBuilder temporary(boolean temporary);
102
103
/**
104
* Creates the replication slot.
105
*
106
* @throws SQLException if slot creation fails
107
*/
108
void make() throws SQLException;
109
}
110
111
/**
112
* Fluent builder for logical replication streams.
113
*/
114
public interface ChainedLogicalStreamBuilder {
115
/**
116
* Sets the replication slot name.
117
*
118
* @param slotName Slot name
119
* @return Builder for chaining
120
*/
121
ChainedLogicalStreamBuilder withSlotName(String slotName);
122
123
/**
124
* Sets the starting LSN position.
125
*
126
* @param lsn Log sequence number to start from
127
* @return Builder for chaining
128
*/
129
ChainedLogicalStreamBuilder withStartPosition(LogSequenceNumber lsn);
130
131
/**
132
* Sets a slot option for this stream.
133
*
134
* @param optionName Option name
135
* @param optionValue Option value
136
* @return Builder for chaining
137
*/
138
ChainedLogicalStreamBuilder withSlotOption(String optionName, String optionValue);
139
140
/**
141
* Sets the status interval for progress reporting.
142
*
143
* @param statusIntervalMs Interval in milliseconds
144
* @return Builder for chaining
145
*/
146
ChainedLogicalStreamBuilder withStatusInterval(int statusIntervalMs);
147
148
/**
149
* Starts the replication stream.
150
*
151
* @return PGReplicationStream for reading changes
152
* @throws SQLException if stream cannot be started
153
*/
154
PGReplicationStream start() throws SQLException;
155
}
156
```
157
158
**Logical Replication Example:**
159
160
```java
161
import org.postgresql.PGConnection;
162
import org.postgresql.replication.PGReplicationConnection;
163
import org.postgresql.replication.PGReplicationStream;
164
import org.postgresql.replication.LogSequenceNumber;
165
import java.nio.ByteBuffer;
166
import java.sql.*;
167
168
public class LogicalReplicationExample {
169
public static void startLogicalReplication() throws SQLException {
170
// Connect with replication parameter
171
String url = "jdbc:postgresql://localhost/mydb?replication=database";
172
Connection conn = DriverManager.getConnection(url, "replication_user", "password");
173
174
PGConnection pgConn = conn.unwrap(PGConnection.class);
175
PGReplicationConnection replConn = pgConn.getReplicationAPI();
176
177
// Create logical replication slot
178
replConn.createReplicationSlot()
179
.logical()
180
.withSlotName("my_slot")
181
.withOutputPlugin("test_decoding")
182
.make();
183
184
// Start streaming changes
185
PGReplicationStream stream = replConn.replicationStream()
186
.logical()
187
.withSlotName("my_slot")
188
.withStartPosition(LogSequenceNumber.valueOf("0/0"))
189
.withSlotOption("include-xids", "false")
190
.withSlotOption("skip-empty-xacts", "true")
191
.withStatusInterval(10000) // Report progress every 10 seconds
192
.start();
193
194
// Read changes
195
while (true) {
196
// Blocking read for next message
197
ByteBuffer message = stream.read();
198
199
if (message == null) {
200
continue;
201
}
202
203
// Process message
204
int offset = message.arrayOffset();
205
byte[] source = message.array();
206
int length = source.length - offset;
207
String changeData = new String(source, offset, length);
208
209
System.out.println("Change: " + changeData);
210
211
// Update progress: the slot makes the server retain WAL until it is confirmed, so report what has been processed
212
stream.setAppliedLSN(stream.getLastReceiveLSN());
213
stream.setFlushedLSN(stream.getLastReceiveLSN());
214
}
215
216
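// Clean up (left commented out because it is unreachable after the infinite loop above; run it once the loop has an exit condition)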
217
// stream.close();
218
// replConn.dropReplicationSlot("my_slot");
219
// conn.close();
220
}
221
}
222
```
223
224
### Physical Replication
225
226
Physical replication streams raw WAL data.
227
228
```java { .api }
229
package org.postgresql.replication.fluent.physical;
230
231
import org.postgresql.replication.PGReplicationStream;
232
import org.postgresql.replication.LogSequenceNumber;
233
import java.sql.SQLException;
234
235
/**
236
* Fluent builder for creating physical replication slots.
237
*/
238
public interface ChainedPhysicalCreateSlotBuilder {
239
/**
240
* Makes the slot temporary.
241
*
242
* @param temporary true for temporary slot
243
* @return Builder for chaining
244
*/
245
ChainedPhysicalCreateSlotBuilder temporary(boolean temporary);
246
247
/**
248
* Creates the replication slot.
249
*
250
* @throws SQLException if slot creation fails
251
*/
252
void make() throws SQLException;
253
}
254
255
/**
256
* Fluent builder for physical replication streams.
257
*/
258
public interface ChainedPhysicalStreamBuilder {
259
/**
260
* Sets the replication slot name.
261
*
262
* @param slotName Slot name
263
* @return Builder for chaining
264
*/
265
ChainedPhysicalStreamBuilder withSlotName(String slotName);
266
267
/**
268
* Sets the starting LSN position.
269
*
270
* @param lsn Log sequence number to start from
271
* @return Builder for chaining
272
*/
273
ChainedPhysicalStreamBuilder withStartPosition(LogSequenceNumber lsn);
274
275
/**
276
* Sets the status interval.
277
*
278
* @param statusIntervalMs Interval in milliseconds
279
* @return Builder for chaining
280
*/
281
ChainedPhysicalStreamBuilder withStatusInterval(int statusIntervalMs);
282
283
/**
284
* Starts the replication stream.
285
*
286
* @return PGReplicationStream for reading WAL data
287
* @throws SQLException if stream cannot be started
288
*/
289
PGReplicationStream start() throws SQLException;
290
}
291
```
292
293
**Physical Replication Example:**
294
295
```java
296
public class PhysicalReplicationExample {
297
public static void startPhysicalReplication() throws SQLException {
298
String url = "jdbc:postgresql://localhost/postgres?replication=true";
299
Connection conn = DriverManager.getConnection(url, "replication_user", "password");
300
301
PGConnection pgConn = conn.unwrap(PGConnection.class);
302
PGReplicationConnection replConn = pgConn.getReplicationAPI();
303
304
// Create physical replication slot
305
replConn.createReplicationSlot()
306
.physical()
307
.withSlotName("physical_slot")
308
.make();
309
310
// Start streaming WAL
311
PGReplicationStream stream = replConn.replicationStream()
312
.physical()
313
.withSlotName("physical_slot")
314
.withStartPosition(LogSequenceNumber.valueOf("0/0"))
315
.withStatusInterval(10000)
316
.start();
317
318
// Read WAL data
319
while (true) {
320
ByteBuffer walData = stream.read();
321
if (walData != null) {
322
// Process WAL data
323
processWAL(walData);
324
325
// Update progress
326
stream.setFlushedLSN(stream.getLastReceiveLSN());
327
}
328
}
329
}
330
331
private static void processWAL(ByteBuffer walData) {
332
// Process raw WAL data
333
}
334
}
335
```
336
337
### PGReplicationStream
338
339
Interface for reading replication data.
340
341
```java { .api }
342
package org.postgresql.replication;
343
344
import java.nio.ByteBuffer;
345
import java.sql.SQLException;
346
347
/**
348
* Stream for receiving replication data from PostgreSQL.
349
*/
350
public interface PGReplicationStream {
351
/**
352
* Reads the next message from replication stream.
353
* Blocks until message is available.
354
*
355
* @return ByteBuffer containing the next message; this call blocks until one arrives, so use readPending() for a non-blocking read that may return null
356
* @throws SQLException if read fails
357
*/
358
ByteBuffer read() throws SQLException;
359
360
/**
361
* Reads pending message without blocking.
362
*
363
* @return ByteBuffer containing message, or null if no message available
364
* @throws SQLException if read fails
365
*/
366
ByteBuffer readPending() throws SQLException;
367
368
/**
369
* Sets the flushed LSN (data written to disk).
370
*
371
* @param lsn Log sequence number
372
* @throws SQLException if update fails
373
*/
374
void setFlushedLSN(LogSequenceNumber lsn) throws SQLException;
375
376
/**
377
* Sets the applied LSN (data applied/processed).
378
*
379
* @param lsn Log sequence number
380
* @throws SQLException if update fails
381
*/
382
void setAppliedLSN(LogSequenceNumber lsn) throws SQLException;
383
384
/**
385
* Forces status update to server immediately.
386
*
387
* @throws SQLException if update fails
388
*/
389
void forceUpdateStatus() throws SQLException;
390
391
/**
392
* Checks if stream is closed.
393
*
394
* @return true if stream is closed
395
*/
396
boolean isClosed();
397
398
/**
399
* Returns the last received LSN.
400
*
401
* @return Last received log sequence number
402
*/
403
LogSequenceNumber getLastReceiveLSN();
404
405
/**
406
* Closes the replication stream.
407
*
408
* @throws SQLException if close fails
409
*/
410
void close() throws SQLException;
411
}
412
```
413
414
### LogSequenceNumber
415
416
Represents a position in the WAL.
417
418
```java { .api }
419
package org.postgresql.replication;
420
421
/**
422
* Represents a PostgreSQL log sequence number (LSN).
423
* Format: X/Y where X and Y are hexadecimal numbers.
424
*/
425
public final class LogSequenceNumber implements Comparable<LogSequenceNumber> {
426
/**
427
* Invalid LSN constant.
428
*/
429
public static final LogSequenceNumber INVALID_LSN;
430
431
/**
432
* Parses LSN from string.
433
*
434
* @param value LSN string (e.g., "0/16B37A8")
435
* @return LogSequenceNumber instance
436
*/
437
public static LogSequenceNumber valueOf(String value);
438
439
/**
440
* Creates LSN from long value.
441
*
442
* @param value LSN as long
443
* @return LogSequenceNumber instance
444
*/
445
public static LogSequenceNumber valueOf(long value);
446
447
/**
448
* Returns LSN as long value.
449
*
450
* @return LSN as long
451
*/
452
public long asLong();
453
454
/**
455
* Returns LSN in string format.
456
*
457
* @return LSN string (e.g., "0/16B37A8")
458
*/
459
public String asString();
460
461
@Override
462
public int compareTo(LogSequenceNumber other);
463
464
@Override
465
public boolean equals(Object obj);
466
467
@Override
468
public int hashCode();
469
470
@Override
471
public String toString();
472
}
473
```
474
475
### Best Practices
476
477
1. **Use logical replication for:**
478
- Change data capture
479
- Selective replication
480
- Data transformation pipelines
481
482
2. **Use physical replication for:**
483
- Hot standby
484
- Full database replication
485
- Backup solutions
486
487
3. **Always update LSN positions:**
488
```java
489
stream.setAppliedLSN(stream.getLastReceiveLSN());
490
stream.setFlushedLSN(stream.getLastReceiveLSN());
491
```
492
493
4. **Handle replication lag:**
494
```java
495
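// Monitor lag. getServerLSN() is an application-supplied helper, not part of this API (e.g. query pg_current_wal_lsn() on a regular connection)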
496
LogSequenceNumber serverLSN = getServerLSN();
497
LogSequenceNumber clientLSN = stream.getLastReceiveLSN();
498
long lag = serverLSN.asLong() - clientLSN.asLong();
499
```
500
501
5. **Clean up slots when done:**
502
```java
503
try {
504
stream.close();
505
replConn.dropReplicationSlot("my_slot");
506
} finally {
507
conn.close();
508
}
509
```
510
511
6. **Use status intervals:**
512
```java
513
// Report progress every 10 seconds
514
.withStatusInterval(10000)
515
```
516
517
7. **Handle connection failures:**
518
```java
519
while (true) {
520
try {
521
ByteBuffer msg = stream.read();
522
// process
523
} catch (SQLException e) {
524
// Reconnect and resume from last LSN
525
break;
526
}
527
}
528
```
529