A client library for accessing the Grafana HTTP API, written in Python
—
Comprehensive data source operations including CRUD operations, health checks, query execution, smart query capabilities with type-safe data models, and integration with Grafana's data source plugin ecosystem.
Core data source management operations for creating, reading, updating, and deleting data sources with support for both UID and ID-based access.
def list_datasources(self) -> list:
"""
List all data sources in the organization.
Returns:
list: List of data source objects with basic information
"""
...
def get_datasource_by_uid(self, datasource_uid: str) -> dict:
"""
Get data source by UID (recommended).
Args:
datasource_uid (str): Data source UID
Returns:
dict: Complete data source configuration
"""
...
def get_datasource_by_id(self, datasource_id: int) -> dict:
"""
Get data source by ID (deprecated, use UID method).
Args:
datasource_id (int): Data source ID
Returns:
dict: Data source configuration
"""
...
def get_datasource_by_name(self, datasource_name: str) -> dict:
"""
Get data source by name.
Args:
datasource_name (str): Data source name
Returns:
dict: Data source configuration
"""
...
def create_datasource(self, datasource: dict) -> dict:
"""
Create new data source.
Args:
datasource (dict): Data source configuration
Returns:
dict: Created data source with ID and UID
"""
...
def update_datasource_by_uid(self, datasource_uid: str, datasource: dict) -> dict:
"""
Update data source by UID (recommended).
Args:
datasource_uid (str): Data source UID
datasource (dict): Updated data source configuration
Returns:
dict: Update result
"""
...
def update_datasource(self, datasource_id: int, datasource: dict) -> dict:
"""
Update data source by ID (deprecated, use UID method).
Args:
datasource_id (int): Data source ID
datasource (dict): Updated configuration
Returns:
dict: Update result
"""
...
def delete_datasource_by_uid(self, datasource_uid: str) -> dict:
"""
Delete data source by UID.
Args:
datasource_uid (str): Data source UID
Returns:
dict: Deletion result
"""
...
def delete_datasource_by_id(self, datasource_id: int) -> dict:
"""
Delete data source by ID (deprecated, use UID method).
Args:
datasource_id (int): Data source ID
Returns:
dict: Deletion result
"""
...
def delete_datasource_by_name(self, datasource_name: str) -> dict:
"""
Delete data source by name.
Args:
datasource_name (str): Data source name
Returns:
dict: Deletion result
"""
...

Basic Usage Example:
from grafana_client import GrafanaApi, TokenAuth
from grafana_client.model import DatasourceModel
api = GrafanaApi(auth=TokenAuth("your-token"), host="grafana.example.com")
# List all data sources
datasources = api.datasource.list_datasources()
for ds in datasources:
print(f"Data source: {ds['name']} ({ds['type']}) - UID: {ds['uid']}")
# Get specific data source
prometheus_ds = api.datasource.get_datasource_by_name("Prometheus")
print(f"Prometheus URL: {prometheus_ds['url']}")
# Create new data source using model
new_ds = DatasourceModel(
name="New Prometheus",
type="prometheus",
url="http://prometheus:9090",
access="proxy",
jsonData={
"httpMethod": "POST",
"timeInterval": "5s"
}
)
result = api.datasource.create_datasource(new_ds.asdict())
print(f"Created data source UID: {result['uid']}")

Methods for finding and identifying data sources within the organization.
def find_datasource(self, datasource_name: str) -> Optional[int]:
"""
Find data source by name (returns ID).
Args:
datasource_name (str): Data source name to search for
Returns:
int: Data source ID if found, None if not found
"""
...
def get_datasource_id_by_name(self, datasource_name: str) -> int:
"""
Get data source ID by name.
Args:
datasource_name (str): Data source name
Returns:
int: Data source ID if found
"""
...
def get(self, dsident: "DatasourceIdentifier") -> dict:
"""
Get data source by DatasourceIdentifier.
Args:
dsident (DatasourceIdentifier): Data source identifier (ID, UID, or name)
Returns:
dict: Data source configuration
"""
...

Usage with DatasourceIdentifier:
from grafana_client.model import DatasourceIdentifier
# Create identifier by UID (recommended)
ds_id = DatasourceIdentifier(uid="prometheus-uid")
datasource = api.datasource.get(ds_id)
# Create identifier by name
ds_id = DatasourceIdentifier(name="Prometheus")
datasource = api.datasource.get(ds_id)
# Create identifier by ID (deprecated)
ds_id = DatasourceIdentifier(id="123")
datasource = api.datasource.get(ds_id)

Permission management for data sources, controlling access at the data source level.
def enable_datasource_permissions(self, datasource_id: int) -> dict:
"""
Enable permissions for a data source.
Args:
datasource_id (int): Data source ID
Returns:
dict: Permission enablement result
"""
...
def disable_datasource_permissions(self, datasource_id: int) -> dict:
"""
Disable permissions for a data source.
Args:
datasource_id (int): Data source ID
Returns:
dict: Permission disablement result
"""
...
def get_datasource_permissions(self, datasource_id: int) -> list:
"""
Get permissions for a data source.
Args:
datasource_id (int): Data source ID
Returns:
list: List of permission objects
"""
...
def add_datasource_permissions(self, datasource_id: int, permissions: list) -> dict:
"""
Add permissions to a data source.
Args:
datasource_id (int): Data source ID
permissions (list): List of permission objects to add
Returns:
dict: Permission addition result
"""
...
def remove_datasource_permissions(self, datasource_id: int, permission_id: int) -> dict:
"""
Remove permission from a data source.
Args:
datasource_id (int): Data source ID
permission_id (int): Permission ID to remove
Returns:
dict: Permission removal result
"""
...

Permission Management Example:
# Enable permissions for a data source
api.datasource.enable_datasource_permissions(datasource_id=123)
# Get current permissions
permissions = api.datasource.get_datasource_permissions(datasource_id=123)
for perm in permissions:
print(f"User/Team {perm.get('userId', perm.get('teamId'))}: {perm['permission']}")
# Add new permission
new_permissions = [{
"teamId": 5,
"permission": 1 # Query permission
}]
api.datasource.add_datasource_permissions(datasource_id=123, permissions=new_permissions)
# Remove specific permission
api.datasource.remove_datasource_permissions(datasource_id=123, permission_id=456)

Multiple methods for checking data source health and connectivity.
def health(self, datasource_uid: str) -> dict:
"""
Native Grafana 9+ health check API.
Args:
datasource_uid (str): Data source UID
Returns:
dict: Health check result from Grafana API
"""
...
def health_check(self, datasource: dict) -> "DatasourceHealthResponse":
"""
Client-side health check with comprehensive testing.
Args:
datasource (dict): Data source configuration
Returns:
DatasourceHealthResponse: Detailed health check result
"""
...
def health_inquiry(self, datasource_uid: str) -> "DatasourceHealthResponse":
"""
Comprehensive health inquiry combining multiple checks.
Args:
datasource_uid (str): Data source UID
Returns:
DatasourceHealthResponse: Combined health check result
"""
...

Health Check Usage:
from grafana_client.model import DatasourceHealthResponse
# Native Grafana health check (Grafana 9+)
try:
health_result = api.datasource.health("datasource-uid")
print(f"Health status: {health_result['status']}")
print(f"Message: {health_result['message']}")
except Exception as e:
print(f"Health check failed: {e}")
# Comprehensive client-side health check
datasource_config = api.datasource.get_datasource_by_uid("prometheus-uid")
health_response = api.datasource.health_check(datasource_config)
print(f"Health check success: {health_response.success}")
print(f"Status: {health_response.status}")
print(f"Message: {health_response.message}")
print(f"Duration: {health_response.duration}s")
# Health inquiry (combines multiple methods)
inquiry_result = api.datasource.health_inquiry("datasource-uid")
print(f"Comprehensive health: {inquiry_result.asdict_compact()}")

Health Response Parsing Utilities:
@staticmethod
def parse_health_response_results(response: Dict) -> "Tuple[bool, str]":
"""
Parse health response results (static method).
Args:
response (Dict): Raw health check response
Returns:
Tuple[bool, str]: Success status and message
"""
...
@staticmethod
def parse_health_response_data(response: Dict) -> "Tuple[bool, str]":
"""
Parse health response data (static method).
Args:
response (Dict): Raw health response data
Returns:
Tuple[bool, str]: Success status and message
"""
...

Execute queries against data sources with support for instant queries, range queries, and smart queries.
def query(self, datasource_id: int, query: dict, timestamp: Optional[int] = None) -> dict:
"""
Execute instant query against data source.
Args:
datasource_id (int): Data source ID
query (dict): Query configuration
timestamp (Optional[int]): Unix timestamp for instant queries
Returns:
dict: Query result data
"""
...
def query_range(self, datasource_id: int, query: dict, start: int, end: int, step: int) -> dict:
"""
Execute range query against data source.
Args:
datasource_id (int): Data source ID
query (dict): Query configuration
start (int): Start timestamp (Unix time)
end (int): End timestamp (Unix time)
step (int): Step interval in seconds
Returns:
dict: Range query result data
"""
...
def smartquery(self, datasource: dict, expression: str, attrs: Optional[dict] = None, request: Optional[dict] = None) -> dict:
"""
Execute smart query with automatic query building.
Args:
datasource (dict): Data source configuration
expression (str): Query expression
attrs (Optional[dict]): Additional query attributes
request (Optional[dict]): HTTP request configuration
Returns:
dict: Smart query results
"""
...
def series(self, datasource_id: int, match: list, start: int, end: int, access: str = "proxy") -> dict:
"""
Get series metadata from data source.
Args:
datasource_id (int): Data source ID
match (list): Series match patterns
start (int): Start timestamp
end (int): End timestamp
access (str): Access mode ("proxy" or "direct")
Returns:
dict: Series metadata
"""
...
def get_datasource_proxy_data(self, datasource_id: int, path: str, params: Optional[dict] = None) -> dict:
"""
Get data through data source proxy.
Args:
datasource_id (int): Data source ID
path (str): Proxy path
params (Optional[dict]): Query parameters
Returns:
dict: Proxy response data
"""
...

Query Usage Examples:
import time
# Get data source for queries
prometheus_ds = api.datasource.get_datasource_by_name("Prometheus")
datasource_id = prometheus_ds['id']
# Instant query
instant_query = {
"expr": "up",
"format": "json"
}
instant_result = api.datasource.query(datasource_id, instant_query)
print(f"Instant query result: {instant_result}")
# Range query
current_time = int(time.time())
start_time = current_time - 3600 # 1 hour ago
range_query = {
"expr": "rate(http_requests_total[5m])",
"format": "json"
}
range_result = api.datasource.query_range(
datasource_id=datasource_id,
query=range_query,
start=start_time,
end=current_time,
step=300 # 5 minute steps
)
print(f"Range query returned {len(range_result.get('data', {}).get('result', []))} series")
# Smart query (automatic query building)
smart_result = api.datasource.smartquery(
datasource=prometheus_ds,
expression="cpu_usage",
attrs={
"time_range": "1h",
"aggregation": "avg"
}
)

Common data source type configurations for various backends.
Prometheus Data Source:
prometheus_config = {
"name": "Prometheus",
"type": "prometheus",
"access": "proxy",
"url": "http://prometheus:9090",
"jsonData": {
"httpMethod": "POST",
"timeInterval": "5s",
"queryTimeout": "60s",
"disableMetricsLookup": False,
"customQueryParameters": "",
"exemplarTraceIdDestinations": []
},
"secureJsonData": {
"httpHeaderValue1": "Bearer token-value"
}
}

InfluxDB Data Source:
influxdb_config = {
"name": "InfluxDB",
"type": "influxdb",
"access": "proxy",
"url": "http://influxdb:8086",
"database": "mydb",
"user": "influx_user",
"jsonData": {
"timeInterval": "10s",
"httpMode": "GET"
},
"secureJsonData": {
"password": "influx_password"
}
}

MySQL Data Source:
mysql_config = {
"name": "MySQL",
"type": "mysql",
"access": "proxy",
"url": "mysql-host:3306",
"database": "grafana",
"user": "mysql_user",
"jsonData": {
"maxOpenConns": 0,
"maxIdleConns": 2,
"connMaxLifetime": 14400
},
"secureJsonData": {
"password": "mysql_password"
}
}

Elasticsearch Data Source:
elasticsearch_config = {
"name": "Elasticsearch",
"type": "elasticsearch",
"access": "proxy",
"url": "http://elasticsearch:9200",
"database": "[logs-]YYYY.MM.DD",
"jsonData": {
"timeField": "@timestamp",
"esVersion": "7.10.0",
"logMessageField": "message",
"logLevelField": "level",
"maxConcurrentShardRequests": 5,
"includeFrozen": False
}
}

Bulk Operations:
# Create multiple data sources
datasource_configs = [
prometheus_config,
influxdb_config,
mysql_config
]
created_datasources = []
for config in datasource_configs:
try:
result = api.datasource.create_datasource(config)
created_datasources.append(result)
print(f"Created: {config['name']} (UID: {result['uid']})")
except Exception as e:
print(f"Failed to create {config['name']}: {e}")
# Bulk health checks
for ds in created_datasources:
try:
health = api.datasource.health(ds['uid'])
print(f"{ds['name']}: {health.get('status', 'Unknown')}")
except Exception as e:
print(f"{ds['name']}: Health check failed - {e}")

Data Source Permissions (via RBAC):
# Set data source permissions for teams
api.rbac.set_rbac_datasources_teams(
datasource_uid="prometheus-uid",
team_id=5,
permission="Edit" # "View", "Edit", or "Admin"
)
# Set data source permissions for built-in roles
api.rbac.set_rbac_datasources_builtin_roles(
datasource_uid="prometheus-uid",
builtin_role="Editor",
permission="View"
)

Common data source operation errors:
from grafana_client import GrafanaClientError, GrafanaBadInputError
try:
# Attempt to create data source with invalid config
invalid_config = {
"name": "", # Invalid empty name
"type": "unknown-type",
"url": "invalid-url"
}
api.datasource.create_datasource(invalid_config)
except GrafanaBadInputError as e:
print(f"Invalid data source configuration: {e.message}")
except GrafanaClientError as e:
if e.status_code == 409:
print("Data source with this name already exists")
elif e.status_code == 404:
print("Data source not found")
else:
print(f"Client error: {e.message}")
# Health check error handling
try:
health_result = api.datasource.health("non-existent-uid")
except GrafanaClientError as e:
print(f"Health check failed: {e.message}")

All data source operations support async versions:
import asyncio
from grafana_client import AsyncGrafanaApi, TokenAuth
async def manage_datasources():
api = AsyncGrafanaApi(auth=TokenAuth("your-token"), host="grafana.example.com")
# Async data source operations
datasources = await api.datasource.list_datasources()
print(f"Found {len(datasources)} data sources")
# Concurrent health checks
health_tasks = [
api.datasource.health(ds['uid'])
for ds in datasources if ds.get('uid')
]
health_results = await asyncio.gather(*health_tasks, return_exceptions=True)
for ds, health in zip(datasources, health_results):
if isinstance(health, Exception):
print(f"{ds['name']}: Health check failed - {health}")
else:
print(f"{ds['name']}: {health.get('status', 'Unknown')}")
asyncio.run(manage_datasources())

Install with Tessl CLI
npx tessl i tessl/pypi-grafana-client@5.0.1