# Oracle GoldenGate CDC Format

The `ogg-json` format lets Flink read change events produced by Oracle GoldenGate (OGG), Oracle's real-time data integration and replication product. GoldenGate's JSON formatter (used, for example, by the GoldenGate for Big Data Kafka handler) writes one JSON document per row change. Flink interprets each document as INSERT, UPDATE, or DELETE changelog rows and can expose selected envelope fields as metadata columns.

## Capabilities

### OGG Format Configuration

The options below control parse-error handling, timestamp parsing, and the treatment of null map keys. All of them are inherited from the plain JSON format's `JsonFormatOptions`.

```java { .api }
/**
 * Configuration options for the OGG JSON format. In SQL DDL each key is
 * prefixed with "ogg-json.", e.g. 'ogg-json.ignore-parse-errors'.
 */
public class OggJsonFormatOptions {

    /** Skip rows with parse errors instead of failing (key: ignore-parse-errors, default false; inherited from JsonFormatOptions) */
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;

    /** Timestamp standard, "SQL" or "ISO-8601" (key: timestamp-format.standard, default "SQL"; inherited from JsonFormatOptions) */
    public static final ConfigOption<String> TIMESTAMP_FORMAT;

    /** How to serialize null map keys: "FAIL", "DROP", or "LITERAL" (key: map-null-key.mode, default "FAIL"; inherited from JsonFormatOptions) */
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE;

    /** Literal string for null keys when the mode is "LITERAL" (key: map-null-key.literal, default "null"; inherited from JsonFormatOptions) */
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL;
}
```

**Configuration Usage:**

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.ogg.OggJsonFormatOptions;

// Configure OGG format options
Configuration config = new Configuration();
// Skip malformed records instead of failing the job
config.set(OggJsonFormatOptions.IGNORE_PARSE_ERRORS, true);
// "SQL" parses "yyyy-MM-dd HH:mm:ss.SSSSSS", the layout used in the examples below
config.set(OggJsonFormatOptions.TIMESTAMP_FORMAT, "SQL");
// Drop map entries whose key is null when serializing
config.set(OggJsonFormatOptions.JSON_MAP_NULL_KEY_MODE, "DROP");
```

## OGG JSON Structure

Oracle GoldenGate produces change events with a structured envelope containing operation metadata and data payloads:

### Insert Event

```json
{
  "table": "USERS",
  "op_type": "I",
  "op_ts": "2023-01-01 10:00:00.123456",
  "current_ts": "2023-01-01 10:00:01.789012",
  "pos": "00000000170000001234",
  "after": {
    "ID": 1,
    "NAME": "Alice",
    "EMAIL": "alice@example.com",
    "CREATED_AT": "2023-01-01 10:00:00.000000"
  }
}
```

### Update Event

```json
{
  "table": "USERS",
  "op_type": "U",
  "op_ts": "2023-01-01 10:01:00.123456",
  "current_ts": "2023-01-01 10:01:01.789012",
  "pos": "00000000170000001256",
  "before": {
    "ID": 1,
    "NAME": "Alice",
    "EMAIL": "alice@example.com"
  },
  "after": {
    "ID": 1,
    "NAME": "Alice Smith",
    "EMAIL": "alice.smith@example.com",
    "CREATED_AT": "2023-01-01 10:00:00.000000"
  }
}
```

### Delete Event

```json
{
  "table": "USERS",
  "op_type": "D",
  "op_ts": "2023-01-01 10:02:00.123456",
  "current_ts": "2023-01-01 10:02:01.789012",
  "pos": "00000000170000001278",
  "before": {
    "ID": 1,
    "NAME": "Alice Smith",
    "EMAIL": "alice.smith@example.com",
    "CREATED_AT": "2023-01-01 10:00:00.000000"
  }
}
```

## Table API Integration

### SQL DDL Usage

Declare a table over the Kafka topic that GoldenGate writes to and set `'format' = 'ogg-json'`. Flink matches JSON keys case-sensitively, so declare columns with the same case that GoldenGate uses in `before` and `after` (uppercase for unquoted Oracle identifiers):

```sql
CREATE TABLE ogg_source (
  ID BIGINT,
  NAME STRING,
  EMAIL STRING,
  CREATED_AT TIMESTAMP(6),
  PRIMARY KEY (ID) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'ogg-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'ogg-consumer',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'ogg-json',
  'ogg-json.ignore-parse-errors' = 'true',
  'ogg-json.timestamp-format.standard' = 'SQL'
);
```

### Programmatic Table Definition

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

TableDescriptor oggTable = TableDescriptor.forConnector("kafka")
    .schema(Schema.newBuilder()
        // Column names match the uppercase keys in GoldenGate's before/after objects
        .column("ID", DataTypes.BIGINT().notNull())
        .column("NAME", DataTypes.STRING())
        .column("EMAIL", DataTypes.STRING())
        .column("CREATED_AT", DataTypes.TIMESTAMP(6))
        .primaryKey("ID")
        .build())
    .option("topic", "ogg-topic")
    .option("properties.bootstrap.servers", "localhost:9092")
    .option("properties.group.id", "ogg-consumer")
    .option("scan.startup.mode", "earliest-offset")
    .format("ogg-json")
    .option("ogg-json.ignore-parse-errors", "true")
    .option("ogg-json.timestamp-format.standard", "SQL")
    .build();

// Register the descriptor so that it can be queried by name
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tEnv.createTemporaryTable("ogg_source", oggTable);
```

## Change Event Processing

The `ogg-json` format reads the `op_type` field of each event and turns the event into Flink changelog rows:

### Insert Events
- `op_type`: "I" (Insert)
- `after`: New row data
- `before`: Not present
- Emitted as one INSERT (`+I`) row built from `after`

### Update Events
- `op_type`: "U" (Update)
- `after`: Updated row data
- `before`: Previous row data; whether it holds every column depends on Oracle supplemental logging and the Extract configuration
- Emitted as an UPDATE_BEFORE (`-U`) row built from `before`, followed by an UPDATE_AFTER (`+U`) row built from `after`

### Delete Events
- `op_type`: "D" (Delete)
- `before`: Deleted row data
- `after`: Not present
- Emitted as one DELETE (`-D`) row built from `before`

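The row mapping above can be sketched in plain Java. This is a simplified model, not Flink's deserializer: rows are `Map<String, Object>` instead of `RowData`, the class, record, and method names (`OggChangelogSketch`, `Change`, `decode`) are hypothetical, and rejecting unknown `op_type` values or missing images is a choice made for this sketch.

```java
import java.util.List;
import java.util.Map;

public class OggChangelogSketch {

    /** Changelog row kinds, named after the constants of Flink's RowKind. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** One changelog row: its kind plus the column values. */
    record Change(Kind kind, Map<String, Object> row) {}

    /** Turns one OGG envelope (op_type plus before/after images) into changelog rows. */
    static List<Change> decode(String opType, Map<String, Object> before, Map<String, Object> after) {
        switch (opType) {
            case "I":
                return List.of(new Change(Kind.INSERT, require(after, "after", opType)));
            case "U":
                return List.of(
                        new Change(Kind.UPDATE_BEFORE, require(before, "before", opType)),
                        new Change(Kind.UPDATE_AFTER, require(after, "after", opType)));
            case "D":
                return List.of(new Change(Kind.DELETE, require(before, "before", opType)));
            default:
                // This sketch rejects anything that is not a row-level I/U/D event
                throw new IllegalArgumentException("Unsupported op_type: " + opType);
        }
    }

    private static Map<String, Object> require(Map<String, Object> image, String name, String opType) {
        if (image == null) {
            throw new IllegalArgumentException("op_type " + opType + " requires a '" + name + "' image");
        }
        return image;
    }

    public static void main(String[] args) {
        List<Change> changes = decode("U",
                Map.of("ID", 1, "NAME", "Alice"),
                Map.of("ID", 1, "NAME", "Alice Smith"));
        // Prints "UPDATE_BEFORE Alice" then "UPDATE_AFTER Alice Smith"
        changes.forEach(c -> System.out.println(c.kind() + " " + c.row().get("NAME")));
    }
}
```

Emitting the old image before the new one is what lets downstream aggregates retract the previous value before adding the updated one.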

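## OGG Metadata Fields

Each GoldenGate change event carries the envelope fields below. The last column shows which ones the `ogg-json` format exposes as read-only metadata; with the Kafka connector, prefix the key with `value.` (for example `'value.table'`).

| Field | Description | Type | Example | Flink metadata key |
|-------|-------------|------|---------|--------------------|
| table | Source table name | String | "USERS" | `table` |
| op_type | Operation type | String | "I", "U", "D" | not exposed (determines the row kind) |
| op_ts | Time of the operation in the source database | String | "2023-01-01 10:00:00.123456" | `event-timestamp` |
| current_ts | Time GoldenGate processed the record | String | "2023-01-01 10:00:01.789012" | `ingestion-timestamp` |
| pos | Position of the record in the GoldenGate trail (trail sequence number and relative byte address) | String | "00000000170000001234" | not exposed |
| primary_keys | Primary key column names; present only when the JSON formatter's `includePrimaryKeys` property is enabled | Array | ["ID"] | `primary-keys` |
| tokens | Tokens defined in the Extract configuration, such as transaction IDs | Object | {...} | not exposed |
| before | Row values before the change | Object | {...} | row data |
| after | Row values after the change | Object | {...} | row data |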

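### Accessing Metadata in Tables

Format metadata is read-only, so declare it with `VIRTUAL`. A declared type may differ from the metadata type (for example `TIMESTAMP(6)` instead of `TIMESTAMP_LTZ(6)`) as long as Flink can cast between them.

```sql
CREATE TABLE ogg_with_metadata (
  -- Regular data columns (same case as the keys in before/after)
  ID BIGINT,
  NAME STRING,
  EMAIL STRING,

  -- Metadata columns; the Kafka connector exposes format metadata under the 'value.' prefix
  ogg_table STRING METADATA FROM 'value.table' VIRTUAL,
  ogg_primary_keys ARRAY<STRING> METADATA FROM 'value.primary-keys' VIRTUAL,
  ogg_op_ts TIMESTAMP(6) METADATA FROM 'value.event-timestamp' VIRTUAL,
  ogg_current_ts TIMESTAMP(6) METADATA FROM 'value.ingestion-timestamp' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'format' = 'ogg-json'
  -- other connector options
);
```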

## Data Type Mapping

The format does not translate Oracle types itself; it parses each JSON value into the Flink type declared for the column. The table suggests Flink types for common Oracle types as rendered by GoldenGate's JSON formatter (the exact rendering depends on the formatter configuration):

| Oracle Type | OGG JSON | Flink Type | Notes |
|-------------|----------|------------|-------|
| NUMBER | Number/String | DECIMAL | Match precision and scale to the column |
| INTEGER | Number | DECIMAL(38, 0) or BIGINT | Oracle INTEGER is NUMBER(38); INT can overflow |
| FLOAT | Number | DOUBLE or DECIMAL | Oracle FLOAT is a NUMBER subtype (up to 126 binary digits); Flink FLOAT is too narrow |
| BINARY_FLOAT | Number | FLOAT | |
| BINARY_DOUBLE | Number | DOUBLE | |
| VARCHAR2 | String | STRING | |
| CHAR | String | STRING | |
| NVARCHAR2 | String | STRING | Unicode |
| CLOB | String | STRING | Large text |
| DATE | String | TIMESTAMP(0) | Oracle DATE includes the time of day, to the second |
| TIMESTAMP | String | TIMESTAMP(p) | Fractional precision 0 to 9 |
| TIMESTAMP WITH TIME ZONE | String | TIMESTAMP_LTZ | Value must match the configured timestamp standard |
| RAW | String | STRING or BYTES | Flink decodes BYTES from Base64; read hex-encoded values as STRING |
| BLOB | String | BYTES | Base64 encoded |
| XMLTYPE | String | STRING | As XML string |
| JSON | String | STRING | As JSON string |

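Flink's JSON parsing decodes `BYTES` columns from Base64 strings, so a RAW value that GoldenGate renders as hex must either be re-encoded before it reaches a `BYTES` column or be kept as `STRING`. The plain-Java sketch below shows the conversion; `RawHexToBase64`, `hexToBytes`, and `hexToBase64` are hypothetical names, and whether your GoldenGate formatter emits hex or Base64 for RAW is an assumption to verify against real messages.

```java
import java.util.Base64;

public class RawHexToBase64 {

    /** Decodes a hex string such as "DEADBEEF" into raw bytes. */
    static byte[] hexToBytes(String hex) {
        if (hex.length() % 2 != 0) {
            throw new IllegalArgumentException("Hex string must have an even length: " + hex);
        }
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            int hi = Character.digit(hex.charAt(2 * i), 16);
            int lo = Character.digit(hex.charAt(2 * i + 1), 16);
            if (hi < 0 || lo < 0) {
                throw new IllegalArgumentException("Not a hex string: " + hex);
            }
            out[i] = (byte) ((hi << 4) | lo);
        }
        return out;
    }

    /** Re-encodes a hex RAW value as Base64 text, the form a BYTES column is parsed from. */
    static String hexToBase64(String hex) {
        return Base64.getEncoder().encodeToString(hexToBytes(hex));
    }

    public static void main(String[] args) {
        System.out.println(hexToBase64("DEADBEEF")); // prints 3q2+7w==
    }
}
```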

## Timestamp Handling

`OggJsonFormatOptions.TIMESTAMP_FORMAT` (`ogg-json.timestamp-format.standard`) selects one of two predefined layouts rather than accepting an arbitrary pattern:

```java
// SQL standard (default): "yyyy-MM-dd HH:mm:ss.s{precision}", the layout of op_ts in the examples above
config.set(OggJsonFormatOptions.TIMESTAMP_FORMAT, "SQL");

// ISO-8601 standard: "yyyy-MM-ddTHH:mm:ss.s{precision}"
config.set(OggJsonFormatOptions.TIMESTAMP_FORMAT, "ISO-8601");
```

Oracle NLS layouts such as `DD-MON-YY HH.MI.SS.FF AM` match neither standard. Either configure GoldenGate to emit one of the two layouts, or declare the column as `STRING` and convert it in SQL, for example with `TO_TIMESTAMP` and a matching pattern.

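Flink's `timestamp-format.standard` accepts `SQL` (the default: an ISO date, a space, `HH:mm:ss`, then an optional fraction) or `ISO-8601`. To check sample GoldenGate values against the `SQL` layout before deploying, you can parse them with a `java.time` formatter built the same way. This approximates Flink's parser rather than reusing it, and `OggTimestampCheck` is a hypothetical helper; it also computes the `op_ts` to `current_ts` delay of the insert example.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoField;

public class OggTimestampCheck {

    /** "yyyy-MM-dd HH:mm:ss" followed by an optional fraction of 0 to 9 digits. */
    static final DateTimeFormatter SQL_STYLE = new DateTimeFormatterBuilder()
            .append(DateTimeFormatter.ISO_LOCAL_DATE)
            .appendLiteral(' ')
            .appendPattern("HH:mm:ss")
            .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
            .toFormatter();

    static LocalDateTime parse(String value) {
        return LocalDateTime.parse(value, SQL_STYLE);
    }

    /** True if the value parses under the SQL-style layout. */
    static boolean fitsSqlStandard(String value) {
        try {
            parse(value);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        LocalDateTime opTs = parse("2023-01-01 10:00:00.123456");
        LocalDateTime currentTs = parse("2023-01-01 10:00:01.789012");
        // Delay between the source operation and GoldenGate processing it
        System.out.println(Duration.between(opTs, currentTs)); // PT1.665556S
        // An Oracle NLS-style value does not fit the SQL layout
        System.out.println(fitsSqlStandard("01-JAN-23 10.00.00.123456 AM")); // false
    }
}
```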

### Oracle Timestamp Precision

Oracle `TIMESTAMP` columns have a fractional-second precision from 0 to 9 (default 6). Declare Flink columns with at least the source precision:

```sql
CREATE TABLE ogg_timestamps (
  ID BIGINT,
  NAME STRING,

  -- Columns for different Oracle timestamp types
  CREATED_DATE TIMESTAMP(3),   -- Oracle TIMESTAMP(3)
  UPDATED_TS TIMESTAMP(6),     -- Oracle TIMESTAMP(6)
  EVENT_TIME TIMESTAMP(9),     -- Oracle TIMESTAMP(9)
  TZ_TIME TIMESTAMP_LTZ(6),    -- Oracle TIMESTAMP WITH TIME ZONE

  -- OGG envelope timestamps exposed as metadata (CURRENT_TIMESTAMP is reserved, so avoid it as a column name)
  op_timestamp TIMESTAMP(6) METADATA FROM 'value.event-timestamp' VIRTUAL,
  ingestion_timestamp TIMESTAMP(6) METADATA FROM 'value.ingestion-timestamp' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'format' = 'ogg-json',
  'ogg-json.timestamp-format.standard' = 'SQL'
);
```

## Transaction Processing

GoldenGate can attach tokens, such as the source transaction ID, to each change event. Token names and contents are defined by the `TOKENS` clause of the Extract's `TABLE` parameter, so the names below are examples.

### Transaction Metadata

```json
{
  "table": "USERS",
  "op_type": "I",
  "op_ts": "2023-01-01 10:00:00.123456",
  "current_ts": "2023-01-01 10:00:01.789012",
  "pos": "00000000170000001234",
  "tokens": {
    "TK-XID": "0004.00A.00000123",
    "TK-CSN": "12345678901",
    "TK-THREAD": "001"
  },
  "after": {
    "ID": 1,
    "NAME": "Alice",
    "EMAIL": "alice@example.com",
    "CREATED_AT": "2023-01-01 10:00:00.000000"
  }
}
```

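### Reading Transaction Tokens

The `ogg-json` format does not expose `tokens`, `op_type`, or `pos` as metadata. One way to read them is a second table over the same topic that uses the plain `json` format and declares the envelope fields directly. That table is an append-only stream of raw envelopes, not a changelog:

```sql
-- Access transaction information from the raw envelope
CREATE TABLE ogg_envelopes (
  `table` STRING,
  op_type STRING,
  op_ts STRING,
  pos STRING,
  tokens MAP<STRING, STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'ogg-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

SELECT
  tokens['TK-XID'] AS xid,
  tokens['TK-CSN'] AS csn,
  tokens['TK-THREAD'] AS thread_id,
  op_type,
  `table`
FROM ogg_envelopes;
```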

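Grouping envelopes that share a transaction ID shows which row changes were committed together. A minimal plain-Java sketch, assuming envelopes are already parsed into the hypothetical `Envelope` record and that the Extract populates a `TK-XID` token as in the transaction metadata example; the sample events in `main` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OggTransactionGrouping {

    /** Hypothetical parsed envelope: table, op_type, pos, and the tokens object. */
    record Envelope(String table, String opType, String pos, Map<String, String> tokens) {}

    /** Groups envelopes by their TK-XID token, keeping first-seen transaction order. */
    static Map<String, List<Envelope>> byTransaction(List<Envelope> envelopes) {
        Map<String, List<Envelope>> groups = new LinkedHashMap<>();
        for (Envelope e : envelopes) {
            // Events without the token are collected under a placeholder key
            String xid = e.tokens().getOrDefault("TK-XID", "<no-xid>");
            groups.computeIfAbsent(xid, k -> new ArrayList<>()).add(e);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Envelope> events = List.of(
                new Envelope("USERS", "I", "00000000170000001234", Map.of("TK-XID", "0004.00A.00000123")),
                new Envelope("ORDERS", "I", "00000000170000001240", Map.of("TK-XID", "0004.00A.00000123")),
                new Envelope("USERS", "U", "00000000170000001256", Map.of("TK-XID", "0005.001.00000456")));
        // Prints one line per transaction with its number of row changes
        byTransaction(events).forEach((xid, ops) -> System.out.println(xid + " -> " + ops.size() + " change(s)"));
    }
}
```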

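## Advanced OGG Features

### Compressed Updates

GoldenGate can write compressed update records, in which the `after` image holds only the primary key and the changed columns instead of the full row. Primary key changes may also be delivered with an `op_type` other than `I`, `U`, or `D` (shown as `PK` in the example below); the exact encoding depends on the GoldenGate version and handler settings. Because the `ogg-json` format turns only `I`, `U`, and `D` events into rows, check which values your handler emits. Columns missing from a compressed image are read as `NULL`.

```json
{
  "table": "USERS",
  "op_type": "PK",
  "op_ts": "2023-01-01 10:00:00.123456",
  "pos": "00000000170000001234",
  "primary_keys": ["ID"],
  "after": {
    "ID": 1
  }
}
```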

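If compressed images reach a consumer that needs full rows, the missing columns have to come from state kept per primary key. A plain-Java sketch of that merge, assuming rows are `Map`s keyed by column name and that an absent key means "unchanged" (how GoldenGate marks columns explicitly set to NULL should be checked separately); `CompressedImageMerge` and `merge` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

public class CompressedImageMerge {

    /** Last full row seen per primary key value. */
    private final Map<Object, Map<String, Object>> lastRowByKey = new HashMap<>();

    /**
     * Overlays a (possibly partial) after image onto the previous full row for the same key
     * and remembers the result. Columns absent from the image keep their previous values.
     */
    Map<String, Object> merge(String keyColumn, Map<String, Object> afterImage) {
        Object key = afterImage.get(keyColumn);
        Map<String, Object> merged = new HashMap<>(lastRowByKey.getOrDefault(key, Map.of()));
        merged.putAll(afterImage);
        lastRowByKey.put(key, merged);
        return merged;
    }

    public static void main(String[] args) {
        CompressedImageMerge state = new CompressedImageMerge();
        state.merge("ID", Map.of("ID", 1, "NAME", "Alice", "EMAIL", "alice@example.com"));
        // Compressed update: only the key and the changed column are present
        Map<String, Object> full = state.merge("ID", Map.of("ID", 1, "NAME", "Alice Smith"));
        System.out.println(full.get("NAME") + " " + full.get("EMAIL")); // Alice Smith alice@example.com
    }
}
```

Keeping this state outside Flink trades memory per key for complete rows; within Flink, the equivalent would be keyed state in a custom operator.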

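### DDL Support

GoldenGate can also replicate DDL. How DDL reaches a JSON target depends on the GoldenGate product and handler configuration. An event shaped like the following is not a row change, and the `ogg-json` format does not apply DDL to Flink table schemas, so route such events away from `ogg-json` tables:

```json
{
  "ddl": "ALTER TABLE USERS ADD (PHONE VARCHAR2(20))",
  "op_type": "DDL",
  "op_ts": "2023-01-01 10:00:00.123456",
  "pos": "00000000170000001290",
  "table": "USERS"
}
```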

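### Supplemental Logging

Enable Oracle supplemental logging so that the redo logs contain the column values GoldenGate needs; logging all columns of a table makes full `before` images available to GoldenGate:

```sql
-- Enable minimal supplemental logging for the database
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
-- Log all columns of the table, not only the changed ones
ALTER TABLE users ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
```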

## Error Handling

For production Oracle pipelines, decide explicitly how the format treats malformed input and null map keys:

```java
// Skip rows that fail to parse (fields that fail become null) instead of failing the job.
// Skipped rows are dropped silently, so watch for data loss.
config.set(OggJsonFormatOptions.IGNORE_PARSE_ERRORS, true);

// When serializing MAP columns, write null keys as the literal "ORA_NULL"
// (FAIL, the default, raises an error; DROP skips the entry)
config.set(OggJsonFormatOptions.JSON_MAP_NULL_KEY_MODE, "LITERAL");
config.set(OggJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL, "ORA_NULL");
```

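The three null-key modes can be modeled in a few lines of plain Java. This is a simplified stand-in for what the JSON serializer does with a `MAP` value whose key is null, not Flink's implementation; `NullKeyModes` and `applyMode` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NullKeyModes {

    enum Mode { FAIL, DROP, LITERAL }

    /** Returns the map as it would be written: null keys rejected, dropped, or renamed. */
    static Map<String, Object> applyMode(Map<String, Object> input, Mode mode, String literal) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : input.entrySet()) {
            if (e.getKey() != null) {
                out.put(e.getKey(), e.getValue());
                continue;
            }
            switch (mode) {
                case FAIL:
                    throw new IllegalStateException("Map key is null; use DROP or LITERAL mode");
                case DROP:
                    break; // skip the entry
                case LITERAL:
                    out.put(literal, e.getValue());
                    break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> attrs = new LinkedHashMap<>();
        attrs.put("region", "EU");
        attrs.put(null, "unknown"); // LinkedHashMap allows one null key
        System.out.println(applyMode(attrs, Mode.LITERAL, "ORA_NULL")); // {region=EU, ORA_NULL=unknown}
        System.out.println(applyMode(attrs, Mode.DROP, "ORA_NULL"));    // {region=EU}
    }
}
```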

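### Common Error Scenarios

1. **Large LOB data**: a single CLOB or BLOB change can exceed Kafka's message size limits; raise the broker and topic limits (`message.max.bytes`, `max.message.bytes`) together with the producer's `max.request.size`.
2. **Character set issues**: make sure GoldenGate writes JSON as UTF-8 so that non-ASCII Oracle data, such as NVARCHAR2 values, decodes correctly.
3. **Timestamp precision**: declare Flink timestamp columns with enough precision (`TIMESTAMP(6)` for microseconds, `TIMESTAMP(9)` for nanoseconds) and use a matching `timestamp-format.standard`.
4. **Oracle-specific data types**: read XMLTYPE, JSON, and other types without a direct Flink equivalent as `STRING`.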

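## Performance Optimization

### OGG Configuration

A minimal pipeline pairs an Extract, which captures changes from the Oracle redo logs into a local trail, with a GoldenGate for Big Data Replicat that hands trail records to the Kafka handler. The parameter files below only show this wiring; throughput is tuned mainly through the Kafka producer settings.

```text
-- dirprm/ext1.prm: capture changes from Oracle into a local trail
EXTRACT ext1
-- Credentials come from the GoldenGate credential store instead of a plain-text password
USERIDALIAS ggadmin
EXTTRAIL ./dirdat/lt
TABLE schema.users;

-- dirprm/rep1.prm: deliver trail records to Kafka through the Java adapter
REPLICAT rep1
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
MAP schema.users, TARGET schema.users;
```

The JSON output format and the target topic are set in the handler properties file (`dirprm/kafka.props` here), not in the Replicat parameter file.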

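### Kafka Configuration

These are Kafka producer settings, so they apply to the producer that writes GoldenGate's JSON into Kafka (in GoldenGate, the producer properties file used by the Kafka handler), not to the Flink source that reads it:

```java
import java.util.Properties;

// Producer settings for high-throughput Oracle change streams
Properties kafkaProps = new Properties();
kafkaProps.setProperty("batch.size", "32768");          // bytes per partition batch
kafkaProps.setProperty("linger.ms", "5");               // wait up to 5 ms to fill a batch
kafkaProps.setProperty("compression.type", "snappy");   // compress batches on the wire
kafkaProps.setProperty("max.request.size", "10485760"); // 10 MiB; broker and topic limits must allow this too
```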

## Production Considerations

### Reliability

- Monitor GoldenGate Extract and Replicat processes
- Set up error handling and alerting; rows skipped under `ignore-parse-errors` are dropped without failing the job
- Configure GoldenGate checkpoints appropriately
- Retain Oracle archive logs long enough for Extract to recover after an outage

### Schema Evolution

Flink reads only the columns declared in the table: keys that GoldenGate adds after an Oracle `ALTER TABLE` are ignored, and declared columns that no longer appear are read as `NULL`. Add new columns to the Flink DDL when they are needed downstream:

```sql
-- Schema that tolerates Oracle schema evolution
CREATE TABLE ogg_schema_evolution (
  -- Core columns
  ID BIGINT,
  NAME STRING,

  -- Metadata for debugging
  ogg_table STRING METADATA FROM 'value.table' VIRTUAL,
  ogg_op_ts TIMESTAMP(6) METADATA FROM 'value.event-timestamp' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'format' = 'ogg-json',
  'ogg-json.ignore-parse-errors' = 'true'
);
```

To keep the complete JSON message for analysis, read the same topic through a second table with `'format' = 'raw'` and a single `STRING` column.

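The column-matching rule can be illustrated with a small plain-Java model: only declared names are looked up, matching is case-sensitive, extra keys are ignored, and missing keys become null. `DeclaredColumnProjection` is a hypothetical helper that mimics this behavior; it does not call Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class DeclaredColumnProjection {

    /** Projects a row image onto the declared columns, in declaration order. */
    static List<Object> project(List<String> declaredColumns, Map<String, Object> image) {
        List<Object> row = new ArrayList<>();
        for (String column : declaredColumns) {
            // Exact, case-sensitive lookup; a missing key yields null
            row.add(image.get(column));
        }
        return row;
    }

    public static void main(String[] args) {
        // After an ALTER TABLE, GoldenGate also sends PHONE; the Flink table declares only ID and NAME
        Map<String, Object> after = Map.of("ID", 1, "NAME", "Alice", "PHONE", "555-0100");
        System.out.println(project(List.of("ID", "NAME"), after)); // [1, Alice]
        // Lowercase names do not match uppercase Oracle column keys
        System.out.println(project(List.of("id", "name"), after)); // [null, null]
    }
}
```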

### Monitoring

Key metrics for Oracle GoldenGate:
- Extract lag behind Oracle redo logs
- Replicat processing lag
- Archive log generation rate
- Change event volume per table
- DDL event frequency
- Large transaction handling

### Security

- Configure Oracle wallet for secure authentication
- Use encrypted communication channels
- Implement proper access controls for GoldenGate processes
- Monitor privileged operations through audit trails

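## Integration Patterns

### Real-time Data Warehousing

Because `ogg-json` produces a changelog, inserting it into a sink that accepts updates and deletes keeps the target in sync: Oracle deletes arrive as DELETE rows and remove the corresponding target row. `data_warehouse.users_dim` is assumed to be such a table with a primary key on the ID column, for example a JDBC or `upsert-kafka` table.

```sql
-- Real-time Oracle to data warehouse synchronization
INSERT INTO data_warehouse.users_dim
SELECT
  ID,
  NAME,
  EMAIL,
  ogg_op_ts AS last_modified
FROM ogg_with_metadata
WHERE ogg_table = 'USERS';
```

A soft-delete column such as `deleted_at` cannot be derived from `op_type` in SQL, because the format does not expose `op_type` as metadata.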

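What an upsert-capable sink does with these rows can be sketched as applying each changelog row to a map keyed by primary key. This plain-Java model (hypothetical `MaterializedTable` class, with a `Kind` enum mirroring Flink's row kinds) ignores the ordering, batching, and failure handling that a real JDBC or upsert-kafka sink has to deal with.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MaterializedTable {

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    private final Map<Object, Map<String, Object>> rowsByKey = new LinkedHashMap<>();
    private final String keyColumn;

    MaterializedTable(String keyColumn) {
        this.keyColumn = keyColumn;
    }

    /** Applies one changelog row: upsert on INSERT/UPDATE_AFTER, delete on DELETE. */
    void apply(Kind kind, Map<String, Object> row) {
        Object key = row.get(keyColumn);
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                rowsByKey.put(key, row);
                break;
            case DELETE:
                rowsByKey.remove(key);
                break;
            case UPDATE_BEFORE:
                // An upsert sink can skip the old image; the following UPDATE_AFTER replaces it
                break;
        }
    }

    Map<Object, Map<String, Object>> snapshot() {
        return rowsByKey;
    }

    public static void main(String[] args) {
        MaterializedTable users = new MaterializedTable("ID");
        users.apply(Kind.INSERT, Map.of("ID", 1, "NAME", "Alice"));
        users.apply(Kind.UPDATE_BEFORE, Map.of("ID", 1, "NAME", "Alice"));
        users.apply(Kind.UPDATE_AFTER, Map.of("ID", 1, "NAME", "Alice Smith"));
        System.out.println(users.snapshot().get(1).get("NAME")); // Alice Smith
        users.apply(Kind.DELETE, Map.of("ID", 1, "NAME", "Alice Smith"));
        System.out.println(users.snapshot()); // {}
    }
}
```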

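### Change Data Analysis

```sql
-- Analyze Oracle change patterns: per-table row counts and GoldenGate delivery latency
SELECT
  ogg_table,
  COUNT(*) AS row_count,
  AVG(CAST(TIMESTAMPDIFF(SECOND, ogg_op_ts, ogg_current_ts) AS DOUBLE)) AS avg_latency_seconds
FROM ogg_with_metadata
GROUP BY ogg_table;
```

Because the source is a changelog, updates and deletes are retracted from the aggregates: `row_count` tracks the current number of rows per table, not the number of operations. Counting operations by `op_type` requires reading the envelope with the plain `json` format, since `op_type` is not exposed as metadata.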
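
Operation counts per `op_type` are straightforward once envelopes are handled as plain records. The plain-Java sketch below works over hypothetical parsed `Event` records (field names assumed) and computes per-table, per-operation counts plus the mean `op_ts` to `current_ts` latency in milliseconds; the sample timestamps come from the insert and update examples.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class OggChangeStats {

    /** Hypothetical parsed envelope fields needed for the statistics. */
    record Event(String table, String opType, LocalDateTime opTs, LocalDateTime currentTs) {}

    /** Running totals for one (table, op_type) pair. */
    static final class Stats {
        long count;
        long totalLatencyMillis;

        double avgLatencyMillis() {
            return count == 0 ? 0.0 : (double) totalLatencyMillis / count;
        }
    }

    /** Aggregates events under "TABLE/OP" keys, sorted by key. */
    static Map<String, Stats> summarize(List<Event> events) {
        Map<String, Stats> byKey = new TreeMap<>();
        for (Event e : events) {
            Stats s = byKey.computeIfAbsent(e.table() + "/" + e.opType(), k -> new Stats());
            s.count++;
            s.totalLatencyMillis += Duration.between(e.opTs(), e.currentTs()).toMillis();
        }
        return byKey;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("USERS", "I", LocalDateTime.parse("2023-01-01T10:00:00.123456"),
                        LocalDateTime.parse("2023-01-01T10:00:01.789012")),
                new Event("USERS", "U", LocalDateTime.parse("2023-01-01T10:01:00.123456"),
                        LocalDateTime.parse("2023-01-01T10:01:01.789012")));
        // Prints "USERS/I count=1 avgLatencyMs=1665.0" and "USERS/U count=1 avgLatencyMs=1665.0"
        summarize(events).forEach((key, s) ->
                System.out.println(key + " count=" + s.count + " avgLatencyMs=" + s.avgLatencyMillis()));
    }
}
```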