Production SDK patterns for Kling AI: client wrapper, retry logic, async polling, and error handling. Use when building robust integrations. Trigger with phrases like 'klingai sdk', 'kling ai client', 'klingai patterns', 'kling ai wrapper'.
80
77%
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Passed
No known issues
Optimize this skill with Tessl
npx tessl skill review --optimize ./plugins/saas-packs/klingai-pack/skills/klingai-sdk-patterns/SKILL.mdProduction-ready client patterns for the Kling AI API. Covers auto-refreshing JWT, typed request/response models, exponential backoff polling, async batch submission, and structured error handling.
import jwt
import time
import os
import requests
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class KlingConfig:
access_key: str = field(default_factory=lambda: os.environ["KLING_ACCESS_KEY"])
secret_key: str = field(default_factory=lambda: os.environ["KLING_SECRET_KEY"])
base_url: str = "https://api.klingai.com/v1"
token_buffer_sec: int = 300
poll_interval_sec: int = 10
max_poll_attempts: int = 120 # 20 minutes max
timeout_sec: int = 30
class KlingClient:
"""Production Kling AI client with auto-refreshing JWT."""
def __init__(self, config: Optional[KlingConfig] = None):
self.config = config or KlingConfig()
self._token = None
self._token_expires = 0
@property
def _headers(self) -> dict:
now = int(time.time())
if now >= (self._token_expires - self.config.token_buffer_sec):
payload = {"iss": self.config.access_key, "exp": now + 1800, "nbf": now - 5}
self._token = jwt.encode(payload, self.config.secret_key,
algorithm="HS256",
headers={"alg": "HS256", "typ": "JWT"})
self._token_expires = now + 1800
return {"Authorization": f"Bearer {self._token}",
"Content-Type": "application/json"}
def _post(self, path: str, body: dict) -> dict:
r = requests.post(f"{self.config.base_url}{path}",
headers=self._headers, json=body,
timeout=self.config.timeout_sec)
r.raise_for_status()
return r.json()
def _get(self, path: str) -> dict:
r = requests.get(f"{self.config.base_url}{path}",
headers=self._headers,
timeout=self.config.timeout_sec)
r.raise_for_status()
return r.json()
def _poll_task(self, endpoint: str, task_id: str) -> dict:
"""Poll with exponential backoff until task completes."""
interval = self.config.poll_interval_sec
for attempt in range(self.config.max_poll_attempts):
time.sleep(interval)
result = self._get(f"{endpoint}/{task_id}")
status = result["data"]["task_status"]
if status == "succeed":
return result["data"]["task_result"]
elif status == "failed":
raise KlingGenerationError(result["data"].get("task_status_msg", "Unknown"))
# Increase interval up to 30s max
interval = min(interval * 1.2, 30)
raise KlingTimeoutError(f"Task {task_id} did not complete in time")
# --- Public API ---
def text_to_video(self, prompt: str, **kwargs) -> dict:
body = {"model_name": kwargs.get("model", "kling-v2-master"),
"prompt": prompt,
"duration": str(kwargs.get("duration", 5)),
"aspect_ratio": kwargs.get("aspect_ratio", "16:9"),
"mode": kwargs.get("mode", "standard")}
if kwargs.get("negative_prompt"):
body["negative_prompt"] = kwargs["negative_prompt"]
if kwargs.get("cfg_scale") is not None:
body["cfg_scale"] = kwargs["cfg_scale"]
if kwargs.get("callback_url"):
body["callback_url"] = kwargs["callback_url"]
task = self._post("/videos/text2video", body)
task_id = task["data"]["task_id"]
if kwargs.get("wait", True):
return self._poll_task("/videos/text2video", task_id)
return {"task_id": task_id}
def image_to_video(self, image_url: str, **kwargs) -> dict:
body = {"model_name": kwargs.get("model", "kling-v2-1"),
"image": image_url,
"duration": str(kwargs.get("duration", 5)),
"mode": kwargs.get("mode", "standard")}
if kwargs.get("prompt"):
body["prompt"] = kwargs["prompt"]
task = self._post("/videos/image2video", body)
task_id = task["data"]["task_id"]
if kwargs.get("wait", True):
return self._poll_task("/videos/image2video", task_id)
return {"task_id": task_id}
def extend_video(self, task_id: str, **kwargs) -> dict:
body = {"task_id": task_id,
"prompt": kwargs.get("prompt", ""),
"duration": str(kwargs.get("duration", 5)),
"mode": kwargs.get("mode", "standard")}
result = self._post("/videos/video-extend", body)
new_task_id = result["data"]["task_id"]
if kwargs.get("wait", True):
return self._poll_task("/videos/video-extend", new_task_id)
return {"task_id": new_task_id}
class KlingError(Exception):
pass
class KlingGenerationError(KlingError):
pass
class KlingTimeoutError(KlingError):
passclient = KlingClient()
# Synchronous (waits for result)
result = client.text_to_video(
"A cat playing piano in a jazz club",
model="kling-v2-6",
mode="professional",
duration=5,
)
print(result["videos"][0]["url"])
# Fire-and-forget (returns task_id)
task = client.text_to_video("Ocean waves at sunset", wait=False)
print(f"Submitted: {task['task_id']}")import jwt from "jsonwebtoken";
class KlingClient {
#token = null;
#tokenExp = 0;
constructor(ak = process.env.KLING_ACCESS_KEY, sk = process.env.KLING_SECRET_KEY) {
this.ak = ak;
this.sk = sk;
this.base = "https://api.klingai.com/v1";
}
#getHeaders() {
const now = Math.floor(Date.now() / 1000);
if (now >= this.#tokenExp - 300) {
this.#token = jwt.sign(
{ iss: this.ak, exp: now + 1800, nbf: now - 5 },
this.sk, { algorithm: "HS256", header: { typ: "JWT" } }
);
this.#tokenExp = now + 1800;
}
return { Authorization: `Bearer ${this.#token}`, "Content-Type": "application/json" };
}
async textToVideo(prompt, opts = {}) {
const res = await fetch(`${this.base}/videos/text2video`, {
method: "POST",
headers: this.#getHeaders(),
body: JSON.stringify({
model_name: opts.model ?? "kling-v2-master",
prompt,
duration: String(opts.duration ?? 5),
aspect_ratio: opts.aspectRatio ?? "16:9",
mode: opts.mode ?? "standard",
}),
});
const { data } = await res.json();
return opts.wait === false ? data : this.#poll("/videos/text2video", data.task_id);
}
async #poll(endpoint, taskId, interval = 10000) {
for (let i = 0; i < 120; i++) {
await new Promise((r) => setTimeout(r, interval));
const res = await fetch(`${this.base}${endpoint}/${taskId}`, {
headers: this.#getHeaders(),
});
const { data } = await res.json();
if (data.task_status === "succeed") return data.task_result;
if (data.task_status === "failed") throw new Error(data.task_status_msg);
interval = Math.min(interval * 1.2, 30000);
}
throw new Error(`Timeout: task ${taskId}`);
}
}import functools
def retry_on_transient(max_retries=3, backoff_base=2):
"""Retry on 429 (rate limit) and 5xx (server) errors."""
def decorator(fn):
@functools.wraps(fn)
def wrapper(*args, **kwargs):
for attempt in range(max_retries + 1):
try:
return fn(*args, **kwargs)
except requests.HTTPError as e:
if e.response.status_code in (429, 500, 502, 503) and attempt < max_retries:
wait = backoff_base ** attempt
time.sleep(wait)
continue
raise
return wrapper
return decorator
# Apply to client methods
KlingClient._post = retry_on_transient()(KlingClient._post)3e83543
If you maintain this skill, you can claim it as your own. Once claimed, you can manage eval scenarios, bundle related skills, attach documentation or rules, and ensure cross-agent compatibility.