Apache Flink SQL, Table API, and UDF development for both OSS Flink and Confluent Cloud
Confluent-specific features, deployment patterns, and CLI commands.
confluent update)

# Required
export CLOUD_PROVIDER="aws" # aws, azure, gcp
export CLOUD_REGION="us-east-1" # region code
export ORG_ID="b0b421724-xxxx-xxxx" # Organization ID
export ENV_ID="env-xxxxx" # Environment ID
export COMPUTE_POOL_ID="lfcp-xxxxx" # Compute pool ID
export FLINK_API_KEY="<key>" # Flink API key
export FLINK_API_SECRET="<secret>" # Flink API secret
# Optional (for UDF uploads)
export ARTIFACT_API_KEY="<key>" # Cloud resource management key
export ARTIFACT_API_SECRET="<secret>"

confluent flink api-key create \
--environment $ENV_ID \
--cloud $CLOUD_PROVIDER \
--region $CLOUD_REGION

# List compute pools
confluent flink compute-pool list
# Create compute pool
confluent flink compute-pool create my-pool \
--cloud aws \
--region us-east-1 \
--max-cfu 10
# Describe compute pool
confluent flink compute-pool describe lfcp-xxxxx
# Delete compute pool
confluent flink compute-pool delete lfcp-xxxxx
# Set default compute pool
confluent flink compute-pool use lfcp-xxxxx

# Start interactive SQL shell
confluent flink shell \
--cloud $CLOUD_PROVIDER \
--region $CLOUD_REGION \
--environment $ENV_ID \
--compute-pool $COMPUTE_POOL_ID
# In shell:
# > SHOW TABLES;
# > SELECT * FROM my_table LIMIT 10;
# > !quit

# List statements
confluent flink statement list
# Create statement (run SQL)
confluent flink statement create my-statement \
--sql "INSERT INTO sink_table SELECT * FROM source_table" \
--compute-pool $COMPUTE_POOL_ID
# Describe statement
confluent flink statement describe my-statement
# Stop statement (pause)
confluent flink statement stop my-statement
# Resume statement
confluent flink statement resume my-statement
# Delete statement
confluent flink statement delete my-statement
# View exceptions
confluent flink statement exception list my-statement

# List artifacts
confluent flink artifact list
# Upload artifact
confluent flink artifact create my-udf \
--cloud aws \
--region us-east-1 \
--artifact-file target/my-udf-1.0.jar
# Describe artifact
confluent flink artifact describe cfa-xxxxx
# Delete artifact
confluent flink artifact delete cfa-xxxxx

# Create savepoint
confluent flink savepoint create \
--statement my-statement
# List savepoints
confluent flink savepoint list
# Describe savepoint
confluent flink savepoint describe sp-xxxxx
# Resume from savepoint
confluent flink statement resume my-statement \
--savepoint sp-xxxxx

pom.xml:
<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>io.confluent.flink</groupId>
<artifactId>confluent-flink-table-api-java-plugin</artifactId>
<version>2.1-8</version>
</dependency>
</dependencies>

Main class:
import io.confluent.flink.plugin.ConfluentSettings;
import io.confluent.flink.plugin.ConfluentTools;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
public class FlinkApp {
public static void main(String[] args) {
// Create environment from env vars
TableEnvironment env = TableEnvironment.create(
ConfluentSettings.fromGlobalVariables()
);
// Execute SQL
TableResult result = env.executeSql(
"SELECT * FROM `cluster`.`database`.`topic`"
);
// Print results
ConfluentTools.printMaterializedLimit(result, 100);
}
}

pip install confluent-flink-table-api-python-plugin

Main script:
from pyflink.table.confluent import ConfluentSettings, ConfluentTools
from pyflink.table import TableEnvironment
# Create environment from env vars
settings = ConfluentSettings.from_global_variables()
env = TableEnvironment.create(settings)
# Execute SQL
result = env.execute_sql(
"SELECT * FROM `cluster`.`database`.`topic`"
)
# Print results
ConfluentTools.print_materialized_limit(result, 100)

Confluent tables have special system columns:
| Column | Type | Description |
|---|---|---|
| `$rowtime` | `TIMESTAMP_LTZ(3)` | Event time from Kafka timestamp |
| `$headers` | `MAP<STRING, BYTES>` | Kafka headers |
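Because the system columns start with `$`, they have to be backtick-quoted wherever they appear in a query. When queries are assembled as strings before being handed to `execute_sql`, a small quoting helper keeps that consistent. The following is a minimal sketch, not part of the Confluent plugin: `quote_identifier` and `select_with_system_columns` are hypothetical helper names, and doubling an embedded backtick is an assumption about how the dialect escapes it.

```python
def quote_identifier(name: str) -> str:
    """Wrap an identifier in backticks.

    Assumption: an embedded backtick is escaped by doubling it.
    """
    if not name:
        raise ValueError("identifier must not be empty")
    return "`" + name.replace("`", "``") + "`"


def select_with_system_columns(table: str, columns: list[str]) -> str:
    """Build a SELECT that projects the given columns plus $rowtime."""
    projected = [quote_identifier(column) for column in columns]
    projected.append(quote_identifier("$rowtime") + " AS event_time")
    return f"SELECT {', '.join(projected)} FROM {quote_identifier(table)}"


print(select_with_system_columns("my_topic", ["id"]))
# SELECT `id`, `$rowtime` AS event_time FROM `my_topic`
```

The resulting string could then be passed to `env.execute_sql(...)` as in the main script.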
-- Use system columns
SELECT
id,
`$rowtime` as event_time,
`$headers`['correlation-id'] as correlation_id
FROM my_topic;
-- Watermark on $rowtime
CREATE TABLE my_events (
id STRING,
payload STRING,
WATERMARK FOR `$rowtime` AS `$rowtime` - INTERVAL '5' SECOND
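);

import io.confluent.flink.plugin.ConfluentTableDescriptor;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;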
TableDescriptor descriptor = ConfluentTableDescriptor.forManaged()
.schema(Schema.newBuilder()
.column("id", DataTypes.STRING())
.column("value", DataTypes.INT())
.watermark("$rowtime", $("$rowtime").minus(lit(5).seconds()))
.build())
.build();
env.createTable("my_table", descriptor);

// Collect results (blocking)
List<Row> rows = ConfluentTools.collectMaterializedLimit(table, 100);
List<Row> allRows = ConfluentTools.collectMaterialized(table); // Bounded only
// Print results
ConfluentTools.printMaterializedLimit(table, 100);
ConfluentTools.printMaterialized(table); // Bounded only
// Statement lifecycle
TableResult result = env.executeSql("SELECT ...");
String name = ConfluentTools.getStatementName(result);
ConfluentTools.stopStatement(result);
ConfluentTools.stopStatementByName(env, "statement-name");
ConfluentTools.deleteStatement(env, "statement-name");

<environment>
├── <kafka-cluster-1>
│   └── <database-1>
│       ├── topic_a
│       ├── topic_b
│       └── my_view
└── <kafka-cluster-2>
    └── <database-2>
        └── topic_c

-- Fully qualified name
SELECT * FROM `cluster_id`.`database_name`.`topic_name`;
-- Set default catalog/database
USE CATALOG `cluster_id`;
USE `database_name`;
SELECT * FROM topic_name;
-- Show structure
SHOW CATALOGS;
SHOW DATABASES;
SHOW TABLES;

# Create long-running statement
confluent flink statement create enrichment-job \
--sql "INSERT INTO enriched_orders
SELECT o.*, c.name, c.tier
FROM orders o
JOIN customers FOR SYSTEM_TIME AS OF o.order_time AS c
ON o.customer_id = c.id" \
--compute-pool $COMPUTE_POOL_ID

# 1. Stop with savepoint
confluent flink savepoint create --statement my-job
# 2. Update job (new SQL or schema)
confluent flink statement delete my-job
confluent flink statement create my-job-v2 \
--sql "..." \
--compute-pool $COMPUTE_POOL_ID
# 3. Resume from savepoint
confluent flink statement resume my-job-v2 --savepoint sp-xxxxx

# 1. Deploy new version alongside
confluent flink statement create job-v2 \
--sql "INSERT INTO output_v2 SELECT ..." \
--compute-pool $COMPUTE_POOL_ID
# 2. Verify new version
# 3. Stop old version
confluent flink statement stop job-v1
# 4. Switch consumers to new output
# 5. Delete old version
confluent flink statement delete job-v1

Confluent can automatically scale compute pools:
# Enable autopilot (in Console)
# Or set max-cfu higher and let system scale
confluent flink compute-pool create my-pool \
--cloud aws \
--region us-east-1 \
--max-cfu 20 # System scales up to this

# Check statement metrics
confluent flink statement describe my-statement
# View in Console: Flink > Statements > Metrics

| Feature | Status |
|---|---|
| DataStream API | Not supported (Table API only) |
| Batch mode | Not supported (streaming only) |
| Custom connectors | Not supported |
| State backends | Managed by Confluent |
| Checkpointing | Managed by Confluent |
| Limitation | Value |
|---|---|
| Max UDFs per statement | 10 |
| Max artifacts per env | 100 |
| Max artifact size | 100 MB |
| Max row size | 4 MB |
| Java versions | 11, 17 |
| Python version | 3.11 only |
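The artifact size limit in the table above can be checked locally before an upload is attempted. The sketch below is a hedged illustration only: `check_artifact_size` is a hypothetical helper, and reading "100 MB" as 100 × 1024 × 1024 bytes is an assumption, since the table does not say whether the limit is decimal or binary.

```python
import os
import tempfile

# Assumption: the documented "100 MB" limit is interpreted as 100 MiB.
MAX_ARTIFACT_BYTES = 100 * 1024 * 1024


def check_artifact_size(path: str, limit: int = MAX_ARTIFACT_BYTES) -> int:
    """Return the file size in bytes, or raise if it exceeds the limit."""
    size = os.path.getsize(path)
    if size > limit:
        raise ValueError(f"{path} is {size} bytes, over the {limit}-byte limit")
    return size


# Demo with a throwaway file standing in for a UDF JAR.
with tempfile.NamedTemporaryFile(suffix=".jar", delete=False) as tmp:
    tmp.write(b"\0" * 2048)
    demo_jar = tmp.name
try:
    print(check_artifact_size(demo_jar))  # 2048
finally:
    os.remove(demo_jar)
```

In a build pipeline, a check like this could run against `target/my-udf-1.0.jar` just before `confluent flink artifact create`.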
Not supported:
| Feature | Status |
|---|---|
| CREATE DATABASE | Not supported |
| CREATE CATALOG | Not supported |
| File connectors | Not supported |
| Classic JDBC lookup join (FOR SYSTEM_TIME AS OF PROCTIME()) | Not supported; use External Tables / KEY_SEARCH_AGG instead (below) |
| Hive integration | Not supported |
| PROCTIME() function | Not supported (see below) |
| CURRENT_TIMESTAMP in update queries | Rejected as non-deterministic (see below) |
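Some of the unsupported constructs in the table above are easy to catch before a statement is submitted. The following is a rough pre-flight check, offered as a sketch rather than a real SQL parser: `find_unsupported` is a hypothetical helper, the regular expressions are heuristics, and matches inside string literals or comments are not excluded.

```python
import re

# Labels come from the table above; the patterns are a heuristic assumption.
UNSUPPORTED = {
    "CREATE DATABASE": re.compile(r"\bCREATE\s+DATABASE\b", re.IGNORECASE),
    "CREATE CATALOG": re.compile(r"\bCREATE\s+CATALOG\b", re.IGNORECASE),
    "PROCTIME()": re.compile(r"\bPROCTIME\s*\(\s*\)", re.IGNORECASE),
}


def find_unsupported(sql: str) -> list[str]:
    """Return the labels of unsupported constructs found in the SQL text."""
    return [label for label, pattern in UNSUPPORTED.items() if pattern.search(sql)]


query = """
SELECT o.*, c.name
FROM orders o
LEFT JOIN customers FOR SYSTEM_TIME AS OF PROCTIME() AS c
ON o.customer_id = c.id
"""
print(find_unsupported(query))  # ['PROCTIME()']
```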
Confluent Cloud Flink rejects PROCTIME() with:
Function 'PROCTIME' is not supported in Confluent's Flink SQL dialect.
This means the textbook lookup-join pattern doesn't work:
-- ❌ FAILS on Confluent Cloud
SELECT o.*, c.name
FROM orders o
LEFT JOIN customers FOR SYSTEM_TIME AS OF PROCTIME() AS c
ON o.customer_id = c.id;

Workarounds for "current-value lookup" semantics:
KEY_SEARCH_AGG: the canonical Confluent Cloud path for per-row lookups against an external database (Postgres/MySQL/SQL Server/Oracle), REST API, MongoDB, or Couchbase. See the "External Tables" section below. No proctime needed.

SELECT o.*, c.name
FROM orders o
LEFT JOIN customers_upsert c -- upsert-kafka
ON o.customer_id = c.id;

FOR SYSTEM_TIME AS OF o.order_time instead.

FOR SYSTEM_TIME AS OF o.$rowtime uses Kafka ingestion time (requires the right side to have a watermark declared).

Confluent Cloud Flink provides read-only External Tables for federated lookups against databases and REST APIs. This is the supported path for per-row enrichment from an external system; it replaces the OSS Flink JDBC/HBase lookup-join connectors.
Supported connectors:
| Connector | connector option value | Dialects / backends |
|---|---|---|
| Confluent JDBC | confluent-jdbc | Postgres, MySQL, SQL Server, Oracle |
| REST | rest | Any HTTPS JSON endpoint |
| MongoDB | mongodb | MongoDB Atlas |
| Couchbase | couchbase | Couchbase |
Three built-in TVFs operate over external tables:
- KEY_SEARCH_AGG(ext_table, DESCRIPTOR(key_col), search_col): exact-key lookup
- TEXT_SEARCH_AGG(...): text / full-text search
- VECTOR_SEARCH_AGG(...): vector / semantic search (requires an embedding)

Two-step setup: a CREATE CONNECTION (endpoint + credentials), then a CREATE TABLE that references it.
-- Step 1: connection (holds endpoint + credentials)
CREATE CONNECTION jdbc_postgres_connection
WITH (
'type' = 'confluent_jdbc', -- NOTE: underscore form in CREATE CONNECTION
'endpoint' = '<jdbc_url>',
'username' = '<username>',
'password' = '<password>',
'environment' = '<ENV_ID>'
);
-- Step 2: external table
CREATE TABLE netflix_shows (
show_id STRING,
title STRING,
release_year INT,
rating STRING
) WITH (
'connector' = 'confluent-jdbc', -- NOTE: hyphen form in CREATE TABLE
'confluent-jdbc.connection' = 'jdbc_postgres_connection',
'confluent-jdbc.table-name' = 'netflix_shows'
);

Naming gotcha: the connection type uses an underscore (confluent_jdbc), but the CREATE TABLE connector option uses a hyphen (confluent-jdbc). Both forms appear verbatim in the Confluent docs.
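When the DDL is generated from code, keeping the two spellings in named constants makes the underscore/hyphen difference harder to get wrong. This is a minimal sketch under stated assumptions: `jdbc_external_table_ddl` is a hypothetical helper, it only covers the JDBC options shown in the example above, and it does no escaping of identifiers or quotes.

```python
TABLE_CONNECTOR = "confluent-jdbc"  # hyphen form, used in CREATE TABLE
CONNECTION_TYPE = "confluent_jdbc"  # underscore form, used in CREATE CONNECTION


def jdbc_external_table_ddl(table: str, columns: dict[str, str],
                            connection: str, remote_table: str) -> str:
    """Render CREATE TABLE for a JDBC external table (no escaping applied)."""
    column_lines = ",\n".join(
        f"  {name} {sql_type}" for name, sql_type in columns.items()
    )
    return (
        f"CREATE TABLE {table} (\n{column_lines}\n) WITH (\n"
        f"  'connector' = '{TABLE_CONNECTOR}',\n"
        f"  '{TABLE_CONNECTOR}.connection' = '{connection}',\n"
        f"  '{TABLE_CONNECTOR}.table-name' = '{remote_table}'\n"
        ");"
    )


ddl = jdbc_external_table_ddl(
    "netflix_shows",
    {"show_id": "STRING", "title": "STRING"},
    connection="jdbc_postgres_connection",
    remote_table="netflix_shows",
)
print(ddl)
```

`CONNECTION_TYPE` is not used by the table DDL; it is kept next to `TABLE_CONNECTOR` so that a matching CREATE CONNECTION generator would pick up the underscore form.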
Usage — KEY_SEARCH_AGG + CROSS JOIN UNNEST:
The function returns an array column named search_results. Flatten it with UNNEST:
SELECT t.*
FROM input_titles,
LATERAL TABLE(KEY_SEARCH_AGG(netflix_shows, DESCRIPTOR(title), title))
CROSS JOIN UNNEST(search_results)
AS t(show_id, title, release_year, rating);

Tuning options via the optional MAP[...] 4th argument:
LATERAL TABLE(KEY_SEARCH_AGG(
netflix_shows, DESCRIPTOR(title), title,
MAP['async_enabled', 'true',
'client_timeout', '30',
'max_parallelism', '10',
'retry_count', '3']
))

| Option | Default | Notes |
|---|---|---|
| `async_enabled` | true | Non-blocking lookups |
| `client_timeout` | 30 | Seconds |
| `max_parallelism` | 10 | Only applies when async_enabled = true |
| `retry_count` | 3 | |
| `retry_error_list` | none | Comma-separated error codes that trigger retry |
| `debug` | false | Returns stack traces in $errors; may leak prompt/response content |
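If the tuning options are held in application config, the `MAP[...]` argument can be rendered from a dictionary and checked against the option names in the table. The sketch below assumes exactly the options listed above; `render_options` is a hypothetical helper, and passing every value as a quoted string mirrors the example rather than any documented requirement.

```python
KNOWN_OPTIONS = {
    "async_enabled", "client_timeout", "max_parallelism",
    "retry_count", "retry_error_list", "debug",
}


def render_options(options: dict[str, object]) -> str:
    """Render a dict as a Flink SQL MAP['key', 'value', ...] literal."""
    unknown = sorted(set(options) - KNOWN_OPTIONS)
    if unknown:
        raise ValueError(f"unknown option(s): {', '.join(unknown)}")
    parts = []
    for key, value in options.items():
        # Booleans are written as lowercase 'true' / 'false'.
        text = str(value).lower() if isinstance(value, bool) else str(value)
        parts.append(f"'{key}', '{text}'")
    return "MAP[" + ", ".join(parts) + "]"


print(render_options({"async_enabled": True, "client_timeout": 30}))
# MAP['async_enabled', 'true', 'client_timeout', '30']
```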
Limitations:
- CROSS JOIN UNNEST(search_results) AS t(...).
- lookup.cache.* setting.
- confluent.cloud, auth header format is fixed.
- No PROCTIME() needed: the pattern uses LATERAL TABLE(...), not FOR SYSTEM_TIME AS OF.

Reference: Key Search with External Sources
A regular JOIN between an append stream and an upsert-kafka table emits UPDATE/DELETE messages (not insert-only). If the sink is declared 'changelog.mode' = 'append' the statement fails with:
Table sink '...' doesn't support consuming update and delete changes
Fix: declare the sink as upsert with a PRIMARY KEY:
CREATE TABLE orders_enriched (
order_id STRING NOT NULL,
customer_name STRING,
...,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'changelog.mode' = 'upsert', -- was 'append'
...
);

You'll also see two startup warnings that are mostly cosmetic:
Any query that produces updates/deletes cannot include non-deterministic functions like CURRENT_TIMESTAMP, NOW(), RAND():
The column(s) ... can not satisfy the determinism requirement for correctly processing update message
Why: Flink may need to replay updates; if the function returns different values on replay, the output becomes inconsistent.
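The replay problem can be illustrated with a toy model. This is not how Flink is implemented; it only mimics the idea that a row emitted downstream must be rebuilt identically later in order to retract it. All names here (`enrich`, `retraction_matches`, the counter standing in for a wall clock) are hypothetical.

```python
import itertools

# Stand-in for a wall clock: every call yields a new value.
_clock = itertools.count(1)


def enrich(order: dict, deterministic: bool) -> tuple:
    """Build an output row, stamped with event time or the fake clock."""
    stamp = order["order_time"] if deterministic else next(_clock)
    return (order["order_id"], order["status"], stamp)


def retraction_matches(order: dict, deterministic: bool) -> bool:
    """Check whether a rebuilt row equals the row that was first emitted."""
    emitted = enrich(order, deterministic)     # row sent downstream
    recomputed = enrich(order, deterministic)  # same row rebuilt to retract it
    return emitted == recomputed


order = {"order_id": "o-1", "status": "NEW", "order_time": 1700000000}
print(retraction_matches(order, deterministic=True))   # True
print(retraction_matches(order, deterministic=False))  # False
```

With the clock-based stamp the rebuilt row no longer equals the emitted one, which is the inconsistency the determinism check is guarding against.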
Fixes:
o.order_time instead of CURRENT_TIMESTAMP), or

Check current availability:
confluent flink region list --cloud aws
confluent flink region list --cloud azure
confluent flink region list --cloud gcp

# Verify credentials
confluent flink shell --debug
# Check API key scope
confluent api-key list --resource flink

# View exceptions
confluent flink statement exception list my-statement
# Check statement status
confluent flink statement describe my-statement

| Error | Cause | Fix |
|---|---|---|
| Authentication failed | Invalid API key | Regenerate Flink API key |
| Compute pool not found | Wrong pool ID | Check confluent flink compute-pool list |
| Table not found | Wrong catalog path | Use fully qualified name |
| UDF not found | Wrong artifact ID | Verify artifact uploaded |
| CFU limit exceeded | Pool at capacity | Increase max-cfu or add pool |
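For scripted deployments, the table above can be turned into a simple lookup that attaches a suggested fix to a failure message. This is a sketch only: `suggest_fix` is a hypothetical helper, and matching on the error text by substring is an assumption, since the exact wording of real error messages may differ.

```python
from typing import Optional

# Error text and fixes copied from the troubleshooting table above.
FIXES = {
    "authentication failed": "Regenerate Flink API key",
    "compute pool not found": "Check confluent flink compute-pool list",
    "table not found": "Use fully qualified name",
    "udf not found": "Verify artifact uploaded",
    "cfu limit exceeded": "Increase max-cfu or add pool",
}


def suggest_fix(message: str) -> Optional[str]:
    """Return the suggested fix for a known error, or None if unrecognized."""
    lowered = message.lower()
    for needle, fix in FIXES.items():
        if needle in lowered:
            return fix
    return None


print(suggest_fix("Error: Table not found: orders"))  # Use fully qualified name
```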