tessl/pypi-apache-airflow-providers-microsoft-azure

Provider package for Microsoft Azure integrations with Apache Airflow

Azure File Share

Comprehensive Azure File Share integration for managing file shares, directories, and files within Azure Storage. Azure File Share provides fully managed file shares in the cloud accessible via SMB protocol and REST API.

Capabilities

Azure File Share Hook

Core hook for connecting to and managing Azure File Share resources including shares, directories, and file operations.

class AzureFileShareHook(BaseHook):
    """
    Hook for Azure File Share operations.
    
    Provides methods for managing file shares, directories, and files
    with support for various authentication methods.
    """
    
    def __init__(
        self,
        share_name: str | None = None,
        file_path: str | None = None,
        directory_path: str | None = None,
        azure_fileshare_conn_id: str = "azure_fileshare_default",
    ): ...
    
    def get_conn(self) -> None: ...
    
    @property
    def share_service_client(self) -> ShareServiceClient: ...
    
    @property  
    def share_directory_client(self) -> ShareDirectoryClient: ...
    
    @property
    def share_file_client(self) -> ShareFileClient: ...
    
    def check_for_directory(self) -> bool: ...
    
    def list_directories_and_files(self) -> list: ...
    
    def list_files(self) -> list[str]: ...
    
    def create_share(self, share_name: str, **kwargs) -> bool: ...
    
    def delete_share(self, share_name: str, **kwargs) -> bool: ...
    
    def create_directory(self, **kwargs) -> Any: ...
    
    def get_file(self, file_path: str, **kwargs) -> None: ...
    
    def get_file_to_stream(self, stream: IO, **kwargs) -> None: ...
    
    def load_file(self, file_path: str, **kwargs) -> None: ...
    
    def load_data(self, string_data: bytes | str | IO, **kwargs) -> None: ...

Usage Examples

Basic File Share Operations

from airflow import DAG
from airflow.providers.microsoft.azure.hooks.fileshare import AzureFileShareHook
from datetime import datetime, timedelta

dag = DAG(
    'azure_fileshare_example',
    default_args={'owner': 'data-team'},
    description='Azure File Share operations',
    schedule=timedelta(days=1),  # renamed from 'schedule_interval' in Airflow 2.4+
    start_date=datetime(2024, 1, 1),
    catchup=False
)

def manage_fileshare_operations(**context):
    # Initialize hook
    hook = AzureFileShareHook(
        share_name='data-share',
        azure_fileshare_conn_id='azure_fileshare_connection'
    )
    
    # Create file share
    hook.create_share('data-share')
    
    # Create directory structure
    hook = AzureFileShareHook(
        share_name='data-share',
        directory_path='processed/2024/01',
        azure_fileshare_conn_id='azure_fileshare_connection'
    )
    hook.create_directory()
    
    # Upload file
    hook = AzureFileShareHook(
        share_name='data-share',
        file_path='processed/2024/01/data.csv',
        azure_fileshare_conn_id='azure_fileshare_connection'
    )
    hook.load_file('/local/path/to/data.csv')
    
    # List files in directory
    hook = AzureFileShareHook(
        share_name='data-share',
        directory_path='processed/2024/01',
        azure_fileshare_conn_id='azure_fileshare_connection'
    )
    files = hook.list_files()
    print(f"Files in directory: {files}")

# Create PythonOperator task
from airflow.operators.python import PythonOperator

fileshare_task = PythonOperator(
    task_id='manage_fileshare',
    python_callable=manage_fileshare_operations,
    dag=dag
)
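The directory layout above ('processed/2024/01') is date-partitioned. A small helper (hypothetical, not part of the provider) can derive the path from the DAG run's logical date instead of hard-coding it:

```python
from datetime import datetime

def partition_path(logical_date: datetime, prefix: str = "processed") -> str:
    """Build a 'prefix/YYYY/MM' directory path from a DAG run's logical date."""
    return f"{prefix}/{logical_date:%Y/%m}"

# Inside a task, context["logical_date"] supplies the datetime
print(partition_path(datetime(2024, 1, 15)))  # → processed/2024/01
```

Deriving the path this way keeps backfills consistent, since each run writes under its own logical date rather than the wall-clock date.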

File Share Data Processing Pipeline

def process_shared_files(**context):
    # Hook for reading files
    read_hook = AzureFileShareHook(
        share_name='input-data',
        directory_path='raw',
        azure_fileshare_conn_id='azure_fileshare_connection'
    )
    
    # List all files to process
    files = read_hook.list_files()
    
    import tempfile  # imported once, outside the loop

    for file_name in files:
        # Download file for processing
        file_hook = AzureFileShareHook(
            share_name='input-data',
            file_path=f'raw/{file_name}',
            azure_fileshare_conn_id='azure_fileshare_connection'
        )

        # Download to a local temp file
        with tempfile.NamedTemporaryFile() as temp_file:
            file_hook.get_file_to_stream(temp_file)
            temp_file.seek(0)  # rewind before reading the downloaded bytes

            # Process the file (your custom logic here)
            processed_data = process_data_from_stream(temp_file)
            
            # Upload processed file to output share
            output_hook = AzureFileShareHook(
                share_name='processed-data',
                file_path=f'processed/{file_name}',
                azure_fileshare_conn_id='azure_fileshare_connection'
            )
            output_hook.load_data(processed_data)

processing_task = PythonOperator(
    task_id='process_shared_files',
    python_callable=process_shared_files,
    dag=dag
)
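The pipeline above calls `process_data_from_stream`, which is left to your custom logic. A minimal placeholder (the byte-level transform shown here, normalizing line endings, is only an assumption about what "processing" means) might look like:

```python
import io

def process_data_from_stream(stream) -> bytes:
    """Placeholder transform: rewind the stream and normalize CRLF to LF."""
    stream.seek(0)
    return stream.read().replace(b"\r\n", b"\n")

sample = io.BytesIO(b"id,value\r\n1,a\r\n")
print(process_data_from_stream(sample))  # → b'id,value\n1,a\n'
```

Returning bytes keeps the function compatible with `load_data`, which accepts `bytes | str | IO`.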

File Share with Directory Management

def organize_file_share(**context):
    base_hook = AzureFileShareHook(
        share_name='document-archive',
        azure_fileshare_conn_id='azure_fileshare_connection'
    )
    
    # Create share if it doesn't exist
    base_hook.create_share('document-archive')
    
    # Create organized directory structure
    directories = [
        'documents/2024/invoices',
        'documents/2024/reports', 
        'documents/2024/contracts',
        'templates/email',
        'templates/reports'
    ]
    
    for directory_path in directories:
        dir_hook = AzureFileShareHook(
            share_name='document-archive',
            directory_path=directory_path,
            azure_fileshare_conn_id='azure_fileshare_connection'
        )
        
        # Check if directory exists
        if not dir_hook.check_for_directory():
            dir_hook.create_directory()
            print(f"Created directory: {directory_path}")
        
        # List contents of each directory
        contents = dir_hook.list_directories_and_files()
        print(f"Contents of {directory_path}: {contents}")

organization_task = PythonOperator(
    task_id='organize_file_share',
    python_callable=organize_file_share,
    dag=dag
)
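Azure File Share directories are hierarchical, and creating a nested path such as 'documents/2024/invoices' may fail if the intermediate levels do not exist yet. A small helper (hypothetical, not part of the provider) expands a path into its parent chain so each level can be created in order:

```python
def parent_chain(path: str) -> list[str]:
    """Expand 'a/b/c' into ['a', 'a/b', 'a/b/c'] so parents can be created first."""
    parts = path.strip("/").split("/")
    return ["/".join(parts[: i + 1]) for i in range(len(parts))]

print(parent_chain("documents/2024/invoices"))
# → ['documents', 'documents/2024', 'documents/2024/invoices']
```

Each entry in the returned list can then be passed as `directory_path` to a hook before calling `create_directory()`.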

Authentication and Connection

Azure File Share supports multiple authentication methods:

  • Account Key: Storage account name and access key
  • SAS Token: Shared Access Signature for granular permissions
  • Connection String: Complete connection string with authentication
  • Managed Identity: For Azure-hosted Airflow instances
  • DefaultAzureCredential: Azure SDK default credential chain

Connection configuration supplies the storage account name, credentials, and the chosen authentication method through the Airflow connection's fields and extras.
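As one way to supply such a connection (the field names below are assumptions; check the provider's connection documentation for your version), Airflow connections can be defined through environment variables in JSON form, with the storage account name as the login and the account key as the password:

```shell
# Hypothetical example: define the 'azure_fileshare_connection' connection
# referenced in the snippets above via an environment variable.
export AIRFLOW_CONN_AZURE_FILESHARE_CONNECTION='{
  "conn_type": "azure_fileshare",
  "login": "mystorageaccount",
  "password": "<account-key>"
}'
```

SAS tokens or a full connection string would go in the connection's extras instead of the password field.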

File Share Features

Azure File Share provides:

  • SMB Protocol: Standard SMB 3.0 file sharing protocol
  • REST API: HTTP-based file operations
  • Directory Structure: Hierarchical file organization
  • Cross-Platform: Windows, Linux, and macOS compatibility
  • Shared Access: Multiple clients can access simultaneously
  • Snapshots: Point-in-time share snapshots
  • Premium Performance: High IOPS and low latency options

Types

# Azure File Share client types
class ShareServiceClient:
    """Client for managing file share service operations."""
    def create_share(self, share_name: str, **kwargs) -> ShareClient: ...
    def delete_share(self, share_name: str, **kwargs) -> None: ...
    def list_shares(self, **kwargs) -> ItemPaged[ShareProperties]: ...

class ShareDirectoryClient:
    """Client for directory operations within a file share."""
    def create_directory(self, **kwargs) -> dict[str, Any]: ...
    def delete_directory(self, **kwargs) -> None: ...
    def list_directories_and_files(self, **kwargs) -> ItemPaged[dict[str, Any]]: ...

class ShareFileClient:
    """Client for file operations within a file share."""
    def upload_file(self, data: Any, **kwargs) -> dict[str, Any]: ...
    def download_file(self, **kwargs) -> StorageStreamDownloader: ...
    def delete_file(self, **kwargs) -> None: ...

class FileProperties:
    """Properties of a file in Azure File Share."""
    name: str
    size: int
    last_modified: datetime
    content_type: str

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-microsoft-azure
