Production reference architecture for Kling AI video generation platforms. Use when designing scalable systems. Trigger with phrases like 'klingai architecture', 'kling ai system design', 'video platform architecture', 'klingai production setup'.
Production architecture for video generation platforms built on Kling AI. Covers API gateway, job queue, worker pool, storage, and monitoring layers.
```
User Request
    |
[API Gateway / Load Balancer]
    |
[Application Server]
    |--- validate prompt & estimate cost
    |--- enqueue job to Redis/SQS
    |
[Job Queue (Redis / SQS / Pub/Sub)]
    |
[Worker Pool (N workers)]
    |--- generate JWT token
    |--- POST https://api.klingai.com/v1/videos/text2video
    |--- receive task_id
    |--- register callback_url OR poll
    |
[Webhook Receiver / Poller]
    |--- receive completion callback
    |--- download video from Kling CDN
    |--- upload to S3/GCS
    |--- update job status in DB
    |--- notify user
    |
[Object Storage (S3 / GCS)]
    |
[CDN (CloudFront / Cloud CDN)]
    |
User views video
```

```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

# estimate_credits, budget_guard, and queue are application-provided helpers
# defined elsewhere in the service.

app = FastAPI()


class VideoRequest(BaseModel):
    prompt: str
    model: str = "kling-v2-master"
    duration: int = 5
    mode: str = "standard"


@app.post("/api/videos")
async def create_video(req: VideoRequest):
    # 1. Validate
    if len(req.prompt) > 2500:
        raise HTTPException(400, "Prompt exceeds 2500 chars")

    # 2. Estimate cost
    credits = estimate_credits(req.duration, req.mode)
    if not budget_guard.check(credits):
        raise HTTPException(402, "Budget exceeded")

    # 3. Enqueue. The queue is expected to assign the job id and include it
    #    in the payload as "id", because the worker reads job["id"].
    job_id = await queue.enqueue({
        "prompt": req.prompt,
        "model": req.model,
        "duration": str(req.duration),  # stored as a string; the worker converts it back with int()
        "mode": req.mode,
    })
    return {"job_id": job_id, "status": "queued", "estimated_credits": credits}
```
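The API handler above calls `estimate_credits` and `budget_guard.check` without defining them. A minimal sketch of what they might look like follows. The credit rates, the 5-second billing unit, the `"professional"` mode key, and the `record` method are all placeholders invented for illustration, not Kling's real pricing or API, and the guard is in-memory only, so it would not be shared across API server replicas.

```python
import math
from dataclasses import dataclass

# Placeholder rates (credits per 5-second unit). These are NOT Kling's real
# prices; substitute the values from your own plan.
CREDITS_PER_5S = {"standard": 10, "professional": 35}


def estimate_credits(duration: int, mode: str) -> int:
    """Estimate credits for one clip, rounding up to whole 5-second units."""
    if duration <= 0:
        raise ValueError("duration must be positive")
    if mode not in CREDITS_PER_5S:
        raise ValueError(f"unknown mode: {mode!r}")
    units = math.ceil(duration / 5)
    return units * CREDITS_PER_5S[mode]


@dataclass
class BudgetGuard:
    """Tracks credits spent against a fixed limit (single process, in memory)."""

    limit: int
    spent: int = 0

    def check(self, credits: int) -> bool:
        """Return True if spending `credits` more would stay within the limit."""
        return self.spent + credits <= self.limit

    def record(self, credits: int) -> None:
        """Add `credits` to the running total once a job has been accepted."""
        self.spent += credits


# Hypothetical module-level instance matching the name used in the handler.
budget_guard = BudgetGuard(limit=100)
```

In a multi-replica deployment the running total would more plausibly live in Redis or the database, so that every API server checks the same budget.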
```python
import json
import os

import redis


class VideoWorker:
    def __init__(self, kling_client, storage_client, redis_url="redis://localhost"):
        self.kling = kling_client
        self.storage = storage_client
        self.redis = redis.Redis.from_url(redis_url)

    def process_loop(self):
        while True:
            # Block for up to 5 s; brpop returns a (key, value) pair or None on timeout
            raw = self.redis.brpop("kling:jobs:pending", timeout=5)
            if not raw:
                continue
            job = json.loads(raw[1])
            try:
                # Submit to Kling API. If WEBHOOK_URL is unset, callback_url is None.
                result = self.kling.text_to_video(
                    job["prompt"],
                    model=job["model"],
                    duration=int(job["duration"]),
                    mode=job["mode"],
                    callback_url=os.environ.get("WEBHOOK_URL"),
                )
                # Polling path (no callback): the client returned the finished videos
                if isinstance(result, dict) and "videos" in result:
                    video_url = result["videos"][0]["url"]
                    stored_url = self.storage.download_and_upload(video_url, job["id"])
                    self.redis.publish("kling:events", json.dumps({
                        "type": "completed",
                        "job_id": job["id"],
                        "video_url": stored_url,
                    }))
            except Exception as e:
                # Park the failed job, with its error message, on a separate list
                self.redis.lpush("kling:jobs:failed", json.dumps({
                    **job, "error": str(e)
                }))
```

| Component | Scaling Strategy |
|---|---|
| Workers | Scale by queue depth (1 worker per 3 concurrent API tasks) |
| API servers | Horizontal, behind load balancer |
| Redis | Single instance for <1K jobs/day, cluster for more |
| Storage | S3/GCS scales automatically |
| CDN | CloudFront/Cloud CDN for global delivery |

| Tier | Max Concurrent Tasks | Workers Needed |
|---|---|---|
| Free | 1 | 1 |
| Standard | 3 | 1 |
| Pro | 5 | 2 |
| Enterprise | 10+ | 3-4 |
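
The worker counts in the tier table follow from the rule of one worker per three concurrent API tasks. A small sketch of that arithmetic, plus a queue-depth-based replica count capped by the tier limit, is shown below. The function names and the "always keep at least one worker" floor are assumptions made for illustration.

```python
import math

# From the scaling table: 1 worker per 3 concurrent API tasks.
TASKS_PER_WORKER = 3


def workers_needed(max_concurrent_tasks: int, tasks_per_worker: int = TASKS_PER_WORKER) -> int:
    """Workers required to saturate a tier's concurrency limit."""
    if max_concurrent_tasks < 1:
        raise ValueError("max_concurrent_tasks must be at least 1")
    return math.ceil(max_concurrent_tasks / tasks_per_worker)


def desired_replicas(queue_depth: int, max_concurrent_tasks: int,
                     tasks_per_worker: int = TASKS_PER_WORKER) -> int:
    """Scale with queue depth, but never past what the tier can use.

    Keeps one worker running even when the queue is empty (an assumption,
    so that new jobs are picked up without a cold start).
    """
    cap = workers_needed(max_concurrent_tasks, tasks_per_worker)
    wanted = math.ceil(max(queue_depth, 0) / tasks_per_worker)
    return max(1, min(wanted, cap))


for tier, limit in [("Free", 1), ("Standard", 3), ("Pro", 5), ("Enterprise", 10)]:
    print(tier, workers_needed(limit))
```

For a limit of 10 this gives 4 workers, the upper end of the "3-4" range listed for Enterprise. Running more workers than the cap adds no throughput, because the extra workers would only hit Kling's concurrency limit.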
```yaml
# docker-compose.yml
services:
  api:
    build: ./api
    ports: ["8000:8000"]
    environment:
      - REDIS_URL=redis://redis:6379
      - KLING_ACCESS_KEY=${KLING_ACCESS_KEY}
      - KLING_SECRET_KEY=${KLING_SECRET_KEY}

  worker:
    build: ./worker
    deploy:
      replicas: 2
    environment:
      - REDIS_URL=redis://redis:6379
      - KLING_ACCESS_KEY=${KLING_ACCESS_KEY}
      - KLING_SECRET_KEY=${KLING_SECRET_KEY}
      - S3_BUCKET=${S3_BUCKET}

  webhook:
    build: ./webhook
    ports: ["8001:8001"]
    environment:
      - REDIS_URL=redis://redis:6379

  redis:
    image: redis:7-alpine
    volumes: ["redis-data:/data"]

volumes:
  redis-data:
```
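The compose file passes `KLING_ACCESS_KEY` and `KLING_SECRET_KEY` to the worker, which needs them for the "generate JWT token" step in the request flow. The sketch below shows one way that step might look using only the standard library. It assumes an HS256-signed JWT with `iss` set to the access key plus `exp` and `nbf` claims, a 30-minute lifetime, and a 5-second `nbf` skew; confirm all of these against Kling's authentication documentation. The function name `make_kling_jwt` and the `now` parameter are hypothetical.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWT requires."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def make_kling_jwt(access_key: str, secret_key: str,
                   ttl_seconds: int = 1800, now: Optional[int] = None) -> str:
    """Build an HS256 JWT; the claim set is an assumption, see the note above."""
    issued = int(time.time()) if now is None else now
    header = {"alg": "HS256", "typ": "JWT"}
    payload = {
        "iss": access_key,            # assumed: issuer is the access key
        "exp": issued + ttl_seconds,  # token expiry
        "nbf": issued - 5,            # small allowance for clock skew
    }
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(
        secret_key.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256
    ).digest()
    return f"{signing_input}.{_b64url(signature)}"
```

A worker would presumably read both keys from the environment, regenerate the token shortly before it expires, and send it as an `Authorization: Bearer <token>` header on each request.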