A client library for accessing the Grafana HTTP API, written in Python
—
Management of legacy alerting, unified alerting provisioning, notification channels, contact points, notification policies, and mute timings. This covers both the legacy alerting system and the unified alerting system introduced in Grafana 8.
Legacy alert rule management for Grafana versions prior to 8.0 and instances still using the legacy alerting system.
def get_alertrule(self, folder_name: str, alertrule_name: str) -> dict:
    """
    Get legacy alert rule by folder and name.

    Signature stub: the implementation is elided in this reference.

    Args:
        folder_name (str): Folder name containing the alert rule
        alertrule_name (str): Alert rule name

    Returns:
        dict: Alert rule configuration and status
    """
    ...
def create_alertrule(self, folder_name: str, alertrule: dict) -> dict:
    """
    Create legacy alert rule in specified folder.

    Signature stub: the implementation is elided in this reference.

    Args:
        folder_name (str): Target folder name
        alertrule (dict): Alert rule configuration

    Returns:
        dict: Created alert rule with ID and metadata
    """
    ...
def update_alertrule(self, folder_name: str, alertrule: dict) -> dict:
    """
    Update existing legacy alert rule.

    Signature stub: the implementation is elided in this reference.

    Args:
        folder_name (str): Folder name containing the alert rule
        alertrule (dict): Updated alert rule configuration

    Returns:
        dict: Update result
    """
    ...
def delete_alertrule(self, folder_name: str, alertrule_name: str):
"""
Delete legacy alert rule.
Args:
folder_name (str): Folder name containing the alert rule
alertrule_name (str): Alert rule name to delete
Returns:
dict: Deletion result
"""
...

Legacy Alerting Usage Example:
from grafana_client import GrafanaApi, TokenAuth
# Client authenticated with an API token; later snippets reuse `api`.
api = GrafanaApi(auth=TokenAuth("your-token"), host="grafana.example.com")
# Payload for the legacy alert rule: one condition built from a query
# reference, an "avg" reducer and a "gt" 80 evaluator.
legacy_alert = {
    "name": "High CPU Usage",
    "message": "CPU usage is above 80%",
    "frequency": "10s",
    "conditions": [
        {
            "query": {
                "params": ["A", "5m", "now"]
            },
            "reducer": {
                "params": [],
                "type": "avg"
            },
            "evaluator": {
                "params": [80],
                "type": "gt"
            },
            "operator": {
                "type": "and"
            }
        }
    ],
    "executionErrorState": "alerting",
    "noDataState": "no_data",
    "for": "5m"
}
# Create the alert rule in the "Production" folder
result = api.alerting.create_alertrule("Production", legacy_alert)
print(f"Created legacy alert rule: {result}")
# Fetch the rule back by folder and rule name
existing_rule = api.alerting.get_alertrule("Production", "High CPU Usage")
print(f"Alert rule state: {existing_rule.get('state', 'unknown')}")
# Change the message locally, then send the whole payload as the update
legacy_alert["message"] = "Updated: CPU usage is critically high"
api.alerting.update_alertrule("Production", legacy_alert)
print("Alert rule updated")

Modern unified alerting system operations for Grafana 8+ with support for alert rules, contact points, notification policies, and mute timings.
def get_alertrules_all(self) -> list:
    """
    Get all alert rules in unified alerting.

    Signature stub: the implementation is elided in this reference.

    Returns:
        list: List of all alert rules across folders
    """
    ...
def get_alertrule(self, alertrule_uid: str) -> dict:
    """
    Get alert rule by UID.

    Signature stub: the implementation is elided in this reference.

    Args:
        alertrule_uid (str): Alert rule UID

    Returns:
        dict: Alert rule configuration and metadata
    """
    ...
def create_alertrule(self, alertrule: dict, disable_provenance: bool = False) -> dict:
    """
    Create new alert rule.

    Signature stub: the implementation is elided in this reference.

    Args:
        alertrule (dict): Alert rule configuration
        disable_provenance (bool): Disable provenance checking

    Returns:
        dict: Created alert rule with UID and metadata
    """
    ...
def update_alertrule(
    self, alertrule_uid: str, alertrule: dict, disable_provenance: bool = False
) -> dict:
    """
    Update existing alert rule.

    Signature stub: the implementation is elided in this reference.

    Args:
        alertrule_uid (str): Alert rule UID
        alertrule (dict): Updated alert rule configuration
        disable_provenance (bool): Disable provenance checking

    Returns:
        dict: Update result
    """
    ...
def delete_alertrule(self, alertrule_uid: str):
"""
Delete alert rule by UID.
Args:
alertrule_uid (str): Alert rule UID to delete
Returns:
dict: Deletion result
"""
...

Modern Alert Rule Usage Example:
# Modern alert rule configuration: query A reads the percentage of available
# memory, expression C fires when the last value of A drops below 20.
modern_alert = {
    "uid": "",  # Auto-generated if empty
    "title": "High Memory Usage",
    "condition": "C",  # Condition query ref ID
    "data": [
        {
            "refId": "A",
            "queryType": "",
            "relativeTimeRange": {
                "from": 600,
                "to": 0
            },
            "datasourceUid": "prometheus-uid",
            "model": {
                "expr": "node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes * 100",
                "interval": "",
                "refId": "A"
            }
        },
        {
            "refId": "C",
            "queryType": "",
            "relativeTimeRange": {
                "from": 0,
                "to": 0
            },
            "datasourceUid": "__expr__",
            "model": {
                "conditions": [
                    {
                        "evaluator": {
                            "params": [20],
                            "type": "lt"
                        },
                        "operator": {
                            "type": "and"
                        },
                        "query": {
                            "params": ["A"]
                        },
                        "reducer": {
                            "params": [],
                            "type": "last"
                        },
                        "type": "query"
                    }
                ],
                "expression": "",
                "hide": False,
                "intervalMs": 1000,
                "maxDataPoints": 43200,
                "reducer": "last",
                "refId": "C",
                "type": "classic_conditions"
            }
        }
    ],
    "folderUID": "alerts-folder-uid",
    "ruleGroup": "System Alerts",
    "noDataState": "NoData",
    "execErrState": "Alerting",
    "for": "5m",
    "annotations": {
        # The query measures *available* memory, so the alert means that
        # available memory (not memory usage) fell below the threshold.
        "description": "Available memory is below 20%",
        "runbook_url": "https://wiki.example.com/memory-alerts"
    },
    "labels": {
        "severity": "warning",
        "team": "sre"
    }
}
# Create modern alert rule
created_rule = api.alertingprovisioning.create_alertrule(modern_alert)
print(f"Created alert rule UID: {created_rule['uid']}")
# Get all alert rules
all_rules = api.alertingprovisioning.get_alertrules_all()
print(f"Total alert rules: {len(all_rules)}")
# Get specific rule
rule_details = api.alertingprovisioning.get_alertrule(created_rule['uid'])
print(f"Rule: {rule_details['title']} - State: {rule_details.get('state', 'unknown')}")

Managing notification channels and contact points for alert delivery.
def get_contactpoints(self, name: Optional[str] = None) -> list:
    """
    Get contact points, optionally filtered by name.

    Signature stub: the implementation is elided in this reference.

    Args:
        name (Optional[str]): Filter by contact point name

    Returns:
        list: List of contact points
    """
    ...
def create_contactpoint(self, contactpoint: dict, disable_provenance: bool = False) -> dict:
    """
    Create new contact point.

    Signature stub: the implementation is elided in this reference.

    Args:
        contactpoint (dict): Contact point configuration
        disable_provenance (bool): Disable provenance checking

    Returns:
        dict: Created contact point with UID
    """
    ...
def update_contactpoint(self, contactpoint_uid: str, contactpoint: dict) -> dict:
    """
    Update existing contact point.

    Signature stub: the implementation is elided in this reference.

    Args:
        contactpoint_uid (str): Contact point UID
        contactpoint (dict): Updated contact point configuration

    Returns:
        dict: Update result
    """
    ...
def delete_contactpoint(self, contactpoint_uid: str):
"""
Delete contact point.
Args:
contactpoint_uid (str): Contact point UID to delete
Returns:
dict: Deletion result
"""
...

Contact Points Usage Example:
# Slack contact point
slack_contact = {
    "name": "slack-alerts",
    "type": "slack",
    "settings": {
        "url": "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX",
        "channel": "#alerts",
        "title": "Grafana Alert",
        "text": "{{ range .Alerts }}{{ .Annotations.summary }}{{ end }}"
    }
}
# Email contact point
email_contact = {
    "name": "email-sre",
    "type": "email",
    "settings": {
        # Grafana expects the recipients as one separator-delimited string,
        # not a JSON list, and the email text under the "message" key.
        "addresses": "sre-team@example.com;oncall@example.com",
        "subject": "Grafana Alert: {{ .GroupLabels.alertname }}",
        "message": "Alert Details:\n{{ range .Alerts }}{{ .Annotations.description }}{{ end }}"
    }
}
# Webhook contact point
# NOTE(review): confirm the settings keys below against the webhook
# contact-point schema of the targeted Grafana version.
webhook_contact = {
    "name": "webhook-pagerduty",
    "type": "webhook",
    "settings": {
        "url": "https://events.pagerduty.com/v2/enqueue",
        "httpMethod": "POST",
        "username": "",
        "password": "",
        "title": "Grafana Alert",
        "body": '{"routing_key": "YOUR_ROUTING_KEY", "event_action": "trigger", "payload": {"summary": "{{ .GroupLabels.alertname }}", "severity": "error", "source": "Grafana"}}'
    }
}
# Create contact points
slack_result = api.alertingprovisioning.create_contactpoint(slack_contact)
email_result = api.alertingprovisioning.create_contactpoint(email_contact)
webhook_result = api.alertingprovisioning.create_contactpoint(webhook_contact)
# Plain string: there is nothing to interpolate on this line.
print("Created contact points:")
print(f"- Slack: {slack_result['uid']}")
print(f"- Email: {email_result['uid']}")
print(f"- Webhook: {webhook_result['uid']}")
# Get all contact points
contact_points = api.alertingprovisioning.get_contactpoints()
for cp in contact_points:
    print(f"Contact point: {cp['name']} ({cp['type']})")

Managing notification policy trees that define routing and escalation rules for alerts.
def get_notification_policy_tree(self) -> dict:
    """
    Get the notification policy tree.

    Signature stub: the implementation is elided in this reference.

    Returns:
        dict: Complete notification policy tree configuration
    """
    ...
def set_notification_policy_tree(self, notification_policy_tree: dict, disable_provenance: bool = False):
"""
Set/replace the entire notification policy tree.
Args:
notification_policy_tree (dict): Complete policy tree configuration
disable_provenance (bool): Disable provenance checking
Returns:
dict: Update result
"""
...

Notification Policies Usage Example:
# Get current notification policy tree
current_policy = api.alertingprovisioning.get_notification_policy_tree()
print(f"Current root policy receiver: {current_policy.get('receiver', 'default')}")
# Define notification policy tree: a root policy with two severity-based
# routes; the "critical" route has one nested route for the "sre" team.
policy_tree = {
    "receiver": "default-contact", # Default receiver for unmatched alerts
    "group_by": ["alertname", "cluster"], # Group alerts by these labels
    "group_wait": "10s", # Wait before sending first notification
    "group_interval": "10s", # Wait before notifying about new alerts in an already-notified group
    "repeat_interval": "1h", # Wait before repeating notifications
    "routes": [
        {
            "receiver": "slack-alerts",
            "matchers": [
                {
                    "name": "severity",
                    "value": "warning",
                    "isRegex": False,
                    "isEqual": True
                }
            ],
            "group_wait": "5s",
            "repeat_interval": "30m"
        },
        {
            "receiver": "email-sre",
            "matchers": [
                {
                    "name": "severity",
                    "value": "critical",
                    "isRegex": False,
                    "isEqual": True
                }
            ],
            "group_wait": "0s",
            "repeat_interval": "15m",
            "routes": [
                {
                    "receiver": "webhook-pagerduty",
                    "matchers": [
                        {
                            "name": "team",
                            "value": "sre",
                            "isRegex": False,
                            "isEqual": True
                        }
                    ],
                    "continue": True # Keep matching subsequent sibling routes after this one matches
                }
            ]
        }
    ]
}
# Set notification policy tree (replaces the whole tree)
api.alertingprovisioning.set_notification_policy_tree(policy_tree)
print("Notification policy tree updated")

Managing mute timings to suppress alerts during maintenance windows or scheduled downtime.
def get_mute_timings(self) -> list:
    """
    Get all mute timings.

    Signature stub: the implementation is elided in this reference.

    Returns:
        list: List of mute timing configurations
    """
    ...
def create_mute_timing(self, mutetiming: dict, disable_provenance: bool = False) -> dict:
    """
    Create new mute timing.

    Signature stub: the implementation is elided in this reference.

    Args:
        mutetiming (dict): Mute timing configuration
        disable_provenance (bool): Disable provenance checking

    Returns:
        dict: Created mute timing
    """
    ...
def delete_mute_timing(self, mutetiming_name: str):
"""
Delete mute timing by name.
Args:
mutetiming_name (str): Mute timing name to delete
Returns:
dict: Deletion result
"""
...

Mute Timings Usage Example:
# Maintenance window mute timing
maintenance_mute = {
    "name": "maintenance-window",
    "time_intervals": [
        {
            "times": [
                {
                    "start_time": "02:00",
                    "end_time": "04:00"
                }
            ],
            "weekdays": ["sunday"],  # Every Sunday 2-4 AM
            "months": [],
            "years": [],
            "days_of_month": []
        }
    ]
}
# Business hours mute timing (outside business hours)
business_hours_mute = {
    "name": "outside-business-hours",
    "time_intervals": [
        {
            # A time range cannot wrap past midnight (end_time must be later
            # than start_time), so the overnight window is expressed as two
            # ranges: midnight to 08:00 and 18:00 to midnight.
            "times": [
                {
                    "start_time": "00:00",
                    "end_time": "08:00"
                },
                {
                    "start_time": "18:00",
                    "end_time": "24:00"
                }
            ],
            "weekdays": ["monday", "tuesday", "wednesday", "thursday", "friday"]
        },
        {
            "times": [],  # All day
            "weekdays": ["saturday", "sunday"]
        }
    ]
}
# Holiday mute timing
holiday_mute = {
    "name": "holidays-2024",
    "time_intervals": [
        {
            "times": [],  # All day
            "weekdays": [],
            "months": ["december"],
            "years": ["2024"],
            "days_of_month": ["25", "26"]  # Christmas
        },
        {
            "times": [],
            "weekdays": [],
            "months": ["january"],
            "years": ["2024"],
            "days_of_month": ["1"]  # New Year
        }
    ]
}
# Create mute timings
api.alertingprovisioning.create_mute_timing(maintenance_mute)
api.alertingprovisioning.create_mute_timing(business_hours_mute)
api.alertingprovisioning.create_mute_timing(holiday_mute)
print("Mute timings created")
# List all mute timings
mute_timings = api.alertingprovisioning.get_mute_timings()
for mt in mute_timings:
    print(f"Mute timing: {mt['name']} ({len(mt['time_intervals'])} intervals)")

Legacy notification channel management for older Grafana versions.
def get_channels(self) -> list:
    """
    Get legacy notification channels.

    Signature stub: the implementation is elided in this reference.

    Returns:
        list: List of notification channels
    """
    ...
def get_channel_by_uid(self, channel_uid: str) -> dict:
    """
    Get legacy notification channel by UID.

    Signature stub: the implementation is elided in this reference.

    Args:
        channel_uid (str): Channel UID

    Returns:
        dict: Notification channel configuration
    """
    ...
def create_channel(self, channel: dict) -> dict:
    """
    Create legacy notification channel.

    Signature stub: the implementation is elided in this reference.

    Args:
        channel (dict): Channel configuration

    Returns:
        dict: Created channel with ID and UID
    """
    ...
def update_channel_by_uid(self, uid: str, channel: dict) -> dict:
    """
    Update legacy notification channel.

    Signature stub: the implementation is elided in this reference.

    Args:
        uid (str): Channel UID
        channel (dict): Updated channel configuration

    Returns:
        dict: Update result
    """
    ...
def delete_notification_by_uid(self, notification_uid: str):
"""
Delete legacy notification channel.
Args:
notification_uid (str): Channel UID to delete
Returns:
dict: Deletion result
"""
...

Legacy Notification Channels Usage Example:
# Legacy Slack notification channel payload
legacy_slack = {
    "name": "legacy-slack",
    "type": "slack",
    "settings": {
        "url": "https://hooks.slack.com/services/...",
        "channel": "#alerts-legacy",
        "title": "Legacy Alert",
        "text": "Alert: {{ .Title }}\nMessage: {{ .Message }}"
    }
}
# Create legacy channel and report the UID found in the response
legacy_result = api.notifications.create_channel(legacy_slack)
print(f"Created legacy channel: {legacy_result['uid']}")
# Get all legacy channels
channels = api.notifications.get_channels()
for channel in channels:
    print(f"Legacy channel: {channel['name']} ({channel['type']})")

Common alerting operation errors and handling strategies:
from grafana_client import GrafanaClientError, GrafanaBadInputError
# Deliberately malformed rule used to trigger the error paths below.
invalid_alert = {
    "title": "",  # Empty title
    "condition": "X",  # Non-existent condition
    "data": [],  # Empty data array
}
try:
    api.alertingprovisioning.create_alertrule(invalid_alert)
except GrafanaBadInputError as bad_input:
    # Listed before GrafanaClientError so the more specific handler is tried first.
    print(f"Invalid alert configuration: {bad_input.message}")
except GrafanaClientError as client_error:
    # Fixed messages for the status codes the example singles out.
    known_failures = {
        404: "Alert rule or folder not found",
        409: "Alert rule already exists",
    }
    status = client_error.status_code
    if status in known_failures:
        print(known_failures[status])
    else:
        print(f"Alert operation failed: {client_error.message}")
# Contact point validation
try:
invalid_contact = {
"name": "",
"type": "invalid-type",
"settings": {}
}
api.alertingprovisioning.create_contactpoint(invalid_contact)
except Exception as e:
    print(f"Contact point creation failed: {e}")

All alerting operations support async versions:
import asyncio
from grafana_client import AsyncGrafanaApi, TokenAuth
async def manage_alerting():
    """
    Run alerting provisioning calls concurrently with the async client.

    Fetches alert rules, contact points and mute timings in one gather,
    prints their counts, then creates the three example contact points
    concurrently and reports each outcome. Relies on `slack_contact`,
    `email_contact` and `webhook_contact` being defined at module level.
    """
    api = AsyncGrafanaApi(auth=TokenAuth("your-token"), host="grafana.example.com")
    provisioning = api.alertingprovisioning

    # Concurrent read operations
    alert_rules, contact_points, mute_timings = await asyncio.gather(
        provisioning.get_alertrules_all(),
        provisioning.get_contactpoints(),
        provisioning.get_mute_timings(),
    )
    print(f"Alert rules: {len(alert_rules)}")
    print(f"Contact points: {len(contact_points)}")
    print(f"Mute timings: {len(mute_timings)}")

    # Create multiple contact points concurrently; failures come back as
    # exception objects instead of aborting the whole batch.
    pending = [
        provisioning.create_contactpoint(config)
        for config in (slack_contact, email_contact, webhook_contact)
    ]
    outcomes = await asyncio.gather(*pending, return_exceptions=True)
    for index, outcome in enumerate(outcomes):
        if not isinstance(outcome, Exception):
            print(f"Created contact point: {outcome['uid']}")
            continue
        print(f"Failed to create contact point {index}: {outcome}")
asyncio.run(manage_alerting())

Install with Tessl CLI
npx tessl i tessl/pypi-grafana-client@5.0.1