CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

pantheon-ai/promql-validator

Comprehensive toolkit for validating, optimizing, and understanding Prometheus Query Language (PromQL) queries. Use this skill when working with PromQL queries to check syntax, detect anti-patterns, identify optimization opportunities, and interactively plan queries with users.

Overall
score

93%

Does it follow best practices?

Validation for skill structure

Overview
Skills
Evals
Files

scripts/check_best_practices.py

#!/usr/bin/env python3
"""
PromQL Best Practices Checker

Detects anti-patterns, performance issues, and optimization opportunities in PromQL queries.
Provides actionable suggestions for improving query efficiency and correctness.
"""

import re
import sys
import json
from typing import Dict, List, Tuple, Optional


class PromQLBestPracticesChecker:
    """Checks PromQL queries for best practices and anti-patterns"""

    # Metric name patterns
    # Suffixes treated as counter-like. _check_rate_on_gauges() tests them with
    # str.endswith() so that a counter whose name also contains a gauge-looking
    # fragment (e.g. "_bytes" in "..._bytes_total") is not reported as a gauge.
    COUNTER_SUFFIXES = ['_total', '_count', '_sum', '_bucket']
    # Expanded gauge patterns based on common naming conventions
    # See: https://prometheus.io/docs/practices/naming/
    # NOTE: _check_rate_on_gauges() looks for these as substrings anywhere in
    # the metric name (``pattern in metric``), not only at the end of the name.
    GAUGE_PATTERNS = [
        '_bytes',           # Memory, disk sizes (when not _bytes_total)
        '_ratio',           # Ratios like cache_hit_ratio
        '_usage',           # Resource usage metrics
        '_percent',         # Percentage values
        '_gauge',           # Explicitly named gauges
        '_celsius',         # Temperature metrics
        '_fahrenheit',      # Temperature metrics
        '_temperature',     # Temperature metrics
        '_info',            # Info metrics (always 1, with labels)
        '_size',            # Size measurements
        '_current',         # Current values (e.g., connections_current)
        '_limit',           # Limit values
        '_available',       # Available resources
        '_free',            # Free resources
        '_used',            # Used resources (when not a counter)
        '_utilization',     # Utilization percentages
        '_capacity',        # Capacity values
        '_level',           # Level measurements
    ]

    # Rate functions
    # NOTE(review): not referenced in the visible part of this class; the checks
    # spell these names out inside their own regexes - confirm it is still used.
    RATE_FUNCTIONS = ['rate', 'irate', 'increase', 'delta', 'idelta']

    # Native histogram functions (Prometheus 2.40+/3.0)
    # NOTE(review): likewise not referenced in the visible part of this class.
    NATIVE_HISTOGRAM_FUNCTIONS = [
        'histogram_avg', 'histogram_stddev', 'histogram_stdvar',
        'histogram_count', 'histogram_sum', 'histogram_fraction'
    ]

    def __init__(self, query: str):
        self.query = query.strip()
        self.issues: List[Dict] = []
        self.suggestions: List[Dict] = []
        self.optimizations: List[Dict] = []

    def check(self) -> Dict:
        """
        Execute every best-practice check against the stored query.

        Returns:
            Dict containing check results (whatever _build_result() produces)
        """
        # An empty query has nothing to analyse
        if not self.query:
            return self._build_result()

        # The checks run in this fixed order, which is also the order in which
        # their findings are appended to the result lists.
        ordered_checks = (
            # Anti-patterns
            self._check_high_cardinality,
            self._check_regex_overuse,
            self._check_missing_rate_on_counters,
            self._check_rate_on_gauges,
            self._check_averaging_quantiles,
            self._check_subquery_performance,
            self._check_irate_range,
            self._check_rate_range,
            self._check_unbounded_queries,
            self._check_aggregation_best_practices,
            self._check_recording_rule_opportunity,
            self._check_label_matcher_efficiency,
            self._check_histogram_usage,
            # Prometheus 3.0+ and additional checks
            self._check_deprecated_functions,
            self._check_predict_linear_range,
            self._check_division_by_zero_risk,
            self._check_changes_resets_alerting,
            self._check_dimensional_metric_names,
            # Checks based on documentation research
            self._check_absent_with_aggregation,
            self._check_vector_matching,
            self._check_native_histogram_usage,
            # Design pattern checks
            self._check_mixed_metric_types,
        )
        for run_check in ordered_checks:
            run_check()

        return self._build_result()

    def _check_high_cardinality(self):
        """Check for queries that might match too many time series"""
        # Check for metric selectors with no or very few label filters
        # Pattern: metric_name or metric_name{}
        if re.search(r'\b[a-zA-Z_:][a-zA-Z0-9_:]*\s*\{\s*\}', self.query):
            self.issues.append({
                'type': 'high_cardinality',
                'message': 'Query uses empty label matcher {} which may match many time series',
                'severity': 'warning',
                'recommendation': 'Add specific label filters like {job="...", instance="..."} to reduce cardinality'
            })

        # Check for bare metric names without selectors
        # First, remove content inside {...} blocks and strings to avoid matching label names
        query_without_selectors = self._strip_label_selectors_and_strings(self.query)

        metric_pattern = r'\b([a-zA-Z_:][a-zA-Z0-9_:]*)\b(?!\s*[{\(])'
        metrics_without_selectors = re.findall(metric_pattern, query_without_selectors)

        # Filter out function names, keywords, and PromQL reserved words
        reserved_words = {
            # Aggregation operators
            'sum', 'avg', 'min', 'max', 'count', 'stddev', 'stdvar', 'group',
            'topk', 'bottomk', 'quantile', 'count_values', 'limitk', 'limit_ratio',
            # Functions
            'rate', 'irate', 'increase', 'delta', 'idelta', 'deriv', 'predict_linear',
            'histogram_quantile', 'histogram_count', 'histogram_sum', 'histogram_fraction',
            'histogram_avg', 'histogram_stddev', 'histogram_stdvar',
            'abs', 'ceil', 'floor', 'round', 'sqrt', 'exp', 'ln', 'log2', 'log10',
            'sin', 'cos', 'tan', 'asin', 'acos', 'atan', 'sinh', 'cosh', 'tanh',
            'deg', 'rad', 'sgn', 'clamp', 'clamp_max', 'clamp_min', 'pi',
            'timestamp', 'time', 'minute', 'hour', 'day_of_month', 'day_of_week',
            'days_in_month', 'month', 'year',
            'label_replace', 'label_join', 'vector', 'scalar',
            'changes', 'resets', 'absent', 'absent_over_time', 'present_over_time',
            'avg_over_time', 'min_over_time', 'max_over_time', 'sum_over_time',
            'count_over_time', 'quantile_over_time', 'stddev_over_time', 'stdvar_over_time',
            'last_over_time', 'mad_over_time', 'sort', 'sort_desc', 'sort_by_label',
            'sort_by_label_desc', 'holt_winters', 'double_exponential_smoothing', 'info',
            # Keywords and operators
            'by', 'without', 'and', 'or', 'unless', 'on', 'ignoring',
            'group_left', 'group_right', 'bool', 'offset', 'start', 'end',
            # Constants
            'inf', 'nan'
        }

        for metric in metrics_without_selectors:
            if metric.lower() not in reserved_words:
                # Check if this metric has label filters in the ORIGINAL query
                # A metric with filters looks like: metric_name{label="value"}
                # We need to check if this metric is followed by a non-empty {...} block
                escaped_metric = re.escape(metric)
                has_filters = re.search(
                    rf'\b{escaped_metric}\s*\{{\s*[^}}]+\s*\}}',
                    self.query
                )
                if not has_filters:
                    self.issues.append({
                        'type': 'high_cardinality',
                        'message': f'Metric "{metric}" used without label filters',
                        'severity': 'warning',
                        'recommendation': f'Add label filters: {metric}{{job="...", instance="..."}}'
                    })

    def _strip_label_selectors_and_strings(self, query: str) -> str:
        """
        Remove content inside {...} label selectors, quoted strings, and grouping clauses.
        This prevents label names from being misidentified as metric names.

        Also strips content from:
        - by (...) clauses
        - without (...) clauses
        - on (...) clauses
        - ignoring (...) clauses
        - group_left(...) clauses
        - group_right(...) clauses
        """
        # First, strip grouping clauses (by, without, on, ignoring, group_left, group_right)
        # These contain label names, not metric names
        query = re.sub(r'\b(by|without|on|ignoring|group_left|group_right)\s*\([^)]*\)', r'\1 ( )', query)

        result = []
        depth = 0
        in_string = False
        escape_next = False
        i = 0

        while i < len(query):
            char = query[i]

            if escape_next:
                escape_next = False
                i += 1
                continue

            if char == '\\':
                escape_next = True
                i += 1
                continue

            if char == '"':
                in_string = not in_string
                i += 1
                continue

            if in_string:
                i += 1
                continue

            if char == '{':
                depth += 1
                result.append(' ')  # Replace with space to preserve word boundaries
                i += 1
                continue

            if char == '}':
                depth = max(0, depth - 1)
                result.append(' ')
                i += 1
                continue

            if depth == 0:
                result.append(char)
            else:
                # Inside {...}, replace with space to maintain positions
                result.append(' ')

            i += 1

        return ''.join(result)

    def _check_regex_overuse(self):
        """Check for regex matchers that could be exact matches"""
        # Find regex matchers =~ and !~
        regex_matchers = re.findall(r'([a-zA-Z_][a-zA-Z0-9_]*)\s*=~\s*"([^"]+)"', self.query)

        # Regex metacharacters that indicate an actual regex pattern (not just a literal string)
        # Note: . is a metacharacter meaning "any character", so "5.." is a regex pattern
        regex_metacharacters = r'[\.\*\+\?\^\$\[\]\(\)\|\\]'

        for label, pattern in regex_matchers:
            # Check if the pattern contains any regex metacharacters
            # If it does, it's a real regex and should NOT be converted to exact match
            has_regex_chars = re.search(regex_metacharacters, pattern)

            # Only suggest exact match if pattern is purely alphanumeric with underscores/hyphens
            # and contains NO regex metacharacters
            if not has_regex_chars and re.fullmatch(r'[a-zA-Z0-9_\-]+', pattern):
                self.optimizations.append({
                    'type': 'regex_to_exact',
                    'message': f'Label matcher {label}=~"{pattern}" can be an exact match',
                    'severity': 'info',
                    'recommendation': f'Use {label}="{pattern}" instead of =~ for better performance'
                })

            # Check for simple prefix/suffix patterns that might be better structured
            if pattern.endswith('.*'):
                self.suggestions.append({
                    'type': 'regex_optimization',
                    'message': f'Regex pattern "{pattern}" uses wildcard suffix',
                    'severity': 'info',
                    'recommendation': 'Consider if you can use more specific label values'
                })

    def _check_missing_rate_on_counters(self):
        """Check if counter metrics are used without rate/increase"""
        # Find metric names that look like counters
        metric_pattern = r'\b([a-zA-Z_:][a-zA-Z0-9_:]*(?:_total|_count|_sum|_bucket))\b'
        counter_metrics = re.findall(metric_pattern, self.query)

        for metric in counter_metrics:
            # Check if it's wrapped in rate/irate/increase
            escaped_metric = re.escape(metric)
            if not re.search(rf'(?:rate|irate|increase|delta|idelta)\s*\([^)]*{escaped_metric}', self.query):
                # Check if it's in histogram_quantile (buckets are used differently)
                if not re.search(rf'histogram_quantile\s*\([^)]*{escaped_metric}', self.query):
                    # Skip _sum and _count metrics when used in histogram calculations
                    # (they're used for average: _sum / _count)
                    if metric.endswith('_sum') or metric.endswith('_count'):
                        # Check if this is part of a division for average calculation
                        base_metric = metric.rsplit('_', 1)[0]
                        if re.search(rf'{base_metric}_sum.*{base_metric}_count|{base_metric}_count.*{base_metric}_sum', self.query):
                            continue  # Skip - this is a valid average calculation pattern

                    # Skip native histogram metrics (no _bucket suffix needed)
                    # Native histograms use histogram_avg, histogram_stddev, etc.
                    if re.search(rf'histogram_(?:avg|stddev|stdvar|count|sum|fraction)\s*\([^)]*{escaped_metric}', self.query):
                        continue

                    self.issues.append({
                        'type': 'missing_rate',
                        'message': f'Counter metric "{metric}" used without rate() or increase()',
                        'severity': 'warning',
                        'recommendation': f'Use rate({metric}[5m]) to get per-second rate'
                    })

    def _check_rate_on_gauges(self):
        """Check if rate/irate is used on gauge metrics"""
        # Find rate/irate/increase calls
        rate_calls = re.findall(
            r'(rate|irate|increase|delta|idelta)\s*\(\s*([a-zA-Z_:][a-zA-Z0-9_:]*)',
            self.query
        )

        for func, metric in rate_calls:
            # Check if metric name suggests it's a gauge
            is_gauge = any(pattern in metric for pattern in self.GAUGE_PATTERNS)
            is_counter = any(metric.endswith(suffix) for suffix in self.COUNTER_SUFFIXES)

            if is_gauge and not is_counter:
                self.issues.append({
                    'type': 'rate_on_gauge',
                    'message': f'{func}() used on gauge metric "{metric}"',
                    'severity': 'warning',
                    'recommendation': f'Gauges should not use rate(). Use avg_over_time({metric}[5m]) or remove rate()'
                })

    def _check_averaging_quantiles(self):
        """Check for averaging pre-calculated quantiles"""
        # Pattern: avg(...{quantile="..."})
        if re.search(r'avg\s*\([^)]*\{[^}]*quantile\s*=', self.query):
            self.issues.append({
                'type': 'averaging_quantiles',
                'message': 'Averaging pre-calculated quantiles is mathematically invalid',
                'severity': 'error',
                'recommendation': 'Use histogram_quantile() with histogram buckets instead'
            })

    def _check_subquery_performance(self):
        """Check for potentially expensive subqueries"""
        # Pattern: [...:...] subquery syntax
        subquery_pattern = r'\[(\d+)([smhdwy])[^\]]*:\s*(\d+)?([smhdwy])?\]'
        subqueries = re.findall(subquery_pattern, self.query)

        for range_val, range_unit, res_val, res_unit in subqueries:
            # Convert to approximate hours
            range_hours = self._duration_to_hours(int(range_val), range_unit)

            if range_hours > 24 * 7:  # More than 7 days
                self.issues.append({
                    'type': 'expensive_subquery',
                    'message': f'Subquery spans {range_val}{range_unit} which may be very slow',
                    'severity': 'warning',
                    'recommendation': 'Consider using recording rules or reducing the time range'
                })

    def _check_irate_range(self):
        """Check if irate() is used with appropriate time ranges"""
        # irate() should use short ranges (typically < 5m)
        irate_pattern = r'irate\s*\([^)]*\[(\d+)([smhdwy])\]'
        irate_calls = re.findall(irate_pattern, self.query)

        for duration, unit in irate_calls:
            minutes = self._duration_to_minutes(int(duration), unit)

            if minutes > 5:
                self.issues.append({
                    'type': 'irate_long_range',
                    'message': f'irate() used with {duration}{unit} range - irate only looks at last 2 samples',
                    'severity': 'warning',
                    'recommendation': f'Use rate() for ranges > 5m, or reduce irate range to [2m]'
                })

    def _check_rate_range(self):
        """Check if rate() uses appropriate time ranges"""
        # rate() range should be at least 4x scrape interval (typically >= 2m)
        rate_pattern = r'rate\s*\([^)]*\[(\d+)(ms|s|m|h|d|w|y)\]'
        rate_calls = re.findall(rate_pattern, self.query)

        for duration, unit in rate_calls:
            seconds = self._duration_to_seconds(int(duration), unit)

            if seconds < 120:  # Less than 2 minutes
                self.issues.append({
                    'type': 'rate_short_range',
                    'message': f'rate() used with very short range [{duration}{unit}]',
                    'severity': 'warning',
                    'recommendation': 'Rate range should be at least 4x scrape interval, typically [2m] or more'
                })

    def _check_unbounded_queries(self):
        """Check for queries without sufficient constraints"""
        # Look for aggregations without by/without clauses on potentially high-cardinality data
        aggregations = ['sum', 'avg', 'min', 'max', 'count']

        # Check if this appears to be an alerting query (has comparison operator)
        # For alerting, fully aggregated results returning a single value is often intentional
        is_alerting_query = bool(re.search(r'\s*(>|<|>=|<=|==|!=)\s*[\d\.]', self.query))

        for agg in aggregations:
            # Pattern: sum(...) without "by" or "without"
            pattern = rf'{agg}\s*\([^)]+\)(?!\s*(?:by|without)\s*\()'
            if re.search(pattern, self.query):
                if is_alerting_query:
                    # For alerting queries, this is often intentional - use a softer message
                    self.suggestions.append({
                        'type': 'missing_aggregation_clause',
                        'message': f'{agg}() used without by() or without() clause (likely intentional for alerting)',
                        'severity': 'info',
                        'recommendation': f'Full aggregation is common for alerting queries. Add "by (label)" only if you need per-label alerts.'
                    })
                else:
                    # For non-alerting queries, the standard recommendation applies
                    self.suggestions.append({
                        'type': 'missing_aggregation_clause',
                        'message': f'{agg}() used without by() or without() clause',
                        'severity': 'info',
                        'recommendation': f'Consider adding "by (label)" or "without (label)" to {agg}() for clearer results'
                    })

    def _check_aggregation_best_practices(self):
        """Check aggregation operator usage"""
        # Check for count() without by clause (might be intentional, but worth mentioning)
        if re.search(r'count\s*\([^)]+\)(?!\s*by)', self.query):
            self.suggestions.append({
                'type': 'count_without_by',
                'message': 'count() used without by() - this counts all matching series',
                'severity': 'info',
                'recommendation': 'If you want to count by label, use: count(...) by (label)'
            })

    def _check_recording_rule_opportunity(self):
        """Check if query is complex enough to benefit from recording rules"""
        # Heuristics for complex queries:
        # - Multiple nested functions
        # - Multiple aggregations
        # - Subqueries
        # - Long expressions

        complexity_score = 0

        # Count function calls
        func_count = len(re.findall(r'\b[a-z_]+\s*\(', self.query))
        if func_count >= 3:
            complexity_score += 1

        # Check for nested aggregations
        if re.search(r'(sum|avg|min|max)\s*\([^)]*\b(sum|avg|min|max)\s*\(', self.query):
            complexity_score += 1

        # Check for subqueries
        if re.search(r'\[[^\]]+:[^\]]+\]', self.query):
            complexity_score += 1

        # Check query length
        if len(self.query) > 150:
            complexity_score += 1

        if complexity_score >= 2:
            self.suggestions.append({
                'type': 'recording_rule_opportunity',
                'message': 'Query is complex and may benefit from recording rules',
                'severity': 'info',
                'recommendation': 'Consider creating recording rules if this query is used frequently'
            })

    def _check_label_matcher_efficiency(self):
        """Check if label matchers could be more efficient"""
        # Check for multiple OR conditions that might indicate need for regex
        if self.query.count(' or ') >= 2:
            self.suggestions.append({
                'type': 'multiple_or_conditions',
                'message': 'Multiple OR conditions found',
                'severity': 'info',
                'recommendation': 'Consider using regex matcher =~ "value1|value2|value3" if checking same label'
            })

    def _check_histogram_usage(self):
        """Check for proper histogram quantile calculation"""
        # Check for histogram_quantile usage
        if 'histogram_quantile' in self.query:
            # Should include rate() on bucket metrics
            # Note: The rate() can be nested inside aggregations, so we look for both
            # histogram_quantile and rate() appearing anywhere in the query
            has_rate = bool(re.search(r'\brate\s*\(', self.query))
            has_bucket_metric = '_bucket' in self.query

            # Only warn about missing rate if there's a _bucket metric (classic histogram)
            # Native histograms don't have _bucket suffix
            if has_bucket_metric and not has_rate:
                self.issues.append({
                    'type': 'histogram_missing_rate',
                    'message': 'histogram_quantile() should use rate() on bucket metrics',
                    'severity': 'warning',
                    'recommendation': 'Use: histogram_quantile(0.95, sum by (le) (rate(metric_bucket[5m])))'
                })

            # Should aggregate by 'le' label (only for classic histograms with _bucket)
            # Native histograms don't need 'le' label
            if has_bucket_metric:
                # Look for 'le' in any by() clause in the query
                has_le_in_by = bool(re.search(r'\bby\s*\([^)]*\ble\b', self.query))
                if not has_le_in_by:
                    self.issues.append({
                        'type': 'histogram_missing_le',
                        'message': 'histogram_quantile() with classic histograms should aggregate by (le) label',
                        'severity': 'warning',
                        'recommendation': 'Include "le" in the by() clause: sum by (job, le) (...)'
                    })

    def _check_deprecated_functions(self):
        """Check for deprecated functions (Prometheus 3.0+)"""
        # holt_winters is deprecated in Prometheus 3.0, renamed to double_exponential_smoothing
        if re.search(r'\bholt_winters\s*\(', self.query):
            self.issues.append({
                'type': 'deprecated_function',
                'message': 'holt_winters() is deprecated in Prometheus 3.0',
                'severity': 'warning',
                'recommendation': 'Use double_exponential_smoothing() instead (requires --enable-feature=promql-experimental-functions)'
            })

    def _check_predict_linear_range(self):
        """Check if predict_linear() uses appropriate time ranges"""
        # predict_linear() with very short ranges gives unreliable predictions
        predict_pattern = r'predict_linear\s*\([^)]*\[(\d+)(ms|s|m|h|d|w|y)\]'
        predict_calls = re.findall(predict_pattern, self.query)

        for duration, unit in predict_calls:
            minutes = self._duration_to_minutes(int(duration), unit)

            if minutes < 10:
                self.issues.append({
                    'type': 'predict_linear_short_range',
                    'message': f'predict_linear() used with short range [{duration}{unit}]',
                    'severity': 'warning',
                    'recommendation': 'predict_linear() needs sufficient data for reliable predictions. Use at least [10m] or longer.'
                })

    def _check_division_by_zero_risk(self):
        """Check for potential division by zero issues"""
        # Pattern: / rate(..._count...) or / rate(..._total...)
        # This can be zero if no requests occurred
        if re.search(r'/\s*(?:rate|increase)\s*\([^)]*(?:_count|_total)[^)]*\)', self.query):
            self.suggestions.append({
                'type': 'division_by_zero_risk',
                'message': 'Division by rate() or increase() of counter may result in NaN if denominator is 0',
                'severity': 'info',
                'recommendation': 'Consider using "or vector(0)" or "> 0" filter to handle zero denominators'
            })

    def _check_changes_resets_alerting(self):
        """Check for changes() or resets() usage patterns"""
        # changes() and resets() can miss events that happen between scrapes
        if re.search(r'\b(changes|resets)\s*\(', self.query):
            self.suggestions.append({
                'type': 'changes_resets_limitation',
                'message': 'changes() and resets() only detect changes between scraped samples',
                'severity': 'info',
                'recommendation': 'Events occurring between scrapes will be missed. For alerting, consider alternative approaches.'
            })

    def _check_dimensional_metric_names(self):
        """Check for dimensions embedded in metric names (anti-pattern)"""
        # Embedding dimensions in metric names like: http_requests_GET_total, cpu_0_usage
        # This is a bad practice per Google Cloud and Prometheus best practices

        # Look for patterns like: metric_value_total or metric_123_something
        # Common bad patterns: http_requests_GET_200_total, cpu_core0_usage
        bad_patterns = [
            r'\b[a-zA-Z_]+_(GET|POST|PUT|DELETE|PATCH)_[a-zA-Z_]+',  # HTTP methods in name
            r'\b[a-zA-Z_]+_\d+_[a-zA-Z_]+',  # Numbers embedded (like cpu_0_usage)
            r'\b[a-zA-Z_]+_(2\d{2}|3\d{2}|4\d{2}|5\d{2})_[a-zA-Z_]+',  # HTTP status codes in name
        ]

        for pattern in bad_patterns:
            if re.search(pattern, self.query):
                self.suggestions.append({
                    'type': 'dimensional_metric_name',
                    'message': 'Metric name appears to embed dimensions (method, status code, or index)',
                    'severity': 'info',
                    'recommendation': 'Move dimensions to labels instead of embedding in metric names. Example: http_requests_total{method="GET", status="200"}'
                })
                break  # Only warn once

    def _check_absent_with_aggregation(self):
        """
        Check for absent() used with aggregation functions.

        Per https://stackoverflow.com/questions/53191746/prometheus-absent-function
        and https://www.robustperception.io/functions-to-avoid/

        absent() returns 1 if no time series match the selector, 0 otherwise.
        When combined with aggregation, it doesn't work as expected because:
        - absent(sum(metric)) will return empty if ANY metric matches
        - It cannot detect per-label absence

        For label-aware absence detection, use:
        group(present_over_time(metric[range])) by (labels)
        unless
        group(metric) by (labels)
        """
        # Pattern: absent(aggregation_function(...))
        if re.search(r'absent\s*\(\s*(sum|avg|min|max|count|group|stddev|stdvar)\s*\(', self.query):
            self.issues.append({
                'type': 'absent_with_aggregation',
                'message': 'absent() with aggregation may not work as expected',
                'severity': 'warning',
                'recommendation': 'absent() checks if a selector returns no data. Aggregations return data if ANY series matches. For per-label absence, use: group(present_over_time(metric[range])) unless group(metric)'
            })

        # Pattern: absent(...) by (label) - absent doesn't support by()
        if re.search(r'absent\s*\([^)]+\)\s*by\s*\(', self.query):
            self.issues.append({
                'type': 'absent_with_by',
                'message': 'absent() does not support by() clause for per-label grouping',
                'severity': 'error',
                'recommendation': 'absent() returns a single value. For per-label absence detection, use: group(present_over_time(metric[range])) by (labels) unless group(metric) by (labels)'
            })

    def _check_vector_matching(self):
        """
        Check for common vector matching mistakes with on/ignoring/group_left/group_right.

        Per https://grafana.com/blog/2024/12/13/promql-vector-matching-what-it-is-and-how-it-affects-your-prometheus-queries/
        and https://iximiuz.com/en/posts/prometheus-vector-matching/

        Common issues:
        1. Missing group_left/group_right for many-to-one joins
        2. Using group_right when group_left should be used
        3. Missing on() or ignoring() when label sets don't match
        """
        query_lower = self.query.lower()

        # Check for binary operations that might need vector matching
        # Pattern: metric * metric or metric / metric without on() or ignoring()
        binary_ops = ['*', '/', '+', '-', '%', '^']
        has_binary_op = any(op in self.query for op in binary_ops)
        has_vector_matching = 'on(' in query_lower or 'ignoring(' in query_lower

        # Check for _info metric joins (common pattern)
        # Info metrics typically need group_left
        if re.search(r'\*\s*on\s*\([^)]+\)\s*[a-zA-Z_]+_info\b', self.query):
            if 'group_left' not in query_lower and 'group_right' not in query_lower:
                self.issues.append({
                    'type': 'info_metric_missing_group',
                    'message': 'Joining with _info metric without group_left()',
                    'severity': 'warning',
                    'recommendation': 'Info metric joins typically need group_left() to bring labels from the info metric. Use: metric * on(job, instance) group_left(label1, label2) info_metric'
                })

        # Check for group_left/group_right without on() or ignoring()
        if re.search(r'\b(group_left|group_right)\s*\(', query_lower):
            if 'on(' not in query_lower and 'ignoring(' not in query_lower:
                self.issues.append({
                    'type': 'group_without_matching',
                    'message': 'group_left()/group_right() used without on() or ignoring()',
                    'severity': 'error',
                    'recommendation': 'group_left()/group_right() requires on() or ignoring() to specify matching labels'
                })

        # Check for on() with empty parentheses - this is valid but might be unintentional
        if re.search(r'\bon\s*\(\s*\)', query_lower):
            self.suggestions.append({
                'type': 'on_empty_labels',
                'message': 'on() with empty labels matches all series',
                'severity': 'info',
                'recommendation': 'on() with empty parentheses ignores all labels for matching. Ensure this is intentional.'
            })

        # Check for potential many-to-many matching (error-prone)
        # If there's a binary op with on() but no group_left/group_right, it might fail at runtime
        if has_vector_matching and 'group_left' not in query_lower and 'group_right' not in query_lower:
            # This is just informational since one-to-one might be intended
            self.suggestions.append({
                'type': 'vector_matching_cardinality',
                'message': 'Binary operation with on()/ignoring() assumes one-to-one matching',
                'severity': 'info',
                'recommendation': 'If you have many-to-one or one-to-many cardinality, add group_left() or group_right(). Error "multiple matches for labels" indicates cardinality mismatch.'
            })

    def _check_native_histogram_usage(self):
        """
        Check for proper native histogram function usage (Prometheus 2.40+/3.0).

        Per https://prometheus.io/docs/specs/native_histograms/
        and https://prometheus.io/blog/2024/11/14/prometheus-3-0/

        Native histograms:
        - Don't need _bucket suffix
        - Don't need 'le' label in aggregation
        - Still need rate() for proper calculation
        - Use histogram_avg, histogram_stddev, etc.
        """
        # Check for native histogram functions
        native_hist_pattern = r'\b(histogram_avg|histogram_stddev|histogram_stdvar)\s*\('
        native_hist_matches = re.findall(native_hist_pattern, self.query)

        for func in native_hist_matches:
            # Check if rate() is used (required for native histograms too)
            func_call_pattern = rf'{func}\s*\([^)]*'
            if not re.search(rf'{func}\s*\(\s*rate\s*\(', self.query):
                self.issues.append({
                    'type': 'native_histogram_missing_rate',
                    'message': f'{func}() should use rate() on the histogram metric',
                    'severity': 'warning',
                    'recommendation': f'Use: {func}(rate(histogram_metric[5m]))'
                })

        # Check for histogram_quantile with native histogram patterns
        # Native histograms don't need 'le' in by() clause
        if 'histogram_quantile' in self.query:
            # Check if this looks like a native histogram query (no _bucket suffix)
            # Native histogram: histogram_quantile(0.95, sum by (job) (rate(metric[5m])))
            # Classic histogram: histogram_quantile(0.95, sum by (job, le) (rate(metric_bucket[5m])))

            # Look for _bucket anywhere in the query (not just immediately after histogram_quantile)
            # since the bucket metric could be inside nested functions
            has_bucket_suffix = '_bucket' in self.query
            has_le_in_by = bool(re.search(r'\bby\s*\([^)]*\ble\b', self.query))

            # Only warn about unnecessary 'le' for native histograms (no _bucket suffix)
            # If there's a _bucket metric, this is a classic histogram and 'le' IS required
            if not has_bucket_suffix and has_le_in_by:
                self.suggestions.append({
                    'type': 'native_histogram_unnecessary_le',
                    'message': 'Native histograms do not need "le" label in aggregation',
                    'severity': 'info',
                    'recommendation': 'For native histograms, simplify to: histogram_quantile(0.95, sum by (job) (rate(metric[5m])))'
                })

            # If has _bucket but no le - classic histogram missing le (already covered by _check_histogram_usage)
            # If no _bucket and no le - could be native histogram (OK) or classic without le (error)

        # Provide helpful info about histogram_count and histogram_sum
        # These work with both native and classic histograms but differently
        if re.search(r'\bhistogram_count\s*\(', self.query) or re.search(r'\bhistogram_sum\s*\(', self.query):
            # Check if it's wrapping rate()
            if not re.search(r'histogram_(?:count|sum)\s*\(\s*rate\s*\(', self.query):
                self.suggestions.append({
                    'type': 'histogram_helper_without_rate',
                    'message': 'histogram_count()/histogram_sum() typically need rate() for meaningful results',
                    'severity': 'info',
                    'recommendation': 'Use: histogram_count(rate(histogram_metric[5m])) to get observations per second'
                })

    def _check_mixed_metric_types(self):
        """
        Check if query combines fundamentally different metric types in a single expression.

        Mixing counters, gauges, histograms, and summaries in arithmetic operations
        often produces meaningless results. Each metric type has different semantics:
        - Counters: Cumulative values that only increase
        - Gauges: Point-in-time values that can go up or down
        - Histograms: Bucketed observations for distribution analysis
        - Summaries: Pre-calculated quantiles

        Combining them (e.g., latency / memory + request_count) rarely makes sense.

        EXCEPTION: histogram_quantile with _bucket metrics is NOT mixed types - this is
        the correct pattern for classic histograms. The _bucket suffix doesn't indicate
        a counter being used incorrectly; it's part of the histogram data model.
        """
        # Detect metric types in the query
        detected_types = set()
        type_examples = {}

        # Check if this is a histogram_quantile query with classic histograms
        # In this case, _bucket metrics are expected and should not be flagged as "counter"
        is_classic_histogram_query = 'histogram_quantile' in self.query and '_bucket' in self.query

        # Find all metric names in the query (outside of {...} blocks)
        query_clean = self._strip_label_selectors_and_strings(self.query)
        metric_pattern = r'\b([a-zA-Z_:][a-zA-Z0-9_:]*)\b'
        potential_metrics = re.findall(metric_pattern, query_clean)

        # Filter out reserved words
        reserved_words = {
            'sum', 'avg', 'min', 'max', 'count', 'stddev', 'stdvar', 'group',
            'topk', 'bottomk', 'quantile', 'count_values', 'limitk', 'limit_ratio',
            'rate', 'irate', 'increase', 'delta', 'idelta', 'deriv', 'predict_linear',
            'histogram_quantile', 'histogram_count', 'histogram_sum', 'histogram_fraction',
            'histogram_avg', 'histogram_stddev', 'histogram_stdvar',
            'abs', 'ceil', 'floor', 'round', 'sqrt', 'exp', 'ln', 'log2', 'log10',
            'sin', 'cos', 'tan', 'asin', 'acos', 'atan', 'sinh', 'cosh', 'tanh',
            'deg', 'rad', 'sgn', 'clamp', 'clamp_max', 'clamp_min', 'pi',
            'timestamp', 'time', 'minute', 'hour', 'day_of_month', 'day_of_week',
            'days_in_month', 'month', 'year',
            'label_replace', 'label_join', 'vector', 'scalar',
            'changes', 'resets', 'absent', 'absent_over_time', 'present_over_time',
            'avg_over_time', 'min_over_time', 'max_over_time', 'sum_over_time',
            'count_over_time', 'quantile_over_time', 'stddev_over_time', 'stdvar_over_time',
            'last_over_time', 'mad_over_time', 'sort', 'sort_desc', 'sort_by_label',
            'sort_by_label_desc', 'holt_winters', 'double_exponential_smoothing', 'info',
            'by', 'without', 'and', 'or', 'unless', 'on', 'ignoring',
            'group_left', 'group_right', 'bool', 'offset', 'start', 'end',
            'inf', 'nan'
        }

        metrics = [m for m in potential_metrics if m.lower() not in reserved_words]

        for metric in metrics:
            # Classify metric type
            # For _bucket metrics in histogram_quantile queries, treat them as histogram components
            # not as standalone counters
            if metric.endswith('_bucket') and is_classic_histogram_query:
                # This is part of a classic histogram query - don't flag as counter
                continue
            elif any(metric.endswith(suffix) for suffix in ['_total', '_count', '_sum', '_bucket']):
                detected_types.add('counter')
                type_examples.setdefault('counter', []).append(metric)
            elif any(pattern in metric for pattern in self.GAUGE_PATTERNS):
                detected_types.add('gauge')
                type_examples.setdefault('gauge', []).append(metric)
            elif 'quantile' in self.query and metric in self.query:
                # Check if this metric is used with a quantile label selector
                if re.search(rf'{re.escape(metric)}\s*\{{[^}}]*quantile\s*=', self.query):
                    detected_types.add('summary')
                    type_examples.setdefault('summary', []).append(metric)

        # Check for histogram usage via histogram_quantile
        # Only add 'histogram' as a type if it's NOT a classic histogram query
        # (where _bucket metrics are expected and handled above)
        if 'histogram_quantile' in self.query and not is_classic_histogram_query:
            detected_types.add('histogram')
            type_examples.setdefault('histogram', []).append('(histogram_quantile usage)')

        # Warn if multiple different metric types are combined with arithmetic
        if len(detected_types) >= 2:
            # Check if there are arithmetic operators combining these
            arithmetic_ops = ['+', '-', '*', '/']
            has_arithmetic = any(op in self.query for op in arithmetic_ops)

            if has_arithmetic:
                type_list = ', '.join(sorted(detected_types))
                examples = []
                for t, metrics_list in type_examples.items():
                    examples.append(f"{t}: {metrics_list[0]}")

                self.issues.append({
                    'type': 'mixed_metric_types',
                    'message': f'Query combines different metric types ({type_list}) in arithmetic operations',
                    'severity': 'warning',
                    'recommendation': f'Mixing metric types often produces meaningless results. Examples found: {"; ".join(examples)}. Consider separating into distinct queries or ensure the combination makes semantic sense.'
                })

    @staticmethod
    def _duration_to_seconds(value: int, unit: str) -> int:
        """Convert duration to seconds"""
        units = {
            'ms': 0.001,
            's': 1,
            'm': 60,
            'h': 3600,
            'd': 86400,
            'w': 604800,
            'y': 31536000
        }
        return int(value * units.get(unit, 1))

    @staticmethod
    def _duration_to_minutes(value: int, unit: str) -> float:
        """Convert duration to minutes"""
        return PromQLBestPracticesChecker._duration_to_seconds(value, unit) / 60

    @staticmethod
    def _duration_to_hours(value: int, unit: str) -> float:
        """Convert duration to hours"""
        return PromQLBestPracticesChecker._duration_to_seconds(value, unit) / 3600

    def _build_result(self) -> Dict:
        """Build the check result dictionary"""
        all_findings = self.issues + self.suggestions + self.optimizations

        has_errors = any(item['severity'] == 'error' for item in all_findings)
        has_warnings = any(item['severity'] == 'warning' for item in all_findings)

        if has_errors:
            status = 'ERROR'
        elif has_warnings:
            status = 'WARNING'
        elif self.optimizations or self.suggestions:
            status = 'CAN_BE_IMPROVED'
        else:
            status = 'OPTIMIZED'

        return {
            'status': status,
            'query': self.query,
            'issues': self.issues,
            'suggestions': self.suggestions,
            'optimizations': self.optimizations,
            'summary': {
                'errors': len([i for i in self.issues if i['severity'] == 'error']),
                'warnings': len([i for i in self.issues if i['severity'] == 'warning']),
                'suggestions': len(self.suggestions),
                'optimizations': len(self.optimizations)
            }
        }


def main():
    """Command-line entry point: check one PromQL query and print a JSON report.

    Expects the query as the first command-line argument. Without it, a JSON
    usage error is printed and the process exits with status 1. Otherwise the
    check result is printed as JSON and the exit status is 1 when the summary
    reports any errors, 0 when it reports none.
    """
    cli_args = sys.argv[1:]
    if not cli_args:
        usage_error = {
            'status': 'ERROR',
            'message': 'Usage: check_best_practices.py "<promql_query>"'
        }
        print(json.dumps(usage_error, indent=2))
        sys.exit(1)

    report = PromQLBestPracticesChecker(cli_args[0]).check()
    print(json.dumps(report, indent=2))

    # Non-zero exit status signals that at least one error was reported
    exit_code = 1 if report['summary']['errors'] != 0 else 0
    sys.exit(exit_code)


# Run the checker only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()

Install with Tessl CLI

npx tessl i pantheon-ai/promql-validator@0.2.1

SKILL.md

tile.json