CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

klingai-async-workflows

Build async video generation workflows with Kling AI using queues, state machines, and event-driven patterns. Trigger with phrases like 'klingai async', 'kling ai workflow', 'klingai pipeline', 'async video generation'.

59

Quality

70%

Does it follow best practices?

Impact

No eval scenarios have been run

Security by Snyk

Passed

No known issues

Optimize this skill with Tessl

npx tessl skill review --optimize ./plugins/saas-packs/klingai-pack/skills/klingai-async-workflows/SKILL.md
SKILL.md
Quality
Evals
Security

Kling AI Async Workflows

Overview

Kling AI video generation is inherently async: you submit a task, then poll or receive a callback when done. This skill covers production patterns for integrating this into larger systems using queues, state machines, and event-driven architectures.

Core Pattern: Submit + Callback

import jwt, time, os, requests

# Root URL of the Kling AI v1 REST API; endpoint paths are appended to it.
BASE = "https://api.klingai.com/v1"

def get_headers():
    """Build HTTP headers carrying a freshly signed HS256 JWT.

    Reads ``KLING_ACCESS_KEY`` and ``KLING_SECRET_KEY`` from the environment
    (``KeyError`` if either is unset). The token becomes valid 5 seconds in
    the past and expires 1800 seconds (30 minutes) from now.

    Returns:
        A dict with ``Authorization`` (bearer token) and ``Content-Type``.
    """
    access_key = os.environ["KLING_ACCESS_KEY"]
    secret_key = os.environ["KLING_SECRET_KEY"]

    claims = {
        "iss": access_key,
        "exp": int(time.time()) + 1800,
        "nbf": int(time.time()) - 5,
    }
    jose_header = {"alg": "HS256", "typ": "JWT"}
    bearer = jwt.encode(
        claims, secret_key, algorithm="HS256", headers=jose_header
    )

    return {
        "Authorization": f"Bearer {bearer}",
        "Content-Type": "application/json",
    }

def submit_async(prompt, callback_url=None, **kwargs):
    """Submit a text-to-video task and return its task id without waiting.

    Args:
        prompt: Text prompt for the video.
        callback_url: Optional URL; included in the request body as
            ``callback_url`` only when truthy.
        **kwargs: Optional overrides:
            ``model`` (default ``"kling-v2-master"``),
            ``duration`` (default ``5``; sent as a string),
            ``mode`` (default ``"standard"``),
            ``timeout`` (seconds for the HTTP call, default ``30``).

    Returns:
        The ``task_id`` found under ``data`` in the JSON response.

    Raises:
        requests.HTTPError: The API answered with a 4xx/5xx status.
        requests.RequestException: Connection failure or timeout.
        KeyError: The response body carries no ``data.task_id``.
    """
    body = {
        "model_name": kwargs.get("model", "kling-v2-master"),
        "prompt": prompt,
        "duration": str(kwargs.get("duration", 5)),
        "mode": kwargs.get("mode", "standard"),
    }
    if callback_url:
        body["callback_url"] = callback_url

    # Without a timeout, requests waits indefinitely on a stalled connection.
    response = requests.post(
        f"{BASE}/videos/text2video",
        headers=get_headers(),
        json=body,
        timeout=kwargs.get("timeout", 30),
    )
    # Surface HTTP-level failures directly instead of as an opaque KeyError
    # from indexing an error payload below.
    response.raise_for_status()
    return response.json()["data"]["task_id"]

Redis Queue Workflow

import redis
import json

# Shared Redis client used by the producer and worker functions; created
# with redis-py's default connection arguments.
r = redis.Redis()

# Producer: enqueue video generation requests
def enqueue_video_job(prompt, metadata=None):
    """Push a new video generation job onto the pending queue.

    Args:
        prompt: Text prompt for the video.
        metadata: Optional dict stored with the job; defaults to ``{}``.

    Returns:
        The generated job id, of the form ``job_<epoch-ms>_<8 hex chars>``.
    """
    import uuid

    # Sample the clock once so ``id`` and ``created_at`` agree.
    now = time.time()
    job = {
        # A millisecond timestamp alone collides when two producers enqueue
        # within the same millisecond; the random suffix keeps ids unique.
        "id": f"job_{int(now * 1000)}_{uuid.uuid4().hex[:8]}",
        "prompt": prompt,
        "metadata": metadata or {},
        "status": "queued",
        "created_at": now,
    }
    r.lpush("kling:jobs:pending", json.dumps(job))
    return job["id"]

# Worker: process jobs from queue
def process_jobs(max_concurrent=3):
    """Run the worker loop: submit pending jobs and poll active ones forever.

    Each cycle pops jobs from ``kling:jobs:pending`` until ``max_concurrent``
    tasks are in flight, then polls every in-flight task once. Finished jobs
    are pushed to ``kling:jobs:completed`` or ``kling:jobs:failed``, and the
    loop sleeps 10 seconds before the next cycle. In-flight jobs are mirrored
    in the ``kling:jobs:active`` hash keyed by task id.

    Args:
        max_concurrent: Maximum number of tasks in flight at once.

    This function never returns.
    """
    import logging

    log = logging.getLogger(__name__)
    # Errors expected from an HTTP round trip or an unexpected response shape.
    recoverable = (
        requests.RequestException,
        KeyError,
        IndexError,
        TypeError,
        ValueError,
    )
    active_tasks = {}

    while True:
        # Submit new jobs if under concurrency limit
        while len(active_tasks) < max_concurrent:
            raw = r.rpop("kling:jobs:pending")
            if not raw:
                break
            job = json.loads(raw)
            try:
                task_id = submit_async(job["prompt"])
            except recoverable as exc:
                # The job has already been popped; record it as failed so a
                # submission error does not silently drop it.
                job["status"] = "failed"
                job["error"] = f"submit failed: {exc}"
                r.lpush("kling:jobs:failed", json.dumps(job))
                log.warning("Submitting job %s failed: %s", job.get("id"), exc)
                continue
            active_tasks[task_id] = job
            r.hset("kling:jobs:active", task_id, json.dumps(job))

        # Check active tasks
        completed = []
        for task_id, job in active_tasks.items():
            try:
                data = requests.get(
                    f"{BASE}/videos/text2video/{task_id}",
                    headers=get_headers(),
                    timeout=30,
                ).json()["data"]
                status = data["task_status"]
                if status == "succeed":
                    video_url = data["task_result"]["videos"][0]["url"]
            except recoverable as exc:
                # A transient polling error must not kill the worker; the
                # task stays active and is polled again next cycle.
                log.warning("Polling task %s failed: %s", task_id, exc)
                continue

            if status == "succeed":
                job["status"] = "completed"
                job["video_url"] = video_url
                r.lpush("kling:jobs:completed", json.dumps(job))
                completed.append(task_id)
            elif status == "failed":
                job["status"] = "failed"
                job["error"] = data.get("task_status_msg")
                r.lpush("kling:jobs:failed", json.dumps(job))
                completed.append(task_id)

        # Remove finished tasks only after iteration, so the dict is not
        # mutated while being looped over.
        for tid in completed:
            active_tasks.pop(tid)
            r.hdel("kling:jobs:active", tid)

        time.sleep(10)

State Machine Pattern

from enum import Enum
from dataclasses import dataclass, field
from typing import Optional

class JobState(Enum):
    """Lifecycle states of a video job; each value is the state's string name.

    The legal moves between states are defined in ``VideoJob.transition``.
    """

    QUEUED = "queued"  # default state of a new VideoJob
    SUBMITTING = "submitting"
    PROCESSING = "processing"
    DOWNLOADING = "downloading"
    COMPLETED = "completed"  # no outgoing transitions are defined
    FAILED = "failed"  # may only move on to RETRYING
    RETRYING = "retrying"  # may only move on to SUBMITTING

@dataclass
class VideoJob:
    """One video generation request and its position in the job lifecycle."""

    prompt: str
    state: JobState = JobState.QUEUED
    task_id: Optional[str] = None
    video_url: Optional[str] = None
    error: Optional[str] = None
    attempts: int = 0
    max_attempts: int = 3

    # Allowed target states per source state. Unannotated, so the dataclass
    # machinery does not treat it as a field. States missing from the table
    # (COMPLETED) have no outgoing transitions.
    _ALLOWED = {
        JobState.QUEUED: {JobState.SUBMITTING},
        JobState.SUBMITTING: {JobState.PROCESSING, JobState.FAILED},
        JobState.PROCESSING: {JobState.DOWNLOADING, JobState.FAILED},
        JobState.DOWNLOADING: {JobState.COMPLETED, JobState.FAILED},
        JobState.FAILED: {JobState.RETRYING},
        JobState.RETRYING: {JobState.SUBMITTING},
    }

    def can_retry(self) -> bool:
        """Return True when the job is FAILED and has attempts remaining."""
        if self.state != JobState.FAILED:
            return False
        return self.attempts < self.max_attempts

    def transition(self, new_state: JobState):
        """Move the job to ``new_state``.

        Raises:
            ValueError: ``new_state`` is not reachable from the current state.
        """
        permitted = self._ALLOWED.get(self.state)
        if permitted is None or new_state not in permitted:
            raise ValueError(f"Invalid transition: {self.state} -> {new_state}")
        self.state = new_state

Multi-Step Pipeline

async def video_pipeline(prompt, steps=None):
    """Chain: generate -> extend (optional) -> download (optional).

    The HTTP calls and task polling are synchronous, so they run in the
    event loop's default executor rather than blocking the loop.

    Args:
        prompt: Text prompt for the initial generation.
        steps: Step names to run; defaults to
            ``["generate", "extend", "download"]``. Generation always runs;
            ``"extend"`` and ``"download"`` are applied only when listed.

    Returns:
        The local file path ``output/<task_id>.mp4`` when ``"download"`` is
        in ``steps``, otherwise the URL of the final video.
    """
    import asyncio

    steps = steps or ["generate", "extend", "download"]
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _video_pipeline_sync, prompt, steps)


def _video_pipeline_sync(prompt, steps):
    """Blocking body of ``video_pipeline``; runs the steps in order."""
    # Step 1: Generate
    task_id = submit_async(prompt, duration=5)
    result = poll_task("/videos/text2video", task_id)  # from job-monitoring skill
    video_url = result["videos"][0]["url"]

    # Step 2: Extend (optional)
    if "extend" in steps:
        # NOTE(review): confirm against the API reference that video-extend
        # accepts the generation task_id under "task_id".
        ext_resp = requests.post(
            f"{BASE}/videos/video-extend",
            headers=get_headers(),
            json={
                "task_id": task_id,
                "prompt": f"Continue: {prompt}",
                "duration": "5",
            },
            timeout=30,
        )
        ext_resp.raise_for_status()
        ext_result = poll_task(
            "/videos/video-extend", ext_resp.json()["data"]["task_id"]
        )
        video_url = ext_result["videos"][0]["url"]

    # Step 3: Download (optional)
    if "download" not in steps:
        return video_url

    # open() does not create missing directories.
    os.makedirs("output", exist_ok=True)
    filepath = f"output/{task_id}.mp4"
    partial = filepath + ".part"
    # Stream to disk in chunks instead of holding the whole video in memory,
    # and fail on HTTP errors rather than saving an error page as an .mp4.
    with requests.get(video_url, stream=True, timeout=60) as resp:
        resp.raise_for_status()
        with open(partial, "wb") as f:
            for chunk in resp.iter_content(chunk_size=1 << 20):
                f.write(chunk)
    # Rename only after a complete download so a truncated file never
    # appears under the final name.
    os.replace(partial, filepath)
    return filepath

Event-Driven with Webhook

# Use callback_url to avoid polling entirely: submit_async adds it to the
# request body when provided.
task_id = submit_async(
    "Sunset over ocean with sailboats",
    callback_url="https://your-app.com/webhooks/kling"
)

# Your webhook handler triggers next pipeline step
# See klingai-webhook-config skill for receiver implementation

Resources

  • API Reference
  • Developer Portal
Repository
jeremylongshore/claude-code-plugins-plus-skills
Last updated
Created

Is this your skill?

If you maintain this skill, you can claim it as your own. Once claimed, you can manage eval scenarios, bundle related skills, attach documentation or rules, and ensure cross-agent compatibility.