Apache Airflow backport provider package for Apache Sqoop integration, providing SqoopHook and SqoopOperator for data import/export between relational databases and Hadoop
npx @tessl/cli install tessl/pypi-apache-airflow-backport-providers-apache-sqoop@2021.3.0

Apache Airflow backport provider package for Apache Sqoop integration, providing SqoopHook and SqoopOperator for data import/export between relational databases and Hadoop. This package enables efficient bulk data transfer using Apache Sqoop within Airflow workflows, supporting various data formats and export/import operations.
pip install apache-airflow-backport-providers-apache-sqoop

from airflow.providers.apache.sqoop.hooks.sqoop import SqoopHook
from airflow.providers.apache.sqoop.operators.sqoop import SqoopOperator

from airflow import DAG
from airflow.providers.apache.sqoop.operators.sqoop import SqoopOperator
from datetime import datetime, timedelta
# Define DAG
dag = DAG(
    'sqoop_example',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2023, 1, 1),
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    schedule_interval=timedelta(days=1)
)

# Import table from database to HDFS
import_task = SqoopOperator(
    task_id='import_table',
    conn_id='sqoop_default',
    cmd_type='import',
    table='customers',
    target_dir='/user/hive/warehouse/customers',
    file_type='text',
    num_mappers=4,
    dag=dag
)

# Export data from HDFS to database
export_task = SqoopOperator(
    task_id='export_data',
    conn_id='sqoop_default',
    cmd_type='export',
    table='processed_customers',
    export_dir='/user/hive/warehouse/processed_customers',
    dag=dag
)
import_task >> export_task

The package provides two main components: SqoopHook, which wraps the Sqoop command-line client and executes commands through subprocess calls, and SqoopOperator, which runs Sqoop import and export jobs as tasks in an Airflow DAG.
Both components support the full range of Sqoop operations including table imports, query-based imports, table exports, and various data format options (text, Avro, Sequence, Parquet).
Core hook class that manages Sqoop connections and executes Sqoop commands through subprocess calls.
class SqoopHook(BaseHook):
    """
    Hook for executing Apache Sqoop commands.

    Args:
        conn_id (str): Reference to the sqoop connection (default: 'sqoop_default')
        verbose (bool): Set sqoop to verbose mode (default: False)
        num_mappers (Optional[int]): Number of map tasks to import in parallel (default: None)
        hcatalog_database (Optional[str]): HCatalog database name (default: None)
        hcatalog_table (Optional[str]): HCatalog table name (default: None)
        properties (Optional[Dict[str, Any]]): Properties to set via -D argument (default: None)
    """

    conn_name_attr: str = 'conn_id'
    default_conn_name: str = 'sqoop_default'
    conn_type: str = 'sqoop'
    hook_name: str = 'Sqoop'

    def __init__(
        self,
        conn_id: str = default_conn_name,
        verbose: bool = False,
        num_mappers: Optional[int] = None,
        hcatalog_database: Optional[str] = None,
        hcatalog_table: Optional[str] = None,
        properties: Optional[Dict[str, Any]] = None,
    ) -> None: ...

    def get_conn(self) -> Any:
        """Returns the connection object."""

    def cmd_mask_password(self, cmd_orig: List[str]) -> List[str]:
        """
        Mask command password for safety.

        Args:
            cmd_orig (List[str]): Original command list

        Returns:
            List[str]: Command with password masked
        """

    def popen(self, cmd: List[str], **kwargs: Any) -> None:
        """
        Execute remote command via subprocess.

        Args:
            cmd (List[str]): Command to remotely execute
            **kwargs: Extra arguments to Popen (see subprocess.Popen)

        Raises:
            AirflowException: If sqoop command fails
"""Methods for importing data from relational databases to HDFS.
Methods for importing data from relational databases to HDFS.

def import_table(
    self,
    table: str,
    target_dir: Optional[str] = None,
    append: bool = False,
    file_type: str = "text",
    columns: Optional[str] = None,
    split_by: Optional[str] = None,
    where: Optional[str] = None,
    direct: bool = False,
    driver: Any = None,
    extra_import_options: Optional[Dict[str, Any]] = None,
) -> Any:
    """
    Import table from remote database to HDFS.

    Args:
        table (str): Table to read
        target_dir (Optional[str]): HDFS destination directory
        append (bool): Append data to existing dataset in HDFS (default: False)
        file_type (str): Output format - 'avro', 'sequence', 'text', or 'parquet' (default: 'text')
        columns (Optional[str]): Comma-separated columns to import from table
        split_by (Optional[str]): Column of the table used to split work units
        where (Optional[str]): WHERE clause to use during import
        direct (bool): Use direct connector if exists for the database (default: False)
        driver (Any): Manually specify JDBC driver class to use
        extra_import_options (Optional[Dict[str, Any]]): Extra import options as dict

    Returns:
        Any: Import operation result
    """
def import_query(
    self,
    query: str,
    target_dir: Optional[str] = None,
    append: bool = False,
    file_type: str = "text",
    split_by: Optional[str] = None,
    direct: Optional[bool] = None,
    driver: Optional[Any] = None,
    extra_import_options: Optional[Dict[str, Any]] = None,
) -> Any:
    """
    Import specific query results from RDBMS to HDFS.

    Args:
        query (str): Free format query to run
        target_dir (Optional[str]): HDFS destination directory
        append (bool): Append data to existing dataset in HDFS (default: False)
        file_type (str): Output format - 'avro', 'sequence', 'text', or 'parquet' (default: 'text')
        split_by (Optional[str]): Column of the table used to split work units
        direct (Optional[bool]): Use direct import fast path
        driver (Optional[Any]): Manually specify JDBC driver class to use
        extra_import_options (Optional[Dict[str, Any]]): Extra import options as dict

    Returns:
        Any: Import operation result
"""Methods for exporting data from HDFS to relational databases.
Methods for exporting data from HDFS to relational databases.

def export_table(
    self,
    table: str,
    export_dir: Optional[str] = None,
    input_null_string: Optional[str] = None,
    input_null_non_string: Optional[str] = None,
    staging_table: Optional[str] = None,
    clear_staging_table: bool = False,
    enclosed_by: Optional[str] = None,
    escaped_by: Optional[str] = None,
    input_fields_terminated_by: Optional[str] = None,
    input_lines_terminated_by: Optional[str] = None,
    input_optionally_enclosed_by: Optional[str] = None,
    batch: bool = False,
    relaxed_isolation: bool = False,
    extra_export_options: Optional[Dict[str, Any]] = None,
) -> None:
"""
Export Hive table to remote database.
Args:
table (str): Table remote destination
export_dir (Optional[str]): Hive table to export
input_null_string (Optional[str]): String to be interpreted as null for string columns
input_null_non_string (Optional[str]): String to be interpreted as null for non-string columns
staging_table (Optional[str]): Table for staging data before insertion
clear_staging_table (bool): Indicate that staging table data can be deleted (default: False)
enclosed_by (Optional[str]): Sets required field enclosing character
escaped_by (Optional[str]): Sets the escape character
input_fields_terminated_by (Optional[str]): Sets the field separator character
input_lines_terminated_by (Optional[str]): Sets the end-of-line character
input_optionally_enclosed_by (Optional[str]): Sets field enclosing character
batch (bool): Use batch mode for underlying statement execution (default: False)
relaxed_isolation (bool): Transaction isolation to read uncommitted for mappers (default: False)
extra_export_options (Optional[Dict[str, Any]]): Extra export options as dict
"""High-level operator for integrating Sqoop operations into Airflow DAGs.
High-level operator for integrating Sqoop operations into Airflow DAGs.

class SqoopOperator(BaseOperator):
    """
    Execute a Sqoop job within an Airflow DAG.

    Args:
        conn_id (str): Connection ID (default: 'sqoop_default')
        cmd_type (str): Command type - 'export' or 'import' (default: 'import')
        table (Optional[str]): Table to read
        query (Optional[str]): Import result of arbitrary SQL query
        target_dir (Optional[str]): HDFS destination directory
        append (bool): Append data to existing dataset in HDFS (default: False)
        file_type (str): Output format - 'avro', 'sequence', 'text', or 'parquet' (default: 'text')
        columns (Optional[str]): Comma-separated columns to import
        num_mappers (Optional[int]): Number of mapper tasks for parallel processing
        split_by (Optional[str]): Column used to split work units
        where (Optional[str]): WHERE clause for import
        export_dir (Optional[str]): HDFS Hive database directory to export
        input_null_string (Optional[str]): String interpreted as null for string columns
        input_null_non_string (Optional[str]): String interpreted as null for non-string columns
        staging_table (Optional[str]): Table for staging data before insertion
        clear_staging_table (bool): Clear staging table data (default: False)
        enclosed_by (Optional[str]): Required field enclosing character
        escaped_by (Optional[str]): Escape character
        input_fields_terminated_by (Optional[str]): Input field separator
        input_lines_terminated_by (Optional[str]): Input end-of-line character
        input_optionally_enclosed_by (Optional[str]): Field enclosing character
        batch (bool): Use batch mode for statement execution (default: False)
        direct (bool): Use direct export fast path (default: False)
        driver (Optional[Any]): Manually specify JDBC driver class
        verbose (bool): Switch to verbose logging for debug purposes (default: False)
        relaxed_isolation (bool): Use read uncommitted isolation level (default: False)
        hcatalog_database (Optional[str]): HCatalog database name
        hcatalog_table (Optional[str]): HCatalog table name
        create_hcatalog_table (bool): Have sqoop create the hcatalog table (default: False)
        properties (Optional[Dict[str, Any]]): Additional JVM properties passed to sqoop
        extra_import_options (Optional[Dict[str, Any]]): Extra import options as dict
        extra_export_options (Optional[Dict[str, Any]]): Extra export options as dict
        **kwargs: Additional arguments passed to BaseOperator
    """

    template_fields = (
        'conn_id', 'cmd_type', 'table', 'query', 'target_dir', 'file_type',
        'columns', 'split_by', 'where', 'export_dir', 'input_null_string',
        'input_null_non_string', 'staging_table', 'enclosed_by', 'escaped_by',
        'input_fields_terminated_by', 'input_lines_terminated_by',
        'input_optionally_enclosed_by', 'properties', 'extra_import_options',
        'driver', 'extra_export_options', 'hcatalog_database', 'hcatalog_table'
    )

    ui_color = '#7D8CA4'

    @apply_defaults
    def __init__(
        self,
        *,
        conn_id: str = 'sqoop_default',
        cmd_type: str = 'import',
        table: Optional[str] = None,
        query: Optional[str] = None,
        target_dir: Optional[str] = None,
        append: bool = False,
        file_type: str = 'text',
        columns: Optional[str] = None,
        num_mappers: Optional[int] = None,
        split_by: Optional[str] = None,
        where: Optional[str] = None,
        export_dir: Optional[str] = None,
        input_null_string: Optional[str] = None,
        input_null_non_string: Optional[str] = None,
        staging_table: Optional[str] = None,
        clear_staging_table: bool = False,
        enclosed_by: Optional[str] = None,
        escaped_by: Optional[str] = None,
        input_fields_terminated_by: Optional[str] = None,
        input_lines_terminated_by: Optional[str] = None,
        input_optionally_enclosed_by: Optional[str] = None,
        batch: bool = False,
        direct: bool = False,
        driver: Optional[Any] = None,
        verbose: bool = False,
        relaxed_isolation: bool = False,
        properties: Optional[Dict[str, Any]] = None,
        hcatalog_database: Optional[str] = None,
        hcatalog_table: Optional[str] = None,
        create_hcatalog_table: bool = False,
        extra_import_options: Optional[Dict[str, Any]] = None,
        extra_export_options: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None: ...

    def execute(self, context: Dict[str, Any]) -> None:
        """Execute sqoop job based on cmd_type (import or export)."""

    def on_kill(self) -> None:
"""Handle task termination by sending SIGTERM to sqoop subprocess."""from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple
from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator
from airflow.exceptions import AirflowException
from airflow.utils.decorators import apply_defaults
# Connection parameter types used in extra JSON field
ConnectionExtraParams = Dict[str, Any] # Includes job_tracker, namenode, libjars, files, archives, password_file
# File format options for import/export operations
FileType = str # 'text', 'avro', 'sequence', 'parquet'
# Command type options for SqoopOperator
CommandType = str # 'import', 'export'
# Extra options dictionaries for additional Sqoop parameters
ExtraOptions = Dict[str, Any]  # Key-value pairs for additional Sqoop command options
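As a rough illustration of how these dictionaries typically end up on the Sqoop command line (the exact tokenization is handled by the hook and may differ), properties are rendered as -D key=value pairs and extra option keys become --flags:

# Approximate rendering, for illustration only; the hook builds the real command.
properties = {'mapred.job.queue.name': 'default'}
extra_import_options = {'hive-import': '', 'hive-table': 'retail.customers'}

args = []
for key, value in properties.items():
    args.extend(['-D', f'{key}={value}'])
for key, value in extra_import_options.items():
    args.append(f'--{key}')
    if value:
        args.append(str(value))

print(args)
# ['-D', 'mapred.job.queue.name=default', '--hive-import', '--hive-table', 'retail.customers']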
The package uses Airflow connections with the following configuration:

Connection Parameters:
host: Database host
port: Database port
schema: Database schema/name
login: Database username
password: Database password

Extra JSON Parameters:
job_tracker: Job tracker (local|jobtracker:port)
namenode: Namenode configuration
libjars: Comma separated jar files to include in the classpath
files: Comma separated files to be copied to the map reduce cluster
archives: Comma separated archives to be unarchived on the compute machines
password_file: Path to a file containing the password

The package raises AirflowException when a Sqoop command exits with a non-zero return code, when an unsupported file_type is requested, and when SqoopOperator receives a cmd_type other than 'import' or 'export'.
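For reference, a connection carrying these fields could be defined programmatically as sketched below; the host, credentials, and extra values are placeholders, and in practice the connection is usually created through the Airflow UI, CLI, or environment variables.

import json
from airflow.models import Connection

# Placeholder values throughout.
sqoop_conn = Connection(
    conn_id='sqoop_default',
    conn_type='sqoop',
    host='mysql.example.com',
    port=3306,
    schema='shop',
    login='sqoop_user',
    password='sqoop_password',
    extra=json.dumps({
        'job_tracker': 'jobtracker.example.com:8021',
        'namenode': 'hdfs://namenode.example.com:8020',
        'libjars': '/opt/sqoop/lib/mysql-connector-java.jar',
    }),
)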
from airflow.providers.apache.sqoop.operators.sqoop import SqoopOperator
# Import entire table
import_task = SqoopOperator(
    task_id='import_customers',
    conn_id='mysql_default',
    cmd_type='import',
    table='customers',
    target_dir='/user/hive/warehouse/customers',
    file_type='avro',
    num_mappers=4
)

# Import with custom query and conditions
import_query_task = SqoopOperator(
    task_id='import_filtered_orders',
    conn_id='postgres_default',
    cmd_type='import',
    query="SELECT * FROM orders WHERE order_date >= '2023-01-01' AND $CONDITIONS",
    target_dir='/user/hive/warehouse/recent_orders',
    file_type='parquet',
    split_by='order_id',
    num_mappers=8
)

# Export processed data back to database
export_task = SqoopOperator(
    task_id='export_aggregated_data',
    conn_id='mysql_default',
    cmd_type='export',
    table='customer_summary',
    export_dir='/user/hive/warehouse/processed_customers',
    input_fields_terminated_by=',',
    batch=True
)

# Import with HCatalog table creation
hcatalog_import = SqoopOperator(
    task_id='import_to_hcatalog',
    conn_id='oracle_default',
    cmd_type='import',
    table='products',
    hcatalog_database='retail',
    hcatalog_table='products',
    create_hcatalog_table=True,
    file_type='avro',
    extra_import_options={
        'map-column-java': 'price=String',
        'null-string': '\\\\N',
        'null-non-string': '\\\\N'
    }
)