kafka-connector-v4

Set up, configure, and troubleshoot the Snowflake Kafka Connector V4 (Snowpipe Streaming high-performance architecture). Covers fresh installations, connector property configuration for default pipe and user-defined pipe modes, server-side and client-side validation, JMX monitoring, migration from V3, and common error diagnosis. Triggers: kafka connector v4, kafka connector setup, kafka connector config, kafka connector troubleshoot, snowflake kafka connector, configure kafka connector, kafka connector help, kafka streaming connector.

When to use

Use this skill when the user wants to:

  • Fresh setup -- Deploy a new Kafka Connector V4 from scratch on an existing Kafka Connect cluster
  • Configure -- Choose between pipe modes, validation modes, topic-to-table mapping, converters, error handling, or performance tuning
  • Troubleshoot -- Diagnose connector failures, ingestion errors, lag, schema evolution issues, or authentication problems
  • Migrate from V3 -- Upgrade from the classic Kafka connector (V3) to V4 with correct compatibility flags and offset migration

When NOT to use this skill:

  • If the user wants a custom Kafka consumer using the Snowpipe Streaming SDK directly (redirect to the custom-kafka-consumer skill)
  • If the user wants to try Snowpipe Streaming without Kafka (redirect to the snowpipe-streaming-quickstart skill)

What this skill provides

  1. Fresh Setup Workflow (Steps 0-5) -- Confirms prerequisites, generates an RSA key-pair, creates the Snowflake role/user/grants, generates a ready-to-deploy connector configuration file, and guides connector startup and verification.
  2. Configuration Advisor (Steps 6-7) -- Helps the user choose the right pipe mode (default vs user-defined), validation mode (server-side vs client-side), and generates the appropriate connector properties.
  3. Troubleshooting Guide (Step 8) -- A symptom-based diagnostic that identifies common issues and provides targeted fixes.
  4. V3 Migration Assistant (Step 9) -- Generates a migration-safe configuration with correct compatibility flags and offset migration settings.

Critical concepts

Architecture

  • Connector class: com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
  • Underlying technology: Snowpipe Streaming V2 (Rust-based SDK, off-heap memory management)
  • Throughput: Up to 10 GB/s with 5-10 second end-to-end latency
  • Pricing: Flat, throughput-based pricing (per GB ingested)
  • Authentication: Key-pair only (no OAuth in V4)
  • Java: 11+ required

Two pipe modes

Default pipe mode (simplest):

  • Auto-creates a default pipe named {tableName}-STREAMING
  • Maps Kafka record JSON keys to table columns by name (case-insensitive)
  • No explicit CREATE PIPE needed
  • Supports schema evolution when ENABLE_SCHEMA_EVOLUTION=TRUE on the table
  • Works with both server-side and client-side validation

User-defined pipe mode (flexible):

  • User creates a PIPE with COPY INTO ... FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')) syntax
  • Full control over transformations: renaming columns, type casting, filtering, masking
  • Access to RECORD_METADATA fields via $1:RECORD_METADATA.topic, $1:RECORD_METADATA.offset, etc.
  • Pipe name must match the table name
  • Works with server-side validation only (client-side validation fails with ERROR_5030)

Two validation modes

Server-side validation (default, snowflake.validation=server_side):

  • Snowflake backend performs validation -- maximum throughput
  • Invalid records captured in Error Tables
  • Works with both default pipe and user-defined pipe modes
  • DLQ is NOT used for ingestion validation errors (only for converter deserialization errors)

Client-side validation (snowflake.validation=client_side):

  • Connector validates before sending to Snowflake
  • Supports Dead Letter Queue (DLQ) for invalid records
  • Better type inference for schema evolution
  • Only works with default pipe mode
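
The pairing rules above can be checked mechanically before a config is generated. The following is a minimal sketch, assuming hypothetical names (`VALID_COMBINATIONS`, `check_mode_combination`) and the mode labels `default` / `user_defined`; it encodes only the constraints stated in this section.

```python
# Hypothetical helper: encodes the pipe-mode / validation-mode pairings
# described above. Names and mode labels are illustrative only.
VALID_COMBINATIONS = {
    ("default", "server_side"),
    ("default", "client_side"),
    ("user_defined", "server_side"),
}


def check_mode_combination(pipe_mode: str, validation: str) -> str:
    """Return 'ok' or a short reason the pairing is unsupported."""
    if (pipe_mode, validation) in VALID_COMBINATIONS:
        return "ok"
    if (pipe_mode, validation) == ("user_defined", "client_side"):
        return "client_side validation requires default pipe mode (ERROR_5030)"
    return "unknown pipe mode or validation value"
```

A caller would typically run this on the user's answers in Step 4 and switch to `server_side` when the result is not `ok`.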

V4 default changes vs V3

| Configuration | V3 Default | V4 Default |
| --- | --- | --- |
| snowflake.enable.schematization | false (RECORD_CONTENT/RECORD_METADATA VARIANT columns) | true (record fields mapped to individual columns) |
| snowflake.validation | Client-side equivalent | server_side |
| snowflake.compatibility.enable.autogenerated.table.name.sanitization | true equivalent (invalid chars replaced, names uppercased) | false (topic names used as-is, case preserved) |
| snowflake.compatibility.enable.column.identifier.normalization | true equivalent (column names uppercased) | false (column identifiers preserve case) |

Warning about topic names with special characters: With V4 defaults (table name sanitization disabled), a topic named orders-prod creates a table literally named "orders-prod" which requires double-quoting in all SQL. To avoid this, either use snowflake.topic2table.map to map to a clean table name, or set snowflake.compatibility.enable.autogenerated.table.name.sanitization=true to replicate V3 behavior (uppercased, invalid chars replaced).
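
To spot topics affected by this warning, a quick check can flag names that would need double-quoting. This is a sketch under two assumptions: the table is created with the topic name exactly as given (case preserved, as the table above describes), and the hypothetical helper `needs_double_quotes` ignores reserved keywords, which also require quoting.

```python
import re

# Unquoted Snowflake identifiers start with a letter or underscore, contain
# only letters, digits, underscores, and dollar signs, and resolve as
# uppercase. Any other name must be double-quoted in SQL.
_UNQUOTED_UPPER = re.compile(r"[A-Z_][A-Z0-9_$]*")


def needs_double_quotes(table_name: str) -> bool:
    """True if a table created with this exact name needs quoting in SQL."""
    return _UNQUOTED_UPPER.fullmatch(table_name) is None
```

Topics for which this returns True are candidates for a snowflake.topic2table.map entry or for enabling table name sanitization.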

RECORD_METADATA structure

| Field | Type | Description |
| --- | --- | --- |
| topic | String | Kafka topic name |
| partition | String | Kafka partition number |
| offset | Number | Offset in the partition |
| CreateTime / LogAppendTime | Number | Kafka record timestamp (ms since epoch) |
| SnowflakeConnectorPushTime | Number | When connector buffered the record (ms since epoch) |
| key | String | Kafka message key (requires StringConverter for key.converter) |
| headers | Object | User-defined key-value pairs |

RECORD_METADATA adds ~150 bytes overhead per record. Set snowflake.metadata.all=false to disable if not needed.

JVM memory guidance

The Rust-based Snowpipe Streaming SDK allocates off-heap (system) memory for buffering, so limit the JVM heap to approximately 50% of available RAM. For example, on a worker with 8 GB RAM, set -Xmx4g. Budget at least 5 MB of off-heap memory per Kafka partition.
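
The sizing rule can be worked through as simple arithmetic. The sketch below assumes a hypothetical helper `size_worker_memory` and deliberately ignores memory used by the operating system and other processes, so treat the result as a lower bound rather than a capacity plan.

```python
def size_worker_memory(total_ram_mb: int, partitions: int) -> dict:
    """Apply the ~50% heap rule and the 5 MB-per-partition off-heap budget."""
    heap_mb = total_ram_mb // 2            # JVM heap: about half of RAM
    off_heap_available_mb = total_ram_mb - heap_mb
    min_off_heap_mb = 5 * partitions       # 5 MB per Kafka partition
    return {
        "xmx_flag": f"-Xmx{heap_mb}m",
        "min_off_heap_mb": min_off_heap_mb,
        "fits": min_off_heap_mb <= off_heap_available_mb,
    }
```

For an 8 GB worker handling 200 partitions this yields -Xmx4096m with a 1000 MB off-heap minimum; when "fits" is False, spread partitions across more workers.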

Supported platforms

| Package | Tested Versions |
| --- | --- |
| Apache Kafka | 2.8.2, 3.7.2, 4.1.1 |
| Confluent | 6.2.15, 7.8.2, 8.2.0 |
| Java | 11+ (SE recommended) |

Instructions

IMPORTANT EXECUTION GUIDELINES:

  1. Announce each step clearly -- Before executing each step, print a clear header like "Step X -- [Step Name]" so the user knows where they are.
  2. Batch commands aggressively -- Minimize permission prompts. Run independent operations in parallel.
  3. Log all SQL to kafka_connector_v4_sql.log -- Every time you execute SQL, immediately append it to the log file with a step header and timestamp.
  4. Private keys: NEVER display in chat. Use <REDACTED> in all logs and output.
  5. Detect user intent first (Step 0) -- Branch to the appropriate workflow based on what the user needs.
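
Guidelines 3 and 4 combine naturally into one logging helper. The following is a rough sketch, not a prescribed implementation: the function names are hypothetical, the entry layout (comment header plus UTC timestamp) is an assumption, and the redaction only covers key literals assigned to RSA_PUBLIC_KEY or RSA_PUBLIC_KEY_2.

```python
import re
from datetime import datetime, timezone
from typing import Optional

# Matches RSA_PUBLIC_KEY='...' (and RSA_PUBLIC_KEY_2) so the literal can be masked.
_KEY_LITERAL = re.compile(r"(RSA_PUBLIC_KEY(?:_2)?\s*=\s*)'[^']*'", re.IGNORECASE)


def redact_key_material(sql: str) -> str:
    """Replace key literals in SQL with <REDACTED>."""
    return _KEY_LITERAL.sub(r"\1'<REDACTED>'", sql)


def format_log_entry(step: str, sql: str, now: Optional[datetime] = None) -> str:
    """Build one log entry: step header, timestamp, then the redacted SQL."""
    stamp = (now or datetime.now(timezone.utc)).isoformat(timespec="seconds")
    return f"-- {step} @ {stamp}\n{redact_key_material(sql)}\n\n"


def log_sql(step: str, sql: str, path: str = "kafka_connector_v4_sql.log") -> None:
    """Append the entry to the SQL log file."""
    with open(path, "a", encoding="utf-8") as fh:
        fh.write(format_log_entry(step, sql))
```

Calling log_sql("Step 3 -- Register public key", statement) immediately after each execution keeps the log in step order without exposing key content.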

Step 0 -- Detect intent and branch

Before doing anything, determine what the user needs:

  • If they say "setup" / "install" / "deploy" / "new connector" --> proceed through Steps 1-5 (Fresh Setup)
  • If they say "configure" / "config" / "properties" / "settings" --> skip to Step 6 (Configuration Advisor)
  • If they say "troubleshoot" / "debug" / "error" / "failing" / "lag" --> skip to Step 8 (Troubleshooting)
  • If they say "migrate" / "v3 to v4" / "upgrade" --> skip to Step 9 (Migration)

If intent is unclear, ask:

What would you like help with?

  1. Fresh setup -- Deploy a new Kafka Connector V4
  2. Configure -- Help with connector properties for your use case
  3. Troubleshoot -- Diagnose an issue with an existing connector
  4. Migrate from V3 -- Upgrade from the classic connector
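
The branching above can be approximated with a keyword scan as a first pass. This sketch makes several assumptions: the names (`INTENT_KEYWORDS`, `detect_intent`) are hypothetical, the precedence order (migration, then troubleshooting, configuration, setup) is a choice made here, and "set up" is added alongside "setup". Plain substring matching is crude (for example, "flag" contains "lag", and "Set up DLQ" would match setup), so ambiguous requests should still fall back to the clarifying question.

```python
# (intent name, first step to run, trigger keywords); checked in this order.
INTENT_KEYWORDS = [
    ("migration", 9, ("migrate", "v3 to v4", "upgrade")),
    ("troubleshooting", 8, ("troubleshoot", "debug", "error", "failing", "lag")),
    ("configuration", 6, ("configure", "config", "properties", "settings")),
    ("fresh_setup", 1, ("setup", "set up", "install", "deploy", "new connector")),
]


def detect_intent(message: str):
    """Return (intent, step) for the first keyword match, or None if unclear."""
    text = message.lower()
    for name, step, keywords in INTENT_KEYWORDS:
        if any(keyword in text for keyword in keywords):
            return name, step
    return None
```

A None result means the four-option prompt should be shown.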

Step 1 -- Gather prerequisites (Fresh Setup)

Goal: Verify the user has everything needed before starting.

Ask the user for (in a single prompt):

  • Kafka distribution: Confluent or OSS Apache Kafka?
  • Kafka Connect mode: Distributed or Standalone?
  • Java version (must be 11+)
  • Snowflake account URL (e.g., myorg-myaccount.snowflakecomputing.com)
  • Snowflake user name for the connector
  • Snowflake role name for the connector
  • Target database and schema
  • Topic name(s)
  • Whether they already have RSA key-pair auth configured

Run in parallel:

  • Bash: Check Java version: java -version 2>&1
  • SQL: SELECT CURRENT_USER(), CURRENT_ROLE(), CURRENT_DATABASE(), CURRENT_SCHEMA(), CURRENT_WAREHOUSE(), CURRENT_ACCOUNT();

Error handling:

  • Java version < 11 --> tell user to upgrade
  • No warehouse set --> tell user to set one with USE WAREHOUSE
  • Role lacks privileges --> guide to Step 2

Step 2 -- Configure Snowflake (role, user, grants)

Goal: Create or configure the Snowflake role and necessary grants.

Execute as one single SQL call:

-- Use a role that can create and manage roles and privileges.
USE ROLE SECURITYADMIN;

-- Create a Snowflake role for the connector.
CREATE ROLE IF NOT EXISTS {{KAFKA_ROLE}};

-- Grant privileges on the database.
GRANT USAGE ON DATABASE {{DATABASE}} TO ROLE {{KAFKA_ROLE}};

-- Grant privileges on the schema.
GRANT USAGE ON SCHEMA {{DATABASE}}.{{SCHEMA}} TO ROLE {{KAFKA_ROLE}};

-- Grant ability to create tables and pipes (for auto-creation).
GRANT CREATE TABLE ON SCHEMA {{DATABASE}}.{{SCHEMA}} TO ROLE {{KAFKA_ROLE}};
GRANT CREATE PIPE ON SCHEMA {{DATABASE}}.{{SCHEMA}} TO ROLE {{KAFKA_ROLE}};

-- If using an existing table, grant INSERT on that specific table.
-- GRANT INSERT ON TABLE {{DATABASE}}.{{SCHEMA}}.{{TABLE}} TO ROLE {{KAFKA_ROLE}};

-- If using user-defined pipes, grant OPERATE on the pipe.
-- GRANT OPERATE ON PIPE {{DATABASE}}.{{SCHEMA}}.{{PIPE}} TO ROLE {{KAFKA_ROLE}};

-- Grant the role to the connector user.
GRANT ROLE {{KAFKA_ROLE}} TO USER {{KAFKA_USER}};

Note: If the user already has a role, skip creation and verify its grants. When ingesting into an existing table, uncomment the INSERT grant for that specific table.


Step 3 -- Generate RSA key-pair (if needed)

Goal: Generate key-pair for connector authentication.

If user already has key-pair auth, skip this step.

Single Bash call (generate key-pair and extract public key body):

openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt 2>/dev/null \
  && chmod 600 rsa_key.p8 \
  && openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub 2>/dev/null \
  && echo "RSA key-pair generated successfully"

Then silently read rsa_key.pub, strip the header/footer lines, and store the base64 body.
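
Stripping the PEM armor is a few lines of string handling. A minimal sketch, assuming a hypothetical helper named `pem_body`; it treats any line starting with five dashes as a header or footer.

```python
def pem_body(pem_text: str) -> str:
    """Return the base64 body of a PEM block as one line, without header/footer."""
    lines = [line.strip() for line in pem_text.splitlines()]
    return "".join(line for line in lines if line and not line.startswith("-----"))
```

In practice the input would come from reading rsa_key.pub, and the result would be stored in a variable, never printed.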

SECURITY: NEVER display any key content in chat. Use <REDACTED> in logs.

Register the public key via SQL:

ALTER USER {{KAFKA_USER}} SET RSA_PUBLIC_KEY='{{PUBLIC_KEY_BODY}}';
DESC USER {{KAFKA_USER}};

Step 4 -- Generate connector configuration

Goal: Generate a ready-to-deploy connector configuration file.

Ask the user:

  1. Do you want the default pipe mode (simplest, auto-creates tables/pipes) or user-defined pipe mode (custom transformations)?
  2. Do you want server-side validation (default, max throughput, Error Tables) or client-side validation (DLQ support)?
  3. Do your topic names match your desired table names? If not, provide mappings for topic2table.map.
  4. Is your data format JSON or Avro (with Schema Registry)?
  5. How many Kafka partitions? (Recommend: set tasks.max equal to partition count)

Based on answers, generate the config file using the appropriate template from the Templates section.

Write the config file:

  • Distributed mode: Write kafka-connector-v4-config.json
  • Standalone mode: Write kafka-connector-v4-config.properties
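
Both file formats can be rendered from one dictionary of properties. This is a sketch with hypothetical function names; it assumes simple values and does not apply Java properties escaping (backslashes, leading whitespace), so review the output before deploying.

```python
import json


def render_distributed(name: str, config: dict) -> str:
    """JSON body for the Kafka Connect REST API (distributed mode)."""
    return json.dumps({"name": name, "config": config}, indent=2)


def render_standalone(name: str, config: dict) -> str:
    """key=value lines for a standalone .properties file."""
    lines = [f"name={name}"] + [f"{key}={value}" for key, value in config.items()]
    return "\n".join(lines) + "\n"
```

Write the first result to kafka-connector-v4-config.json and the second to kafka-connector-v4-config.properties, depending on the Kafka Connect mode.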

For the private key value, instruct the user:

Open rsa_key.p8, remove the -----BEGIN PRIVATE KEY----- and -----END PRIVATE KEY----- header/footer lines, remove all newlines, and paste the single base64 string as the value for snowflake.private.key. In production, use a ConfigProvider to externalize this secret (AWS KMS, Azure Key Vault, HashiCorp Vault).

If user chose user-defined pipe mode, also generate the pipe SQL (see Step 7).


Step 5 -- Deploy and verify

Goal: Start the connector and verify data is flowing.

Provide the startup command:

Distributed mode:

curl -X POST -H "Content-Type: application/json" --data @kafka-connector-v4-config.json http://localhost:8083/connectors

Standalone mode:

<kafka_dir>/bin/connect-standalone.sh <kafka_dir>/config/connect-standalone.properties kafka-connector-v4-config.properties

Verification steps (guide the user):

  1. Check connector status:

    curl -s http://localhost:8083/connectors/{{CONNECTOR_NAME}}/status | python3 -m json.tool
  2. Wait a few seconds for data to arrive, then query the table:

    SELECT COUNT(*) FROM {{DATABASE}}.{{SCHEMA}}.{{TABLE}};
    SELECT * FROM {{DATABASE}}.{{SCHEMA}}.{{TABLE}} LIMIT 10;
  3. Estimate end-to-end latency (see Template 8 in Templates section).

Present success summary with row count and latency estimate.
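
The status check in verification step 1 can be reduced to a pass/fail summary. The sketch below assumes the standard Kafka Connect REST status shape (a "connector" object with "state", and a "tasks" list whose failed entries carry a "trace"); `summarize_status` is a hypothetical name.

```python
def summarize_status(status: dict) -> dict:
    """Condense a /connectors/<name>/status response into a health summary."""
    connector_state = status.get("connector", {}).get("state")
    tasks = status.get("tasks", [])
    # Keep only the first line of each failed task's stack trace.
    failed = {
        task["id"]: (task.get("trace") or "").split("\n", 1)[0]
        for task in tasks
        if task.get("state") == "FAILED"
    }
    healthy = (
        connector_state == "RUNNING"
        and len(tasks) > 0
        and all(task.get("state") == "RUNNING" for task in tasks)
    )
    return {"connector_state": connector_state, "healthy": healthy, "failed_tasks": failed}
```

Feed it the parsed JSON from the curl command; a non-empty "failed_tasks" entry points to the Step 8 troubleshooting branches.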


Step 6 -- Configuration advisor (standalone entry point)

Goal: Help users who need configuration guidance for a specific topic.

Ask what they need help with:

  1. Pipe mode selection -- Default (simple, auto-creates) vs user-defined (custom transformations). If user-defined, proceed to Step 7.
  2. Validation mode -- Server-side (default, max throughput, Error Tables) vs client-side (DLQ support). Note: client-side only works with default pipe.
  3. Topic-to-table mapping -- Static (topic name = table name), explicit (topic2table.map), regex patterns, many-to-one. Examples:
    • Explicit: snowflake.topic2table.map=orders:ORDER_TABLE,customers:CUSTOMER_TABLE
    • Many-to-one: snowflake.topic2table.map=topic1:SHARED_TABLE,topic2:SHARED_TABLE,topic3:SHARED_TABLE
    • Regex: snowflake.topic2table.map=.*_events:ALL_EVENTS
    • Quoted (special chars): snowflake.topic2table.map="my:topic":"My_Table"
  4. Schema evolution -- Set ENABLE_SCHEMA_EVOLUTION=TRUE on the table. Adds new columns automatically. Server-side validation may infer wrong types; pre-create table if exact types matter.
  5. Iceberg tables -- Must pre-create the table, grant USAGE on external volume, no schema evolution support.
  6. Converters -- JSON (JsonConverter), Avro (AvroConverter + Schema Registry), Protobuf (ProtobufConverter). Note: StringConverter and ByteArrayConverter not supported with schematization=true.
  7. Performance tuning -- tasks.max = partition count, JVM heap ~50% RAM, same-region deployment, cache settings.
  8. Error handling -- errors.tolerance (none vs all), DLQ config, Error Tables for server-side.
  9. Monitoring -- JMX metrics setup, key alerting metrics (for example the offset lag, backpressure-rewind-count, and sdk-client-count metrics covered in Step 8).

For each sub-topic, provide the relevant configuration snippet and a brief explanation.
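
For the explicit and many-to-one mappings in item 3, the property value is a comma-separated list of topic:table pairs. A small sketch, assuming a hypothetical `build_topic2table_map` helper; it refuses names containing ":" or "," instead of guessing at the quoting rules.

```python
def build_topic2table_map(mapping: dict) -> str:
    """Render {topic: table} as a snowflake.topic2table.map value."""
    pairs = []
    for topic, table in mapping.items():
        for name in (topic, table):
            if ":" in name or "," in name:
                # Quoted entries are possible but not handled by this sketch.
                raise ValueError(f"name needs manual quoting: {name!r}")
        pairs.append(f"{topic}:{table}")
    return ",".join(pairs)
```

Dictionary insertion order is preserved, so the output lists topics in the order they were provided.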


Step 7 -- User-defined pipe setup

Goal: Guide user through creating a user-defined pipe for custom transformations.

  1. Create the destination table:

    CREATE TABLE {{DATABASE}}.{{SCHEMA}}.{{TABLE}} (
      -- columns matching desired output schema
    );
  2. Create a pipe with the same name as the table:

    CREATE PIPE {{DATABASE}}.{{SCHEMA}}.{{TABLE}} AS
    COPY INTO {{DATABASE}}.{{SCHEMA}}.{{TABLE}}
    FROM (
      SELECT
        $1:field_name::TYPE AS column_name,
        $1:RECORD_METADATA.topic::STRING AS source_topic,
        $1:RECORD_METADATA.offset::NUMBER AS kafka_offset
      FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
    );
  3. Grant OPERATE on the pipe:

    GRANT OPERATE ON PIPE {{DATABASE}}.{{SCHEMA}}.{{TABLE}} TO ROLE {{KAFKA_ROLE}};

Field accessor syntax:

  • Simple fields: $1:field_name
  • Fields with spaces or special characters: $1['field name']
  • Nested fields: $1:parent.child or $1:parent['child field']
  • Metadata fields: $1:RECORD_METADATA.topic, $1:RECORD_METADATA.offset, $1:RECORD_METADATA.SnowflakeConnectorPushTime

Important: Client-side validation does NOT work with user-defined pipes. The connector will fail with ERROR_5030. Use snowflake.validation=server_side (the default). This means you cannot use both DLQ and user-defined pipes -- if you need custom transformations, use server-side validation with Error Tables for error handling instead of DLQ.


Step 8 -- Troubleshooting (standalone entry point)

Goal: Diagnose and fix common connector issues.

Ask the user to describe their symptom, then branch:

Symptom: Connector fails at startup

"Unrecognized configuration" errors:

  • You may be using V3 properties that were removed in V4. Removed properties include:
    • snowflake.ingestion.method
    • buffer.flush.time, buffer.size.bytes, buffer.count.records
    • snowflake.streaming.max.client.lag, snowflake.streaming.max.memory.limit.bytes
    • snowflake.snowpipe.*
    • snowflake.authenticator, snowflake.oauth.*
    • snowflake.streaming.iceberg.enabled
    • All Snowflake-provided custom converters (SnowflakeJsonConverter, etc.)
  • Remove these properties and use V4 equivalents.
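
A config can be scanned for the removed properties listed above before deployment. This is a sketch with hypothetical names; it covers only the property keys from the list and does not detect the removed Snowflake converter classes, which appear as property values.

```python
REMOVED_EXACT = {
    "snowflake.ingestion.method",
    "buffer.flush.time",
    "buffer.size.bytes",
    "buffer.count.records",
    "snowflake.streaming.max.client.lag",
    "snowflake.streaming.max.memory.limit.bytes",
    "snowflake.authenticator",
    "snowflake.streaming.iceberg.enabled",
}
REMOVED_PREFIXES = ("snowflake.snowpipe.", "snowflake.oauth.")


def find_removed_properties(config: dict) -> list:
    """Return the V3-only property keys present in a connector config, sorted."""
    return sorted(
        key for key in config
        if key in REMOVED_EXACT or key.startswith(REMOVED_PREFIXES)
    )
```

An empty list means none of the listed V3 keys remain in the config.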

"Compatibility validator" errors:

  • New installation: Set snowflake.streaming.validate.compatibility.with.classic=false
  • Migration from V3: Explicitly set all required compatibility properties:
    • snowflake.validation=client_side
    • snowflake.compatibility.enable.column.identifier.normalization=true
    • snowflake.compatibility.enable.autogenerated.table.name.sanitization=true
    • snowflake.enable.schematization=true or false
    • snowflake.streaming.classic.offset.migration=skip or best_effort or strict
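
For migrations, the same kind of check can confirm that every compatibility property is set explicitly. A minimal sketch with hypothetical names, assuming only the five properties and the three offset migration values listed above.

```python
REQUIRED_FOR_MIGRATION = (
    "snowflake.validation",
    "snowflake.compatibility.enable.column.identifier.normalization",
    "snowflake.compatibility.enable.autogenerated.table.name.sanitization",
    "snowflake.enable.schematization",
    "snowflake.streaming.classic.offset.migration",
)
ALLOWED_OFFSET_MIGRATION = {"skip", "best_effort", "strict"}


def check_migration_config(config: dict) -> list:
    """List missing compatibility properties and invalid offset migration values."""
    problems = [f"missing: {key}" for key in REQUIRED_FOR_MIGRATION if key not in config]
    mode = config.get("snowflake.streaming.classic.offset.migration")
    if mode is not None and mode not in ALLOWED_OFFSET_MIGRATION:
        problems.append(f"invalid offset migration value: {mode}")
    return problems
```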

"Authentication failed":

  • Verify snowflake.private.key is valid Base64-encoded PKCS#8 (no header/footer lines, no newlines)
  • If key is encrypted, set snowflake.private.key.passphrase
  • Verify the role has required privileges (see Step 2)

"Unsupported converter" with schematization:

  • StringConverter and ByteArrayConverter are not supported when snowflake.enable.schematization=true (the default)
  • Use JsonConverter, AvroConverter, or ProtobufConverter

Symptom: Connector running but no data in Snowflake

  1. Check connector status: curl -s http://localhost:8083/connectors/<name>/status
  2. Check if topics have data: use kafka-console-consumer --topic <topic> --bootstrap-server <server> --from-beginning --max-messages 5
  3. Check role has INSERT on the target table
  4. Check cache: if table/pipe was created after connector started, cache may be stale. Reduce snowflake.cache.table.exists.expire.ms or restart the connector.

Symptom: ERROR_5030 (ingestion error)

Common causes:

  • Data type mismatch between Kafka record and table schema
  • Client-side validation with user-defined pipe (not supported -- switch to server_side)
  • Schema changes that can't auto-evolve

To investigate:

  1. Check Error Table for server-side validation:
    -- List error tables in the schema
    SHOW TABLES LIKE '%ERRORS%' IN SCHEMA {{DATABASE}}.{{SCHEMA}};
    -- Query recent errors (error table is auto-created alongside the target table)
    SELECT * FROM {{DATABASE}}.{{SCHEMA}}."{{TABLE}}_ERRORS" ORDER BY ERROR_TIMESTAMP DESC LIMIT 20;
  2. Check DLQ for client-side validation: consume from your DLQ topic to inspect failed records
  3. Enable errors.log.enable=true for verbose logging
  4. Review connector logs for specific error details

Symptom: Ingestion lag growing

Check JMX metrics: latest-consumer-offset minus persisted-in-snowflake-offset

If JMX is unavailable, estimate lag from the Snowflake side using Template 8 (latency estimation query). High latency_ms values indicate data is taking a long time from connector buffering to queryability.
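
The lag formula is a per-partition subtraction. The sketch below assumes the two metric values have already been collected into dictionaries keyed by partition; how they are scraped from JMX is out of scope, and `ingestion_lag` is a hypothetical name.

```python
def ingestion_lag(latest_consumer_offset: dict, persisted_in_snowflake_offset: dict) -> dict:
    """Lag per partition: latest-consumer-offset minus persisted-in-snowflake-offset."""
    return {
        partition: latest_consumer_offset[partition] - persisted_in_snowflake_offset[partition]
        for partition in latest_consumer_offset
        if partition in persisted_in_snowflake_offset
    }
```

A lag that keeps growing across successive samples, rather than a single large value, is the signal to apply the fixes that follow.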

Fixes:

  • Increase tasks: Set tasks.max closer to the total number of Kafka partitions
  • Check backpressure: If backpressure-rewind-count is increasing, the SDK is at capacity -- scale out Kafka Connect cluster
  • Review JVM memory: Heap should be ~50% of available RAM, rest for Rust SDK off-heap memory
  • Same region: Ensure Kafka Connect cluster is in the same cloud region as Snowflake account

Symptom: Duplicate records after migration

Use the deduplication query (see Template 9 in Templates section) based on RECORD_METADATA topic, partition, and offset fields.

Important: Deduplication requires snowflake.metadata.topic and snowflake.metadata.offset.and.partition to be enabled (they are by default).
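
The deduplication key is the (topic, partition, offset) triple. The following in-memory sketch only illustrates that key and is not Template 9; it assumes each row is a dictionary with a RECORD_METADATA entry and keeps the first occurrence of each triple.

```python
def deduplicate(rows: list) -> list:
    """Keep the first row seen for each (topic, partition, offset) triple."""
    seen = set()
    unique = []
    for row in rows:
        meta = row["RECORD_METADATA"]
        key = (meta["topic"], meta["partition"], meta["offset"])
        if key not in seen:
            seen.add(key)
            unique.append(row)
    return unique
```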

Symptom: Schema evolution producing wrong types

Server-side validation can't always infer the correct data type (e.g., interprets "2026-04-13" as DATE not TEXT, can't infer binary columns).

Fixes:

  • Pre-create the table with the correct schema before starting the connector
  • Use client-side validation (snowflake.validation=client_side) for better type inference
  • Avoid running DDL on tables while the connector is actively ingesting

Symptom: Channel recovery count increasing

Possible causes:

  • Schema changes on target table conflicting with connector's cached schema
  • Permission changes affecting the connector's role
  • Network instability between Kafka Connect and Snowflake

Review connector logs for specific recovery reasons. Reduce SDK log noise with export SS_LOG_LEVEL=warn.

Symptom: Out of memory (OOM) errors

The V4 connector uses a Rust-based SDK that allocates off-heap (system) memory outside the JVM heap. OOM can occur at both levels:

  • JVM heap OOM: Reduce heap size to ~50% of available RAM (e.g., -Xmx4g on 8 GB machine). The connector itself needs less heap than V3.
  • System memory OOM: The Rust SDK allocates ~5 MB per Kafka partition for buffering. If total partition count is very high, the system may run out of physical memory. Scale out to more Kafka Connect workers to spread partitions across machines.
  • Both: If using other connectors on the same cluster, account for their memory needs too.

Symptom: Connector task repeatedly restarting

If a task fails and Kafka Connect auto-restarts it in a loop:

  1. Check the task failure reason: curl -s http://localhost:8083/connectors/<name>/status
  2. Look for the trace field in failed tasks for the stack trace
  3. Common causes: authentication failure (key expired), table dropped while connector running, schema incompatibility
  4. Set enable.task.fail.on.authorization.errors=true to fail fast on auth issues rather than retry indefinitely

Symptom: Connection failures through proxy

If the connector cannot reach Snowflake through a corporate proxy:

jvm.proxy.host={{PROXY_HOST}}
jvm.proxy.port={{PROXY_PORT}}
jvm.proxy.username={{PROXY_USER}}
jvm.proxy.password={{PROXY_PASSWORD}}

Symptom: SDK client count growing unbounded

If the sdk-client-count JMX metric grows continuously, there may be a client leak. Each distinct target table should have one SDK client. If the count exceeds the number of distinct tables, contact Snowflake Support.


Step 9 -- V3 migration assistant (standalone entry point)

Goal: Generate a migration-safe V4 configuration from an existing V3 deployment.

Ask the user:

  1. Was your V3 connector using Snowpipe mode (default, snowflake.ingestion.method=SNOWPIPE) or Snowpipe Streaming mode (SNOWPIPE_STREAMING)?
  2. What was your V3 connector name?
  3. (If Streaming mode) Did your V3 config include snowflake.streaming.channel.name.include.connector.name=true?
  4. Do you want to adopt V4 defaults (recommended for new behavior) or reproduce V3 behavior exactly?

Based on answers, generate the migration config:

From V3 Snowpipe mode: Use Template 6 (offset migration = skip)

From V3 Snowpipe Streaming mode: Use Template 7 (offset migration = best_effort or strict)

Migration procedure:

  1. Stop the V3 connector.
  2. (Snowpipe mode only) Wait for all staged data to be ingested into Snowflake before proceeding. Snowpipe stages files asynchronously.
  3. Deploy the V4 configuration using the same connector name as V3 (same Kafka consumer group).
  4. Start the V4 connector.
  5. Verify data flow (see Step 5).
  6. Complete the migration within 7 days. Kafka consumer group offsets expire after offsets.retention.minutes, which defaults to 7 days (10080 minutes).
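
The migration window can be computed from the time the V3 connector was stopped. This sketch uses a hypothetical `migration_deadline` helper and treats the stop time as the start of the retention window, which is an assumption; check the broker's actual offsets.retention.minutes value before relying on the result.

```python
from datetime import datetime, timedelta, timezone

DEFAULT_OFFSETS_RETENTION_MINUTES = 10080  # Kafka default: 7 days


def migration_deadline(
    v3_stopped_at: datetime,
    retention_minutes: int = DEFAULT_OFFSETS_RETENTION_MINUTES,
) -> datetime:
    """Latest time the V4 connector should be running with the same name."""
    return v3_stopped_at + timedelta(minutes=retention_minutes)
```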

If your V3 connector uses OAuth authentication, you must switch to key-pair auth before migrating. V4 does not support OAuth. Generate an RSA key-pair (see Step 3), register the public key on the Snowflake user, and update the connector config to use snowflake.private.key instead of snowflake.oauth.* properties.

If using Snowflake custom converters in V3, replace them with community equivalents:

  • SnowflakeJsonConverter --> org.apache.kafka.connect.json.JsonConverter
  • SnowflakeAvroConverter --> io.confluent.connect.avro.AvroConverter
  • SnowflakeAvroConverterWithoutSchemaRegistry --> io.confluent.connect.avro.AvroConverter
  • SnowflakeProtobufConverter --> io.confluent.connect.protobuf.ProtobufConverter

Downgrade path: Reverse migration is possible but expect duplicates. Use the deduplication query (Template 9) to clean up.

Best practices

  • Never persist private keys in chat or logs. Use <REDACTED> when logging SQL containing key material. In production, use a ConfigProvider (AWS KMS, Azure Key Vault, HashiCorp Vault) to externalize secrets.
  • Set tasks.max equal to the number of Kafka partitions for optimal throughput. Do not exceed the number of CPU cores across the Kafka Connect cluster.
  • Limit JVM heap to ~50% of available RAM to leave room for the Rust-based SDK's off-heap memory.
  • Run Kafka Connect in the same cloud region as your Snowflake account for best throughput.
  • For new installations, always set snowflake.streaming.validate.compatibility.with.classic=false.
  • Pre-create tables when you need exact control over column types.
  • Enable ENABLE_SCHEMA_EVOLUTION=TRUE on tables if your Kafka record schema may change over time. All connector-created tables have this enabled by default.
  • Set snowflake.metadata.all=false if you don't need RECORD_METADATA (~150 bytes savings per record).
  • For testing, set snowflake.cache.table.exists=false and snowflake.cache.pipe.exists=false to avoid cache staleness. Use default cache settings (5 min) in production.
  • Enable MDC logging (enable.mdc.logging=true) when running multiple connector instances to correlate log entries.
  • Reduce SDK log noise with export SS_LOG_LEVEL=warn on Kafka Connect workers.

Examples

Sample user queries and how the skill responds:

  • "Set up Kafka connector V4" --> Full fresh setup flow (Steps 1-5)
  • "Help me configure my Kafka connector properties" --> Configuration Advisor (Step 6)
  • "My Kafka connector is failing with ERROR_5030" --> Troubleshooting, ERROR_5030 branch (Step 8)
  • "Migrate my V3 connector to V4" --> V3 Migration Assistant (Step 9)
  • "I need a user-defined pipe with custom transformations" --> Steps 6-7
  • "How do I monitor my Kafka connector?" --> Configuration Advisor, monitoring sub-topic (Step 6)
  • "Set up DLQ for my Kafka connector" --> Configuration Advisor, error handling sub-topic (Step 6)
  • "My connector is lagging behind" --> Troubleshooting, ingestion lag branch (Step 8)

Templates

Template 1: Minimal V4 config (Distributed mode, JSON)

{
  "name": "{{CONNECTOR_NAME}}",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
    "topics": "{{TOPICS}}",
    "tasks.max": "{{TASKS_MAX}}",
    "snowflake.url.name": "{{ACCOUNT_URL}}",
    "snowflake.user.name": "{{USER}}",
    "snowflake.private.key": "{{PRIVATE_KEY_BASE64}}",
    "snowflake.database.name": "{{DATABASE}}",
    "snowflake.schema.name": "{{SCHEMA}}",
    "snowflake.role.name": "{{ROLE}}",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "snowflake.streaming.validate.compatibility.with.classic": "false"
  }
}

Template 2: Minimal V4 config (Standalone mode, Properties)

name={{CONNECTOR_NAME}}
connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
topics={{TOPICS}}
tasks.max={{TASKS_MAX}}
snowflake.url.name={{ACCOUNT_URL}}
snowflake.user.name={{USER}}
snowflake.private.key={{PRIVATE_KEY_BASE64}}
snowflake.database.name={{DATABASE}}
snowflake.schema.name={{SCHEMA}}
snowflake.role.name={{ROLE}}
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
snowflake.streaming.validate.compatibility.with.classic=false

Template 3: Full V4 config with common options (Distributed mode, JSON)

{
  "name": "{{CONNECTOR_NAME}}",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
    "topics": "{{TOPICS}}",
    "tasks.max": "{{TASKS_MAX}}",
    "snowflake.url.name": "{{ACCOUNT_URL}}",
    "snowflake.user.name": "{{USER}}",
    "snowflake.private.key": "{{PRIVATE_KEY_BASE64}}",
    "snowflake.database.name": "{{DATABASE}}",
    "snowflake.schema.name": "{{SCHEMA}}",
    "snowflake.role.name": "{{ROLE}}",
    "snowflake.topic2table.map": "{{TOPIC_TABLE_MAP}}",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "snowflake.enable.schematization": "true",
    "snowflake.validation": "server_side",
    "errors.tolerance": "none",
    "errors.log.enable": "true",
    "snowflake.metadata.all": "true",
    "jmx": "true",
    "enable.mdc.logging": "false",
    "snowflake.streaming.validate.compatibility.with.classic": "false"
  }
}

Template 4: Snowflake role/user/grants SQL

-- Use a role that can create and manage roles and privileges.
USE ROLE SECURITYADMIN;

-- Create a Snowflake role for the connector.
CREATE ROLE IF NOT EXISTS {{KAFKA_ROLE}};

-- Grant privileges on the database.
GRANT USAGE ON DATABASE {{DATABASE}} TO ROLE {{KAFKA_ROLE}};

-- Grant privileges on the schema.
GRANT USAGE ON SCHEMA {{DATABASE}}.{{SCHEMA}} TO ROLE {{KAFKA_ROLE}};

-- Grant ability to create tables and pipes (for auto-creation in default pipe mode).
GRANT CREATE TABLE ON SCHEMA {{DATABASE}}.{{SCHEMA}} TO ROLE {{KAFKA_ROLE}};
GRANT CREATE PIPE ON SCHEMA {{DATABASE}}.{{SCHEMA}} TO ROLE {{KAFKA_ROLE}};

-- Recommended for future Error Table features.
GRANT CREATE VIEW ON SCHEMA {{DATABASE}}.{{SCHEMA}} TO ROLE {{KAFKA_ROLE}};

-- If using an existing table, grant INSERT on that specific table.
-- GRANT INSERT ON TABLE {{DATABASE}}.{{SCHEMA}}.{{TABLE}} TO ROLE {{KAFKA_ROLE}};

-- If using user-defined pipes, grant OPERATE on the pipe.
-- GRANT OPERATE ON PIPE {{DATABASE}}.{{SCHEMA}}.{{PIPE}} TO ROLE {{KAFKA_ROLE}};

-- If ingesting into Iceberg tables, grant USAGE on the external volume.
-- GRANT USAGE ON EXTERNAL VOLUME {{EXTERNAL_VOLUME}} TO ROLE {{KAFKA_ROLE}};

-- Grant the role to the connector user.
GRANT ROLE {{KAFKA_ROLE}} TO USER {{KAFKA_USER}};

Template 5: User-defined pipe with transformations

-- Create the destination table.
CREATE TABLE {{DATABASE}}.{{SCHEMA}}.{{TABLE}} (
  order_id NUMBER,
  customer_name VARCHAR,
  order_total NUMBER,
  source_topic VARCHAR,
  kafka_offset NUMBER
);

-- Create a pipe with the SAME NAME as the table.
CREATE PIPE {{DATABASE}}.{{SCHEMA}}.{{TABLE}} AS
COPY INTO {{DATABASE}}.{{SCHEMA}}.{{TABLE}}
FROM (
  SELECT
    $1:order_id::NUMBER,
    $1:customer_name,
    $1:order_total::NUMBER,
    $1:RECORD_METADATA.topic::STRING AS source_topic,
    $1:RECORD_METADATA.offset::NUMBER AS kafka_offset
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);

-- Grant OPERATE on the pipe to the connector role.
GRANT OPERATE ON PIPE {{DATABASE}}.{{SCHEMA}}.{{TABLE}} TO ROLE {{KAFKA_ROLE}};

Template 6: V3-to-V4 migration config (from Snowpipe mode)

# Required: new connector class
connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector

# Your existing connection settings
topics={{TOPICS}}
tasks.max={{TASKS_MAX}}
snowflake.url.name={{ACCOUNT_URL}}
snowflake.user.name={{USER}}
snowflake.private.key={{PRIVATE_KEY_BASE64}}
snowflake.database.name={{DATABASE}}
snowflake.schema.name={{SCHEMA}}
snowflake.role.name={{ROLE}}

# Community converter (replaces Snowflake converters)
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter

# Compatibility flags to reproduce V3 behavior
snowflake.validation=client_side
snowflake.compatibility.enable.autogenerated.table.name.sanitization=true
snowflake.compatibility.enable.column.identifier.normalization=true
# Set to 'false' to keep V3 RECORD_CONTENT/RECORD_METADATA columns,
# or 'true' to adopt V4 schematized columns (recommended for new tables).
snowflake.enable.schematization=false

# Offset migration: skip SSv1 migration (not needed for Snowpipe mode)
snowflake.streaming.classic.offset.migration=skip

# Disable compatibility validator after confirming config
snowflake.streaming.validate.compatibility.with.classic=false

Template 7: V3-to-V4 migration config (from Snowpipe Streaming mode)

# Required: new connector class
connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector

# Your existing connection settings
topics={{TOPICS}}
tasks.max={{TASKS_MAX}}
snowflake.url.name={{ACCOUNT_URL}}
snowflake.user.name={{USER}}
snowflake.private.key={{PRIVATE_KEY_BASE64}}
snowflake.database.name={{DATABASE}}
snowflake.schema.name={{SCHEMA}}
snowflake.role.name={{ROLE}}

# Community converter (replaces Snowflake converters)
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter

# Compatibility flags to reproduce V3 behavior
snowflake.validation=client_side
snowflake.compatibility.enable.autogenerated.table.name.sanitization=true
snowflake.compatibility.enable.column.identifier.normalization=true
# Set to 'false' to keep V3 RECORD_CONTENT/RECORD_METADATA columns,
# or 'true' to adopt V4 schematized columns (recommended for new tables).
snowflake.enable.schematization=false

# Offset migration: recover offsets from V3 SSv1 channels
# Use 'strict' to fail if SSv1 channels aren't found,
# or 'best_effort' to fall back to Kafka consumer group offsets.
snowflake.streaming.classic.offset.migration=best_effort
# Must match your V3 snowflake.streaming.channel.name.include.connector.name setting
snowflake.streaming.classic.offset.migration.include.connector.name={{MATCH_V3_SETTING}}

# Disable compatibility validator after confirming config
snowflake.streaming.validate.compatibility.with.classic=false
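The only difference between Templates 6 and 7 is the offset migration block, so the choice can be derived from the V3 configuration. The sketch below is illustrative only: `offset_migration_properties` is a hypothetical helper, and it assumes the V3 config carries `snowflake.ingestion.method` (treated as `SNOWPIPE` when absent). If the V3 channel-name setting is not present, the placeholder is left in place rather than guessed.

```python
from typing import Dict

# V3 property named in Template 7; its value must be carried over unchanged.
V3_INCLUDE_KEY = "snowflake.streaming.channel.name.include.connector.name"


def offset_migration_properties(v3: Dict[str, str], strict: bool = False) -> Dict[str, str]:
    """Return the V4 offset migration properties for a given V3 config.

    Assumption: a missing snowflake.ingestion.method means Snowpipe mode.
    """
    method = v3.get("snowflake.ingestion.method", "SNOWPIPE").upper()
    if method != "SNOWPIPE_STREAMING":
        # Snowpipe mode has no SSv1 channels to recover offsets from.
        return {"snowflake.streaming.classic.offset.migration": "skip"}
    return {
        "snowflake.streaming.classic.offset.migration": "strict" if strict else "best_effort",
        "snowflake.streaming.classic.offset.migration.include.connector.name":
            v3.get(V3_INCLUDE_KEY, "{{MATCH_V3_SETTING}}"),
    }


v3_streaming = {
    "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
    V3_INCLUDE_KEY: "true",
}
for key, value in offset_migration_properties(v3_streaming).items():
    print(f"{key}={value}")
```

Passing `strict=True` selects `strict`, which fails when the SSv1 channels are not found instead of falling back to the Kafka consumer group offsets.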

Template 8: Latency estimation query

SELECT
  RECORD_METADATA:topic::STRING AS topic,
  RECORD_METADATA:partition::NUMBER AS partition,
  RECORD_METADATA:offset::NUMBER AS offset,
  TIMESTAMPDIFF('millisecond',
    TO_TIMESTAMP(RECORD_METADATA:SnowflakeConnectorPushTime::BIGINT, 3),
    CONVERT_TIMEZONE('UTC', CURRENT_TIMESTAMP())
  ) AS latency_ms
FROM {{DATABASE}}.{{SCHEMA}}.{{TABLE}}
ORDER BY latency_ms DESC
LIMIT 10;

Template 9: Deduplication query (post-migration)

Important: Test this query on a copy of the table first. A DELETE that matches on (topic, partition, offset) tuples would remove every row in a duplicated group, including the copy you want to keep, so this template rewrites the table instead and keeps exactly one row per (topic, partition, offset) tuple. Rows without RECORD_METADATA are kept as they are. For large tables, consider writing the deduplicated rows into a new table with INSERT INTO ... SELECT as a safer alternative.

-- Rewrite the table, keeping the earliest-pushed copy of each Kafka record.
INSERT OVERWRITE INTO {{DATABASE}}.{{SCHEMA}}.{{TABLE}}
SELECT *
FROM {{DATABASE}}.{{SCHEMA}}.{{TABLE}}
QUALIFY RECORD_METADATA IS NULL
     OR ROW_NUMBER() OVER (
          PARTITION BY RECORD_METADATA:topic, RECORD_METADATA:partition, RECORD_METADATA:offset
          ORDER BY RECORD_METADATA:SnowflakeConnectorPushTime
        ) = 1;

Template 10: JMX monitoring setup

Add these JVM properties to your Kafka Connect worker startup:

KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port={{JMX_PORT}} -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

Key MBeans to monitor (domain: snowflake.kafka.connector):

Task-level:

  • put-records (Meter) -- records received, use rates for throughput
  • put-duration (Timer) -- put call duration, high values indicate bottlenecks
  • backpressure-rewind-count (Counter) -- rewinds due to SDK backpressure
  • assigned-partitions (Gauge) -- partitions assigned to this task

Channel-level (per partition):

  • processed-offset (Gauge) -- latest offset buffered by connector
  • persisted-in-snowflake-offset (Gauge) -- latest offset durably committed in Snowflake
  • latest-consumer-offset (Gauge) -- latest offset available from Kafka
  • channel-recovery-count (Gauge) -- channel recovery events

Alerting recommendations:

  • Ingestion lag: the gap between latest-consumer-offset and persisted-in-snowflake-offset keeps growing
  • Backpressure: backpressure-rewind-count increasing over time
  • Channel recovery: channel-recovery-count increasing
  • Put duration: put-duration p99 exceeding acceptable threshold
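The ingestion lag rule above can be sketched as a small check over successive metric samples. This is a minimal illustration, not part of the connector: it assumes the two channel-level gauges have already been scraped into dictionaries keyed by metric name, and the function names and the three-sample window are hypothetical choices.

```python
from typing import Mapping, Sequence


def ingestion_lag(sample: Mapping[str, int]) -> int:
    """Offsets available in Kafka that are not yet durably committed in Snowflake."""
    return sample["latest-consumer-offset"] - sample["persisted-in-snowflake-offset"]


def lag_is_growing(samples: Sequence[Mapping[str, int]], min_points: int = 3) -> bool:
    """True when lag rose strictly between each of the last `min_points` samples."""
    if len(samples) < min_points:
        return False  # not enough history to call it a trend
    lags = [ingestion_lag(s) for s in samples[-min_points:]]
    return all(later > earlier for earlier, later in zip(lags, lags[1:]))


# Hypothetical samples for one channel, oldest first.
samples = [
    {"latest-consumer-offset": 1000, "persisted-in-snowflake-offset": 990},
    {"latest-consumer-offset": 2000, "persisted-in-snowflake-offset": 1950},
    {"latest-consumer-offset": 3000, "persisted-in-snowflake-offset": 2800},
]
print(lag_is_growing(samples))  # True: lag went 10 -> 50 -> 200
```

Alerting on a sustained rise rather than on a single high value avoids paging on short bursts that the connector catches up on by itself.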

Template 11: DLQ configuration (client-side validation)

# Enable client-side validation (required for DLQ)
snowflake.validation=client_side

# Continue on errors (route invalid records to DLQ instead of failing)
errors.tolerance=all

# DLQ topic name
errors.deadletterqueue.topic.name={{DLQ_TOPIC}}

# Enable error logging
errors.log.enable=true

Warning: Setting errors.tolerance=all without configuring a DLQ topic causes invalid records to be silently dropped. This can cause data loss.
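A pre-deployment check can catch the silent-drop case described in the warning. The sketch below is an assumption-laden illustration: `parse_properties` and `dlq_problems` are hypothetical helpers, the parser handles only simple `key=value` lines, and a missing `errors.tolerance` is treated as `none`, the Kafka Connect default.

```python
from typing import Dict, List


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def dlq_problems(props: Dict[str, str]) -> List[str]:
    """Return human-readable problems with the error-handling settings."""
    problems: List[str] = []
    tolerance = props.get("errors.tolerance", "none")
    dlq_topic = props.get("errors.deadletterqueue.topic.name", "")
    if tolerance == "all" and not dlq_topic:
        problems.append("errors.tolerance=all without a DLQ topic: invalid records are silently dropped")
    if dlq_topic and props.get("snowflake.validation") != "client_side":
        problems.append("DLQ topic is set but snowflake.validation is not client_side")
    return problems


risky = parse_properties("snowflake.validation=client_side\nerrors.tolerance=all\n")
for problem in dlq_problems(risky):
    print(problem)
```

An empty list means the three settings from Template 11 are consistent with each other; it does not confirm that the DLQ topic exists on the broker.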

Template 12: Avro with Schema Registry config (Distributed mode, JSON)

{
  "name": "{{CONNECTOR_NAME}}",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
    "topics": "{{TOPICS}}",
    "tasks.max": "{{TASKS_MAX}}",
    "snowflake.url.name": "{{ACCOUNT_URL}}",
    "snowflake.user.name": "{{USER}}",
    "snowflake.private.key": "{{PRIVATE_KEY_BASE64}}",
    "snowflake.database.name": "{{DATABASE}}",
    "snowflake.schema.name": "{{SCHEMA}}",
    "snowflake.role.name": "{{ROLE}}",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "{{SCHEMA_REGISTRY_URL}}",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "snowflake.streaming.validate.compatibility.with.classic": "false"
  }
}

Note: For secure Schema Registry access, also set value.converter.basic.auth.credentials.source=USER_INFO and value.converter.basic.auth.user.info={{REGISTRY_USER}}:{{REGISTRY_PASSWORD}}.
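As a rough sketch of the note above, the two authentication properties can be merged into the Template 12 config before it is submitted. `with_registry_basic_auth` is a hypothetical helper, and the placeholder values are left unresolved on purpose; only the property names come from the note.

```python
import json
from typing import Dict


def with_registry_basic_auth(config: Dict[str, str], user: str, password: str) -> Dict[str, str]:
    """Return a copy of the connector config with Schema Registry basic auth added."""
    secured = dict(config)  # leave the caller's dict untouched
    secured["value.converter.basic.auth.credentials.source"] = "USER_INFO"
    secured["value.converter.basic.auth.user.info"] = f"{user}:{password}"
    return secured


# Only the converter-related keys from Template 12 are shown here.
base_config = {
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "{{SCHEMA_REGISTRY_URL}}",
}
payload = {
    "name": "{{CONNECTOR_NAME}}",
    "config": with_registry_basic_auth(base_config, "{{REGISTRY_USER}}", "{{REGISTRY_PASSWORD}}"),
}
print(json.dumps(payload, indent=2))
```

Because the credentials end up in plain text in the payload, keep the generated JSON out of version control.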

Repository
snowflakedb/snowpipe-streaming-sdk-examples