Provider package for Microsoft Azure integrations with Apache Airflow
Quality: Pending — a best-practices review has not yet been performed.
Impact: Pending — no eval scenarios have been run.
Comprehensive Azure File Share integration for managing file shares, directories, and files within Azure Storage. Azure File Share provides fully managed file shares in the cloud accessible via SMB protocol and REST API.
Core hook for connecting to and managing Azure File Share resources including shares, directories, and file operations.
class AzureFileShareHook(BaseHook):
    """
    Hook for Azure File Share operations.

    Provides methods for managing file shares, directories, and files
    with support for various authentication methods.

    :param share_name: name of the file share to operate on (optional;
        required for share-scoped operations)
    :param file_path: path of the target file within the share (optional;
        required for file-scoped operations)
    :param directory_path: path of the target directory within the share
        (optional; required for directory-scoped operations)
    :param azure_fileshare_conn_id: Airflow connection id holding the
        storage-account credentials
    """

    def __init__(
        self,
        share_name: str | None = None,
        file_path: str | None = None,
        directory_path: str | None = None,
        azure_fileshare_conn_id: str = "azure_fileshare_default",
    ): ...
    def get_conn(self) -> None: ...
    @property
    def share_service_client(self) -> ShareServiceClient: ...
    @property
    def share_directory_client(self) -> ShareDirectoryClient: ...
    @property
    def share_file_client(self) -> ShareFileClient: ...
    def check_for_directory(self) -> bool: ...
    def list_directories_and_files(self) -> list: ...
    def list_files(self) -> list[str]: ...
    def create_share(self, share_name: str, **kwargs) -> bool: ...
    def delete_share(self, share_name: str, **kwargs) -> bool: ...
    def create_directory(self, **kwargs) -> Any: ...
    def get_file(self, file_path: str, **kwargs) -> None: ...
    def get_file_to_stream(self, stream: IO, **kwargs) -> None: ...
    def load_file(self, file_path: str, **kwargs) -> None: ...
    def load_data(self, string_data: bytes | str | IO, **kwargs) -> None: ...

from airflow import DAG
from airflow.providers.microsoft.azure.hooks.fileshare import AzureFileShareHook
from datetime import datetime, timedelta

# Task-level defaults shared by every task attached to this DAG.
_default_args = {'owner': 'data-team'}

# Example DAG running once per day, without backfilling missed intervals.
dag = DAG(
    'azure_fileshare_example',
    default_args=_default_args,
    description='Azure File Share operations',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def manage_fileshare_operations(**context):
    """Create a share, build a directory, upload a file, and list the result.

    Demonstrates the hook's scoping model: a fresh hook instance is built
    for each scope (share, directory, file) it operates on.
    """
    conn_id = 'azure_fileshare_connection'

    # Share-scoped hook: make sure the target share exists.
    share_hook = AzureFileShareHook(
        share_name='data-share',
        azure_fileshare_conn_id=conn_id,
    )
    share_hook.create_share('data-share')

    # Directory-scoped hook: create the dated directory structure.
    dir_hook = AzureFileShareHook(
        share_name='data-share',
        directory_path='processed/2024/01',
        azure_fileshare_conn_id=conn_id,
    )
    dir_hook.create_directory()

    # File-scoped hook: upload a local file into the new directory.
    upload_hook = AzureFileShareHook(
        share_name='data-share',
        file_path='processed/2024/01/data.csv',
        azure_fileshare_conn_id=conn_id,
    )
    upload_hook.load_file('/local/path/to/data.csv')

    # Directory-scoped hook again: report what landed in the directory.
    listing_hook = AzureFileShareHook(
        share_name='data-share',
        directory_path='processed/2024/01',
        azure_fileshare_conn_id=conn_id,
    )
    files = listing_hook.list_files()
    print(f"Files in directory: {files}")
# Create PythonOperator task
from airflow.operators.python import PythonOperator

fileshare_task = PythonOperator(
    task_id='manage_fileshare',
    python_callable=manage_fileshare_operations,
    dag=dag
)


def process_shared_files(**context):
    """Download each file from the input share, process it, and upload the result.

    Reads every file under ``raw/`` in the ``input-data`` share, runs it
    through ``process_data_from_stream`` (user-supplied processing logic),
    and writes the output under ``processed/`` in the ``processed-data`` share.
    """
    import tempfile  # hoisted out of the per-file loop: import once, reuse

    # Hook for reading files
    read_hook = AzureFileShareHook(
        share_name='input-data',
        directory_path='raw',
        azure_fileshare_conn_id='azure_fileshare_connection'
    )
    # List all files to process
    files = read_hook.list_files()
    for file_name in files:
        # Download file for processing
        file_hook = AzureFileShareHook(
            share_name='input-data',
            file_path=f'raw/{file_name}',
            azure_fileshare_conn_id='azure_fileshare_connection'
        )
        # Download to local temp file
        with tempfile.NamedTemporaryFile() as temp_file:
            file_hook.get_file_to_stream(temp_file)
            # Rewind before reading: the download leaves the stream position
            # at EOF, so without seek(0) the processor would read nothing.
            temp_file.seek(0)
            # Process the file (your custom logic here)
            processed_data = process_data_from_stream(temp_file)
        # Upload processed file to output share
        output_hook = AzureFileShareHook(
            share_name='processed-data',
            file_path=f'processed/{file_name}',
            azure_fileshare_conn_id='azure_fileshare_connection'
        )
        output_hook.load_data(processed_data)
processing_task = PythonOperator(
task_id='process_shared_files',
python_callable=process_shared_files,
dag=dag
)def organize_file_share(**context):
base_hook = AzureFileShareHook(
share_name='document-archive',
azure_fileshare_conn_id='azure_fileshare_connection'
)
# Create share if it doesn't exist
base_hook.create_share('document-archive')
# Create organized directory structure
directories = [
'documents/2024/invoices',
'documents/2024/reports',
'documents/2024/contracts',
'templates/email',
'templates/reports'
]
for directory_path in directories:
dir_hook = AzureFileShareHook(
share_name='document-archive',
directory_path=directory_path,
azure_fileshare_conn_id='azure_fileshare_connection'
)
# Check if directory exists
if not dir_hook.check_for_directory():
dir_hook.create_directory()
print(f"Created directory: {directory_path}")
# List contents of each directory
contents = dir_hook.list_directories_and_files()
print(f"Contents of {directory_path}: {contents}")
organization_task = PythonOperator(
task_id='organize_file_share',
python_callable=organize_file_share,
dag=dag
)Azure File Share supports multiple authentication methods:
Connection configuration requires the storage account information and chosen authentication method in the connection extras.
Azure File Share provides the following client types:
# Azure File Share client types
class ShareServiceClient:
    """Client for managing file share service operations."""

    # Create a new share and return a client scoped to it.
    def create_share(self, share_name: str, **kwargs) -> ShareClient: ...
    # Delete an existing share by name.
    def delete_share(self, share_name: str, **kwargs) -> None: ...
    # Enumerate shares in the storage account (paged).
    def list_shares(self, **kwargs) -> ItemPaged[ShareProperties]: ...
class ShareDirectoryClient:
    """Client for directory operations within a file share."""

    # Create the directory this client is scoped to.
    def create_directory(self, **kwargs) -> dict[str, Any]: ...
    # Delete the directory this client is scoped to.
    def delete_directory(self, **kwargs) -> None: ...
    # Enumerate immediate children of the directory (paged).
    def list_directories_and_files(self, **kwargs) -> ItemPaged[dict[str, Any]]: ...
class ShareFileClient:
    """Client for file operations within a file share."""

    # Upload data to the file this client is scoped to.
    def upload_file(self, data: Any, **kwargs) -> dict[str, Any]: ...
    # Download the file contents as a streaming downloader.
    def download_file(self, **kwargs) -> StorageStreamDownloader: ...
    # Delete the file this client is scoped to.
    def delete_file(self, **kwargs) -> None: ...
class FileProperties:
"""Properties of a file in Azure File Share."""
name: str
size: int
last_modified: datetime
content_type: strInstall with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-microsoft-azure