tessl/pypi-apache-airflow-providers-apache-hdfs

Provider package for Apache Airflow integration with Hadoop Distributed File System (HDFS) and WebHDFS operations


File System Monitoring

Sensors for monitoring HDFS file system states and coordinating workflow execution based on file availability. These sensors enable event-driven data pipelines that wait for specific files or file sets before proceeding with downstream processing.

Capabilities

Single File Monitoring

Wait for a specific file or directory to appear in HDFS before proceeding with downstream tasks.

class WebHdfsSensor:
    """
    Sensor that waits for a file or folder to land in HDFS.
    
    Attributes:
        template_fields: Sequence of template fields ("filepath",)
    """
    
    def __init__(
        self, 
        *, 
        filepath: str, 
        webhdfs_conn_id: str = "webhdfs_default", 
        **kwargs
    ) -> None:
        """
        Initialize WebHDFS sensor for single file monitoring.
        
        Parameters:
            filepath: The path to monitor in HDFS
            webhdfs_conn_id: The connection id for the webhdfs client
            **kwargs: Additional sensor parameters (poke_interval, timeout, etc.)
        """
    
    def poke(self, context) -> bool:
        """
        Check if the filepath exists in HDFS.
        
        Parameters:
            context: Airflow task context
            
        Returns:
            bool: True if file exists, False otherwise
        """

Multiple Files Monitoring

Wait for multiple specific files to appear in a directory before proceeding, useful for batch processing scenarios where multiple input files are required.

class MultipleFilesWebHdfsSensor:
    """
    Sensor that waits for multiple files in a folder to land in HDFS.
    
    Attributes:
        template_fields: Sequence of template fields ("directory_path", "expected_filenames")
    """
    
    def __init__(
        self,
        *,
        directory_path: str,
        expected_filenames: Sequence[str],
        webhdfs_conn_id: str = "webhdfs_default",
        **kwargs
    ) -> None:
        """
        Initialize WebHDFS sensor for multiple files monitoring.
        
        Parameters:
            directory_path: The directory path to monitor in HDFS
            expected_filenames: Sequence of expected filenames to wait for
            webhdfs_conn_id: The connection id for the webhdfs client
            **kwargs: Additional sensor parameters (poke_interval, timeout, etc.)
        """
    
    def poke(self, context) -> bool:
        """
        Check if all expected files exist in the directory.
        
        Parameters:
            context: Airflow task context
            
        Returns:
            bool: True if all expected files exist, False if any are missing
        """

Usage Examples

Basic File Sensor

Wait for a single file to appear in HDFS:

from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def process_data():
    print("File is ready, processing data...")

dag = DAG(
    'hdfs_file_sensor_example',
    default_args={
        'owner': 'data_team',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    description='Wait for HDFS file and process',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Sensor task
file_sensor = WebHdfsSensor(
    task_id='wait_for_input_file',
    filepath='/data/input/daily_sales_{{ ds }}.csv',  # Templated filepath
    webhdfs_conn_id='production_hdfs',
    poke_interval=60,  # Check every 60 seconds
    timeout=3600,      # Timeout after 1 hour
    dag=dag
)

# Processing task
process_task = PythonOperator(
    task_id='process_sales_data',
    python_callable=process_data,
    dag=dag
)

file_sensor >> process_task

Multiple Files Sensor

Wait for multiple files to appear in a directory:

from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import MultipleFilesWebHdfsSensor
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

dag = DAG(
    'hdfs_multiple_files_sensor',
    default_args={'owner': 'data_team'},
    description='Wait for multiple HDFS files',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Wait for multiple required files
files_sensor = MultipleFilesWebHdfsSensor(
    task_id='wait_for_batch_files',
    directory_path='/data/batch/{{ ds }}/',
    expected_filenames=[
        'transactions.parquet',
        'customers.parquet', 
        'products.parquet',
        'inventory.parquet'
    ],
    webhdfs_conn_id='batch_hdfs',
    poke_interval=120,  # Check every 2 minutes
    timeout=7200,       # Timeout after 2 hours
    dag=dag
)

# Start batch processing when all files are ready
batch_process = BashOperator(
    task_id='start_batch_processing',
    bash_command='spark-submit /scripts/batch_processing.py --date {{ ds }}',
    dag=dag
)

files_sensor >> batch_process

Advanced Sensor Configuration

Configure sensors with custom retry logic and failure handling:

from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta

def handle_missing_file():
    print("File was not found within timeout period")
    # Implement fallback logic or notifications

def process_when_ready():
    print("File found, proceeding with processing")

dag = DAG(
    'robust_hdfs_sensor',
    default_args={'owner': 'data_team'},
    start_date=datetime(2024, 1, 1),
    schedule_interval='@hourly'
)

# Primary sensor with shorter timeout
primary_sensor = WebHdfsSensor(
    task_id='wait_for_primary_file',
    filepath='/data/primary/hourly_{{ ts_nodash }}.json',
    webhdfs_conn_id='primary_hdfs',
    poke_interval=30,
    timeout=1800,  # 30 minutes
    soft_fail=True,  # Mark the task as skipped (not failed) on timeout
    dag=dag
)

# Fallback sensor for backup location
fallback_sensor = WebHdfsSensor(
    task_id='wait_for_backup_file',
    filepath='/data/backup/hourly_{{ ts_nodash }}.json',
    webhdfs_conn_id='backup_hdfs',
    poke_interval=60,
    timeout=900,   # 15 minutes
    trigger_rule=TriggerRule.ALL_SKIPPED,  # Only run if the primary sensor was skipped (timed out with soft_fail)
    dag=dag
)

# Processing task that runs if either sensor succeeds
process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_when_ready,
    trigger_rule=TriggerRule.ONE_SUCCESS,
    dag=dag
)

# Cleanup task for the case where no file ever arrived
cleanup_task = PythonOperator(
    task_id='handle_missing_files',
    python_callable=handle_missing_file,
    trigger_rule=TriggerRule.ONE_FAILED,  # Runs if the fallback sensor fails
    dag=dag
)

# Task dependencies: the fallback only runs after the primary is skipped
primary_sensor >> fallback_sensor
[primary_sensor, fallback_sensor] >> process_task
[primary_sensor, fallback_sensor] >> cleanup_task

Sensor with Dynamic File Patterns

Use templated filepaths for dynamic file monitoring:

from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from datetime import datetime, timedelta

dag = DAG(
    'dynamic_hdfs_sensor',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily'
)

# Sensor with Jinja templating
dynamic_sensor = WebHdfsSensor(
    task_id='wait_for_dated_file',
    # Wait for file with current date in path and filename
    filepath='/warehouse/{{ macros.ds_format(ds, "%Y-%m-%d", "%Y/%m/%d") }}/data_{{ ds_nodash }}.parquet',
    webhdfs_conn_id='warehouse_hdfs',
    poke_interval=300,  # 5 minutes
    timeout=14400,      # 4 hours
    dag=dag
)

# Multiple sensors for different file types
sensor_configs = [
    {'name': 'transactions', 'path': '/raw/transactions/{{ ds }}/'},
    {'name': 'customers', 'path': '/raw/customers/{{ ds }}/'},
    {'name': 'products', 'path': '/raw/products/{{ ds }}/'}
]

sensors = []
for config in sensor_configs:
    sensor = WebHdfsSensor(
        task_id=f'wait_for_{config["name"]}_files',
        filepath=f'{config["path"]}_SUCCESS',  # Wait for success marker
        webhdfs_conn_id='data_lake_hdfs',
        poke_interval=120,
        timeout=3600,
        dag=dag
    )
    sensors.append(sensor)

# All sensors must complete before downstream processing (ALL_SUCCESS is the default trigger rule)
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

all_ready = EmptyOperator(
    task_id='all_files_ready',
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=dag
)

sensors >> all_ready

Integration Patterns

Combining with Hook Operations

Use sensors to trigger hook-based file operations:

from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from airflow.operators.python import PythonOperator
from datetime import datetime

def copy_and_process():
    """Copy file from input to processing directory and validate."""
    hook = WebHDFSHook(webhdfs_conn_id='main_hdfs')
    
    input_path = '/input/raw_data.csv'
    processing_path = '/processing/raw_data.csv'
    
    # Read from input location
    data = hook.read_file(input_path)
    
    # Write to temporary file for processing
    with open('/tmp/processing_data.csv', 'wb') as f:
        f.write(data)
    
    # Upload to processing directory
    hook.load_file('/tmp/processing_data.csv', processing_path)
    
    print(f"File copied and ready for processing: {processing_path}")

dag = DAG('sensor_hook_integration', start_date=datetime(2024, 1, 1))

# Wait for input file
sensor = WebHdfsSensor(
    task_id='wait_for_input',
    filepath='/input/raw_data.csv',
    webhdfs_conn_id='main_hdfs',
    dag=dag
)

# Copy and prepare for processing
copy_task = PythonOperator(
    task_id='copy_and_process',
    python_callable=copy_and_process,
    dag=dag
)

sensor >> copy_task

Custom Sensor Logic

Extend sensors for custom monitoring logic:

from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from airflow.utils.context import Context

class FileSizeWebHdfsSensor(WebHdfsSensor):
    """Custom sensor that checks both file existence and minimum size."""
    
    def __init__(self, min_size_bytes: int = 0, **kwargs):
        super().__init__(**kwargs)
        self.min_size_bytes = min_size_bytes
    
    def poke(self, context: Context) -> bool:
        """Check if file exists and meets minimum size requirement."""
        hook = WebHDFSHook(self.webhdfs_conn_id)
        
        # Check if file exists
        if not hook.check_for_path(self.filepath):
            self.log.info(f"File {self.filepath} does not exist yet")
            return False
        
        # Check file size if minimum size specified
        if self.min_size_bytes > 0:
            client = hook.get_conn()
            file_status = client.status(self.filepath)
            file_size = file_status.get('length', 0)
            
            if file_size < self.min_size_bytes:
                self.log.info(f"File {self.filepath} exists but size {file_size} < {self.min_size_bytes} bytes")
                return False
        
        self.log.info(f"File {self.filepath} exists and meets size requirements")
        return True

# Usage
custom_sensor = FileSizeWebHdfsSensor(
    task_id='wait_for_large_file',
    filepath='/data/large_dataset.parquet',
    min_size_bytes=1024 * 1024,  # Minimum 1MB
    webhdfs_conn_id='data_hdfs',
    poke_interval=60,
    timeout=3600
)

Sensor Configuration Best Practices

Optimal Polling Configuration

# Short-lived files (expected within minutes)
quick_sensor = WebHdfsSensor(
    task_id='wait_for_quick_file',
    filepath='/tmp/quick_process.flag',
    poke_interval=10,   # Check every 10 seconds
    timeout=300,        # 5 minute timeout
    dag=dag
)

# Regular batch files (expected within hours)
batch_sensor = WebHdfsSensor(
    task_id='wait_for_batch_file',
    filepath='/batch/daily_extract.csv',
    poke_interval=300,  # Check every 5 minutes
    timeout=14400,      # 4 hour timeout
    dag=dag
)

# Large ETL files (expected within day)
etl_sensor = WebHdfsSensor(
    task_id='wait_for_etl_file',
    filepath='/warehouse/etl_complete.marker',
    poke_interval=1800, # Check every 30 minutes
    timeout=86400,      # 24 hour timeout
    dag=dag
)
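In poke mode a sensor holds its worker slot between checks, which is wasteful for multi-hour waits. The base sensor's mode="reschedule" releases the slot and re-queues the task at each interval; a sketch reusing the ETL example above (the mode parameter comes from Airflow's base sensor, not this provider):

# Long wait without occupying a worker slot between checks
reschedule_sensor = WebHdfsSensor(
    task_id='wait_for_etl_file_rescheduled',
    filepath='/warehouse/etl_complete.marker',
    mode='reschedule',  # free the worker slot between pokes
    poke_interval=1800,
    timeout=86400,
    dag=dag
)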

Error Handling and Monitoring

from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.utils.email import send_email

def send_timeout_notification(context):
    """Send notification when sensor times out."""
    task_instance = context['task_instance']
    send_email(
        to=['data-team@company.com'],
        subject=f'HDFS Sensor Timeout: {task_instance.task_id}',
        html_content=f'Sensor {task_instance.task_id} timed out waiting for file.'
    )

monitored_sensor = WebHdfsSensor(
    task_id='monitored_file_sensor',
    filepath='/critical/daily_report.csv',
    webhdfs_conn_id='production_hdfs',
    poke_interval=120,
    timeout=7200,
    on_failure_callback=send_timeout_notification,
    dag=dag
)
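For frequently polling sensors, the base sensor also accepts exponential_backoff=True, which progressively stretches the interval between pokes. A hedged variant of the monitored sensor above (same callback; the parameter belongs to Airflow's base sensor, not this provider):

monitored_backoff_sensor = WebHdfsSensor(
    task_id='monitored_file_sensor_backoff',
    filepath='/critical/daily_report.csv',
    webhdfs_conn_id='production_hdfs',
    poke_interval=60,
    exponential_backoff=True,  # lengthen the wait between successive pokes
    timeout=7200,
    on_failure_callback=send_timeout_notification,
    dag=dag
)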

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-apache-hdfs
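The provider is also available from PyPI under its standard distribution name, so it can be added to an Airflow environment with pip:

pip install apache-airflow-providers-apache-hdfs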
