CtrlK
BlogDocsLog inGet started
Tessl Logo

pantheon-ai/promql-validator

Comprehensive toolkit for validating, optimizing, and understanding Prometheus Query Language (PromQL) queries. Use this skill when working with PromQL queries to check syntax, detect anti-patterns, identify optimization opportunities, and interactively plan queries with users.

Overall
score

93%

Does it follow best practices?

Validation for skill structure

Overview
Skills
Evals
Files

validate_syntax.pyscripts/

#!/usr/bin/env python3
"""
PromQL Syntax Validator

Validates the syntax of Prometheus Query Language (PromQL) expressions.
Checks for correct metric names, label matchers, operators, functions, and time durations.
"""

import re
import sys
import json
from typing import Dict, List, Tuple, Optional


class PromQLSyntaxValidator:
    """Heuristic, regex-based syntax validator for a single PromQL query.

    The validator does not build a parse tree. Each ``_check_*`` method scans
    the query text for one family of mistakes and appends findings to
    ``self.errors`` or ``self.warnings``. Every finding is a dict with at least
    the keys ``type``, ``message`` and ``severity``; some also carry
    ``position`` or ``hint``.

    All structural checks (ranges, function calls, operators, offsets) run on a
    copy of the query whose string literals have been blanked, so characters
    inside label values such as ``path=~"/api/v[0-9]+(/.*)?"`` are never
    mistaken for PromQL syntax.
    """

    # Regex patterns for PromQL syntax elements
    METRIC_NAME_PATTERN = r'[a-zA-Z_:][a-zA-Z0-9_:]*'
    # One or more <number><unit> groups, largest unit first, each unit at most
    # once (e.g. 5m, 1h30m, 500ms). The leading lookahead rejects the empty
    # string; (?!s) stops "5ms" from being read as "5m" plus a dangling "s".
    DURATION_PATTERN = (
        r'(?=\d)(?:\d+y)?(?:\d+w)?(?:\d+d)?(?:\d+h)?'
        r'(?:\d+m(?!s))?(?:\d+s)?(?:\d+ms)?'
    )
    LABEL_MATCHER_PATTERN = r'[a-zA-Z_][a-zA-Z0-9_]*\s*(?:=|!=|=~|!~)\s*"(?:[^"\\]|\\.)*"'
    # PromQL string literals: double-quoted and single-quoted (backslash
    # escapes) or backtick-quoted (raw, no escapes).
    STRING_LITERAL_PATTERN = r'"(?:[^"\\]|\\.)*"|\'(?:[^\'\\]|\\.)*\'|`[^`]*`'

    # Valid PromQL functions
    FUNCTIONS = {
        # Aggregation operators
        'sum', 'min', 'max', 'avg', 'group', 'stddev', 'stdvar', 'count', 'count_values',
        'bottomk', 'topk', 'quantile',
        # Limiting functions (Prometheus 2.43+, experimental)
        'limitk', 'limit_ratio',
        # Rate/increase functions
        'rate', 'irate', 'increase', 'delta', 'idelta', 'deriv',
        # Time functions
        'timestamp', 'time', 'minute', 'hour', 'day_of_month', 'day_of_week',
        'day_of_year', 'days_in_month', 'month', 'year',
        # Math functions
        'abs', 'ceil', 'floor', 'round', 'sqrt', 'exp', 'ln', 'log2', 'log10',
        'sin', 'cos', 'tan', 'asin', 'acos', 'atan', 'sinh', 'cosh', 'tanh',
        'asinh', 'acosh', 'atanh', 'deg', 'rad', 'sgn', 'clamp', 'clamp_max', 'clamp_min',
        # Histogram/summary functions (classic)
        'histogram_quantile', 'histogram_count', 'histogram_sum', 'histogram_fraction',
        # Native histogram functions (Prometheus 2.40+/3.0)
        'histogram_avg', 'histogram_stddev', 'histogram_stdvar',
        # Label manipulation
        'label_replace', 'label_join',
        # Over time functions
        'changes', 'resets', 'avg_over_time', 'min_over_time', 'max_over_time',
        'sum_over_time', 'count_over_time', 'quantile_over_time', 'stddev_over_time',
        'stdvar_over_time', 'last_over_time', 'present_over_time', 'mad_over_time',
        # Prediction functions
        'predict_linear',
        'holt_winters',  # Deprecated in Prometheus 3.0, use double_exponential_smoothing
        'double_exponential_smoothing',  # Prometheus 3.0+ (replaces holt_winters, experimental)
        # Sorting functions
        'sort', 'sort_desc', 'sort_by_label', 'sort_by_label_desc',
        # Other functions
        'absent', 'absent_over_time', 'scalar', 'vector',
        # Metric joining (experimental)
        'info',
        # Trigonometric functions
        'pi',
    }

    # Aggregation operators that support by/without clauses
    AGGREGATION_OPERATORS = {
        'sum', 'min', 'max', 'avg', 'group', 'stddev', 'stdvar', 'count',
        'count_values', 'bottomk', 'topk', 'quantile',
        # Prometheus 2.43+ (experimental, requires --enable-feature=promql-experimental-functions)
        'limitk', 'limit_ratio'
    }

    # Binary operators
    BINARY_OPERATORS = {
        '+', '-', '*', '/', '%', '^', 'atan2',  # Arithmetic
        '==', '!=', '>', '<', '>=', '<=',  # Comparison
        'and', 'or', 'unless',  # Logical
    }

    # Keywords that look like functions (followed by parentheses) but are NOT functions
    # These are aggregation modifiers, vector matching keywords, etc.
    NON_FUNCTION_KEYWORDS = {
        'by',           # Aggregation modifier: sum by (label)
        'without',      # Aggregation modifier: sum without (label)
        'on',           # Vector matching: metric_a + on (label) metric_b
        'ignoring',     # Vector matching: metric_a + ignoring (label) metric_b
        'group_left',   # Vector matching: one-to-many joins
        'group_right',  # Vector matching: many-to-one joins
        'bool',         # Comparison modifier: metric > bool 10
        'start',        # @ modifier argument: metric @ start()
        'end',          # @ modifier argument: metric @ end()
    }

    # (opening char, closing char, key used in error types, word used in messages)
    _DELIMITER_KINDS = (
        ('[', ']', 'bracket', 'bracket'),
        ('{', '}', 'brace', 'brace'),
        ('(', ')', 'paren', 'parenthesis'),
    )

    def __init__(self, query: str):
        """Store the query (surrounding whitespace removed) and empty result lists."""
        self.query = query.strip()
        self.errors: List[Dict] = []
        self.warnings: List[Dict] = []

    def validate(self) -> Dict:
        """
        Run all validation checks

        Findings from any previous call are discarded first, so calling
        ``validate()`` repeatedly yields the same result instead of
        accumulating duplicates.

        Returns:
            Dict with keys ``status`` ('ERROR', 'WARNING' or 'VALID'),
            ``query``, ``errors``, ``warnings`` and ``valid``
        """
        # Fresh lists (not .clear()) so a previously returned result dict is
        # not mutated behind the caller's back.
        self.errors = []
        self.warnings = []

        if not self.query:
            self.errors.append({
                'type': 'empty_query',
                'message': 'Query is empty',
                'severity': 'error'
            })
            return self._build_result()

        checks = (
            self._check_balanced_delimiters,  # brackets, braces, parens, quotes
            self._check_metric_selectors,     # selectors and UTF-8 metric names
            self._check_time_ranges,          # [range] and [range:resolution]
            self._check_function_syntax,      # unknown function names
            self._check_operators,            # doubled arithmetic operators
            self._check_common_typos,         # missing ranges, misplaced offset
        )
        for check in checks:
            check()

        return self._build_result()

    def _strip_string_literals(self) -> str:
        """Return the query with every complete string literal replaced by ``""``.

        Unterminated literals are left untouched; they are reported separately
        by ``_check_balanced_delimiters``.
        """
        return re.sub(self.STRING_LITERAL_PATTERN, '""', self.query)

    def _check_balanced_delimiters(self) -> None:
        """Check for balanced brackets, braces, parentheses and quotes.

        Each delimiter family is tracked on its own stack, so the check reports
        unmatched closers and unclosed openers per family; it does not detect
        interleaving such as ``([)]``. Delimiters inside string literals
        (double-quoted, single-quoted or backtick-quoted) are ignored.
        """
        kinds = self._DELIMITER_KINDS
        openers = {opener: kind for opener, _, kind, _ in kinds}
        closers = {closer: (kind, label) for _, closer, kind, label in kinds}
        stacks: Dict[str, List[int]] = {kind: [] for _, _, kind, _ in kinds}

        quote_char: Optional[str] = None  # quote that opened the current string
        quote_start = 0
        escape_next = False

        for i, char in enumerate(self.query):
            if quote_char is not None:
                if escape_next:
                    escape_next = False
                elif char == '\\' and quote_char != '`':
                    # Backslash escapes apply in "..." and '...' only;
                    # backtick strings are raw.
                    escape_next = True
                elif char == quote_char:
                    quote_char = None
                continue

            if char in ('"', "'", '`'):
                quote_char = char
                quote_start = i
            elif char in openers:
                stacks[openers[char]].append(i)
            elif char in closers:
                kind, label = closers[char]
                if stacks[kind]:
                    stacks[kind].pop()
                else:
                    self.errors.append({
                        'type': f'unmatched_{kind}',
                        'message': f'Unmatched closing {label} at position {i}',
                        'position': i,
                        'severity': 'error'
                    })

        if quote_char is not None:
            self.errors.append({
                'type': 'unclosed_string',
                'message': 'Unclosed string literal',
                'position': quote_start,
                'severity': 'error'
            })

        for _, _, kind, label in kinds:
            if stacks[kind]:
                # Report the innermost (most recently opened) unclosed delimiter.
                position = stacks[kind][-1]
                self.errors.append({
                    'type': f'unclosed_{kind}',
                    'message': f'Unclosed {label} at position {position}',
                    'position': position,
                    'severity': 'error'
                })

    def _check_metric_selectors(self) -> None:
        """Check selectors: warn on an empty ``{}`` matcher and validate UTF-8 names."""
        # Strings are blanked first so a literal "{}" inside a label value
        # does not trigger the warning.
        if re.search(r'\{\s*\}', self._strip_string_literals()):
            self.warnings.append({
                'type': 'empty_label_matcher',
                'message': 'Empty label matcher {} found - this may match many time series',
                'severity': 'warning'
            })

        # Validate UTF-8 metric name syntax (Prometheus 3.0+)
        # Valid: {"my.metric"} or {"my.metric", label="value"}
        # Per https://prometheus.io/docs/guides/utf8/
        self._check_utf8_metric_syntax()

    def _check_utf8_metric_syntax(self) -> None:
        """
        Validate UTF-8 metric name quoting syntax (Prometheus 3.0+)

        Per Prometheus docs, UTF-8 metric names must be:
        - Enclosed in double quotes
        - Placed within curly braces as the first element
        - Format: {"my.metric"} or {"my.metric", label="value"}
        """
        # A quoted string directly after "{" is a metric name unless it is
        # followed by a matcher operator, in which case it is a quoted label
        # name ({"my.label"="value"}) and must not be treated as a metric.
        utf8_selector_pattern = r'\{\s*"([^"]+)"(?!\s*(?:=|!))'

        for match in re.finditer(utf8_selector_pattern, self.query):
            metric_name = match.group(1)

            if re.fullmatch(self.METRIC_NAME_PATTERN, metric_name):
                # Valid in classic format too: quoting is legal but unnecessary.
                self.warnings.append({
                    'type': 'utf8_metric_unnecessary_quoting',
                    'message': f'Metric name "{metric_name}" uses UTF-8 quoting but is valid in classic format',
                    'severity': 'info'
                })
            elif not metric_name.strip():
                self.errors.append({
                    'type': 'invalid_utf8_metric',
                    'message': 'Empty quoted metric name in UTF-8 selector',
                    'severity': 'error'
                })
            # UTF-8 metric names can contain any valid UTF-8 character except null
            elif '\x00' in metric_name:
                self.errors.append({
                    'type': 'invalid_utf8_metric',
                    'message': f'Metric name "{metric_name}" contains null character',
                    'severity': 'error'
                })

        # Check for common UTF-8 syntax mistakes
        # Wrong: {my.metric="value"} - an unquoted name containing dots used
        # as a label name. The regex itself requires at least one dot.
        wrong_utf8_pattern = r'\{\s*([a-zA-Z_][a-zA-Z0-9_.]*\.[a-zA-Z0-9_.]+)\s*(?:!?=|!~)'

        for match in re.finditer(wrong_utf8_pattern, self._strip_string_literals()):
            suspicious_name = match.group(1)
            self.warnings.append({
                'type': 'possible_utf8_syntax_error',
                'message': f'Label name "{suspicious_name}" contains dots - did you mean to use UTF-8 metric syntax?',
                'severity': 'warning',
                'hint': f'For UTF-8 metric names use: {{"{suspicious_name}"}}'
            })

    def _check_time_ranges(self) -> None:
        """Check every ``[...]`` range selector or subquery for valid durations."""
        # Blank strings first: a regex label value such as "[a-z]+" contains
        # brackets that are not range selectors.
        query = self._strip_string_literals()

        for raw_range in re.findall(r'\[([^\]]*)\]', query):
            range_str = raw_range.strip()

            if ':' not in range_str:
                # Regular range vector (an empty [] fails the match as well)
                if not re.fullmatch(self.DURATION_PATTERN, range_str):
                    self.errors.append({
                        'type': 'invalid_duration',
                        'message': f'Invalid duration syntax: {range_str}. Expected format like 5m, 1h, 7d',
                        'severity': 'error'
                    })
                continue

            # Subquery format: [range:resolution] or [range:]
            parts = range_str.split(':')
            if len(parts) > 2:
                self.errors.append({
                    'type': 'invalid_subquery',
                    'message': f'Invalid subquery syntax: [{range_str}]',
                    'severity': 'error'
                })
                continue

            range_part, resolution_part = (part.strip() for part in parts)

            if not range_part:
                # Only the resolution is optional in a subquery.
                self.errors.append({
                    'type': 'invalid_subquery',
                    'message': f'Invalid subquery syntax: [{range_str}] is missing the range before ":"',
                    'severity': 'error'
                })
            elif not re.fullmatch(self.DURATION_PATTERN, range_part):
                self.errors.append({
                    'type': 'invalid_duration',
                    'message': f'Invalid duration in subquery range: {range_part}',
                    'severity': 'error'
                })

            if resolution_part and not re.fullmatch(self.DURATION_PATTERN, resolution_part):
                self.errors.append({
                    'type': 'invalid_duration',
                    'message': f'Invalid duration in subquery resolution: {resolution_part}',
                    'severity': 'error'
                })

    def _check_function_syntax(self) -> None:
        """Report every ``name(`` whose name is not a known PromQL function."""
        # Word-shaped binary operators (and, or, unless, atan2) may be followed
        # by a parenthesised operand, e.g. "a and (b or c)"; they are not calls.
        word_operators = {op for op in self.BINARY_OPERATORS if op[0].isalpha()}
        not_functions = self.NON_FUNCTION_KEYWORDS | word_operators

        func_pattern = r'([a-z_][a-z0-9_]*)\s*\('
        # Blank strings first so "abc(def)" inside a label value is ignored.
        candidates = re.findall(func_pattern, self._strip_string_literals(), re.IGNORECASE)

        for func in candidates:
            func_lower = func.lower()
            if func_lower in not_functions or func_lower in self.FUNCTIONS:
                continue

            message = f'Unknown function: {func}'
            close_matches = self._find_close_matches(func_lower, self.FUNCTIONS)
            if close_matches:
                message = f'Unknown function: {func}. Did you mean: {", ".join(close_matches)}?'
            self.errors.append({
                'type': 'unknown_function',
                'message': message,
                'severity': 'error'
            })

    def _check_operators(self) -> None:
        """Warn about consecutive arithmetic operators (typos like ++ or --)."""
        # Strings are blanked so paths or regexes such as "/a//b" do not match.
        if re.search(r'[+\-*/]{2,}', self._strip_string_literals()):
            self.warnings.append({
                'type': 'double_operator',
                'message': 'Found consecutive operators - this might be a typo',
                'severity': 'warning'
            })

    def _check_common_typos(self) -> None:
        """Check for common mistakes: missing range vectors and misplaced offset."""
        # With strings blanked, a "}" inside a label value cannot end the
        # {...} part of the pattern early.
        query = self._strip_string_literals()

        # Check for 'rate()' without range vector
        if re.search(r'\b(rate|irate|increase|delta|idelta)\s*\(\s*[a-zA-Z_:][a-zA-Z0-9_:]*\s*(?:\{[^}]*\})?\s*\)', query):
            self.errors.append({
                'type': 'missing_range_vector',
                'message': 'rate(), irate(), increase(), delta(), and idelta() require a range vector [duration]',
                'severity': 'error'
            })

        # Check for 'offset' keyword - valid positions:
        # 1. metric_name offset 1h (instant vector)
        # 2. metric_name[5m] offset 1h (range vector)
        # 3. rate(metric_name[5m] offset 1h) (inside function)
        # Invalid: metric_name offset 1h [5m] (offset between metric and range)
        misplaced_offset = r'\boffset\s+-?(?:' + self.DURATION_PATTERN + r')\s*\['
        if re.search(misplaced_offset, query, re.IGNORECASE):
            self.errors.append({
                'type': 'misplaced_offset',
                'message': 'offset modifier should come after the range vector [duration], not before it',
                'severity': 'error'
            })

    def _find_close_matches(self, word: str, candidates: set, max_distance: int = 2) -> List[str]:
        """Return up to three candidates within ``max_distance`` edits of ``word``.

        Results are ordered by edit distance, then alphabetically, so the
        suggestions are the closest ones and are stable between runs
        (iterating a set directly gives an arbitrary order).
        """
        scored = []
        for candidate in candidates:
            # The length difference is a lower bound on the edit distance.
            if abs(len(word) - len(candidate)) > max_distance:
                continue
            distance = self._levenshtein_distance(word, candidate)
            if distance <= max_distance:
                scored.append((distance, candidate))
        scored.sort()
        return [candidate for _, candidate in scored[:3]]

    @staticmethod
    def _levenshtein_distance(s1: str, s2: str) -> int:
        """Calculate Levenshtein distance between two strings"""
        # Keep s2 as the shorter string so each row has len(s2) + 1 entries.
        if len(s1) < len(s2):
            return PromQLSyntaxValidator._levenshtein_distance(s2, s1)
        if len(s2) == 0:
            return len(s1)

        previous_row = list(range(len(s2) + 1))
        for i, c1 in enumerate(s1):
            current_row = [i + 1]
            for j, c2 in enumerate(s2):
                insertions = previous_row[j + 1] + 1
                deletions = current_row[j] + 1
                substitutions = previous_row[j] + (c1 != c2)
                current_row.append(min(insertions, deletions, substitutions))
            previous_row = current_row

        return previous_row[-1]

    def _build_result(self) -> Dict:
        """Build the validation result dictionary from the collected findings.

        ``status`` is 'ERROR' if any error was recorded, otherwise 'WARNING'
        if any warning was recorded, otherwise 'VALID'. ``valid`` is True
        whenever there are no errors.
        """
        if self.errors:
            status = 'ERROR'
        elif self.warnings:
            status = 'WARNING'
        else:
            status = 'VALID'

        return {
            'status': status,
            'query': self.query,
            'errors': self.errors,
            'warnings': self.warnings,
            'valid': not self.errors
        }


def main():
    """Command-line entry point: validate the query given as the first argument.

    Prints the validation result as indented JSON. Exits with status 0 when
    the query has no errors and 1 otherwise; a missing argument prints a usage
    message as JSON and also exits with status 1.
    """
    cli_args = sys.argv[1:]

    if not cli_args:
        usage = {
            'status': 'ERROR',
            'message': 'Usage: validate_syntax.py "<promql_query>"'
        }
        print(json.dumps(usage, indent=2))
        sys.exit(1)

    result = PromQLSyntaxValidator(cli_args[0]).validate()
    print(json.dumps(result, indent=2))

    # A non-zero exit code signals a failed validation to the calling shell.
    exit_code = 0 if result['valid'] else 1
    sys.exit(exit_code)


# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()

Install with Tessl CLI

npx tessl i pantheon-ai/promql-validator@0.2.1

SKILL.md

tile.json