CtrlK
BlogDocsLog inGet started
Tessl Logo

pantheon-ai/k8s-yaml-validator

Comprehensive toolkit for validating, linting, and testing Kubernetes YAML resources. Use this skill when validating Kubernetes manifests, debugging YAML syntax errors, performing dry-run tests on clusters, or working with Custom Resource Definitions (CRDs) that require documentation lookup.

Overall
score

93%

Does it follow best practices?

Validation for skill structure

Overview
Skills
Evals
Files

detect_crd.pyscripts/

#!/usr/bin/env python3
"""
Detect Custom Resource Definitions (CRDs) in Kubernetes YAML files.
Extracts kind, apiVersion, and group information for CRD documentation lookup.

This script is resilient to syntax errors in individual documents within
multi-document YAML files. It will parse valid documents and report errors
for invalid ones, allowing CRD detection to proceed for parseable resources.
"""

import json
import re
import sys
from pathlib import Path

try:
    import yaml
except ImportError:
    print("Error: PyYAML is not installed. Please run: pip install pyyaml", file=sys.stderr)
    print("Or use the wrapper script: bash scripts/detect_crd_wrapper.sh", file=sys.stderr)
    sys.exit(1)


def split_yaml_documents(content):
    """
    Split YAML content into individual documents.
    Handles document separators (---) properly.
    """
    # Split on document separator, keeping track of line numbers
    documents = []
    current_doc = []
    current_start_line = 1
    line_num = 0

    for line in content.split('\n'):
        line_num += 1
        if line.strip() == '---':
            if current_doc:
                doc_content = '\n'.join(current_doc)
                if doc_content.strip():  # Only add non-empty documents
                    documents.append({
                        'content': doc_content,
                        'start_line': current_start_line
                    })
            current_doc = []
            current_start_line = line_num + 1
        else:
            current_doc.append(line)

    # Don't forget the last document
    if current_doc:
        doc_content = '\n'.join(current_doc)
        if doc_content.strip():
            documents.append({
                'content': doc_content,
                'start_line': current_start_line
            })

    return documents


def parse_yaml_file(file_path):
    """
    Parse a YAML file that may contain multiple documents.

    This function is resilient to syntax errors in individual documents.
    It parses each document separately and continues even if some fail,
    matching the behavior of kubeconform which can validate 2/3 resources
    even when 1/3 has syntax errors.

    Returns:
        tuple: (list of parsed documents, list of parse errors)
    """
    try:
        with open(file_path, 'r') as f:
            content = f.read()
    except Exception as e:
        print(f"Error reading file: {e}", file=sys.stderr)
        return [], [{'error': str(e), 'document': 0}]

    # First, try parsing the entire file at once (fast path)
    try:
        documents = list(yaml.safe_load_all(content))
        return documents, []
    except yaml.YAMLError:
        # If full parsing fails, try document-by-document parsing
        pass

    # Split into individual documents and parse each separately
    doc_parts = split_yaml_documents(content)
    documents = []
    errors = []

    for i, doc_info in enumerate(doc_parts, 1):
        try:
            parsed = yaml.safe_load(doc_info['content'])
            if parsed is not None:
                documents.append(parsed)
        except yaml.YAMLError as e:
            error_msg = str(e)
            # Extract line number from error if available
            line_match = re.search(r'line (\d+)', error_msg)
            error_line = doc_info['start_line']
            if line_match:
                error_line = doc_info['start_line'] + int(line_match.group(1)) - 1

            errors.append({
                'document': i,
                'start_line': doc_info['start_line'],
                'error_line': error_line,
                'error': error_msg
            })
            print(f"Warning: Document {i} (starting at line {doc_info['start_line']}) has syntax errors: {error_msg}", file=sys.stderr)

    if errors:
        print(f"Parsed {len(documents)} of {len(doc_parts)} documents successfully. {len(errors)} document(s) had errors.", file=sys.stderr)

    return documents, errors


def is_standard_k8s_resource(api_version, kind):
    """Check if a resource is a standard Kubernetes resource."""
    standard_groups = {
        # Core API group
        'v1': True,
        # Apps group
        'apps/v1': True,
        # Batch group
        'batch/v1': True,
        'batch/v1beta1': True,
        # Networking group
        'networking.k8s.io/v1': True,
        'networking.k8s.io/v1beta1': True,
        # Policy group
        'policy/v1': True,
        'policy/v1beta1': True,
        # RBAC group
        'rbac.authorization.k8s.io/v1': True,
        'rbac.authorization.k8s.io/v1beta1': True,
        # Storage group
        'storage.k8s.io/v1': True,
        'storage.k8s.io/v1beta1': True,
        # Autoscaling group
        'autoscaling/v1': True,
        'autoscaling/v2': True,
        'autoscaling/v2beta1': True,
        'autoscaling/v2beta2': True,
        # API Extensions group (for CRD definitions themselves)
        'apiextensions.k8s.io/v1': True,
        'apiextensions.k8s.io/v1beta1': True,
        # Certificates group
        'certificates.k8s.io/v1': True,
        'certificates.k8s.io/v1beta1': True,
        # Admission Registration group
        'admissionregistration.k8s.io/v1': True,
        'admissionregistration.k8s.io/v1beta1': True,
        # Coordination group (Leases)
        'coordination.k8s.io/v1': True,
        # Discovery group (EndpointSlices)
        'discovery.k8s.io/v1': True,
        'discovery.k8s.io/v1beta1': True,
        # Events group
        'events.k8s.io/v1': True,
        'events.k8s.io/v1beta1': True,
        # Flow Control group (v1 is GA since K8s 1.29, v1beta3 deprecated in 1.32)
        'flowcontrol.apiserver.k8s.io/v1': True,
        'flowcontrol.apiserver.k8s.io/v1beta1': True,
        'flowcontrol.apiserver.k8s.io/v1beta2': True,
        'flowcontrol.apiserver.k8s.io/v1beta3': True,
        # Storage Migration group (K8s 1.30+)
        'storagemigration.k8s.io/v1alpha1': True,
        # Node group (RuntimeClass)
        'node.k8s.io/v1': True,
        'node.k8s.io/v1beta1': True,
        # Scheduling group (PriorityClass)
        'scheduling.k8s.io/v1': True,
        'scheduling.k8s.io/v1beta1': True,
        # Snapshot Storage group (VolumeSnapshots)
        'snapshot.storage.k8s.io/v1': True,
        'snapshot.storage.k8s.io/v1beta1': True,
        # Networking alpha (AdminNetworkPolicy - K8s 1.30+)
        'networking.k8s.io/v1alpha1': True,
        # Certificates alpha (ClusterTrustBundle - K8s 1.30+)
        'certificates.k8s.io/v1alpha1': True,
        # Resource group (ResourceClaims - K8s 1.26+)
        'resource.k8s.io/v1alpha2': True,
        'resource.k8s.io/v1alpha3': True,
        # Internal API Server group
        'internal.apiserver.k8s.io/v1alpha1': True,
        # API Registration group
        'apiregistration.k8s.io/v1': True,
        'apiregistration.k8s.io/v1beta1': True,
        # Authentication group
        'authentication.k8s.io/v1': True,
        'authentication.k8s.io/v1beta1': True,
        # Authorization group
        'authorization.k8s.io/v1': True,
        'authorization.k8s.io/v1beta1': True,
    }

    # Check if it's a standard group
    return api_version in standard_groups


def extract_resource_info(doc):
    """Extract resource information from a Kubernetes resource document."""
    if not doc or not isinstance(doc, dict):
        return None

    kind = doc.get('kind')
    api_version = doc.get('apiVersion')

    if not kind or not api_version:
        return None

    # Extract group from apiVersion (e.g., "cert-manager.io/v1" -> "cert-manager.io")
    group = api_version.split('/')[0] if '/' in api_version else 'core'
    version = api_version.split('/')[-1]

    is_crd = not is_standard_k8s_resource(api_version, kind)

    return {
        'kind': kind,
        'apiVersion': api_version,
        'group': group,
        'version': version,
        'isCRD': is_crd,
        'name': doc.get('metadata', {}).get('name', 'unnamed')
    }


def main():
    if len(sys.argv) < 2:
        print("Usage: detect_crd.py <yaml-file>", file=sys.stderr)
        sys.exit(1)

    file_path = sys.argv[1]

    if not Path(file_path).exists():
        print(f"File not found: {file_path}", file=sys.stderr)
        sys.exit(1)

    documents, parse_errors = parse_yaml_file(file_path)
    resources = []

    for doc in documents:
        resource_info = extract_resource_info(doc)
        if resource_info:
            resources.append(resource_info)

    # Build output with both resources and any parse errors
    output = {
        'resources': resources,
        'parseErrors': parse_errors,
        'summary': {
            'totalDocuments': len(documents) + len(parse_errors),
            'parsedSuccessfully': len(documents),
            'parseErrors': len(parse_errors),
            'crdsDetected': sum(1 for r in resources if r.get('isCRD', False))
        }
    }

    # Output as JSON for easy parsing
    print(json.dumps(output, indent=2))


if __name__ == '__main__':
    main()

Install with Tessl CLI

npx tessl i pantheon-ai/k8s-yaml-validator@0.1.0

SKILL.md

tile.json