
# MilvusClient and AsyncMilvusClient

The MilvusClient provides a simplified, high-level interface for common Milvus operations, while AsyncMilvusClient offers the same functionality with async/await support for non-blocking operations in high-concurrency applications.

## MilvusClient

### Constructor

```python { .api }
from pymilvus import MilvusClient

def __init__(
    self,
    uri: str = "http://localhost:19530",
    user: str = "",
    password: str = "",
    db_name: str = "",
    token: str = "",
    timeout: Optional[float] = None,
    **kwargs
) -> None
```

**Parameters:**
- `uri`: Milvus server URI (default: "http://localhost:19530")
- `user`: Username for authentication
- `password`: Password for authentication
- `db_name`: Database name to connect to
- `token`: Authentication token (alternative to user/password)
- `timeout`: Connection timeout in seconds
- `**kwargs`: Additional connection parameters

**Example:**
```python
from pymilvus import MilvusClient

# Local connection
client = MilvusClient()

# Remote connection with authentication
client = MilvusClient(
    uri="https://milvus.example.com:19530",
    user="admin",
    password="password123",
    db_name="production"
)
```
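
Hard-coded credentials like those in the example are easy to leak. One option is to assemble the constructor arguments from the environment. The sketch below is a minimal illustration: the `MILVUS_*` variable names and the helper `connection_kwargs_from_env` are hypothetical, not part of pymilvus, and only the constructor parameters listed above are used.

```python
import os
from typing import Dict, Mapping, Optional


def connection_kwargs_from_env(
    env: Optional[Mapping[str, str]] = None,
) -> Dict[str, str]:
    """Build MilvusClient keyword arguments from environment variables.

    The MILVUS_* names are hypothetical; adapt them to your deployment.
    """
    env = os.environ if env is None else env
    kwargs = {"uri": env.get("MILVUS_URI", "http://localhost:19530")}

    token = env.get("MILVUS_TOKEN", "")
    if token:
        # A token is documented as an alternative to user/password,
        # so it takes precedence when both are set.
        kwargs["token"] = token
    elif env.get("MILVUS_USER"):
        kwargs["user"] = env["MILVUS_USER"]
        kwargs["password"] = env.get("MILVUS_PASSWORD", "")

    if env.get("MILVUS_DB"):
        kwargs["db_name"] = env["MILVUS_DB"]
    return kwargs
```

The result can then be unpacked into the constructor, for example `client = MilvusClient(**connection_kwargs_from_env())`.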

## Collection Management

### create_collection

```python { .api }
def create_collection(
    self,
    collection_name: str,
    dimension: Optional[int] = None,
    primary_field_name: str = "id",
    id_type: str = "int",
    vector_field_name: str = "vector",
    metric_type: str = "COSINE",
    auto_id: bool = False,
    timeout: Optional[float] = None,
    schema: Optional[CollectionSchema] = None,
    index_params: Optional[IndexParams] = None,
    **kwargs
) -> None
```

**Parameters:**
- `collection_name`: Name of the collection to create
- `dimension`: Vector dimension (required if schema not provided)
- `primary_field_name`: Name of primary key field (default: "id")
- `id_type`: Primary key type - "int" or "string" (default: "int")
- `vector_field_name`: Name of vector field (default: "vector")
- `metric_type`: Distance metric - "L2", "IP", "COSINE" (default: "COSINE")
- `auto_id`: Enable auto-generated IDs (default: False)
- `timeout`: Operation timeout in seconds
- `schema`: Pre-built CollectionSchema object
- `index_params`: Index parameters for automatic index creation
- `**kwargs`: Additional collection properties

**Examples:**
```python
# Simple collection creation
client.create_collection(
    collection_name="documents",
    dimension=768,
    metric_type="COSINE"
)

# Collection with string IDs
client.create_collection(
    collection_name="products",
    dimension=512,
    id_type="string",
    primary_field_name="product_id",
    vector_field_name="embedding"
)

# With pre-built schema
from pymilvus import CollectionSchema, FieldSchema, DataType

schema = CollectionSchema([
    FieldSchema("id", DataType.INT64, is_primary=True),
    FieldSchema("title", DataType.VARCHAR, max_length=200),
    FieldSchema("vector", DataType.FLOAT_VECTOR, dim=768),
    FieldSchema("metadata", DataType.JSON)
], description="Document collection")

client.create_collection("advanced_docs", schema=schema)
```

### drop_collection

```python { .api }
def drop_collection(
    self,
    collection_name: str,
    timeout: Optional[float] = None
) -> None
```

### describe_collection

```python { .api }
def describe_collection(
    self,
    collection_name: str,
    timeout: Optional[float] = None
) -> Dict[str, Any]
```

**Returns:** Dictionary containing collection metadata including schema, indexes, and properties.

### has_collection

```python { .api }
def has_collection(
    self,
    collection_name: str,
    timeout: Optional[float] = None
) -> bool
```
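
`has_collection` and `create_collection` are often combined into a "create if missing" step at application start-up. The following is a minimal sketch of that pattern; `ensure_collection` is a hypothetical helper, and `client` is assumed to be any object exposing the two methods with the signatures documented above.

```python
from typing import Any


def ensure_collection(
    client: Any, collection_name: str, dimension: int, **create_kwargs: Any
) -> bool:
    """Create the collection only if it does not exist yet.

    Returns True when a collection was created, False when it already existed.
    """
    if client.has_collection(collection_name):
        return False
    client.create_collection(
        collection_name=collection_name, dimension=dimension, **create_kwargs
    )
    return True
```

The check and the creation are two separate calls, so this is not atomic: if several processes may start at the same time, one of them can still see a "collection already exists" style error and should be prepared to handle it.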

### list_collections

```python { .api }
def list_collections(
    self,
    timeout: Optional[float] = None
) -> List[str]
```

**Returns:** List of collection names in the database.

### rename_collection

```python { .api }
def rename_collection(
    self,
    old_name: str,
    new_name: str,
    timeout: Optional[float] = None
) -> None
```

### get_collection_stats

```python { .api }
def get_collection_stats(
    self,
    collection_name: str,
    timeout: Optional[float] = None
) -> Dict[str, Any]
```

**Returns:** Statistics including row count, data size, and index information.

## Schema Creation Helpers

### create_schema

```python { .api }
@classmethod
def create_schema(
    cls,
    auto_id: bool = False,
    enable_dynamic_field: bool = False,
    partition_key_field: Optional[str] = None,
    clustering_key_field: Optional[str] = None,
    **kwargs
) -> CollectionSchema
```

### create_field_schema

```python { .api }
@classmethod
def create_field_schema(
    cls,
    field_name: str,
    datatype: DataType,
    is_primary: bool = False,
    **kwargs
) -> FieldSchema
```

### prepare_index_params

```python { .api }
@classmethod
def prepare_index_params(cls) -> IndexParams
```

**Returns:** Empty IndexParams object for building index configurations.

## Data Operations

### insert

```python { .api }
def insert(
    self,
    collection_name: str,
    data: Union[List[Dict], pd.DataFrame],
    partition_name: Optional[str] = None,
    timeout: Optional[float] = None,
    **kwargs
) -> Dict[str, Any]
```

**Parameters:**
- `collection_name`: Target collection name
- `data`: Data to insert as list of dictionaries or pandas DataFrame
- `partition_name`: Target partition (optional)
- `timeout`: Operation timeout
- `**kwargs`: Additional insertion parameters

**Returns:** Dictionary with `insert_count` and `primary_keys` (if not auto_id).

**Examples:**
```python
# Insert list of dictionaries
data = [
    {"id": 1, "vector": [0.1] * 768, "title": "Document 1"},
    {"id": 2, "vector": [0.2] * 768, "title": "Document 2"}
]
result = client.insert("documents", data)

# Insert pandas DataFrame
# Field names and dimension match the "products" collection created earlier
# (string primary key "product_id", 512-dimensional "embedding" field)
import pandas as pd
df = pd.DataFrame({
    "product_id": ["p1", "p2", "p3"],
    "embedding": [[0.1]*512, [0.2]*512, [0.3]*512],
    "category": ["A", "B", "A"]
})
result = client.insert("products", df)
```
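
For large datasets it is usually safer to send rows in fixed-size batches than in one request. The sketch below assumes only what is documented above: `insert` accepts a list of dictionaries and returns a dictionary containing `insert_count`. The helpers `chunked` and `insert_in_batches` are hypothetical names, and the batch size of 1000 is an arbitrary starting point rather than a recommended value.

```python
from typing import Any, Dict, Iterator, List, Sequence


def chunked(
    rows: Sequence[Dict[str, Any]], batch_size: int
) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive slices of `rows`, each at most `batch_size` long."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(rows), batch_size):
        yield list(rows[start:start + batch_size])


def insert_in_batches(
    client: Any,
    collection_name: str,
    rows: Sequence[Dict[str, Any]],
    batch_size: int = 1000,
) -> int:
    """Insert `rows` batch by batch and return the total reported insert count."""
    total = 0
    for batch in chunked(rows, batch_size):
        result = client.insert(collection_name, batch)
        # `insert_count` is the key named in the Returns note above.
        total += result["insert_count"]
    return total
```

A failure partway through leaves earlier batches inserted, so callers that need all-or-nothing behaviour would have to add their own cleanup.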

### upsert

```python { .api }
def upsert(
    self,
    collection_name: str,
    data: Union[List[Dict], pd.DataFrame],
    partition_name: Optional[str] = None,
    timeout: Optional[float] = None,
    **kwargs
) -> Dict[str, Any]
```

**Note:** Upsert will insert new entities or update existing ones based on primary key.

### delete

```python { .api }
def delete(
    self,
    collection_name: str,
    pks: Optional[Union[List, str, int]] = None,
    filter: Optional[str] = None,
    partition_name: Optional[str] = None,
    timeout: Optional[float] = None,
    **kwargs
) -> Dict[str, Any]
```

**Parameters:**
- `pks`: Primary key values to delete (mutually exclusive with filter)
- `filter`: Boolean expression for filtering entities to delete
- `partition_name`: Target partition
- `timeout`: Operation timeout

**Examples:**
```python
# Delete by primary keys
client.delete("documents", pks=[1, 2, 3])

# Delete by filter expression
client.delete("products", filter="category == 'discontinued'")

# Delete from specific partition
client.delete("logs", filter="timestamp < 1640995200", partition_name="old_data")
```
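
Filter strings built by hand are easy to get wrong once values come from variables. The sketch below builds a `field in [...]` expression from a Python list. It assumes the Milvus expression parser accepts the `in` operator with JSON-style double-quoted string literals; `in_filter` is a hypothetical helper, not a pymilvus function.

```python
import json
from typing import Sequence, Union


def in_filter(field: str, values: Sequence[Union[str, int]]) -> str:
    """Build a `field in [...]` filter expression from Python values."""
    if not values:
        raise ValueError("values must not be empty")
    for value in values:
        # bool is a subclass of int, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, (str, int)):
            raise TypeError(f"unsupported value type: {type(value).__name__}")
    # json.dumps double-quotes and escapes strings, and leaves integers bare.
    return f"{field} in {json.dumps(list(values), ensure_ascii=False)}"
```

It could be used as `client.delete("products", filter=in_filter("category", ["discontinued", "recalled"]))`. The field name itself is not validated here and should come from trusted code, not user input.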

### get

```python { .api }
def get(
    self,
    collection_name: str,
    ids: Union[List, str, int],
    output_fields: Optional[List[str]] = None,
    partition_names: Optional[List[str]] = None,
    timeout: Optional[float] = None
) -> List[Dict[str, Any]]
```

**Parameters:**
- `ids`: Primary key values to retrieve
- `output_fields`: Fields to return (default: all fields)
- `partition_names`: Partitions to search in
- `timeout`: Operation timeout

**Returns:** List of entity dictionaries.

## Query Operations

### query

```python { .api }
def query(
    self,
    collection_name: str,
    filter: str,
    output_fields: Optional[List[str]] = None,
    partition_names: Optional[List[str]] = None,
    limit: int = 16384,
    offset: int = 0,
    timeout: Optional[float] = None,
    consistency_level: Optional[str] = None,
    **kwargs
) -> List[Dict[str, Any]]
```

**Parameters:**
- `filter`: Boolean expression for filtering
- `output_fields`: Fields to return
- `partition_names`: Target partitions
- `limit`: Maximum number of results
- `offset`: Number of results to skip
- `consistency_level`: "Strong", "Eventually", "Bounded", or "Session"

**Examples:**
```python
# Basic query
results = client.query(
    "products",
    filter="price > 100 and category == 'electronics'",
    output_fields=["id", "name", "price"],
    limit=50
)

# Query with pagination
results = client.query(
    "documents",
    filter="status == 'published'",
    output_fields=["id", "title", "content"],
    offset=100,
    limit=20
)
```
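
The pagination example fetches a single page. To walk through every page, the `offset` can be advanced in a loop until a short or empty page comes back. This is a minimal sketch using only the `filter`, `output_fields`, `offset`, and `limit` parameters documented above; `query_pages` is a hypothetical helper name.

```python
from typing import Any, Dict, Iterator, List, Optional


def query_pages(
    client: Any,
    collection_name: str,
    filter: str,
    page_size: int = 20,
    output_fields: Optional[List[str]] = None,
) -> Iterator[List[Dict[str, Any]]]:
    """Yield successive pages of query results until a short or empty page appears."""
    offset = 0
    while True:
        page = client.query(
            collection_name,
            filter=filter,
            output_fields=output_fields,
            offset=offset,
            limit=page_size,
        )
        if not page:
            return
        yield page
        if len(page) < page_size:
            # A short page means there is nothing left to fetch.
            return
        offset += page_size
```

Offset-based paging re-scans skipped rows on each call and can miss or repeat rows if the data changes between calls, so for very large result sets `query_iterator` is likely the better fit.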

### query_iterator

```python { .api }
def query_iterator(
    self,
    collection_name: str,
    filter: str,
    output_fields: Optional[List[str]] = None,
    partition_names: Optional[List[str]] = None,
    batch_size: int = 1000,
    limit: Optional[int] = None,
    timeout: Optional[float] = None,
    **kwargs
) -> QueryIterator
```

**Parameters:**
- `batch_size`: Number of results per batch
- `limit`: Total maximum results across all batches

**Returns:** Iterator that yields batches of results.

**Example:**
```python
# Process large result set in batches
iterator = client.query_iterator(
    "large_collection",
    filter="category == 'active'",
    output_fields=["id", "data"],
    batch_size=1000
)

# QueryIterator is drained with next(); an empty batch means no more results.
# process_batch is a placeholder for your own handler.
while True:
    batch = iterator.next()
    if not batch:
        iterator.close()
        break
    process_batch(batch)
    print(f"Processed {len(batch)} records")
```
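
If a plain `for` loop is preferred, the iterator can be wrapped in a generator. The sketch below assumes the returned object exposes `next()` (returning an empty batch when exhausted) and `close()`; `iter_batches` is a hypothetical adapter, not part of pymilvus.

```python
from typing import Any, Dict, Iterator, List


def iter_batches(iterator: Any) -> Iterator[List[Dict[str, Any]]]:
    """Adapt a next()/close() style iterator to a Python generator."""
    try:
        while True:
            batch = iterator.next()
            if not batch:
                return
            yield batch
    finally:
        # Runs when the generator is exhausted, closed, or garbage collected.
        iterator.close()
```

With this wrapper, `for batch in iter_batches(client.query_iterator(...)):` processes every batch, and the underlying iterator is closed once the loop finishes or the generator is closed.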

## Search Operations

### search

```python { .api }
def search(
    self,
    collection_name: str,
    data: Union[List[List[float]], List[Dict]],
    anns_field: str = "vector",
    search_params: Optional[Dict] = None,
    limit: int = 10,
    expr: Optional[str] = None,
    output_fields: Optional[List[str]] = None,
    partition_names: Optional[List[str]] = None,
    round_decimal: int = -1,
    timeout: Optional[float] = None,
    consistency_level: Optional[str] = None,
    **kwargs
) -> List[List[Dict[str, Any]]]
```

**Parameters:**
- `data`: Query vectors as list of lists or list of dictionaries with vector field
- `anns_field`: Name of vector field to search
- `search_params`: Search algorithm parameters (e.g., {"nprobe": 10})
- `limit`: Maximum results per query
- `expr`: Filter expression
- `output_fields`: Fields to return in results
- `round_decimal`: Decimal precision for distances (-1 for no rounding)

**Returns:** List of result lists (one per query vector).

**Examples:**
```python
# Single vector search
query_vector = [0.1] * 768
results = client.search(
    "documents",
    data=[query_vector],
    limit=5,
    output_fields=["id", "title", "content"],
    expr="category == 'news'"
)

# Multiple vector search
query_vectors = [[0.1] * 768, [0.2] * 768]
results = client.search(
    "embeddings",
    data=query_vectors,
    search_params={"nprobe": 16},
    limit=10,
    round_decimal=4
)
```
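
Because `search` returns one list of hits per query vector, downstream code often wants a flat table instead. The sketch below assumes each hit is a dictionary with `id`, `distance`, and an `entity` sub-dictionary holding the requested output fields; that hit layout is an assumption and should be checked against the pymilvus version in use. `flatten_hits` is a hypothetical helper.

```python
from typing import Any, Dict, List


def flatten_hits(results: List[List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    """Flatten nested search results into one row per hit."""
    rows = []
    for query_index, hits in enumerate(results):
        for rank, hit in enumerate(hits, start=1):
            # Start from the output fields, then add bookkeeping columns.
            row = dict(hit.get("entity", {}))
            row.update(
                query_index=query_index,
                rank=rank,
                id=hit.get("id"),
                distance=hit.get("distance"),
            )
            rows.append(row)
    return rows
```

The resulting list of dictionaries can be passed straight to `pandas.DataFrame(rows)` for inspection.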

### search_iterator

```python { .api }
def search_iterator(
    self,
    collection_name: str,
    data: Union[List[List[float]], List[Dict]],
    anns_field: str = "vector",
    batch_size: int = 1000,
    limit: Optional[int] = None,
    search_params: Optional[Dict] = None,
    expr: Optional[str] = None,
    output_fields: Optional[List[str]] = None,
    **kwargs
) -> SearchIterator
```

### hybrid_search

```python { .api }
def hybrid_search(
    self,
    collection_name: str,
    reqs: List[AnnSearchRequest],
    ranker: Union[RRFRanker, WeightedRanker],
    limit: int = 10,
    partition_names: Optional[List[str]] = None,
    output_fields: Optional[List[str]] = None,
    timeout: Optional[float] = None,
    round_decimal: int = -1,
    **kwargs
) -> List[List[Dict[str, Any]]]
```

**Parameters:**
- `reqs`: List of AnnSearchRequest objects for different vector fields
- `ranker`: Ranking algorithm (RRFRanker or WeightedRanker)
- `limit`: Final result count after reranking

**Example:**
```python
from pymilvus import AnnSearchRequest, RRFRanker

# dense_vectors and sparse_vectors are placeholders for your query embeddings

# Multiple vector search requests
req1 = AnnSearchRequest(
    data=dense_vectors,
    anns_field="dense_vector",
    param={"metric_type": "L2", "params": {"nprobe": 16}},
    limit=100
)

req2 = AnnSearchRequest(
    data=sparse_vectors,
    anns_field="sparse_vector",
    param={"metric_type": "IP"},
    limit=100
)

# Hybrid search with RRF ranking
results = client.hybrid_search(
    "multi_vector_collection",
    reqs=[req1, req2],
    ranker=RRFRanker(k=60),
    limit=10,
    output_fields=["id", "title", "content"]
)
```
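
To make the role of `k` in `RRFRanker(k=60)` more concrete, here is a pure-Python illustration of the general reciprocal rank fusion formula, where each ID scores the sum of `1 / (k + rank)` over the ranked lists it appears in. This is a sketch of the textbook technique only; Milvus performs the fusion server-side and its exact rank offset and tie-breaking may differ. `rrf_fuse` is a hypothetical function name.

```python
from collections import defaultdict
from typing import Dict, Hashable, List, Sequence, Tuple


def rrf_fuse(
    rankings: Sequence[Sequence[Hashable]], k: int = 60, limit: int = 10
) -> List[Tuple[Hashable, float]]:
    """Fuse ranked ID lists with reciprocal rank fusion.

    Each ID scores sum(1 / (k + rank)) over the lists it appears in,
    with rank starting at 1.
    """
    scores: Dict[Hashable, float] = defaultdict(float)
    for ranking in rankings:
        for rank, item_id in enumerate(ranking, start=1):
            scores[item_id] += 1.0 / (k + rank)
    # Highest score first; ties keep first-seen order because sorted() is stable.
    ordered = sorted(scores.items(), key=lambda pair: pair[1], reverse=True)
    return ordered[:limit]
```

A larger `k` flattens the difference between top and lower ranks, so items that appear in several lists gain weight relative to items ranked first in only one list.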

## Index Management

### create_index

```python { .api }
def create_index(
    self,
    collection_name: str,
    field_name: str,
    index_params: Dict[str, Any],
    timeout: Optional[float] = None,
    **kwargs
) -> None
```

**Parameters:**
- `field_name`: Field to create index on
- `index_params`: Index configuration dictionary

**Examples:**
```python
# Vector index
client.create_index(
    "documents",
    "vector",
    {
        "index_type": "IVF_FLAT",
        "metric_type": "L2",
        "params": {"nlist": 1024}
    }
)

# Scalar index
client.create_index(
    "products",
    "category",
    {"index_type": "TRIE"}
)
```

### drop_index

```python { .api }
def drop_index(
    self,
    collection_name: str,
    field_name: str,
    timeout: Optional[float] = None
) -> None
```

### list_indexes

```python { .api }
def list_indexes(
    self,
    collection_name: str,
    field_name: Optional[str] = None,
    timeout: Optional[float] = None
) -> List[str]
```

### describe_index

```python { .api }
def describe_index(
    self,
    collection_name: str,
    field_name: str,
    timeout: Optional[float] = None
) -> Dict[str, Any]
```

## Loading and Memory Management

### load_collection

```python { .api }
def load_collection(
    self,
    collection_name: str,
    timeout: Optional[float] = None,
    replica_number: int = 1,
    resource_groups: Optional[List[str]] = None,
    **kwargs
) -> None
```

### release_collection

```python { .api }
def release_collection(
    self,
    collection_name: str,
    timeout: Optional[float] = None
) -> None
```

### get_load_state

```python { .api }
def get_load_state(
    self,
    collection_name: str,
    partition_name: Optional[str] = None,
    timeout: Optional[float] = None
) -> Dict[str, Any]
```

**Returns:** Dictionary with `state` ("NotExist", "NotLoad", "Loading", "Loaded") and progress information.
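
A common use of `get_load_state` is to wait until a collection is searchable. The polling loop below is a minimal sketch under two assumptions: the returned dictionary has a `state` key as described above, and the value is either one of the listed strings or an enum-like object whose `name` attribute matches them. `wait_until_loaded` and its timing defaults are hypothetical.

```python
import time
from typing import Any


def wait_until_loaded(
    client: Any,
    collection_name: str,
    timeout_s: float = 60.0,
    poll_s: float = 0.5,
) -> bool:
    """Poll get_load_state until the collection reports "Loaded" or time runs out."""
    deadline = time.monotonic() + timeout_s
    while True:
        state = client.get_load_state(collection_name)["state"]
        # Accept either a plain string or an enum-like object with a `name`.
        if getattr(state, "name", state) == "Loaded":
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_s)
```

The function returns `False` on timeout instead of raising, which leaves the decision about retrying or failing to the caller.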

### refresh_load

```python { .api }
def refresh_load(
    self,
    collection_name: str,
    timeout: Optional[float] = None
) -> None
```

## AsyncMilvusClient

The AsyncMilvusClient provides identical functionality to MilvusClient but with async/await support for non-blocking operations.

### Usage Pattern

```python { .api }
from pymilvus import AsyncMilvusClient
import asyncio

async def async_operations():
    # Initialize async client
    client = AsyncMilvusClient(uri="http://localhost:19530")

    try:
        # All methods are async and must be awaited
        await client.create_collection("async_collection", dimension=768)

        # Concurrent operations
        # (batch1, batch2, batch3 are placeholder lists of row dictionaries)
        tasks = [
            client.insert("async_collection", batch1),
            client.insert("async_collection", batch2),
            client.insert("async_collection", batch3)
        ]
        results = await asyncio.gather(*tasks)

        # Search operations
        search_results = await client.search(
            "async_collection",
            data=[[0.1] * 768],
            limit=10
        )

    finally:
        # Always close the client
        await client.close()

# Run async operations
asyncio.run(async_operations())
```

### Key Differences from MilvusClient

1. **All methods are coroutines** - Must be awaited
2. **Concurrent execution** - Use `asyncio.gather()` for parallel operations
3. **Resource management** - Always call `await client.close()`
4. **Same method signatures** - Parameters and return types identical to sync version
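
Launching every coroutine at once with `asyncio.gather()` can overwhelm the server when there are many batches. A semaphore is one way to cap the number of requests in flight. The sketch below assumes only that `client.insert` is an awaitable with the signature documented for the sync client; `insert_concurrently` and the default of 4 concurrent requests are hypothetical choices.

```python
import asyncio
from typing import Any, Dict, List, Sequence


async def insert_concurrently(
    client: Any,
    collection_name: str,
    batches: Sequence[List[Dict[str, Any]]],
    max_in_flight: int = 4,
) -> List[Any]:
    """Insert all batches concurrently, with at most `max_in_flight` at once."""
    semaphore = asyncio.Semaphore(max_in_flight)

    async def insert_one(batch: List[Dict[str, Any]]) -> Any:
        async with semaphore:
            return await client.insert(collection_name, batch)

    # gather returns results in the same order as `batches`.
    return await asyncio.gather(*(insert_one(batch) for batch in batches))
```

By default `gather` propagates the first exception to the caller while the other inserts keep running; passing `return_exceptions=True` is an alternative when partial results are acceptable.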

### Async Method Examples

```python { .api }
# Async data operations
await client.insert(collection_name, data)
await client.upsert(collection_name, data)
await client.delete(collection_name, pks=[1, 2, 3])

# Async search operations
results = await client.search(collection_name, query_vectors, limit=10)
results = await client.query(collection_name, filter="category == 'A'")

# Async collection management
await client.create_collection(name, dimension=768)
collections = await client.list_collections()
await client.load_collection(name)
```

## Connection Management

### close

```python { .api }
def close(self) -> None
```

Closes the client connection and cleans up resources. For AsyncMilvusClient, this method is async:

```python { .api }
async def close(self) -> None  # AsyncMilvusClient version
```

**Best Practice:**
```python
# Synchronous client: create it before `try` so that `finally`
# never touches an unbound name if the constructor raises
client = MilvusClient()
try:
    ...  # operations
finally:
    client.close()

# Asynchronous client (inside a coroutine)
client = AsyncMilvusClient()
try:
    ...  # operations
finally:
    await client.close()

# Or use context manager (if supported)
async with AsyncMilvusClient() as client:
    await client.search(...)
```

## Error Handling

Both MilvusClient and AsyncMilvusClient raise the same exception types. Common exceptions include:

```python
from pymilvus import MilvusClient, MilvusException, MilvusUnavailableException

try:
    client = MilvusClient(uri="invalid://host:port")
    client.search("nonexistent", [[0.1] * 768])
except MilvusUnavailableException:
    print("Milvus server unavailable")
except MilvusException as e:
    print(f"Milvus error: {e.code} - {e.message}")
except Exception as e:
    print(f"General error: {e}")
```
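
Transient failures such as a briefly unavailable server are often worth retrying instead of just reporting. The helper below is a generic sketch of retry with exponential backoff; it is not a pymilvus feature, and `call_with_retry`, the attempt count, and the delay values are all hypothetical. The exception types to retry on are passed in by the caller.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retry(
    operation: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 3,
    base_delay_s: float = 0.5,
) -> T:
    """Call `operation`, retrying with exponential backoff on the given exceptions."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == attempts:
                raise  # Out of attempts: surface the last error to the caller.
            # Delay doubles each time: base, 2 * base, 4 * base, ...
            time.sleep(base_delay_s * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

A call might look like `call_with_retry(lambda: client.search("documents", [[0.1] * 768]), retry_on=(MilvusUnavailableException,))`. Errors outside `retry_on`, such as a missing collection, are raised immediately because retrying would not help.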


The MilvusClient interface provides a streamlined way to interact with Milvus, abstracting away many of the complexities while still providing access to advanced features when needed.