Apache Airflow Backport Providers Apache Sqoop

Apache Airflow backport provider package for Apache Sqoop integration, providing SqoopHook and SqoopOperator for data import/export between relational databases and Hadoop. This package enables efficient bulk data transfer using Apache Sqoop within Airflow workflows, supporting various data formats and export/import operations.

Package Information

  • Package Name: apache-airflow-backport-providers-apache-sqoop
  • Package Type: Python package (PyPI)
  • Language: Python
  • Installation: pip install apache-airflow-backport-providers-apache-sqoop

Core Imports

from airflow.providers.apache.sqoop.hooks.sqoop import SqoopHook
from airflow.providers.apache.sqoop.operators.sqoop import SqoopOperator

Basic Usage

from airflow import DAG
from airflow.providers.apache.sqoop.operators.sqoop import SqoopOperator
from datetime import datetime, timedelta

# Define DAG
dag = DAG(
    'sqoop_example',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2023, 1, 1),
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    schedule_interval=timedelta(days=1)
)

# Import table from database to HDFS
import_task = SqoopOperator(
    task_id='import_table',
    conn_id='sqoop_default',
    cmd_type='import',
    table='customers',
    target_dir='/user/hive/warehouse/customers',
    file_type='text',
    num_mappers=4,
    dag=dag
)

# Export data from HDFS to database
export_task = SqoopOperator(
    task_id='export_data',
    conn_id='sqoop_default',
    cmd_type='export',
    table='processed_customers',
    export_dir='/user/hive/warehouse/processed_customers',
    dag=dag
)

import_task >> export_task

Architecture

The package provides two main components:

  • SqoopHook: Low-level interface for executing Sqoop commands, handling connection management, command construction, and subprocess execution
  • SqoopOperator: High-level Airflow operator that wraps SqoopHook functionality for integration into Airflow DAGs

Both components support the full range of Sqoop operations including table imports, query-based imports, table exports, and various data format options (text, Avro, Sequence, Parquet).
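
Since SqoopHook can be used on its own, outside of SqoopOperator, the sketch below shows the low-level path. It assumes a sqoop_default connection is already defined; the table name and HDFS path are placeholders.

from airflow.providers.apache.sqoop.hooks.sqoop import SqoopHook

# Low-level path: drive Sqoop directly through the hook
# (e.g. from a PythonOperator callable) instead of using SqoopOperator
hook = SqoopHook(conn_id='sqoop_default', num_mappers=4, verbose=True)

# Import a single table into HDFS as plain text files
hook.import_table(
    table='customers',               # placeholder table name
    target_dir='/tmp/customers',     # placeholder HDFS path
    file_type='text',
)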

Capabilities

SqoopHook - Connection and Command Execution

Core hook class that manages Sqoop connections and executes Sqoop commands through subprocess calls.

class SqoopHook(BaseHook):
    """
    Hook for executing Apache Sqoop commands.
    
    Args:
        conn_id (str): Reference to the sqoop connection (default: 'sqoop_default')
        verbose (bool): Set sqoop to verbose mode (default: False)
        num_mappers (Optional[int]): Number of map tasks to import in parallel (default: None)
        hcatalog_database (Optional[str]): HCatalog database name (default: None)
        hcatalog_table (Optional[str]): HCatalog table name (default: None)
        properties (Optional[Dict[str, Any]]): Properties to set via -D argument (default: None)
    """
    
    conn_name_attr: str = 'conn_id'
    default_conn_name: str = 'sqoop_default'
    conn_type: str = 'sqoop'
    hook_name: str = 'Sqoop'
    
    def __init__(
        self,
        conn_id: str = default_conn_name,
        verbose: bool = False,
        num_mappers: Optional[int] = None,
        hcatalog_database: Optional[str] = None,
        hcatalog_table: Optional[str] = None,
        properties: Optional[Dict[str, Any]] = None,
    ) -> None: ...
    
    def get_conn(self) -> Any:
        """Returns the connection object."""
    
    def cmd_mask_password(self, cmd_orig: List[str]) -> List[str]:
        """
        Mask command password for safety.
        
        Args:
            cmd_orig (List[str]): Original command list
            
        Returns:
            List[str]: Command with password masked
        """
    
    def popen(self, cmd: List[str], **kwargs: Any) -> None:
        """
        Execute remote command via subprocess.
        
        Args:
            cmd (List[str]): Command to remotely execute
            **kwargs: Extra arguments to Popen (see subprocess.Popen)
            
        Raises:
            AirflowException: If sqoop command fails
        """

SqoopHook - Table Import Operations

Methods for importing data from relational databases to HDFS.

def import_table(
    self,
    table: str,
    target_dir: Optional[str] = None,
    append: bool = False,
    file_type: str = "text",
    columns: Optional[str] = None,
    split_by: Optional[str] = None,
    where: Optional[str] = None,
    direct: bool = False,
    driver: Any = None,
    extra_import_options: Optional[Dict[str, Any]] = None,
) -> Any:
    """
    Import table from remote database to HDFS.
    
    Args:
        table (str): Table to read
        target_dir (Optional[str]): HDFS destination directory
        append (bool): Append data to existing dataset in HDFS (default: False)
        file_type (str): Output format - 'avro', 'sequence', 'text', or 'parquet' (default: 'text')
        columns (Optional[str]): Comma-separated columns to import from table
        split_by (Optional[str]): Column of the table used to split work units
        where (Optional[str]): WHERE clause to use during import
        direct (bool): Use direct connector if exists for the database (default: False)
        driver (Any): Manually specify JDBC driver class to use
        extra_import_options (Optional[Dict[str, Any]]): Extra import options as dict
        
    Returns:
        Any: Import operation result
    """

def import_query(
    self,
    query: str,
    target_dir: Optional[str] = None,
    append: bool = False,
    file_type: str = "text",
    split_by: Optional[str] = None,
    direct: Optional[bool] = None,
    driver: Optional[Any] = None,
    extra_import_options: Optional[Dict[str, Any]] = None,
) -> Any:
    """
    Import specific query results from RDBMS to HDFS.
    
    Args:
        query (str): Free format query to run
        target_dir (Optional[str]): HDFS destination directory
        append (bool): Append data to existing dataset in HDFS (default: False)
        file_type (str): Output format - 'avro', 'sequence', 'text', or 'parquet' (default: 'text')
        split_by (Optional[str]): Column of the table used to split work units
        direct (Optional[bool]): Use direct import fast path
        driver (Optional[Any]): Manually specify JDBC driver class to use
        extra_import_options (Optional[Dict[str, Any]]): Extra import options as dict
        
    Returns:
        Any: Import operation result
    """

SqoopHook - Table Export Operations

Methods for exporting data from HDFS to relational databases.

def export_table(
    self,
    table: str,
    export_dir: Optional[str] = None,
    input_null_string: Optional[str] = None,
    input_null_non_string: Optional[str] = None,
    staging_table: Optional[str] = None,
    clear_staging_table: bool = False,
    enclosed_by: Optional[str] = None,
    escaped_by: Optional[str] = None,
    input_fields_terminated_by: Optional[str] = None,
    input_lines_terminated_by: Optional[str] = None,
    input_optionally_enclosed_by: Optional[str] = None,
    batch: bool = False,
    relaxed_isolation: bool = False,
    extra_export_options: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Export Hive table to remote database.
    
    Args:
        table (str): Table remote destination
        export_dir (Optional[str]): HDFS directory containing the data to export
        input_null_string (Optional[str]): String to be interpreted as null for string columns
        input_null_non_string (Optional[str]): String to be interpreted as null for non-string columns
        staging_table (Optional[str]): Table for staging data before insertion
        clear_staging_table (bool): Indicate that staging table data can be deleted (default: False)
        enclosed_by (Optional[str]): Sets required field enclosing character
        escaped_by (Optional[str]): Sets the escape character
        input_fields_terminated_by (Optional[str]): Sets the field separator character
        input_lines_terminated_by (Optional[str]): Sets the end-of-line character
        input_optionally_enclosed_by (Optional[str]): Sets field enclosing character
        batch (bool): Use batch mode for underlying statement execution (default: False)
        relaxed_isolation (bool): Transaction isolation to read uncommitted for mappers (default: False)
        extra_export_options (Optional[Dict[str, Any]]): Extra export options as dict
    """

SqoopOperator - Airflow Integration

High-level operator for integrating Sqoop operations into Airflow DAGs.

class SqoopOperator(BaseOperator):
    """
    Execute a Sqoop job within an Airflow DAG.
    
    Args:
        conn_id (str): Connection ID (default: 'sqoop_default')
        cmd_type (str): Command type - 'export' or 'import' (default: 'import')
        table (Optional[str]): Table to read
        query (Optional[str]): Import result of arbitrary SQL query
        target_dir (Optional[str]): HDFS destination directory
        append (bool): Append data to existing dataset in HDFS (default: False)
        file_type (str): Output format - 'avro', 'sequence', 'text', or 'parquet' (default: 'text')
        columns (Optional[str]): Comma-separated columns to import
        num_mappers (Optional[int]): Number of mapper tasks for parallel processing
        split_by (Optional[str]): Column used to split work units
        where (Optional[str]): WHERE clause for import
        export_dir (Optional[str]): HDFS Hive database directory to export
        input_null_string (Optional[str]): String interpreted as null for string columns
        input_null_non_string (Optional[str]): String interpreted as null for non-string columns
        staging_table (Optional[str]): Table for staging data before insertion
        clear_staging_table (bool): Clear staging table data (default: False)
        enclosed_by (Optional[str]): Required field enclosing character
        escaped_by (Optional[str]): Escape character
        input_fields_terminated_by (Optional[str]): Input field separator
        input_lines_terminated_by (Optional[str]): Input end-of-line character
        input_optionally_enclosed_by (Optional[str]): Field enclosing character
        batch (bool): Use batch mode for statement execution (default: False)
        direct (bool): Use direct export fast path (default: False)
        driver (Optional[Any]): Manually specify JDBC driver class
        verbose (bool): Switch to verbose logging for debug purposes (default: False)
        relaxed_isolation (bool): Use read uncommitted isolation level (default: False)
        hcatalog_database (Optional[str]): HCatalog database name
        hcatalog_table (Optional[str]): HCatalog table name
        create_hcatalog_table (bool): Have sqoop create the hcatalog table (default: False)
        properties (Optional[Dict[str, Any]]): Additional JVM properties passed to sqoop
        extra_import_options (Optional[Dict[str, Any]]): Extra import options as dict
        extra_export_options (Optional[Dict[str, Any]]): Extra export options as dict
        **kwargs: Additional arguments passed to BaseOperator
    """
    
    template_fields = (
        'conn_id', 'cmd_type', 'table', 'query', 'target_dir', 'file_type',
        'columns', 'split_by', 'where', 'export_dir', 'input_null_string',
        'input_null_non_string', 'staging_table', 'enclosed_by', 'escaped_by',
        'input_fields_terminated_by', 'input_lines_terminated_by',
        'input_optionally_enclosed_by', 'properties', 'extra_import_options',
        'driver', 'extra_export_options', 'hcatalog_database', 'hcatalog_table'
    )
    ui_color = '#7D8CA4'
    
    @apply_defaults
    def __init__(
        self,
        *,
        conn_id: str = 'sqoop_default',
        cmd_type: str = 'import',
        table: Optional[str] = None,
        query: Optional[str] = None,
        target_dir: Optional[str] = None,
        append: bool = False,
        file_type: str = 'text',
        columns: Optional[str] = None,
        num_mappers: Optional[int] = None,
        split_by: Optional[str] = None,
        where: Optional[str] = None,
        export_dir: Optional[str] = None,
        input_null_string: Optional[str] = None,
        input_null_non_string: Optional[str] = None,
        staging_table: Optional[str] = None,
        clear_staging_table: bool = False,
        enclosed_by: Optional[str] = None,
        escaped_by: Optional[str] = None,
        input_fields_terminated_by: Optional[str] = None,
        input_lines_terminated_by: Optional[str] = None,
        input_optionally_enclosed_by: Optional[str] = None,
        batch: bool = False,
        direct: bool = False,
        driver: Optional[Any] = None,
        verbose: bool = False,
        relaxed_isolation: bool = False,
        properties: Optional[Dict[str, Any]] = None,
        hcatalog_database: Optional[str] = None,
        hcatalog_table: Optional[str] = None,
        create_hcatalog_table: bool = False,
        extra_import_options: Optional[Dict[str, Any]] = None,
        extra_export_options: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None: ...
    
    def execute(self, context: Dict[str, Any]) -> None:
        """Execute sqoop job based on cmd_type (import or export)."""
    
    def on_kill(self) -> None:
        """Handle task termination by sending SIGTERM to sqoop subprocess."""

Types

from typing import Any, Dict, List, Optional, Tuple
from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator
from airflow.exceptions import AirflowException
from airflow.utils.decorators import apply_defaults

# Connection parameter types used in extra JSON field
ConnectionExtraParams = Dict[str, Any]  # Includes job_tracker, namenode, libjars, files, archives, password_file

# File format options for import/export operations
FileType = str  # 'text', 'avro', 'sequence', 'parquet'

# Command type options for SqoopOperator
CommandType = str  # 'import', 'export'

# Extra options dictionaries for additional Sqoop parameters
ExtraOptions = Dict[str, Any]  # Key-value pairs for additional Sqoop command options

Connection Configuration

The package uses Airflow connections with the following configuration (a connection sketch follows the parameter lists):

Connection Parameters:

  • host: Database host
  • port: Database port
  • schema: Database schema/name
  • login: Database username
  • password: Database password

Extra JSON Parameters:

  • job_tracker: Job tracker address (local or jobtracker:port)
  • namenode: Namenode configuration
  • libjars: Comma separated jar files to include in classpath
  • files: Comma separated files to be copied to map reduce cluster
  • archives: Comma separated archives to be unarchived on compute machines
  • password_file: Path to file containing the password
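
A minimal sketch of a connection carrying these fields, with placeholder values; the same values can be entered through the Airflow UI or CLI instead. The host is assumed to carry the JDBC prefix that Sqoop's --connect argument expects.

import json
from airflow.models import Connection

sqoop_conn = Connection(
    conn_id='sqoop_default',
    conn_type='sqoop',
    host='jdbc:mysql://db.example.com',      # placeholder JDBC host
    port=3306,
    schema='sales',                          # database name
    login='etl_user',
    password='etl_password',
    extra=json.dumps({
        'job_tracker': 'jobtracker.example.com:8021',
        'namenode': 'hdfs://namenode.example.com:8020',
        'libjars': '/opt/jars/mysql-connector-java.jar',
    }),
)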

Error Handling

The package raises AirflowException in the following cases (a minimal handling sketch follows the list):

  • Sqoop command execution failures (when subprocess returns non-zero exit code)
  • Invalid command type (cmd_type must be 'import' or 'export')
  • Invalid file type specification (must be 'avro', 'sequence', 'text', or 'parquet')
  • Both table and query specified for import operations (mutually exclusive)
  • Missing required parameters for import operations (must provide either table or query)
  • Missing required table parameter for export operations
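
The sketch below shows one way a command failure might be handled when driving the hook directly; the table name and path are placeholders.

from airflow.exceptions import AirflowException
from airflow.providers.apache.sqoop.hooks.sqoop import SqoopHook

hook = SqoopHook(conn_id='sqoop_default')

try:
    # Raises AirflowException if the sqoop subprocess exits with a non-zero code
    hook.import_table(table='customers', target_dir='/tmp/customers')
except AirflowException as err:
    # Log and re-raise so the surrounding Airflow task is still marked as failed
    print(f"Sqoop import failed: {err}")
    raise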

Usage Examples

Basic Table Import

from airflow.providers.apache.sqoop.operators.sqoop import SqoopOperator

# Import entire table
import_task = SqoopOperator(
    task_id='import_customers',
    conn_id='mysql_default',
    cmd_type='import',
    table='customers',
    target_dir='/user/hive/warehouse/customers',
    file_type='avro',
    num_mappers=4
)

Query-Based Import with Conditions

# Import with custom query and conditions
import_query_task = SqoopOperator(
    task_id='import_filtered_orders',
    conn_id='postgres_default',
    cmd_type='import',
    query="SELECT * FROM orders WHERE order_date >= '2023-01-01' AND \\$CONDITIONS",
    target_dir='/user/hive/warehouse/recent_orders',
    file_type='parquet',
    split_by='order_id',
    num_mappers=8
)

Data Export to Database

# Export processed data back to database
export_task = SqoopOperator(
    task_id='export_aggregated_data',
    conn_id='mysql_default',
    cmd_type='export',
    table='customer_summary',
    export_dir='/user/hive/warehouse/processed_customers',
    input_fields_terminated_by=',',
    batch=True
)

Advanced Import with HCatalog Integration

# Import with HCatalog table creation
hcatalog_import = SqoopOperator(
    task_id='import_to_hcatalog',
    conn_id='oracle_default',
    cmd_type='import',
    table='products',
    hcatalog_database='retail',
    hcatalog_table='products',
    create_hcatalog_table=True,
    file_type='avro',
    extra_import_options={
        'map-column-java': 'price=String',
        'null-string': '\\\\N',
        'null-non-string': '\\\\N'
    }
)