Apache Kylin Provider for Apache Airflow

An Apache Kylin provider package for Apache Airflow that enables integration with the Apache Kylin OLAP engine. This backport provider lets users trigger Kylin cube builds, manage cube operations, and monitor job statuses from within Airflow workflows.

Package Information

  • Package Name: apache-airflow-backport-providers-apache-kylin
  • Package Type: pypi
  • Language: Python
  • Installation: pip install apache-airflow-backport-providers-apache-kylin

Core Imports

from airflow.providers.apache.kylin.hooks.kylin import KylinHook
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator

For error handling and advanced usage:

from kylinpy import kylinpy, exceptions
from airflow.exceptions import AirflowException
from airflow.utils.decorators import apply_defaults

Basic Usage

from airflow import DAG
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'kylin_cube_build',
    default_args=default_args,
    description='Build Kylin cube',
    schedule_interval=timedelta(days=1)
)

# Build a Kylin cube with job tracking
build_cube = KylinCubeOperator(
    task_id='build_kylin_cube',
    kylin_conn_id='kylin_default',
    project='sales_analytics',
    cube='sales_cube',
    command='build',
    start_time='{{ execution_date.int_timestamp * 1000 }}',       # Interval start, epoch milliseconds
    end_time='{{ next_execution_date.int_timestamp * 1000 }}',    # Interval end, epoch milliseconds
    is_track_job=True,
    timeout=3600,  # 1 hour timeout
    dag=dag
)

Capabilities

Connection Management

Establishes and manages connections to the Kylin server.

class KylinHook(BaseHook):
    def __init__(
        self,
        kylin_conn_id: str = 'kylin_default',
        project: Optional[str] = None,
        dsn: Optional[str] = None,
    ): ...
    
    def get_conn(self):
        """
        Establishes and returns a connection to the Kylin server.
        
        Returns:
            kylinpy.Kylin: Connection object for interacting with Kylin server.
                          Can be used to get datasources and manage cube operations.
        """

Cube Operations

Executes various cube operations including build, refresh, merge, and management operations.

def cube_run(self, datasource_name: str, op: str, **op_args) -> Any:
    """
    Execute cube operations on the specified datasource.
    
    Args:
        datasource_name (str): Name of the datasource/cube to operate on
        op (str): Command to execute (must be in supported commands)
        **op_args: Additional keyword arguments for the operation
    
    Returns:
        Response from the cube operation
    
    Raises:
        AirflowException: If the cube operation encounters a KylinError
    """

Job Status Monitoring

Retrieves and monitors the status of Kylin jobs.

def get_job_status(self, job_id: str) -> str:
    """
    Retrieve the status of a Kylin job.
    
    Args:
        job_id (str): Kylin job ID
    
    Returns:
        str: Job status
    """

Cube Build Operator

Airflow operator for submitting Kylin cube operations with optional job tracking.

class KylinCubeOperator(BaseOperator):
    def __init__(
        self,
        *,
        kylin_conn_id: str = 'kylin_default',
        project: Optional[str] = None,
        cube: Optional[str] = None,
        dsn: Optional[str] = None,
        command: Optional[str] = None,
        start_time: Optional[str] = None,
        end_time: Optional[str] = None,
        offset_start: Optional[str] = None,
        offset_end: Optional[str] = None,
        segment_name: Optional[str] = None,
        is_track_job: bool = False,
        interval: int = 60,
        timeout: int = 60 * 60 * 24,
        eager_error_status: Tuple[str, ...] = ("ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"),
        **kwargs,
    ): ...
    
    def execute(self, context: Dict[str, Any]) -> Any: ...

Batch Operations

Build, refresh, and merge cube segments for batch processing.

# Full build of cube segments
fullbuild_task = KylinCubeOperator(
    task_id="fullbuild_cube",
    command='fullbuild',
    start_time='1325347200000',  # Start timestamp in milliseconds
    end_time='1325433600000',    # End timestamp in milliseconds
)

# Build cube segments
build_task = KylinCubeOperator(
    task_id="build_cube",
    command='build',
    start_time='1325347200000',  # Start timestamp in milliseconds
    end_time='1325433600000',    # End timestamp in milliseconds
)

# Refresh existing segments
refresh_task = KylinCubeOperator(
    task_id="refresh_cube", 
    command='refresh',
    start_time='1325347200000',
    end_time='1325433600000',
)

# Merge segments
merge_task = KylinCubeOperator(
    task_id="merge_cube",
    command='merge',
    start_time='1325347200000', 
    end_time='1325433600000',
)

Streaming Operations

Build, refresh, and merge operations for streaming cubes.

# Build streaming segments
build_streaming_task = KylinCubeOperator(
    task_id="build_streaming",
    command='build_streaming',
    offset_start='0',      # Start offset
    offset_end='100000',   # End offset
)

# Refresh streaming segments  
refresh_streaming_task = KylinCubeOperator(
    task_id="refresh_streaming",
    command='refresh_streaming',
    offset_start='0',
    offset_end='100000',
)

# Merge streaming segments
merge_streaming_task = KylinCubeOperator(
    task_id="merge_streaming", 
    command='merge_streaming',
    offset_start='0',
    offset_end='100000',
)

Cube Management Operations

Enable, disable, delete, clone, drop, and purge cube operations.

# Enable cube
enable_task = KylinCubeOperator(
    task_id="enable_cube",
    command='enable',
)

# Disable cube
disable_task = KylinCubeOperator(
    task_id="disable_cube",
    command='disable',
)

# Delete segment
delete_task = KylinCubeOperator(
    task_id="delete_segment",
    command='delete',
    segment_name='segment_20230101_20230102',
)

# Clone cube (creates {cube_name}_clone)
clone_task = KylinCubeOperator(
    task_id="clone_cube",
    command='clone',
)

# Drop cube
drop_task = KylinCubeOperator(
    task_id="drop_cube", 
    command='drop',
)

# Purge cube
purge_task = KylinCubeOperator(
    task_id="purge_cube",
    command='purge',
)

Job Tracking

Monitor job execution with automatic status checking and error handling.

# Track job until completion with custom timeout and interval
tracked_build = KylinCubeOperator(
    task_id="tracked_build",
    command='build',
    start_time='{{ execution_date.int_timestamp * 1000 }}',       # Interval start, epoch milliseconds
    end_time='{{ next_execution_date.int_timestamp * 1000 }}',    # Interval end, epoch milliseconds
    is_track_job=True,
    interval=30,        # Check status every 30 seconds
    timeout=7200,       # 2 hour timeout
    eager_error_status=("ERROR", "KILLED", "STOPPED"),  # Custom error statuses
)

Types

from typing import Optional, Dict, Any, Tuple
from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator

# Supported Kylin commands for cube operations
SUPPORTED_COMMANDS = {
    'fullbuild',      # Full build of cube segments
    'build',           # Build cube segments (batch)
    'refresh',         # Refresh segments (batch) 
    'merge',          # Merge segments (batch)
    'build_streaming', # Build streaming segments
    'refresh_streaming', # Refresh streaming segments
    'merge_streaming', # Merge streaming segments
    'delete',         # Delete segment (requires segment_name)
    'disable',        # Disable cube
    'enable',         # Enable cube
    'purge',          # Purge cube
    'clone',          # Clone cube
    'drop',           # Drop cube
}

# Commands that trigger job tracking when is_track_job=True
BUILD_COMMANDS = {
    'fullbuild',
    'build', 
    'merge',
    'refresh',
    'build_streaming',
    'merge_streaming', 
    'refresh_streaming',
}

# Job status states
JOB_END_STATUSES = {"FINISHED", "ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"}

# Template fields for Jinja templating
TEMPLATE_FIELDS = (
    'project',
    'cube',
    'dsn', 
    'command',
    'start_time',
    'end_time',
    'segment_name',
    'offset_start',
    'offset_end',
)

# Connection parameters
class KylinConnectionConfig:
    kylin_conn_id: str  # Airflow connection ID
    project: Optional[str]  # Kylin project name
    dsn: Optional[str]  # Data Source Name URL
    
# Operation parameters for cube operations
class CubeOperationParams:
    datasource_name: str  # Cube/datasource name
    op: str  # Operation command
    start: Optional[str]  # Start time/offset
    end: Optional[str]  # End time/offset
    segment_name: Optional[str]  # Target segment name

Error Handling

The Apache Kylin provider raises AirflowException for various error conditions:

from airflow.exceptions import AirflowException
from airflow.providers.apache.kylin.hooks.kylin import KylinHook
from kylinpy import exceptions

try:
    hook = KylinHook(kylin_conn_id='kylin_prod')
    # Start/end are epoch milliseconds (2023-01-01 .. 2023-01-02 UTC)
    response = hook.cube_run('sales_cube', 'build', start='1672531200000', end='1672617600000')
    
    if 'uuid' in response:
        job_id = response['uuid']
        status = hook.get_job_status(job_id)
        
        if status in ['ERROR', 'KILLED', 'STOPPED']:
            raise AirflowException(f"Kylin job {job_id} failed with status: {status}")
            
except exceptions.KylinError as kylin_err:
    raise AirflowException(f"Cube operation error: {kylin_err}")
except AirflowException:
    raise  # Re-raise Airflow exceptions
except Exception as e:
    raise AirflowException(f"Unexpected error in Kylin operation: {str(e)}")

Usage Examples

Complete DAG with Multiple Operations

from airflow import DAG
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=10)
}

dag = DAG(
    'kylin_cube_pipeline',
    default_args=default_args,
    description='Complete Kylin cube processing pipeline',
    schedule_interval='@daily',
    catchup=False
)

# Build daily cube segment
build_daily = KylinCubeOperator(
    task_id='build_daily_segment',
    kylin_conn_id='prod_kylin',
    project='analytics',
    cube='daily_sales_cube',
    command='build',
    start_time='{{ execution_date.int_timestamp * 1000 }}',       # Interval start, epoch milliseconds
    end_time='{{ next_execution_date.int_timestamp * 1000 }}',    # Interval end, epoch milliseconds
    is_track_job=True,
    timeout=1800,  # 30 minutes
    dag=dag
)

# Merge weekly segments on Sundays
merge_weekly = KylinCubeOperator(
    task_id='merge_weekly_segments', 
    kylin_conn_id='prod_kylin',
    project='analytics',
    cube='daily_sales_cube',
    command='merge',
    start_time='{{ (execution_date - macros.timedelta(days=6)).int_timestamp * 1000 }}',  # 6 days back, epoch ms
    end_time='{{ next_execution_date.int_timestamp * 1000 }}',
    is_track_job=True,
    timeout=3600,  # 1 hour
    dag=dag
)

# Set task dependencies
build_daily >> merge_weekly

Using KylinHook Directly in Custom Operator

import time

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.apache.kylin.hooks.kylin import KylinHook
from airflow.utils.decorators import apply_defaults

class CustomKylinOperator(BaseOperator):
    @apply_defaults
    def __init__(self, *, kylin_conn_id='kylin_default', project=None,
                 cube_name=None, start_time=None, end_time=None, **kwargs):
        super().__init__(**kwargs)
        self.kylin_conn_id = kylin_conn_id
        self.project = project
        self.cube_name = cube_name
        self.start_time = start_time
        self.end_time = end_time

    def execute(self, context):
        # Initialize hook
        hook = KylinHook(
            kylin_conn_id=self.kylin_conn_id,
            project=self.project
        )

        # Execute the build and track the resulting job
        try:
            # Build cube
            build_response = hook.cube_run(
                datasource_name=self.cube_name,
                op='build',
                start=self.start_time,
                end=self.end_time
            )

            # Track job if a UUID was returned
            if 'uuid' in build_response:
                job_id = build_response['uuid']

                # Monitor job status
                while True:
                    status = hook.get_job_status(job_id)

                    if status == 'FINISHED':
                        self.log.info(f"Job {job_id} completed successfully")
                        break
                    elif status in ['ERROR', 'KILLED', 'STOPPED']:
                        raise AirflowException(f"Job {job_id} failed with status: {status}")

                    time.sleep(60)  # Wait 1 minute before the next check

        except Exception as e:
            self.log.error(f"Kylin operation failed: {str(e)}")
            raise

Streaming Cube Operations

# Real-time streaming cube build
streaming_build = KylinCubeOperator(
    task_id='build_streaming_cube',
    kylin_conn_id='streaming_kylin',
    project='realtime_analytics',
    cube='events_streaming_cube', 
    command='build_streaming',
    offset_start='0',       # Kafka source start offset (placeholder value)
    offset_end='100000',    # Kafka source end offset (placeholder value)
    is_track_job=True,
    interval=30,  # Check every 30 seconds
    timeout=600,  # 10 minute timeout for streaming
    dag=dag
)

Error Handling and Monitoring

# Custom error handling with specific error statuses
robust_build = KylinCubeOperator(
    task_id='robust_cube_build',
    kylin_conn_id='kylin_prod',
    project='critical_analytics', 
    cube='revenue_cube',
    command='build',
    start_time='{{ execution_date.int_timestamp * 1000 }}',       # Interval start, epoch milliseconds
    end_time='{{ next_execution_date.int_timestamp * 1000 }}',    # Interval end, epoch milliseconds
    is_track_job=True,
    interval=45,  # Check every 45 seconds
    timeout=14400,  # 4 hour timeout
    eager_error_status=("ERROR", "KILLED", "STOPPED", "DISCARDED"),  # All error states
    on_failure_callback=lambda context: send_alert(context),  # send_alert: user-defined alerting helper
    dag=dag
)