or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-management.md index-management.md index.md milvus-client.md orm-collection.md search-operations.md types-enums.md user-management.md utility-functions.md

data-management.mddocs/

0

# Data Management

1

2

PyMilvus provides comprehensive data management capabilities including insertion, updates, deletion, and retrieval operations. This covers batch operations, data validation, transaction handling, and efficient iteration over large datasets.

3

4

## Data Insertion

5

6

### Basic Insert Operations

7

8

```python { .api }

9

from pymilvus import MilvusClient

10

11

client = MilvusClient()

12

13

def insert(

14

collection_name: str,

15

data: Union[List[Dict], pd.DataFrame],

16

partition_name: Optional[str] = None,

17

timeout: Optional[float] = None,

18

**kwargs

19

) -> Dict[str, Any]

20

```

21

22

**Parameters:**

23

- `data`: Data as list of dictionaries or pandas DataFrame

24

- `partition_name`: Target partition (optional)

25

- `timeout`: Operation timeout in seconds

26

- `**kwargs`: Additional insertion parameters

27

28

**Returns:** Dictionary with `insert_count`, plus `primary_keys` when the server returns the inserted keys (e.g. server-generated keys for auto-ID collections)

29

30

### Insert Examples

31

32

```python { .api }

33

# Insert list of dictionaries

34

data = [

35

{

36

"id": 1,

37

"vector": [0.1, 0.2, 0.3] * 256,

38

"title": "First Document",

39

"metadata": {"category": "tech", "year": 2024}

40

},

41

{

42

"id": 2,

43

"vector": [0.4, 0.5, 0.6] * 256,

44

"title": "Second Document",

45

"metadata": {"category": "science", "year": 2024}

46

}

47

]

48

49

result = client.insert("documents", data)

50

print(f"Inserted {result['insert_count']} entities")

51

if 'primary_keys' in result:

52

print(f"Primary keys: {result['primary_keys']}")

53

54

# Insert pandas DataFrame

55

import pandas as pd

56

import numpy as np

57

58

df = pd.DataFrame({

59

"id": range(1000),

60

"vector": [np.random.rand(768).tolist() for _ in range(1000)],

61

"category": np.random.choice(["A", "B", "C"], 1000),

62

"score": np.random.rand(1000),

63

"active": np.random.choice([True, False], 1000)

64

})

65

66

result = client.insert("products", df)

67

print(f"Inserted {result['insert_count']} products from DataFrame")

68

69

# Insert into specific partition

70

seasonal_data = [

71

{"id": i, "vector": [0.1] * 128, "season": "winter"}

72

for i in range(100, 200)

73

]

74

75

client.insert(

76

"seasonal_collection",

77

seasonal_data,

78

partition_name="winter_2024"

79

)

80

```

81

82

### Auto-ID Collections

83

84

```python { .api }

85

# For collections with auto_id=True, omit primary key field

86

auto_id_data = [

87

{

88

"vector": [0.1, 0.2] * 384,

89

"content": "Auto-generated ID document",

90

"tags": ["auto", "generated"]

91

},

92

{

93

"vector": [0.3, 0.4] * 384,

94

"content": "Another auto-ID document",

95

"tags": ["automatic", "id"]

96

}

97

]

98

99

result = client.insert("auto_id_collection", auto_id_data)

100

print(f"Generated primary keys: {result['primary_keys']}")

101

```

102

103

### Large Batch Insertion

104

105

```python { .api }

106

def batch_insert_large_dataset(collection_name: str, data_generator, batch_size: int = 1000):

107

"""Insert large dataset in batches to manage memory"""

108

109

total_inserted = 0

110

batch = []

111

112

for record in data_generator():

113

batch.append(record)

114

115

if len(batch) >= batch_size:

116

result = client.insert(collection_name, batch)

117

total_inserted += result['insert_count']

118

print(f"Inserted batch: {result['insert_count']}, Total: {total_inserted}")

119

120

batch = [] # Clear batch

121

122

# Insert remaining records

123

if batch:

124

result = client.insert(collection_name, batch)

125

total_inserted += result['insert_count']

126

print(f"Final batch: {result['insert_count']}, Total: {total_inserted}")

127

128

return total_inserted

129

130

# Example generator for large dataset (requires `import time`, `import random`, and `import numpy as np`)

131

def generate_documents(count: int):

132

"""Generator for memory-efficient data creation"""

133

for i in range(count):

134

yield {

135

"id": i,

136

"vector": np.random.rand(768).tolist(),

137

"title": f"Document {i}",

138

"content": f"Content for document {i}" * 50, # Larger text content

139

"metadata": {

140

"created_at": int(time.time()) - random.randint(0, 86400),

141

"category": random.choice(["tech", "science", "arts", "sports"]),

142

"priority": random.randint(1, 10)

143

}

144

}

145

146

# Use batch insertion

147

total = batch_insert_large_dataset("large_collection", lambda: generate_documents(100000), batch_size=2000)

148

print(f"Successfully inserted {total} documents")

149

```

150

151

## Data Updates (Upsert)

152

153

### Upsert Operations

154

155

```python { .api }

156

def upsert(

157

collection_name: str,

158

data: Union[List[Dict], pd.DataFrame],

159

partition_name: Optional[str] = None,

160

timeout: Optional[float] = None,

161

**kwargs

162

) -> Dict[str, Any]

163

```

164

165

Upsert performs insert-or-update based on primary key matching. If primary key exists, the entity is updated; otherwise, it's inserted.

166

167

```python { .api }

168

# Initial data

169

initial_data = [

170

{"id": 1, "vector": [0.1] * 768, "title": "Original Title", "status": "draft"},

171

{"id": 2, "vector": [0.2] * 768, "title": "Another Document", "status": "draft"}

172

]

173

client.insert("documents", initial_data)

174

175

# Upsert: update existing and insert new

176

upsert_data = [

177

{"id": 1, "vector": [0.15] * 768, "title": "Updated Title", "status": "published"}, # Update

178

{"id": 3, "vector": [0.3] * 768, "title": "New Document", "status": "draft"} # Insert

179

]

180

181

result = client.upsert("documents", upsert_data)

182

print(f"Upsert count: {result.get('upsert_count', 0)}")

183

184

# Verify changes

185

updated = client.query("documents", "id in [1, 3]", output_fields=["id", "title", "status"])

186

for doc in updated:

187

print(f"ID {doc['id']}: {doc['title']} - {doc['status']}")

188

```

189

190

### Conditional Upserts

191

192

```python { .api }

193

def conditional_upsert(collection_name: str, updates: List[Dict], condition_field: str):

194

"""Upsert only if condition is met"""

195

196

# Get existing entities

197

primary_keys = [update['id'] for update in updates]

198

existing = client.get(

199

collection_name,

200

ids=primary_keys,

201

output_fields=[condition_field, "id"]

202

)

203

204

# Create mapping of existing entities

205

existing_map = {entity['id']: entity for entity in existing}

206

207

# Filter updates based on conditions

208

valid_updates = []

209

for update in updates:

210

entity_id = update['id']

211

212

if entity_id in existing_map:

213

# Apply update condition: only update if the condition field's value is newer

214

existing_timestamp = existing_map[entity_id].get(condition_field, 0)

215

new_timestamp = update.get(condition_field, 0)

216

217

if new_timestamp > existing_timestamp:

218

valid_updates.append(update)

219

else:

220

# New entity, always include

221

valid_updates.append(update)

222

223

if valid_updates:

224

return client.upsert(collection_name, valid_updates)

225

226

return {"upsert_count": 0}

227

```

228

229

## Data Deletion

230

231

### Delete by Primary Key

232

233

```python { .api }

234

def delete(

235

collection_name: str,

236

pks: Optional[Union[List, str, int]] = None,

237

filter: Optional[str] = None,

238

partition_name: Optional[str] = None,

239

timeout: Optional[float] = None,

240

**kwargs

241

) -> Dict[str, Any]

242

```

243

244

**Parameters:**

245

- `pks`: Primary key values (mutually exclusive with filter)

246

- `filter`: Boolean expression (mutually exclusive with pks)

247

- `partition_name`: Target partition

248

- `timeout`: Operation timeout

249

250

```python { .api }

251

# Delete by primary keys

252

result = client.delete("documents", pks=[1, 2, 3])

253

print(f"Deleted {result.get('delete_count', 0)} entities")

254

255

# Delete single entity

256

client.delete("products", pks=12345)

257

258

# Delete by string primary keys

259

client.delete("users", pks=["user_001", "user_002", "user_003"])

260

261

# Delete from specific partition

262

client.delete("logs", pks=[100, 101, 102], partition_name="old_logs")

263

```

264

265

### Delete by Expression

266

267

```python { .api }

268

# Delete by filter conditions

269

result = client.delete("products", filter="category == 'discontinued'")

270

print(f"Deleted {result['delete_count']} discontinued products")

271

272

# Delete old records

273

client.delete("events", filter="timestamp < 1640995200") # Before 2022-01-01

274

275

# Delete with complex conditions

276

client.delete(

277

"documents",

278

filter="status == 'draft' and created_at < 1577836800 and author == 'system'"

279

)

280

281

# Delete from specific partitions with conditions

282

client.delete(

283

"user_activity",

284

filter="action_type == 'login' and success == false",

285

partition_name="failed_attempts"

286

)

287

```

288

289

### Batch Deletion Patterns

290

291

```python { .api }

292

def safe_batch_delete(collection_name: str, delete_condition: str, batch_size: int = 1000):

293

"""Safely delete large numbers of entities in batches"""

294

295

total_deleted = 0

296

297

while True:

298

# Query entities matching delete condition

299

to_delete = client.query(

300

collection_name,

301

filter=delete_condition,

302

output_fields=["id"], # Only need primary keys

303

limit=batch_size

304

)

305

306

if not to_delete:

307

break # No more entities to delete

308

309

# Extract primary keys

310

pks = [entity['id'] for entity in to_delete]

311

312

# Delete batch

313

result = client.delete(collection_name, pks=pks)

314

deleted_count = result.get('delete_count', 0)

315

total_deleted += deleted_count

316

317

print(f"Deleted batch of {deleted_count} entities, total: {total_deleted}")

318

319

# If we deleted fewer than batch_size, we're done

320

if deleted_count < batch_size:

321

break

322

323

return total_deleted

324

325

# Example: Delete old inactive users

326

deleted_count = safe_batch_delete(

327

"users",

328

"last_login < 1609459200 and status == 'inactive'", # Before 2021-01-01

329

batch_size=500

330

)

331

print(f"Total deleted: {deleted_count} inactive users")

332

```

333

334

## Data Retrieval

335

336

### Get by Primary Key

337

338

```python { .api }

339

def get(

340

collection_name: str,

341

ids: Union[List, str, int],

342

output_fields: Optional[List[str]] = None,

343

partition_names: Optional[List[str]] = None,

344

timeout: Optional[float] = None

345

) -> List[Dict[str, Any]]

346

```

347

348

```python { .api }

349

# Get single entity

350

entity = client.get("documents", ids=1, output_fields=["id", "title", "content"])

351

if entity:

352

print(f"Document: {entity[0]['title']}")

353

354

# Get multiple entities

355

entities = client.get(

356

"products",

357

ids=[100, 101, 102],

358

output_fields=["id", "name", "price", "category"]

359

)

360

361

for product in entities:

362

print(f"{product['name']}: ${product['price']}")

363

364

# Get with string primary keys

365

user_profiles = client.get(

366

"users",

367

ids=["user_001", "user_002"],

368

output_fields=["user_id", "name", "email", "profile"]

369

)

370

371

# Get from specific partitions

372

recent_data = client.get(

373

"time_series",

374

ids=range(1000, 1100),

375

partition_names=["2024_q4"],

376

output_fields=["id", "timestamp", "value"]

377

)

378

```

379

380

### Error Handling for Retrieval

381

382

```python { .api }

383

def safe_get_entities(collection_name: str, ids: List, output_fields: List[str]) -> List[Dict]:

384

"""Safely retrieve entities with error handling"""

385

386

try:

387

entities = client.get(

388

collection_name,

389

ids=ids,

390

output_fields=output_fields

391

)

392

393

# Check if all requested entities were found

394

found_ids = {entity['id'] for entity in entities}

395

missing_ids = set(ids) - found_ids

396

397

if missing_ids:

398

print(f"Warning: {len(missing_ids)} entities not found: {list(missing_ids)[:5]}...")

399

400

return entities

401

402

except Exception as e:

403

print(f"Error retrieving entities: {e}")

404

return []

405

406

# Usage

407

products = safe_get_entities("products", [1, 2, 999999], ["id", "name", "price"])

408

```

409

410

## Data Iteration

411

412

### Query Iterator

413

414

```python { .api }

415

def query_iterator(

416

collection_name: str,

417

filter: str,

418

output_fields: Optional[List[str]] = None,

419

batch_size: int = 1000,

420

limit: Optional[int] = None,

421

partition_names: Optional[List[str]] = None,

422

timeout: Optional[float] = None,

423

**kwargs

424

) -> QueryIterator

425

```

426

427

```python { .api }

428

# Process large result sets efficiently

429

iterator = client.query_iterator(

430

"large_collection",

431

filter="status == 'active'",

432

output_fields=["id", "data", "timestamp"],

433

batch_size=2000

434

)

435

436

processed_count = 0

437

for batch in iterator:

438

print(f"Processing batch of {len(batch)} records")

439

440

# Process each record in the batch

441

for record in batch:

442

# Custom processing logic

443

process_record(record)

444

processed_count += 1

445

446

# Optional: limit processing

447

if processed_count >= 50000:

448

print("Reached processing limit")

449

break

450

451

print(f"Total processed: {processed_count} records")

452

```

453

454

### Data Export Patterns

455

456

```python { .api }

457

def export_collection_to_csv(collection_name: str, output_file: str, filter_expr: str = "", batch_size: int = 5000):

458

"""Export collection data to CSV file"""

459

460

import csv
import json

461

462

# Get collection schema to determine fields

463

collection_info = client.describe_collection(collection_name)

464

field_names = [field['name'] for field in collection_info['schema']['fields']]

465

466

# Skip vector fields for CSV export (too large); assumes vector fields are named with a '_vector' suffix

467

scalar_fields = [name for name in field_names if not name.endswith('_vector')]

468

469

with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:

470

writer = csv.DictWriter(csvfile, fieldnames=scalar_fields)

471

writer.writeheader()

472

473

# Use iterator for memory-efficient export

474

iterator = client.query_iterator(

475

collection_name,

476

filter=filter_expr or "id >= 0", # Get all if no filter

477

output_fields=scalar_fields,

478

batch_size=batch_size

479

)

480

481

total_exported = 0

482

for batch in iterator:

483

# Convert batch to CSV rows

484

for record in batch:

485

# Handle JSON fields by converting to string

486

csv_row = {}

487

for field in scalar_fields:

488

value = record.get(field)

489

if isinstance(value, (dict, list)):

490

csv_row[field] = json.dumps(value)

491

else:

492

csv_row[field] = value

493

494

writer.writerow(csv_row)

495

total_exported += 1

496

497

print(f"Exported {total_exported} records...")

498

499

print(f"Export completed: {total_exported} records saved to {output_file}")

500

501

# Export active products to CSV

502

export_collection_to_csv(

503

"products",

504

"active_products.csv",

505

filter_expr="status == 'active' and price > 0"

506

)

507

```

508

509

### Data Transformation Pipeline

510

511

```python { .api }

512

def data_migration_pipeline(source_collection: str, target_collection: str, transform_func):

513

"""Migrate data between collections with transformation"""

514

515

# Process in batches

516

iterator = client.query_iterator(

517

source_collection,

518

filter="id >= 0", # All records

519

batch_size=1000

520

)

521

522

migrated_count = 0

523

errors = []

524

525

for batch in iterator:

526

try:

527

# Transform batch data

528

transformed_batch = []

529

for record in batch:

530

transformed = transform_func(record)

531

if transformed: # Skip None results

532

transformed_batch.append(transformed)

533

534

# Insert into target collection

535

if transformed_batch:

536

result = client.insert(target_collection, transformed_batch)

537

migrated_count += result['insert_count']

538

print(f"Migrated {result['insert_count']} records, total: {migrated_count}")

539

540

except Exception as e:

541

error_msg = f"Error processing batch: {e}"

542

errors.append(error_msg)

543

print(error_msg)

544

545

return {"migrated": migrated_count, "errors": errors}

546

547

# Example transformation function

548

def modernize_document(old_record):

549

"""Transform old document format to new format"""

550

return {

551

"id": old_record["id"],

552

"title": old_record["title"],

553

"content": old_record["content"],

554

"vector": old_record["embedding"], # Rename field

555

"metadata": {

556

"category": old_record.get("category", "general"),

557

"created_at": old_record.get("timestamp", 0),

558

"migrated_at": int(time.time())

559

},

560

"status": "migrated"

561

}

562

563

# Run migration

564

result = data_migration_pipeline("old_documents", "new_documents", modernize_document)

565

print(f"Migration completed: {result}")

566

```

567

568

## Data Validation

569

570

### Insert Validation

571

572

```python { .api }

573

def validate_and_insert(collection_name: str, data: List[Dict], schema_info: Dict) -> Dict:

574

"""Validate data against collection schema before insertion"""

575

576

# Extract field information from schema

577

required_fields = set()

578

field_types = {}

579

vector_dims = {}

580

581

for field in schema_info['schema']['fields']:

582

field_name = field['name']

583

field_type = field['type']

584

585

if not field.get('autoID', False):

586

required_fields.add(field_name)

587

588

field_types[field_name] = field_type

589

590

if field_type in ['FloatVector', 'BinaryVector']:

591

vector_dims[field_name] = field.get('params', {}).get('dim', 0)

592

593

validated_data = []

594

errors = []

595

596

for i, record in enumerate(data):

597

record_errors = []

598

599

# Check required fields

600

missing_fields = required_fields - set(record.keys())

601

if missing_fields:

602

record_errors.append(f"Missing fields: {missing_fields}")

603

604

# Validate vector dimensions

605

for field_name, expected_dim in vector_dims.items():

606

if field_name in record:

607

vector = record[field_name]

608

if isinstance(vector, list) and len(vector) != expected_dim:

609

record_errors.append(f"{field_name} dimension mismatch: expected {expected_dim}, got {len(vector)}")

610

611

# Validate data types (basic validation)

612

for field_name, value in record.items():

613

if field_name in field_types:

614

field_type = field_types[field_name]

615

if field_type == 'VarChar' and not isinstance(value, str):

616

record_errors.append(f"{field_name} must be string, got {type(value)}")

617

elif field_type in ['Int64', 'Int32'] and not isinstance(value, int):

618

record_errors.append(f"{field_name} must be integer, got {type(value)}")

619

620

if record_errors:

621

errors.append(f"Record {i}: {'; '.join(record_errors)}")

622

else:

623

validated_data.append(record)

624

625

# Insert valid data

626

result = {"insert_count": 0, "errors": errors}

627

628

if validated_data:

629

insert_result = client.insert(collection_name, validated_data)

630

result["insert_count"] = insert_result["insert_count"]

631

632

return result

633

634

# Usage

635

collection_schema = client.describe_collection("products")

636

data_to_insert = [

637

{"id": 1, "name": "Product 1", "vector": [0.1] * 128},

638

{"id": 2, "vector": [0.2] * 64}, # Wrong dimension - will be flagged

639

]

640

641

validation_result = validate_and_insert("products", data_to_insert, collection_schema)

642

print(f"Inserted: {validation_result['insert_count']}")

643

if validation_result['errors']:

644

print("Validation errors:", validation_result['errors'])

645

```

646

647

PyMilvus data management operations provide robust capabilities for handling large-scale vector and scalar data with efficient batch processing, validation, and error handling mechanisms.