CtrlK
BlogDocsLog inGet started
Tessl Logo

klingai-job-monitoring

Track and monitor Kling AI video generation task status. Use when building dashboards, tracking batch jobs, or debugging stuck tasks. Trigger with phrases like 'klingai job status', 'kling ai monitor', 'track klingai task', 'klingai progress'.

80

Quality

77%

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

SecuritybySnyk

Passed

No known issues

Optimize this skill with Tessl

npx tessl skill review --optimize ./plugins/saas-packs/klingai-pack/skills/klingai-job-monitoring/SKILL.md
SKILL.md
Quality
Evals
Security

Kling AI Job Monitoring

Overview

Every Kling AI generation returns a task_id. This skill covers polling strategies, batch tracking, timeout handling, and callback-based monitoring for the /v1/videos/text2video, /v1/videos/image2video, and /v1/videos/video-extend endpoints.

Task Lifecycle

| Status | Meaning | Typical Duration |
|---|---|---|
| submitted | Queued for processing | 0-30s |
| processing | Generation in progress | 30-120s (standard), 60-300s (professional) |
| succeed | Complete, video URL available | Terminal |
| failed | Generation failed | Terminal |

Polling a Single Task

import jwt, time, os, requests

BASE = "https://api.klingai.com/v1"

def get_headers():
    """Build request headers carrying a freshly signed JWT bearer token.

    Reads KLING_ACCESS_KEY / KLING_SECRET_KEY from the environment and
    signs a short-lived HS256 token (30 min expiry, nbf backdated 5s to
    tolerate minor clock skew between client and server).
    """
    access_key = os.environ["KLING_ACCESS_KEY"]
    secret_key = os.environ["KLING_SECRET_KEY"]
    now = int(time.time())
    claims = {"iss": access_key, "exp": now + 1800, "nbf": now - 5}
    token = jwt.encode(
        claims,
        secret_key,
        algorithm="HS256",
        headers={"alg": "HS256", "typ": "JWT"},
    )
    return {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }

def poll_task(endpoint: str, task_id: str, interval: float = 10, timeout: int = 600):
    """Poll a Kling AI task until it reaches a terminal state.

    Args:
        endpoint: API path the task was submitted to, e.g. "/videos/text2video".
        task_id: Task identifier returned at submission time.
        interval: Initial seconds between polls. After the 5th attempt it
            grows by 1.2x per poll, capped at 30s (hence typed float).
        timeout: Overall deadline in seconds.

    Returns:
        The "task_result" payload once the task status is "succeed".

    Raises:
        RuntimeError: If the task status becomes "failed".
        TimeoutError: If no terminal state is reached within `timeout`.
        requests.HTTPError: On a non-2xx API response (auth, rate limit, ...).
    """
    start = time.monotonic()
    attempts = 0
    while time.monotonic() - start < timeout:
        time.sleep(interval)
        attempts += 1
        resp = requests.get(
            f"{BASE}{endpoint}/{task_id}", headers=get_headers(), timeout=30
        )
        # Fail loudly on HTTP errors instead of raising a confusing
        # KeyError when the error body has no "data" field.
        resp.raise_for_status()
        data = resp.json()["data"]
        status = data["task_status"]
        elapsed = int(time.monotonic() - start)
        print(f"[{elapsed}s] Poll #{attempts}: {status}")

        if status == "succeed":
            return data["task_result"]
        if status == "failed":
            raise RuntimeError(f"Task failed: {data.get('task_status_msg', 'unknown')}")

        # Back off gradually once the task has clearly not finished quickly.
        if attempts > 5:
            interval = min(interval * 1.2, 30)
    raise TimeoutError(f"Task {task_id} timed out after {timeout}s")

Batch Job Tracker

from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional

@dataclass
class TrackedTask:
    # Mutable record of one Kling AI generation task in a batch.
    # Instances are created by BatchTracker.add() and updated in place
    # by BatchTracker.update_all(); field order is part of the
    # positional-construction interface.
    task_id: str                 # server-assigned task identifier
    endpoint: str                # API path used at submission, e.g. "/videos/text2video"
    prompt: str                  # prompt text the task was submitted with
    status: str = "submitted"    # one of: submitted, processing, succeed, failed
    created_at: float = field(default_factory=time.time)  # submission wall-clock time (epoch seconds)
    result_url: Optional[str] = None  # video URL, set once status == "succeed"
    error_msg: Optional[str] = None   # server failure message, set once status == "failed"

class BatchTracker:
    """In-memory registry of Kling AI tasks with bulk status refresh."""

    def __init__(self):
        # task_id -> TrackedTask
        self.tasks: dict[str, TrackedTask] = {}

    def add(self, task_id, endpoint, prompt):
        """Register a newly submitted task for tracking."""
        self.tasks[task_id] = TrackedTask(task_id=task_id, endpoint=endpoint, prompt=prompt)

    def update_all(self):
        """Re-poll every non-terminal task and record its latest state.

        A failure polling one task is reported and skipped so a single
        flaky request does not abort the whole batch refresh.
        """
        active = [t for t in self.tasks.values() if t.status in ("submitted", "processing")]
        for task in active:
            try:
                resp = requests.get(
                    f"{BASE}{task.endpoint}/{task.task_id}",
                    headers=get_headers(), timeout=30
                )
                # Surface auth/rate-limit errors instead of a KeyError on "data".
                resp.raise_for_status()
                data = resp.json()["data"]
                status = data["task_status"]
                if status == "succeed":
                    # Extract the URL before committing the status so a
                    # malformed payload cannot leave a "succeed" task with
                    # result_url still None.
                    task.result_url = data["task_result"]["videos"][0]["url"]
                elif status == "failed":
                    task.error_msg = data.get("task_status_msg")
                task.status = status
            except Exception as e:  # best-effort: log and continue with the rest
                print(f"Error polling {task.task_id}: {e}")

    def print_report(self):
        """Print a one-line batch summary plus per-status counts."""
        by_status = Counter(t.status for t in self.tasks.values())
        active = sum(v for k, v in by_status.items() if k in ("submitted", "processing"))
        print(f"\n=== Batch: {len(self.tasks)} tasks, {active} active ===")
        for status, count in sorted(by_status.items()):
            print(f"  {status}: {count}")

Stuck Task Detection

def detect_stuck(tracker: BatchTracker, threshold_sec: int = 600):
    """Return (task_id, elapsed_seconds) pairs for non-terminal tasks
    that have been pending longer than *threshold_sec*, printing a
    warning line for each one found."""
    now = time.time()
    pending = ("submitted", "processing")
    stuck = [
        (t.task_id, int(now - t.created_at))
        for t in tracker.tasks.values()
        if t.status in pending and now - t.created_at > threshold_sec
    ]
    if stuck:
        print(f"WARNING: {len(stuck)} stuck tasks:")
        for tid, secs in stuck:
            print(f"  {tid}: {secs}s")
    return stuck

Batch Monitor Loop

tracker = BatchTracker()

# Submit one text2video task per prompt and register each for tracking.
# NOTE(review): `prompts` is assumed to be an iterable of prompt strings
# defined by the caller.
for prompt in prompts:
    resp = requests.post(f"{BASE}/videos/text2video", headers=get_headers(), json={
        "model_name": "kling-v2-master", "prompt": prompt, "duration": "5"
    }, timeout=30)  # timeout matches the GET polls; without it a hung POST stalls the batch
    resp.raise_for_status()  # fail loudly rather than KeyError on the error body
    tracker.add(resp.json()["data"]["task_id"], "/videos/text2video", prompt)

# Monitor until every task reaches a terminal state (succeed/failed).
while any(t.status in ("submitted", "processing") for t in tracker.tasks.values()):
    time.sleep(15)
    tracker.update_all()
    tracker.print_report()
    detect_stuck(tracker)

Resources

  • Task Query API
  • Developer Portal
Repository
jeremylongshore/claude-code-plugins-plus-skills
Last updated
Created

Is this your skill?

If you maintain this skill, you can claim it as your own. Once claimed, you can manage eval scenarios, bundle related skills, attach documentation or rules, and ensure cross-agent compatibility.