JSON format support library for Apache Flink table ecosystem with comprehensive serialization, deserialization, and CDC format capabilities
—
JSON format support for Oracle GoldenGate (OGG) Change Data Capture system, enabling processing of Oracle database changes with OGG-specific JSON formatting and comprehensive metadata. GoldenGate is Oracle's real-time data integration and replication solution.
Configuration options specific to Oracle GoldenGate JSON format, providing comprehensive control over JSON processing and error handling.
/**
* Configuration options for OGG JSON format
*/
public class OggJsonFormatOptions {
/** Whether to ignore JSON parsing errors (inherited from JsonFormatOptions) */
public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;
/** Timestamp format standard, "SQL" or "ISO-8601" (inherited from JsonFormatOptions) */
public static final ConfigOption<String> TIMESTAMP_FORMAT;
/** How to handle null keys in maps (inherited from JsonFormatOptions) */
public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE;
/** Literal string for null keys when mode is LITERAL (inherited from JsonFormatOptions) */
public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL;
}
Configuration Usage:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.ogg.OggJsonFormatOptions;
// Configure OGG format options
Configuration config = new Configuration();
// Ignore JSON parsing errors
config.set(OggJsonFormatOptions.IGNORE_PARSE_ERRORS, true);
// TIMESTAMP_FORMAT selects a timestamp standard ("SQL" or "ISO-8601"), not a
// date-time pattern; "SQL" reads values such as "2023-01-01 10:00:00.123456"
config.set(OggJsonFormatOptions.TIMESTAMP_FORMAT, "SQL");
// Drop map entries that have a null key
config.set(OggJsonFormatOptions.JSON_MAP_NULL_KEY_MODE, "DROP");
Oracle GoldenGate produces change events with a structured envelope containing operation metadata and data payloads:
{
"table": "USERS",
"op_type": "I",
"op_ts": "2023-01-01 10:00:00.123456",
"current_ts": "2023-01-01 10:00:01.789012",
"pos": "00000000170000001234",
"after": {
"ID": 1,
"NAME": "Alice",
"EMAIL": "alice@example.com",
"CREATED_AT": "2023-01-01 10:00:00.000000"
}
}
{
"table": "USERS",
"op_type": "U",
"op_ts": "2023-01-01 10:01:00.123456",
"current_ts": "2023-01-01 10:01:01.789012",
"pos": "00000000170000001256",
"before": {
"ID": 1,
"NAME": "Alice",
"EMAIL": "alice@example.com"
},
"after": {
"ID": 1,
"NAME": "Alice Smith",
"EMAIL": "alice.smith@example.com",
"CREATED_AT": "2023-01-01 10:00:00.000000"
}
}
{
"table": "USERS",
"op_type": "D",
"op_ts": "2023-01-01 10:02:00.123456",
"current_ts": "2023-01-01 10:02:01.789012",
"pos": "00000000170000001278",
"before": {
"ID": 1,
"NAME": "Alice Smith",
"EMAIL": "alice.smith@example.com",
"CREATED_AT": "2023-01-01 10:00:00.000000"
}
}
Create tables using OGG JSON format for change data capture processing:
-- Kafka-backed source table that reads Oracle GoldenGate JSON change events
CREATE TABLE ogg_source (
    id BIGINT,
    name STRING,
    email STRING,
    created_at TIMESTAMP(6),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'ogg-topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'ogg-json',
    'ogg-json.ignore-parse-errors' = 'true',
    -- Selects a timestamp standard ('SQL' or 'ISO-8601'), not a date-time pattern;
    -- 'SQL' reads values such as "2023-01-01 10:00:00.123456"
    'ogg-json.timestamp-format.standard' = 'SQL'
);
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
// Describe a Kafka-backed table that reads OGG JSON change events
TableDescriptor oggTable = TableDescriptor.forConnector("kafka")
    .schema(Schema.newBuilder()
        // Primary key columns must be declared NOT NULL in a Table API schema
        .column("id", DataTypes.BIGINT().notNull())
        .column("name", DataTypes.STRING())
        .column("email", DataTypes.STRING())
        .column("created_at", DataTypes.TIMESTAMP(6))
        .primaryKey("id")
        .build())
    .option("topic", "ogg-topic")
    .option("properties.bootstrap.servers", "localhost:9092")
    .format("ogg-json")
    .option("ogg-json.ignore-parse-errors", "true")
    // Selects a timestamp standard ("SQL" or "ISO-8601"), not a date-time pattern
    .option("ogg-json.timestamp-format.standard", "SQL")
    .build();
OGG format handles three types of change events with comprehensive Oracle-specific metadata:
- op_type: "I" (Insert)
  - after: New row data
  - before: Not present
- op_type: "U" (Update)
  - after: Updated row data (complete row)
  - before: Previous row data (complete row)
- op_type: "D" (Delete)
  - before: Deleted row data (complete row)
  - after: Not present

Oracle GoldenGate provides rich metadata for each change event:
| Field | Description | Type | Example |
|---|---|---|---|
| table | Source table name | String | "USERS" |
| op_type | Operation type | String | "I", "U", "D" |
| op_ts | Operation timestamp | String | "2023-01-01 10:00:00.123456" |
| current_ts | Current processing timestamp | String | "2023-01-01 10:00:01.789012" |
| pos | GoldenGate position/SCN | String | "00000000170000001234" |
| primary_keys | Primary key column names | Array | ["ID"] |
| tokens | Transaction tokens | Object | {...} |
| before | Previous row values | Object | {...} |
| after | Current row values | Object | {...} |
-- Source table exposing OGG envelope fields next to the row data
CREATE TABLE ogg_with_metadata (
    -- Regular data columns
    id BIGINT,
    name STRING,
    email STRING,
    -- Metadata columns. With the Kafka connector, format metadata keys carry the
    -- 'value.' prefix and are read-only, hence VIRTUAL.
    ogg_table STRING METADATA FROM 'value.table' VIRTUAL,
    -- NOTE(review): 'op_type' is not among the metadata keys documented for ogg-json
    -- (table, primary-keys, ingestion-timestamp, event-timestamp) - confirm before use
    ogg_op_type STRING METADATA FROM 'op_type',
    -- Taken from the envelope field "op_ts"
    ogg_op_ts TIMESTAMP(6) METADATA FROM 'value.event-timestamp' VIRTUAL,
    -- Taken from the envelope field "current_ts"
    ogg_current_ts TIMESTAMP(6) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
    -- NOTE(review): 'pos' is likewise not a documented ogg-json metadata key - confirm
    ogg_pos STRING METADATA FROM 'pos'
) WITH (
    'connector' = 'kafka',
    'format' = 'ogg-json'
    -- other connector options
);
OGG JSON format maps Oracle types to Flink types:
| Oracle Type | OGG JSON | Flink Type | Notes |
|---|---|---|---|
| NUMBER | Number/String | DECIMAL | Depends on precision |
| INTEGER | Number | INT | |
| FLOAT | Number | FLOAT | |
| BINARY_FLOAT | Number | FLOAT | |
| BINARY_DOUBLE | Number | DOUBLE | |
| VARCHAR2 | String | STRING | |
| CHAR | String | STRING | |
| NVARCHAR2 | String | STRING | Unicode |
| CLOB | String | STRING | Large text |
| DATE | String | TIMESTAMP | Oracle DATE includes time |
| TIMESTAMP | String | TIMESTAMP | High precision |
| TIMESTAMP WITH TIME ZONE | String | TIMESTAMP_LTZ | With timezone |
| RAW | String | BYTES | Hex encoded |
| BLOB | String | BYTES | Base64 encoded |
| XMLTYPE | String | STRING | As XML string |
| JSON | String | STRING | As JSON string |
Configure timestamp parsing for Oracle's high-precision timestamps:
// SQL standard (default): reads "yyyy-MM-dd HH:mm:ss.s{precision}",
// e.g. "2023-01-01 10:00:00.123456"
config.set(OggJsonFormatOptions.TIMESTAMP_FORMAT, "SQL");
// ISO-8601 standard: reads "yyyy-MM-ddTHH:mm:ss.s{precision}"
config.set(OggJsonFormatOptions.TIMESTAMP_FORMAT, "ISO-8601");
// Only the two standards above are accepted. Custom patterns such as
// "dd-MON-yy HH.mm.ss.SSSSSS AM" are rejected, so configure GoldenGate to emit
// one of these layouts or read the field as STRING and convert it in SQL.
Oracle supports various timestamp precisions:
-- Source table covering the Oracle timestamp precisions plus the OGG envelope timestamps
CREATE TABLE ogg_timestamps (
    id BIGINT,
    name STRING,
    -- Different Oracle timestamp types
    created_date TIMESTAMP(3),  -- Oracle TIMESTAMP(3)
    updated_ts TIMESTAMP(6),    -- Oracle TIMESTAMP(6)
    event_time TIMESTAMP(9),    -- Oracle TIMESTAMP(9)
    tz_time TIMESTAMP_LTZ(6),   -- Oracle TIMESTAMP WITH TIME ZONE
    -- OGG metadata timestamps; with the Kafka connector, format metadata keys carry
    -- the 'value.' prefix and are read-only, hence VIRTUAL
    op_timestamp TIMESTAMP(6) METADATA FROM 'value.event-timestamp' VIRTUAL,  -- envelope "op_ts"
    -- CURRENT_TIMESTAMP is a reserved keyword, so the column name must be quoted
    `current_timestamp` TIMESTAMP(6) METADATA FROM 'value.ingestion-timestamp' VIRTUAL  -- envelope "current_ts"
) WITH (
    'connector' = 'kafka',
    'format' = 'ogg-json',
    -- Selects a timestamp standard ('SQL' or 'ISO-8601'), not a date-time pattern
    'ogg-json.timestamp-format.standard' = 'SQL'
);
OGG provides comprehensive transaction tracking:
{
"table": "USERS",
"op_type": "I",
"op_ts": "2023-01-01 10:00:00.123456",
"current_ts": "2023-01-01 10:00:01.789012",
"pos": "00000000170000001234",
"tokens": {
"TK-XID": "0004.00A.00000123",
"TK-CSN": "12345678901",
"TK-THREAD": "001"
},
"after": {...}
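}
-- Access transaction information
-- NOTE(review): 'tokens.*' is not among the metadata keys documented for Flink's
-- ogg-json format (table, primary-keys, ingestion-timestamp, event-timestamp);
-- confirm these columns resolve before relying on this example.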
CREATE TABLE ogg_with_transaction (
id BIGINT,
name STRING,
-- Transaction metadata from tokens
xid STRING METADATA FROM 'tokens.TK-XID',
csn STRING METADATA FROM 'tokens.TK-CSN',
thread_id STRING METADATA FROM 'tokens.TK-THREAD'
) WITH (
'connector' = 'kafka',
'format' = 'ogg-json'
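);
OGG can output compressed change events, for example a primary key update (`"op_type": "PK"`). Note that Flink's ogg-json format maps only the I, U and D operation types to row changes.
{
"table": "USERS",
"op_type": "PK",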
"op_ts": "2023-01-01 10:00:00.123456",
"pos": "00000000170000001234",
"primary_keys": ["ID"],
"after": {
"ID": 1
}
}
OGG can capture DDL changes:
{
"ddl": "ALTER TABLE USERS ADD COLUMN PHONE VARCHAR2(20)",
"op_type": "DDL",
"op_ts": "2023-01-01 10:00:00.123456",
"pos": "00000000170000001290",
"table": "USERS"
}
Configure Oracle supplemental logging for complete change capture:
-- Enable supplemental logging in Oracle
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER TABLE users ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
Configure robust error handling for production Oracle environments:
// Ignore parsing errors for resilience
config.set(OggJsonFormatOptions.IGNORE_PARSE_ERRORS, true);
// Handle Oracle-specific null keys
config.set(OggJsonFormatOptions.JSON_MAP_NULL_KEY_MODE, "LITERAL");
config.set(OggJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL, "ORA_NULL");
# GoldenGate extract parameters for optimal performance
EXTRACT ext1
USERID ggadmin, PASSWORD oracle
EXTTRAIL ./dirdat/lt
TABLE schema.users;
# Replicat parameters for JSON output
REPLICAT rep1
USERID ggadmin, PASSWORD oracle
MAP schema.users, TARGET kafka.topic;
// Optimize for high-throughput Oracle changes
Properties kafkaProps = new Properties();
kafkaProps.setProperty("batch.size", "32768");
kafkaProps.setProperty("linger.ms", "5");
kafkaProps.setProperty("compression.type", "snappy");
kafkaProps.setProperty("max.request.size", "10485760");
Handle Oracle schema changes:
-- Flexible schema for handling Oracle schema evolution
CREATE TABLE ogg_schema_evolution (
    -- Core columns
    id BIGINT,
    name STRING,
    -- Catch new columns
    -- NOTE(review): as a physical column this is read from a payload field named
    -- "full_record"; nothing here shows it receiving the complete JSON - confirm
    full_record STRING, -- Complete JSON for analysis
    -- Metadata for debugging. With the Kafka connector, format metadata keys carry
    -- the 'value.' prefix and are read-only, hence VIRTUAL.
    ogg_table STRING METADATA FROM 'value.table' VIRTUAL,
    -- NOTE(review): 'op_type' and 'pos' are not among the metadata keys documented for
    -- ogg-json (table, primary-keys, ingestion-timestamp, event-timestamp) - confirm
    ogg_op_type STRING METADATA FROM 'op_type',
    ogg_pos STRING METADATA FROM 'pos'
) WITH (
    'connector' = 'kafka',
    'format' = 'ogg-json',
    'ogg-json.ignore-parse-errors' = 'true'
);
Key metrics for Oracle GoldenGate:
-- Real-time Oracle to data warehouse synchronization
INSERT INTO data_warehouse.users_dim
SELECT
    id,
    name,
    email,
    -- Delete operations get the current timestamp; every other operation stays NULL
    CASE
        WHEN ogg_op_type = 'D' THEN CURRENT_TIMESTAMP
        ELSE NULL
    END AS deleted_at,
    ogg_op_ts AS last_modified
FROM ogg_with_metadata
WHERE ogg_table = 'USERS';
-- Analyze Oracle change patterns
SELECT
    ogg_table,
    ogg_op_type,
    COUNT(*) AS operation_count,
    -- Seconds between the operation timestamp and the current (processing) timestamp
    AVG(TIMESTAMPDIFF(SECOND, ogg_op_ts, ogg_current_ts)) AS avg_latency_seconds
FROM ogg_with_metadata
-- Restrict to operations from the last hour
WHERE ogg_op_ts >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR
GROUP BY
    ogg_table,
    ogg_op_type
ORDER BY operation_count DESC;
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-json