
gamussa/flink-sql

Apache Flink SQL, Table API, and UDF development for both OSS Flink and Confluent Cloud


skills/flink-sql/references/confluent-cloud.md

Confluent Cloud for Apache Flink

Confluent-specific features, deployment patterns, and CLI commands.

Table of Contents

  1. Setup & Configuration
  2. CLI Commands
  3. Table API Plugin
  4. Confluent-Specific Features
  5. Deployment Patterns
  6. Billing & Compute Pools
  7. Limitations
  8. Troubleshooting

Setup & Configuration

Prerequisites

  1. Confluent Cloud account
  2. Flink compute pool provisioned
  3. Flink API key generated
  4. Confluent CLI installed (confluent update)

Environment Variables

# Required
export CLOUD_PROVIDER="aws"           # aws, azure, gcp
export CLOUD_REGION="us-east-1"       # region code
export ORG_ID="b0b421724-xxxx-xxxx"   # Organization ID
export ENV_ID="env-xxxxx"             # Environment ID
export COMPUTE_POOL_ID="lfcp-xxxxx"   # Compute pool ID
export FLINK_API_KEY="<key>"          # Flink API key
export FLINK_API_SECRET="<secret>"    # Flink API secret

# Optional (for UDF uploads)
export ARTIFACT_API_KEY="<key>"       # Cloud resource management key
export ARTIFACT_API_SECRET="<secret>"
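A quick local sanity check that these variables are actually set can save a failed CLI call later. A minimal sketch (pure standard library; the variable names are the ones defined above):

```python
import os

# Variables required by the CLI and Table API examples in this document
REQUIRED = [
    "CLOUD_PROVIDER", "CLOUD_REGION", "ORG_ID", "ENV_ID",
    "COMPUTE_POOL_ID", "FLINK_API_KEY", "FLINK_API_SECRET",
]

def missing_vars(env):
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED if not env.get(name)]

# Fail fast before invoking any confluent CLI command
missing = missing_vars(os.environ)
if missing:
    print("Missing required variables:", ", ".join(missing))
```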

Generate Flink API Key

confluent flink api-key create \
  --environment $ENV_ID \
  --cloud $CLOUD_PROVIDER \
  --region $CLOUD_REGION

CLI Commands

Compute Pool Management

# List compute pools
confluent flink compute-pool list

# Create compute pool
confluent flink compute-pool create my-pool \
  --cloud aws \
  --region us-east-1 \
  --max-cfu 10

# Describe compute pool
confluent flink compute-pool describe lfcp-xxxxx

# Delete compute pool
confluent flink compute-pool delete lfcp-xxxxx

# Set default compute pool
confluent flink compute-pool use lfcp-xxxxx

SQL Shell

# Start interactive SQL shell
confluent flink shell \
  --cloud $CLOUD_PROVIDER \
  --region $CLOUD_REGION \
  --environment $ENV_ID \
  --compute-pool $COMPUTE_POOL_ID

# In shell:
# > SHOW TABLES;
# > SELECT * FROM my_table LIMIT 10;
# > !quit

Statement Management

# List statements
confluent flink statement list

# Create statement (run SQL)
confluent flink statement create my-statement \
  --sql "INSERT INTO sink_table SELECT * FROM source_table" \
  --compute-pool $COMPUTE_POOL_ID

# Describe statement
confluent flink statement describe my-statement

# Stop statement (pause)
confluent flink statement stop my-statement

# Resume statement
confluent flink statement resume my-statement

# Delete statement
confluent flink statement delete my-statement

# View exceptions
confluent flink statement exception list my-statement

Artifact Management (UDFs)

# List artifacts
confluent flink artifact list

# Upload artifact
confluent flink artifact create my-udf \
  --cloud aws \
  --region us-east-1 \
  --artifact-file target/my-udf-1.0.jar

# Describe artifact
confluent flink artifact describe cfa-xxxxx

# Delete artifact
confluent flink artifact delete cfa-xxxxx

Savepoint Management

# Create savepoint
confluent flink savepoint create \
  --statement my-statement

# List savepoints
confluent flink savepoint list

# Describe savepoint
confluent flink savepoint describe sp-xxxxx

# Resume from savepoint
confluent flink statement resume my-statement \
  --savepoint sp-xxxxx

Table API Plugin

Java Setup

pom.xml:

<repositories>
  <repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>2.1.0</version>
  </dependency>
  <dependency>
    <groupId>io.confluent.flink</groupId>
    <artifactId>confluent-flink-table-api-java-plugin</artifactId>
    <version>2.1-8</version>
  </dependency>
</dependencies>

Main class:

import io.confluent.flink.plugin.ConfluentSettings;
import io.confluent.flink.plugin.ConfluentTools;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class FlinkApp {
    public static void main(String[] args) {
        // Create environment from env vars
        TableEnvironment env = TableEnvironment.create(
            ConfluentSettings.fromGlobalVariables()
        );
        
        // Execute SQL
        TableResult result = env.executeSql(
            "SELECT * FROM `cluster`.`database`.`topic`"
        );
        
        // Print results
        ConfluentTools.printMaterializedLimit(result, 100);
    }
}

Python Setup

pip install confluent-flink-table-api-python-plugin

Main script:

from pyflink.table.confluent import ConfluentSettings, ConfluentTools
from pyflink.table import TableEnvironment

# Create environment from env vars
settings = ConfluentSettings.from_global_variables()
env = TableEnvironment.create(settings)

# Execute SQL
result = env.execute_sql(
    "SELECT * FROM `cluster`.`database`.`topic`"
)

# Print results
ConfluentTools.print_materialized_limit(result, 100)

Confluent-Specific Features

System Columns

Confluent-managed tables expose special system columns:

| Column | Type | Description |
| --- | --- | --- |
| `$rowtime` | TIMESTAMP_LTZ(3) | Event time from the Kafka record timestamp |
| `$headers` | MAP<STRING, BYTES> | Kafka record headers |

-- Use system columns
SELECT 
  id, 
  `$rowtime` as event_time,
  `$headers`['correlation-id'] as correlation_id
FROM my_topic;

-- Watermark on $rowtime
CREATE TABLE my_events (
  id STRING,
  payload STRING,
  WATERMARK FOR `$rowtime` AS `$rowtime` - INTERVAL '5' SECOND
);

ConfluentTableDescriptor

import io.confluent.flink.plugin.ConfluentTableDescriptor;

TableDescriptor descriptor = ConfluentTableDescriptor.forManaged()
    .schema(Schema.newBuilder()
        .column("id", DataTypes.STRING())
        .column("value", DataTypes.INT())
        .watermark("$rowtime", $("$rowtime").minus(lit(5).seconds()))
        .build())
    .build();

env.createTable("my_table", descriptor);

ConfluentTools Utilities

// Collect results (blocking)
List<Row> rows = ConfluentTools.collectMaterializedLimit(table, 100);
List<Row> allRows = ConfluentTools.collectMaterialized(table);  // Bounded only

// Print results
ConfluentTools.printMaterializedLimit(table, 100);
ConfluentTools.printMaterialized(table);  // Bounded only

// Statement lifecycle
TableResult result = env.executeSql("SELECT ...");
String name = ConfluentTools.getStatementName(result);
ConfluentTools.stopStatement(result);
ConfluentTools.stopStatementByName(env, "statement-name");
ConfluentTools.deleteStatement(env, "statement-name");

Catalog Structure

<environment>
├── <kafka-cluster-1>
│   └── <database-1>
│       ├── topic_a
│       ├── topic_b
│       └── my_view
└── <kafka-cluster-2>
    └── <database-2>
        └── topic_c
-- Fully qualified name
SELECT * FROM `cluster_id`.`database_name`.`topic_name`;

-- Set default catalog/database
USE CATALOG `cluster_id`;
USE `database_name`;
SELECT * FROM topic_name;

-- Show structure
SHOW CATALOGS;
SHOW DATABASES;
SHOW TABLES;
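Scripts that generate SQL against this catalog hierarchy need to build fully qualified names, and backtick-quoting each segment keeps hyphenated cluster or database IDs valid. A sketch (the doubled-backtick escape follows the usual SQL-dialect convention for embedded backticks; treat it as an assumption for your identifiers):

```python
def quote_ident(segment: str) -> str:
    """Backtick-quote one identifier segment, doubling embedded backticks."""
    return "`" + segment.replace("`", "``") + "`"

def qualified_name(cluster_id: str, database: str, table: str) -> str:
    """Build a fully qualified `cluster`.`database`.`table` reference."""
    return ".".join(quote_ident(s) for s in (cluster_id, database, table))

# Hypothetical IDs for illustration
sql = f"SELECT * FROM {qualified_name('lkc-abc123', 'my-db', 'orders')};"
print(sql)
```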

Deployment Patterns

Continuous Streaming Job

# Create long-running statement
confluent flink statement create enrichment-job \
  --sql "INSERT INTO enriched_orders 
         SELECT o.*, c.name, c.tier 
         FROM orders o 
         JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c 
         ON o.customer_id = c.id" \
  --compute-pool $COMPUTE_POOL_ID

Savepoint-Based Updates

# 1. Stop with savepoint
confluent flink savepoint create --statement my-job

# 2. Update job (new SQL or schema)
confluent flink statement delete my-job
confluent flink statement create my-job-v2 \
  --sql "..." \
  --compute-pool $COMPUTE_POOL_ID

# 3. Resume from savepoint
confluent flink statement resume my-job-v2 --savepoint sp-xxxxx

Blue-Green Deployment

# 1. Deploy new version alongside
confluent flink statement create job-v2 \
  --sql "INSERT INTO output_v2 SELECT ..." \
  --compute-pool $COMPUTE_POOL_ID

# 2. Verify new version
# 3. Stop old version
confluent flink statement stop job-v1

# 4. Switch consumers to new output
# 5. Delete old version
confluent flink statement delete job-v1

Billing & Compute Pools

CFU (Confluent Flink Unit)

  • Billing is based on CFU consumption over time
  • Each compute pool has a maximum CFU limit (--max-cfu)
  • Statements consume CFUs in proportion to workload size and query complexity
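As a rough back-of-envelope, spend scales with average CFUs held over time. A sketch (the rate below is a hypothetical placeholder, not a quoted price; check current Confluent pricing for your cloud and region):

```python
def estimate_cost(avg_cfus: float, hours: float, rate_per_cfu_hour: float) -> float:
    """Estimate spend as average CFUs x hours x per-CFU-hour rate."""
    return avg_cfus * hours * rate_per_cfu_hour

# Hypothetical example: a statement averaging 2 CFUs for a 30-day month
# at an assumed $0.21 per CFU-hour (placeholder rate).
monthly = estimate_cost(avg_cfus=2, hours=24 * 30, rate_per_cfu_hour=0.21)
print(f"${monthly:.2f}")
```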

Autopilot Mode

Confluent can automatically scale compute pools:

# Enable autopilot (in Console)
# Or set max-cfu higher and let system scale

confluent flink compute-pool create my-pool \
  --cloud aws \
  --region us-east-1 \
  --max-cfu 20  # System scales up to this

Monitor CFU Usage

# Check statement metrics
confluent flink statement describe my-statement

# View in Console: Flink > Statements > Metrics

Limitations

General Limitations

| Feature | Status |
| --- | --- |
| DataStream API | Not supported (Table API only) |
| Batch mode | Not supported (streaming only) |
| Custom connectors | Not supported |
| State backends | Managed by Confluent |
| Checkpointing | Managed by Confluent |

UDF Limitations

| Limitation | Value |
| --- | --- |
| Max UDFs per statement | 10 |
| Max artifacts per environment | 100 |
| Max artifact size | 100 MB |
| Max row size | 4 MB |
| Java versions | 11, 17 |
| Python version | 3.11 only |
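The artifact-size limit can be checked locally before an upload fails server-side. A sketch (pure standard library; the 100 MB figure is the limit stated above):

```python
import os

MAX_ARTIFACT_BYTES = 100 * 1024 * 1024  # 100 MB artifact limit

def artifact_ok(path: str) -> tuple:
    """Check that a UDF artifact exists and is within the size limit."""
    if not os.path.isfile(path):
        return False, f"no such file: {path}"
    size = os.path.getsize(path)
    if size > MAX_ARTIFACT_BYTES:
        return False, f"artifact is {size} bytes, over the 100 MB limit"
    return True, "ok"
```

Running this before `confluent flink artifact create` fails fast instead of waiting on a rejected upload.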

Not supported:

  • Aggregate functions (UDAF)
  • Table aggregate functions
  • Temporary functions
  • ALTER FUNCTION
  • UDFs with MATCH_RECOGNIZE
  • Vararg functions
  • External network calls

SQL Limitations

| Feature | Status |
| --- | --- |
| CREATE DATABASE | Not supported |
| CREATE CATALOG | Not supported |
| File connectors | Not supported |
| JDBC connector | Limited (lookup only) |
| Hive integration | Not supported |

Supported Regions

Check current availability:

confluent flink region list --cloud aws
confluent flink region list --cloud azure
confluent flink region list --cloud gcp

Troubleshooting

Connection Issues

# Verify credentials
confluent flink shell --debug

# Check API key scope
confluent api-key list --resource flink

Statement Failures

# View exceptions
confluent flink statement exception list my-statement

# Check statement status
confluent flink statement describe my-statement

Common Errors

| Error | Cause | Fix |
| --- | --- | --- |
| Authentication failed | Invalid API key | Regenerate the Flink API key |
| Compute pool not found | Wrong pool ID | Check `confluent flink compute-pool list` |
| Table not found | Wrong catalog path | Use the fully qualified name |
| UDF not found | Wrong artifact ID | Verify the artifact was uploaded |
| CFU limit exceeded | Pool at capacity | Increase max-cfu or add another pool |

Install with Tessl CLI

npx tessl i gamussa/flink-sql@1.0.0
