CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

klingai-batch-processing

Process multiple video generation requests efficiently with Kling AI. Use when generating batches of videos or building content pipelines. Trigger with phrases like 'klingai batch', 'kling ai bulk', 'multiple videos klingai', 'klingai parallel generation'.

59

Quality

70%

Does it follow best practices?

Impact

No eval scenarios have been run

Security by Snyk

Passed

No known issues

Optimize this skill with Tessl

npx tessl skill review --optimize ./plugins/saas-packs/klingai-pack/skills/klingai-batch-processing/SKILL.md
SKILL.md
Quality
Evals
Security

Kling AI Batch Processing

Overview

Generate multiple videos efficiently using controlled parallelism, rate-limit-aware submission, progress tracking, and result collection. All requests go through https://api.klingai.com/v1.

Batch Submission with Rate Limiting

import jwt, time, os, requests

# Root URL that every request URL in the snippets below is built from.
BASE = "https://api.klingai.com/v1"

def get_headers():
    """Return request headers carrying a freshly signed HS256 JWT.

    The access key and secret key are read from the ``KLING_ACCESS_KEY`` and
    ``KLING_SECRET_KEY`` environment variables; a ``KeyError`` is raised if
    either one is unset. A new token is signed on every call.

    Returns:
        A dict with ``Authorization`` (bearer token) and ``Content-Type``
        (``application/json``) entries.
    """
    access_key = os.environ["KLING_ACCESS_KEY"]
    secret_key = os.environ["KLING_SECRET_KEY"]

    claims = {
        "iss": access_key,
        # Token stops being valid 1800 seconds (30 minutes) from now.
        "exp": int(time.time()) + 1800,
        # Backdate "not before" by 5 seconds -- presumably to tolerate clock
        # skew between this machine and the server.
        "nbf": int(time.time()) - 5,
    }
    jose_header = {"alg": "HS256", "typ": "JWT"}
    signed = jwt.encode(claims, secret_key, algorithm="HS256", headers=jose_header)

    return {
        "Authorization": f"Bearer {signed}",
        "Content-Type": "application/json",
    }

def submit_batch(prompts, model="kling-v2-master", duration="5",
                 mode="standard", max_concurrent=3, delay=2.0):
    """Submit one text-to-video task per prompt with bounded concurrency.

    Before each submission the function waits until fewer than
    ``max_concurrent`` previously submitted tasks are still unfinished
    (as reported by ``check_complete``), and it pauses ``delay`` seconds
    between consecutive submissions.

    Args:
        prompts: Iterable of prompt strings; it is materialised into a list
            so generators are accepted.
        model: Value sent as ``model_name`` in the request body.
        duration: Value sent as ``duration`` in the request body.
        mode: Value sent as ``mode`` in the request body.
        max_concurrent: Maximum number of unfinished tasks allowed before a
            new one is submitted. Must be at least 1.
        delay: Seconds to sleep between consecutive submissions.

    Returns:
        A list of dicts with keys ``task_id``, ``prompt`` and ``index``, in
        submission order.

    Raises:
        ValueError: If ``max_concurrent`` is less than 1 (the wait loop could
            otherwise never exit).
        requests.HTTPError: If the API answers with an error status code.
    """
    if max_concurrent < 1:
        raise ValueError("max_concurrent must be at least 1")

    prompts = list(prompts)
    tasks = []
    active = []

    for i, prompt in enumerate(prompts):
        # Pace between submissions only; sleeping after the final one would
        # just delay the return.
        if i:
            time.sleep(delay)

        # Wait if at concurrency limit
        while len(active) >= max_concurrent:
            active = [t for t in active if not check_complete(t["task_id"])]
            if len(active) >= max_concurrent:
                time.sleep(5)

        response = requests.post(
            f"{BASE}/videos/text2video",
            headers=get_headers(),
            json={
                "model_name": model,
                "prompt": prompt,
                "duration": duration,
                "mode": mode,
            },
            timeout=30,  # never hang the whole batch on one stalled connection
        )
        # Fail with the HTTP error itself instead of a KeyError on "data".
        response.raise_for_status()
        data = response.json()["data"]

        task = {"task_id": data["task_id"], "prompt": prompt, "index": i}
        tasks.append(task)
        active.append(task)
        print(f"[{i+1}/{len(prompts)}] Submitted: {data['task_id']}")

    return tasks

def check_complete(task_id):
    """Return True when the task's status is ``succeed`` or ``failed``.

    Args:
        task_id: Identifier of a previously submitted text-to-video task.

    Returns:
        ``True`` if the ``task_status`` reported by the API is one of the two
        terminal values checked here, ``False`` otherwise.

    Raises:
        requests.HTTPError: If the API answers with an error status code.
    """
    response = requests.get(
        f"{BASE}/videos/text2video/{task_id}",
        headers=get_headers(),
        timeout=30,  # a stalled status check must not block the caller forever
    )
    response.raise_for_status()
    return response.json()["data"]["task_status"] in ("succeed", "failed")

Collect Results

def collect_results(tasks, timeout=600):
    """Poll every task until it finishes or the overall timeout elapses.

    Args:
        tasks: Iterable of task dicts containing at least ``task_id`` and
            ``prompt`` (the shape returned by ``submit_batch``).
        timeout: Overall time budget in seconds for the polling loop.

    Returns:
        A dict keyed by task id. Succeeded tasks map to
        ``{"status": "succeed", "url": ..., "prompt": ...}``; failed tasks map
        to ``{"status": "failed", "error": ..., "prompt": ...}``. Tasks that
        had not finished when the timeout expired are absent from the dict.

    Raises:
        requests.HTTPError: If the API answers with an error status code.
    """
    results = {}
    deadline = time.monotonic() + timeout
    # Track unfinished tasks explicitly: comparing len(results) to len(tasks)
    # can never succeed when the same task id appears twice in ``tasks``.
    pending = list(tasks)

    while pending and time.monotonic() < deadline:
        still_pending = []
        for task in pending:
            task_id = task["task_id"]
            if task_id in results:
                continue  # duplicate of a task already resolved this round

            response = requests.get(
                f"{BASE}/videos/text2video/{task_id}",
                headers=get_headers(),
                timeout=30,  # a hung request must not defeat the overall timeout
            )
            response.raise_for_status()
            data = response.json()["data"]
            status = data["task_status"]

            if status == "succeed":
                results[task_id] = {
                    "status": "succeed",
                    "url": data["task_result"]["videos"][0]["url"],
                    "prompt": task["prompt"],
                }
            elif status == "failed":
                results[task_id] = {
                    "status": "failed",
                    "error": data.get("task_status_msg", "Unknown"),
                    "prompt": task["prompt"],
                }
            else:
                still_pending.append(task)

        pending = still_pending
        if pending:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Cap the pause so the loop does not overshoot the deadline.
            time.sleep(min(15, remaining))

    return results

Async Batch with asyncio

import asyncio
import aiohttp

async def async_batch(prompts, max_concurrent=3):
    """Generate one video per prompt concurrently and return the outcomes.

    Each prompt is submitted and then polled every 10 seconds until its task
    reports ``succeed`` or ``failed``. At most ``max_concurrent`` prompts are
    in the submit/poll cycle at any time. All requests share one
    ``aiohttp.ClientSession`` so connections are pooled instead of being
    re-established per prompt.

    Args:
        prompts: Iterable of prompt strings.
        max_concurrent: Maximum number of prompts processed at once. Must be
            at least 1.

    Returns:
        A dict mapping each prompt's index to the resulting video URL, or to
        a ``"FAILED: ..."`` string when the task failed.

    Raises:
        ValueError: If ``max_concurrent`` is less than 1 (a zero-valued
            semaphore would never admit any task).
        aiohttp.ClientResponseError: If the API answers with an error status
            code.
    """
    if max_concurrent < 1:
        raise ValueError("max_concurrent must be at least 1")

    prompts = list(prompts)
    results = {}
    if not prompts:
        return results  # nothing to do; skip opening a session

    semaphore = asyncio.Semaphore(max_concurrent)

    async def generate_one(session, prompt, index):
        async with semaphore:
            # Submit
            async with session.post(
                f"{BASE}/videos/text2video",
                headers=get_headers(),
                json={"model_name": "kling-v2-master", "prompt": prompt,
                      "duration": "5", "mode": "standard"},
            ) as resp:
                resp.raise_for_status()
                task_id = (await resp.json())["data"]["task_id"]

            # Poll
            while True:
                await asyncio.sleep(10)
                async with session.get(
                    f"{BASE}/videos/text2video/{task_id}",
                    headers=get_headers(),
                ) as resp:
                    resp.raise_for_status()
                    data = (await resp.json())["data"]

                status = data["task_status"]
                if status == "succeed":
                    results[index] = data["task_result"]["videos"][0]["url"]
                    return
                if status == "failed":
                    results[index] = f"FAILED: {data.get('task_status_msg')}"
                    return

    async with aiohttp.ClientSession() as session:
        await asyncio.gather(
            *(generate_one(session, p, i) for i, p in enumerate(prompts))
        )
    return results

Batch with Callbacks (No Polling)

def submit_batch_with_callbacks(prompts, callback_url):
    """Submit one text-to-video task per prompt with a callback URL attached.

    Every request body carries ``callback_url``, so the caller does not poll
    here; the function only submits and records the task ids. Submissions are
    spaced 2 seconds apart.

    Args:
        prompts: Iterable of prompt strings.
        callback_url: URL sent as ``callback_url`` with every request.

    Returns:
        A list of task ids in submission order.

    Raises:
        requests.HTTPError: If the API answers with an error status code.
    """
    tasks = []
    for index, prompt in enumerate(prompts):
        # Rate-limit pacing between submissions only; a sleep after the last
        # request would just delay the return.
        if index:
            time.sleep(2)

        response = requests.post(
            f"{BASE}/videos/text2video",
            headers=get_headers(),
            json={
                "model_name": "kling-v2-master",
                "prompt": prompt,
                "duration": "5",
                "mode": "standard",
                "callback_url": callback_url,
            },
            timeout=30,  # do not hang the batch on one stalled connection
        )
        # Fail with the HTTP error itself instead of a KeyError on "data".
        response.raise_for_status()
        tasks.append(response.json()["data"]["task_id"])
    return tasks

Cost Estimation Before Batch

def estimate_batch_cost(count, duration=5, mode="standard", audio=False,
                        usd_per_credit=0.14):
    """Estimate and print the credit and dollar cost of a batch.

    The per-video credit figures, the audio multiplier and the default
    dollar rate are the constants written in this function; they are not
    looked up from the API -- verify them against current pricing.

    Args:
        count: Number of videos in the batch.
        duration: Video length in seconds. Accepts an int or a numeric string
            such as ``"5"`` (the form used in the request bodies), so both
            spellings hit the same table entry.
        mode: ``"standard"`` or ``"professional"``.
        audio: When true, the per-video credit figure is multiplied by 5.
        usd_per_credit: Dollar price of one credit used for the estimate.

    Returns:
        Total credits for the batch. Combinations of duration and mode that
        are not in the table fall back to 10 credits per video.
    """
    credits_map = {
        (5, "standard"): 10,
        (5, "professional"): 35,
        (10, "standard"): 20,
        (10, "professional"): 70,
    }

    # The table is keyed by int seconds; normalise numeric strings so "10"
    # is not silently priced at the fallback rate.
    duration_key = duration
    if isinstance(duration, str):
        try:
            duration_key = int(duration.strip())
        except ValueError:
            pass  # leave unparsable values to the fallback below

    per_video = credits_map.get((duration_key, mode), 10)
    if audio:
        per_video *= 5
    total = count * per_video
    print(f"Batch: {count} videos x {per_video} credits = {total} credits")
    print(f"Estimated cost: ${total * usd_per_credit:.2f}")
    return total

# Check before submitting: estimate the credit total for a batch of
# 50 five-second, standard-mode videos (also prints the breakdown).
needed = estimate_batch_cost(50, duration=5, mode="standard")

Resources

  • API Reference
  • Developer Portal
Repository
jeremylongshore/claude-code-plugins-plus-skills
Last updated
Created

Is this your skill?

If you maintain this skill, you can claim it as your own. Once claimed, you can manage eval scenarios, bundle related skills, attach documentation or rules, and ensure cross-agent compatibility.