# Batch Operations

High-performance batch transaction processing supporting multiple entity operations in single atomic transactions with comprehensive operation types and efficient bulk processing capabilities.

## Capabilities

### Transaction Operations

Execute multiple entity operations atomically within a single transaction for improved performance and data consistency.

```python { .api }
class TableClient:
    def submit_transaction(
        self,
        operations: Iterable[Union[
            Tuple[str, Union[TableEntity, Mapping[str, Any]]],
            Tuple[str, Union[TableEntity, Mapping[str, Any]], Mapping[str, Any]]
        ]],
        **kwargs
    ) -> List[Mapping[str, Any]]:
        """
        Submit a batch of operations as a single atomic transaction.

        Parameters:
        - operations: Iterable of operation tuples in format:
          - (operation_type, entity) for basic operations
          - (operation_type, entity, options) for operations with additional parameters

        Operation Types:
        - "create": Insert new entity
        - "update": Update existing entity
        - "upsert": Insert or update entity
        - "delete": Delete entity

        Returns:
        List of operation results with metadata, one entry per operation

        Constraints:
        - All entities must have the same PartitionKey
        - Maximum 100 operations per transaction
        - All operations succeed or all fail (atomic)
        - Total payload must be under 4MB

        Raises:
        TableTransactionError: If any operation fails, entire transaction is rolled back
        """
```

#### Usage Example

```python
from azure.data.tables import TableClient, TableTransactionError

table_client = TableClient.from_connection_string(conn_str, "orders")

# Basic batch operations
operations = [
    # Create new entities
    ("create", {
        "PartitionKey": "2023-Q4",
        "RowKey": "order-001",
        "CustomerName": "John Doe",
        "Total": 299.99,
        "Status": "pending"
    }),

    ("create", {
        "PartitionKey": "2023-Q4",
        "RowKey": "order-002",
        "CustomerName": "Jane Smith",
        "Total": 149.99,
        "Status": "pending"
    }),

    # Update existing entity
    ("update", {
        "PartitionKey": "2023-Q4",
        "RowKey": "order-003",
        "Status": "shipped",
        "ShippedDate": "2023-12-15"
    }),

    # Upsert entity (insert or update)
    ("upsert", {
        "PartitionKey": "2023-Q4",
        "RowKey": "order-004",
        "CustomerName": "Bob Wilson",
        "Total": 75.50,
        "Status": "completed"
    }),

    # Delete entity
    ("delete", {
        "PartitionKey": "2023-Q4",
        "RowKey": "order-005"
    })
]

try:
    results = table_client.submit_transaction(operations)
    print(f"Successfully processed {len(results)} operations")

    for i, result in enumerate(results):
        if result:  # Delete operations carry no result metadata
            print(f"Operation {i}: ETag = {result.get('etag', 'N/A')}")

except TableTransactionError as e:
    print(f"Transaction failed at operation {e.index}: {e.message}")
    print(f"Error code: {e.error_code}")
```

### Advanced Transaction Options

Use additional parameters for fine-grained control over batch operations.

```python { .api }
# Extended operation format with options
TransactionOperationType = Union[
    Tuple[str, Union[TableEntity, Mapping[str, Any]]],
    Tuple[str, Union[TableEntity, Mapping[str, Any]], Mapping[str, Any]]
]
```

#### Usage Example

```python
# TableTransactionError must be imported here: the except clause below
# referenced it without an import, which would raise NameError on failure.
from azure.data.tables import TableClient, UpdateMode, TableTransactionError
from azure.core import MatchConditions

table_client = TableClient.from_connection_string(conn_str, "inventory")

# Advanced batch with options
advanced_operations = [
    # Create with no additional options
    ("create", {
        "PartitionKey": "electronics",
        "RowKey": "item-001",
        "Name": "Laptop",
        "Quantity": 10,
        "Price": 999.99
    }),

    # Update with REPLACE mode
    ("update", {
        "PartitionKey": "electronics",
        "RowKey": "item-002",
        "Name": "Updated Tablet",
        "Quantity": 5,
        "Price": 299.99
    }, {
        "mode": UpdateMode.REPLACE
    }),

    # Update with optimistic concurrency
    ("update", {
        "PartitionKey": "electronics",
        "RowKey": "item-003",
        "Quantity": 8,
        "etag": "W/\"datetime'2023-12-15T10%3A30%3A00.123Z'\""
    }, {
        "mode": UpdateMode.MERGE,
        "match_condition": MatchConditions.IfNotModified
    }),

    # Upsert with REPLACE mode
    ("upsert", {
        "PartitionKey": "electronics",
        "RowKey": "item-004",
        "Name": "Wireless Mouse",
        "Quantity": 25,
        "Price": 49.99
    }, {
        "mode": UpdateMode.REPLACE
    })
]

try:
    results = table_client.submit_transaction(advanced_operations)
    print("Advanced batch transaction completed successfully")
except TableTransactionError as e:
    print(f"Advanced transaction failed: {e.message}")
```

### Transaction Operation Types

Comprehensive enumeration of supported batch operation types.

```python { .api }
class TransactionOperation(Enum):
    """
    Batch transaction operation types.

    Defines the available operations that can be performed
    within a batch transaction.
    """

    CREATE = "create"    # Insert new entity (fails if exists)
    UPDATE = "update"    # Update existing entity (fails if not exists)
    UPSERT = "upsert"    # Insert or update entity
    DELETE = "delete"    # Delete entity (fails if not exists)
```

#### Usage Example

```python
from azure.data.tables import TableClient, TransactionOperation

table_client = TableClient.from_connection_string(conn_str, "products")

# Using enum values for type safety
operations = [
    (TransactionOperation.CREATE, {
        "PartitionKey": "category-a",
        "RowKey": "prod-001",
        "Name": "Product 1",
        "Price": 19.99
    }),

    (TransactionOperation.UPSERT, {
        "PartitionKey": "category-a",
        "RowKey": "prod-002",
        "Name": "Product 2",
        "Price": 29.99
    }),

    (TransactionOperation.DELETE, {
        "PartitionKey": "category-a",
        "RowKey": "prod-003"
    })
]

# Submit with enum-based operations
results = table_client.submit_transaction(operations)
```

### Update Modes

Control how entity updates are applied during batch operations.

```python { .api }
class UpdateMode(Enum):
    """
    Entity update modes for controlling merge behavior.
    """

    REPLACE = "replace"  # Replace entire entity with new properties
    MERGE = "merge"      # Merge new properties with existing entity
```

#### Usage Example

```python
from azure.data.tables import TableClient, UpdateMode

table_client = TableClient.from_connection_string(conn_str, "customers")

# Existing entity:
# {
#     "PartitionKey": "vip",
#     "RowKey": "customer-001",
#     "Name": "John Doe",
#     "Email": "john@example.com",
#     "Phone": "555-1234",
#     "VipLevel": "Gold"
# }

operations_merge = [
    # MERGE: Only updates specified properties, keeps others
    ("update", {
        "PartitionKey": "vip",
        "RowKey": "customer-001",
        "Phone": "555-9999",          # Updated
        "LastContact": "2023-12-15"   # Added
        # Name, Email, VipLevel remain unchanged
    }, {"mode": UpdateMode.MERGE})
]

operations_replace = [
    # REPLACE: Replaces entire entity, unspecified properties are removed
    ("update", {
        "PartitionKey": "vip",
        "RowKey": "customer-001",
        "Name": "John Doe",
        "Email": "newemail@example.com",
        "VipLevel": "Platinum"
        # Phone property will be removed since not specified
    }, {"mode": UpdateMode.REPLACE})
]

# Execute merge operation
table_client.submit_transaction(operations_merge)
print("Merge update completed - existing properties preserved")

# Execute replace operation
table_client.submit_transaction(operations_replace)
print("Replace update completed - entity fully replaced")
```

### Batch Processing Patterns

Common patterns for efficient batch processing of large datasets.

#### Chunked Batch Processing

```python
from azure.data.tables import TableClient
from typing import List, Dict, Any
from collections import defaultdict

def process_entities_in_batches(
    table_client: TableClient,
    entities: List[Dict[str, Any]],
    operation_type: str = "create",
    batch_size: int = 100
):
    """
    Process large number of entities in optimally-sized batches.

    Automatically groups entities by PartitionKey and processes in
    batches respecting Azure Tables constraints (one PartitionKey per
    transaction, at most 100 operations each).

    Parameters:
    - table_client: client used to submit each transaction
    - entities: entity dicts; each must contain a "PartitionKey" key
    - operation_type: operation applied to every entity
    - batch_size: maximum operations per transaction (Azure limit is 100)

    Returns:
    List of per-operation results from all successful batches.
    Failed batches are reported and skipped (best-effort processing).
    """

    # Group entities by PartitionKey (batches cannot span partitions)
    partition_groups = defaultdict(list)
    for entity in entities:
        partition_groups[entity["PartitionKey"]].append(entity)

    results = []
    total_processed = 0

    for partition_key, partition_entities in partition_groups.items():
        print(f"Processing partition '{partition_key}': {len(partition_entities)} entities")

        # Process partition in fixed-size chunks
        for i in range(0, len(partition_entities), batch_size):
            batch = partition_entities[i:i + batch_size]

            operations = [(operation_type, entity) for entity in batch]

            try:
                batch_results = table_client.submit_transaction(operations)
                results.extend(batch_results)
                total_processed += len(batch)

                print(f"  Batch {i//batch_size + 1}: {len(batch)} entities processed")

            except Exception as e:
                # Best-effort: report the failed batch and continue with the rest
                print(f"  Batch {i//batch_size + 1} failed: {e}")
                continue

    print(f"Total processed: {total_processed}/{len(entities)} entities")
    return results
# Usage example
entities_to_create = [
    {"PartitionKey": "2023-Q4", "RowKey": f"order-{i:04d}", "Amount": i * 10.0}
    for i in range(1, 501)  # 500 entities, all in the "2023-Q4" partition
]

results = process_entities_in_batches(
    table_client,
    entities_to_create,
    operation_type="create"
)
```

#### Mixed Operation Batches

```python
from azure.data.tables import TableClient
from datetime import datetime, timezone

def process_order_batch(table_client: TableClient, order_updates: dict):
    """
    Process complex order updates with mixed operations.

    Example: New orders, status updates, cancellations in single transaction.

    Parameters:
    - table_client: client used to submit the transaction
    - order_updates: mapping with optional keys:
      - "new_orders": dict of order_id -> entity data
      - "order_updates": dict of order_id -> property updates
      - "cancelled_orders": list of order ids (soft delete)
      - "deleted_orders": list of order ids (hard delete)

    Returns:
    List of per-operation results, or [] when there is nothing to do.

    Raises:
    Re-raises any error from submit_transaction after logging it.
    """

    # All operations in one transaction must share a PartitionKey.
    # Use a single timezone-aware UTC timestamp for the whole batch
    # (datetime.utcnow() is deprecated and returns naive datetimes).
    now = datetime.now(timezone.utc)
    partition_key = f"orders-{now.strftime('%Y-%m')}"

    operations = []

    # Add new orders
    for order_id, order_data in order_updates.get("new_orders", {}).items():
        operations.append(("create", {
            "PartitionKey": partition_key,
            "RowKey": order_id,
            **order_data,
            "CreatedAt": now.isoformat(),
            "Status": "pending"
        }))

    # Update existing orders
    for order_id, updates in order_updates.get("order_updates", {}).items():
        operations.append(("update", {
            "PartitionKey": partition_key,
            "RowKey": order_id,
            **updates,
            "ModifiedAt": now.isoformat()
        }))

    # Cancel orders (soft delete by status update)
    for order_id in order_updates.get("cancelled_orders", []):
        operations.append(("update", {
            "PartitionKey": partition_key,
            "RowKey": order_id,
            "Status": "cancelled",
            "CancelledAt": now.isoformat()
        }))

    # Hard delete orders
    for order_id in order_updates.get("deleted_orders", []):
        operations.append(("delete", {
            "PartitionKey": partition_key,
            "RowKey": order_id
        }))

    if not operations:
        print("No operations to process")
        return []

    try:
        results = table_client.submit_transaction(operations)
        print(f"Successfully processed {len(operations)} order operations")
        return results

    except Exception as e:
        print(f"Order batch processing failed: {e}")
        raise
# Usage
order_updates = {
    "new_orders": {
        "ORD-001": {"CustomerName": "Alice", "Total": 199.99},
        "ORD-002": {"CustomerName": "Bob", "Total": 299.99}
    },
    "order_updates": {
        "ORD-003": {"Status": "shipped", "TrackingNumber": "TRK123"},
        "ORD-004": {"Status": "delivered", "DeliveredAt": "2023-12-15T14:30:00"}
    },
    "cancelled_orders": ["ORD-005"],
    "deleted_orders": ["ORD-006"]
}

process_order_batch(table_client, order_updates)
```

### Performance Optimization

Best practices for maximizing batch operation performance.

#### Batch Size Optimization

```python
import time
459
from azure.data.tables import TableClient
def find_optimal_batch_size(table_client: TableClient, sample_entities: list[dict]):
    """
    Determine optimal batch size based on entity size and performance.

    Creates (and then deletes) test entities at several batch sizes,
    measures throughput, and returns the fastest size. A size is only
    tested when enough sample entities are available.

    Parameters:
    - table_client: client used for the timed test transactions
    - sample_entities: entities used as the test payload

    Returns:
    The batch size with the highest measured throughput.

    Raises:
    ValueError: if no batch size completed successfully.
    """

    test_sizes = [1, 10, 25, 50, 75, 100]  # Azure Tables max is 100
    performance_data = {}

    for batch_size in test_sizes:
        if len(sample_entities) < batch_size:
            continue

        # Time one create-transaction at this batch size
        test_entities = sample_entities[:batch_size]
        operations = [("create", entity) for entity in test_entities]

        start_time = time.perf_counter()  # monotonic; time.time can jump
        try:
            table_client.submit_transaction(operations)
            # Guard against a zero reading on very fast submissions
            elapsed = max(time.perf_counter() - start_time, 1e-9)

            throughput = batch_size / elapsed
            performance_data[batch_size] = {
                "elapsed": elapsed,
                "throughput": throughput
            }

            print(f"Batch size {batch_size}: {elapsed:.3f}s, {throughput:.1f} entities/sec")

            # Clean up test entities
            cleanup_operations = [("delete", entity) for entity in test_entities]
            table_client.submit_transaction(cleanup_operations)

        except Exception as e:
            print(f"Batch size {batch_size} failed: {e}")
            performance_data[batch_size] = {"error": str(e)}

    # Find optimal size among the sizes that actually succeeded
    successful = [size for size, data in performance_data.items() if "throughput" in data]
    if not successful:
        # Previously max() over an empty sequence raised a cryptic ValueError
        raise ValueError("No batch size completed successfully")

    optimal_size = max(successful, key=lambda size: performance_data[size]["throughput"])

    print(f"Optimal batch size: {optimal_size}")
    return optimal_size
```

#### Parallel Batch Processing

```python
512
import asyncio
513
from concurrent.futures import ThreadPoolExecutor
514
from azure.data.tables import TableClient
515
from azure.data.tables.aio import TableClient as AsyncTableClient
async def parallel_batch_processing(
    table_clients: list[AsyncTableClient],
    entity_groups: list[list[dict]]
):
    """
    Process multiple batches in parallel using async clients.

    Each batch must contain entities with the same PartitionKey.
    A failed batch does not abort the others: its exception object is
    returned in place of its results.

    Returns:
    List with one entry per batch: the batch results on success,
    otherwise the raised exception (via gather(return_exceptions=True)).
    """

    async def process_batch(client: AsyncTableClient, entities: list[dict]):
        operations = [("create", entity) for entity in entities]
        return await client.submit_transaction(operations)

    # One task per (client, entity group) pair
    tasks = [
        process_batch(client, entities)
        for client, entities in zip(table_clients, entity_groups)
    ]

    # Execute all batches concurrently; exceptions become result entries
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Report per-batch outcome
    successful_batches = 0
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Batch {i} failed: {result}")
        else:
            successful_batches += 1
            print(f"Batch {i} completed: {len(result)} entities")

    print(f"Parallel processing completed: {successful_batches}/{len(tasks)} batches successful")
    return results
# Usage with thread pool for synchronous clients
def parallel_sync_batches(table_client: TableClient, entity_groups: list[list[dict]]):
    """
    Process batches in parallel using a thread pool.

    Each group becomes one "create" transaction. A failed batch
    contributes None to the returned list instead of aborting the rest.

    Returns:
    List with one entry per group: the batch results, or None on failure.
    """

    def process_sync_batch(entities):
        operations = [("create", entity) for entity in entities]
        return table_client.submit_transaction(operations)

    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [
            executor.submit(process_sync_batch, entities)
            for entities in entity_groups
        ]

        results = []
        for i, future in enumerate(futures):
            try:
                result = future.result(timeout=30)
                results.append(result)
                print(f"Batch {i} completed successfully")
            except Exception as e:
                print(f"Batch {i} failed: {e}")
                results.append(None)

    return results
# Example usage
entity_groups = [
    [{"PartitionKey": "group1", "RowKey": f"item{i}", "Value": i} for i in range(50)],
    [{"PartitionKey": "group2", "RowKey": f"item{i}", "Value": i} for i in range(50)],
    [{"PartitionKey": "group3", "RowKey": f"item{i}", "Value": i} for i in range(50)]
]

results = parallel_sync_batches(table_client, entity_groups)
```

## Transaction Constraints and Limitations

```python { .api }
# Azure Tables batch transaction constraints:
#
# 1. Same PartitionKey: All entities in a batch must have the same PartitionKey
# 2. Maximum Operations: Up to 100 operations per batch
# 3. Payload Size: Total request payload must be under 4MB
# 4. Atomicity: All operations succeed or all fail
# 5. Operation Types: create, update, upsert, delete
# 6. No Query Operations: Batch transactions don't support query operations
# 7. No Cross-Partition: Cannot batch across different partitions
```

#### Constraint Validation

```python
from azure.data.tables import TableClient
def validate_batch_constraints(operations):
    """
    Validate batch operations against Azure Tables constraints.

    Checks the operation count (max 100), non-empty input, and a single
    shared PartitionKey; warns when the rough payload estimate may
    exceed the 4MB transaction limit.

    Parameters:
    - operations: sequence of (operation_type, entity[, options]) tuples

    Returns:
    True when all hard constraints pass.

    Raises:
    ValueError: on too many operations, empty input, a missing
    PartitionKey, or entities spanning multiple partitions.
    """

    if len(operations) > 100:
        raise ValueError(f"Too many operations: {len(operations)} (max 100)")

    if not operations:
        raise ValueError("No operations provided")

    # All entities in one transaction must share a PartitionKey
    partition_keys = set()
    for operation in operations:
        entity = operation[1]
        partition_key = entity.get("PartitionKey")

        if not partition_key:
            raise ValueError("All entities must have PartitionKey")

        partition_keys.add(partition_key)

    if len(partition_keys) > 1:
        raise ValueError(f"Multiple partition keys not allowed: {partition_keys}")

    # Rough payload estimate; the service enforces the real 4MB limit
    estimated_size = sum(
        len(str(operation[1])) for operation in operations
    )

    if estimated_size > 4 * 1024 * 1024:  # 4MB
        print(f"Warning: Estimated payload size {estimated_size} bytes may exceed 4MB limit")

    print(f"Batch validation passed: {len(operations)} operations, partition '{next(iter(partition_keys))}'")
    return True
# Usage
operations = [
    ("create", {"PartitionKey": "test", "RowKey": f"item{i}", "Data": f"value{i}"})
    for i in range(50)
]

validate_batch_constraints(operations)
table_client.submit_transaction(operations)
```

## Type Aliases

Important type aliases used throughout batch operations for type safety and flexibility.

```python { .api }
# Entity representation types
EntityType = Union[TableEntity, Mapping[str, Any]]
"""Entity can be TableEntity or any mapping (dict-like) object with PartitionKey and RowKey"""

# Operation type specifications
OperationType = Union[TransactionOperation, str]
"""Operation type can be TransactionOperation enum value or string"""

# Complete transaction operation specification
TransactionOperationType = Union[
    Tuple[OperationType, EntityType],
    Tuple[OperationType, EntityType, Mapping[str, Any]]
]
"""
Transaction operation specification supporting:
- (operation_type, entity) for basic operations
- (operation_type, entity, options) for operations with additional parameters
"""
```