CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-azure-storage-file-share

Azure File Share storage client library for Python

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

async-clients.mddocs/

Async Client Operations - Asynchronous Programming

The Azure Storage File Share SDK provides async versions of all clients for high-performance, non-blocking operations. These clients use Python's async/await syntax and are ideal for I/O-bound workloads and concurrent operations.

Import and Core Types

from azure.storage.fileshare.aio import (
    ShareServiceClient,
    ShareClient,
    ShareDirectoryClient,
    ShareFileClient,
    ShareLeaseClient
)
from azure.core.credentials import AzureNamedKeyCredential
from typing import Optional, Union, Dict, Any, List, AsyncIterator
import asyncio

AsyncShareServiceClient

class ShareServiceClient:
    def __init__(
        self,
        account_url: str,
        credential: Optional[Union[str, Dict[str, str], AzureNamedKeyCredential, AzureSasCredential, TokenCredential]] = None,
        *,
        token_intent: Optional[Literal['backup']] = None,
        **kwargs: Any
    ) -> None:
        """
        Async version of ShareServiceClient for account-level operations.
        
        Parameters:
            account_url: The URL to the file service endpoint
            credential: Authentication credential
            token_intent: Specifies the intent for all requests when using TokenCredential authentication
            **kwargs: Additional client configuration options
        """
    
    @classmethod
    def from_connection_string(
        cls,
        conn_str: str,
        credential: Optional[Union[str, Dict[str, str], AzureNamedKeyCredential, AzureSasCredential, TokenCredential]] = None,
        **kwargs: Any
    ) -> ShareServiceClient:
        """Create async ShareServiceClient from connection string."""
    
    async def get_service_properties(self, **kwargs: Any) -> Dict[str, Any]:
        """Async version of get_service_properties."""
    
    async def set_service_properties(self, **kwargs: Any) -> None:
        """Async version of set_service_properties."""
    
    def list_shares(self, **kwargs: Any) -> AsyncItemPaged[ShareProperties]:
        """Returns async auto-paging iterable of ShareProperties."""
    
    async def create_share(self, share_name: str, **kwargs: Any) -> ShareClient:
        """Async version of create_share."""
    
    async def delete_share(self, share_name: Union[ShareProperties, str], **kwargs: Any) -> None:
        """Async version of delete_share."""
    
    async def undelete_share(self, deleted_share_name: str, deleted_share_version: str, **kwargs: Any) -> ShareClient:
        """Async version of undelete_share."""
    
    async def close(self) -> None:
        """Close the async client and release resources."""
    
    async def __aenter__(self) -> ShareServiceClient:
        """Async context manager entry."""
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit."""
        await self.close()

AsyncShareClient

class ShareClient:
    def __init__(
        self,
        account_url: str,
        share_name: str,
        snapshot: Optional[str] = None,
        credential: Optional[Union[str, Dict[str, str], AzureNamedKeyCredential, AzureSasCredential, TokenCredential]] = None,
        **kwargs: Any
    ) -> None:
        """Async version of ShareClient for share-level operations."""
    
    @classmethod
    def from_share_url(cls, share_url: str, **kwargs: Any) -> ShareClient:
        """Create async ShareClient from share URL."""
    
    @classmethod
    def from_connection_string(cls, conn_str: str, share_name: str, **kwargs: Any) -> ShareClient:
        """Create async ShareClient from connection string."""
    
    async def create_share(self, **kwargs: Any) -> Dict[str, Any]:
        """Async version of create_share."""
    
    async def delete_share(self, **kwargs: Any) -> None:
        """Async version of delete_share."""
    
    async def create_snapshot(self, **kwargs: Any) -> Dict[str, Any]:
        """Async version of create_snapshot."""
    
    async def get_share_properties(self, **kwargs: Any) -> ShareProperties:
        """Async version of get_share_properties."""
    
    async def set_share_quota(self, quota: int, **kwargs: Any) -> Dict[str, Any]:
        """Async version of set_share_quota."""
    
    async def set_share_properties(self, **kwargs: Any) -> Dict[str, Any]:
        """Async version of set_share_properties."""
    
    async def set_share_metadata(self, metadata: Dict[str, str], **kwargs: Any) -> Dict[str, Any]:
        """Async version of set_share_metadata."""
    
    async def get_share_access_policy(self, **kwargs: Any) -> Dict[str, Any]:
        """Async version of get_share_access_policy."""
    
    async def set_share_access_policy(self, signed_identifiers: Dict[str, AccessPolicy], **kwargs: Any) -> Dict[str, Any]:
        """Async version of set_share_access_policy."""
    
    async def get_share_stats(self, **kwargs: Any) -> int:
        """Async version of get_share_stats."""
    
    def list_directories_and_files(self, **kwargs: Any) -> AsyncItemPaged[Dict[str, Any]]:
        """Returns async auto-paging iterable of directory/file info."""
    
    async def create_directory(self, directory_name: str, **kwargs: Any) -> ShareDirectoryClient:
        """Async version of create_directory."""
    
    async def delete_directory(self, directory_name: str, **kwargs: Any) -> None:
        """Async version of delete_directory."""
    
    async def create_permission_for_share(self, file_permission: str, **kwargs: Any) -> Optional[str]:
        """Async version of create_permission_for_share."""
    
    async def get_permission_for_share(self, permission_key: str, **kwargs: Any) -> str:
        """Async version of get_permission_for_share."""
    
    async def acquire_lease(self, **kwargs: Any) -> ShareLeaseClient:
        """Async version of acquire_lease."""
    
    async def close(self) -> None:
        """Close the async client and release resources."""
    
    async def __aenter__(self) -> ShareClient:
        """Async context manager entry."""
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit."""
        await self.close()

AsyncShareDirectoryClient

class ShareDirectoryClient:
    def __init__(
        self,
        account_url: str,
        share_name: str,
        directory_path: str,
        **kwargs: Any
    ) -> None:
        """Async version of ShareDirectoryClient for directory operations."""
    
    @classmethod
    def from_directory_url(cls, directory_url: str, **kwargs: Any) -> ShareDirectoryClient:
        """Create async ShareDirectoryClient from directory URL."""
    
    @classmethod
    def from_connection_string(cls, conn_str: str, share_name: str, directory_path: str, **kwargs: Any) -> ShareDirectoryClient:
        """Create async ShareDirectoryClient from connection string."""
    
    async def exists(self, **kwargs: Any) -> bool:
        """Async version of exists."""
    
    async def create_directory(self, **kwargs: Any) -> Dict[str, Any]:
        """Async version of create_directory."""
    
    async def delete_directory(self, **kwargs: Any) -> None:
        """Async version of delete_directory."""
    
    async def rename_directory(self, new_name: str, **kwargs: Any) -> ShareDirectoryClient:
        """Async version of rename_directory."""
    
    def list_directories_and_files(self, **kwargs: Any) -> AsyncItemPaged[Dict[str, Any]]:
        """Returns async auto-paging iterable of directory contents."""
    
    def list_handles(self, **kwargs: Any) -> AsyncItemPaged[Handle]:
        """Returns async auto-paging iterable of handles."""
    
    async def close_handle(self, handle: Union[str, Handle], **kwargs: Any) -> Dict[str, int]:
        """Async version of close_handle."""
    
    async def close_all_handles(self, **kwargs: Any) -> Dict[str, int]:
        """Async version of close_all_handles."""
    
    async def get_directory_properties(self, **kwargs: Any) -> DirectoryProperties:
        """Async version of get_directory_properties."""
    
    async def set_directory_metadata(self, metadata: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]:
        """Async version of set_directory_metadata."""
    
    async def set_http_headers(self, **kwargs: Any) -> Dict[str, Any]:
        """Async version of set_http_headers."""
    
    async def create_subdirectory(self, directory_name: str, **kwargs: Any) -> ShareDirectoryClient:
        """Async version of create_subdirectory."""
    
    async def delete_subdirectory(self, directory_name: str, **kwargs: Any) -> None:
        """Async version of delete_subdirectory."""
    
    async def upload_file(self, file_name: str, data: Union[bytes, str, Iterable[AnyStr], IO[AnyStr]], **kwargs: Any) -> ShareFileClient:
        """Async version of upload_file."""
    
    async def delete_file(self, file_name: str, **kwargs: Any) -> None:
        """Async version of delete_file."""
    
    async def close(self) -> None:
        """Close the async client and release resources."""
    
    async def __aenter__(self) -> ShareDirectoryClient:
        """Async context manager entry."""
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit."""
        await self.close()

AsyncShareFileClient

class ShareFileClient:
    def __init__(
        self,
        account_url: str,
        share_name: str,
        file_path: str,
        **kwargs: Any
    ) -> None:
        """Async version of ShareFileClient for file operations."""
    
    @classmethod
    def from_file_url(cls, file_url: str, **kwargs: Any) -> ShareFileClient:
        """Create async ShareFileClient from file URL."""
    
    @classmethod  
    def from_connection_string(cls, conn_str: str, share_name: str, file_path: str, **kwargs: Any) -> ShareFileClient:
        """Create async ShareFileClient from connection string."""
    
    async def exists(self, **kwargs: Any) -> bool:
        """Async version of exists."""
    
    async def create_file(self, size: int, **kwargs: Any) -> Dict[str, Any]:
        """Async version of create_file."""
    
    async def delete_file(self, **kwargs: Any) -> None:
        """Async version of delete_file."""
    
    async def rename_file(self, new_name: str, **kwargs: Any) -> ShareFileClient:
        """Async version of rename_file."""
    
    async def upload_file(self, data: Union[bytes, str, Iterable[AnyStr], IO[AnyStr]], **kwargs: Any) -> Dict[str, Any]:
        """Async version of upload_file."""
    
    async def upload_range(self, data: bytes, offset: int, length: int, **kwargs: Any) -> Dict[str, Any]:
        """Async version of upload_range."""
    
    async def upload_range_from_url(self, source_url: str, offset: int, length: int, source_offset: int, **kwargs: Any) -> Dict[str, Any]:
        """Async version of upload_range_from_url."""
    
    async def download_file(self, **kwargs: Any) -> StorageStreamDownloader:
        """Async version of download_file."""
    
    async def start_copy_from_url(self, source_url: str, **kwargs: Any) -> Dict[str, Any]:
        """Async version of start_copy_from_url."""
    
    async def abort_copy(self, copy_id: Union[str, FileProperties], **kwargs: Any) -> None:
        """Async version of abort_copy."""
    
    async def get_file_properties(self, **kwargs: Any) -> FileProperties:
        """Async version of get_file_properties."""
    
    async def set_http_headers(self, **kwargs: Any) -> Dict[str, Any]:
        """Async version of set_http_headers."""
    
    async def set_file_metadata(self, **kwargs: Any) -> Dict[str, Any]:
        """Async version of set_file_metadata."""
    
    async def get_ranges(self, **kwargs: Any) -> List[Dict[str, int]]:
        """Async version of get_ranges."""
    
    async def get_ranges_diff(self, previous_sharesnapshot: Union[str, Dict[str, Any]], **kwargs: Any) -> Tuple[List[Dict[str, int]], List[Dict[str, int]]]:
        """Async version of get_ranges_diff."""
    
    async def clear_range(self, offset: int, length: int, **kwargs: Any) -> Dict[str, Any]:
        """Async version of clear_range."""
    
    async def resize_file(self, size: int, **kwargs: Any) -> Dict[str, Any]:
        """Async version of resize_file."""
    
    def list_handles(self, **kwargs: Any) -> AsyncItemPaged[Handle]:
        """Returns async auto-paging iterable of handles."""
    
    async def close_handle(self, handle: Union[str, Handle], **kwargs: Any) -> Dict[str, int]:
        """Async version of close_handle."""
    
    async def close_all_handles(self, **kwargs: Any) -> Dict[str, int]:
        """Async version of close_all_handles."""
    
    async def create_hardlink(self, new_name: str, **kwargs: Any) -> ShareFileClient:
        """Async version of create_hardlink."""
    
    async def create_symlink(self, target_path: str, **kwargs: Any) -> ShareFileClient:
        """Async version of create_symlink."""
    
    async def get_symlink(self, **kwargs: Any) -> str:
        """Async version of get_symlink."""
    
    async def acquire_lease(self, **kwargs: Any) -> ShareLeaseClient:
        """Async version of acquire_lease."""
    
    async def close(self) -> None:
        """Close the async client and release resources."""
    
    async def __aenter__(self) -> ShareFileClient:
        """Async context manager entry."""
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit."""
        await self.close()

AsyncShareLeaseClient

class ShareLeaseClient:
    def __init__(
        self,
        client: Union[ShareClient, ShareFileClient],
        lease_id: Optional[str] = None
    ) -> None:
        """Async version of ShareLeaseClient for lease management."""
    
    async def acquire(self, **kwargs: Any) -> None:
        """Async version of acquire."""
    
    async def renew(self, **kwargs: Any) -> None:
        """Async version of renew."""
    
    async def release(self, **kwargs: Any) -> None:
        """Async version of release."""
    
    async def change(self, proposed_lease_id: str, **kwargs: Any) -> None:
        """Async version of change."""
    
    async def break_lease(self, **kwargs: Any) -> int:
        """Async version of break_lease."""
    
    async def __aenter__(self) -> ShareLeaseClient:
        """Async context manager entry."""
        await self.acquire()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit."""
        await self.release()

Usage Examples

Basic Async Operations

import asyncio
from azure.storage.fileshare.aio import ShareServiceClient, ShareFileClient
from azure.core.credentials import AzureNamedKeyCredential

async def basic_async_operations():
    """Demonstrate basic async operations."""
    
    # Initialize async service client
    credential = AzureNamedKeyCredential("myaccount", "mykey")
    
    async with ShareServiceClient(
        account_url="https://myaccount.file.core.windows.net",
        credential=credential
    ) as service_client:
        
        # Create share asynchronously
        share_client = await service_client.create_share("async-share", quota=10)
        print("Share created asynchronously")
        
        # Get file client
        file_client = share_client.get_file_client("test.txt")
        
        # Upload file asynchronously
        await file_client.upload_file(
            data=b"Hello from async operation!",
            overwrite=True
        )
        print("File uploaded asynchronously")
        
        # Download file asynchronously
        download_stream = await file_client.download_file()
        content = await download_stream.readall()
        print(f"Downloaded content: {content.decode()}")
        
        # Get file properties asynchronously
        properties = await file_client.get_file_properties()
        print(f"File size: {properties.size} bytes")

# Run the async function
asyncio.run(basic_async_operations())

Concurrent Operations

async def concurrent_file_operations():
    """Demonstrate concurrent file operations."""
    
    credential = AzureNamedKeyCredential("myaccount", "mykey")
    
    async with ShareServiceClient(
        account_url="https://myaccount.file.core.windows.net",
        credential=credential
    ) as service_client:
        
        share_client = await service_client.create_share("concurrent-demo", quota=10)
        
        # Create multiple file clients
        file_clients = [
            share_client.get_file_client(f"file_{i}.txt")
            for i in range(10)
        ]
        
        # Upload files concurrently
        upload_tasks = [
            client.upload_file(data=f"Content for file {i}".encode(), overwrite=True)
            for i, client in enumerate(file_clients)
        ]
        
        results = await asyncio.gather(*upload_tasks, return_exceptions=True)
        successful_uploads = sum(1 for r in results if not isinstance(r, Exception))
        print(f"Successfully uploaded {successful_uploads} files concurrently")
        
        # Download files concurrently
        download_tasks = [
            client.download_file()
            for client in file_clients
        ]
        
        download_streams = await asyncio.gather(*download_tasks)
        
        # Read content from streams concurrently
        read_tasks = [stream.readall() for stream in download_streams]
        contents = await asyncio.gather(*read_tasks)
        
        print(f"Downloaded {len(contents)} files concurrently")
        for i, content in enumerate(contents):
            print(f"File {i}: {content.decode()}")

asyncio.run(concurrent_file_operations())

Async Pagination

async def async_pagination_demo():
    """Demonstrate async pagination with large lists."""
    
    credential = AzureNamedKeyCredential("myaccount", "mykey")
    
    async with ShareServiceClient(
        account_url="https://myaccount.file.core.windows.net",
        credential=credential
    ) as service_client:
        
        # List shares asynchronously
        print("Listing shares:")
        share_count = 0
        async for share in service_client.list_shares(include_metadata=True):
            print(f"  Share: {share.name}, Quota: {share.quota}GB")
            share_count += 1
        
        print(f"Total shares: {share_count}")
        
        # Get a share and list its contents
        if share_count > 0:
            share_client = service_client.get_share_client("documents")  # Assuming it exists
            
            print("\nListing share contents:")
            item_count = 0
            async for item in share_client.list_directories_and_files():
                item_type = "Directory" if item.get('is_directory') else "File"
                print(f"  {item_type}: {item['name']}")
                item_count += 1
            
            print(f"Total items: {item_count}")

asyncio.run(async_pagination_demo())

Async File Processing Pipeline

import aiofiles
from pathlib import Path

async def file_processing_pipeline():
    """Demonstrate async file processing pipeline."""
    
    credential = AzureNamedKeyCredential("myaccount", "mykey")
    
    async with ShareServiceClient(
        account_url="https://myaccount.file.core.windows.net",
        credential=credential
    ) as service_client:
        
        share_client = await service_client.create_share("processing-pipeline", quota=50)
        
        # Stage 1: Upload multiple files asynchronously
        local_files = ["file1.txt", "file2.txt", "file3.txt"]
        upload_tasks = []
        
        for filename in local_files:
            # Create local file content
            content = f"Processing content for {filename}"
            
            file_client = share_client.get_file_client(f"input/{filename}")
            task = file_client.upload_file(data=content.encode(), overwrite=True)
            upload_tasks.append(task)
        
        await asyncio.gather(*upload_tasks)
        print("Stage 1: All files uploaded")
        
        # Stage 2: Process files concurrently
        async def process_file(filename):
            input_client = share_client.get_file_client(f"input/{filename}")
            output_client = share_client.get_file_client(f"output/processed_{filename}")
            
            # Download and process
            download_stream = await input_client.download_file()
            content = await download_stream.readall()
            
            # Simulate processing
            processed_content = content.decode().upper() + " - PROCESSED"
            
            # Upload processed result
            await output_client.upload_file(data=processed_content.encode(), overwrite=True)
            
            return f"Processed {filename}"
        
        processing_tasks = [process_file(filename) for filename in local_files]
        results = await asyncio.gather(*processing_tasks)
        
        print("Stage 2: Processing completed")
        for result in results:
            print(f"  {result}")
        
        # Stage 3: Verify results asynchronously
        verification_tasks = []
        for filename in local_files:
            output_client = share_client.get_file_client(f"output/processed_{filename}")
            task = output_client.get_file_properties()
            verification_tasks.append(task)
        
        properties = await asyncio.gather(*verification_tasks)
        
        print("Stage 3: Verification completed")
        for i, props in enumerate(properties):
            print(f"  processed_{local_files[i]}: {props.size} bytes")

asyncio.run(file_processing_pipeline())

Async Error Handling and Retry

import random
from azure.core.exceptions import HttpResponseError

async def resilient_async_operations():
    """Demonstrate error handling and retry logic in async operations."""
    
    async def retry_async_operation(operation, max_retries=3, delay=1.0):
        """Retry async operation with exponential backoff."""
        for attempt in range(max_retries):
            try:
                return await operation()
            except HttpResponseError as e:
                if attempt == max_retries - 1:
                    raise
                
                wait_time = delay * (2 ** attempt) + random.uniform(0, 1)
                print(f"Attempt {attempt + 1} failed: {e.message}")
                print(f"Retrying in {wait_time:.2f} seconds...")
                await asyncio.sleep(wait_time)
    
    credential = AzureNamedKeyCredential("myaccount", "mykey")
    
    async with ShareServiceClient(
        account_url="https://myaccount.file.core.windows.net",
        credential=credential
    ) as service_client:
        
        # Resilient share creation
        async def create_share_operation():
            return await service_client.create_share("resilient-share", quota=10)
        
        try:
            share_client = await retry_async_operation(create_share_operation)
            print("Share created successfully with retry logic")
        except Exception as e:
            print(f"Failed to create share after retries: {e}")
            return
        
        # Resilient file operations
        file_client = share_client.get_file_client("test.txt")
        
        async def upload_operation():
            return await file_client.upload_file(
                data=b"Resilient upload content",
                overwrite=True
            )
        
        try:
            await retry_async_operation(upload_operation)
            print("File uploaded successfully with retry logic")
        except Exception as e:
            print(f"Failed to upload file after retries: {e}")

asyncio.run(resilient_async_operations())

Async Batch Processing

async def batch_file_processor():
    """Process large numbers of files in batches."""
    
    credential = AzureNamedKeyCredential("myaccount", "mykey")
    
    async with ShareServiceClient(
        account_url="https://myaccount.file.core.windows.net",
        credential=credential
    ) as service_client:
        
        share_client = await service_client.create_share("batch-processing", quota=100)
        
        # Generate file list
        file_names = [f"batch_file_{i:04d}.txt" for i in range(100)]
        
        async def process_batch(batch_files, batch_id):
            """Process a batch of files."""
            print(f"Processing batch {batch_id} with {len(batch_files)} files")
            
            tasks = []
            for filename in batch_files:
                file_client = share_client.get_file_client(filename)
                content = f"Batch {batch_id} - Content for {filename}"
                task = file_client.upload_file(data=content.encode(), overwrite=True)
                tasks.append(task)
            
            await asyncio.gather(*tasks)
            print(f"Batch {batch_id} completed")
            return len(batch_files)
        
        # Process in batches of 10
        batch_size = 10
        batch_tasks = []
        
        for i in range(0, len(file_names), batch_size):
            batch = file_names[i:i + batch_size]
            batch_id = i // batch_size + 1
            task = process_batch(batch, batch_id)
            batch_tasks.append(task)
        
        # Process batches concurrently
        results = await asyncio.gather(*batch_tasks)
        total_files = sum(results)
        
        print(f"Batch processing completed: {total_files} files processed")
        
        # Verify by listing files
        file_count = 0
        async for item in share_client.list_directories_and_files():
            if not item.get('is_directory'):
                file_count += 1
        
        print(f"Verification: {file_count} files found in share")

asyncio.run(batch_file_processor())

Async Context Management

async def advanced_context_management():
    """Demonstrate advanced async context management."""
    
    class AsyncFileProcessor:
        def __init__(self, share_client):
            self.share_client = share_client
            self.temp_files = []
        
        async def __aenter__(self):
            print("Starting file processing session")
            return self
        
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            # Clean up temporary files
            if self.temp_files:
                print(f"Cleaning up {len(self.temp_files)} temporary files")
                cleanup_tasks = [
                    self.share_client.get_file_client(filename).delete_file()
                    for filename in self.temp_files
                ]
                await asyncio.gather(*cleanup_tasks, return_exceptions=True)
            
            print("File processing session ended")
        
        async def process_file(self, input_name, output_name):
            temp_name = f"temp_{input_name}"
            self.temp_files.append(temp_name)
            
            # Simulate processing with temporary file
            input_client = self.share_client.get_file_client(input_name)
            temp_client = self.share_client.get_file_client(temp_name)
            output_client = self.share_client.get_file_client(output_name)
            
            # Download -> Process -> Upload
            download_stream = await input_client.download_file()
            content = await download_stream.readall()
            
            processed_content = content.decode().upper().encode()
            
            await temp_client.upload_file(data=processed_content, overwrite=True)
            
            # Final processing step
            final_stream = await temp_client.download_file()
            final_content = await final_stream.readall()
            
            await output_client.upload_file(data=final_content, overwrite=True)
            
            return output_name
    
    credential = AzureNamedKeyCredential("myaccount", "mykey")
    
    async with ShareServiceClient(
        account_url="https://myaccount.file.core.windows.net",
        credential=credential
    ) as service_client:
        
        share_client = await service_client.create_share("context-demo", quota=10)
        
        # Upload input files
        input_files = ["input1.txt", "input2.txt", "input3.txt"]
        upload_tasks = []
        
        for filename in input_files:
            file_client = share_client.get_file_client(filename)
            content = f"Input content for {filename}"
            task = file_client.upload_file(data=content.encode(), overwrite=True)
            upload_tasks.append(task)
        
        await asyncio.gather(*upload_tasks)
        
        # Process files with automatic cleanup
        async with AsyncFileProcessor(share_client) as processor:
            processing_tasks = [
                processor.process_file(input_name, f"output_{input_name}")
                for input_name in input_files
            ]
            
            results = await asyncio.gather(*processing_tasks)
            print(f"Processed files: {results}")
        
        # Verify cleanup worked
        all_files = []
        async for item in share_client.list_directories_and_files():
            if not item.get('is_directory'):
                all_files.append(item['name'])
        
        temp_files = [f for f in all_files if f.startswith('temp_')]
        print(f"Temporary files remaining: {len(temp_files)}")

asyncio.run(advanced_context_management())

Performance Monitoring and Optimization

import time
from typing import List, Tuple

async def performance_monitoring():
    """Monitor and optimize async operation performance."""
    
    class PerformanceTimer:
        def __init__(self, name: str):
            self.name = name
            self.start_time = None
        
        async def __aenter__(self):
            self.start_time = time.time()
            return self
        
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            end_time = time.time()
            duration = end_time - self.start_time
            print(f"{self.name}: {duration:.2f} seconds")
    
    credential = AzureNamedKeyCredential("myaccount", "mykey")
    
    async with ShareServiceClient(
        account_url="https://myaccount.file.core.windows.net",
        credential=credential
    ) as service_client:
        
        share_client = await service_client.create_share("performance-test", quota=50)
        
        # Test different concurrency levels
        concurrency_levels = [1, 5, 10, 20]
        file_count = 50
        
        for concurrency in concurrency_levels:
            async with PerformanceTimer(f"Concurrency {concurrency}"):
                
                semaphore = asyncio.Semaphore(concurrency)
                
                async def upload_with_limit(file_index):
                    async with semaphore:
                        file_client = share_client.get_file_client(f"perf_test_{file_index}.txt")
                        content = f"Performance test content {file_index}" * 100
                        await file_client.upload_file(data=content.encode(), overwrite=True)
                        return file_index
                
                tasks = [upload_with_limit(i) for i in range(file_count)]
                results = await asyncio.gather(*tasks)
                
                print(f"  Uploaded {len(results)} files")
        
        # Cleanup
        async with PerformanceTimer("Cleanup"):
            cleanup_tasks = [
                share_client.get_file_client(f"perf_test_{i}.txt").delete_file()
                for i in range(file_count)
            ]
            await asyncio.gather(*cleanup_tasks, return_exceptions=True)

asyncio.run(performance_monitoring())

The async clients provide the same functionality as their synchronous counterparts while enabling high-performance, concurrent operations. Use them for I/O-bound workloads, batch processing, and applications requiring high throughput with Azure File Share storage.

Install with Tessl CLI

npx tessl i tessl/pypi-azure-storage-file-share

docs

async-clients.md

directory-client.md

file-client.md

index.md

lease-client.md

models.md

sas-generation.md

service-client.md

share-client.md

tile.json