Provider package for Apache Airflow integration with Hadoop Distributed File System (HDFS) and WebHDFS operations
Sensors for monitoring HDFS file system states and coordinating workflow execution based on file availability. These sensors enable event-driven data pipelines that wait for specific files or file sets before proceeding with downstream processing.
Wait for a specific file or directory to appear in HDFS before proceeding with downstream tasks.
class WebHdfsSensor(BaseSensorOperator):
"""
Sensor that waits for a file or folder to land in HDFS.
Attributes:
template_fields: Sequence of template fields ("filepath",)
"""
def __init__(
self,
*,
filepath: str,
webhdfs_conn_id: str = "webhdfs_default",
**kwargs
) -> None:
"""
Initialize WebHDFS sensor for single file monitoring.
Parameters:
filepath: The path to monitor in HDFS
webhdfs_conn_id: The connection id for the webhdfs client
**kwargs: Additional sensor parameters (poke_interval, timeout, etc.)
"""
def poke(self, context) -> bool:
"""
Check if the filepath exists in HDFS.
Parameters:
context: Airflow task context
Returns:
bool: True if file exists, False otherwise
"""Wait for multiple specific files to appear in a directory before proceeding, useful for batch processing scenarios where multiple input files are required.
Wait for multiple specific files to appear in a directory before proceeding, useful for batch processing scenarios where multiple input files are required.
class MultipleFilesWebHdfsSensor(BaseSensorOperator):
"""
Sensor that waits for multiple files in a folder to land in HDFS.
Attributes:
template_fields: Sequence of template fields ("directory_path", "expected_filenames")
"""
def __init__(
self,
*,
directory_path: str,
expected_filenames: Sequence[str],
webhdfs_conn_id: str = "webhdfs_default",
**kwargs
) -> None:
"""
Initialize WebHDFS sensor for multiple files monitoring.
Parameters:
directory_path: The directory path to monitor in HDFS
expected_filenames: Sequence of expected filenames to wait for
webhdfs_conn_id: The connection id for the webhdfs client
**kwargs: Additional sensor parameters (poke_interval, timeout, etc.)
"""
def poke(self, context) -> bool:
"""
Check if all expected files exist in the directory.
Parameters:
context: Airflow task context
Returns:
bool: True if all expected files exist, False if any are missing
"""Wait for a single file to appear in HDFS:
Wait for a single file to appear in HDFS:
from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def process_data():
print("File is ready, processing data...")
dag = DAG(
'hdfs_file_sensor_example',
default_args={
'owner': 'data_team',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
},
description='Wait for HDFS file and process',
schedule_interval=timedelta(hours=1),
start_date=datetime(2024, 1, 1),
catchup=False
)
# Sensor task
file_sensor = WebHdfsSensor(
task_id='wait_for_input_file',
filepath='/data/input/daily_sales_{{ ds }}.csv', # Templated filepath
webhdfs_conn_id='production_hdfs',
poke_interval=60, # Check every 60 seconds
timeout=3600, # Timeout after 1 hour
dag=dag
)
# Processing task
process_task = PythonOperator(
task_id='process_sales_data',
python_callable=process_data,
dag=dag
)
file_sensor >> process_task
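For files that may take a long time to land, the standard BaseSensorOperator mode='reschedule' option frees the worker slot between checks instead of holding it for the whole wait. A variant of the sensor above using it (same DAG, hypothetical task id):
reschedule_sensor = WebHdfsSensor(
    task_id='wait_for_input_file_rescheduled',
    filepath='/data/input/daily_sales_{{ ds }}.csv',
    webhdfs_conn_id='production_hdfs',
    mode='reschedule',  # release the worker slot between pokes
    poke_interval=300,
    timeout=3600,
    dag=dag
)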
Wait for multiple files to appear in a directory:
from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import MultipleFilesWebHdfsSensor
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
dag = DAG(
'hdfs_multiple_files_sensor',
default_args={'owner': 'data_team'},
description='Wait for multiple HDFS files',
schedule_interval='@daily',
start_date=datetime(2024, 1, 1),
catchup=False
)
# Wait for multiple required files
files_sensor = MultipleFilesWebHdfsSensor(
task_id='wait_for_batch_files',
directory_path='/data/batch/{{ ds }}/',
expected_filenames=[
'transactions.parquet',
'customers.parquet',
'products.parquet',
'inventory.parquet'
],
webhdfs_conn_id='batch_hdfs',
poke_interval=120, # Check every 2 minutes
timeout=7200, # Timeout after 2 hours
dag=dag
)
# Start batch processing when all files are ready
batch_process = BashOperator(
task_id='start_batch_processing',
bash_command='spark-submit /scripts/batch_processing.py --date {{ ds }}',
dag=dag
)
files_sensor >> batch_process
Configure sensors with custom retry logic and failure handling:
from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta
def handle_missing_file():
print("File was not found within timeout period")
# Implement fallback logic or notifications
def process_when_ready():
print("File found, proceeding with processing")
dag = DAG(
'robust_hdfs_sensor',
default_args={'owner': 'data_team'},
start_date=datetime(2024, 1, 1),
schedule_interval='@hourly'
)
# Primary sensor with shorter timeout
primary_sensor = WebHdfsSensor(
    task_id='wait_for_primary_file',
    filepath='/data/primary/hourly_{{ ts_nodash }}.json',
    webhdfs_conn_id='primary_hdfs',
    poke_interval=30,
    timeout=1800,  # 30 minutes; a timeout fails this task, which triggers the fallback
    # Note: soft_fail=True would mark the timed-out sensor as skipped, which
    # would never satisfy the fallback's ALL_FAILED trigger rule below
    dag=dag
)
# Fallback sensor for backup location
fallback_sensor = WebHdfsSensor(
task_id='wait_for_backup_file',
filepath='/data/backup/hourly_{{ ts_nodash }}.json',
webhdfs_conn_id='backup_hdfs',
poke_interval=60,
timeout=900, # 15 minutes
trigger_rule=TriggerRule.ALL_FAILED, # Only run if primary fails
dag=dag
)
# Processing task that runs if either sensor succeeds
process_task = PythonOperator(
task_id='process_data',
python_callable=process_when_ready,
trigger_rule=TriggerRule.ONE_SUCCESS,
dag=dag
)
# Cleanup task for failed scenarios
cleanup_task = PythonOperator(
task_id='handle_missing_files',
python_callable=handle_missing_file,
trigger_rule=TriggerRule.ALL_FAILED,
dag=dag
)
# Task dependencies: the fallback must be downstream of the primary,
# otherwise its ALL_FAILED trigger rule has no upstream to evaluate
primary_sensor >> fallback_sensor
[primary_sensor, fallback_sensor] >> process_task
[primary_sensor, fallback_sensor] >> cleanup_task
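Sensors also accept the standard BaseSensorOperator exponential_backoff option, which stretches the wait between pokes over time. A sketch applied to the primary-file pattern above (hypothetical task id):
backoff_sensor = WebHdfsSensor(
    task_id='wait_for_primary_file_backoff',
    filepath='/data/primary/hourly_{{ ts_nodash }}.json',
    webhdfs_conn_id='primary_hdfs',
    poke_interval=30,          # initial interval between pokes
    exponential_backoff=True,  # grow the interval after each unsuccessful poke
    timeout=1800,
    dag=dag
)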
Use templated filepaths for dynamic file monitoring:
from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.operators.dummy import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime
dag = DAG(
'dynamic_hdfs_sensor',
start_date=datetime(2024, 1, 1),
schedule_interval='@daily'
)
# Sensor with Jinja templating
dynamic_sensor = WebHdfsSensor(
task_id='wait_for_dated_file',
# Wait for file with current date in path and filename
filepath='/warehouse/{{ macros.ds_format(ds, "%Y-%m-%d", "%Y/%m/%d") }}/data_{{ ds_nodash }}.parquet',
webhdfs_conn_id='warehouse_hdfs',
poke_interval=300, # 5 minutes
timeout=14400, # 4 hours
dag=dag
)
# Multiple sensors for different file types
sensor_configs = [
{'name': 'transactions', 'path': '/raw/transactions/{{ ds }}/'},
{'name': 'customers', 'path': '/raw/customers/{{ ds }}/'},
{'name': 'products', 'path': '/raw/products/{{ ds }}/'}
]
sensors = []
for config in sensor_configs:
sensor = WebHdfsSensor(
task_id=f'wait_for_{config["name"]}_files',
filepath=f'{config["path"]}_SUCCESS', # Wait for success marker
webhdfs_conn_id='data_lake_hdfs',
poke_interval=120,
timeout=3600,
dag=dag
)
sensors.append(sensor)
# All sensors must complete before downstream processing
all_ready = DummyOperator(
task_id='all_files_ready',
trigger_rule=TriggerRule.ALL_SUCCESS,
dag=dag
)
sensors >> all_ready
Use sensors to trigger hook-based file operations:
from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from airflow.operators.python import PythonOperator
from datetime import datetime
def copy_and_process():
"""Copy file from input to processing directory and validate."""
hook = WebHDFSHook(webhdfs_conn_id='main_hdfs')
input_path = '/input/raw_data.csv'
processing_path = '/processing/raw_data.csv'
# Read from input location
data = hook.read_file(input_path)
# Write to temporary file for processing
with open('/tmp/processing_data.csv', 'wb') as f:
f.write(data)
# Upload to processing directory
hook.load_file('/tmp/processing_data.csv', processing_path)
print(f"File copied and ready for processing: {processing_path}")
dag = DAG('sensor_hook_integration', start_date=datetime(2024, 1, 1))
# Wait for input file
sensor = WebHdfsSensor(
task_id='wait_for_input',
filepath='/input/raw_data.csv',
webhdfs_conn_id='main_hdfs',
dag=dag
)
# Copy and prepare for processing
copy_task = PythonOperator(
task_id='copy_and_process',
python_callable=copy_and_process,
dag=dag
)
sensor >> copy_task
Extend sensors for custom monitoring logic:
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from airflow.utils.context import Context
class FileSizeWebHdfsSensor(WebHdfsSensor):
"""Custom sensor that checks both file existence and minimum size."""
def __init__(self, min_size_bytes: int = 0, **kwargs):
super().__init__(**kwargs)
self.min_size_bytes = min_size_bytes
def poke(self, context: Context) -> bool:
"""Check if file exists and meets minimum size requirement."""
hook = WebHDFSHook(self.webhdfs_conn_id)
# Check if file exists
if not hook.check_for_path(self.filepath):
self.log.info(f"File {self.filepath} does not exist yet")
return False
# Check file size if minimum size specified
if self.min_size_bytes > 0:
client = hook.get_conn()
file_status = client.status(self.filepath)
file_size = file_status.get('length', 0)
if file_size < self.min_size_bytes:
self.log.info(f"File {self.filepath} exists but size {file_size} < {self.min_size_bytes} bytes")
return False
self.log.info(f"File {self.filepath} exists and meets size requirements")
return True
# Usage
custom_sensor = FileSizeWebHdfsSensor(
task_id='wait_for_large_file',
filepath='/data/large_dataset.parquet',
min_size_bytes=1024 * 1024, # Minimum 1MB
webhdfs_conn_id='data_hdfs',
poke_interval=60,
timeout=3600
)
Tune poke_interval and timeout to match how quickly files are expected to arrive:
# Short-lived files (expected within minutes)
quick_sensor = WebHdfsSensor(
task_id='wait_for_quick_file',
filepath='/tmp/quick_process.flag',
poke_interval=10, # Check every 10 seconds
timeout=300, # 5 minute timeout
dag=dag
)
# Regular batch files (expected within hours)
batch_sensor = WebHdfsSensor(
task_id='wait_for_batch_file',
filepath='/batch/daily_extract.csv',
poke_interval=300, # Check every 5 minutes
timeout=14400, # 4 hour timeout
dag=dag
)
# Large ETL files (expected within day)
etl_sensor = WebHdfsSensor(
task_id='wait_for_etl_file',
filepath='/warehouse/etl_complete.marker',
poke_interval=1800, # Check every 30 minutes
timeout=86400, # 24 hour timeout
dag=dag
)
Attach a failure callback to alert the team when a critical sensor times out:
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.utils.email import send_email
def send_timeout_notification(context):
"""Send notification when sensor times out."""
task_instance = context['task_instance']
send_email(
to=['data-team@company.com'],
subject=f'HDFS Sensor Timeout: {task_instance.task_id}',
html_content=f'Sensor {task_instance.task_id} timed out waiting for file.'
)
monitored_sensor = WebHdfsSensor(
task_id='monitored_file_sensor',
filepath='/critical/daily_report.csv',
webhdfs_conn_id='production_hdfs',
poke_interval=120,
timeout=7200,
on_failure_callback=send_timeout_notification,
dag=dag
)
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-apache-hdfs