# Data Source Management

Comprehensive data source management, including CRUD operations, health checks, query execution, smart queries with type-safe data models, and integration with Grafana's data source plugin ecosystem.

## Capabilities

### Data Source CRUD Operations

Core data source management operations for creating, reading, updating, and deleting data sources, with support for both UID- and ID-based access.

```python { .api }
def list_datasources(self):
    """
    List all data sources in the organization.

    Returns:
        list: List of data source objects with basic information
    """
    ...

def get_datasource_by_uid(self, datasource_uid: str):
    """
    Get data source by UID (recommended).

    Args:
        datasource_uid (str): Data source UID

    Returns:
        dict: Complete data source configuration
    """
    ...

def get_datasource_by_id(self, datasource_id: int):
    """
    Get data source by ID (deprecated, use UID method).

    Args:
        datasource_id (int): Data source ID

    Returns:
        dict: Data source configuration
    """
    ...

def get_datasource_by_name(self, datasource_name: str):
    """
    Get data source by name.

    Args:
        datasource_name (str): Data source name

    Returns:
        dict: Data source configuration
    """
    ...

def create_datasource(self, datasource: dict):
    """
    Create new data source.

    Args:
        datasource (dict): Data source configuration

    Returns:
        dict: Created data source with ID and UID
    """
    ...

def update_datasource_by_uid(self, datasource_uid: str, datasource: dict):
    """
    Update data source by UID (recommended).

    Args:
        datasource_uid (str): Data source UID
        datasource (dict): Updated data source configuration

    Returns:
        dict: Update result
    """
    ...

def update_datasource(self, datasource_id: int, datasource: dict):
    """
    Update data source by ID (deprecated, use UID method).

    Args:
        datasource_id (int): Data source ID
        datasource (dict): Updated configuration

    Returns:
        dict: Update result
    """
    ...

def delete_datasource_by_uid(self, datasource_uid: str):
    """
    Delete data source by UID.

    Args:
        datasource_uid (str): Data source UID

    Returns:
        dict: Deletion result
    """
    ...

def delete_datasource_by_id(self, datasource_id: int):
    """
    Delete data source by ID (deprecated, use UID method).

    Args:
        datasource_id (int): Data source ID

    Returns:
        dict: Deletion result
    """
    ...

def delete_datasource_by_name(self, datasource_name: str):
    """
    Delete data source by name.

    Args:
        datasource_name (str): Data source name

    Returns:
        dict: Deletion result
    """
    ...
```

**Basic Usage Example:**

```python
from grafana_client import GrafanaApi, TokenAuth
from grafana_client.model import DatasourceModel

api = GrafanaApi(auth=TokenAuth("your-token"), host="grafana.example.com")

# List all data sources
datasources = api.datasource.list_datasources()
for ds in datasources:
    print(f"Data source: {ds['name']} ({ds['type']}) - UID: {ds['uid']}")

# Get specific data source
prometheus_ds = api.datasource.get_datasource_by_name("Prometheus")
print(f"Prometheus URL: {prometheus_ds['url']}")

# Create new data source using model
new_ds = DatasourceModel(
    name="New Prometheus",
    type="prometheus",
    url="http://prometheus:9090",
    access="proxy",
    jsonData={
        "httpMethod": "POST",
        "timeInterval": "5s"
    }
)

result = api.datasource.create_datasource(new_ds.asdict())
print(f"Created data source UID: {result['uid']}")
```
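
The example above covers list, get, and create; the following is a minimal sketch of the corresponding update and delete calls, reusing `result` from the creation step and assuming the configuration dict returned by `get_datasource_by_uid` includes `jsonData`:

```python
# Fetch the full configuration, tweak it, and update by UID (recommended)
ds = api.datasource.get_datasource_by_uid(result['uid'])
ds['jsonData']['timeInterval'] = "15s"
api.datasource.update_datasource_by_uid(result['uid'], ds)

# Delete by UID once the data source is no longer needed
api.datasource.delete_datasource_by_uid(result['uid'])
```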

### Data Source Discovery and Search

Methods for finding and identifying data sources within the organization.

```python { .api }
def find_datasource(self, datasource_name: str):
    """
    Find data source by name (returns ID).

    Args:
        datasource_name (str): Data source name to search for

    Returns:
        int: Data source ID if found, None if not found
    """
    ...

def get_datasource_id_by_name(self, datasource_name: str):
    """
    Get data source ID by name.

    Args:
        datasource_name (str): Data source name

    Returns:
        int: Data source ID if found
    """
    ...

def get(self, dsident):
    """
    Get data source by DatasourceIdentifier.

    Args:
        dsident (DatasourceIdentifier): Data source identifier (ID, UID, or name)

    Returns:
        dict: Data source configuration
    """
    ...
```

**Usage with DatasourceIdentifier:**

```python
from grafana_client.model import DatasourceIdentifier

# Create identifier by UID (recommended)
ds_id = DatasourceIdentifier(uid="prometheus-uid")
datasource = api.datasource.get(ds_id)

# Create identifier by name
ds_id = DatasourceIdentifier(name="Prometheus")
datasource = api.datasource.get(ds_id)

# Create identifier by ID (deprecated)
ds_id = DatasourceIdentifier(id="123")
datasource = api.datasource.get(ds_id)
```
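
The name-based lookup helpers are not shown above; the following is a minimal sketch that follows the return contract documented in the API block (`None` when nothing matches), with `"Prometheus"` assumed to be an existing data source name:

```python
# Resolve a data source ID from its name
ds_id = api.datasource.find_datasource("Prometheus")
if ds_id is None:
    print("Data source 'Prometheus' not found")
else:
    print(f"'Prometheus' has ID {ds_id}")

# Direct numeric-ID lookup by name
numeric_id = api.datasource.get_datasource_id_by_name("Prometheus")
print(f"Numeric ID: {numeric_id}")
```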

### Data Source Permissions Management

Permission management for data sources, controlling access at the data source level.

```python { .api }
def enable_datasource_permissions(self, datasource_id: int):
    """
    Enable permissions for a data source.

    Args:
        datasource_id (int): Data source ID

    Returns:
        dict: Permission enablement result
    """
    ...

def disable_datasource_permissions(self, datasource_id: int):
    """
    Disable permissions for a data source.

    Args:
        datasource_id (int): Data source ID

    Returns:
        dict: Permission disablement result
    """
    ...

def get_datasource_permissions(self, datasource_id: int):
    """
    Get permissions for a data source.

    Args:
        datasource_id (int): Data source ID

    Returns:
        list: List of permission objects
    """
    ...

def add_datasource_permissions(self, datasource_id: int, permissions: list):
    """
    Add permissions to a data source.

    Args:
        datasource_id (int): Data source ID
        permissions (list): List of permission objects to add

    Returns:
        dict: Permission addition result
    """
    ...

def remove_datasource_permissions(self, datasource_id: int, permission_id: int):
    """
    Remove permission from a data source.

    Args:
        datasource_id (int): Data source ID
        permission_id (int): Permission ID to remove

    Returns:
        dict: Permission removal result
    """
    ...
```

**Permission Management Example:**

```python
# Enable permissions for a data source
api.datasource.enable_datasource_permissions(datasource_id=123)

# Get current permissions
permissions = api.datasource.get_datasource_permissions(datasource_id=123)
for perm in permissions:
    print(f"User/Team {perm.get('userId', perm.get('teamId'))}: {perm['permission']}")

# Add new permission
new_permissions = [{
    "teamId": 5,
    "permission": 1  # Query permission
}]
api.datasource.add_datasource_permissions(datasource_id=123, permissions=new_permissions)

# Remove specific permission
api.datasource.remove_datasource_permissions(datasource_id=123, permission_id=456)
```

### Data Source Health Checks

Multiple methods for checking data source health and connectivity.

```python { .api }
def health(self, datasource_uid: str):
    """
    Native Grafana 9+ health check API.

    Args:
        datasource_uid (str): Data source UID

    Returns:
        dict: Health check result from Grafana API
    """
    ...

def health_check(self, datasource: dict):
    """
    Client-side health check with comprehensive testing.

    Args:
        datasource (dict): Data source configuration

    Returns:
        DatasourceHealthResponse: Detailed health check result
    """
    ...

def health_inquiry(self, datasource_uid: str):
    """
    Comprehensive health inquiry combining multiple checks.

    Args:
        datasource_uid (str): Data source UID

    Returns:
        DatasourceHealthResponse: Combined health check result
    """
    ...
```

**Health Check Usage:**

```python
from grafana_client.model import DatasourceHealthResponse

# Native Grafana health check (Grafana 9+)
try:
    health_result = api.datasource.health("datasource-uid")
    print(f"Health status: {health_result['status']}")
    print(f"Message: {health_result['message']}")
except Exception as e:
    print(f"Health check failed: {e}")

# Comprehensive client-side health check
datasource_config = api.datasource.get_datasource_by_uid("prometheus-uid")
health_response = api.datasource.health_check(datasource_config)

print(f"Health check success: {health_response.success}")
print(f"Status: {health_response.status}")
print(f"Message: {health_response.message}")
print(f"Duration: {health_response.duration}s")

# Health inquiry (combines multiple methods)
inquiry_result = api.datasource.health_inquiry("datasource-uid")
print(f"Comprehensive health: {inquiry_result.asdict_compact()}")
```

**Health Response Parsing Utilities:**

```python { .api }
@staticmethod
def parse_health_response_results(response: Dict):
    """
    Parse health response results (static method).

    Args:
        response (Dict): Raw health check response

    Returns:
        Tuple[bool, str]: Success status and message
    """
    ...

@staticmethod
def parse_health_response_data(response: Dict):
    """
    Parse health response data (static method).

    Args:
        response (Dict): Raw health response data

    Returns:
        Tuple[bool, str]: Success status and message
    """
    ...
```
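
A minimal sketch of how these parsers might be used on a raw health payload; the payload shape here is illustrative, and calling the static methods through `api.datasource` is an assumption based on the signatures above:

```python
# Hypothetical payload shaped like a Grafana health check response
raw_response = {"status": "OK", "message": "Data source is working"}

# Parse into the documented (success, message) tuple
success, message = api.datasource.parse_health_response_results(raw_response)
print(f"success={success}, message={message}")
```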

### Data Source Query Operations

Execute queries against data sources with support for instant queries, range queries, and smart queries.

```python { .api }
def query(self, datasource_id: int, query: dict, timestamp: Optional[int] = None):
    """
    Execute instant query against data source.

    Args:
        datasource_id (int): Data source ID
        query (dict): Query configuration
        timestamp (Optional[int]): Unix timestamp for instant queries

    Returns:
        dict: Query result data
    """
    ...

def query_range(self, datasource_id: int, query: dict, start: int, end: int, step: int):
    """
    Execute range query against data source.

    Args:
        datasource_id (int): Data source ID
        query (dict): Query configuration
        start (int): Start timestamp (Unix time)
        end (int): End timestamp (Unix time)
        step (int): Step interval in seconds

    Returns:
        dict: Range query result data
    """
    ...

def smartquery(self, datasource: dict, expression: str, attrs: Optional[dict] = None, request: Optional[dict] = None):
    """
    Execute smart query with automatic query building.

    Args:
        datasource (dict): Data source configuration
        expression (str): Query expression
        attrs (Optional[dict]): Additional query attributes
        request (Optional[dict]): HTTP request configuration

    Returns:
        dict: Smart query results
    """
    ...

def series(self, datasource_id: int, match: list, start: int, end: int, access: str = "proxy"):
    """
    Get series metadata from data source.

    Args:
        datasource_id (int): Data source ID
        match (list): Series match patterns
        start (int): Start timestamp
        end (int): End timestamp
        access (str): Access mode ("proxy" or "direct")

    Returns:
        dict: Series metadata
    """
    ...

def get_datasource_proxy_data(self, datasource_id: int, path: str, params: Optional[dict] = None):
    """
    Get data through data source proxy.

    Args:
        datasource_id (int): Data source ID
        path (str): Proxy path
        params (Optional[dict]): Query parameters

    Returns:
        dict: Proxy response data
    """
    ...
```

**Query Usage Examples:**

```python
import time

# Get data source for queries
prometheus_ds = api.datasource.get_datasource_by_name("Prometheus")
datasource_id = prometheus_ds['id']

# Instant query
instant_query = {
    "expr": "up",
    "format": "json"
}
instant_result = api.datasource.query(datasource_id, instant_query)
print(f"Instant query result: {instant_result}")

# Range query
current_time = int(time.time())
start_time = current_time - 3600  # 1 hour ago
range_query = {
    "expr": "rate(http_requests_total[5m])",
    "format": "json"
}

range_result = api.datasource.query_range(
    datasource_id=datasource_id,
    query=range_query,
    start=start_time,
    end=current_time,
    step=300  # 5 minute steps
)
print(f"Range query returned {len(range_result.get('data', {}).get('result', []))} series")

# Smart query (automatic query building)
smart_result = api.datasource.smartquery(
    datasource=prometheus_ds,
    expression="cpu_usage",
    attrs={
        "time_range": "1h",
        "aggregation": "avg"
    }
)
```
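
The `series` and proxy helpers from the API block above are not exercised in these examples; here is a minimal sketch, reusing `datasource_id`, `start_time`, and `current_time` from the code above and assuming a Prometheus-backed data source for which `api/v1/label/__name__/values` is a valid proxied path:

```python
# Series metadata for matching selectors over the last hour
series_result = api.datasource.series(
    datasource_id=datasource_id,
    match=['up', 'process_cpu_seconds_total'],
    start=start_time,
    end=current_time,
)
print(f"Series metadata: {series_result}")

# Raw proxy access to a backend endpoint (the path is data-source specific)
label_names = api.datasource.get_datasource_proxy_data(
    datasource_id=datasource_id,
    path="api/v1/label/__name__/values",
)
print(f"Proxied response: {label_names}")
```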

### Data Source Configuration Examples

Common data source type configurations for various backends.

**Prometheus Data Source:**

```python
prometheus_config = {
    "name": "Prometheus",
    "type": "prometheus",
    "access": "proxy",
    "url": "http://prometheus:9090",
    "jsonData": {
        "httpMethod": "POST",
        "timeInterval": "5s",
        "queryTimeout": "60s",
        "disableMetricsLookup": False,
        "customQueryParameters": "",
        "exemplarTraceIdDestinations": []
    },
    "secureJsonData": {
        "httpHeaderValue1": "Bearer token-value"
    }
}
```

**InfluxDB Data Source:**

```python
influxdb_config = {
    "name": "InfluxDB",
    "type": "influxdb",
    "access": "proxy",
    "url": "http://influxdb:8086",
    "database": "mydb",
    "user": "influx_user",
    "jsonData": {
        "timeInterval": "10s",
        "httpMode": "GET"
    },
    "secureJsonData": {
        "password": "influx_password"
    }
}
```

**MySQL Data Source:**

```python
mysql_config = {
    "name": "MySQL",
    "type": "mysql",
    "access": "proxy",
    "url": "mysql-host:3306",
    "database": "grafana",
    "user": "mysql_user",
    "jsonData": {
        "maxOpenConns": 0,
        "maxIdleConns": 2,
        "connMaxLifetime": 14400
    },
    "secureJsonData": {
        "password": "mysql_password"
    }
}
```

605

606

**Elasticsearch Data Source:**

607

608

```python
elasticsearch_config = {
    "name": "Elasticsearch",
    "type": "elasticsearch",
    "access": "proxy",
    "url": "http://elasticsearch:9200",
    "database": "[logs-]YYYY.MM.DD",
    "jsonData": {
        "timeField": "@timestamp",
        "esVersion": "7.10.0",
        "logMessageField": "message",
        "logLevelField": "level",
        "maxConcurrentShardRequests": 5,
        "includeFrozen": False
    }
}
```

625

626

### Advanced Data Source Operations

627

628

**Bulk Operations:**

629

630

```python
# Create multiple data sources
datasource_configs = [
    prometheus_config,
    influxdb_config,
    mysql_config
]

created_datasources = []
for config in datasource_configs:
    try:
        result = api.datasource.create_datasource(config)
        created_datasources.append(result)
        print(f"Created: {config['name']} (UID: {result['uid']})")
    except Exception as e:
        print(f"Failed to create {config['name']}: {e}")

# Bulk health checks
for ds in created_datasources:
    try:
        health = api.datasource.health(ds['uid'])
        print(f"{ds['name']}: {health.get('status', 'Unknown')}")
    except Exception as e:
        print(f"{ds['name']}: Health check failed - {e}")
```

655

656

**Data Source Permissions (via RBAC):**

657

658

```python
# Set data source permissions for teams
api.rbac.set_rbac_datasources_teams(
    datasource_uid="prometheus-uid",
    team_id=5,
    permission="Edit"  # "View", "Edit", or "Admin"
)

# Set data source permissions for built-in roles
api.rbac.set_rbac_datasources_builtin_roles(
    datasource_uid="prometheus-uid",
    builtin_role="Editor",
    permission="View"
)
```

673

674

### Error Handling

675

676

Common data source operation errors:

677

678

```python
from grafana_client import GrafanaClientError, GrafanaBadInputError

try:
    # Attempt to create data source with invalid config
    invalid_config = {
        "name": "",  # Invalid empty name
        "type": "unknown-type",
        "url": "invalid-url"
    }
    api.datasource.create_datasource(invalid_config)

except GrafanaBadInputError as e:
    print(f"Invalid data source configuration: {e.message}")

except GrafanaClientError as e:
    if e.status_code == 409:
        print("Data source with this name already exists")
    elif e.status_code == 404:
        print("Data source not found")
    else:
        print(f"Client error: {e.message}")

# Health check error handling
try:
    health_result = api.datasource.health("non-existent-uid")
except GrafanaClientError as e:
    print(f"Health check failed: {e.message}")
```

707

708

### Async Data Source Operations

709

710

All data source operations support async versions:

711

712

```python
import asyncio
from grafana_client import AsyncGrafanaApi, TokenAuth

async def manage_datasources():
    api = AsyncGrafanaApi(auth=TokenAuth("your-token"), host="grafana.example.com")

    # Async data source operations
    datasources = await api.datasource.list_datasources()
    print(f"Found {len(datasources)} data sources")

    # Concurrent health checks (only for data sources that expose a UID)
    checkable = [ds for ds in datasources if ds.get('uid')]
    health_tasks = [api.datasource.health(ds['uid']) for ds in checkable]

    health_results = await asyncio.gather(*health_tasks, return_exceptions=True)

    for ds, health in zip(checkable, health_results):
        if isinstance(health, Exception):
            print(f"{ds['name']}: Health check failed - {health}")
        else:
            print(f"{ds['name']}: {health.get('status', 'Unknown')}")

asyncio.run(manage_datasources())
```

### Best Practices

1. **Use UIDs**: Always prefer UID-based operations over ID-based ones
2. **Health Monitoring**: Regularly check data source health in production
3. **Secure Credentials**: Use `secureJsonData` for sensitive configuration
4. **Connection Pooling**: Configure appropriate connection limits for SQL data sources
5. **Query Optimization**: Use appropriate time ranges and intervals for queries
6. **Error Handling**: Implement robust error handling for all operations (a combined sketch follows this list)
7. **Async for Scale**: Use async API for bulk operations and health monitoring
8. **Version Compatibility**: Check Grafana version compatibility for specific features
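
A minimal sketch tying several of these practices together (UID-first access, robust error handling); the exception import and the 404 convention follow the error-handling example above, and `prometheus_config` comes from the configuration examples:

```python
from grafana_client import GrafanaClientError

def ensure_datasource(api, config: dict) -> dict:
    """Return the data source with this name, creating it if it is missing."""
    try:
        return api.datasource.get_datasource_by_name(config["name"])
    except GrafanaClientError as e:
        # Treat "not found" as the signal to create; re-raise anything else
        if getattr(e, "status_code", None) != 404:
            raise
    created = api.datasource.create_datasource(config)
    # Prefer the UID for all follow-up operations
    return api.datasource.get_datasource_by_uid(created["uid"])

ds = ensure_datasource(api, prometheus_config)
print(f"Data source ready: {ds['name']} (UID: {ds['uid']})")
```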