0
# Utility Functions
1
2
PyMilvus provides a comprehensive set of utility functions for database maintenance, monitoring, timestamp management, and administrative operations. These functions are available both through the `utility` module and as direct imports from the main package.
3
4
## Import Patterns
5
6
```python { .api }
7
# Import utility module
from pymilvus import utility

# Use utility functions
utility.has_collection("my_collection")
utility.loading_progress("my_collection")

# Direct imports (preferred for commonly used functions).
# NOTE: loading_progress must be imported here too — it is called
# directly below, and omitting it would raise NameError.
from pymilvus import (
    has_collection, list_collections, drop_collection,
    loading_progress,
    create_user, delete_user, list_usernames,
    mkts_from_datetime, hybridts_to_datetime
)

# Use functions directly
has_collection("my_collection")
loading_progress("my_collection")
24
```
25
26
## Collection Utilities
27
28
### Collection Existence and Listing
29
30
```python { .api }
31
from pymilvus import utility
32
33
def has_collection(
34
collection_name: str,
35
using: str = "default",
36
timeout: Optional[float] = None
37
) -> bool
38
39
def list_collections(
40
timeout: Optional[float] = None,
41
using: str = "default"
42
) -> List[str]
43
44
def drop_collection(
45
collection_name: str,
46
timeout: Optional[float] = None,
47
using: str = "default"
48
) -> None
49
```
50
51
```python { .api }
52
# Check collection existence
53
if utility.has_collection("documents"):
54
print("Documents collection exists")
55
else:
56
print("Documents collection not found")
57
58
# List all collections
59
collections = utility.list_collections()
60
print(f"Available collections: {collections}")
61
62
# Conditional operations based on existence
63
collection_name = "temp_collection"
64
if utility.has_collection(collection_name):
65
utility.drop_collection(collection_name)
66
print(f"Dropped existing collection: {collection_name}")
67
68
# Batch collection operations
69
collections_to_check = ["users", "products", "orders", "analytics"]
70
existing_collections = []
71
missing_collections = []
72
73
for collection in collections_to_check:
74
if utility.has_collection(collection):
75
existing_collections.append(collection)
76
else:
77
missing_collections.append(collection)
78
79
print(f"Existing: {existing_collections}")
80
print(f"Missing: {missing_collections}")
81
```
82
83
### Collection Renaming
84
85
```python { .api }
86
def rename_collection(
87
old_name: str,
88
new_name: str,
89
timeout: Optional[float] = None,
90
using: str = "default"
91
) -> None
92
```
93
94
```python { .api }
95
# Rename collection
96
utility.rename_collection("old_products", "products_archive")
97
98
# Safe rename with existence check
99
def safe_rename_collection(old_name: str, new_name: str):
    """Rename a collection after validating both endpoints.

    Returns True on success; False if the source is missing, the target
    already exists, or the rename call itself fails.
    """
    if not utility.has_collection(old_name):
        print(f"Source collection {old_name} does not exist")
        return False

    if utility.has_collection(new_name):
        print(f"Target collection {new_name} already exists")
        return False

    try:
        utility.rename_collection(old_name, new_name)
        print(f"Successfully renamed {old_name} to {new_name}")
        return True
    except Exception as e:
        print(f"Rename failed: {e}")
        return False
117
118
# Usage
119
success = safe_rename_collection("temp_data", "processed_data")
120
```
121
122
## Partition Utilities
123
124
### Partition Operations
125
126
```python { .api }
127
def has_partition(
128
collection_name: str,
129
partition_name: str,
130
using: str = "default",
131
timeout: Optional[float] = None
132
) -> bool
133
```
134
135
```python { .api }
136
# Check partition existence
137
collection_name = "time_series"
138
partitions_to_check = ["2024_q1", "2024_q2", "2024_q3", "2024_q4"]
139
140
for partition in partitions_to_check:
141
exists = utility.has_partition(collection_name, partition)
142
print(f"Partition {partition}: {'exists' if exists else 'missing'}")
143
144
# Conditional partition operations
145
def ensure_partition_exists(collection_name: str, partition_name: str):
    """Create *partition_name* on the collection if it is not already there."""
    from pymilvus import Collection

    if utility.has_partition(collection_name, partition_name):
        print(f"Partition {partition_name} already exists")
        return

    Collection(collection_name).create_partition(partition_name)
    print(f"Created partition: {partition_name}")
155
156
# Create quarterly partitions
157
for quarter in ["2024_q1", "2024_q2", "2024_q3", "2024_q4"]:
158
ensure_partition_exists("sales_data", quarter)
159
```
160
161
## Loading and Progress Monitoring
162
163
### Loading Operations
164
165
```python { .api }
166
def loading_progress(
167
collection_name: str,
168
partition_names: Optional[List[str]] = None,
169
using: str = "default",
170
timeout: Optional[float] = None
171
) -> Dict[str, Any]
172
173
def wait_for_loading_complete(
174
collection_name: str,
175
partition_names: Optional[List[str]] = None,
176
timeout: Optional[float] = None,
177
using: str = "default"
178
) -> None
179
```
180
181
```python { .api }
182
# Monitor loading progress
183
progress = utility.loading_progress("large_collection")
184
print(f"Loading progress: {progress}")
185
186
# Example progress structure:
187
# {
188
# 'loading_progress': 85.5,
189
# 'num_loaded_partitions': 3,
190
# 'not_loaded_partitions': ['partition_4'],
191
# 'loading_partitions': ['partition_5'],
192
# 'loaded_partitions': ['partition_1', 'partition_2', 'partition_3']
193
# }
194
195
# Wait for loading to complete
196
print("Waiting for collection to load...")
197
utility.wait_for_loading_complete("large_collection", timeout=300) # 5 minute timeout
198
print("Collection loading completed")
199
200
# Monitor loading with progress updates
201
def monitor_loading_progress(collection_name: str, check_interval: int = 5):
    """Poll loading progress for *collection_name*, printing an update
    every *check_interval* seconds until it reaches 100%.

    NOTE(review): assumes the 'loading_progress' value is numeric — verify
    against the server response format before relying on the comparison.
    """
    import time

    done = False
    while not done:
        state = utility.loading_progress(collection_name)
        pct = state.get('loading_progress', 0)

        print(f"Loading progress: {pct:.1f}%")

        done = pct >= 100
        if done:
            print("Loading completed!")
        else:
            time.sleep(check_interval)
216
217
# Usage
218
monitor_loading_progress("huge_dataset", check_interval=10)
219
```
220
221
### Index Building Progress
222
223
```python { .api }
224
def index_building_progress(
225
collection_name: str,
226
index_name: str = "",
227
using: str = "default",
228
timeout: Optional[float] = None
229
) -> Dict[str, Any]
230
231
def wait_for_index_building_complete(
232
collection_name: str,
233
index_name: str = "",
234
timeout: Optional[float] = None,
235
using: str = "default"
236
) -> None
237
```
238
239
```python { .api }
240
# Monitor index building
241
index_progress = utility.index_building_progress("documents", "vector_index")
242
print(f"Index building progress: {index_progress}")
243
244
# Example index progress structure:
245
# {
246
# 'total_rows': 1000000,
247
# 'indexed_rows': 750000,
248
# 'pending_index_rows': 250000,
249
# 'index_state': 'InProgress', # 'Unissued', 'InProgress', 'Finished', 'Failed'
250
# 'progress': 75.0
251
# }
252
253
# Wait for index building
254
utility.wait_for_index_building_complete("documents", "vector_index", timeout=600)
255
256
# Monitor multiple index builds
257
def monitor_all_indexes(collection_name: str):
    """Poll index building for every index of *collection_name*.

    Prints per-index progress until each index reports 'Finished' or
    'Failed', then a final success/failure line per field.
    """
    import time  # BUG FIX: time.sleep() was used below without importing time
    from pymilvus import Collection

    collection = Collection(collection_name)

    # Get all indexes
    indexes = collection.indexes

    for index in indexes:
        field_name = index.field_name

        print(f"Monitoring index on field: {field_name}")

        while True:
            # NOTE(review): the second argument is the index *name*; this
            # relies on the index name defaulting to the field name — confirm
            # for indexes created with an explicit custom name.
            progress = utility.index_building_progress(collection_name, field_name)
            state = progress.get('index_state', 'Unknown')
            pct = progress.get('progress', 0)

            print(f"  {field_name}: {state} - {pct:.1f}%")

            if state in ['Finished', 'Failed']:
                break

            time.sleep(10)  # poll interval in seconds

        if state == 'Finished':
            print(f"✓ Index on {field_name} completed successfully")
        else:
            print(f"✗ Index on {field_name} failed")
287
288
# Monitor all indexes for a collection
289
monitor_all_indexes("multi_field_collection")
290
```
291
292
## User Management Utilities
293
294
### User Operations
295
296
```python { .api }
297
def create_user(
298
user: str,
299
password: str,
300
using: str = "default",
301
timeout: Optional[float] = None
302
) -> None
303
304
def delete_user(
305
user: str,
306
using: str = "default",
307
timeout: Optional[float] = None
308
) -> None
309
310
def list_usernames(
311
using: str = "default",
312
timeout: Optional[float] = None
313
) -> List[str]
314
315
def update_password(
316
user: str,
317
old_password: str,
318
new_password: str,
319
using: str = "default",
320
timeout: Optional[float] = None
321
) -> None
322
323
def reset_password(
324
user: str,
325
new_password: str,
326
using: str = "default",
327
timeout: Optional[float] = None
328
) -> None
329
```
330
331
```python { .api }
332
# User management examples
333
users_to_create = [
334
("analyst", "analyst_password"),
335
("viewer", "viewer_password"),
336
("admin", "admin_password")
337
]
338
339
# Create users
340
for username, password in users_to_create:
341
try:
342
utility.create_user(username, password)
343
print(f"✓ Created user: {username}")
344
except Exception as e:
345
print(f"✗ Failed to create {username}: {e}")
346
347
# List all users
348
users = utility.list_usernames()
349
print(f"System users: {users}")
350
351
# Password management
352
utility.update_password("analyst", "old_password", "new_secure_password")
353
utility.reset_password("viewer", "admin_reset_password")
354
355
# Cleanup old users
356
obsolete_users = ["temp_user", "test_user", "old_account"]
357
for username in obsolete_users:
358
if username in utility.list_usernames():
359
utility.delete_user(username)
360
print(f"Deleted user: {username}")
361
```
362
363
## Timestamp Utilities
364
365
### Timestamp Creation
366
367
```python { .api }
368
def mkts_from_hybridts(
369
hybridts: int,
370
milliseconds: float = 0.0,
371
delta: Optional[timedelta] = None
372
) -> int
373
374
def mkts_from_unixtime(
375
epoch: float,
376
milliseconds: float = 0.0,
377
delta: Optional[timedelta] = None
378
) -> int
379
380
def mkts_from_datetime(
381
d_time: datetime,
382
milliseconds: float = 0.0,
383
delta: Optional[timedelta] = None
384
) -> int
385
```
386
387
```python { .api }
388
from datetime import datetime, timedelta
389
from pymilvus import mkts_from_datetime, mkts_from_unixtime
390
391
# Create timestamp from datetime
392
now = datetime.now()
393
travel_timestamp = mkts_from_datetime(now)
394
print(f"Travel timestamp: {travel_timestamp}")
395
396
# Create timestamp for specific time
397
specific_time = datetime(2024, 1, 1, 12, 0, 0)
398
historical_timestamp = mkts_from_datetime(specific_time)
399
400
# Create timestamp with offset
401
one_hour_ago = mkts_from_datetime(now, delta=timedelta(hours=-1))
402
one_day_future = mkts_from_datetime(now, delta=timedelta(days=1))
403
404
# Create from Unix timestamp
405
unix_time = 1640995200 # 2022-01-01 00:00:00 UTC
406
timestamp_from_unix = mkts_from_unixtime(unix_time)
407
408
# Use timestamps for time travel queries
409
from pymilvus import MilvusClient
410
client = MilvusClient()
411
412
# Query data as it existed 1 hour ago
413
historical_results = client.query(
414
"time_series_data",
415
expr="id > 0",
416
travel_timestamp=one_hour_ago,
417
output_fields=["id", "value", "timestamp"]
418
)
419
print(f"Historical data (1 hour ago): {len(historical_results)} records")
420
```
421
422
### Timestamp Conversion
423
424
```python { .api }
425
def hybridts_to_datetime(
426
hybridts: int,
427
tz: Optional[timezone] = None
428
) -> datetime
429
430
def hybridts_to_unixtime(
431
hybridts: int
432
) -> float
433
```
434
435
```python { .api }
436
from pymilvus import hybridts_to_datetime, hybridts_to_unixtime
437
438
# Convert Milvus hybrid timestamp to datetime
439
milvus_timestamp = 434646822236381184 # Example hybrid timestamp
440
dt = hybridts_to_datetime(milvus_timestamp)
441
print(f"Datetime: {dt}")
442
443
# Convert to Unix timestamp
444
unix_time = hybridts_to_unixtime(milvus_timestamp)
445
print(f"Unix time: {unix_time}")
446
447
# Working with search results that include timestamps
448
results = client.search("timestamped_data", [[0.1] * 128], limit=5)
449
450
for hit in results[0]:
451
# If your collection has a timestamp field
452
ts_field = hit.entity.get('_ts') # Milvus internal timestamp
453
if ts_field:
454
readable_time = hybridts_to_datetime(ts_field)
455
print(f"Record ID {hit.id}: created at {readable_time}")
456
```
457
458
## Resource Group Management
459
460
### Resource Group Operations
461
462
```python { .api }
463
def create_resource_group(
464
name: str,
465
config: Optional[Dict] = None,
466
using: str = "default",
467
timeout: Optional[float] = None
468
) -> None
469
470
def drop_resource_group(
471
name: str,
472
using: str = "default",
473
timeout: Optional[float] = None
474
) -> None
475
476
def describe_resource_group(
477
name: str,
478
using: str = "default",
479
timeout: Optional[float] = None
480
) -> Dict[str, Any]
481
482
def list_resource_groups(
483
using: str = "default",
484
timeout: Optional[float] = None
485
) -> List[str]
486
487
def update_resource_groups(
488
resource_groups: Dict[str, Dict],
489
using: str = "default",
490
timeout: Optional[float] = None
491
) -> None
492
```
493
494
```python { .api }
495
# Create resource groups for different workloads
496
resource_groups = {
497
"gpu_group": {"requests": {"node_num": 2}, "limits": {"node_num": 4}},
498
"cpu_group": {"requests": {"node_num": 4}, "limits": {"node_num": 8}},
499
"memory_intensive": {"requests": {"node_num": 1}, "limits": {"node_num": 2}}
500
}
501
502
for group_name, config in resource_groups.items():
503
try:
504
utility.create_resource_group(group_name, config)
505
print(f"✓ Created resource group: {group_name}")
506
except Exception as e:
507
print(f"✗ Failed to create {group_name}: {e}")
508
509
# List and describe resource groups
510
groups = utility.list_resource_groups()
511
print(f"Available resource groups: {groups}")
512
513
for group in groups:
514
group_info = utility.describe_resource_group(group)
515
print(f"Group {group}: {group_info}")
516
517
# Update resource group configuration
518
updates = {
519
"gpu_group": {"limits": {"node_num": 6}}, # Increase limit
520
"cpu_group": {"requests": {"node_num": 6}} # Increase requests
521
}
522
523
utility.update_resource_groups(updates)
524
print("Resource group configurations updated")
525
```
526
527
### Node and Replica Transfer
528
529
```python { .api }
530
def transfer_node(
531
source: str,
532
target: str,
533
num_nodes: int,
534
using: str = "default",
535
timeout: Optional[float] = None
536
) -> None
537
538
def transfer_replica(
539
source_group: str,
540
target_group: str,
541
collection_name: str,
542
num_replicas: int,
543
using: str = "default",
544
timeout: Optional[float] = None
545
) -> None
546
```
547
548
```python { .api }
549
# Transfer nodes between resource groups
550
utility.transfer_node("cpu_group", "gpu_group", 2)
551
print("Transferred 2 nodes from cpu_group to gpu_group")
552
553
# Transfer replicas for load balancing
554
utility.transfer_replica("overloaded_group", "underutilized_group", "large_collection", 1)
555
print("Transferred 1 replica to balance load")
556
557
# Dynamic resource rebalancing
558
def rebalance_resources():
    """Report resource groups that look under- or over-provisioned.

    A group has excess capacity when its available nodes exceed its
    loaded replicas by more than 2, and is overloaded when it has
    fewer available nodes than loaded replicas.
    """
    for name in utility.list_resource_groups():
        info = utility.describe_resource_group(name)

        nodes = info.get('num_available_node', 0)
        replicas = info.get('num_loaded_replica', 0)

        # Simple heuristic: compare node supply against replica demand.
        if nodes > replicas + 2:
            print(f"Group {name} has excess capacity: {nodes} nodes, {replicas} replicas")
        elif nodes < replicas:
            print(f"Group {name} is overloaded: {nodes} nodes, {replicas} replicas")
576
577
rebalance_resources()
578
```
579
580
## Server Information
581
582
### Version and Server Details
583
584
```python { .api }
585
def get_server_version(
586
using: str = "default",
587
timeout: Optional[float] = None
588
) -> str
589
590
def get_server_type(
591
using: str = "default"
592
) -> str
593
```
594
595
```python { .api }
596
# Get server information
597
version = utility.get_server_version()
598
server_type = utility.get_server_type()
599
600
print(f"Milvus Version: {version}")
601
print(f"Server Type: {server_type}")
602
603
# Version compatibility check
604
def check_version_compatibility(required_version: str):
    """Return True if the server version is at least *required_version*.

    Versions are compared component-wise as integers; a missing trailing
    component counts as zero, so "2.3" is treated as "2.3.0".

    Raises:
        ValueError: if a version component is not an integer
            (e.g. pre-release suffixes like "2.4.0-rc1").
    """
    current_version = utility.get_server_version()

    current_parts = [int(p) for p in current_version.split('.')]
    required_parts = [int(p) for p in required_version.split('.')]

    # BUG FIX: plain zip() truncated to the shorter version, so a current
    # "2.3" wrongly satisfied a required "2.3.1".  Pad with zeros instead.
    width = max(len(current_parts), len(required_parts))
    current_parts += [0] * (width - len(current_parts))
    required_parts += [0] * (width - len(required_parts))

    # Lexicographic list comparison does the component-wise check.
    return current_parts >= required_parts
620
621
# Check compatibility
622
if check_version_compatibility("2.3.0"):
623
print("Server version is compatible")
624
else:
625
print("Server version is too old")
626
```
627
628
## Maintenance Operations
629
630
### Bulk Operations
631
632
```python { .api }
633
def do_bulk_insert(
634
collection_name: str,
635
files: List[str],
636
partition_name: Optional[str] = None,
637
using: str = "default",
638
timeout: Optional[float] = None,
639
**kwargs
640
) -> int
641
642
def get_bulk_insert_state(
643
task_id: int,
644
using: str = "default",
645
timeout: Optional[float] = None,
646
**kwargs
647
) -> Dict[str, Any]
648
649
def list_bulk_insert_tasks(
650
limit: int = 0,
651
collection_name: Optional[str] = None,
652
using: str = "default",
653
timeout: Optional[float] = None,
654
**kwargs
655
) -> List[Dict[str, Any]]
656
```
657
658
```python { .api }
659
# Bulk insert from files
660
files_to_insert = [
661
"/data/batch1.json",
662
"/data/batch2.json",
663
"/data/batch3.json"
664
]
665
666
task_id = utility.do_bulk_insert("large_collection", files_to_insert)
667
print(f"Bulk insert task started: {task_id}")
668
669
# Monitor bulk insert progress
670
while True:
671
state = utility.get_bulk_insert_state(task_id)
672
status = state.get('state', 'Unknown')
673
progress = state.get('progress', 0)
674
675
print(f"Bulk insert progress: {status} - {progress}%")
676
677
if status in ['ImportCompleted', 'ImportFailed']:
678
break
679
680
time.sleep(30)
681
682
# List all bulk insert tasks
683
tasks = utility.list_bulk_insert_tasks(limit=10)
684
for task in tasks:
685
print(f"Task {task['task_id']}: {task['state']} - {task.get('collection_name', 'N/A')}")
686
```
687
688
### Maintenance and Monitoring
689
690
```python { .api }
691
def flush_all(
692
using: str = "default",
693
timeout: Optional[float] = None,
694
**kwargs
695
) -> None
696
697
def get_query_segment_info(
698
collection_name: str,
699
timeout: Optional[float] = None,
700
using: str = "default"
701
) -> List[Dict[str, Any]]
702
703
def load_balance(
704
src_node_id: int,
705
dst_node_ids: Optional[List[int]] = None,
706
sealed_segment_ids: Optional[List[int]] = None,
707
using: str = "default",
708
timeout: Optional[float] = None
709
) -> None
710
```
711
712
```python { .api }
713
# Flush all collections to ensure data persistence
714
utility.flush_all()
715
print("Flushed all collections")
716
717
# Get segment information for query analysis
718
segment_info = utility.get_query_segment_info("analytics_collection")
719
print(f"Segment info: {len(segment_info)} segments")
720
721
for segment in segment_info[:5]: # Show first 5 segments
722
print(f" Segment {segment['segment_id']}: {segment['num_rows']} rows, {segment['mem_size']} bytes")
723
724
# Load balancing between nodes
725
utility.load_balance(
726
src_node_id=1,
727
dst_node_ids=[2, 3], # Distribute to nodes 2 and 3
728
sealed_segment_ids=None # Balance all segments
729
)
730
print("Load balancing completed")
731
```
732
733
## Alias Management Utilities
734
735
### Alias Operations
736
737
```python { .api }
738
def create_alias(
739
collection_name: str,
740
alias: str,
741
timeout: Optional[float] = None,
742
using: str = "default"
743
) -> None
744
745
def drop_alias(
746
alias: str,
747
timeout: Optional[float] = None,
748
using: str = "default"
749
) -> None
750
751
def alter_alias(
752
collection_name: str,
753
alias: str,
754
timeout: Optional[float] = None,
755
using: str = "default"
756
) -> None
757
758
def list_aliases(
759
collection_name: str,
760
timeout: Optional[float] = None,
761
using: str = "default"
762
) -> List[str]
763
```
764
765
```python { .api }
766
# Create aliases for version management
767
utility.create_alias("products_v2", "products_current")
768
utility.create_alias("products_v1", "products_stable")
769
770
# Blue-green deployment pattern
771
def deploy_new_version(new_collection: str, alias: str):
    """Point *alias* at *new_collection* (blue-green style switchover).

    Listing the current aliases is informational only; the actual
    switch is performed by alter_alias.
    """
    # Get current alias target (best-effort: a failure here must not
    # block the switchover).
    try:
        aliases = utility.list_aliases(new_collection)
        print(f"Current aliases for {new_collection}: {aliases}")
    except Exception:
        # BUG FIX: was a bare `except:`, which also swallows
        # KeyboardInterrupt and SystemExit.
        pass

    # Switch alias to new collection
    utility.alter_alias(new_collection, alias)
    print(f"Switched alias {alias} to {new_collection}")
784
785
# Deploy new version
786
deploy_new_version("products_v3", "products_current")
787
788
# List all aliases for a collection
789
aliases = utility.list_aliases("products_v3")
790
print(f"Aliases for products_v3: {aliases}")
791
```
792
793
## Error Handling and Retry Logic
794
795
```python { .api }
796
def retry_utility_operation(operation_func, max_retries: int = 3, delay: float = 1.0):
    """Call *operation_func* with retries and exponential backoff.

    Args:
        operation_func: Zero-argument callable to invoke.
        max_retries: Total number of attempts (not just re-tries).
        delay: Base wait in seconds; attempt k waits delay * 2**k.

    Returns:
        Whatever operation_func returns on the first successful attempt.

    Raises:
        The last exception from operation_func, re-raised with its
        original traceback after all attempts are exhausted.
    """
    import time

    for attempt in range(max_retries):
        try:
            return operation_func()
        except Exception as e:
            if attempt == max_retries - 1:
                # BUG FIX: bare `raise` preserves the original traceback;
                # `raise e` rewrote it to point at this line.
                raise

            wait_time = delay * (2 ** attempt)
            print(f"Attempt {attempt + 1} failed: {e}")
            print(f"Retrying in {wait_time} seconds...")
            time.sleep(wait_time)
811
812
# Usage examples
813
def safe_has_collection(collection_name: str) -> bool:
    """Check whether *collection_name* exists, retrying up to 3 times."""
    def _check():
        return utility.has_collection(collection_name)

    return retry_utility_operation(_check, max_retries=3)
819
820
def safe_wait_for_loading(collection_name: str, timeout: int = 300):
    """Block until *collection_name* finishes loading, retrying on failure."""
    def _wait():
        return utility.wait_for_loading_complete(collection_name, timeout=timeout)

    return retry_utility_operation(_wait, max_retries=2)
826
827
# Use safe operations
828
if safe_has_collection("important_collection"):
829
safe_wait_for_loading("important_collection")
830
print("Collection loaded successfully")
831
```
832
833
PyMilvus utility functions provide essential database administration, monitoring, and maintenance capabilities, enabling efficient management of large-scale vector database deployments with comprehensive error handling and retry mechanisms.