0
# Async Client Operations - Asynchronous Programming
1
2
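The Azure Storage File Share SDK provides async versions of all clients for high-performance, non-blocking operations. The async clients keep the same class names as their synchronous counterparts (for example `ShareServiceClient`) and are imported from the `azure.storage.fileshare.aio` namespace; the "Async…" section headings below are labels only, not importable names. These clients use Python's `async`/`await` syntax and are ideal for I/O-bound workloads and concurrent operations.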
3
4
## Import and Core Types
5
6
```python { .api }
7
import asyncio
from typing import IO, Any, AnyStr, AsyncIterator, Dict, Iterable, List, Literal, Optional, Tuple, Union

from azure.core.async_paging import AsyncItemPaged
from azure.core.credentials import AzureNamedKeyCredential, AzureSasCredential, TokenCredential
from azure.storage.fileshare import (
    AccessPolicy,
    DirectoryProperties,
    FileProperties,
    Handle,
    ShareProperties,
)
from azure.storage.fileshare.aio import (
    ShareServiceClient,
    ShareClient,
    ShareDirectoryClient,
    ShareFileClient,
    ShareLeaseClient,
)
17
```
18
19
## AsyncShareServiceClient
20
21
```python { .api }
22
class ShareServiceClient:
    """Async client for account-level File Share operations.

    Methods declared with ``async def`` are coroutines and must be awaited.
    ``list_shares`` and ``get_share_client`` are plain methods and are called
    without ``await``. Use the client in an ``async with`` block, or call
    ``close()`` explicitly, to release its resources.
    """

    def __init__(
        self,
        account_url: str,
        credential: Optional[Union[str, Dict[str, str], AzureNamedKeyCredential, AzureSasCredential, TokenCredential]] = None,
        *,
        token_intent: Optional[Literal['backup']] = None,
        **kwargs: Any
    ) -> None:
        """
        Async version of ShareServiceClient for account-level operations.

        Parameters:
            account_url: The URL to the file service endpoint
            credential: Authentication credential
            token_intent: Specifies the intent for all requests when using TokenCredential authentication
            **kwargs: Additional client configuration options
        """

    @classmethod
    def from_connection_string(
        cls,
        conn_str: str,
        credential: Optional[Union[str, Dict[str, str], AzureNamedKeyCredential, AzureSasCredential, TokenCredential]] = None,
        **kwargs: Any
    ) -> ShareServiceClient:
        """Create async ShareServiceClient from connection string."""

    async def get_service_properties(self, **kwargs: Any) -> Dict[str, Any]:
        """Async version of get_service_properties."""

    async def set_service_properties(self, **kwargs: Any) -> None:
        """Async version of set_service_properties."""

    def list_shares(self, **kwargs: Any) -> AsyncItemPaged[ShareProperties]:
        """Returns async auto-paging iterable of ShareProperties.

        Not a coroutine: iterate the result with ``async for``.
        """

    async def create_share(self, share_name: str, **kwargs: Any) -> ShareClient:
        """Async version of create_share."""

    async def delete_share(self, share_name: Union[ShareProperties, str], **kwargs: Any) -> None:
        """Async version of delete_share."""

    async def undelete_share(self, deleted_share_name: str, deleted_share_version: str, **kwargs: Any) -> ShareClient:
        """Async version of undelete_share."""

    def get_share_client(
        self,
        share: Union[ShareProperties, str],
        snapshot: Optional[Union[Dict[str, Any], str]] = None
    ) -> ShareClient:
        """Get an async ShareClient for the specified share.

        Not a coroutine: call it without ``await``.

        Parameters:
            share: The share name, or a ShareProperties instance
            snapshot: Optional share snapshot to operate on
        """

    async def close(self) -> None:
        """Close the async client and release resources."""

    async def __aenter__(self) -> ShareServiceClient:
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit."""
        # Always close, whether or not the block raised.
        await self.close()
78
```
79
80
## AsyncShareClient
81
82
```python { .api }
83
class ShareClient:
    """Async client for operations on a single share.

    Methods declared with ``async def`` are coroutines and must be awaited.
    ``list_directories_and_files``, ``get_directory_client`` and
    ``get_file_client`` are plain methods and are called without ``await``.
    Use the client in an ``async with`` block, or call ``close()`` explicitly,
    to release its resources.
    """

    def __init__(
        self,
        account_url: str,
        share_name: str,
        snapshot: Optional[str] = None,
        credential: Optional[Union[str, Dict[str, str], AzureNamedKeyCredential, AzureSasCredential, TokenCredential]] = None,
        **kwargs: Any
    ) -> None:
        """Async version of ShareClient for share-level operations."""

    @classmethod
    def from_share_url(cls, share_url: str, **kwargs: Any) -> ShareClient:
        """Create async ShareClient from share URL."""

    @classmethod
    def from_connection_string(cls, conn_str: str, share_name: str, **kwargs: Any) -> ShareClient:
        """Create async ShareClient from connection string."""

    def get_directory_client(self, directory_path: Optional[str] = None) -> ShareDirectoryClient:
        """Get an async ShareDirectoryClient for a directory in this share.

        Not a coroutine: call it without ``await``.
        """

    def get_file_client(self, file_path: str) -> ShareFileClient:
        """Get an async ShareFileClient for a file in this share.

        Not a coroutine: call it without ``await``.
        """

    async def create_share(self, **kwargs: Any) -> Dict[str, Any]:
        """Async version of create_share."""

    async def delete_share(self, **kwargs: Any) -> None:
        """Async version of delete_share."""

    async def create_snapshot(self, **kwargs: Any) -> Dict[str, Any]:
        """Async version of create_snapshot."""

    async def get_share_properties(self, **kwargs: Any) -> ShareProperties:
        """Async version of get_share_properties."""

    async def set_share_quota(self, quota: int, **kwargs: Any) -> Dict[str, Any]:
        """Async version of set_share_quota."""

    async def set_share_properties(self, **kwargs: Any) -> Dict[str, Any]:
        """Async version of set_share_properties."""

    async def set_share_metadata(self, metadata: Dict[str, str], **kwargs: Any) -> Dict[str, Any]:
        """Async version of set_share_metadata."""

    async def get_share_access_policy(self, **kwargs: Any) -> Dict[str, Any]:
        """Async version of get_share_access_policy."""

    async def set_share_access_policy(self, signed_identifiers: Dict[str, AccessPolicy], **kwargs: Any) -> Dict[str, Any]:
        """Async version of set_share_access_policy."""

    async def get_share_stats(self, **kwargs: Any) -> int:
        """Async version of get_share_stats."""

    def list_directories_and_files(self, **kwargs: Any) -> AsyncItemPaged[Dict[str, Any]]:
        """Returns async auto-paging iterable of directory/file info.

        Not a coroutine: iterate the result with ``async for``.
        """

    async def create_directory(self, directory_name: str, **kwargs: Any) -> ShareDirectoryClient:
        """Async version of create_directory."""

    async def delete_directory(self, directory_name: str, **kwargs: Any) -> None:
        """Async version of delete_directory."""

    async def create_permission_for_share(self, file_permission: str, **kwargs: Any) -> Optional[str]:
        """Async version of create_permission_for_share."""

    async def get_permission_for_share(self, permission_key: str, **kwargs: Any) -> str:
        """Async version of get_permission_for_share."""

    async def acquire_lease(self, **kwargs: Any) -> ShareLeaseClient:
        """Async version of acquire_lease."""

    async def close(self) -> None:
        """Close the async client and release resources."""

    async def __aenter__(self) -> ShareClient:
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit."""
        # Always close, whether or not the block raised.
        await self.close()
160
```
161
162
## AsyncShareDirectoryClient
163
164
```python { .api }
165
class ShareDirectoryClient:
    """Async client for operations on a single directory in a share.

    Methods declared with ``async def`` are coroutines and must be awaited.
    The ``list_*`` and ``get_*_client`` methods are plain methods and are
    called without ``await``. Use the client in an ``async with`` block, or
    call ``close()`` explicitly, to release its resources.
    """

    def __init__(
        self,
        account_url: str,
        share_name: str,
        directory_path: str,
        snapshot: Optional[str] = None,
        credential: Optional[Union[str, Dict[str, str], AzureNamedKeyCredential, AzureSasCredential, TokenCredential]] = None,
        **kwargs: Any
    ) -> None:
        """Async version of ShareDirectoryClient for directory operations.

        Parameters:
            account_url: The URL to the file service endpoint
            share_name: Name of the share containing the directory
            directory_path: Path of the directory within the share
            snapshot: Optional share snapshot to operate on
            credential: Authentication credential
            **kwargs: Additional client configuration options
        """

    @classmethod
    def from_directory_url(cls, directory_url: str, **kwargs: Any) -> ShareDirectoryClient:
        """Create async ShareDirectoryClient from directory URL."""

    @classmethod
    def from_connection_string(cls, conn_str: str, share_name: str, directory_path: str, **kwargs: Any) -> ShareDirectoryClient:
        """Create async ShareDirectoryClient from connection string."""

    def get_file_client(self, file_name: str, **kwargs: Any) -> ShareFileClient:
        """Get an async ShareFileClient for a file in this directory.

        Not a coroutine: call it without ``await``.
        """

    def get_subdirectory_client(self, directory_name: str, **kwargs: Any) -> ShareDirectoryClient:
        """Get an async ShareDirectoryClient for a subdirectory of this directory.

        Not a coroutine: call it without ``await``.
        """

    async def exists(self, **kwargs: Any) -> bool:
        """Async version of exists."""

    async def create_directory(self, **kwargs: Any) -> Dict[str, Any]:
        """Async version of create_directory."""

    async def delete_directory(self, **kwargs: Any) -> None:
        """Async version of delete_directory."""

    async def rename_directory(self, new_name: str, **kwargs: Any) -> ShareDirectoryClient:
        """Async version of rename_directory."""

    def list_directories_and_files(self, **kwargs: Any) -> AsyncItemPaged[Dict[str, Any]]:
        """Returns async auto-paging iterable of directory contents.

        Not a coroutine: iterate the result with ``async for``.
        """

    def list_handles(self, **kwargs: Any) -> AsyncItemPaged[Handle]:
        """Returns async auto-paging iterable of handles.

        Not a coroutine: iterate the result with ``async for``.
        """

    async def close_handle(self, handle: Union[str, Handle], **kwargs: Any) -> Dict[str, int]:
        """Async version of close_handle."""

    async def close_all_handles(self, **kwargs: Any) -> Dict[str, int]:
        """Async version of close_all_handles."""

    async def get_directory_properties(self, **kwargs: Any) -> DirectoryProperties:
        """Async version of get_directory_properties."""

    async def set_directory_metadata(self, metadata: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]:
        """Async version of set_directory_metadata."""

    async def set_http_headers(self, **kwargs: Any) -> Dict[str, Any]:
        """Async version of set_http_headers."""

    async def create_subdirectory(self, directory_name: str, **kwargs: Any) -> ShareDirectoryClient:
        """Async version of create_subdirectory."""

    async def delete_subdirectory(self, directory_name: str, **kwargs: Any) -> None:
        """Async version of delete_subdirectory."""

    async def upload_file(self, file_name: str, data: Union[bytes, str, Iterable[AnyStr], IO[AnyStr]], **kwargs: Any) -> ShareFileClient:
        """Async version of upload_file."""

    async def delete_file(self, file_name: str, **kwargs: Any) -> None:
        """Async version of delete_file."""

    async def close(self) -> None:
        """Close the async client and release resources."""

    async def __aenter__(self) -> ShareDirectoryClient:
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit."""
        # Always close, whether or not the block raised.
        await self.close()
238
```
239
240
## AsyncShareFileClient
241
242
```python { .api }
243
class ShareFileClient:
    """Async client for operations on a single file in a share.

    Methods declared with ``async def`` are coroutines and must be awaited.
    ``list_handles`` is a plain method whose result is iterated with
    ``async for``. Use the client in an ``async with`` block, or call
    ``close()`` explicitly, to release its resources.
    """

    def __init__(
        self,
        account_url: str,
        share_name: str,
        file_path: str,
        snapshot: Optional[str] = None,
        credential: Optional[Union[str, Dict[str, str], AzureNamedKeyCredential, AzureSasCredential, TokenCredential]] = None,
        **kwargs: Any
    ) -> None:
        """Async version of ShareFileClient for file operations.

        Parameters:
            account_url: The URL to the file service endpoint
            share_name: Name of the share containing the file
            file_path: Path of the file within the share
            snapshot: Optional share snapshot to operate on
            credential: Authentication credential
            **kwargs: Additional client configuration options
        """

    @classmethod
    def from_file_url(cls, file_url: str, **kwargs: Any) -> ShareFileClient:
        """Create async ShareFileClient from file URL."""

    @classmethod
    def from_connection_string(cls, conn_str: str, share_name: str, file_path: str, **kwargs: Any) -> ShareFileClient:
        """Create async ShareFileClient from connection string."""

    async def exists(self, **kwargs: Any) -> bool:
        """Async version of exists."""

    async def create_file(self, size: int, **kwargs: Any) -> Dict[str, Any]:
        """Async version of create_file."""

    async def delete_file(self, **kwargs: Any) -> None:
        """Async version of delete_file."""

    async def rename_file(self, new_name: str, **kwargs: Any) -> ShareFileClient:
        """Async version of rename_file."""

    async def upload_file(self, data: Union[bytes, str, Iterable[AnyStr], IO[AnyStr]], **kwargs: Any) -> Dict[str, Any]:
        """Async version of upload_file."""

    async def upload_range(self, data: bytes, offset: int, length: int, **kwargs: Any) -> Dict[str, Any]:
        """Async version of upload_range."""

    async def upload_range_from_url(self, source_url: str, offset: int, length: int, source_offset: int, **kwargs: Any) -> Dict[str, Any]:
        """Async version of upload_range_from_url."""

    async def download_file(self, **kwargs: Any) -> StorageStreamDownloader:
        """Async version of download_file.

        The returned downloader's ``readall()`` is itself awaited to obtain
        the content, as the usage examples show.
        """

    async def start_copy_from_url(self, source_url: str, **kwargs: Any) -> Dict[str, Any]:
        """Async version of start_copy_from_url."""

    async def abort_copy(self, copy_id: Union[str, FileProperties], **kwargs: Any) -> None:
        """Async version of abort_copy."""

    async def get_file_properties(self, **kwargs: Any) -> FileProperties:
        """Async version of get_file_properties."""

    async def set_http_headers(self, **kwargs: Any) -> Dict[str, Any]:
        """Async version of set_http_headers."""

    async def set_file_metadata(self, metadata: Optional[Dict[str, Any]] = None, **kwargs: Any) -> Dict[str, Any]:
        """Async version of set_file_metadata.

        Parameters:
            metadata: Name-value pairs to associate with the file
        """

    async def get_ranges(self, **kwargs: Any) -> List[Dict[str, int]]:
        """Async version of get_ranges."""

    async def get_ranges_diff(self, previous_sharesnapshot: Union[str, Dict[str, Any]], **kwargs: Any) -> Tuple[List[Dict[str, int]], List[Dict[str, int]]]:
        """Async version of get_ranges_diff."""

    async def clear_range(self, offset: int, length: int, **kwargs: Any) -> Dict[str, Any]:
        """Async version of clear_range."""

    async def resize_file(self, size: int, **kwargs: Any) -> Dict[str, Any]:
        """Async version of resize_file."""

    def list_handles(self, **kwargs: Any) -> AsyncItemPaged[Handle]:
        """Returns async auto-paging iterable of handles.

        Not a coroutine: iterate the result with ``async for``.
        """

    async def close_handle(self, handle: Union[str, Handle], **kwargs: Any) -> Dict[str, int]:
        """Async version of close_handle."""

    async def close_all_handles(self, **kwargs: Any) -> Dict[str, int]:
        """Async version of close_all_handles."""

    async def create_hardlink(self, new_name: str, **kwargs: Any) -> ShareFileClient:
        """Async version of create_hardlink."""

    async def create_symlink(self, target_path: str, **kwargs: Any) -> ShareFileClient:
        """Async version of create_symlink."""

    async def get_symlink(self, **kwargs: Any) -> str:
        """Async version of get_symlink."""

    async def acquire_lease(self, **kwargs: Any) -> ShareLeaseClient:
        """Async version of acquire_lease."""

    async def close(self) -> None:
        """Close the async client and release resources."""

    async def __aenter__(self) -> ShareFileClient:
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit."""
        # Always close, whether or not the block raised.
        await self.close()
343
```
344
345
## AsyncShareLeaseClient
346
347
```python { .api }
348
class ShareLeaseClient:
    """Async client for managing a lease on a share or a file.

    Used as an async context manager, the lease is released when the block
    exits. Entering the block does not acquire a lease: acquire one first,
    for example with ``await client.acquire_lease()`` on a share or file
    client, or by awaiting ``acquire()`` on this object.
    """

    def __init__(
        self,
        client: Union[ShareClient, ShareFileClient],
        lease_id: Optional[str] = None
    ) -> None:
        """Async version of ShareLeaseClient for lease management.

        Parameters:
            client: The share or file client the lease applies to
            lease_id: Optional identifier of an existing lease
        """

    async def acquire(self, **kwargs: Any) -> None:
        """Async version of acquire."""

    async def renew(self, **kwargs: Any) -> None:
        """Async version of renew."""

    async def release(self, **kwargs: Any) -> None:
        """Async version of release."""

    async def change(self, proposed_lease_id: str, **kwargs: Any) -> None:
        """Async version of change."""

    async def break_lease(self, **kwargs: Any) -> int:
        """Async version of break_lease."""

    async def __aenter__(self) -> ShareLeaseClient:
        """Async context manager entry."""
        # No acquire here: a client returned by acquire_lease() already holds
        # the lease, and acquiring again on entry would request it twice.
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit."""
        # Release on exit, whether or not the block raised.
        await self.release()
379
```
380
381
## Usage Examples
382
383
### Basic Async Operations
384
385
```python { .api }
386
import asyncio
from azure.storage.fileshare.aio import ShareServiceClient, ShareFileClient
from azure.core.credentials import AzureNamedKeyCredential


async def basic_async_operations():
    """Create a share, then upload, download and inspect one file with the async clients."""

    # Shared-key credential for the storage account
    account_credential = AzureNamedKeyCredential("myaccount", "mykey")
    service = ShareServiceClient(
        account_url="https://myaccount.file.core.windows.net",
        credential=account_credential,
    )

    # Leaving the block closes the service client
    async with service:
        share = await service.create_share("async-share", quota=10)
        print("Share created asynchronously")

        # Client lookup is a plain call; only the service requests are awaited
        target_file = share.get_file_client("test.txt")

        await target_file.upload_file(data=b"Hello from async operation!", overwrite=True)
        print("File uploaded asynchronously")

        # download_file gives a downloader; readall() fetches the full content
        downloader = await target_file.download_file()
        payload = await downloader.readall()
        print(f"Downloaded content: {payload.decode()}")

        file_props = await target_file.get_file_properties()
        print(f"File size: {file_props.size} bytes")


# Run the async function
asyncio.run(basic_async_operations())
426
```
427
428
### Concurrent Operations
429
430
```python { .api }
431
async def concurrent_file_operations():
    """Demonstrate concurrent file operations.

    Uploads ten files concurrently, tolerating individual failures, then
    downloads and prints only the files whose upload succeeded.
    """

    credential = AzureNamedKeyCredential("myaccount", "mykey")

    async with ShareServiceClient(
        account_url="https://myaccount.file.core.windows.net",
        credential=credential
    ) as service_client:

        share_client = await service_client.create_share("concurrent-demo", quota=10)

        # Create multiple file clients
        file_clients = [
            share_client.get_file_client(f"file_{i}.txt")
            for i in range(10)
        ]

        # Upload files concurrently
        upload_tasks = [
            client.upload_file(data=f"Content for file {i}".encode(), overwrite=True)
            for i, client in enumerate(file_clients)
        ]

        # return_exceptions=True keeps one failed upload from cancelling the rest;
        # failures come back as exception objects in the result list.
        results = await asyncio.gather(*upload_tasks, return_exceptions=True)

        # Keep (index, client) only for uploads that succeeded, so the download
        # stage never requests a file that was not written.
        uploaded = []
        for i, (client, outcome) in enumerate(zip(file_clients, results)):
            if isinstance(outcome, Exception):
                print(f"Upload of file {i} failed: {outcome}")
            else:
                uploaded.append((i, client))

        print(f"Successfully uploaded {len(uploaded)} files concurrently")

        # Download files concurrently
        download_streams = await asyncio.gather(
            *(client.download_file() for _, client in uploaded)
        )

        # Read content from streams concurrently
        contents = await asyncio.gather(
            *(stream.readall() for stream in download_streams)
        )

        print(f"Downloaded {len(contents)} files concurrently")
        for (i, _), content in zip(uploaded, contents):
            print(f"File {i}: {content.decode()}")

asyncio.run(concurrent_file_operations())
476
```
477
478
### Async Pagination
479
480
```python { .api }
481
async def async_pagination_demo():
    """Demonstrate async pagination with large lists.

    Lists every share in the account, then lists the contents of the first
    share that was returned.
    """

    credential = AzureNamedKeyCredential("myaccount", "mykey")

    async with ShareServiceClient(
        account_url="https://myaccount.file.core.windows.net",
        credential=credential
    ) as service_client:

        # List shares asynchronously
        print("Listing shares:")
        share_count = 0
        first_share_name = None
        async for share in service_client.list_shares(include_metadata=True):
            print(f"  Share: {share.name}, Quota: {share.quota}GB")
            if first_share_name is None:
                # Remember a share that is known to exist for the next step
                first_share_name = share.name
            share_count += 1

        print(f"Total shares: {share_count}")

        # Get a share and list its contents. Use a name taken from the listing
        # rather than a hard-coded one, which may not exist in this account.
        if first_share_name is not None:
            share_client = service_client.get_share_client(first_share_name)

            print("\nListing share contents:")
            item_count = 0
            async for item in share_client.list_directories_and_files():
                item_type = "Directory" if item.get('is_directory') else "File"
                print(f"  {item_type}: {item['name']}")
                item_count += 1

            print(f"Total items: {item_count}")

asyncio.run(async_pagination_demo())
514
```
515
516
### Async File Processing Pipeline
517
518
```python { .api }
519
from pathlib import Path


async def file_processing_pipeline():
    """Demonstrate async file processing pipeline.

    Stage 1 uploads generated input files, stage 2 downloads, transforms and
    re-uploads each one concurrently, and stage 3 reads back the properties
    of the processed files.
    """

    credential = AzureNamedKeyCredential("myaccount", "mykey")

    async with ShareServiceClient(
        account_url="https://myaccount.file.core.windows.net",
        credential=credential
    ) as service_client:

        share_client = await service_client.create_share("processing-pipeline", quota=50)

        # Files are written under input/ and output/; a file cannot be created
        # inside a directory that does not exist, so create both up front.
        await asyncio.gather(
            share_client.create_directory("input"),
            share_client.create_directory("output"),
        )

        # Stage 1: Upload multiple files asynchronously
        local_files = ["file1.txt", "file2.txt", "file3.txt"]
        upload_tasks = []

        for filename in local_files:
            # Generate in-memory content for this file (nothing is read from disk)
            content = f"Processing content for {filename}"

            file_client = share_client.get_file_client(f"input/{filename}")
            task = file_client.upload_file(data=content.encode(), overwrite=True)
            upload_tasks.append(task)

        await asyncio.gather(*upload_tasks)
        print("Stage 1: All files uploaded")

        # Stage 2: Process files concurrently
        async def process_file(filename):
            """Download one input file, transform it, and upload the result."""
            input_client = share_client.get_file_client(f"input/{filename}")
            output_client = share_client.get_file_client(f"output/processed_{filename}")

            # Download and process
            download_stream = await input_client.download_file()
            content = await download_stream.readall()

            # Simulate processing
            processed_content = content.decode().upper() + " - PROCESSED"

            # Upload processed result
            await output_client.upload_file(data=processed_content.encode(), overwrite=True)

            return f"Processed {filename}"

        processing_tasks = [process_file(filename) for filename in local_files]
        results = await asyncio.gather(*processing_tasks)

        print("Stage 2: Processing completed")
        for result in results:
            print(f"  {result}")

        # Stage 3: Verify results asynchronously
        verification_tasks = []
        for filename in local_files:
            output_client = share_client.get_file_client(f"output/processed_{filename}")
            task = output_client.get_file_properties()
            verification_tasks.append(task)

        # gather preserves input order, so properties[i] belongs to local_files[i]
        properties = await asyncio.gather(*verification_tasks)

        print("Stage 3: Verification completed")
        for i, props in enumerate(properties):
            print(f"  processed_{local_files[i]}: {props.size} bytes")

asyncio.run(file_processing_pipeline())
587
```
588
589
### Async Error Handling and Retry
590
591
```python { .api }
592
import random
from azure.core.exceptions import HttpResponseError


async def resilient_async_operations():
    """Demonstrate error handling and retry logic in async operations."""

    # 4xx responses that are still worth retrying: request timeout and throttling.
    retryable_client_errors = (408, 429)

    async def retry_async_operation(operation, max_retries=3, delay=1.0):
        """Retry async operation with exponential backoff.

        Parameters:
            operation: Zero-argument callable returning an awaitable
            max_retries: Total number of attempts; must be at least 1
            delay: Base delay in seconds, doubled after each failed attempt

        Returns:
            The result of the first attempt that succeeds.

        Raises:
            ValueError: If max_retries is less than 1
            HttpResponseError: If the failure is not transient, or the final
                attempt fails
        """
        if max_retries < 1:
            # Otherwise the loop would not run and None would be returned
            # without the operation ever being called.
            raise ValueError("max_retries must be at least 1")

        for attempt in range(max_retries):
            try:
                return await operation()
            except HttpResponseError as e:
                status = e.status_code
                # Retry only failures that may succeed next time: no HTTP status
                # available, server errors, or timeout/throttling. Other 4xx
                # responses (e.g. 409 "already exists") will not change on retry.
                transient = (
                    status is None
                    or status >= 500
                    or status in retryable_client_errors
                )
                if not transient or attempt == max_retries - 1:
                    raise

                # Exponential backoff plus jitter so concurrent callers spread out
                wait_time = delay * (2 ** attempt) + random.uniform(0, 1)
                print(f"Attempt {attempt + 1} failed: {e.message}")
                print(f"Retrying in {wait_time:.2f} seconds...")
                await asyncio.sleep(wait_time)

    credential = AzureNamedKeyCredential("myaccount", "mykey")

    async with ShareServiceClient(
        account_url="https://myaccount.file.core.windows.net",
        credential=credential
    ) as service_client:

        # Resilient share creation
        async def create_share_operation():
            return await service_client.create_share("resilient-share", quota=10)

        try:
            share_client = await retry_async_operation(create_share_operation)
            print("Share created successfully with retry logic")
        except Exception as e:
            print(f"Failed to create share after retries: {e}")
            return

        # Resilient file operations
        file_client = share_client.get_file_client("test.txt")

        async def upload_operation():
            return await file_client.upload_file(
                data=b"Resilient upload content",
                overwrite=True
            )

        try:
            await retry_async_operation(upload_operation)
            print("File uploaded successfully with retry logic")
        except Exception as e:
            print(f"Failed to upload file after retries: {e}")

asyncio.run(resilient_async_operations())
646
```
647
648
### Async Batch Processing
649
650
```python { .api }
651
async def batch_file_processor():
    """Upload 100 generated files in groups of ten, then count the files in the share."""

    account_credential = AzureNamedKeyCredential("myaccount", "mykey")
    service = ShareServiceClient(
        account_url="https://myaccount.file.core.windows.net",
        credential=account_credential,
    )

    async with service:
        share = await service.create_share("batch-processing", quota=100)

        # Names of the files to generate
        names = [f"batch_file_{i:04d}.txt" for i in range(100)]

        async def process_batch(batch_files, batch_id):
            """Upload every file of one batch concurrently and return the batch size."""
            print(f"Processing batch {batch_id} with {len(batch_files)} files")

            uploads = [
                share.get_file_client(name).upload_file(
                    data=f"Batch {batch_id} - Content for {name}".encode(),
                    overwrite=True,
                )
                for name in batch_files
            ]
            await asyncio.gather(*uploads)

            print(f"Batch {batch_id} completed")
            return len(batch_files)

        # Slice the name list into groups of ten; batch ids start at 1
        group_size = 10
        pending_batches = [
            process_batch(names[start:start + group_size], start // group_size + 1)
            for start in range(0, len(names), group_size)
        ]

        # All batches are awaited together
        per_batch_counts = await asyncio.gather(*pending_batches)
        print(f"Batch processing completed: {sum(per_batch_counts)} files processed")

        # Verify by listing the share and keeping only file entries
        found_files = [
            entry
            async for entry in share.list_directories_and_files()
            if not entry.get('is_directory')
        ]
        print(f"Verification: {len(found_files)} files found in share")

asyncio.run(batch_file_processor())
706
```
707
708
### Async Context Management
709
710
```python { .api }
711
async def advanced_context_management():
    """Show a custom async context manager that deletes its temporary files on exit."""

    class AsyncFileProcessor:
        """Records the temporary files it creates and removes them when the block exits."""

        def __init__(self, share_client):
            self.share_client = share_client
            self.temp_files = []

        async def __aenter__(self):
            print("Starting file processing session")
            return self

        async def __aexit__(self, exc_type, exc_val, exc_tb):
            pending = len(self.temp_files)
            if pending:
                print(f"Cleaning up {pending} temporary files")
                # Best effort: deletion failures are collected, not raised
                await asyncio.gather(
                    *(
                        self.share_client.get_file_client(name).delete_file()
                        for name in self.temp_files
                    ),
                    return_exceptions=True,
                )

            print("File processing session ended")

        async def process_file(self, input_name, output_name):
            """Copy input_name to output_name in upper case via a temporary file."""
            scratch_name = f"temp_{input_name}"
            # Registered before any I/O so that __aexit__ will attempt to delete it
            self.temp_files.append(scratch_name)

            source = self.share_client.get_file_client(input_name)
            scratch = self.share_client.get_file_client(scratch_name)
            destination = self.share_client.get_file_client(output_name)

            # Download -> process -> stage in the temporary file
            raw = await (await source.download_file()).readall()
            await scratch.upload_file(data=raw.decode().upper().encode(), overwrite=True)

            # Final step: read the staged content back and publish it
            staged = await (await scratch.download_file()).readall()
            await destination.upload_file(data=staged, overwrite=True)

            return output_name

    account_credential = AzureNamedKeyCredential("myaccount", "mykey")
    service = ShareServiceClient(
        account_url="https://myaccount.file.core.windows.net",
        credential=account_credential,
    )

    async with service:
        share = await service.create_share("context-demo", quota=10)

        # Seed the share with the input files
        input_files = ["input1.txt", "input2.txt", "input3.txt"]
        await asyncio.gather(
            *[
                share.get_file_client(name).upload_file(
                    data=f"Input content for {name}".encode(),
                    overwrite=True,
                )
                for name in input_files
            ]
        )

        # Process files; temporary files are cleaned up when the block exits
        async with AsyncFileProcessor(share) as processor:
            outputs = await asyncio.gather(
                *[
                    processor.process_file(name, f"output_{name}")
                    for name in input_files
                ]
            )
            print(f"Processed files: {outputs}")

        # Verify cleanup worked: look for file entries still named temp_*
        leftovers = [
            entry['name']
            async for entry in share.list_directories_and_files()
            if not entry.get('is_directory') and entry['name'].startswith('temp_')
        ]
        print(f"Temporary files remaining: {len(leftovers)}")

asyncio.run(advanced_context_management())
801
```
802
803
### Performance Monitoring and Optimization
804
805
```python { .api }
806
import time
from typing import List, Tuple


async def performance_monitoring():
    """Monitor and optimize async operation performance.

    Uploads the same set of files at several concurrency limits and prints
    how long each run takes, then deletes the files.
    """

    class PerformanceTimer:
        """Async context manager that prints the elapsed time of its block."""

        def __init__(self, name: str):
            self.name = name
            self.start_time = None

        async def __aenter__(self):
            # perf_counter is monotonic, so the measured interval is not
            # affected by system clock adjustments the way time.time() is.
            self.start_time = time.perf_counter()
            return self

        async def __aexit__(self, exc_type, exc_val, exc_tb):
            duration = time.perf_counter() - self.start_time
            print(f"{self.name}: {duration:.2f} seconds")

    credential = AzureNamedKeyCredential("myaccount", "mykey")

    async with ShareServiceClient(
        account_url="https://myaccount.file.core.windows.net",
        credential=credential
    ) as service_client:

        share_client = await service_client.create_share("performance-test", quota=50)

        # Test different concurrency levels
        concurrency_levels = [1, 5, 10, 20]
        file_count = 50

        for concurrency in concurrency_levels:
            async with PerformanceTimer(f"Concurrency {concurrency}"):

                # At most `concurrency` uploads hold the semaphore at once
                semaphore = asyncio.Semaphore(concurrency)

                async def upload_with_limit(file_index):
                    """Upload one test file once a semaphore slot is free."""
                    async with semaphore:
                        file_client = share_client.get_file_client(f"perf_test_{file_index}.txt")
                        content = f"Performance test content {file_index}" * 100
                        await file_client.upload_file(data=content.encode(), overwrite=True)
                        return file_index

                tasks = [upload_with_limit(i) for i in range(file_count)]
                results = await asyncio.gather(*tasks)

                print(f"  Uploaded {len(results)} files")

        # Cleanup
        async with PerformanceTimer("Cleanup"):
            cleanup_tasks = [
                share_client.get_file_client(f"perf_test_{i}.txt").delete_file()
                for i in range(file_count)
            ]
            # Best effort: deletion failures are collected, not raised
            await asyncio.gather(*cleanup_tasks, return_exceptions=True)

asyncio.run(performance_monitoring())
865
```
866
867
The async clients provide the same functionality as their synchronous counterparts while enabling high-performance, concurrent operations. Use them for I/O-bound workloads, batch processing, and applications requiring high throughput with Azure File Share storage.