Apache Flink SQL, Table API, and UDF development for both OSS Flink and Confluent Cloud
92
89%
Does it follow best practices?
Impact
98%
1.22xAverage score across 5 eval scenarios
Advisory
Suggest reviewing before use
Advanced SQL patterns for stream processing.
CREATE TABLE orders (
order_id STRING,
customer_id STRING,
amount DECIMAL(10, 2),
order_time TIMESTAMP(3),
-- Metadata columns
`partition` INT METADATA FROM 'partition' VIRTUAL,
`offset` BIGINT METADATA FROM 'offset' VIRTUAL,
-- Watermark
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND,
-- Primary key for upsert mode
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'flink-consumer',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);-- Tables created from Kafka topics are auto-discovered
-- Just reference them:
SELECT * FROM `cluster`.`database`.`topic_name`;
-- Or create explicitly with specific schema
CREATE TABLE my_events (
event_id STRING,
event_type STRING,
payload STRING,
event_time TIMESTAMP_LTZ(3),
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
);-- Simple view
CREATE VIEW high_value_orders AS
SELECT * FROM orders WHERE amount > 1000;
-- Materialized aggregation (continuous query)
CREATE VIEW customer_totals AS
SELECT
customer_id,
COUNT(*) as order_count,
SUM(amount) as total_amount
FROM orders
GROUP BY customer_id;SELECT
window_start,
window_end,
product_id,
COUNT(*) as sale_count,
SUM(quantity) as total_quantity,
AVG(price) as avg_price,
MIN(price) as min_price,
MAX(price) as max_price
FROM TABLE(
TUMBLE(TABLE sales, DESCRIPTOR(sale_time), INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end, product_id;-- 5-minute slides over 1-hour windows
SELECT
window_start,
window_end,
sensor_id,
AVG(temperature) as avg_temp,
COUNT(*) as reading_count
FROM TABLE(
HOP(
TABLE sensor_readings,
DESCRIPTOR(reading_time),
INTERVAL '5' MINUTE, -- slide
INTERVAL '1' HOUR -- size
)
)
GROUP BY window_start, window_end, sensor_id;SELECT
window_start,
window_end,
user_id,
COUNT(*) as action_count,
LISTAGG(action_type, ', ') as actions
FROM TABLE(
SESSION(TABLE user_actions, DESCRIPTOR(action_time), INTERVAL '30' MINUTE)
)
GROUP BY window_start, window_end, user_id;-- Daily cumulative with hourly updates
SELECT
window_start,
window_end,
SUM(revenue) as cumulative_revenue
FROM TABLE(
CUMULATE(
TABLE transactions,
DESCRIPTOR(transaction_time),
INTERVAL '1' HOUR, -- step
INTERVAL '1' DAY -- max size
)
)
GROUP BY window_start, window_end;-- Watermark allows 5 seconds late data
CREATE TABLE events (
event_id STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (...);
-- Late events (beyond watermark) are dropped from window resultsFeed fine-grained window results into a coarser window to reduce computation:
-- Step 1: 1-minute pre-aggregation
CREATE VIEW minute_stats AS
SELECT
window_start,
window_end,
sensor_id,
AVG(temperature) as avg_temp,
COUNT(*) as reading_count
FROM TABLE(TUMBLE(TABLE sensor_readings, DESCRIPTOR(reading_time), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end, sensor_id;
-- Step 2: 5-minute aggregation from pre-aggregated results
SELECT
window_start,
window_end,
sensor_id,
AVG(avg_temp) as avg_temp_5m,
SUM(reading_count) as total_readings_5m
FROM TABLE(TUMBLE(TABLE minute_stats, DESCRIPTOR(window_end), INTERVAL '5' MINUTE))
GROUP BY window_start, window_end, sensor_id;Tip: Chaining reduces the number of rows the coarser window processes. Especially useful when the fine-grained window has high cardinality.
-- CAUTION: State grows unboundedly
SELECT o.*, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.id;-- Orders joined with shipments within 4 hours
SELECT
o.order_id,
o.order_time,
s.ship_time,
TIMESTAMPDIFF(MINUTE, o.order_time, s.ship_time) as minutes_to_ship
FROM orders o, shipments s
WHERE o.order_id = s.order_id
AND s.ship_time BETWEEN o.order_time AND o.order_time + INTERVAL '4' HOUR;-- Join orders with exchange rates at order time
SELECT
o.order_id,
o.amount,
o.currency,
r.rate,
o.amount * r.rate as amount_usd
FROM orders o
JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency;-- Enrich with customer data from external DB (OSS Flink)
SELECT o.*, c.name, c.email
FROM orders o
JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;⚠️ Confluent Cloud:
PROCTIME()is not supported. For per-row lookups on CC, pick one of these patterns:A. External Tables +
KEY_SEARCH_AGG(canonical path for real external DB / REST API lookups — supports Postgres, MySQL, SQL Server, Oracle, REST, MongoDB, Couchbase):SELECT o.order_id, t.name, t.tier FROM orders o, LATERAL TABLE(KEY_SEARCH_AGG(customers_ext, DESCRIPTOR(customer_id), customer_id)) CROSS JOIN UNNEST(search_results) AS t(customer_id, name, tier);Requires a prior
CREATE CONNECTIONand aCREATE TABLE ... WITH ('connector' = 'confluent-jdbc', ...). See confluent-cloud.md — External Tables.B. Regular join against upsert-kafka (when the reference data is already in a compacted Kafka topic):
SELECT o.*, c.name FROM orders o LEFT JOIN customers_ref c -- compacted upsert-kafka ON o.customer_id = c.id;This produces a changelog stream, so the sink must be
changelog.mode = 'upsert'with aPRIMARY KEY, and the query cannot includeCURRENT_TIMESTAMPor other non-deterministic functions.
-- Join two windowed streams
SELECT
L.window_start,
L.window_end,
L.key,
L.left_value,
R.right_value
FROM (
SELECT window_start, window_end, key, SUM(value) as left_value
FROM TABLE(TUMBLE(TABLE left_stream, DESCRIPTOR(ts), INTERVAL '1' HOUR))
GROUP BY window_start, window_end, key
) L
JOIN (
SELECT window_start, window_end, key, SUM(value) as right_value
FROM TABLE(TUMBLE(TABLE right_stream, DESCRIPTOR(ts), INTERVAL '1' HOUR))
GROUP BY window_start, window_end, key
) R
ON L.key = R.key AND L.window_start = R.window_start;Enrich a fact stream with multiple dimension tables in a single query:
-- Denormalize train activities with station, passenger, and channel dimensions
SELECT
a.activity_id,
a.activity_time,
s.station_name,
s.city,
p.passenger_name,
c.channel_type
FROM train_activities a
JOIN stations FOR SYSTEM_TIME AS OF a.activity_time AS s
ON a.station_id = s.station_id
JOIN passengers FOR SYSTEM_TIME AS OF a.activity_time AS p
ON a.passenger_id = p.passenger_id
JOIN booking_channels FOR SYSTEM_TIME AS OF a.activity_time AS c
ON a.channel_id = c.channel_id;Warning: Each temporal join adds state. Monitor state size when joining many dimensions.
Evaluate a correlated subquery per input row with automatic retraction on updates. Note: this is distinct from LATERAL TABLE(udtf()) used for table function expansion (see udf-guide.md).
-- For each state, find top 2 cities by population (updates as population changes)
SELECT
s.state,
c.city,
c.population
FROM states s,
LATERAL (
SELECT city, population
FROM cities
WHERE cities.state_id = s.state_id
ORDER BY population DESC
LIMIT 2
) AS c;SELECT
customer_id,
COUNT(*) as order_count,
SUM(amount) as total_amount
FROM orders
GROUP BY customer_id
HAVING SUM(amount) > 10000;SELECT
category,
COUNT(DISTINCT user_id) as unique_users,
COUNT(*) as total_events
FROM page_views
GROUP BY category;SELECT
order_id,
customer_id,
amount,
SUM(amount) OVER (
PARTITION BY customer_id
ORDER BY order_time
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as running_total
FROM orders;SELECT
event_id,
user_id,
COUNT(*) OVER (
PARTITION BY user_id
ORDER BY event_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
) as events_last_hour
FROM events;-- Flag readings more than 2 standard deviations from rolling average
SELECT
sensor_id,
reading_time,
temperature,
avg_temp,
stddev_temp,
CASE
WHEN ABS(temperature - avg_temp) > 2 * stddev_temp THEN 'OUTLIER'
ELSE 'NORMAL'
END AS status
FROM (
SELECT
sensor_id,
reading_time,
temperature,
AVG(temperature) OVER w AS avg_temp,
STDDEV(temperature) OVER w AS stddev_temp
FROM sensor_readings
WINDOW w AS (
PARTITION BY sensor_id
ORDER BY reading_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
)
);Row-to-row comparison for trend detection. Note: LAG here is a standard window function — distinct from LAG() inside MATCH_RECOGNIZE DEFINE clauses (see Pattern Detection section).
-- Detect price movement direction
SELECT
product_id,
event_time,
price,
LAG(price) OVER (PARTITION BY product_id ORDER BY event_time) AS prev_price,
CASE
WHEN price > LAG(price) OVER (PARTITION BY product_id ORDER BY event_time) THEN '▲'
WHEN price < LAG(price) OVER (PARTITION BY product_id ORDER BY event_time) THEN '▼'
ELSE '='
END AS trend
FROM price_updates;In streaming mode, standalone ORDER BY requires a time attribute — Flink cannot sort an unbounded stream arbitrarily.
-- ✅ Valid: ORDER BY time attribute
SELECT * FROM events ORDER BY event_time;
-- ✅ Valid: ORDER BY with LIMIT (bounded result)
SELECT * FROM events ORDER BY amount DESC LIMIT 10;
-- ❌ Invalid in streaming: ORDER BY non-time column without LIMIT
-- SELECT * FROM events ORDER BY amount; -- failsCREATE TABLE cdc_customers (
id INT,
name STRING,
email STRING,
updated_at TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'dbserver1.inventory.customers',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'debezium-json'
);CREATE TABLE customer_summary (
customer_id STRING,
order_count BIGINT,
total_amount DECIMAL(10, 2),
PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'customer-summary',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'json',
'value.format' = 'json'
);
INSERT INTO customer_summary
SELECT
customer_id,
COUNT(*) as order_count,
SUM(amount) as total_amount
FROM orders
GROUP BY customer_id;SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY user_id
ORDER BY event_time DESC
) AS rn
FROM events
)
WHERE rn = 1;SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY user_id
ORDER BY event_time ASC
) AS rn
FROM events
)
WHERE rn = 1;SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY category
ORDER BY sales DESC
) AS rn
FROM products
)
WHERE rn <= 5;SELECT *
FROM transactions
MATCH_RECOGNIZE (
PARTITION BY account_id
ORDER BY transaction_time
MEASURES
FIRST(A.transaction_time) AS first_txn,
LAST(A.transaction_time) AS last_txn,
COUNT(A.amount) AS txn_count,
SUM(A.amount) AS total_amount
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (A{3,} B)
DEFINE
A AS A.amount > 1000 AND A.location <> LAG(A.location),
B AS B.amount > 5000
) AS fraud;SELECT *
FROM page_views
MATCH_RECOGNIZE (
PARTITION BY user_id
ORDER BY view_time
MEASURES
FIRST(A.view_time) AS session_start,
LAST(A.view_time) AS session_end,
COUNT(*) AS page_count
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (A+)
DEFINE
A AS LAST(A.view_time, 1) IS NULL
OR A.view_time - LAST(A.view_time, 1) < INTERVAL '30' MINUTE
);-- Extract nested JSON
SELECT
order_id,
JSON_VALUE(payload, '$.customer.name') as customer_name,
JSON_VALUE(payload, '$.items[0].product_id') as first_product,
JSON_QUERY(payload, '$.items') as all_items
FROM orders;
-- Explode JSON array
SELECT
order_id,
item.product_id,
item.quantity
FROM orders,
LATERAL TABLE(JSON_ARRAY_ELEMENTS(JSON_QUERY(payload, '$.items'))) AS item;SELECT
window_start,
COUNT(*) as total_orders,
COUNT(*) FILTER (WHERE status = 'completed') as completed_orders,
SUM(amount) FILTER (WHERE status = 'completed') as completed_revenue,
AVG(amount) FILTER (WHERE amount > 100) as avg_large_order
FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' HOUR))
GROUP BY window_start;-- Union (removes duplicates)
SELECT user_id, event_type FROM web_events
UNION
SELECT user_id, event_type FROM mobile_events;
-- Union All (keeps duplicates)
SELECT user_id, event_type FROM web_events
UNION ALL
SELECT user_id, event_type FROM mobile_events;
-- Intersect
SELECT user_id FROM active_users
INTERSECT
SELECT user_id FROM premium_users;
-- Except
SELECT user_id FROM all_users
EXCEPT
SELECT user_id FROM churned_users;-- Insert to multiple sinks from same source
BEGIN STATEMENT SET;
INSERT INTO high_value_orders
SELECT * FROM orders WHERE amount > 1000;
INSERT INTO regular_orders
SELECT * FROM orders WHERE amount <= 1000;
INSERT INTO order_metrics
SELECT
DATE_FORMAT(order_time, 'yyyy-MM-dd') as order_date,
COUNT(*) as order_count
FROM orders
GROUP BY DATE_FORMAT(order_time, 'yyyy-MM-dd');
END;Unlike continuous Top-N above (which emits updating results), Window Top-N emits final results per window — no retractions.
-- Top 3 suppliers per 5-minute window (final, non-updating results)
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY window_start, window_end
ORDER BY total_sales DESC
) AS rn
FROM (
SELECT
window_start, window_end,
supplier_id,
SUM(sales) as total_sales
FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '5' MINUTE))
GROUP BY window_start, window_end, supplier_id
)
)
WHERE rn <= 3;Instead of dropping late data, route it to a separate sink for reprocessing:
-- Fork timely vs late data using CURRENT_WATERMARK
BEGIN STATEMENT SET;
INSERT INTO timely_events
SELECT * FROM events
WHERE event_time >= CURRENT_WATERMARK(event_time);
INSERT INTO late_events
SELECT * FROM events
WHERE event_time < CURRENT_WATERMARK(event_time);
END;Tip: Combine with Statement Sets (above) to process both paths in a single Flink job.
Override table connector properties at query time without modifying catalog definitions:
-- Read from latest offset (ignoring catalog's startup mode)
SELECT * FROM orders /*+ OPTIONS('scan.startup.mode' = 'latest-offset') */;
-- Override parallelism for a specific scan
SELECT * FROM events /*+ OPTIONS('scan.parallelism' = '4') */;
-- Change format for debugging
SELECT * FROM raw_logs /*+ OPTIONS('format' = 'raw') */;Expand typed ARRAY columns into individual rows. Note: for JSON arrays, see JSON Processing above.
-- Expand an ARRAY column into rows
SELECT
order_id,
tag
FROM orders
CROSS JOIN UNNEST(tags) AS T(tag);
-- With preserved columns
SELECT
order_id,
customer_id,
item_id,
item_qty
FROM orders
CROSS JOIN UNNEST(items_id, items_qty) AS T(item_id, item_qty);