# Data Management

PyMilvus provides comprehensive data management capabilities including insertion, updates, deletion, and retrieval operations. This covers batch operations, data validation, transaction handling, and efficient iteration over large datasets.

## Data Insertion
5
6
### Basic Insert Operations
7
8
```python { .api }
9
from pymilvus import MilvusClient
10
11
client = MilvusClient()
12
13
def insert(
14
collection_name: str,
15
data: Union[List[Dict], pd.DataFrame],
16
partition_name: Optional[str] = None,
17
timeout: Optional[float] = None,
18
**kwargs
19
) -> Dict[str, Any]
20
```
21
22
**Parameters:**

- `data`: Data as list of dictionaries or pandas DataFrame
- `partition_name`: Target partition (optional)
- `timeout`: Operation timeout in seconds
- `**kwargs`: Additional insertion parameters

**Returns:** Dictionary with `insert_count` and `primary_keys` (if not auto_id)
29
30
### Insert Examples
31
32
```python { .api }
33
# Insert list of dictionaries
34
data = [
35
{
36
"id": 1,
37
"vector": [0.1, 0.2, 0.3] * 256,
38
"title": "First Document",
39
"metadata": {"category": "tech", "year": 2024}
40
},
41
{
42
"id": 2,
43
"vector": [0.4, 0.5, 0.6] * 256,
44
"title": "Second Document",
45
"metadata": {"category": "science", "year": 2024}
46
}
47
]
48
49
result = client.insert("documents", data)
50
print(f"Inserted {result['insert_count']} entities")
51
if 'primary_keys' in result:
52
print(f"Primary keys: {result['primary_keys']}")
53
54
# Insert pandas DataFrame
55
import pandas as pd
56
import numpy as np
57
58
df = pd.DataFrame({
59
"id": range(1000),
60
"vector": [np.random.rand(768).tolist() for _ in range(1000)],
61
"category": np.random.choice(["A", "B", "C"], 1000),
62
"score": np.random.rand(1000),
63
"active": np.random.choice([True, False], 1000)
64
})
65
66
result = client.insert("products", df)
67
print(f"Inserted {result['insert_count']} products from DataFrame")
68
69
# Insert into specific partition
70
seasonal_data = [
71
{"id": i, "vector": [0.1] * 128, "season": "winter"}
72
for i in range(100, 200)
73
]
74
75
client.insert(
76
"seasonal_collection",
77
seasonal_data,
78
partition_name="winter_2024"
79
)
80
```
81
82
### Auto-ID Collections
83
84
```python { .api }
85
# For collections with auto_id=True, omit primary key field
86
auto_id_data = [
87
{
88
"vector": [0.1, 0.2] * 384,
89
"content": "Auto-generated ID document",
90
"tags": ["auto", "generated"]
91
},
92
{
93
"vector": [0.3, 0.4] * 384,
94
"content": "Another auto-ID document",
95
"tags": ["automatic", "id"]
96
}
97
]
98
99
result = client.insert("auto_id_collection", auto_id_data)
100
print(f"Generated primary keys: {result['primary_keys']}")
101
```
102
103
### Large Batch Insertion
104
105
```python { .api }
106
def batch_insert_large_dataset(collection_name: str, data_generator, batch_size: int = 1000):
    """Stream records from *data_generator* into Milvus in fixed-size batches.

    Buffers at most ``batch_size`` records at a time to keep memory bounded,
    then flushes any remainder once the generator is exhausted.
    Returns the total number of inserted entities.
    """
    inserted_total = 0
    pending = []

    for item in data_generator():
        pending.append(item)
        if len(pending) < batch_size:
            continue

        # Buffer is full -- flush it to the server.
        outcome = client.insert(collection_name, pending)
        inserted_total += outcome['insert_count']
        print(f"Inserted batch: {outcome['insert_count']}, Total: {inserted_total}")
        pending = []

    # Flush whatever is left over after the generator is exhausted.
    if pending:
        outcome = client.insert(collection_name, pending)
        inserted_total += outcome['insert_count']
        print(f"Final batch: {outcome['insert_count']}, Total: {inserted_total}")

    return inserted_total
129
130
# Example generator for large dataset
131
def generate_documents(count: int):
    """Yield *count* synthetic document records one at a time.

    A generator keeps memory flat regardless of *count*, which pairs with
    batch-style insertion helpers.

    Each record carries an id, a 768-dim random vector, title/content text,
    and a metadata dict with a recent timestamp, category, and priority.
    """
    # Bug fix: the documentation snippets never import these modules, so the
    # example was not runnable as written. Import locally to stay self-contained.
    import random
    import time

    for i in range(count):
        yield {
            "id": i,
            "vector": np.random.rand(768).tolist(),
            "title": f"Document {i}",
            "content": f"Content for document {i}" * 50,  # larger text content
            "metadata": {
                # Random creation time within the last 24 hours.
                "created_at": int(time.time()) - random.randint(0, 86400),
                "category": random.choice(["tech", "science", "arts", "sports"]),
                "priority": random.randint(1, 10)
            }
        }
145
146
# Use batch insertion
147
total = batch_insert_large_dataset("large_collection", lambda: generate_documents(100000), batch_size=2000)
148
print(f"Successfully inserted {total} documents")
149
```
150
151
## Data Updates (Upsert)
152
153
### Upsert Operations
154
155
```python { .api }
156
def upsert(
157
collection_name: str,
158
data: Union[List[Dict], pd.DataFrame],
159
partition_name: Optional[str] = None,
160
timeout: Optional[float] = None,
161
**kwargs
162
) -> Dict[str, Any]
163
```
Upsert performs insert-or-update based on primary key matching. If the primary key exists, the entity is updated; otherwise, it is inserted.
166
167
```python { .api }
168
# Initial data
169
initial_data = [
170
{"id": 1, "vector": [0.1] * 768, "title": "Original Title", "status": "draft"},
171
{"id": 2, "vector": [0.2] * 768, "title": "Another Document", "status": "draft"}
172
]
173
client.insert("documents", initial_data)
174
175
# Upsert: update existing and insert new
176
upsert_data = [
177
{"id": 1, "vector": [0.15] * 768, "title": "Updated Title", "status": "published"}, # Update
178
{"id": 3, "vector": [0.3] * 768, "title": "New Document", "status": "draft"} # Insert
179
]
180
181
result = client.upsert("documents", upsert_data)
182
print(f"Upsert count: {result.get('upsert_count', 0)}")
183
184
# Verify changes
185
updated = client.query("documents", "id in [1, 3]", output_fields=["id", "title", "status"])
186
for doc in updated:
187
print(f"ID {doc['id']}: {doc['title']} - {doc['status']}")
188
```
189
190
### Conditional Upserts
191
192
```python { .api }
193
def conditional_upsert(collection_name: str, updates: List[Dict], condition_field: str):
    """Upsert *updates*, but only overwrite an existing entity when the update's
    *condition_field* value is strictly greater than the stored one.

    Entities whose primary key is not yet in the collection are always
    included. Returns the client's upsert result, or ``{"upsert_count": 0}``
    when nothing qualifies.
    """
    # Fetch the current value of the condition field for every targeted key.
    primary_keys = [update['id'] for update in updates]
    existing = client.get(
        collection_name,
        ids=primary_keys,
        output_fields=[condition_field, "id"]
    )

    # Map primary key -> existing entity for quick comparison.
    existing_map = {entity['id']: entity for entity in existing}

    valid_updates = []
    for update in updates:
        entity_id = update['id']

        if entity_id in existing_map:
            # Bug fix: compare the configured condition_field instead of a
            # hard-coded 'timestamp' key, so the parameter actually drives
            # the condition (the field was fetched but never used before).
            existing_value = existing_map[entity_id].get(condition_field, 0)
            new_value = update.get(condition_field, 0)

            if new_value > existing_value:
                valid_updates.append(update)
        else:
            # New entity, always include
            valid_updates.append(update)

    if valid_updates:
        return client.upsert(collection_name, valid_updates)

    return {"upsert_count": 0}
227
```
228
229
## Data Deletion
230
231
### Delete by Primary Key
232
233
```python { .api }
234
def delete(
235
collection_name: str,
236
pks: Optional[Union[List, str, int]] = None,
237
filter: Optional[str] = None,
238
partition_name: Optional[str] = None,
239
timeout: Optional[float] = None,
240
**kwargs
241
) -> Dict[str, Any]
242
```
243
244
**Parameters:**

- `pks`: Primary key values (mutually exclusive with `filter`)
- `filter`: Boolean expression (mutually exclusive with `pks`)
- `partition_name`: Target partition
- `timeout`: Operation timeout
249
250
```python { .api }
251
# Delete by primary keys
252
result = client.delete("documents", pks=[1, 2, 3])
253
print(f"Deleted {result.get('delete_count', 0)} entities")
254
255
# Delete single entity
256
client.delete("products", pks=12345)
257
258
# Delete by string primary keys
259
client.delete("users", pks=["user_001", "user_002", "user_003"])
260
261
# Delete from specific partition
262
client.delete("logs", pks=[100, 101, 102], partition_name="old_logs")
263
```
264
265
### Delete by Expression
266
267
```python { .api }
268
# Delete by filter conditions
269
result = client.delete("products", filter="category == 'discontinued'")
270
print(f"Deleted {result['delete_count']} discontinued products")
271
272
# Delete old records
273
client.delete("events", filter="timestamp < 1640995200") # Before 2022-01-01
274
275
# Delete with complex conditions
276
client.delete(
277
"documents",
278
filter="status == 'draft' and created_at < 1577836800 and author == 'system'"
279
)
280
281
# Delete from specific partitions with conditions
282
client.delete(
283
"user_activity",
284
filter="action_type == 'login' and success == false",
285
partition_name="failed_attempts"
286
)
287
```
288
289
### Batch Deletion Patterns
290
291
```python { .api }
292
def safe_batch_delete(collection_name: str, delete_condition: str, batch_size: int = 1000):
    """Delete every entity matching *delete_condition*, one bounded batch at a time.

    Queries only the primary keys of up to *batch_size* matches, deletes them,
    and repeats until no matches remain, so no single call touches a huge
    result set. Returns the total number of deleted entities.
    """
    removed = 0

    while True:
        # Look up the next chunk of matching primary keys.
        matches = client.query(
            collection_name,
            filter=delete_condition,
            output_fields=["id"],  # primary keys are all we need
            limit=batch_size
        )

        if not matches:
            # Nothing left that satisfies the condition.
            break

        key_batch = [row['id'] for row in matches]

        outcome = client.delete(collection_name, pks=key_batch)
        batch_removed = outcome.get('delete_count', 0)
        removed += batch_removed

        print(f"Deleted batch of {batch_removed} entities, total: {removed}")

        # A short batch means the condition is exhausted.
        if batch_removed < batch_size:
            break

    return removed
324
325
# Example: Delete old inactive users
326
deleted_count = safe_batch_delete(
327
"users",
328
"last_login < 1609459200 and status == 'inactive'", # Before 2021-01-01
329
batch_size=500
330
)
331
print(f"Total deleted: {deleted_count} inactive users")
332
```
333
334
## Data Retrieval
335
336
### Get by Primary Key
337
338
```python { .api }
339
def get(
340
collection_name: str,
341
ids: Union[List, str, int],
342
output_fields: Optional[List[str]] = None,
343
partition_names: Optional[List[str]] = None,
344
timeout: Optional[float] = None
345
) -> List[Dict[str, Any]]
346
```
347
348
```python { .api }
349
# Get single entity
350
entity = client.get("documents", ids=1, output_fields=["id", "title", "content"])
351
if entity:
352
print(f"Document: {entity[0]['title']}")
353
354
# Get multiple entities
355
entities = client.get(
356
"products",
357
ids=[100, 101, 102],
358
output_fields=["id", "name", "price", "category"]
359
)
360
361
for product in entities:
362
print(f"{product['name']}: ${product['price']}")
363
364
# Get with string primary keys
365
user_profiles = client.get(
366
"users",
367
ids=["user_001", "user_002"],
368
output_fields=["user_id", "name", "email", "profile"]
369
)
370
371
# Get from specific partitions
372
recent_data = client.get(
373
"time_series",
374
ids=range(1000, 1100),
375
partition_names=["2024_q4"],
376
output_fields=["id", "timestamp", "value"]
377
)
378
```
379
380
### Error Handling for Retrieval
381
382
```python { .api }
383
def safe_get_entities(collection_name: str, ids: List, output_fields: List[str]) -> List[Dict]:
    """Fetch entities by primary key, reporting problems instead of raising.

    Prints a warning when some requested ids are absent from the collection,
    and returns an empty list if the lookup itself fails.
    """
    try:
        results = client.get(
            collection_name,
            ids=ids,
            output_fields=output_fields
        )

        # Flag any requested ids the server did not return.
        returned_ids = {row['id'] for row in results}
        absent_ids = set(ids) - returned_ids

        if absent_ids:
            print(f"Warning: {len(absent_ids)} entities not found: {list(absent_ids)[:5]}...")

        return results

    except Exception as err:
        # Best-effort helper: swallow the failure but make it visible.
        print(f"Error retrieving entities: {err}")
        return []
405
406
# Usage
407
products = safe_get_entities("products", [1, 2, 999999], ["id", "name", "price"])
408
```
409
410
## Data Iteration
411
412
### Query Iterator
413
414
```python { .api }
415
def query_iterator(
416
collection_name: str,
417
filter: str,
418
output_fields: Optional[List[str]] = None,
419
batch_size: int = 1000,
420
limit: Optional[int] = None,
421
partition_names: Optional[List[str]] = None,
422
timeout: Optional[float] = None,
423
**kwargs
424
) -> QueryIterator
425
```
426
427
```python { .api }
428
# Process large result sets efficiently
429
iterator = client.query_iterator(
430
"large_collection",
431
filter="status == 'active'",
432
output_fields=["id", "data", "timestamp"],
433
batch_size=2000
434
)
435
436
processed_count = 0
437
for batch in iterator:
438
print(f"Processing batch of {len(batch)} records")
439
440
# Process each record in the batch
441
for record in batch:
442
# Custom processing logic
443
process_record(record)
444
processed_count += 1
445
446
# Optional: limit processing
447
if processed_count >= 50000:
448
print("Reached processing limit")
449
break
450
451
print(f"Total processed: {processed_count} records")
452
```
453
454
### Data Export Patterns
455
456
```python { .api }
457
def export_collection_to_csv(collection_name: str, output_file: str, filter_expr: str = "", batch_size: int = 5000):
    """Export a collection's scalar fields to a CSV file.

    Fields whose names end in ``_vector`` are skipped (too large for CSV);
    dict/list values are serialized as JSON strings. Uses query_iterator so
    memory use stays bounded regardless of collection size.
    """
    import csv
    import json  # bug fix: json.dumps below was used without any import

    # Get collection schema to determine fields.
    collection_info = client.describe_collection(collection_name)
    field_names = [field['name'] for field in collection_info['schema']['fields']]

    # Remove vector fields for CSV export (too large).
    scalar_fields = [name for name in field_names if not name.endswith('_vector')]

    with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=scalar_fields)
        writer.writeheader()

        # Use iterator for memory-efficient export.
        iterator = client.query_iterator(
            collection_name,
            filter=filter_expr or "id >= 0",  # match everything when no filter given
            output_fields=scalar_fields,
            batch_size=batch_size
        )

        total_exported = 0
        for batch in iterator:
            for record in batch:
                # JSON-encode nested structures so they fit in a CSV cell.
                csv_row = {}
                for field in scalar_fields:
                    value = record.get(field)
                    if isinstance(value, (dict, list)):
                        csv_row[field] = json.dumps(value)
                    else:
                        csv_row[field] = value

                writer.writerow(csv_row)
                total_exported += 1

            print(f"Exported {total_exported} records...")

    print(f"Export completed: {total_exported} records saved to {output_file}")
500
501
# Export active products to CSV
502
export_collection_to_csv(
503
"products",
504
"active_products.csv",
505
filter_expr="status == 'active' and price > 0"
506
)
507
```
508
509
### Data Transformation Pipeline
510
511
```python { .api }
512
def data_migration_pipeline(source_collection: str, target_collection: str, transform_func):
    """Copy every record from *source_collection* into *target_collection*,
    passing each one through *transform_func* first.

    *transform_func* may return a falsy value to drop a record. A batch that
    raises is recorded and skipped rather than aborting the whole migration.
    Returns a summary dict: ``{"migrated": <count>, "errors": [<messages>]}``.
    """
    # Stream the source collection in batches.
    iterator = client.query_iterator(
        source_collection,
        filter="id >= 0",  # matches all records
        batch_size=1000
    )

    migrated_count = 0
    errors = []

    for batch in iterator:
        try:
            # Apply the transformation, dropping records mapped to falsy values.
            converted = [rec for rec in map(transform_func, batch) if rec]

            # Write the surviving records into the target collection.
            if converted:
                result = client.insert(target_collection, converted)
                migrated_count += result['insert_count']
                print(f"Migrated {result['insert_count']} records, total: {migrated_count}")

        except Exception as e:
            error_msg = f"Error processing batch: {e}"
            errors.append(error_msg)
            print(error_msg)

    return {"migrated": migrated_count, "errors": errors}
546
547
# Example transformation function
548
def modernize_document(old_record):
    """Map an old-format document record onto the new collection schema.

    Renames ``embedding`` -> ``vector``, nests loose attributes under
    ``metadata``, and stamps the record with the migration time.

    Raises KeyError if ``id``, ``title``, ``content``, or ``embedding``
    is missing from *old_record*.
    """
    # Bug fix: the documentation snippets never import time, so the example
    # was not runnable as written. Import locally to stay self-contained.
    import time

    return {
        "id": old_record["id"],
        "title": old_record["title"],
        "content": old_record["content"],
        "vector": old_record["embedding"],  # rename field
        "metadata": {
            "category": old_record.get("category", "general"),
            "created_at": old_record.get("timestamp", 0),
            "migrated_at": int(time.time())
        },
        "status": "migrated"
    }
562
563
# Run migration
564
result = data_migration_pipeline("old_documents", "new_documents", modernize_document)
565
print(f"Migration completed: {result}")
566
```
567
568
## Data Validation
569
570
### Insert Validation
571
572
```python { .api }
573
def validate_and_insert(collection_name: str, data: List[Dict], schema_info: Dict) -> Dict:
    """Check each record against the collection schema and insert only valid ones.

    Validates required-field presence, vector dimensions, and basic scalar
    types (VarChar/Int64/Int32). Returns a dict with ``insert_count`` and a
    list of per-record ``errors`` describing why records were rejected.
    """
    # --- Build lookup tables from the schema description -------------------
    required_fields = set()
    field_types = {}
    vector_dims = {}

    for field in schema_info['schema']['fields']:
        name = field['name']
        ftype = field['type']

        # Auto-generated fields are not required in the input data.
        if not field.get('autoID', False):
            required_fields.add(name)

        field_types[name] = ftype

        if ftype in ['FloatVector', 'BinaryVector']:
            vector_dims[name] = field.get('params', {}).get('dim', 0)

    # --- Validate record by record -----------------------------------------
    validated_data = []
    errors = []

    for i, record in enumerate(data):
        record_errors = []

        # All non-auto fields must be present.
        missing_fields = required_fields - set(record.keys())
        if missing_fields:
            record_errors.append(f"Missing fields: {missing_fields}")

        # Vector fields must match the declared dimension exactly.
        for name, expected_dim in vector_dims.items():
            if name in record:
                vector = record[name]
                if isinstance(vector, list) and len(vector) != expected_dim:
                    record_errors.append(f"{name} dimension mismatch: expected {expected_dim}, got {len(vector)}")

        # Lightweight scalar type checks (unknown fields are skipped).
        for name, value in record.items():
            declared = field_types.get(name)
            if declared == 'VarChar' and not isinstance(value, str):
                record_errors.append(f"{name} must be string, got {type(value)}")
            elif declared in ['Int64', 'Int32'] and not isinstance(value, int):
                record_errors.append(f"{name} must be integer, got {type(value)}")

        if record_errors:
            errors.append(f"Record {i}: {'; '.join(record_errors)}")
        else:
            validated_data.append(record)

    # --- Insert whatever survived validation --------------------------------
    result = {"insert_count": 0, "errors": errors}

    if validated_data:
        insert_result = client.insert(collection_name, validated_data)
        result["insert_count"] = insert_result["insert_count"]

    return result
633
634
# Usage
635
collection_schema = client.describe_collection("products")
636
data_to_insert = [
637
{"id": 1, "name": "Product 1", "vector": [0.1] * 128},
638
{"id": 2, "vector": [0.2] * 64}, # Wrong dimension - will be flagged
639
]
640
641
validation_result = validate_and_insert("products", data_to_insert, collection_schema)
642
print(f"Inserted: {validation_result['insert_count']}")
643
if validation_result['errors']:
644
print("Validation errors:", validation_result['errors'])
645
```
646
647
PyMilvus data management operations provide robust capabilities for handling large-scale vector and scalar data with efficient batch processing, validation, and error handling mechanisms.