# MilvusClient and AsyncMilvusClient

The MilvusClient provides a simplified, high-level interface for common Milvus operations, while AsyncMilvusClient offers the same functionality with async/await support for non-blocking operations in high-concurrency applications.

## MilvusClient

### Constructor

```python { .api }
from pymilvus import MilvusClient

def __init__(
    self,
    uri: str = "http://localhost:19530",
    user: str = "",
    password: str = "",
    db_name: str = "",
    token: str = "",
    timeout: Optional[float] = None,
    **kwargs
) -> None
```

**Parameters:**
- `uri`: Milvus server URI (default: "http://localhost:19530")
- `user`: Username for authentication
- `password`: Password for authentication
- `db_name`: Database name to connect to
- `token`: Authentication token (alternative to user/password)
- `timeout`: Connection timeout in seconds
- `**kwargs`: Additional connection parameters

**Example:**
```python
# Local connection
client = MilvusClient()

# Remote connection with authentication
client = MilvusClient(
    uri="https://milvus.example.com:19530",
    user="admin",
    password="password123",
    db_name="production"
)
```

## Collection Management

### create_collection

```python { .api }
def create_collection(
    self,
    collection_name: str,
    dimension: Optional[int] = None,
    primary_field_name: str = "id",
    id_type: str = "int",
    vector_field_name: str = "vector",
    metric_type: str = "COSINE",
    auto_id: bool = False,
    timeout: Optional[float] = None,
    schema: Optional[CollectionSchema] = None,
    index_params: Optional[IndexParams] = None,
    **kwargs
) -> None
```

**Parameters:**
- `collection_name`: Name of the collection to create
- `dimension`: Vector dimension (required if schema not provided)
- `primary_field_name`: Name of primary key field (default: "id")
- `id_type`: Primary key type - "int" or "string" (default: "int")
- `vector_field_name`: Name of vector field (default: "vector")
- `metric_type`: Distance metric - "L2", "IP", "COSINE" (default: "COSINE")
- `auto_id`: Enable auto-generated IDs (default: False)
- `timeout`: Operation timeout in seconds
- `schema`: Pre-built CollectionSchema object
- `index_params`: Index parameters for automatic index creation
- `**kwargs`: Additional collection properties

**Examples:**
```python
# Simple collection creation
client.create_collection(
    collection_name="documents",
    dimension=768,
    metric_type="COSINE"
)

# Collection with string IDs
client.create_collection(
    collection_name="products",
    dimension=512,
    id_type="string",
    primary_field_name="product_id",
    vector_field_name="embedding"
)

# With pre-built schema
from pymilvus import CollectionSchema, FieldSchema, DataType

schema = CollectionSchema([
    FieldSchema("id", DataType.INT64, is_primary=True),
    FieldSchema("title", DataType.VARCHAR, max_length=200),
    FieldSchema("vector", DataType.FLOAT_VECTOR, dim=768),
    FieldSchema("metadata", DataType.JSON)
], description="Document collection")

client.create_collection("advanced_docs", schema=schema)
```

### drop_collection

```python { .api }
def drop_collection(
    self,
    collection_name: str,
    timeout: Optional[float] = None
) -> None
```

### describe_collection

```python { .api }
def describe_collection(
    self,
    collection_name: str,
    timeout: Optional[float] = None
) -> Dict[str, Any]
```

**Returns:** Dictionary containing collection metadata including schema, indexes, and properties.

### has_collection

```python { .api }
def has_collection(
    self,
    collection_name: str,
    timeout: Optional[float] = None
) -> bool
```

### list_collections

```python { .api }
def list_collections(
    self,
    timeout: Optional[float] = None
) -> List[str]
```

**Returns:** List of collection names in the database.

### rename_collection

```python { .api }
def rename_collection(
    self,
    old_name: str,
    new_name: str,
    timeout: Optional[float] = None
) -> None
```

### get_collection_stats

```python { .api }
def get_collection_stats(
    self,
    collection_name: str,
    timeout: Optional[float] = None
) -> Dict[str, Any]
```

**Returns:** Statistics including row count, data size, and index information.

## Schema Creation Helpers

### create_schema

```python { .api }
@classmethod
def create_schema(
    cls,
    auto_id: bool = False,
    enable_dynamic_field: bool = False,
    partition_key_field: Optional[str] = None,
    clustering_key_field: Optional[str] = None,
    **kwargs
) -> CollectionSchema
```

### create_field_schema

```python { .api }
@classmethod
def create_field_schema(
    cls,
    field_name: str,
    datatype: DataType,
    is_primary: bool = False,
    **kwargs
) -> FieldSchema
```

### prepare_index_params

```python { .api }
@classmethod
def prepare_index_params(cls) -> IndexParams
```

**Returns:** Empty IndexParams object for building index configurations.

## Data Operations

### insert

```python { .api }
def insert(
    self,
    collection_name: str,
    data: Union[List[Dict], pd.DataFrame],
    partition_name: Optional[str] = None,
    timeout: Optional[float] = None,
    **kwargs
) -> Dict[str, Any]
```

**Parameters:**
- `collection_name`: Target collection name
- `data`: Data to insert as list of dictionaries or pandas DataFrame
- `partition_name`: Target partition (optional)
- `timeout`: Operation timeout
- `**kwargs`: Additional insertion parameters

**Returns:** Dictionary with `insert_count` and `primary_keys` (if not auto_id).

**Examples:**
```python
# Insert list of dictionaries
data = [
    {"id": 1, "vector": [0.1] * 768, "title": "Document 1"},
    {"id": 2, "vector": [0.2] * 768, "title": "Document 2"}
]
result = client.insert("documents", data)

# Insert pandas DataFrame
import pandas as pd
df = pd.DataFrame({
    "id": [1, 2, 3],
    "vector": [[0.1]*768, [0.2]*768, [0.3]*768],
    "category": ["A", "B", "A"]
})
result = client.insert("products", df)
```
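Large datasets are usually inserted in smaller batches rather than in a single call. The following is a minimal sketch of that pattern, assuming only that the client exposes `insert(collection_name, data)` and returns a dictionary containing `insert_count`, as described above. The names `chunked`, `insert_in_batches`, and `_RecordingClient` are hypothetical, and the stand-in client exists only so the example runs without a Milvus server.

```python
from typing import Any, Dict, Iterator, List


def chunked(rows: List[Dict[str, Any]], size: int) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive slices of `rows` with at most `size` items each."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def insert_in_batches(client: Any, collection_name: str,
                      rows: List[Dict[str, Any]], batch_size: int = 1000) -> int:
    """Insert `rows` batch by batch and return the total reported insert count."""
    total = 0
    for batch in chunked(rows, batch_size):
        result = client.insert(collection_name, batch)
        total += result["insert_count"]
    return total


class _RecordingClient:
    """Hypothetical stand-in for MilvusClient that only records batch sizes."""

    def __init__(self) -> None:
        self.batch_sizes: List[int] = []

    def insert(self, collection_name: str, data: List[Dict[str, Any]]) -> Dict[str, Any]:
        self.batch_sizes.append(len(data))
        return {"insert_count": len(data)}


rows = [{"id": i, "vector": [0.1] * 4} for i in range(5)]
fake = _RecordingClient()
total_inserted = insert_in_batches(fake, "documents", rows, batch_size=2)
print(total_inserted, fake.batch_sizes)
```

With a real client, `fake` would be replaced by a `MilvusClient` instance; a suitable batch size depends on vector dimension and server limits, which this document does not specify.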

### upsert

```python { .api }
def upsert(
    self,
    collection_name: str,
    data: Union[List[Dict], pd.DataFrame],
    partition_name: Optional[str] = None,
    timeout: Optional[float] = None,
    **kwargs
) -> Dict[str, Any]
```

**Note:** Upsert will insert new entities or update existing ones based on primary key.

### delete

```python { .api }
def delete(
    self,
    collection_name: str,
    pks: Optional[Union[List, str, int]] = None,
    filter: Optional[str] = None,
    partition_name: Optional[str] = None,
    timeout: Optional[float] = None,
    **kwargs
) -> Dict[str, Any]
```

**Parameters:**
- `pks`: Primary key values to delete (mutually exclusive with filter)
- `filter`: Boolean expression for filtering entities to delete
- `partition_name`: Target partition
- `timeout`: Operation timeout

**Examples:**
```python
# Delete by primary keys
client.delete("documents", pks=[1, 2, 3])

# Delete by filter expression
client.delete("products", filter="category == 'discontinued'")

# Delete from specific partition
client.delete("logs", filter="timestamp < 1640995200", partition_name="old_data")
```
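Filter strings like the ones above are often assembled from Python values. The sketch below builds a membership filter, assuming that Milvus filter expressions accept the `in` operator with a bracketed list and double-quoted string literals; check this against the expression syntax of your Milvus version. `build_in_filter` is a hypothetical helper, not part of pymilvus.

```python
import json
from typing import Sequence, Union


def build_in_filter(field: str, values: Sequence[Union[int, str]]) -> str:
    """Return a filter such as `category in ["a", "b"]` for the given values."""
    if not values:
        raise ValueError("values must not be empty")
    # json.dumps quotes and escapes strings and leaves integers bare.
    return f"{field} in {json.dumps(list(values))}"


print(build_in_filter("category", ["discontinued", "recalled"]))
print(build_in_filter("id", [1, 2, 3]))
```

The resulting string can be passed as the `filter` argument of `delete` or `query`. Serializing values through `json.dumps` avoids hand-written quoting, which matters when values contain quote characters.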

### get

```python { .api }
def get(
    self,
    collection_name: str,
    ids: Union[List, str, int],
    output_fields: Optional[List[str]] = None,
    partition_names: Optional[List[str]] = None,
    timeout: Optional[float] = None
) -> List[Dict[str, Any]]
```

**Parameters:**
- `ids`: Primary key values to retrieve
- `output_fields`: Fields to return (default: all fields)
- `partition_names`: Partitions to search in
- `timeout`: Operation timeout

**Returns:** List of entity dictionaries.

## Query Operations

### query

```python { .api }
def query(
    self,
    collection_name: str,
    filter: str,
    output_fields: Optional[List[str]] = None,
    partition_names: Optional[List[str]] = None,
    limit: int = 16384,
    offset: int = 0,
    timeout: Optional[float] = None,
    consistency_level: Optional[str] = None,
    **kwargs
) -> List[Dict[str, Any]]
```

**Parameters:**
- `filter`: Boolean expression for filtering
- `output_fields`: Fields to return
- `partition_names`: Target partitions
- `limit`: Maximum number of results
- `offset`: Number of results to skip
- `consistency_level`: "Strong", "Eventually", "Bounded", or "Session"

**Examples:**
```python
# Basic query
results = client.query(
    "products",
    filter="price > 100 and category == 'electronics'",
    output_fields=["id", "name", "price"],
    limit=50
)

# Query with pagination
results = client.query(
    "documents",
    filter="status == 'published'",
    output_fields=["id", "title", "content"],
    offset=100,
    limit=20
)
```

### query_iterator

```python { .api }
def query_iterator(
    self,
    collection_name: str,
    filter: str,
    output_fields: Optional[List[str]] = None,
    partition_names: Optional[List[str]] = None,
    batch_size: int = 1000,
    limit: Optional[int] = None,
    timeout: Optional[float] = None,
    **kwargs
) -> QueryIterator
```

**Parameters:**
- `batch_size`: Number of results per batch
- `limit`: Total maximum results across all batches

**Returns:** `QueryIterator` object. Each call to `next()` returns the next batch of results; an empty batch signals that the iterator is exhausted.

**Example:**
```python
# Process large result set in batches
iterator = client.query_iterator(
    "large_collection",
    filter="category == 'active'",
    output_fields=["id", "data"],
    batch_size=1000
)

while True:
    batch = iterator.next()
    if not batch:
        # Release server-side resources once all batches are consumed
        iterator.close()
        break
    process_batch(batch)  # placeholder for your own handler
    print(f"Processed {len(batch)} records")
```

## Search Operations

### search

```python { .api }
def search(
    self,
    collection_name: str,
    data: Union[List[List[float]], List[Dict]],
    anns_field: str = "vector",
    search_params: Optional[Dict] = None,
    limit: int = 10,
    expr: Optional[str] = None,
    output_fields: Optional[List[str]] = None,
    partition_names: Optional[List[str]] = None,
    round_decimal: int = -1,
    timeout: Optional[float] = None,
    consistency_level: Optional[str] = None,
    **kwargs
) -> List[List[Dict[str, Any]]]
```

**Parameters:**
- `data`: Query vectors as list of lists or list of dictionaries with vector field
- `anns_field`: Name of vector field to search
- `search_params`: Search algorithm parameters (e.g., {"nprobe": 10})
- `limit`: Maximum results per query
- `expr`: Filter expression
- `output_fields`: Fields to return in results
- `round_decimal`: Decimal precision for distances (-1 for no rounding)

**Returns:** List of result lists (one per query vector).

**Examples:**
```python
# Single vector search
query_vector = [0.1] * 768
results = client.search(
    "documents",
    data=[query_vector],
    limit=5,
    output_fields=["id", "title", "content"],
    expr="category == 'news'"
)

# Multiple vector search
query_vectors = [[0.1] * 768, [0.2] * 768]
results = client.search(
    "embeddings",
    data=query_vectors,
    search_params={"nprobe": 16},
    limit=10,
    round_decimal=4
)
```
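Because `search` returns one inner list per query vector, callers often flatten the nested structure before further processing. The sketch below works on hand-written sample data and assumes each hit is a dictionary with `id`, `distance`, and `entity` keys; this document only states the outer `List[List[Dict]]` shape, so verify the keys against your pymilvus version. `flatten_hits` is a hypothetical helper.

```python
from typing import Any, Dict, List

# Hand-written sample shaped like the documented return type:
# one inner list per query vector. The keys of each hit are an assumption.
sample_results: List[List[Dict[str, Any]]] = [
    [
        {"id": 1, "distance": 0.98, "entity": {"title": "Document 1"}},
        {"id": 7, "distance": 0.91, "entity": {"title": "Document 7"}},
    ],
    [
        {"id": 2, "distance": 0.87, "entity": {"title": "Document 2"}},
    ],
]


def flatten_hits(results: List[List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    """Return one flat row per hit, tagged with the index of its query vector."""
    rows = []
    for query_index, hits in enumerate(results):
        for rank, hit in enumerate(hits, start=1):
            rows.append({
                "query_index": query_index,
                "rank": rank,
                "id": hit["id"],
                "distance": hit["distance"],
                **hit.get("entity", {}),  # requested output fields, if any
            })
    return rows


for row in flatten_hits(sample_results):
    print(row)
```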

### search_iterator

```python { .api }
def search_iterator(
    self,
    collection_name: str,
    data: Union[List[List[float]], List[Dict]],
    anns_field: str = "vector",
    batch_size: int = 1000,
    limit: Optional[int] = None,
    search_params: Optional[Dict] = None,
    expr: Optional[str] = None,
    output_fields: Optional[List[str]] = None,
    **kwargs
) -> SearchIterator
```

### hybrid_search

```python { .api }
def hybrid_search(
    self,
    collection_name: str,
    reqs: List[AnnSearchRequest],
    ranker: Union[RRFRanker, WeightedRanker],
    limit: int = 10,
    partition_names: Optional[List[str]] = None,
    output_fields: Optional[List[str]] = None,
    timeout: Optional[float] = None,
    round_decimal: int = -1,
    **kwargs
) -> List[List[Dict[str, Any]]]
```

**Parameters:**
- `reqs`: List of AnnSearchRequest objects for different vector fields
- `ranker`: Ranking algorithm (RRFRanker or WeightedRanker)
- `limit`: Final result count after reranking

**Example:**
```python
from pymilvus import AnnSearchRequest, RRFRanker

# dense_vectors and sparse_vectors are query vectors prepared by the caller

# Multiple vector search requests
req1 = AnnSearchRequest(
    data=dense_vectors,
    anns_field="dense_vector",
    param={"metric_type": "L2", "params": {"nprobe": 16}},
    limit=100
)

req2 = AnnSearchRequest(
    data=sparse_vectors,
    anns_field="sparse_vector",
    param={"metric_type": "IP"},
    limit=100
)

# Hybrid search with RRF ranking
results = client.hybrid_search(
    "multi_vector_collection",
    reqs=[req1, req2],
    ranker=RRFRanker(k=60),
    limit=10,
    output_fields=["id", "title", "content"]
)
```
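To make the role of `k` in `RRFRanker(k=60)` more concrete, the sketch below implements the general reciprocal rank fusion formula, in which each ID scores the sum of `1 / (k + rank)` over the rankings it appears in. It illustrates the technique only: whether Milvus numbers ranks from 0 or 1 and how it breaks ties are not stated in this document, and `rrf_fuse` is a hypothetical function.

```python
from collections import defaultdict
from typing import Dict, Hashable, List


def rrf_fuse(rankings: List[List[Hashable]], k: int = 60) -> Dict[Hashable, float]:
    """Sum 1 / (k + rank) over every ranking in which an ID appears (rank starts at 1)."""
    scores: Dict[Hashable, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return dict(scores)


dense_ranking = ["a", "b", "c"]   # IDs ordered best-first by the dense request
sparse_ranking = ["b", "d", "a"]  # IDs ordered best-first by the sparse request

scores = rrf_fuse([dense_ranking, sparse_ranking], k=60)
fused = sorted(scores, key=scores.get, reverse=True)
print(fused)
```

IDs that appear in both rankings (`"a"` and `"b"` here) outrank IDs found by only one request. A larger `k` flattens the difference between adjacent ranks.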

## Index Management

### create_index

```python { .api }
def create_index(
    self,
    collection_name: str,
    field_name: str,
    index_params: Dict[str, Any],
    timeout: Optional[float] = None,
    **kwargs
) -> None
```

**Parameters:**
- `field_name`: Field to create index on
- `index_params`: Index configuration dictionary

**Examples:**
```python
# Vector index
client.create_index(
    "documents",
    "vector",
    {
        "index_type": "IVF_FLAT",
        "metric_type": "L2",
        "params": {"nlist": 1024}
    }
)

# Scalar index
client.create_index(
    "products",
    "category",
    {"index_type": "TRIE"}
)
```

### drop_index

```python { .api }
def drop_index(
    self,
    collection_name: str,
    field_name: str,
    timeout: Optional[float] = None
) -> None
```

### list_indexes

```python { .api }
def list_indexes(
    self,
    collection_name: str,
    field_name: Optional[str] = None,
    timeout: Optional[float] = None
) -> List[str]
```

### describe_index

```python { .api }
def describe_index(
    self,
    collection_name: str,
    field_name: str,
    timeout: Optional[float] = None
) -> Dict[str, Any]
```

## Loading and Memory Management

### load_collection

```python { .api }
def load_collection(
    self,
    collection_name: str,
    timeout: Optional[float] = None,
    replica_number: int = 1,
    resource_groups: Optional[List[str]] = None,
    **kwargs
) -> None
```

### release_collection

```python { .api }
def release_collection(
    self,
    collection_name: str,
    timeout: Optional[float] = None
) -> None
```

### get_load_state

```python { .api }
def get_load_state(
    self,
    collection_name: str,
    partition_name: Optional[str] = None,
    timeout: Optional[float] = None
) -> Dict[str, Any]
```

**Returns:** Dictionary with `state` ("NotExist", "NotLoad", "Loading", "Loaded") and progress information.

### refresh_load

```python { .api }
def refresh_load(
    self,
    collection_name: str,
    timeout: Optional[float] = None
) -> None
```

## AsyncMilvusClient

The AsyncMilvusClient provides identical functionality to MilvusClient but with async/await support for non-blocking operations.

### Usage Pattern

```python { .api }
from pymilvus import AsyncMilvusClient
import asyncio

async def async_operations():
    # Initialize async client
    client = AsyncMilvusClient(uri="http://localhost:19530")

    try:
        # All methods are async and must be awaited
        await client.create_collection("async_collection", dimension=768)

        # Example batches; each is a list of entity dictionaries
        batch1 = [{"id": 1, "vector": [0.1] * 768}]
        batch2 = [{"id": 2, "vector": [0.2] * 768}]
        batch3 = [{"id": 3, "vector": [0.3] * 768}]

        # Concurrent operations
        tasks = [
            client.insert("async_collection", batch1),
            client.insert("async_collection", batch2),
            client.insert("async_collection", batch3)
        ]
        results = await asyncio.gather(*tasks)

        # Search operations
        search_results = await client.search(
            "async_collection",
            data=[[0.1] * 768],
            limit=10
        )

    finally:
        # Always close the client
        await client.close()

# Run async operations
asyncio.run(async_operations())
```

### Key Differences from MilvusClient

1. **All methods are coroutines** - Must be awaited
2. **Concurrent execution** - Use `asyncio.gather()` to run operations concurrently
3. **Resource management** - Always call `await client.close()`
4. **Same method signatures** - Parameters and return types identical to sync version
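When many operations are gathered at once, it can help to cap how many run at the same time. The sketch below combines `asyncio.gather()` with an `asyncio.Semaphore`, assuming only that the client has an awaitable `insert(collection_name, data)` method as listed above. `insert_concurrently` and `_FakeAsyncClient` are hypothetical; the stand-in sleeps instead of contacting a server.

```python
import asyncio
from typing import Any, Dict, List


async def insert_concurrently(client: Any, collection_name: str,
                              batches: List[List[Dict[str, Any]]],
                              max_in_flight: int = 4) -> List[Dict[str, Any]]:
    """Insert all batches concurrently, with at most `max_in_flight` running at once."""
    semaphore = asyncio.Semaphore(max_in_flight)

    async def insert_one(batch):
        async with semaphore:
            return await client.insert(collection_name, batch)

    # gather returns results in the same order as `batches`
    return await asyncio.gather(*(insert_one(b) for b in batches))


class _FakeAsyncClient:
    """Hypothetical stand-in for AsyncMilvusClient; sleeps instead of calling a server."""

    def __init__(self):
        self.in_flight = 0
        self.peak_in_flight = 0

    async def insert(self, collection_name, data):
        self.in_flight += 1
        self.peak_in_flight = max(self.peak_in_flight, self.in_flight)
        await asyncio.sleep(0.01)
        self.in_flight -= 1
        return {"insert_count": len(data)}


async def main():
    client = _FakeAsyncClient()
    batches = [[{"id": i}] * n for i, n in enumerate([3, 1, 2, 4, 5])]
    results = await insert_concurrently(client, "async_collection", batches, max_in_flight=2)
    return [r["insert_count"] for r in results], client.peak_in_flight


counts, peak = asyncio.run(main())
print(counts, peak)
```

The semaphore bounds the load placed on the server while still letting requests overlap; the right limit depends on the deployment.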

### Async Method Examples

```python { .api }
# Async data operations
await client.insert(collection_name, data)
await client.upsert(collection_name, data)
await client.delete(collection_name, pks=[1, 2, 3])

# Async search operations
results = await client.search(collection_name, query_vectors, limit=10)
results = await client.query(collection_name, filter="category == 'A'")

# Async collection management
await client.create_collection(name, dimension=768)
collections = await client.list_collections()
await client.load_collection(name)
```

## Connection Management

### close

```python { .api }
def close(self) -> None
```

Closes the client connection and cleans up resources. For AsyncMilvusClient, this method is async:

```python { .api }
async def close(self) -> None  # AsyncMilvusClient version
```

**Best Practice:**
```python
# Synchronous client
# Create the client before the try block so that `client` is always
# defined when the finally clause runs.
client = MilvusClient()
try:
    ...  # operations
finally:
    client.close()

# Asynchronous client (inside a coroutine)
client = AsyncMilvusClient()
try:
    ...  # operations
finally:
    await client.close()

# Or use context manager (if supported)
async with AsyncMilvusClient() as client:
    await client.search(...)
```
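For the synchronous client, the standard library's `contextlib.closing` gives the same guarantee as the `try`/`finally` pattern, because it calls `close()` on exit for any object that has one. The sketch below demonstrates this with `_FakeClient`, a hypothetical stand-in used so the example runs without a server.

```python
from contextlib import closing


class _FakeClient:
    """Hypothetical stand-in for MilvusClient that records whether close() ran."""

    def __init__(self):
        self.closed = False

    def list_collections(self):
        return ["documents"]

    def close(self):
        self.closed = True


fake = _FakeClient()
try:
    with closing(fake) as client:
        names = client.list_collections()
        raise RuntimeError("simulated failure inside the block")
except RuntimeError:
    pass  # close() has already run by the time the error propagates

print(names, fake.closed)
```

`contextlib.aclosing` is not a drop-in equivalent for the async client, since it awaits `aclose()` rather than `close()`.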

## Error Handling

Both MilvusClient and AsyncMilvusClient raise the same exception types. Common exceptions include:

```python
from pymilvus import MilvusException, MilvusUnavailableException

try:
    client = MilvusClient(uri="invalid://host:port")
    client.search("nonexistent", [[0.1] * 768])
except MilvusUnavailableException:
    print("Milvus server unavailable")
except MilvusException as e:
    print(f"Milvus error: {e.code} - {e.message}")
except Exception as e:
    print(f"General error: {e}")
```
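Transient failures such as a temporarily unavailable server are often handled by retrying with a growing delay. The sketch below shows that pattern in a generic form; `call_with_retry` and `flaky_operation` are hypothetical, and `ConnectionError` stands in for whichever exception type the caller chooses to retry on.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retry(operation: Callable[[], T],
                    retry_on: Tuple[Type[BaseException], ...],
                    attempts: int = 3, base_delay: float = 0.5) -> T:
    """Call `operation`, retrying on `retry_on` with exponentially growing delays."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            time.sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("attempts must be at least 1")


calls = {"count": 0}


def flaky_operation() -> str:
    """Hypothetical operation that fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated outage")
    return "ok"


result = call_with_retry(flaky_operation, retry_on=(ConnectionError,), base_delay=0.01)
print(result, calls["count"])
```

With pymilvus, one option would be to pass `retry_on=(MilvusUnavailableException,)` and wrap the client call in a `lambda`; errors that are not transient, such as a missing collection, are better left to propagate.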

The MilvusClient interface provides a streamlined way to interact with Milvus, abstracting away many of the complexities while still providing access to advanced features when needed.