Set up, configure, and troubleshoot the Snowflake Kafka Connector V4 (Snowpipe Streaming high-performance architecture). Covers fresh installations, connector property configuration for default pipe and user-defined pipe modes, server-side and client-side validation, JMX monitoring, migration from V3, and common error diagnosis. Triggers: kafka connector v4, kafka connector setup, kafka connector config, kafka connector troubleshoot, snowflake kafka connector, configure kafka connector, kafka connector help, kafka streaming connector.
Use this skill when the user wants to:
When NOT to use this skill:
- ... custom-kafka-consumer skill)
- ... snowpipe-streaming-quickstart skill)
com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
Default pipe mode (simplest):
- {tableName}-STREAMING
- ... CREATE PIPE needed
- ... ENABLE_SCHEMA_EVOLUTION=TRUE on the table
User-defined pipe mode (flexible):
- ... COPY INTO ... FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')) syntax
- ... RECORD_METADATA fields via $1:RECORD_METADATA.topic, $1:RECORD_METADATA.offset, etc.
Server-side validation (default, snowflake.validation=server_side):
Client-side validation (snowflake.validation=client_side):
| Configuration | V3 Default | V4 Default |
|---|---|---|
| snowflake.enable.schematization | false (RECORD_CONTENT/RECORD_METADATA VARIANT columns) | true (record fields mapped to individual columns) |
| snowflake.validation | Client-side equivalent | server_side |
| snowflake.compatibility.enable.autogenerated.table.name.sanitization | true equivalent (invalid chars replaced, names uppercased) | false (topic names used as-is, case preserved) |
| snowflake.compatibility.enable.column.identifier.normalization | true equivalent (column names uppercased) | false (column identifiers preserve case) |
Warning about topic names with special characters: With V4 defaults (table name sanitization disabled), a topic named orders-prod creates a table literally named "orders-prod" which requires double-quoting in all SQL. To avoid this, either use snowflake.topic2table.map to map to a clean table name, or set snowflake.compatibility.enable.autogenerated.table.name.sanitization=true to replicate V3 behavior (uppercased, invalid chars replaced).
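To see ahead of time which auto-created table names would need double-quoting, the check below applies Snowflake's unquoted-identifier rule (a letter or underscore first, then letters, digits, underscores, or dollar signs, resolved as uppercase). This is a minimal sketch: `needs_double_quotes` is a hypothetical helper, it assumes the V4 default where the topic name is used as-is with case preserved, and it does not account for reserved keywords.

```python
import re

# Unquoted Snowflake identifiers resolve to uppercase, so only names that are
# already uppercase and made of [A-Z0-9_$] can be referenced without quotes.
_UNQUOTED_UPPER = re.compile(r"[A-Z_][A-Z0-9_$]*")


def needs_double_quotes(table_name: str) -> bool:
    """Return True if a table created with this exact name must be double-quoted in SQL."""
    return _UNQUOTED_UPPER.fullmatch(table_name) is None


if __name__ == "__main__":
    for topic in ["orders-prod", "orders", "ORDERS_PROD"]:
        print(topic, needs_double_quotes(topic))
```

Topics that return True are candidates for a snowflake.topic2table.map entry or for enabling table name sanitization.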
| Field | Type | Description |
|---|---|---|
| topic | String | Kafka topic name |
| partition | String | Kafka partition number |
| offset | Number | Offset in the partition |
| CreateTime / LogAppendTime | Number | Kafka record timestamp (ms since epoch) |
| SnowflakeConnectorPushTime | Number | When connector buffered the record (ms since epoch) |
| key | String | Kafka message key (requires StringConverter for key.converter) |
| headers | Object | User-defined key-value pairs |
RECORD_METADATA adds ~150 bytes overhead per record. Set snowflake.metadata.all=false to disable if not needed.
The Rust-based Snowpipe Streaming SDK allocates off-heap (system) memory for buffering. Limit JVM heap to approximately 50% of available RAM. For example, on a worker with 8 GB RAM, set -Xmx4g. Minimum 5 MB per Kafka partition.
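The sizing rule above can be sketched as a small calculation. The function name and the returned fields are hypothetical, and charging the 5 MB per partition minimum against the memory left outside the JVM heap is an assumption, since the text does not say which pool it comes from.

```python
def suggest_worker_memory(total_ram_gb: float, partitions: int) -> dict:
    """Suggest a JVM heap cap (~50% of RAM) and check the per-partition minimum."""
    if total_ram_gb <= 0 or partitions < 0:
        raise ValueError("total_ram_gb must be positive and partitions non-negative")
    # Heap is capped at roughly half of RAM, rounded down to whole gigabytes.
    heap_gb = max(1, int(total_ram_gb * 0.5))
    # Minimum buffer budget: 5 MB per Kafka partition handled by this worker.
    min_partition_mb = partitions * 5
    # Assumption: whatever is not heap is available to the off-heap SDK (and the OS).
    off_heap_mb = int((total_ram_gb - heap_gb) * 1024)
    return {
        "jvm_flag": f"-Xmx{heap_gb}g",
        "min_partition_mb": min_partition_mb,
        "off_heap_mb": off_heap_mb,
        "fits": min_partition_mb <= off_heap_mb,
    }


if __name__ == "__main__":
    print(suggest_worker_memory(8, 100))
```

For the 8 GB worker in the example this yields -Xmx4g, matching the figure above.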
| Package | Tested Versions |
|---|---|
| Apache Kafka | 2.8.2, 3.7.2, 4.1.1 |
| Confluent | 6.2.15, 7.8.2, 8.2.0 |
| Java | 11+ (SE recommended) |
IMPORTANT EXECUTION GUIDELINES:
- ... kafka_connector_v4_sql.log -- Every time you execute SQL, immediately append it to the log file with a step header and timestamp.
- ... <REDACTED> in all logs and output.
Before doing anything, determine what the user needs:
If intent is unclear, ask:
What would you like help with?
- Fresh setup -- Deploy a new Kafka Connector V4
- Configure -- Help with connector properties for your use case
- Troubleshoot -- Diagnose an issue with an existing connector
- Migrate from V3 -- Upgrade from the classic connector
Goal: Verify the user has everything needed before starting.
Ask the user for (in a single prompt):
- ... myorg-myaccount.snowflakecomputing.com)
Run in parallel:
java -version 2>&1
SELECT CURRENT_USER(), CURRENT_ROLE(), CURRENT_DATABASE(), CURRENT_SCHEMA(), CURRENT_WAREHOUSE(), CURRENT_ACCOUNT();
Error handling:
- ... USE WAREHOUSE
Goal: Create or configure the Snowflake role and necessary grants.
Execute as one single SQL call:
-- Use a role that can create and manage roles and privileges.
USE ROLE SECURITYADMIN;
-- Create a Snowflake role for the connector.
CREATE ROLE IF NOT EXISTS {{KAFKA_ROLE}};
-- Grant privileges on the database.
GRANT USAGE ON DATABASE {{DATABASE}} TO ROLE {{KAFKA_ROLE}};
-- Grant privileges on the schema.
GRANT USAGE ON SCHEMA {{DATABASE}}.{{SCHEMA}} TO ROLE {{KAFKA_ROLE}};
-- Grant ability to create tables and pipes (for auto-creation).
GRANT CREATE TABLE ON SCHEMA {{DATABASE}}.{{SCHEMA}} TO ROLE {{KAFKA_ROLE}};
GRANT CREATE PIPE ON SCHEMA {{DATABASE}}.{{SCHEMA}} TO ROLE {{KAFKA_ROLE}};
-- If using an existing table, grant INSERT on that specific table.
-- GRANT INSERT ON TABLE {{DATABASE}}.{{SCHEMA}}.{{TABLE}} TO ROLE {{KAFKA_ROLE}};
-- If using user-defined pipes, grant OPERATE on the pipe.
-- GRANT OPERATE ON PIPE {{DATABASE}}.{{SCHEMA}}.{{PIPE}} TO ROLE {{KAFKA_ROLE}};
-- Grant the role to the connector user.
GRANT ROLE {{KAFKA_ROLE}} TO USER {{KAFKA_USER}};
Note: If the user already has a role, skip creation and verify grants. If ingesting into an existing table, grant INSERT on that specific table.
Goal: Generate key-pair for connector authentication.
If user already has key-pair auth, skip this step.
Single Bash call (generate key-pair and extract public key body):
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt 2>/dev/null \
&& chmod 600 rsa_key.p8 \
&& openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub 2>/dev/null \
&& echo "RSA key-pair generated successfully"
Then silently read rsa_key.pub, strip the header/footer lines, and store the base64 body.
SECURITY: NEVER display any key content in chat. Use <REDACTED> in logs.
Register the public key via SQL:
ALTER USER {{KAFKA_USER}} SET RSA_PUBLIC_KEY='{{PUBLIC_KEY_BODY}}';
DESC USER {{KAFKA_USER}};
Goal: Generate a ready-to-deploy connector configuration file.
Ask the user:
- ... topic2table.map.
- ... tasks.max equal to partition count)
Based on answers, generate the config file using the appropriate template from the Templates section.
Write the config file:
- ... kafka-connector-v4-config.json
- ... kafka-connector-v4-config.properties
For the private key value, instruct the user:
Open rsa_key.p8, remove the -----BEGIN PRIVATE KEY----- and -----END PRIVATE KEY----- header/footer lines, remove all newlines, and paste the single base64 string as the value for snowflake.private.key. In production, use a ConfigProvider to externalize this secret (AWS KMS, Azure Key Vault, HashiCorp Vault).
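The manual steps above (drop the header/footer lines, remove newlines) can also be done programmatically. The sketch below is one way to do it; `pem_body` is a hypothetical helper name, and it assumes an unencrypted PKCS#8 file like the rsa_key.p8 generated in Step 3. The result is secret material and should never be printed or logged.

```python
def pem_body(pem_text: str) -> str:
    """Return the base64 body of a PEM document as one line, without header/footer."""
    lines = [line.strip() for line in pem_text.splitlines()]
    # Keep only non-empty lines that are not "-----BEGIN ...-----" / "-----END ...-----".
    body_lines = [line for line in lines if line and not line.startswith("-----")]
    return "".join(body_lines)


if __name__ == "__main__":
    # Dummy content for illustration only; not a real key.
    sample = "-----BEGIN PRIVATE KEY-----\nQUJD\nREVG\n-----END PRIVATE KEY-----\n"
    assert pem_body(sample) == "QUJDREVG"
```

In practice the input would come from reading rsa_key.p8, and the returned string is what goes into snowflake.private.key.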
If user chose user-defined pipe mode, also generate the pipe SQL (see Step 7).
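Because the distributed-mode JSON file and the standalone-mode .properties file carry the same settings, one can be derived from the other. The sketch below converts the JSON shape used by the templates (a "name" plus a "config" object) into properties lines. `connect_json_to_properties` is a hypothetical helper, and it assumes values need no .properties escaping (no backslashes or embedded newlines).

```python
import json


def connect_json_to_properties(json_text: str) -> str:
    """Convert a Kafka Connect REST payload into standalone .properties text."""
    payload = json.loads(json_text)
    # Standalone mode expects the connector name as a regular property.
    lines = [f"name={payload['name']}"]
    for key, value in payload["config"].items():
        lines.append(f"{key}={value}")
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    example = '{"name": "c1", "config": {"topics": "orders", "tasks.max": "4"}}'
    print(connect_json_to_properties(example), end="")
```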
Goal: Start the connector and verify data is flowing.
Provide the startup command:
Distributed mode:
curl -X POST -H "Content-Type: application/json" --data @kafka-connector-v4-config.json http://localhost:8083/connectors
Standalone mode:
<kafka_dir>/bin/connect-standalone.sh <kafka_dir>/config/connect-standalone.properties kafka-connector-v4-config.properties
Verification steps (guide the user):
Check connector status:
curl -s http://localhost:8083/connectors/{{CONNECTOR_NAME}}/status | python3 -m json.tool
Wait a few seconds for data to arrive, then query the table:
SELECT COUNT(*) FROM {{DATABASE}}.{{SCHEMA}}.{{TABLE}};
SELECT * FROM {{DATABASE}}.{{SCHEMA}}.{{TABLE}} LIMIT 10;
Estimate end-to-end latency (see Template 8 in Templates section).
Present success summary with row count and latency estimate.
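The status check from the verification steps returns JSON that can be summarized automatically. The sketch below assumes the standard Kafka Connect status shape (a "connector" object with a "state", and a "tasks" list whose failed entries carry a "trace"); `summarize_status` and `first_line` are hypothetical helper names.

```python
import json


def first_line(text: str) -> str:
    """Return the first line of a stack trace, or an empty string."""
    lines = text.splitlines()
    return lines[0] if lines else ""


def summarize_status(status_json: str) -> dict:
    """Summarize a /connectors/<name>/status response."""
    status = json.loads(status_json)
    connector_state = status.get("connector", {}).get("state")
    tasks = status.get("tasks", [])
    # Collect failed tasks with the headline of their stack trace.
    failed = [
        {"id": task.get("id"), "error": first_line(task.get("trace", ""))}
        for task in tasks
        if task.get("state") == "FAILED"
    ]
    # Healthy here means: connector running and every task running.
    healthy = (
        connector_state == "RUNNING"
        and len(tasks) > 0
        and all(task.get("state") == "RUNNING" for task in tasks)
    )
    return {
        "connector_state": connector_state,
        "task_count": len(tasks),
        "failed": failed,
        "healthy": healthy,
    }
```

The input can be the raw output of the curl command in the verification steps. The full trace field remains the place to look for the complete stack trace.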
Goal: Help users who need configuration guidance for a specific topic.
Ask what they need help with:
- ... topic2table.map), regex patterns, many-to-one. Examples:
snowflake.topic2table.map=orders:ORDER_TABLE,customers:CUSTOMER_TABLE
snowflake.topic2table.map=topic1:SHARED_TABLE,topic2:SHARED_TABLE,topic3:SHARED_TABLE
snowflake.topic2table.map=.*_events:ALL_EVENTS
snowflake.topic2table.map="my:topic":"My_Table"
- ... ENABLE_SCHEMA_EVOLUTION=TRUE on the table. Adds new columns automatically. Server-side validation may infer wrong types; pre-create table if exact types matter.
- ... JsonConverter), Avro (AvroConverter + Schema Registry), Protobuf (ProtobufConverter). Note: StringConverter and ByteArrayConverter not supported with schematization=true.
- ... tasks.max = partition count, JVM heap ~50% RAM, same-region deployment, cache settings.
- ... errors.tolerance (none vs all), DLQ config, Error Tables for server-side.
For each sub-topic, provide the relevant configuration snippet and a brief explanation.
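To reason about which table a topic lands in, the topic2table.map examples above can be modeled roughly as follows. This is an illustrative sketch, not the connector's implementation: `parse_topic2table` and `resolve_table` are hypothetical names, the quoted form ("my:topic":"My_Table") is not handled, and treating each left-hand side as a full-match regex with the first match winning is an assumption.

```python
import re
from typing import List, Optional, Tuple


def parse_topic2table(mapping: str) -> List[Tuple[str, str]]:
    """Split 'pattern:TABLE,pattern:TABLE' into (pattern, table) pairs."""
    pairs = []
    for entry in mapping.split(","):
        entry = entry.strip()
        if not entry:
            continue
        # Split on the last colon so the table name is always the final segment.
        pattern, _, table = entry.rpartition(":")
        if not pattern or not table:
            raise ValueError(f"malformed entry: {entry!r}")
        pairs.append((pattern, table))
    return pairs


def resolve_table(topic: str, pairs: List[Tuple[str, str]]) -> Optional[str]:
    """Return the mapped table for a topic, or None if no entry matches."""
    for pattern, table in pairs:
        if re.fullmatch(pattern, topic):
            return table
    return None


if __name__ == "__main__":
    pairs = parse_topic2table("orders:ORDER_TABLE,.*_events:ALL_EVENTS")
    print(resolve_table("click_events", pairs))
```

A None result corresponds to the unmapped case, where the table name is derived from the topic name.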
Goal: Guide user through creating a user-defined pipe for custom transformations.
Create the destination table:
CREATE TABLE {{DATABASE}}.{{SCHEMA}}.{{TABLE}} (
-- columns matching desired output schema
);
Create a pipe with the same name as the table:
CREATE PIPE {{DATABASE}}.{{SCHEMA}}.{{TABLE}} AS
COPY INTO {{DATABASE}}.{{SCHEMA}}.{{TABLE}}
FROM (
SELECT
$1:field_name::TYPE AS column_name,
$1:RECORD_METADATA.topic::STRING AS source_topic,
$1:RECORD_METADATA.offset::NUMBER AS kafka_offset
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Grant OPERATE on the pipe:
GRANT OPERATE ON PIPE {{DATABASE}}.{{SCHEMA}}.{{TABLE}} TO ROLE {{KAFKA_ROLE}};
Field accessor syntax:
- $1:field_name
- $1['field name']
- $1:parent.child or $1:parent['child field']
- $1:RECORD_METADATA.topic, $1:RECORD_METADATA.offset, $1:RECORD_METADATA.SnowflakeConnectorPushTime
Important: Client-side validation does NOT work with user-defined pipes. The connector will fail with ERROR_5030. Use snowflake.validation=server_side (the default). This means you cannot use both DLQ and user-defined pipes. If you need custom transformations, use server-side validation with Error Tables for error handling instead of DLQ.
Goal: Diagnose and fix common connector issues.
Ask the user to describe their symptom, then branch:
"Unrecognized configuration" errors:
- snowflake.ingestion.method
- buffer.flush.time, buffer.size.bytes, buffer.count.records
- snowflake.streaming.max.client.lag, snowflake.streaming.max.memory.limit.bytes
- snowflake.snowpipe.*
- snowflake.authenticator, snowflake.oauth.*
- snowflake.streaming.iceberg.enabled
- ... SnowflakeJsonConverter, etc.)
"Compatibility validator" errors:
- snowflake.streaming.validate.compatibility.with.classic=false
- snowflake.validation=client_side
- snowflake.compatibility.enable.column.identifier.normalization=true
- snowflake.compatibility.enable.autogenerated.table.name.sanitization=true
- snowflake.enable.schematization=true or false
- snowflake.streaming.classic.offset.migration=skip or best_effort or strict
"Authentication failed":
- ... snowflake.private.key is valid Base64-encoded PKCS#8 (no header/footer lines, no newlines)
- ... snowflake.private.key.passphrase
"Unsupported converter" with schematization:
- StringConverter and ByteArrayConverter are not supported when snowflake.enable.schematization=true (the default)
- ... JsonConverter, AvroConverter, or ProtobufConverter
- curl -s http://localhost:8083/connectors/<name>/status
- kafka-console-consumer --topic <topic> --bootstrap-server <server> --from-beginning --max-messages 5
- ... snowflake.cache.table.exists.expire.ms or restart the connector.
Common causes:
- ... server_side)
To investigate:
-- List error tables in the schema
SHOW TABLES LIKE '%ERRORS%' IN SCHEMA {{DATABASE}}.{{SCHEMA}};
-- Query recent errors (error table is auto-created alongside the target table)
SELECT * FROM {{DATABASE}}.{{SCHEMA}}."{{TABLE}}_ERRORS" ORDER BY ERROR_TIMESTAMP DESC LIMIT 20;
- ... errors.log.enable=true for verbose logging
Check JMX metrics: latest-consumer-offset minus persisted-in-snowflake-offset
If JMX is unavailable, estimate lag from the Snowflake side using Template 8 (latency estimation query). High latency_ms values indicate data is taking a long time from connector buffering to queryability.
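The lag calculation from the two JMX gauges can be sketched as below. How the gauge values are collected is left out. `offset_lag` and `lag_is_growing` are hypothetical helpers, and treating "growing" as strictly increasing lag over the last few samples (default window of 3) is an assumed heuristic, not a documented threshold.

```python
from typing import List, Tuple


def offset_lag(latest_consumer_offset: int, persisted_in_snowflake_offset: int) -> int:
    """Lag for one channel: offsets available in Kafka but not yet durable in Snowflake."""
    return max(0, latest_consumer_offset - persisted_in_snowflake_offset)


def lag_is_growing(samples: List[Tuple[int, int]], window: int = 3) -> bool:
    """True if lag rose on each of the last `window` intervals.

    `samples` holds (latest-consumer-offset, persisted-in-snowflake-offset)
    pairs in time order for a single channel.
    """
    lags = [offset_lag(latest, persisted) for latest, persisted in samples]
    if len(lags) < window + 1:
        return False  # not enough history to call it a trend
    recent = lags[-(window + 1):]
    return all(later > earlier for earlier, later in zip(recent, recent[1:]))


if __name__ == "__main__":
    history = [(100, 90), (200, 150), (300, 200), (400, 250)]
    print(lag_is_growing(history))
```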
Fixes:
- ... tasks.max closer to the total number of Kafka partitions
- ... backpressure-rewind-count is increasing, the SDK is at capacity -- scale out Kafka Connect cluster
Use the deduplication query (see Template 9 in Templates section) based on RECORD_METADATA topic, partition, and offset fields.
Important: Deduplication requires snowflake.metadata.topic and snowflake.metadata.offset.and.partition to be enabled (they are by default).
Server-side validation can't always infer the correct data type (e.g., interprets "2026-04-13" as DATE not TEXT, can't infer binary columns).
Fixes:
- ... snowflake.validation=client_side) for better type inference
Possible causes:
Review connector logs for specific recovery reasons. Reduce SDK log noise with export SS_LOG_LEVEL=warn.
The V4 connector uses a Rust-based SDK that allocates off-heap (system) memory outside the JVM heap. OOM can occur at both levels:
- ... -Xmx4g on 8 GB machine). The connector itself needs less heap than V3.
If a task fails and Kafka Connect auto-restarts it in a loop:
- curl -s http://localhost:8083/connectors/<name>/status
- ... trace field in failed tasks for the stack trace
- ... enable.task.fail.on.authorization.errors=true to fail fast on auth issues rather than retry indefinitely
If the connector cannot reach Snowflake through a corporate proxy:
jvm.proxy.host={{PROXY_HOST}}
jvm.proxy.port={{PROXY_PORT}}
jvm.proxy.username={{PROXY_USER}}
jvm.proxy.password={{PROXY_PASSWORD}}
If the sdk-client-count JMX metric grows continuously, there may be a client leak. Each distinct target table should have one SDK client. If the count exceeds the number of distinct tables, contact Snowflake Support.
Goal: Generate a migration-safe V4 configuration from an existing V3 deployment.
Ask the user:
- ... snowflake.ingestion.method=SNOWPIPE) or Snowpipe Streaming mode (SNOWPIPE_STREAMING)?
- ... snowflake.streaming.channel.name.include.connector.name=true?
Based on answers, generate the migration config:
From V3 Snowpipe mode: Use Template 6 (offset migration = skip)
From V3 Snowpipe Streaming mode: Use Template 7 (offset migration = best_effort or strict)
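The choice between the two migration templates reduces to a small decision on the V3 ingestion method. The sketch below returns only the offset-migration properties that differ between Templates 6 and 7. `offset_migration_settings` is a hypothetical helper name, and defaulting to best_effort unless strict is requested is an assumption made for illustration.

```python
from typing import Optional


def offset_migration_settings(
    v3_ingestion_method: str,
    include_connector_name: Optional[bool] = None,
    strict: bool = False,
) -> dict:
    """Pick offset-migration properties from the V3 snowflake.ingestion.method value."""
    method = v3_ingestion_method.strip().upper()
    if method == "SNOWPIPE":
        # Snowpipe mode has no streaming channels to recover offsets from.
        return {"snowflake.streaming.classic.offset.migration": "skip"}
    if method == "SNOWPIPE_STREAMING":
        if include_connector_name is None:
            raise ValueError(
                "the V3 snowflake.streaming.channel.name.include.connector.name value is required"
            )
        return {
            "snowflake.streaming.classic.offset.migration": "strict" if strict else "best_effort",
            # Must mirror the V3 channel-naming setting.
            "snowflake.streaming.classic.offset.migration.include.connector.name":
                str(include_connector_name).lower(),
        }
    raise ValueError(f"unknown V3 ingestion method: {v3_ingestion_method!r}")
```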
Migration procedure:
- ... offsets.retention.minutes default). After 7 days, Kafka consumer group offsets expire.
If your V3 connector uses OAuth authentication, you must switch to key-pair auth before migrating. V4 does not support OAuth. Generate an RSA key-pair (see Step 3), register the public key on the Snowflake user, and update the connector config to use snowflake.private.key instead of snowflake.oauth.* properties.
If using Snowflake custom converters in V3, replace them with community equivalents:
- SnowflakeJsonConverter --> org.apache.kafka.connect.json.JsonConverter
- SnowflakeAvroConverter --> io.confluent.connect.avro.AvroConverter
- SnowflakeAvroConverterWithoutSchemaRegistry --> io.confluent.connect.avro.AvroConverter
- SnowflakeProtobufConverter --> io.confluent.connect.protobuf.ProtobufConverter
Downgrade path: Reverse migration is possible but expect duplicates. Use the deduplication query (Template 9) to clean up.
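Before deploying a migrated config, a quick lint pass can flag leftovers from V3. The sketch below assumes the property names listed under the "Unrecognized configuration" errors are the ones to drop, and uses the converter replacements listed above. `lint_v3_config` is a hypothetical helper, and converters are matched by simple class name only because the V3 package names are not given here.

```python
from typing import Dict, List

# Assumption: these V3 properties are rejected by V4 and should be removed.
REMOVED_EXACT = {
    "snowflake.ingestion.method",
    "buffer.flush.time",
    "buffer.size.bytes",
    "buffer.count.records",
    "snowflake.streaming.max.client.lag",
    "snowflake.streaming.max.memory.limit.bytes",
    "snowflake.authenticator",
    "snowflake.streaming.iceberg.enabled",
}
REMOVED_PREFIXES = ("snowflake.snowpipe.", "snowflake.oauth.")

# Community replacements for the Snowflake custom converters.
CONVERTER_REPLACEMENTS = {
    "SnowflakeJsonConverter": "org.apache.kafka.connect.json.JsonConverter",
    "SnowflakeAvroConverter": "io.confluent.connect.avro.AvroConverter",
    "SnowflakeAvroConverterWithoutSchemaRegistry": "io.confluent.connect.avro.AvroConverter",
    "SnowflakeProtobufConverter": "io.confluent.connect.protobuf.ProtobufConverter",
}


def lint_v3_config(config: Dict[str, str]) -> List[str]:
    """Return human-readable findings for a V3 config being moved to V4."""
    findings = []
    for key, value in config.items():
        if key in REMOVED_EXACT or key.startswith(REMOVED_PREFIXES):
            findings.append(f"remove {key}")
        elif key in ("key.converter", "value.converter"):
            simple_name = str(value).rsplit(".", 1)[-1]
            if simple_name in CONVERTER_REPLACEMENTS:
                findings.append(f"set {key}={CONVERTER_REPLACEMENTS[simple_name]}")
    return findings
```

An empty list means none of the known V3 leftovers were found. It does not replace the connector's own compatibility validator.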
- ... <REDACTED> when logging SQL containing key material. In production, use a ConfigProvider (AWS KMS, Azure Key Vault, HashiCorp Vault) to externalize secrets.
- ... tasks.max equal to the number of Kafka partitions for optimal throughput. Do not exceed the number of CPU cores across the Kafka Connect cluster.
- ... snowflake.streaming.validate.compatibility.with.classic=false.
- ... ENABLE_SCHEMA_EVOLUTION=TRUE on tables if your Kafka record schema may change over time. All connector-created tables have this enabled by default.
- ... snowflake.metadata.all=false if you don't need RECORD_METADATA (~150 bytes savings per record).
- ... snowflake.cache.table.exists=false and snowflake.cache.pipe.exists=false to avoid cache staleness. Use default cache settings (5 min) in production.
- ... enable.mdc.logging=true) when running multiple connector instances to correlate log entries.
- ... export SS_LOG_LEVEL=warn on Kafka Connect workers.
Sample user queries and how the skill responds:
{
"name": "{{CONNECTOR_NAME}}",
"config": {
"connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
"topics": "{{TOPICS}}",
"tasks.max": "{{TASKS_MAX}}",
"snowflake.url.name": "{{ACCOUNT_URL}}",
"snowflake.user.name": "{{USER}}",
"snowflake.private.key": "{{PRIVATE_KEY_BASE64}}",
"snowflake.database.name": "{{DATABASE}}",
"snowflake.schema.name": "{{SCHEMA}}",
"snowflake.role.name": "{{ROLE}}",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"snowflake.streaming.validate.compatibility.with.classic": "false"
}
}
name={{CONNECTOR_NAME}}
connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
topics={{TOPICS}}
tasks.max={{TASKS_MAX}}
snowflake.url.name={{ACCOUNT_URL}}
snowflake.user.name={{USER}}
snowflake.private.key={{PRIVATE_KEY_BASE64}}
snowflake.database.name={{DATABASE}}
snowflake.schema.name={{SCHEMA}}
snowflake.role.name={{ROLE}}
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
snowflake.streaming.validate.compatibility.with.classic=false
{
"name": "{{CONNECTOR_NAME}}",
"config": {
"connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
"topics": "{{TOPICS}}",
"tasks.max": "{{TASKS_MAX}}",
"snowflake.url.name": "{{ACCOUNT_URL}}",
"snowflake.user.name": "{{USER}}",
"snowflake.private.key": "{{PRIVATE_KEY_BASE64}}",
"snowflake.database.name": "{{DATABASE}}",
"snowflake.schema.name": "{{SCHEMA}}",
"snowflake.role.name": "{{ROLE}}",
"snowflake.topic2table.map": "{{TOPIC_TABLE_MAP}}",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"snowflake.enable.schematization": "true",
"snowflake.validation": "server_side",
"errors.tolerance": "none",
"errors.log.enable": "true",
"snowflake.metadata.all": "true",
"jmx": "true",
"enable.mdc.logging": "false",
"snowflake.streaming.validate.compatibility.with.classic": "false"
}
}
-- Use a role that can create and manage roles and privileges.
USE ROLE SECURITYADMIN;
-- Create a Snowflake role for the connector.
CREATE ROLE IF NOT EXISTS {{KAFKA_ROLE}};
-- Grant privileges on the database.
GRANT USAGE ON DATABASE {{DATABASE}} TO ROLE {{KAFKA_ROLE}};
-- Grant privileges on the schema.
GRANT USAGE ON SCHEMA {{DATABASE}}.{{SCHEMA}} TO ROLE {{KAFKA_ROLE}};
-- Grant ability to create tables and pipes (for auto-creation in default pipe mode).
GRANT CREATE TABLE ON SCHEMA {{DATABASE}}.{{SCHEMA}} TO ROLE {{KAFKA_ROLE}};
GRANT CREATE PIPE ON SCHEMA {{DATABASE}}.{{SCHEMA}} TO ROLE {{KAFKA_ROLE}};
-- Recommended for future Error Table features.
GRANT CREATE VIEW ON SCHEMA {{DATABASE}}.{{SCHEMA}} TO ROLE {{KAFKA_ROLE}};
-- If using an existing table, grant INSERT.
-- GRANT INSERT ON TABLE {{DATABASE}}.{{SCHEMA}}.{{TABLE}} TO ROLE {{KAFKA_ROLE}};
-- If using user-defined pipes, grant OPERATE on the pipe.
-- GRANT OPERATE ON PIPE {{DATABASE}}.{{SCHEMA}}.{{PIPE}} TO ROLE {{KAFKA_ROLE}};
-- If ingesting into Iceberg tables, grant USAGE on the external volume.
-- GRANT USAGE ON EXTERNAL VOLUME {{EXTERNAL_VOLUME}} TO ROLE {{KAFKA_ROLE}};
-- Grant the role to the connector user.
GRANT ROLE {{KAFKA_ROLE}} TO USER {{KAFKA_USER}};
-- Create the destination table.
CREATE TABLE {{DATABASE}}.{{SCHEMA}}.{{TABLE}} (
order_id NUMBER,
customer_name VARCHAR,
order_total NUMBER,
source_topic VARCHAR,
kafka_offset NUMBER
);
-- Create a pipe with the SAME NAME as the table.
CREATE PIPE {{DATABASE}}.{{SCHEMA}}.{{TABLE}} AS
COPY INTO {{DATABASE}}.{{SCHEMA}}.{{TABLE}}
FROM (
SELECT
$1:order_id::NUMBER,
$1:customer_name,
$1:order_total::NUMBER,
$1:RECORD_METADATA.topic::STRING AS source_topic,
$1:RECORD_METADATA.offset::NUMBER AS kafka_offset
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
-- Grant OPERATE on the pipe to the connector role.
GRANT OPERATE ON PIPE {{DATABASE}}.{{SCHEMA}}.{{TABLE}} TO ROLE {{KAFKA_ROLE}};
Template 6 (migration from V3 Snowpipe mode):
# Required: new connector class
connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
# Your existing connection settings
topics={{TOPICS}}
tasks.max={{TASKS_MAX}}
snowflake.url.name={{ACCOUNT_URL}}
snowflake.user.name={{USER}}
snowflake.private.key={{PRIVATE_KEY_BASE64}}
snowflake.database.name={{DATABASE}}
snowflake.schema.name={{SCHEMA}}
snowflake.role.name={{ROLE}}
# Community converter (replaces Snowflake converters)
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
# Compatibility flags to reproduce V3 behavior
snowflake.validation=client_side
snowflake.compatibility.enable.autogenerated.table.name.sanitization=true
snowflake.compatibility.enable.column.identifier.normalization=true
# Set to 'false' to keep V3 RECORD_CONTENT/RECORD_METADATA columns,
# or 'true' to adopt V4 schematized columns (recommended for new tables).
snowflake.enable.schematization=false
# Offset migration: skip SSv1 migration (not needed for Snowpipe mode)
snowflake.streaming.classic.offset.migration=skip
# Disable compatibility validator after confirming config
snowflake.streaming.validate.compatibility.with.classic=false
Template 7 (migration from V3 Snowpipe Streaming mode):
# Required: new connector class
connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
# Your existing connection settings
topics={{TOPICS}}
tasks.max={{TASKS_MAX}}
snowflake.url.name={{ACCOUNT_URL}}
snowflake.user.name={{USER}}
snowflake.private.key={{PRIVATE_KEY_BASE64}}
snowflake.database.name={{DATABASE}}
snowflake.schema.name={{SCHEMA}}
snowflake.role.name={{ROLE}}
# Community converter (replaces Snowflake converters)
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
# Compatibility flags to reproduce V3 behavior
snowflake.validation=client_side
snowflake.compatibility.enable.autogenerated.table.name.sanitization=true
snowflake.compatibility.enable.column.identifier.normalization=true
# Set to 'false' to keep V3 RECORD_CONTENT/RECORD_METADATA columns,
# or 'true' to adopt V4 schematized columns (recommended for new tables).
snowflake.enable.schematization=false
# Offset migration: recover offsets from V3 SSv1 channels
# Use 'strict' to fail if SSv1 channels aren't found,
# or 'best_effort' to fall back to Kafka consumer group offsets.
snowflake.streaming.classic.offset.migration=best_effort
# Must match your V3 snowflake.streaming.channel.name.include.connector.name setting
snowflake.streaming.classic.offset.migration.include.connector.name={{MATCH_V3_SETTING}}
# Disable compatibility validator after confirming config
snowflake.streaming.validate.compatibility.with.classic=false
Template 8 (latency estimation query):
SELECT
RECORD_METADATA:topic::STRING AS topic,
RECORD_METADATA:partition::NUMBER AS partition,
RECORD_METADATA:offset::NUMBER AS offset,
TIMESTAMPDIFF('millisecond',
TO_TIMESTAMP(RECORD_METADATA:SnowflakeConnectorPushTime::BIGINT, 3),
CONVERT_TIMEZONE('UTC', CURRENT_TIMESTAMP())
) AS latency_ms
FROM {{DATABASE}}.{{SCHEMA}}.{{TABLE}}
ORDER BY latency_ms DESC
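LIMIT 10;
Template 9 (deduplication query):
Important: Test this query on a small dataset first. The approach uses an IN subquery on (topic, partition, offset) tuples to identify groups with duplicates. As written, the DELETE matches every row in such a group, including the first copy, so back up the table and verify the affected rows before running it. For large tables, consider using INSERT INTO ... SELECT DISTINCT into a new table as a safer alternative.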
DELETE FROM {{DATABASE}}.{{SCHEMA}}.{{TABLE}}
WHERE RECORD_METADATA IS NOT NULL
AND (RECORD_METADATA:topic, RECORD_METADATA:partition, RECORD_METADATA:offset)
IN (
SELECT RECORD_METADATA:topic, RECORD_METADATA:partition, RECORD_METADATA:offset
FROM (
SELECT RECORD_METADATA,
ROW_NUMBER() OVER (
PARTITION BY RECORD_METADATA:topic, RECORD_METADATA:partition, RECORD_METADATA:offset
ORDER BY RECORD_METADATA:offset
) AS rn
FROM {{DATABASE}}.{{SCHEMA}}.{{TABLE}}
WHERE RECORD_METADATA IS NOT NULL
)
WHERE rn > 1
);
Add these JVM properties to your Kafka Connect worker startup:
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port={{JMX_PORT}} -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
Key MBeans to monitor (domain: snowflake.kafka.connector):
Task-level:
- put-records (Meter) -- records received, use rates for throughput
- put-duration (Timer) -- put call duration, high values indicate bottlenecks
- backpressure-rewind-count (Counter) -- rewinds due to SDK backpressure
- assigned-partitions (Gauge) -- partitions assigned to this task
Channel-level (per partition):
- processed-offset (Gauge) -- latest offset buffered by connector
- persisted-in-snowflake-offset (Gauge) -- latest offset durably committed in Snowflake
- latest-consumer-offset (Gauge) -- latest offset available from Kafka
- channel-recovery-count (Gauge) -- channel recovery events
Alerting recommendations:
- ... latest-consumer-offset - persisted-in-snowflake-offset growing
- ... backpressure-rewind-count increasing over time
- ... channel-recovery-count increasing
- ... put-duration p99 exceeding acceptable threshold
# Enable client-side validation (required for DLQ)
snowflake.validation=client_side
# Continue on errors (route invalid records to DLQ instead of failing)
errors.tolerance=all
# DLQ topic name
errors.deadletterqueue.topic.name={{DLQ_TOPIC}}
# Enable error logging
errors.log.enable=true
Warning: Setting errors.tolerance=all without configuring a DLQ topic causes invalid records to be silently dropped. This can cause data loss.
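The two DLQ pitfalls described here (tolerance without a DLQ topic, and a DLQ without client-side validation) are easy to check mechanically. The following is a minimal sketch with a hypothetical helper name, `dlq_findings`. It assumes unset properties fall back to errors.tolerance=none and snowflake.validation=server_side.

```python
from typing import Dict, List


def dlq_findings(config: Dict[str, str]) -> List[str]:
    """Flag risky error-handling combinations in a connector config."""
    findings = []
    tolerance = config.get("errors.tolerance", "none")
    dlq_topic = config.get("errors.deadletterqueue.topic.name", "")
    validation = config.get("snowflake.validation", "server_side")
    if tolerance == "all" and not dlq_topic:
        findings.append(
            "errors.tolerance=all without a DLQ topic: invalid records are silently dropped"
        )
    if dlq_topic and validation != "client_side":
        findings.append("a DLQ topic is set but snowflake.validation is not client_side")
    return findings


if __name__ == "__main__":
    print(dlq_findings({"errors.tolerance": "all"}))
```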
{
"name": "{{CONNECTOR_NAME}}",
"config": {
"connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
"topics": "{{TOPICS}}",
"tasks.max": "{{TASKS_MAX}}",
"snowflake.url.name": "{{ACCOUNT_URL}}",
"snowflake.user.name": "{{USER}}",
"snowflake.private.key": "{{PRIVATE_KEY_BASE64}}",
"snowflake.database.name": "{{DATABASE}}",
"snowflake.schema.name": "{{SCHEMA}}",
"snowflake.role.name": "{{ROLE}}",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "{{SCHEMA_REGISTRY_URL}}",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"snowflake.streaming.validate.compatibility.with.classic": "false"
}
}
Note: For secure Schema Registry access, also set value.converter.basic.auth.credentials.source=USER_INFO and value.converter.basic.auth.user.info={{REGISTRY_USER}}:{{REGISTRY_PASSWORD}}.