Architect, build, and debug Kafka Streams apps (JVM-embedded stream processing). Use when user mentions KStream, KTable, topology, TopologyTestDriver, StreamsBuilder, interactive queries, GlobalKTable, joins/windows/aggregations, or debugging issues (rebalancing, state stores, lag, deserialization errors). Also use when user wants to optimize Kafka Streams for WarpStream or tune Kafka Streams client configuration for WarpStream. Do NOT trigger for Flink, connectors, CDC, or plain producer/consumer.
JVM-embedded stream processing library with no separate cluster.
Do NOT read all reference files upfront. Read ONLY what you need, when you need it.
- references/topology-patterns.md § Joins Decision Tree only
- references/build-templates.md when writing build files, not before
- references/debugging.md for that symptom
- Never read multiple files preemptively "just in case"
Before answering in any mode (Architect, Build, Debug), confirm the target environment if the user hasn't stated it: Apache Kafka | Confluent Platform | Confluent Cloud | WarpStream. Versions/auth shape every recommendation — KIP-1071 support, SASL config, ACL model, transactional-id expiry, CLI tool names all branch on this. Skip the question only if the user already named the environment.
If the user selects WarpStream: Read references/warpstream-optimization.md and apply its overrides on top of the standard config baseline. Key impacts for Kafka Streams:
- exactly_once_v2 enables idempotent producers internally, which reduces throughput on WarpStream due to limited in-flight request concurrency. Default to at_least_once with downstream deduplication unless the user has a strong need for EOS.
- fetch.min.bytes is not supported; do not set it.
- replication.factor is cosmetic (always 3); do not tune it.
- client.id with ws_az=<az> suffix is critical for cost.

Determine the user's intent and enter the appropriate mode:
| User intent | Mode | What to do |
|---|---|---|
| "I need to process events from topic X..." / "Build me a KS app..." / "I want to aggregate/filter/join..." | Build | Go to Build Mode |
| "How should I design my topology?" / "Should I use a KTable or GlobalKTable?" / "What join type do I need?" / "How do I handle late events?" | Architect | Go to Architect Mode |
| "My Streams app is stuck/slow/crashing..." / "Why am I getting rebalancing loops?" / "How do I interpret this metric?" | Debug | Go to Debug Mode |
If unclear, default to Architect — understand the problem before generating code.
If user asks for generic stream processing on CC without mentioning KS, briefly offer Flink as alternative. Don't lecture.
Design the right topology. Translate user's data problem into KS primitives.
Confirm target environment first (see preamble). Then ask (skip if answered): What data (topics)? What output? Relationship between inputs (combine/enrich/group)?
Match problem to pattern (read references/topology-patterns.md only for the specific pattern needed). Present: why it fits, data flow in plain English, KS primitives involved, tradeoffs/alternatives.
When needed, read only the relevant section:
- references/topology-patterns.md § Joins Decision Tree
- references/topology-patterns.md § Windowing Decision Tree
- references/topology-patterns.md § Enrichment Patterns
- references/topology-patterns.md § Exactly-Once (walk through before recommending; at-least-once is simpler if downstream can dedupe)

After user confirms, go to Build Mode.
Generate a complete, runnable Kafka Streams project.
Ask (skip if already answered):
- references/config-baseline.md when generating config; if WarpStream, also read references/warpstream-optimization.md for client overrides
- references/cli-commands.md if needed
- references/architecture.md or references/production-hardening.md § Deployment Sizing if needed

Present plan: topics to create (source/output/DLQ), schemas to register. Changelog/repartition topics are auto-created by KS. If the user says input topics already exist, omit them from create-topics.sh; the script should only create new topics (typically output + DLQ).
Generate: project structure, schemas, App.java, TopologyBuilder.java, config, simplelogger.properties, docker-compose (if local), scripts, TopologyTest.java, .env.example, monitoring comments.
Read references only as needed:
- references/topology-patterns.md for the specific pattern
- references/build-templates.md
- references/schema-patterns.md
- references/config-baseline.md for env-specific blocks
- scripts/create-topics.sh, scripts/teardown.sh
- references/docker-compose.md

Gradle: Run gradle wrapper --gradle-version 8.12 after creating build files.
If user wants sample data: generate SampleDataProducer.java and produce task.
Trigger: User says "production"/"prod"/"deploy" or specifies K8s/ECS/Docker Swarm or requests multiple instances.
Add production components (read references/production-hardening.md for details if needed): Logback JSON logging, logback.xml, health check endpoint, Dockerfile with JVM tuning, KIP-1034 DLQ handler, K8s YAML (if K8s), shadow/fat jar plugin.
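As an illustration of the health check endpoint listed above, here is a minimal sketch using only the JDK's built-in HTTP server. The class name `HealthServer`, the `/health` path, and the rule that only `RUNNING` maps to HTTP 200 are assumptions made for this example, not something the reference files are known to prescribe.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

/** Tiny HTTP health endpoint; the state name comes from a caller-supplied source. */
final class HealthServer {
    private HealthServer() {}

    /** Assumed mapping for this sketch: only RUNNING counts as healthy. */
    static int statusFor(String state) {
        return "RUNNING".equals(state) ? 200 : 503;
    }

    /** Starts the server on the given port (0 picks a free one) and returns it. */
    static HttpServer start(int port, Supplier<String> stateSource) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/health", exchange -> {
            // String.valueOf guards against a null state from the supplier.
            String state = String.valueOf(stateSource.get());
            byte[] body = state.getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(statusFor(state), body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server;
    }
}
```

In an app this could be wired as `HealthServer.start(8080, () -> streams.state().name())`, where `streams` is the `KafkaStreams` instance. Whether `REBALANCING` should fail a liveness probe is a judgment call: returning 503 during a normal rebalance can cause an orchestrator to restart a healthy instance, so a readiness probe may be the better fit for this mapping.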
Explain topology, config choices, how to run, what to monitor. Mention group.protocol=streams (KIP-1071) provides 50-80% faster rebalancing (requires AK 4.2+/CP 8.2+).
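A small sketch of how the `group.protocol=streams` setting could sit in the config, together with the fallback the defaults in this skill describe (remove it on `UnsupportedVersionException`). The class and method names are hypothetical, and plain string keys are used so the snippet needs nothing beyond the JDK.

```java
import java.util.Properties;

/** Builds Streams properties with the KIP-1071 protocol and offers a fallback copy. */
final class StreamsProtocolConfig {
    static final String GROUP_PROTOCOL = "group.protocol";

    private StreamsProtocolConfig() {}

    /** Baseline with group.protocol=streams enabled. */
    static Properties baseline(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty(GROUP_PROTOCOL, "streams");
        return props;
    }

    /** Copy of props without group.protocol, for brokers that reject the new protocol. */
    static Properties withoutStreamsProtocol(Properties props) {
        Properties copy = new Properties();
        copy.putAll(props);
        copy.remove(GROUP_PROTOCOL);
        return copy;
    }
}
```

The fallback returns a copy rather than mutating the input, so the original config stays available for logging which setting was dropped.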
You must actually start the app against a real broker and observe it reach RUNNING before declaring the task done. Generated code that compiles and passes TopologyTestDriver tests can still fail at startup — version-mismatch NoClassDefFoundErrors, silent logger fallbacks, missing runtime deps, and import-path errors all slip past compile + test and only surface against a real broker / Schema Registry. A green build is not a working app.
Branch on the target environment chosen in Step 1:
Local (Apache Kafka or Confluent Platform via the generated docker-compose.yml):
1. docker compose up -d and wait for Kafka + SR to be healthy (docker compose ps, or curl SR /subjects)
2. ./create-topics.sh
3. Run the app (./gradlew run or mvn exec:java) so you can read its logs while it runs
4. Look for State transition from REBALANCING to RUNNING within ~30s. If you don't see it, read the actual stack trace, diagnose via references/debugging.md § Startup Failures, fix, restart, re-verify
5. docker compose down (or leave running if the user wants to keep iterating; ask)

Confluent Cloud: You usually cannot run end-to-end yourself because the cluster + SR API keys are the user's. Do the most you can without them, then hand off the rest:
- If .env has real CC creds (the user has set up a real .env): run the app locally pointed at CC (./gradlew run auto-loads .env) and follow steps 3–5 above. Don't skip just because it's CC; if you have creds, run it.
- Without creds: run ./gradlew build (compile + unit tests) and report the result
- Hand off the remaining commands (./create-topics.sh --cloud, ./gradlew run, the consume command from references/verification.md § Confluent Cloud) and what success looks like (State transition from REBALANCING to RUNNING, records on the output topic)

WarpStream: You usually cannot run end-to-end yourself because the WarpStream cluster and credentials are the user's. Follow the same approach as Confluent Cloud:
- If .env has real WarpStream creds: run the app locally (./gradlew run auto-loads .env) and follow the Local steps 3–5 above. Note that State transition from REBALANCING to RUNNING may take longer due to WarpStream's higher metadata latency.
- Without creds: run ./gradlew build (compile + unit tests) and report the result
- Hand off the remaining commands (./create-topics.sh, ./gradlew run) and what success looks like (State transition from REBALANCING to RUNNING, records on the output topic)
- The user should set client.id with ws_az=<az> in their .env for zone-aware routing

In the handoff, state plainly which of the above you did. If you ran it and saw RUNNING, say so. If you only compiled, say only that. Don't imply a runtime verification you didn't perform.
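For the WarpStream branch, the overrides named earlier (at_least_once by default, no fetch.min.bytes, a client.id that carries ws_az=<az>) might be applied to the baseline config roughly as follows. This is a sketch under assumptions: the class and method names are hypothetical, the exact client.id format should be taken from references/warpstream-optimization.md, and the code only checks that the `ws_az=` token is present.

```java
import java.util.Properties;

/** Applies the WarpStream-specific overrides to a copy of the baseline Streams config. */
final class WarpStreamOverrides {
    private static final String[] CONSUMER_PREFIXES = {
        "", "consumer.", "main.consumer.", "restore.consumer.", "global.consumer."
    };

    private WarpStreamOverrides() {}

    /**
     * @param clientId          full client.id, expected to already contain ws_az=<az>
     * @param requireExactlyOnce true only when the user has a strong need for EOS
     */
    static Properties apply(Properties baseline, String clientId, boolean requireExactlyOnce) {
        if (clientId == null || !clientId.contains("ws_az=")) {
            throw new IllegalArgumentException("client.id must include ws_az=<az>");
        }
        Properties props = new Properties();
        props.putAll(baseline);
        props.setProperty("processing.guarantee",
                requireExactlyOnce ? "exactly_once_v2" : "at_least_once");
        props.setProperty("client.id", clientId);
        // fetch.min.bytes is not supported: drop it in plain and prefixed forms.
        for (String prefix : CONSUMER_PREFIXES) {
            props.remove(prefix + "fetch.min.bytes");
        }
        // replication.factor is left untouched on purpose; tuning it has no effect.
        return props;
    }
}
```

Failing fast on a client.id without `ws_az=` is a design choice for this sketch; a softer variant could log a warning instead, since the setting affects cost rather than correctness.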
For CC consume commands, schema-aware producers, and reset procedures, read references/verification.md.
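The RUNNING check that every branch above relies on can be made mechanical when the app's log output has been captured. The sketch below scans captured lines for the transition message quoted in this skill. The class name and the "ERROR"/"Exception" heuristic for spotting failures are assumptions for illustration.

```java
import java.util.List;
import java.util.Optional;

/** Scans captured application log lines for the startup outcome. */
final class StartupLogCheck {
    static final String RUNNING_MARKER = "State transition from REBALANCING to RUNNING";

    private StartupLogCheck() {}

    /** True if any line reports the REBALANCING to RUNNING transition. */
    static boolean reachedRunning(List<String> logLines) {
        return logLines.stream().anyMatch(line -> line.contains(RUNNING_MARKER));
    }

    /** First line that looks like a failure (assumed heuristic), if any. */
    static Optional<String> firstFailure(List<String> logLines) {
        return logLines.stream()
                .filter(line -> line.contains("ERROR") || line.contains("Exception"))
                .findFirst();
    }
}
```

A helper like this only reports what the log shows. If `reachedRunning` is false, the stack trace surfaced by `firstFailure` is the starting point for references/debugging.md § Startup Failures, and the handoff should say the app was not observed reaching RUNNING.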
| Symptom | Category | Go to |
|---|---|---|
| App crashes on startup | Startup failure | references/debugging.md § Startup Failures |
| App runs but no output / stops processing | Processing stall | references/debugging.md § Processing Stalls |
| Rebalancing loops / constant rebalancing | Rebalancing | references/debugging.md § Rebalancing Issues |
| High lag / slow processing | Performance | references/debugging.md § Performance |
| Deserialization errors / poison pills | Data quality | references/debugging.md § Deserialization Errors |
| State store issues (corruption, growth, recovery) | State | references/debugging.md § State Store Issues |
| Thread failures / StreamsUncaughtExceptionHandler | Thread health | references/debugging.md § Thread Failures |
| Memory issues (OOM, high heap, RocksDB) | Memory | references/debugging.md § Memory Issues |
| Low throughput or KAFKA_STORAGE_ERROR on WarpStream | WarpStream config | references/warpstream-optimization.md |
Confirm target environment first (see preamble) — most debug paths branch on it. Then ask for: error message, config, KS/Java versions, new app or regression?
Read the relevant section in references/debugging.md for the identified category. Provide fix with explanation.
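For the thread-failure category in the table above, the defaults in this skill name a "MaxFailures pattern" for uncaught exceptions. One plausible reading of that pattern, sketched here without the Kafka dependency, is to replace the failed thread until too many failures occur within a time window and then shut the client down. The class name, the sliding-window rule, and the local `Response` enum (a stand-in that mirrors the `REPLACE_THREAD` and `SHUTDOWN_CLIENT` response names in Kafka Streams) are assumptions; references/production-hardening.md § Error Handling remains the authority.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;

/** Decides whether to replace a failed stream thread or give up, based on recent failures. */
final class MaxFailuresPolicy {
    /** Local stand-in for the Kafka Streams response names. */
    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT }

    private final int maxFailures;
    private final Duration window;
    private final Deque<Instant> failures = new ArrayDeque<>();

    MaxFailuresPolicy(int maxFailures, Duration window) {
        this.maxFailures = maxFailures;
        this.window = window;
    }

    /** Records a failure at the given time and returns the action to take. */
    synchronized Response onFailure(Instant now) {
        Instant cutoff = now.minus(window);
        // Forget failures that have fallen out of the window.
        while (!failures.isEmpty() && failures.peekFirst().isBefore(cutoff)) {
            failures.removeFirst();
        }
        failures.addLast(now);
        return failures.size() > maxFailures
                ? Response.SHUTDOWN_CLIENT
                : Response.REPLACE_THREAD;
    }
}
```

In a real app the decision would be made inside the handler passed to `KafkaStreams#setUncaughtExceptionHandler`, translating the local enum into the library's own response type. The method is synchronized because several stream threads can fail concurrently.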
Non-negotiable defaults. Apply all. Read reference files only if you need implementation details.
- SpecificAvroSerde, KafkaProtobufSerde, KafkaJsonSchemaSerde. Set schema.registry.url, default.key.serde, default.value.serde. JSON Schema: set json.value.type. Protobuf: set specific.protobuf.value.type (references/config-baseline.md)
- group.protocol=streams (default). Remove if UnsupportedVersionException. Unsupported: static membership, regex topics, standby replicas, warm-up replicas (references/topology-patterns.md § Assignment Strategy)
- DeserializationExceptionHandler, ProcessingExceptionHandler (KIP-1034), ProductionExceptionHandler, StreamsUncaughtExceptionHandler. Use MaxFailures pattern for uncaught (references/production-hardening.md § Error Handling)
- ensure.explicit.internal.resource.naming=true
- streams.close(30s) on SIGTERM/SIGINT
- metrics.recording.level=INFO (references/config-baseline.md)
- simplelogger.properties (references/build-templates.md)
- statestore.cache.max.bytes=0 to avoid non-deterministic assertions.
- java.time.Instant for timestamp-millis/timestamp-micros, LocalDate for date, BigDecimal for decimal, etc. Never use raw long/int literals with generated setter methods; use Instant.EPOCH, Instant.now(), Instant.ofEpochMilli(...). Use Instant.isAfter()/isBefore() instead of Math.max()/Math.min() for timestamp comparisons. Applies to topology code, aggregation initializers, producers, AND test helpers (references/schema-patterns.md § Java type mapping).
- scripts/: create-topics.sh (pre-create topics, --cloud), teardown.sh (delete topics/state, --cloud), produce-test-data.sh (generate if requested).
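The timestamp rule in the defaults above is easiest to see in code. The sketch below shows `Instant`-based replacements for `Math.max`/`Math.min` and an aggregation that starts from `Instant.EPOCH`. The class and method names are hypothetical and not tied to any generated schema class.

```java
import java.time.Instant;
import java.util.List;

/** Timestamp helpers that stay in java.time.Instant instead of raw epoch longs. */
final class EventTimes {
    private EventTimes() {}

    /** Later of two timestamps; use in place of Math.max on epoch millis. */
    static Instant latest(Instant a, Instant b) {
        return a.isAfter(b) ? a : b;
    }

    /** Earlier of two timestamps; use in place of Math.min on epoch millis. */
    static Instant earliest(Instant a, Instant b) {
        return a.isBefore(b) ? a : b;
    }

    /** Folds a batch of event times, starting from Instant.EPOCH like an aggregation initializer. */
    static Instant lastSeen(List<Instant> eventTimes) {
        Instant result = Instant.EPOCH;
        for (Instant t : eventTimes) {
            result = latest(result, t);
        }
        return result;
    }
}
```

The same shape carries over to an aggregator: initialize the "last seen" field with `Instant.EPOCH`, then update it with `latest(current, incoming)` so that no raw `long` ever reaches a generated setter.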
references/topology-patterns.md — design, joins, windows, aggregations | references/architecture.md — internals, sizing | references/debugging.md — troubleshooting | references/config-baseline.md — config | references/build-templates.md — project structure | references/schema-patterns.md — Avro/Protobuf/JSON | references/production-hardening.md — prod setup | references/cli-commands.md — CLI | references/docker-compose.md — local dev | references/verification.md — checklists | references/warpstream-optimization.md — WarpStream client config overrides