Apache Flink SQL, Table API, and UDF development for both OSS Flink and Confluent Cloud
Common errors, causes, and solutions.
Column 'column_name' not found in table 'my_table'
Causes:
Solutions:
-- Check actual schema
DESCRIBE my_table;
-- Use backticks for special characters
SELECT `my-column` FROM my_table;
-- Fully qualify table name
SELECT * FROM `catalog`.`database`.`table`;

Cannot apply '=' to arguments of type '<INT, STRING>'
Causes:
Solutions:
-- Explicit cast
SELECT * FROM t WHERE CAST(id AS STRING) = '123';
-- Use correct type
SELECT * FROM t WHERE id = 123;

Column 'id' is ambiguous
Causes:
Solutions:
-- Use table alias
SELECT a.id, b.id FROM table_a a JOIN table_b b ON a.key = b.key;

Unable to create a source for reading table 'my_table'
Causes:
Solutions:
-- Verify table exists
SHOW TABLES;
-- Check full path
SHOW TABLES IN `catalog`.`database`;
-- Recreate table with explicit schema
DROP TABLE IF EXISTS my_table;
CREATE TABLE my_table (...);

Cannot generate watermark for column 'event_time': column not found or not a timestamp
Causes:
Solutions:
-- Check column type
DESCRIBE my_table;
-- Ensure timestamp type
CREATE TABLE events (
id STRING,
event_time TIMESTAMP(3), -- Must be TIMESTAMP or TIMESTAMP_LTZ
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
);

An event-time attribute must be defined on input table
Causes:
Solutions:
-- Add watermark
CREATE TABLE events (
id STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
);
-- Or use processing time
CREATE TABLE events (
id STRING,
proc_time AS PROCTIME()
);

Symptoms:
Causes:
Solutions:
-- Increase watermark delay (this clause belongs inside the table's CREATE TABLE definition)
WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE;
-- Use processing time for non-time-critical operations

Symptoms:
Causes:
Solutions:
-- Check watermark progress in logs
-- Add idle timeout so idle partitions stop holding back the watermark (OSS Flink)
SET 'table.exec.source.idle-timeout' = '30s';
-- Use processing time for testing (events must declare proc_time AS PROCTIME())
SELECT * FROM TABLE(
TUMBLE(TABLE events, DESCRIPTOR(proc_time), INTERVAL '1' MINUTE)
);

State size for key exceeds configured limit
Causes:
Solutions:
-- Add state TTL (in table config)
SET 'table.exec.state.ttl' = '24h';
-- Use windowed aggregation instead of unbounded
SELECT window_start, COUNT(*)
FROM TABLE(TUMBLE(TABLE events, DESCRIPTOR(ts), INTERVAL '1' HOUR))
GROUP BY window_start;
-- Add explicit deduplication with TTL

Symptoms:
Causes:
Solutions:
# Increase task manager memory
taskmanager.memory.process.size: 4096m
# Enable RocksDB for large state
state.backend: rocksdb

-- Reduce state with windows
-- Add TTL to configuration
SET 'table.exec.state.ttl' = '1h';

Checkpoint was declined (tasks not ready)
Causes:
Solutions:
# Increase checkpoint timeout
execution.checkpointing.timeout: 10min
# Enable incremental checkpoints (RocksDB)
state.backend.incremental: true

Input of interval join must have watermark
Causes:
Solutions:
-- Ensure both tables have watermarks
CREATE TABLE orders (
id STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
);
CREATE TABLE shipments (
order_id STRING,
ship_time TIMESTAMP(3),
WATERMARK FOR ship_time AS ship_time - INTERVAL '5' SECOND
);

WARNING: Join will result in unbounded state growth
Causes:
Solutions:
-- Use interval join
SELECT * FROM orders o, shipments s
WHERE o.id = s.order_id
AND s.ship_time BETWEEN o.order_time AND o.order_time + INTERVAL '4' HOUR;
-- Or temporal join
SELECT * FROM orders o
JOIN dim_table FOR SYSTEM_TIME AS OF o.proc_time AS d
ON o.key = d.key;
-- Or configure TTL
SET 'table.exec.state.ttl' = '24h';

Lookup join requires processing time attribute
Causes:
Solutions (OSS Flink):
-- Add processing time column
CREATE TABLE orders (
id STRING,
customer_id STRING,
proc_time AS PROCTIME()
);
-- Use in join
SELECT o.*, c.name
FROM orders o
JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;

Function 'PROCTIME' is not supported in Confluent's Flink SQL dialect.
Cause: Confluent Cloud Flink has no PROCTIME() function. The textbook temporal lookup-join syntax doesn't compile there.
Solutions (pick the one that matches your data):
-- Option 1 (CANONICAL): External Table + KEY_SEARCH_AGG
-- Use this when the reference data lives in a real external DB or REST API.
-- Requires CREATE CONNECTION + CREATE TABLE ... WITH ('connector' = 'confluent-jdbc',...)
SELECT o.order_id, t.name, t.tier
FROM orders o,
LATERAL TABLE(KEY_SEARCH_AGG(customers_ext, DESCRIPTOR(customer_id), customer_id))
CROSS JOIN UNNEST(search_results) AS t(customer_id, name, tier);
-- Option 2: Regular join against upsert-kafka reference table
-- Use this when the reference data is already in a compacted Kafka topic.
-- Emits a changelog stream, so sink must be upsert + PK.
SELECT o.*, c.name
FROM orders o
LEFT JOIN customers_ref c -- upsert-kafka, compacted
ON o.customer_id = c.id;
-- Option 3: Event-time temporal join (if versioning is what you actually want)
SELECT o.*, c.name
FROM orders o
JOIN customers_cdc FOR SYSTEM_TIME AS OF o.order_time AS c
ON o.customer_id = c.id;
See confluent-cloud.md (External Tables) for the full KEY_SEARCH_AGG reference and the upsert-kafka workaround.

Table sink '...' doesn't support consuming update and delete changes
which is produced by node Join(joinType=[LeftOuterJoin], ...)
Cause: A regular JOIN against an upsert-kafka (or other upsert) table emits update/delete rows, but the sink was declared with changelog.mode = 'append'.
Solution: make the sink upsert-capable with a PRIMARY KEY:
CREATE TABLE orders_enriched (
order_id STRING NOT NULL,
customer_name STRING,
...,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'changelog.mode' = 'upsert', -- was 'append'
...
);
Alternative: switch to an event-time temporal join (FOR SYSTEM_TIME AS OF o.event_time), which produces insert-only output and can write to an append sink.
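
The temporal-join alternative only works when the right-hand table is versioned, meaning it declares both a primary key and an event-time attribute. The following is a minimal sketch for OSS Flink under stated assumptions: the `customers_cdc` columns, the topic name, the broker address, and the `orders_enriched_append` sink are all hypothetical, and `orders` is assumed to declare `event_time` as a watermarked rowtime attribute.

```sql
-- Hypothetical versioned table: the PRIMARY KEY plus the WATERMARK are what
-- make it usable on the right side of FOR SYSTEM_TIME AS OF.
CREATE TABLE customers_cdc (
  id STRING,
  name STRING,
  update_time TIMESTAMP(3),
  WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'customers',                              -- assumed topic name
  'properties.bootstrap.servers' = 'localhost:9092',  -- assumed broker address
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Each order is joined with the customer version valid at o.event_time,
-- so the result is insert-only and an append sink can consume it.
INSERT INTO orders_enriched_append   -- assumed append-mode sink
SELECT o.order_id, c.name AS customer_name
FROM orders o
JOIN customers_cdc FOR SYSTEM_TIME AS OF o.event_time AS c
  ON o.customer_id = c.id;
```

Results are emitted only once the watermarks of both inputs have passed the order's event time, so an idle reference topic can delay output.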
The column(s): enriched_at (generated by non-deterministic function:
CURRENT_TIMESTAMP) can not satisfy the determinism requirement for
correctly processing update message ('UB'/'UA'/'D' in changelogMode,
not 'I' only)
Cause: Flink may need to replay updates (e.g., to handle retractions). If the query includes a non-deterministic function (CURRENT_TIMESTAMP, NOW(), RAND(), UUID()), the replayed value would differ from the original, which breaks correctness.
Common trigger: adding a "processed at" timestamp column to a join with an upsert table.
Solutions:
-- ❌ Fails when the query produces updates
SELECT o.*, c.name, CURRENT_TIMESTAMP AS enriched_at
FROM orders o LEFT JOIN customers_upsert c ON o.cid = c.id;
-- ✅ Fix 1: drop the non-deterministic column
SELECT o.*, c.name
FROM orders o LEFT JOIN customers_upsert c ON o.cid = c.id;
-- ✅ Fix 2: carry a deterministic timestamp from the source
SELECT o.*, c.name, o.order_time AS enriched_at
FROM orders o LEFT JOIN customers_upsert c ON o.cid = c.id;
-- ✅ Fix 3: use an insert-only join (temporal join) — no updates to replay
SELECT o.*, c.name, CURRENT_TIMESTAMP AS enriched_at
FROM orders o
JOIN customers_cdc FOR SYSTEM_TIME AS OF o.order_time AS c
ON o.cid = c.id;

Function 'my_function' not found
Causes:
Solutions:
-- Check registered functions
SHOW FUNCTIONS;
-- Register function
CREATE FUNCTION my_function AS 'com.example.MyFunction';
-- Use correct scope
CREATE TEMPORARY FUNCTION my_function AS 'com.example.MyFunction';

No matching method 'eval' found for class 'MyFunction'
Causes:
eval method
Solutions:
import org.apache.flink.table.functions.ScalarFunction;

public class MyFunction extends ScalarFunction {
    // Must be public and named 'eval'
    public String eval(String input) {
        return input.toUpperCase();
    }
}

User-defined function threw exception: NullPointerException
Causes:
Solutions:
public String eval(String input) {
    // Handle null
    if (input == null) {
        return null;
    }
    return input.toUpperCase();
}

401 Unauthorized: Invalid API key
Causes:
Solutions:
# Generate new Flink API key
confluent flink api-key create \
--environment $ENV_ID \
--cloud $CLOUD_PROVIDER \
--region $CLOUD_REGION
# Verify environment variables
echo $FLINK_API_KEY
echo $FLINK_API_SECRET

Compute pool 'lfcp-xxxxx' not found
Causes:
Solutions:
# List available pools
confluent flink compute-pool list
# Verify environment
confluent environment list
confluent environment use $ENV_ID

Table 'topic_name' not found
Causes:
Solutions:
-- Use fully qualified name
SELECT * FROM `cluster_id`.`database_name`.`topic_name`;
-- Check available tables
SHOW CATALOGS;
SHOW DATABASES IN `cluster_id`;
SHOW TABLES IN `cluster_id`.`database_name`;

Artifact 'cfa-xxxxx' not found
Causes:
Solutions:
# List artifacts
confluent flink artifact list
# Upload artifact
confluent flink artifact create my-udf \
--cloud aws --region us-east-1 \
--artifact-file target/my-udf.jar

Statement cannot be scheduled: compute pool CFU limit exceeded
Causes:
Solutions:
# Check pool usage
confluent flink compute-pool describe $COMPUTE_POOL_ID
# Stop unused statements
confluent flink statement list
confluent flink statement stop unused-statement
# Create larger pool
confluent flink compute-pool create large-pool \
--cloud aws --region us-east-1 --max-cfu 20

Symptoms:
Causes:
Solutions:
-- Simplify query
-- Use more efficient joins
-- Add indexes (lookup tables)
-- Increase parallelism
SET 'parallelism.default' = '8';

Symptoms:
Causes:
Solutions:
-- Use projection pushdown
SELECT needed_column FROM large_table;
-- Avoid SELECT *
-- Pre-aggregate before joining
-- Optimize UDFs (batch processing)

Symptoms:
Causes:
Solutions:
-- Add salting for hot keys
SELECT
CONCAT(key, '-', CAST(FLOOR(RAND() * 10) AS STRING)) as salted_key,
value
FROM events;
-- Re-partition
-- Use local aggregation before global

# Set log level
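# Log4j 1 syntax shown; Flink 1.11+ ships Log4j 2, where conf/log4j.properties uses: rootLogger.level = DEBUG
log4j.rootLogger=DEBUG, console

EXPLAIN SELECT * FROM orders WHERE amount > 100;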
EXPLAIN PLAN FOR INSERT INTO output SELECT * FROM input;

Monitor watermark in Flink UI or logs to understand event-time progression.
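
Watermark progress can also be inspected from SQL itself with the built-in `CURRENT_WATERMARK` function. This is a minimal sketch, assuming a table named `events` whose `event_time` column is declared as the watermarked rowtime attribute; the `id` column and the `is_late` alias are illustrative only.

```sql
-- CURRENT_WATERMARK returns NULL until the first watermark has been emitted.
SELECT
  id,
  event_time,
  CURRENT_WATERMARK(event_time) AS current_wm,
  -- TRUE when the row arrived at or behind the current watermark
  CURRENT_WATERMARK(event_time) IS NOT NULL
    AND event_time <= CURRENT_WATERMARK(event_time) AS is_late
FROM events;
```

If `current_wm` stays NULL or stops advancing while rows keep arriving, an idle partition or a missing watermark definition is the likely reason windows never fire.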
-- Use EXPLAIN to validate without executing
EXPLAIN SELECT * FROM my_complex_query;
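
EXPLAIN can also report which kinds of changes each operator emits, which helps diagnose the "doesn't support consuming update and delete changes" error before a statement is submitted. This sketch uses the `CHANGELOG_MODE` explain detail from OSS Flink and reuses the `orders` and `customers_ref` tables from the join examples above; whether Confluent Cloud accepts explain details is an assumption to verify.

```sql
-- Each operator in the printed plan is annotated with its changelog mode,
-- e.g. [I] for insert-only or [I,UA,D] when updates and deletes are produced.
EXPLAIN CHANGELOG_MODE
SELECT o.*, c.name
FROM orders o
LEFT JOIN customers_ref c
  ON o.customer_id = c.id;
```

A plan whose top operator shows anything beyond `I` needs an upsert-capable sink with a primary key.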