
tessl/maven-org-apache-flink--flink-json

JSON format support library for Apache Flink table ecosystem with comprehensive serialization, deserialization, and CDC format capabilities

Oracle GoldenGate CDC Format

JSON format support for the Oracle GoldenGate (OGG) change data capture system. The ogg-json format interprets OGG change records as Flink INSERT, UPDATE and DELETE rows and exposes selected envelope fields as metadata. GoldenGate is Oracle's real-time data integration and replication solution.

Capabilities

OGG Format Configuration

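Configuration options for the Oracle GoldenGate JSON format. All four options reuse the corresponding options of the generic JSON format (JsonFormatOptions) and control error handling, timestamp parsing and the serialization of null map keys.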

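import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.formats.json.JsonFormatOptions;

/**
 * Configuration options for OGG JSON format.
 * Each option reuses the corresponding option of the generic JSON format.
 */
public class OggJsonFormatOptions {

    /** Whether to skip fields and rows with parse errors instead of failing */
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = JsonFormatOptions.IGNORE_PARSE_ERRORS;

    /** Timestamp format standard: "SQL" (default) or "ISO-8601" */
    public static final ConfigOption<String> TIMESTAMP_FORMAT = JsonFormatOptions.TIMESTAMP_FORMAT;

    /** How to serialize map entries with null keys: "FAIL" (default), "DROP" or "LITERAL" */
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE = JsonFormatOptions.MAP_NULL_KEY_MODE;

    /** String written in place of a null map key when the mode is LITERAL */
    public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL = JsonFormatOptions.MAP_NULL_KEY_LITERAL;
}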

Configuration Usage:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.ogg.OggJsonFormatOptions;

// Configure OGG format options
Configuration config = new Configuration();
config.set(OggJsonFormatOptions.IGNORE_PARSE_ERRORS, true);
// Only "SQL" (e.g. 2023-01-01 10:00:00.123456) or "ISO-8601" are accepted
config.set(OggJsonFormatOptions.TIMESTAMP_FORMAT, "SQL");
config.set(OggJsonFormatOptions.JSON_MAP_NULL_KEY_MODE, "DROP");
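In SQL DDL or a TableDescriptor the same options are passed as string keys with the ogg-json. prefix. The sketch below assembles such an option map and rejects values the JSON options do not accept ("SQL" or "ISO-8601" for the timestamp standard; FAIL, DROP or LITERAL for the null-key mode). OggJsonOptions is a hypothetical helper, not part of flink-json, and the key names are assumed from the Flink OGG format documentation, so check them against your Flink version.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: builds 'ogg-json.*' table options and validates their values. */
final class OggJsonOptions {
    private static final Set<String> TIMESTAMP_STANDARDS = Set.of("SQL", "ISO-8601");
    private static final Set<String> NULL_KEY_MODES = Set.of("FAIL", "DROP", "LITERAL");

    static Map<String, String> build(boolean ignoreParseErrors, String timestampStandard,
                                     String nullKeyMode, String nullKeyLiteral) {
        if (!TIMESTAMP_STANDARDS.contains(timestampStandard)) {
            throw new IllegalArgumentException("Unsupported timestamp standard: " + timestampStandard);
        }
        if (!NULL_KEY_MODES.contains(nullKeyMode)) {
            throw new IllegalArgumentException("Unsupported map-null-key mode: " + nullKeyMode);
        }
        Map<String, String> options = new LinkedHashMap<>();
        options.put("format", "ogg-json");
        options.put("ogg-json.ignore-parse-errors", Boolean.toString(ignoreParseErrors));
        options.put("ogg-json.timestamp-format.standard", timestampStandard);
        options.put("ogg-json.map-null-key.mode", nullKeyMode);
        // The literal is only meaningful in LITERAL mode, so it is only emitted there
        if ("LITERAL".equals(nullKeyMode)) {
            if (nullKeyLiteral == null) {
                throw new IllegalArgumentException("LITERAL mode needs a literal");
            }
            options.put("ogg-json.map-null-key.literal", nullKeyLiteral);
        }
        return options;
    }
}
```

Each entry can then be passed to TableDescriptor.Builder#option or written into a WITH clause.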

OGG JSON Structure

Oracle GoldenGate produces change events with a structured envelope containing operation metadata and data payloads:

Insert Event

{
  "table": "USERS",
  "op_type": "I",
  "op_ts": "2023-01-01 10:00:00.123456",
  "current_ts": "2023-01-01 10:00:01.789012",
  "pos": "00000000170000001234",
  "after": {
    "ID": 1,
    "NAME": "Alice",
    "EMAIL": "alice@example.com",
    "CREATED_AT": "2023-01-01 10:00:00.000000"
  }
}

Update Event

{
  "table": "USERS",
  "op_type": "U",
  "op_ts": "2023-01-01 10:01:00.123456",
  "current_ts": "2023-01-01 10:01:01.789012",
  "pos": "00000000170000001256",
  "before": {
    "ID": 1,
    "NAME": "Alice",
    "EMAIL": "alice@example.com",
    "CREATED_AT": "2023-01-01 10:00:00.000000"
  },
  "after": {
    "ID": 1,
    "NAME": "Alice Smith",
    "EMAIL": "alice.smith@example.com",
    "CREATED_AT": "2023-01-01 10:00:00.000000"
  }
}

Delete Event

{
  "table": "USERS",
  "op_type": "D",
  "op_ts": "2023-01-01 10:02:00.123456",
  "current_ts": "2023-01-01 10:02:01.789012",
  "pos": "00000000170000001278",
  "before": {
    "ID": 1,
    "NAME": "Alice Smith",
    "EMAIL": "alice.smith@example.com",
    "CREATED_AT": "2023-01-01 10:00:00.000000"
  }
}

Table API Integration

SQL DDL Usage

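Create tables using the OGG JSON format for change data capture processing. Column names are matched against JSON field names case-sensitively, so they must be spelled exactly as OGG emits them; for records like the upper-case examples above (ID, NAME, ...), declare the columns in upper case as well: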

CREATE TABLE ogg_source (
  id BIGINT,
  name STRING,
  email STRING,
  created_at TIMESTAMP(6),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'ogg-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'ogg-json',
  'ogg-json.ignore-parse-errors' = 'true',
  'ogg-json.timestamp-format.standard' = 'SQL'
);

Programmatic Table Definition

import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

TableDescriptor oggTable = TableDescriptor.forConnector("kafka")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT().notNull())  // primary key columns must be NOT NULL
        .column("name", DataTypes.STRING())
        .column("email", DataTypes.STRING())
        .column("created_at", DataTypes.TIMESTAMP(6))
        .primaryKey("id")
        .build())
    .option("topic", "ogg-topic")
    .option("properties.bootstrap.servers", "localhost:9092")
    .format("ogg-json")
    .option("ogg-json.ignore-parse-errors", "true")
    .option("ogg-json.timestamp-format.standard", "SQL")
    .build();

Change Event Processing

The ogg-json format recognizes three operation types, identified by the op_type field:

Insert Events

  • op_type: "I" (Insert)
  • after: New row data
  • before: Not present

Update Events

  • op_type: "U" (Update)
  • after: Updated row data (complete row)
  • before: Previous row data (complete row)

Delete Events

  • op_type: "D" (Delete)
  • before: Deleted row data (complete row)
  • after: Not present
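Flink represents these events as changelog rows. A minimal sketch of the mapping, assuming Flink's usual CDC convention (insert to INSERT, update to UPDATE_BEFORE plus UPDATE_AFTER, delete to DELETE): ChangeKind is a local stand-in for org.apache.flink.types.RowKind so the example needs no Flink dependency, and OggOpMapper and ChangeRow are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Local stand-in for the four kinds of Flink's RowKind. */
enum ChangeKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

/** One changelog row: its kind plus the column values taken from "before" or "after". */
record ChangeRow(ChangeKind kind, Map<String, Object> values) {}

final class OggOpMapper {
    /** Hypothetical helper: turns one OGG event (op_type, before, after) into changelog rows. */
    static List<ChangeRow> toChangelog(String opType, Map<String, Object> before,
                                       Map<String, Object> after) {
        List<ChangeRow> rows = new ArrayList<>();
        switch (opType) {
            case "I" -> rows.add(new ChangeRow(ChangeKind.INSERT, after));
            case "U" -> {
                // An update retracts the old row image and then emits the new one
                rows.add(new ChangeRow(ChangeKind.UPDATE_BEFORE, before));
                rows.add(new ChangeRow(ChangeKind.UPDATE_AFTER, after));
            }
            case "D" -> rows.add(new ChangeRow(ChangeKind.DELETE, before));
            // This sketch treats any other op_type as an error
            default -> throw new IllegalArgumentException("Unknown op_type: " + opType);
        }
        return rows;
    }
}
```

Emitting the full before image for updates is what lets downstream aggregations retract the old values, which is why complete before images matter.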

OGG Metadata Fields

Each OGG change record carries these envelope fields:

| Field | Description | Type | Example |
|---|---|---|---|
| table | Source table name | String | "USERS" |
| op_type | Operation type | String | "I", "U", "D" |
| op_ts | Operation timestamp in the source database | String | "2023-01-01 10:00:00.123456" |
| current_ts | Time GoldenGate processed the record | String | "2023-01-01 10:00:01.789012" |
| pos | Position of the record in the GoldenGate trail | String | "00000000170000001234" |
| primary_keys | Primary key column names | Array | ["ID"] |
| tokens | Transaction tokens | Object | {...} |
| before | Previous row values | Object | {...} |
| after | Current row values | Object | {...} |
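Because op_ts and current_ts are plain strings, the replication lag of a single record can be computed from them directly. A JDK-only sketch, assuming both values are written in the same time zone with exactly six fractional digits, as in the examples; OggLag is a hypothetical name.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

final class OggLag {
    // Matches the op_ts / current_ts strings in the examples (microsecond precision)
    private static final DateTimeFormatter OGG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");

    /** Time between the source operation (op_ts) and GoldenGate processing it (current_ts). */
    static Duration lag(String opTs, String currentTs) {
        LocalDateTime op = LocalDateTime.parse(opTs, OGG_TS);
        LocalDateTime current = LocalDateTime.parse(currentTs, OGG_TS);
        return Duration.between(op, current);
    }
}
```

For the insert event above, lag("2023-01-01 10:00:00.123456", "2023-01-01 10:00:01.789012") is 1.665556 seconds.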

Accessing Metadata in Tables

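The ogg-json format exposes four read-only metadata keys: table, primary-keys, event-timestamp (parsed from op_ts) and ingestion-timestamp (parsed from current_ts). The operation type is not available as metadata because it determines the kind of changelog row, and pos and tokens are not exposed. With the Kafka connector, these keys are referenced with a value. prefix (for example 'value.table'):

CREATE TABLE ogg_with_metadata (
  -- Regular data columns
  id BIGINT,
  name STRING,
  email STRING,

  -- Metadata columns (read-only, hence VIRTUAL)
  ogg_table STRING METADATA FROM 'value.table' VIRTUAL,
  ogg_primary_keys ARRAY<STRING> METADATA FROM 'value.primary-keys' VIRTUAL,
  ogg_op_ts TIMESTAMP_LTZ(6) METADATA FROM 'value.event-timestamp' VIRTUAL,
  ogg_current_ts TIMESTAMP_LTZ(6) METADATA FROM 'value.ingestion-timestamp' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'format' = 'ogg-json'
  -- other connector options
);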

Data Type Mapping

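Typical mappings from Oracle column types, as they appear in OGG JSON, to the Flink types to declare:

| Oracle type | OGG JSON | Flink type | Notes |
|---|---|---|---|
| NUMBER | Number/String | DECIMAL | DECIMAL(p, s) matching the column's precision and scale |
| INTEGER | Number | INT | |
| FLOAT | Number | FLOAT | |
| BINARY_FLOAT | Number | FLOAT | |
| BINARY_DOUBLE | Number | DOUBLE | |
| VARCHAR2 | String | STRING | |
| CHAR | String | STRING | |
| NVARCHAR2 | String | STRING | Unicode |
| CLOB | String | STRING | Large text |
| DATE | String | TIMESTAMP | Oracle DATE includes time |
| TIMESTAMP | String | TIMESTAMP | High precision |
| TIMESTAMP WITH TIME ZONE | String | TIMESTAMP_LTZ | With time zone |
| RAW | String | BYTES | Hex encoded; Flink decodes BYTES from Base64, so hex values may need a STRING column |
| BLOB | String | BYTES | Base64 encoded |
| XMLTYPE | String | STRING | As XML string |
| JSON | String | STRING | As JSON string |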
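A DECIMAL column has a fixed precision and scale, so each NUMBER value has to fit it. A JDK-only sketch of that conversion, modeled on the behavior assumed here for Flink's DecimalData.fromBigDecimal (round half-up to the declared scale, return null when the rounded value needs more digits than the precision allows); OggDecimal is a hypothetical name.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

final class OggDecimal {
    /**
     * Converts a NUMBER value (JSON number or string) to DECIMAL(precision, scale):
     * rounds to the declared scale and returns null if the result has too many digits.
     */
    static BigDecimal toDecimal(String raw, int precision, int scale) {
        BigDecimal value = new BigDecimal(raw).setScale(scale, RoundingMode.HALF_UP);
        return value.precision() > precision ? null : value;
    }
}
```

For DECIMAL(5, 2), "123.456" becomes 123.46, while "12345.6" (seven digits at scale 2) does not fit and yields null.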

Timestamp Handling

Flink's JSON timestamp option accepts two standards rather than arbitrary patterns: "SQL" (the default, e.g. 2023-01-01 10:00:00.123456, the style used in the OGG examples above) and "ISO-8601" (e.g. 2023-01-01T10:00:00.123456). Oracle-style patterns such as dd-MON-yy are not supported:

// SQL standard (default): date and time separated by a space, optional fractional seconds
config.set(OggJsonFormatOptions.TIMESTAMP_FORMAT, "SQL");

// ISO-8601 standard: date and time separated by "T"
config.set(OggJsonFormatOptions.TIMESTAMP_FORMAT, "ISO-8601");
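The two standards differ mainly in the separator between date and time. A JDK-only sketch of parsers for both, with formatters modeled on (not copied from) Flink's JSON timestamp handling; OggTimestampStandards is a hypothetical name.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;

final class OggTimestampStandards {
    // 'SQL' style: "yyyy-MM-dd HH:mm:ss" followed by 0 to 9 optional fractional digits
    static final DateTimeFormatter SQL = new DateTimeFormatterBuilder()
            .append(DateTimeFormatter.ISO_LOCAL_DATE)
            .appendLiteral(' ')
            .appendPattern("HH:mm:ss")
            .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
            .toFormatter();

    // 'ISO-8601' style: date and time separated by 'T'
    static final DateTimeFormatter ISO = DateTimeFormatter.ISO_LOCAL_DATE_TIME;

    static LocalDateTime parse(String value, String standard) {
        return switch (standard) {
            case "SQL" -> LocalDateTime.parse(value, SQL);
            case "ISO-8601" -> LocalDateTime.parse(value, ISO);
            default -> throw new IllegalArgumentException("Unsupported standard: " + standard);
        };
    }
}
```

A value written in one style does not parse with the other, which is why the option has to match what OGG actually emits.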

Oracle Timestamp Precision

Oracle TIMESTAMP columns store 0 to 9 fractional-second digits; declare the Flink columns with matching precision:

CREATE TABLE ogg_timestamps (
  id BIGINT,
  name STRING,

  -- Oracle timestamp columns with different fractional-second precisions
  created_date TIMESTAMP(3),        -- Oracle TIMESTAMP(3)
  updated_ts TIMESTAMP(6),          -- Oracle TIMESTAMP(6)
  event_time TIMESTAMP(9),          -- Oracle TIMESTAMP(9)
  tz_time TIMESTAMP_LTZ(6),         -- Oracle TIMESTAMP WITH TIME ZONE

  -- OGG envelope timestamps exposed as metadata (op_ts and current_ts)
  op_timestamp TIMESTAMP_LTZ(6) METADATA FROM 'value.event-timestamp' VIRTUAL,
  ingestion_timestamp TIMESTAMP_LTZ(6) METADATA FROM 'value.ingestion-timestamp' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'format' = 'ogg-json',
  'ogg-json.timestamp-format.standard' = 'SQL'
);

Transaction Processing

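When the extract is configured to add user tokens, each record carries them in a tokens object, which can identify the transaction the change belongs to: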

Transaction Metadata

{
  "table": "USERS",
  "op_type": "I",
  "op_ts": "2023-01-01 10:00:00.123456",
  "current_ts": "2023-01-01 10:00:01.789012",
  "pos": "00000000170000001234",
  "tokens": {
    "TK-XID": "0004.00A.00000123",
    "TK-CSN": "12345678901",
    "TK-THREAD": "001"
  },
  "after": {...}
}

Transaction Boundaries

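-- ogg-json exposes only table, primary-keys, event-timestamp and ingestion-timestamp as metadata,
-- so the tokens object is read from the raw envelope with the plain 'json' format (insert-only rows)
CREATE TABLE ogg_with_transaction (
  `table` STRING,
  op_type STRING,
  tokens MAP<STRING, STRING>,
  `after` ROW<ID BIGINT, NAME STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'ogg-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

-- Transaction tokens are then ordinary map lookups
SELECT `after`.ID, tokens['TK-XID'] AS xid, tokens['TK-CSN'] AS csn
FROM ogg_with_transaction;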
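Grouping change events by transaction can be sketched in plain Java. This assumes every event carries the TK-XID token shown in the example (token names are defined by the extract's TOKENS configuration, so treat them as assumptions) and works over a finite list rather than an unbounded stream; OggEvent and OggTransactions are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified view of one OGG event: its TK-XID token, table and op_type. */
record OggEvent(String xid, String table, String opType) {}

final class OggTransactions {
    /** Groups events by transaction id, keeping transactions in first-seen order. */
    static Map<String, List<OggEvent>> byTransaction(List<OggEvent> events) {
        Map<String, List<OggEvent>> grouped = new LinkedHashMap<>();
        for (OggEvent e : events) {
            grouped.computeIfAbsent(e.xid(), k -> new ArrayList<>()).add(e);
        }
        return grouped;
    }
}
```

In a streaming job the same idea would need a keyed state and some signal that a transaction is complete, which this sketch does not attempt.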

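Advanced OGG Features

Flink's ogg-json format converts only the I, U and D operation types into changelog rows. Records with other op_type values, such as the ones below, are expected to fail deserialization unless 'ogg-json.ignore-parse-errors' is enabled.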

Compressed Trail Files

OGG can output compressed change events. In this example, op_type "PK" marks a primary key update:

{
  "table": "USERS",
  "op_type": "PK",
  "op_ts": "2023-01-01 10:00:00.123456",
  "pos": "00000000170000001234",
  "primary_keys": ["ID"],
  "after": {
    "ID": 1
  }
}

DDL Support

OGG can capture DDL changes:

{
  "ddl": "ALTER TABLE USERS ADD PHONE VARCHAR2(20)",
  "op_type": "DDL",
  "op_ts": "2023-01-01 10:00:00.123456",
  "pos": "00000000170000001290",
  "table": "USERS"
}

Supplemental Logging

Configure Oracle supplemental logging for complete change capture:

-- Enable supplemental logging in Oracle
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER TABLE users ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

Error Handling

Configure robust error handling for production Oracle environments:

// Skip rows and fields with parse errors instead of failing the job (failed fields become null)
config.set(OggJsonFormatOptions.IGNORE_PARSE_ERRORS, true);

// When serializing MAP columns, write null keys as the literal "ORA_NULL" instead of failing
config.set(OggJsonFormatOptions.JSON_MAP_NULL_KEY_MODE, "LITERAL");
config.set(OggJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL, "ORA_NULL");
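The null-key options only matter when rows containing MAP columns are serialized to JSON. A sketch of the three modes (FAIL throws, DROP removes the entry, LITERAL substitutes the configured string), using plain Java maps instead of Flink's internal map data; NullKeyModes is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class NullKeyModes {
    /** Applies a map-null-key mode to a map before it would be written as a JSON object. */
    static Map<String, String> apply(Map<String, String> input, String mode, String literal) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : input.entrySet()) {
            if (e.getKey() != null) {
                out.put(e.getKey(), e.getValue());
                continue;
            }
            // The mode is only consulted when a null key is actually encountered
            switch (mode) {
                case "FAIL" -> throw new IllegalStateException("Map key is null");
                case "DROP" -> { /* skip the entry */ }
                case "LITERAL" -> out.put(literal, e.getValue());
                default -> throw new IllegalArgumentException("Unknown mode: " + mode);
            }
        }
        return out;
    }
}
```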

Common Error Scenarios

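  1. Large LOB data: big CLOB/BLOB values can exceed Kafka message size limits; raise the producer and topic limits accordingly
  2. Character set issues: make sure OGG writes its JSON output as UTF-8
  3. Timestamp precision: declare TIMESTAMP(6) or higher so microsecond values from OGG are kept
  4. Oracle-specific data types: map XMLType and JSON columns to STRING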

Performance Optimization

OGG Configuration

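-- Extract parameter file: capture changes to schema.users into a local trail
EXTRACT ext1
USERIDALIAS ggadmin
EXTTRAIL ./dirdat/lt
TABLE schema.users;

-- Replicat parameter file for GoldenGate for Big Data: the Kafka handler and its
-- JSON formatting are configured in the properties file referenced by TARGETDB
REPLICAT rep1
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
MAP schema.users, TARGET schema.users;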

Kafka Configuration

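import java.util.Properties;

// Optimize the Kafka producer that publishes OGG records for high throughput
// (producer settings, e.g. for the GoldenGate Kafka handler, not for the Flink consumer)
Properties kafkaProps = new Properties();
kafkaProps.setProperty("batch.size", "32768");
kafkaProps.setProperty("linger.ms", "5");
kafkaProps.setProperty("compression.type", "snappy");
// Allow requests up to 10 MB (e.g. rows with large LOBs); the topic's max.message.bytes must allow this too
kafkaProps.setProperty("max.request.size", "10485760");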

Production Considerations

Reliability

  • Monitor GoldenGate extract and replicat processes
  • Implement proper error handling and alerting
  • Configure GoldenGate checkpoints appropriately
  • Set up Oracle archive log retention policies

Schema Evolution

Fields that appear in the JSON but are not declared in the table are ignored, so a column added in Oracle does not break the job; it is simply not read until it is added to the table definition. Declared columns missing from a record are read as null:

-- Flexible schema for handling Oracle schema evolution
CREATE TABLE ogg_schema_evolution (
  -- Core columns; columns added later in Oracle are ignored until declared here
  id BIGINT,
  name STRING,

  -- Metadata for debugging
  ogg_table STRING METADATA FROM 'value.table' VIRTUAL,
  ogg_op_ts TIMESTAMP_LTZ(6) METADATA FROM 'value.event-timestamp' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'format' = 'ogg-json',
  'ogg-json.ignore-parse-errors' = 'true'
);
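A sketch of the projection rule assumed here for Flink's JSON deserialization: only declared columns are read, JSON fields without a matching column are dropped, declared columns missing from the record become null, and names are matched case-sensitively. SchemaProjection is a hypothetical name and works on plain maps rather than Flink rows.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SchemaProjection {
    /** Builds a row containing exactly the declared columns, looked up by exact (case-sensitive) name. */
    static Map<String, Object> project(Map<String, Object> jsonRow, List<String> declaredColumns) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (String column : declaredColumns) {
            // Map.get returns null for absent fields, mirroring a missing JSON field
            row.put(column, jsonRow.get(column));
        }
        return row;
    }
}
```

With an "after" image containing ID, NAME and a newly added PHONE, declaring ID, NAME and EMAIL yields ID and NAME, a null EMAIL, and no PHONE; declaring a lower-case id instead finds nothing.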

Monitoring

Key metrics for Oracle GoldenGate:

  • Extract lag behind Oracle redo logs
  • Replicat processing lag
  • Archive log generation rate
  • Change event volume per table
  • DDL event frequency
  • Large transaction handling

Security

  • Configure Oracle wallet for secure authentication
  • Use encrypted communication channels
  • Implement proper access controls for GoldenGate processes
  • Monitor privileged operations through audit trails

Integration Patterns

Real-time Data Warehousing

-- Real-time Oracle to data warehouse synchronization.
-- Deletes arrive as changelog DELETE rows rather than as an op_type column,
-- so the sink table must accept updates and deletes (e.g. a sink with a primary key in upsert mode).
INSERT INTO data_warehouse.users_dim
SELECT
  id,
  name,
  email,
  ogg_op_ts AS last_modified
FROM ogg_with_metadata
WHERE ogg_table = 'USERS';

Change Data Analysis

-- Per-table replication latency over the last hour (current_ts minus op_ts).
-- On a changelog source, COUNT(*) is the net number of rows: deletes retract earlier inserts.
SELECT
  ogg_table,
  COUNT(*) AS row_count,
  AVG(TIMESTAMPDIFF(SECOND, ogg_op_ts, ogg_current_ts)) AS avg_latency_seconds
FROM ogg_with_metadata
WHERE ogg_op_ts >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR
GROUP BY ogg_table;

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-json
