# Oracle GoldenGate CDC Format

JSON format support for the Oracle GoldenGate (OGG) change data capture system. It reads Oracle database changes encoded in OGG's JSON envelope and exposes the accompanying OGG metadata. GoldenGate is Oracle's real-time data integration and replication solution.

## Capabilities

### OGG Format Configuration

Configuration options for the Oracle GoldenGate JSON format. They control timestamp parsing, null map keys, and parse-error handling.

```java { .api }
/**
 * Configuration options for OGG JSON format
 */
public class OggJsonFormatOptions {

    /** Whether to ignore JSON parsing errors (inherited from JsonFormatOptions) */
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;

    /** Timestamp format standard, "SQL" or "ISO-8601" (inherited from JsonFormatOptions) */
    public static final ConfigOption<String> TIMESTAMP_FORMAT;

    /** How to handle null keys in maps: "FAIL", "DROP", or "LITERAL" (inherited from JsonFormatOptions) */
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE;

    /** Literal string for null keys when mode is LITERAL (inherited from JsonFormatOptions) */
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL;
}
```

**Configuration Usage:**

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.ogg.OggJsonFormatOptions;

// Configure OGG format options
Configuration config = new Configuration();
config.set(OggJsonFormatOptions.IGNORE_PARSE_ERRORS, true);
// Takes a named standard ("SQL" or "ISO-8601"), not a date pattern.
// "SQL" parses timestamps written as "yyyy-MM-dd HH:mm:ss.s{precision}".
config.set(OggJsonFormatOptions.TIMESTAMP_FORMAT, "SQL");
config.set(OggJsonFormatOptions.JSON_MAP_NULL_KEY_MODE, "DROP");
```

## OGG JSON Structure

Oracle GoldenGate produces change events with a structured envelope containing operation metadata and data payloads:

### Insert Event

```json
{
  "table": "USERS",
  "op_type": "I",
  "op_ts": "2023-01-01 10:00:00.123456",
  "current_ts": "2023-01-01 10:00:01.789012",
  "pos": "00000000170000001234",
  "after": {
    "ID": 1,
    "NAME": "Alice",
    "EMAIL": "alice@example.com",
    "CREATED_AT": "2023-01-01 10:00:00.000000"
  }
}
```

### Update Event

```json
{
  "table": "USERS",
  "op_type": "U",
  "op_ts": "2023-01-01 10:01:00.123456",
  "current_ts": "2023-01-01 10:01:01.789012",
  "pos": "00000000170000001256",
  "before": {
    "ID": 1,
    "NAME": "Alice",
    "EMAIL": "alice@example.com"
  },
  "after": {
    "ID": 1,
    "NAME": "Alice Smith",
    "EMAIL": "alice.smith@example.com",
    "CREATED_AT": "2023-01-01 10:00:00.000000"
  }
}
```

### Delete Event

```json
{
  "table": "USERS",
  "op_type": "D",
  "op_ts": "2023-01-01 10:02:00.123456",
  "current_ts": "2023-01-01 10:02:01.789012",
  "pos": "00000000170000001278",
  "before": {
    "ID": 1,
    "NAME": "Alice Smith",
    "EMAIL": "alice.smith@example.com",
    "CREATED_AT": "2023-01-01 10:00:00.000000"
  }
}
```

## Table API Integration

### SQL DDL Usage

Create tables using OGG JSON format for change data capture processing:

```sql
CREATE TABLE ogg_source (
    id BIGINT,
    name STRING,
    email STRING,
    created_at TIMESTAMP(6),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'ogg-topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'ogg-json',
    'ogg-json.ignore-parse-errors' = 'true',
    -- Named standard ('SQL' or 'ISO-8601'), not a date pattern
    'ogg-json.timestamp-format.standard' = 'SQL'
);
```

### Programmatic Table Definition

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;

TableDescriptor oggTable = TableDescriptor.forConnector("kafka")
    .schema(Schema.newBuilder()
        // Primary key columns must be declared NOT NULL in the Schema API
        .column("id", DataTypes.BIGINT().notNull())
        .column("name", DataTypes.STRING())
        .column("email", DataTypes.STRING())
        .column("created_at", DataTypes.TIMESTAMP(6))
        .primaryKey("id")
        .build())
    .option("topic", "ogg-topic")
    .option("properties.bootstrap.servers", "localhost:9092")
    .format("ogg-json")
    .option("ogg-json.ignore-parse-errors", "true")
    // Named standard ("SQL" or "ISO-8601"), not a date pattern
    .option("ogg-json.timestamp-format.standard", "SQL")
    .build();
```

## Change Event Processing

The OGG format handles three types of change events, distinguished by the `op_type` field of the envelope:

### Insert Events
- `op_type`: "I" (Insert)
- `after`: New row data
- `before`: Not present

### Update Events
- `op_type`: "U" (Update)
- `after`: Updated row data (complete row)
- `before`: Previous row data (complete row)

### Delete Events
- `op_type`: "D" (Delete)
- `before`: Deleted row data (complete row)
- `after`: Not present
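
The mapping from `op_type` to changelog rows described above can be sketched in plain Java. This is a minimal illustration rather than the format's actual implementation: the class `OggChangelogSketch`, the `ChangeRow` holder, and the row-kind labels (`+I`, `-U`, `+U`, `-D`, borrowed from Flink's changelog notation) are assumptions made for the example, and the envelope is modelled as an already-parsed `Map`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: turns one parsed OGG envelope into changelog rows. */
final class OggChangelogSketch {

    /** A row image tagged with a changelog kind: +I, -U, +U or -D. */
    static final class ChangeRow {
        final String kind;
        final Map<String, Object> row;

        ChangeRow(String kind, Map<String, Object> row) {
            this.kind = kind;
            this.row = row;
        }
    }

    @SuppressWarnings("unchecked")
    static List<ChangeRow> toChangeRows(Map<String, Object> envelope) {
        String opType = (String) envelope.get("op_type");
        Map<String, Object> before = (Map<String, Object>) envelope.get("before");
        Map<String, Object> after = (Map<String, Object>) envelope.get("after");

        List<ChangeRow> rows = new ArrayList<>();
        if ("I".equals(opType)) {
            // Insert: only the new image exists.
            rows.add(new ChangeRow("+I", after));
        } else if ("U".equals(opType)) {
            // Update: retract the old image, then emit the new one.
            if (before == null) {
                throw new IllegalStateException("Update event without a \"before\" image");
            }
            rows.add(new ChangeRow("-U", before));
            rows.add(new ChangeRow("+U", after));
        } else if ("D".equals(opType)) {
            // Delete: only the old image exists.
            rows.add(new ChangeRow("-D", before));
        } else {
            throw new IllegalArgumentException("Unsupported op_type: " + opType);
        }
        return rows;
    }
}
```

Under these assumptions an update yields two rows, which is why the `before` image matters: without it a downstream consumer cannot retract the previous version of the row.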
170
171
## OGG Metadata Fields
172
173
Oracle GoldenGate provides rich metadata for each change event:
174
175
| Field | Description | Type | Example |
176
|-------|-------------|------|---------|
177
| table | Source table name | String | "USERS" |
178
| op_type | Operation type | String | "I", "U", "D" |
179
| op_ts | Operation timestamp | String | "2023-01-01 10:00:00.123456" |
180
| current_ts | Current processing timestamp | String | "2023-01-01 10:00:01.789012" |
181
| pos | GoldenGate position/SCN | String | "00000000170000001234" |
182
| primary_keys | Primary key column names | Array | ["ID"] |
183
| tokens | Transaction tokens | Object | {...} |
184
| before | Previous row values | Object | {...} |
185
| after | Current row values | Object | {...} |
186
187
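### Accessing Metadata in Tables

```sql
CREATE TABLE ogg_with_metadata (
    -- Regular data columns
    id BIGINT,
    name STRING,
    email STRING,

    -- Metadata columns. With the Kafka connector, format metadata keys take the 'value.' prefix.
    ogg_table STRING METADATA FROM 'value.table' VIRTUAL,
    ogg_op_ts TIMESTAMP_LTZ(6) METADATA FROM 'value.event-timestamp' VIRTUAL,          -- op_ts
    ogg_current_ts TIMESTAMP_LTZ(6) METADATA FROM 'value.ingestion-timestamp' VIRTUAL, -- current_ts

    -- op_type and pos are not among the metadata keys Flink documents for ogg-json
    -- (table, primary-keys, ingestion-timestamp, event-timestamp); verify these two
    -- against your Flink version before relying on them.
    ogg_op_type STRING METADATA FROM 'op_type',
    ogg_pos STRING METADATA FROM 'pos'
) WITH (
    'connector' = 'kafka',
    'format' = 'ogg-json'
    -- other connector options
);
```
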
## Data Type Mapping

The format does not see Oracle types. It reads the JSON values that OGG emits, so declare Flink column types that fit them. Typical choices:

| Oracle Type | OGG JSON Value | Flink Type | Notes |
|-------------|----------------|------------|-------|
| NUMBER | Number/String | DECIMAL | Depends on precision |
| INTEGER | Number | INT | Oracle INTEGER is NUMBER(38); use DECIMAL(38, 0) if values can exceed 32 bits |
| FLOAT | Number | FLOAT | |
| BINARY_FLOAT | Number | FLOAT | |
| BINARY_DOUBLE | Number | DOUBLE | |
| VARCHAR2 | String | STRING | |
| CHAR | String | STRING | |
| NVARCHAR2 | String | STRING | Unicode |
| CLOB | String | STRING | Large text |
| DATE | String | TIMESTAMP | Oracle DATE includes time |
| TIMESTAMP | String | TIMESTAMP | High precision |
| TIMESTAMP WITH TIME ZONE | String | TIMESTAMP_LTZ | With timezone |
| RAW | String | BYTES | Hex encoded |
| BLOB | String | BYTES | Base64 encoded |
| XMLTYPE | String | STRING | As XML string |
| JSON | String | STRING | As JSON string |
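
If the binary encodings listed in the table hold for your OGG handler, a value that arrives as a JSON string can be turned back into bytes as sketched below. This is an illustration only: `OggBinaryDecodingSketch` and its method names are hypothetical, and the sketch assumes the column was read as a plain string.

```java
import java.util.Base64;

/** Hypothetical helpers for binary columns that arrive as JSON strings. */
final class OggBinaryDecodingSketch {

    /** Decodes a hex string such as "CAFE" (the RAW encoding assumed above). */
    static byte[] decodeHex(String hex) {
        if (hex.length() % 2 != 0) {
            throw new IllegalArgumentException("Hex string must have an even length");
        }
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            int hi = Character.digit(hex.charAt(2 * i), 16);
            int lo = Character.digit(hex.charAt(2 * i + 1), 16);
            if (hi < 0 || lo < 0) {
                throw new IllegalArgumentException("Invalid hex digit in byte " + i);
            }
            out[i] = (byte) ((hi << 4) | lo);
        }
        return out;
    }

    /** Decodes a Base64 string (the BLOB encoding assumed above). */
    static byte[] decodeBase64(String text) {
        return Base64.getDecoder().decode(text);
    }
}
```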

## Timestamp Handling

The timestamp format option selects one of two named standards rather than a free-form pattern:

```java
// SQL standard: "yyyy-MM-dd HH:mm:ss.s{precision}", e.g. "2023-01-01 10:00:00.123456"
config.set(OggJsonFormatOptions.TIMESTAMP_FORMAT, "SQL");

// ISO-8601 standard: "yyyy-MM-ddTHH:mm:ss.s{precision}", e.g. "2023-01-01T10:00:00.123456"
config.set(OggJsonFormatOptions.TIMESTAMP_FORMAT, "ISO-8601");

// Custom patterns such as "dd-MON-yy HH.mm.ss.SSSSSS AM" are not accepted by this option.
// Have GoldenGate emit one of the two layouts above, or read the column as STRING and convert it in SQL.
```
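
The microsecond layout used by `op_ts` and `current_ts` in the sample events can be parsed with `java.time`, which is handy for checking values outside of Flink. The sketch below is illustrative: `OggTimestampSketch` and its methods are hypothetical names, and the pattern is assumed to match the six-digit fractional seconds shown in the examples.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper for the timestamp layout shown in the sample events. */
final class OggTimestampSketch {

    // Six 'S' letters require exactly six fractional digits (microseconds).
    static final DateTimeFormatter OGG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");

    static LocalDateTime parse(String text) {
        return LocalDateTime.parse(text, OGG_TS);
    }

    /** Delay between the source operation (op_ts) and OGG processing (current_ts), in microseconds. */
    static long latencyMicros(String opTs, String currentTs) {
        return Duration.between(parse(opTs), parse(currentTs)).toNanos() / 1_000;
    }
}
```

For the insert event shown earlier, `latencyMicros("2023-01-01 10:00:00.123456", "2023-01-01 10:00:01.789012")` returns 1665556, about 1.67 seconds.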
246
247
### Oracle Timestamp Precision
248
249
Oracle supports various timestamp precisions:
250
251
```sql
252
CREATE TABLE ogg_timestamps (
253
id BIGINT,
254
name STRING,
255
256
-- Different Oracle timestamp types
257
created_date TIMESTAMP(3), -- TIMESTAMP(3)
258
updated_ts TIMESTAMP(6), -- TIMESTAMP(6)
259
event_time TIMESTAMP(9), -- TIMESTAMP(9)
260
tz_time TIMESTAMP_LTZ(6), -- TIMESTAMP WITH TIME ZONE
261
262
-- OGG metadata timestamps
263
op_timestamp TIMESTAMP(6) METADATA FROM 'op_ts',
264
current_timestamp TIMESTAMP(6) METADATA FROM 'current_ts'
265
) WITH (
266
'connector' = 'kafka',
267
'format' = 'ogg-json',
268
'ogg-json.timestamp-format' = 'yyyy-MM-dd HH:mm:ss.SSSSSS'
269
);
270
```
271
272
## Transaction Processing

OGG can attach transaction identifiers to each event through the `tokens` object:

### Transaction Metadata

```json
{
  "table": "USERS",
  "op_type": "I",
  "op_ts": "2023-01-01 10:00:00.123456",
  "current_ts": "2023-01-01 10:00:01.789012",
  "pos": "00000000170000001234",
  "tokens": {
    "TK-XID": "0004.00A.00000123",
    "TK-CSN": "12345678901",
    "TK-THREAD": "001"
  },
  "after": {...}
}
```

### Transaction Boundaries

```sql
-- Access transaction information
CREATE TABLE ogg_with_transaction (
    id BIGINT,
    name STRING,

    -- Transaction metadata from tokens.
    -- These 'tokens.*' keys are not among the metadata keys Flink documents for ogg-json
    -- (table, primary-keys, ingestion-timestamp, event-timestamp); verify them before use.
    xid STRING METADATA FROM 'tokens.TK-XID',
    csn STRING METADATA FROM 'tokens.TK-CSN',
    thread_id STRING METADATA FROM 'tokens.TK-THREAD'
) WITH (
    'connector' = 'kafka',
    'format' = 'ogg-json'
);
```

## Advanced OGG Features

### Compressed Trail Files

OGG can output compressed change events, which carry only a subset of the row's columns. In this example `op_type` "PK" denotes a primary key update:

```json
{
  "table": "USERS",
  "op_type": "PK",
  "op_ts": "2023-01-01 10:00:00.123456",
  "pos": "00000000170000001234",
  "primary_keys": ["ID"],
  "after": {
    "ID": 1
  }
}
```

### DDL Support

OGG can capture DDL changes:

```json
{
  "ddl": "ALTER TABLE USERS ADD (PHONE VARCHAR2(20))",
  "op_type": "DDL",
  "op_ts": "2023-01-01 10:00:00.123456",
  "pos": "00000000170000001290",
  "table": "USERS"
}
```

### Supplemental Logging

Configure Oracle supplemental logging so that update and delete events carry complete row images:

```sql
-- Enable supplemental logging in Oracle
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER TABLE users ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
```

## Error Handling

Configure error handling for production Oracle environments:

```java
// Skip records that fail to parse instead of failing the job
config.set(OggJsonFormatOptions.IGNORE_PARSE_ERRORS, true);

// Replace null map keys with a literal instead of failing
config.set(OggJsonFormatOptions.JSON_MAP_NULL_KEY_MODE, "LITERAL");
config.set(OggJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL, "ORA_NULL");
```
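
To make the three null-key modes concrete, the sketch below applies them to an ordinary Java map. It only illustrates the semantics of `FAIL`, `DROP`, and `LITERAL` and is not the format's own code: `MapNullKeySketch` and `applyNullKeyMode` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of the FAIL / DROP / LITERAL null-key modes. */
final class MapNullKeySketch {

    static Map<String, String> applyNullKeyMode(
            Map<String, String> input, String mode, String literal) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : input.entrySet()) {
            if (entry.getKey() != null) {
                out.put(entry.getKey(), entry.getValue());
                continue;
            }
            switch (mode) {
                case "DROP":
                    // Silently discard the entry with the null key.
                    break;
                case "LITERAL":
                    // Keep the value under the configured replacement key.
                    out.put(literal, entry.getValue());
                    break;
                case "FAIL":
                    throw new IllegalStateException("Map contains a null key");
                default:
                    throw new IllegalArgumentException("Unknown mode: " + mode);
            }
        }
        return out;
    }
}
```

With `LITERAL` and the literal `"ORA_NULL"`, a map holding `null -> "x"` and `"a" -> "1"` comes out with the keys `ORA_NULL` and `a`; with `DROP` only `a` remains.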

### Common Error Scenarios

1. **Large LOB Data**: Configure appropriate size limits
2. **Character Set Issues**: Ensure proper UTF-8 encoding
3. **Timestamp Precision**: Handle microsecond precision correctly
4. **Oracle-specific Data Types**: Handle XMLType, JSON, etc.
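
Whichever of these scenarios produces a malformed record, the effect of ignoring parse errors is the same: the record is skipped and processing continues. The sketch below shows that behaviour with a generic parser function. `IgnoreParseErrorsSketch` and `parseAll` are hypothetical names, and the parser is a stand-in for real JSON decoding.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical illustration of skip-on-error versus fail-fast parsing. */
final class IgnoreParseErrorsSketch {

    static <T> List<T> parseAll(
            List<String> records, Function<String, T> parser, boolean ignoreParseErrors) {
        List<T> parsed = new ArrayList<>();
        for (String record : records) {
            try {
                parsed.add(parser.apply(record));
            } catch (RuntimeException e) {
                if (!ignoreParseErrors) {
                    // Fail fast: surface the offending record to the caller.
                    throw new IllegalStateException("Failed to parse record: " + record, e);
                }
                // Otherwise drop the malformed record and continue.
            }
        }
        return parsed;
    }
}
```

Skipping keeps a pipeline running, but it also hides data loss, so it is worth counting skipped records somewhere when this setting is enabled.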

## Performance Optimization

### OGG Configuration

```properties
# GoldenGate extract parameters for optimal performance
EXTRACT ext1
USERID ggadmin, PASSWORD oracle
EXTTRAIL ./dirdat/lt
TABLE schema.users;

# Replicat parameters for JSON output
REPLICAT rep1
USERID ggadmin, PASSWORD oracle
MAP schema.users, TARGET kafka.topic;
```

### Kafka Configuration

```java
import java.util.Properties;

// Kafka producer settings for high-throughput Oracle changes
Properties kafkaProps = new Properties();
kafkaProps.setProperty("batch.size", "32768");           // 32 KiB batches per partition
kafkaProps.setProperty("linger.ms", "5");                // wait up to 5 ms to fill a batch
kafkaProps.setProperty("compression.type", "snappy");
kafkaProps.setProperty("max.request.size", "10485760");  // 10 MiB, leaves room for large rows
```

## Production Considerations

### Reliability

- Monitor GoldenGate extract and replicat processes
- Implement proper error handling and alerting
- Configure GoldenGate checkpoints appropriately
- Set up Oracle archive log retention policies

### Schema Evolution

Handle Oracle schema changes:

```sql
-- Flexible schema for handling Oracle schema evolution
CREATE TABLE ogg_schema_evolution (
    -- Core columns
    id BIGINT,
    name STRING,

    -- Intended to hold the complete JSON for analysis.
    -- Columns are matched by field name, so this stays NULL unless the payload has such a field.
    full_record STRING,

    -- Metadata for debugging
    ogg_table STRING METADATA FROM 'value.table' VIRTUAL,
    -- op_type and pos: confirm these keys are exposed by your Flink version
    ogg_op_type STRING METADATA FROM 'op_type',
    ogg_pos STRING METADATA FROM 'pos'
) WITH (
    'connector' = 'kafka',
    'format' = 'ogg-json',
    'ogg-json.ignore-parse-errors' = 'true'
);
```

### Monitoring

Key metrics for Oracle GoldenGate:
- Extract lag behind Oracle redo logs
- Replicat processing lag
- Archive log generation rate
- Change event volume per table
- DDL event frequency
- Large transaction handling

### Security

- Configure Oracle wallet for secure authentication
- Use encrypted communication channels
- Implement proper access controls for GoldenGate processes
- Monitor privileged operations through audit trails

## Integration Patterns

### Real-time Data Warehousing

```sql
-- Real-time Oracle to data warehouse synchronization
INSERT INTO data_warehouse.users_dim
SELECT
    id,
    name,
    email,
    CASE
        WHEN ogg_op_type = 'D' THEN CURRENT_TIMESTAMP
        ELSE NULL
    END AS deleted_at,
    ogg_op_ts AS last_modified
FROM ogg_with_metadata
WHERE ogg_table = 'USERS';
```

### Change Data Analysis

```sql
-- Analyze Oracle change patterns
SELECT
    ogg_table,
    ogg_op_type,
    COUNT(*) AS operation_count,
    AVG(TIMESTAMPDIFF(SECOND, ogg_op_ts, ogg_current_ts)) AS avg_latency_seconds
FROM ogg_with_metadata
WHERE ogg_op_ts >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR
GROUP BY ogg_table, ogg_op_type
ORDER BY operation_count DESC;
```