or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-management.md · index-management.md · index.md · milvus-client.md · orm-collection.md · search-operations.md · types-enums.md · user-management.md · utility-functions.md

docs/utility-functions.md

0

# Utility Functions

1

2

PyMilvus provides a comprehensive set of utility functions for database maintenance, monitoring, timestamp management, and administrative operations. These functions are available both through the `utility` module and as direct imports from the main package.

3

4

## Import Patterns

5

6

```python { .api }

7

# Import utility module

8

from pymilvus import utility

9

10

# Use utility functions

11

utility.has_collection("my_collection")

12

utility.loading_progress("my_collection")

13

14

# Direct imports (preferred for commonly used functions)

15

from pymilvus import (

16

has_collection, list_collections, drop_collection, loading_progress,

17

create_user, delete_user, list_usernames,

18

mkts_from_datetime, hybridts_to_datetime

19

)

20

21

# Use functions directly

22

has_collection("my_collection")

23

loading_progress("my_collection")

24

```

25

26

## Collection Utilities

27

28

### Collection Existence and Listing

29

30

```python { .api }

31

from pymilvus import utility

32

33

def has_collection(

34

collection_name: str,

35

using: str = "default",

36

timeout: Optional[float] = None

37

) -> bool

38

39

def list_collections(

40

timeout: Optional[float] = None,

41

using: str = "default"

42

) -> List[str]

43

44

def drop_collection(

45

collection_name: str,

46

timeout: Optional[float] = None,

47

using: str = "default"

48

) -> None

49

```

50

51

```python { .api }

52

# Check collection existence

53

if utility.has_collection("documents"):

54

print("Documents collection exists")

55

else:

56

print("Documents collection not found")

57

58

# List all collections

59

collections = utility.list_collections()

60

print(f"Available collections: {collections}")

61

62

# Conditional operations based on existence

63

collection_name = "temp_collection"

64

if utility.has_collection(collection_name):

65

utility.drop_collection(collection_name)

66

print(f"Dropped existing collection: {collection_name}")

67

68

# Batch collection operations

69

collections_to_check = ["users", "products", "orders", "analytics"]

70

existing_collections = []

71

missing_collections = []

72

73

for collection in collections_to_check:

74

if utility.has_collection(collection):

75

existing_collections.append(collection)

76

else:

77

missing_collections.append(collection)

78

79

print(f"Existing: {existing_collections}")

80

print(f"Missing: {missing_collections}")

81

```

82

83

### Collection Renaming

84

85

```python { .api }

86

def rename_collection(

87

old_name: str,

88

new_name: str,

89

timeout: Optional[float] = None,

90

using: str = "default"

91

) -> None

92

```

93

94

```python { .api }

95

# Rename collection

96

utility.rename_collection("old_products", "products_archive")

97

98

# Safe rename with existence check

99

def safe_rename_collection(old_name: str, new_name: str):

100

"""Safely rename collection with validation"""

101

102

if not utility.has_collection(old_name):

103

print(f"Source collection {old_name} does not exist")

104

return False

105

106

if utility.has_collection(new_name):

107

print(f"Target collection {new_name} already exists")

108

return False

109

110

try:

111

utility.rename_collection(old_name, new_name)

112

print(f"Successfully renamed {old_name} to {new_name}")

113

return True

114

except Exception as e:

115

print(f"Rename failed: {e}")

116

return False

117

118

# Usage

119

success = safe_rename_collection("temp_data", "processed_data")

120

```

121

122

## Partition Utilities

123

124

### Partition Operations

125

126

```python { .api }

127

def has_partition(

128

collection_name: str,

129

partition_name: str,

130

using: str = "default",

131

timeout: Optional[float] = None

132

) -> bool

133

```

134

135

```python { .api }

136

# Check partition existence

137

collection_name = "time_series"

138

partitions_to_check = ["2024_q1", "2024_q2", "2024_q3", "2024_q4"]

139

140

for partition in partitions_to_check:

141

exists = utility.has_partition(collection_name, partition)

142

print(f"Partition {partition}: {'exists' if exists else 'missing'}")

143

144

# Conditional partition operations

145

def ensure_partition_exists(collection_name: str, partition_name: str):

146

"""Ensure partition exists, create if missing"""

147

from pymilvus import Collection

148

149

if not utility.has_partition(collection_name, partition_name):

150

collection = Collection(collection_name)

151

collection.create_partition(partition_name)

152

print(f"Created partition: {partition_name}")

153

else:

154

print(f"Partition {partition_name} already exists")

155

156

# Create quarterly partitions

157

for quarter in ["2024_q1", "2024_q2", "2024_q3", "2024_q4"]:

158

ensure_partition_exists("sales_data", quarter)

159

```

160

161

## Loading and Progress Monitoring

162

163

### Loading Operations

164

165

```python { .api }

166

def loading_progress(

167

collection_name: str,

168

partition_names: Optional[List[str]] = None,

169

using: str = "default",

170

timeout: Optional[float] = None

171

) -> Dict[str, Any]

172

173

def wait_for_loading_complete(

174

collection_name: str,

175

partition_names: Optional[List[str]] = None,

176

timeout: Optional[float] = None,

177

using: str = "default"

178

) -> None

179

```

180

181

```python { .api }

182

# Monitor loading progress

183

progress = utility.loading_progress("large_collection")

184

print(f"Loading progress: {progress}")

185

186

# Example progress structure:

187

# {

188

# 'loading_progress': 85.5,

189

# 'num_loaded_partitions': 3,

190

# 'not_loaded_partitions': ['partition_4'],

191

# 'loading_partitions': ['partition_5'],

192

# 'loaded_partitions': ['partition_1', 'partition_2', 'partition_3']

193

# }

194

195

# Wait for loading to complete

196

print("Waiting for collection to load...")

197

utility.wait_for_loading_complete("large_collection", timeout=300) # 5 minute timeout

198

print("Collection loading completed")

199

200

# Monitor loading with progress updates

201

def monitor_loading_progress(collection_name: str, check_interval: int = 5):

202

"""Monitor loading progress with periodic updates"""

203

import time

204

205

while True:

206

progress = utility.loading_progress(collection_name)

207

loading_pct = progress.get('loading_progress', 0)

208

209

print(f"Loading progress: {loading_pct:.1f}%")

210

211

if loading_pct >= 100:

212

print("Loading completed!")

213

break

214

215

time.sleep(check_interval)

216

217

# Usage

218

monitor_loading_progress("huge_dataset", check_interval=10)

219

```

220

221

### Index Building Progress

222

223

```python { .api }

224

def index_building_progress(

225

collection_name: str,

226

index_name: str = "",

227

using: str = "default",

228

timeout: Optional[float] = None

229

) -> Dict[str, Any]

230

231

def wait_for_index_building_complete(

232

collection_name: str,

233

index_name: str = "",

234

timeout: Optional[float] = None,

235

using: str = "default"

236

) -> None

237

```

238

239

```python { .api }

240

# Monitor index building

241

index_progress = utility.index_building_progress("documents", "vector_index")

242

print(f"Index building progress: {index_progress}")

243

244

# Example index progress structure:

245

# {

246

# 'total_rows': 1000000,

247

# 'indexed_rows': 750000,

248

# 'pending_index_rows': 250000,

249

# 'index_state': 'InProgress', # 'Unissued', 'InProgress', 'Finished', 'Failed'

250

# 'progress': 75.0

251

# }

252

253

# Wait for index building

254

utility.wait_for_index_building_complete("documents", "vector_index", timeout=600)

255

256

# Monitor multiple index builds

257

def monitor_all_indexes(collection_name: str):

258

"""Monitor all index building for a collection"""

259

from pymilvus import Collection

260

261

collection = Collection(collection_name)

262

263

# Get all indexes

264

indexes = collection.indexes

265

266

for index in indexes:

267

field_name = index.field_name

268

269

print(f"Monitoring index on field: {field_name}")

270

271

while True:

272

progress = utility.index_building_progress(collection_name, field_name)

273

state = progress.get('index_state', 'Unknown')

274

pct = progress.get('progress', 0)

275

276

print(f" {field_name}: {state} - {pct:.1f}%")

277

278

if state in ['Finished', 'Failed']:

279

break

280

281

time.sleep(10)

282

283

if state == 'Finished':

284

print(f"✓ Index on {field_name} completed successfully")

285

else:

286

print(f"✗ Index on {field_name} failed")

287

288

# Monitor all indexes for a collection

289

monitor_all_indexes("multi_field_collection")

290

```

291

292

## User Management Utilities

293

294

### User Operations

295

296

```python { .api }

297

def create_user(

298

user: str,

299

password: str,

300

using: str = "default",

301

timeout: Optional[float] = None

302

) -> None

303

304

def delete_user(

305

user: str,

306

using: str = "default",

307

timeout: Optional[float] = None

308

) -> None

309

310

def list_usernames(

311

using: str = "default",

312

timeout: Optional[float] = None

313

) -> List[str]

314

315

def update_password(

316

user: str,

317

old_password: str,

318

new_password: str,

319

using: str = "default",

320

timeout: Optional[float] = None

321

) -> None

322

323

def reset_password(

324

user: str,

325

new_password: str,

326

using: str = "default",

327

timeout: Optional[float] = None

328

) -> None

329

```

330

331

```python { .api }

332

# User management examples

333

users_to_create = [

334

("analyst", "analyst_password"),

335

("viewer", "viewer_password"),

336

("admin", "admin_password")

337

]

338

339

# Create users

340

for username, password in users_to_create:

341

try:

342

utility.create_user(username, password)

343

print(f"✓ Created user: {username}")

344

except Exception as e:

345

print(f"✗ Failed to create {username}: {e}")

346

347

# List all users

348

users = utility.list_usernames()

349

print(f"System users: {users}")

350

351

# Password management

352

utility.update_password("analyst", "old_password", "new_secure_password")

353

utility.reset_password("viewer", "admin_reset_password")

354

355

# Cleanup old users

356

obsolete_users = ["temp_user", "test_user", "old_account"]

357

for username in obsolete_users:

358

if username in utility.list_usernames():

359

utility.delete_user(username)

360

print(f"Deleted user: {username}")

361

```

362

363

## Timestamp Utilities

364

365

### Timestamp Creation

366

367

```python { .api }

368

def mkts_from_hybridts(

369

hybridts: int,

370

milliseconds: float = 0.0,

371

delta: Optional[timedelta] = None

372

) -> int

373

374

def mkts_from_unixtime(

375

epoch: float,

376

milliseconds: float = 0.0,

377

delta: Optional[timedelta] = None

378

) -> int

379

380

def mkts_from_datetime(

381

d_time: datetime,

382

milliseconds: float = 0.0,

383

delta: Optional[timedelta] = None

384

) -> int

385

```

386

387

```python { .api }

388

from datetime import datetime, timedelta

389

from pymilvus import mkts_from_datetime, mkts_from_unixtime

390

391

# Create timestamp from datetime

392

now = datetime.now()

393

travel_timestamp = mkts_from_datetime(now)

394

print(f"Travel timestamp: {travel_timestamp}")

395

396

# Create timestamp for specific time

397

specific_time = datetime(2024, 1, 1, 12, 0, 0)

398

historical_timestamp = mkts_from_datetime(specific_time)

399

400

# Create timestamp with offset

401

one_hour_ago = mkts_from_datetime(now, delta=timedelta(hours=-1))

402

one_day_future = mkts_from_datetime(now, delta=timedelta(days=1))

403

404

# Create from Unix timestamp

405

unix_time = 1640995200 # 2022-01-01 00:00:00 UTC

406

timestamp_from_unix = mkts_from_unixtime(unix_time)

407

408

# Use timestamps for time travel queries

409

from pymilvus import MilvusClient

410

client = MilvusClient()

411

412

# Query data as it existed 1 hour ago

413

historical_results = client.query(

414

"time_series_data",

415

expr="id > 0",

416

travel_timestamp=one_hour_ago,

417

output_fields=["id", "value", "timestamp"]

418

)

419

print(f"Historical data (1 hour ago): {len(historical_results)} records")

420

```

421

422

### Timestamp Conversion

423

424

```python { .api }

425

def hybridts_to_datetime(

426

hybridts: int,

427

tz: Optional[timezone] = None

428

) -> datetime

429

430

def hybridts_to_unixtime(

431

hybridts: int

432

) -> float

433

```

434

435

```python { .api }

436

from pymilvus import hybridts_to_datetime, hybridts_to_unixtime

437

438

# Convert Milvus hybrid timestamp to datetime

439

milvus_timestamp = 434646822236381184 # Example hybrid timestamp

440

dt = hybridts_to_datetime(milvus_timestamp)

441

print(f"Datetime: {dt}")

442

443

# Convert to Unix timestamp

444

unix_time = hybridts_to_unixtime(milvus_timestamp)

445

print(f"Unix time: {unix_time}")

446

447

# Working with search results that include timestamps

448

results = client.search("timestamped_data", [[0.1] * 128], limit=5)

449

450

for hit in results[0]:

451

# If your collection has a timestamp field

452

ts_field = hit.entity.get('_ts') # Milvus internal timestamp

453

if ts_field:

454

readable_time = hybridts_to_datetime(ts_field)

455

print(f"Record ID {hit.id}: created at {readable_time}")

456

```

457

458

## Resource Group Management

459

460

### Resource Group Operations

461

462

```python { .api }

463

def create_resource_group(

464

name: str,

465

config: Optional[Dict] = None,

466

using: str = "default",

467

timeout: Optional[float] = None

468

) -> None

469

470

def drop_resource_group(

471

name: str,

472

using: str = "default",

473

timeout: Optional[float] = None

474

) -> None

475

476

def describe_resource_group(

477

name: str,

478

using: str = "default",

479

timeout: Optional[float] = None

480

) -> Dict[str, Any]

481

482

def list_resource_groups(

483

using: str = "default",

484

timeout: Optional[float] = None

485

) -> List[str]

486

487

def update_resource_groups(

488

resource_groups: Dict[str, Dict],

489

using: str = "default",

490

timeout: Optional[float] = None

491

) -> None

492

```

493

494

```python { .api }

495

# Create resource groups for different workloads

496

resource_groups = {

497

"gpu_group": {"requests": {"node_num": 2}, "limits": {"node_num": 4}},

498

"cpu_group": {"requests": {"node_num": 4}, "limits": {"node_num": 8}},

499

"memory_intensive": {"requests": {"node_num": 1}, "limits": {"node_num": 2}}

500

}

501

502

for group_name, config in resource_groups.items():

503

try:

504

utility.create_resource_group(group_name, config)

505

print(f"✓ Created resource group: {group_name}")

506

except Exception as e:

507

print(f"✗ Failed to create {group_name}: {e}")

508

509

# List and describe resource groups

510

groups = utility.list_resource_groups()

511

print(f"Available resource groups: {groups}")

512

513

for group in groups:

514

group_info = utility.describe_resource_group(group)

515

print(f"Group {group}: {group_info}")

516

517

# Update resource group configuration

518

updates = {

519

"gpu_group": {"limits": {"node_num": 6}}, # Increase limit

520

"cpu_group": {"requests": {"node_num": 6}} # Increase requests

521

}

522

523

utility.update_resource_groups(updates)

524

print("Resource group configurations updated")

525

```

526

527

### Node and Replica Transfer

528

529

```python { .api }

530

def transfer_node(

531

source: str,

532

target: str,

533

num_nodes: int,

534

using: str = "default",

535

timeout: Optional[float] = None

536

) -> None

537

538

def transfer_replica(

539

source_group: str,

540

target_group: str,

541

collection_name: str,

542

num_replicas: int,

543

using: str = "default",

544

timeout: Optional[float] = None

545

) -> None

546

```

547

548

```python { .api }

549

# Transfer nodes between resource groups

550

utility.transfer_node("cpu_group", "gpu_group", 2)

551

print("Transferred 2 nodes from cpu_group to gpu_group")

552

553

# Transfer replicas for load balancing

554

utility.transfer_replica("overloaded_group", "underutilized_group", "large_collection", 1)

555

print("Transferred 1 replica to balance load")

556

557

# Dynamic resource rebalancing

558

def rebalance_resources():

559

"""Automatically rebalance resources based on usage"""

560

561

groups = utility.list_resource_groups()

562

563

for group in groups:

564

group_info = utility.describe_resource_group(group)

565

566

available_nodes = group_info.get('num_available_node', 0)

567

loaded_replicas = group_info.get('num_loaded_replica', 0)

568

569

# Simple rebalancing logic

570

if available_nodes > loaded_replicas + 2:

571

# Group has excess capacity

572

print(f"Group {group} has excess capacity: {available_nodes} nodes, {loaded_replicas} replicas")

573

elif available_nodes < loaded_replicas:

574

# Group is overloaded

575

print(f"Group {group} is overloaded: {available_nodes} nodes, {loaded_replicas} replicas")

576

577

rebalance_resources()

578

```

579

580

## Server Information

581

582

### Version and Server Details

583

584

```python { .api }

585

def get_server_version(

586

using: str = "default",

587

timeout: Optional[float] = None

588

) -> str

589

590

def get_server_type(

591

using: str = "default"

592

) -> str

593

```

594

595

```python { .api }

596

# Get server information

597

version = utility.get_server_version()

598

server_type = utility.get_server_type()

599

600

print(f"Milvus Version: {version}")

601

print(f"Server Type: {server_type}")

602

603

# Version compatibility check

604

def check_version_compatibility(required_version: str):

605

"""Check if server version meets requirements"""

606

607

current_version = utility.get_server_version()

608

609

# Simple version comparison (you might want more sophisticated logic)

610

current_parts = current_version.split('.')

611

required_parts = required_version.split('.')

612

613

for i, (current, required) in enumerate(zip(current_parts, required_parts)):

614

if int(current) > int(required):

615

return True

616

elif int(current) < int(required):

617

return False

618

619

return True # Equal versions

620

621

# Check compatibility

622

if check_version_compatibility("2.3.0"):

623

print("Server version is compatible")

624

else:

625

print("Server version is too old")

626

```

627

628

## Maintenance Operations

629

630

### Bulk Operations

631

632

```python { .api }

633

def do_bulk_insert(

634

collection_name: str,

635

files: List[str],

636

partition_name: Optional[str] = None,

637

using: str = "default",

638

timeout: Optional[float] = None,

639

**kwargs

640

) -> int

641

642

def get_bulk_insert_state(

643

task_id: int,

644

using: str = "default",

645

timeout: Optional[float] = None,

646

**kwargs

647

) -> Dict[str, Any]

648

649

def list_bulk_insert_tasks(

650

limit: int = 0,

651

collection_name: Optional[str] = None,

652

using: str = "default",

653

timeout: Optional[float] = None,

654

**kwargs

655

) -> List[Dict[str, Any]]

656

```

657

658

```python { .api }

659

# Bulk insert from files

660

files_to_insert = [

661

"/data/batch1.json",

662

"/data/batch2.json",

663

"/data/batch3.json"

664

]

665

666

task_id = utility.do_bulk_insert("large_collection", files_to_insert)

667

print(f"Bulk insert task started: {task_id}")

668

669

# Monitor bulk insert progress

670

while True:

671

state = utility.get_bulk_insert_state(task_id)

672

status = state.get('state', 'Unknown')

673

progress = state.get('progress', 0)

674

675

print(f"Bulk insert progress: {status} - {progress}%")

676

677

if status in ['ImportCompleted', 'ImportFailed']:

678

break

679

680

time.sleep(30)

681

682

# List all bulk insert tasks

683

tasks = utility.list_bulk_insert_tasks(limit=10)

684

for task in tasks:

685

print(f"Task {task['task_id']}: {task['state']} - {task.get('collection_name', 'N/A')}")

686

```

687

688

### Maintenance and Monitoring

689

690

```python { .api }

691

def flush_all(

692

using: str = "default",

693

timeout: Optional[float] = None,

694

**kwargs

695

) -> None

696

697

def get_query_segment_info(

698

collection_name: str,

699

timeout: Optional[float] = None,

700

using: str = "default"

701

) -> List[Dict[str, Any]]

702

703

def load_balance(

704

src_node_id: int,

705

dst_node_ids: Optional[List[int]] = None,

706

sealed_segment_ids: Optional[List[int]] = None,

707

using: str = "default",

708

timeout: Optional[float] = None

709

) -> None

710

```

711

712

```python { .api }

713

# Flush all collections to ensure data persistence

714

utility.flush_all()

715

print("Flushed all collections")

716

717

# Get segment information for query analysis

718

segment_info = utility.get_query_segment_info("analytics_collection")

719

print(f"Segment info: {len(segment_info)} segments")

720

721

for segment in segment_info[:5]: # Show first 5 segments

722

print(f" Segment {segment['segment_id']}: {segment['num_rows']} rows, {segment['mem_size']} bytes")

723

724

# Load balancing between nodes

725

utility.load_balance(

726

src_node_id=1,

727

dst_node_ids=[2, 3], # Distribute to nodes 2 and 3

728

sealed_segment_ids=None # Balance all segments

729

)

730

print("Load balancing completed")

731

```

732

733

## Alias Management Utilities

734

735

### Alias Operations

736

737

```python { .api }

738

def create_alias(

739

collection_name: str,

740

alias: str,

741

timeout: Optional[float] = None,

742

using: str = "default"

743

) -> None

744

745

def drop_alias(

746

alias: str,

747

timeout: Optional[float] = None,

748

using: str = "default"

749

) -> None

750

751

def alter_alias(

752

collection_name: str,

753

alias: str,

754

timeout: Optional[float] = None,

755

using: str = "default"

756

) -> None

757

758

def list_aliases(

759

collection_name: str,

760

timeout: Optional[float] = None,

761

using: str = "default"

762

) -> List[str]

763

```

764

765

```python { .api }

766

# Create aliases for version management

767

utility.create_alias("products_v2", "products_current")

768

utility.create_alias("products_v1", "products_stable")

769

770

# Blue-green deployment pattern

771

def deploy_new_version(new_collection: str, alias: str):

772

"""Deploy new collection version using alias switching"""

773

774

# Get current alias target

775

try:

776

aliases = utility.list_aliases(new_collection)

777

print(f"Current aliases for {new_collection}: {aliases}")

778

except:

779

pass

780

781

# Switch alias to new collection

782

utility.alter_alias(new_collection, alias)

783

print(f"Switched alias {alias} to {new_collection}")

784

785

# Deploy new version

786

deploy_new_version("products_v3", "products_current")

787

788

# List all aliases for a collection

789

aliases = utility.list_aliases("products_v3")

790

print(f"Aliases for products_v3: {aliases}")

791

```

792

793

## Error Handling and Retry Logic

794

795

```python { .api }

796

def retry_utility_operation(operation_func, max_retries: int = 3, delay: float = 1.0):

797

"""Retry utility operations with exponential backoff"""

798

import time

799

800

for attempt in range(max_retries):

801

try:

802

return operation_func()

803

except Exception as e:

804

if attempt == max_retries - 1:

805

raise e

806

807

wait_time = delay * (2 ** attempt)

808

print(f"Attempt {attempt + 1} failed: {e}")

809

print(f"Retrying in {wait_time} seconds...")

810

time.sleep(wait_time)

811

812

# Usage examples

813

def safe_has_collection(collection_name: str) -> bool:

814

"""Safely check collection existence with retry"""

815

return retry_utility_operation(

816

lambda: utility.has_collection(collection_name),

817

max_retries=3

818

)

819

820

def safe_wait_for_loading(collection_name: str, timeout: int = 300):

821

"""Safely wait for loading with retry logic"""

822

return retry_utility_operation(

823

lambda: utility.wait_for_loading_complete(collection_name, timeout=timeout),

824

max_retries=2

825

)

826

827

# Use safe operations

828

if safe_has_collection("important_collection"):

829

safe_wait_for_loading("important_collection")

830

print("Collection loaded successfully")

831

```

832

833

PyMilvus utility functions provide essential database administration, monitoring, and maintenance capabilities, enabling efficient management of large-scale vector database deployments with comprehensive error handling and retry mechanisms.