tessl/pypi-apache-airflow-providers-apache-kylin

Provider package for Apache Kylin integration with Apache Airflow

Workspace: tessl
Visibility: Public
Describes: pypipkg:pypi/apache-airflow-providers-apache-kylin@3.9.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-apache-kylin@3.9.0


Apache Airflow Providers Apache Kylin

Provider package for Apache Kylin integration with Apache Airflow. It lets Airflow workflows orchestrate Apache Kylin OLAP cube operations, providing a hook for server connectivity and an operator for cube lifecycle management: building, refreshing, merging, and monitoring cube jobs.

Package Information

  • Package Name: apache-airflow-providers-apache-kylin
  • Language: Python
  • Installation: pip install apache-airflow-providers-apache-kylin
  • Dependencies: apache-airflow (>=2.10.0), kylinpy (>2.7.0)

Core Imports

from airflow.providers.apache.kylin.hooks.kylin import KylinHook
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator

Basic Usage

from datetime import datetime
from airflow import DAG
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator

# Define DAG
dag = DAG(
    'kylin_cube_operations',
    start_date=datetime(2023, 1, 1),
    schedule='@daily',  # 'schedule_interval' is deprecated since Airflow 2.4
    catchup=False
)

# Build a Kylin cube
build_cube = KylinCubeOperator(
    task_id='build_sales_cube',
    kylin_conn_id='kylin_default',
    project='learn_kylin',
    cube='kylin_sales_cube',
    command='build',
    start_time='1483200000000',  # Timestamp in milliseconds
    end_time='1483286400000',
    is_track_job=True,
    timeout=3600,  # 1 hour timeout
    dag=dag
)

# Refresh a cube segment
refresh_cube = KylinCubeOperator(
    task_id='refresh_sales_cube',
    kylin_conn_id='kylin_default',
    project='learn_kylin',
    cube='kylin_sales_cube',
    command='refresh',
    start_time='1483200000000',
    end_time='1483286400000',
    is_track_job=True,
    dag=dag
)

# Set task dependencies
build_cube >> refresh_cube

Architecture

The provider follows Airflow's standard pattern with two main components:

  • KylinHook: Manages connections to Kylin servers and provides low-level API methods for cube operations and job status monitoring
  • KylinCubeOperator: High-level operator for executing cube operations with job tracking, timeout handling, and error management

Capabilities

Connection Management

Establishes and manages connections to Apache Kylin servers using Airflow's connection framework.

class KylinHook(BaseHook):
    def __init__(
        self,
        kylin_conn_id: str = "kylin_default",
        project: str | None = None,
        dsn: str | None = None,
    ):
        """
        Initialize Kylin hook.

        Args:
            kylin_conn_id: Connection ID configured in Airflow
            project: Kylin project name
            dsn: Direct DSN URL (overrides kylin_conn_id)
        """

    def get_conn(self):
        """
        Get Kylin connection instance.

        Returns:
            kylinpy.Kylin: Connected Kylin instance
        """

    def cube_run(self, datasource_name: str, op: str, **op_args):
        """
        Run CubeSource command.

        Args:
            datasource_name: Name of the cube/datasource
            op: Operation command
            **op_args: Additional operation arguments

        Returns:
            dict: Response from Kylin API

        Raises:
            AirflowException: When cube operation fails
        """

    def get_job_status(self, job_id: str) -> str:
        """
        Get job status by job ID.

        Args:
            job_id: Kylin job ID

        Returns:
            str: Job status (RUNNING, FINISHED, ERROR, etc.)
        """

Cube Operations

Comprehensive cube lifecycle operations including build, refresh, merge, and management commands.

class KylinCubeOperator(BaseOperator):
    def __init__(
        self,
        *,
        kylin_conn_id: str = "kylin_default",
        project: str | None = None,
        cube: str | None = None,
        dsn: str | None = None,
        command: str | None = None,
        start_time: str | None = None,
        end_time: str | None = None,
        offset_start: str | None = None,
        offset_end: str | None = None,
        segment_name: str | None = None,
        is_track_job: bool = False,
        interval: int = 60,
        timeout: int = 86400,
        eager_error_status: tuple = ("ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"),
        **kwargs,
    ):
        """
        Initialize Kylin cube operator.

        Args:
            kylin_conn_id: Connection ID for Kylin server
            project: Kylin project name (overrides connection project)
            cube: Target cube name
            dsn: Direct DSN URL (overrides kylin_conn_id)
            command: Cube operation command
            start_time: Build segment start time (milliseconds timestamp)
            end_time: Build segment end time (milliseconds timestamp)
            offset_start: Streaming build segment start offset
            offset_end: Streaming build segment end offset
            segment_name: Specific segment name for operations
            is_track_job: Whether to monitor job status until completion
            interval: Job status polling interval in seconds
            timeout: Maximum wait time in seconds
            eager_error_status: Job statuses that trigger immediate failure
        """

    def execute(self, context) -> dict:
        """
        Execute the cube operation.

        Args:
            context: Airflow task context

        Returns:
            dict: Operation response from Kylin API

        Raises:
            AirflowException: When operation fails, job times out, or encounters error status
        """

Supported Commands

The operator supports the following cube operations:

Build Operations

  • fullbuild: Complete cube build
  • build: Build cube segments for specified time range
  • build_streaming: Build streaming cube segments with offset parameters

Maintenance Operations

  • refresh: Refresh existing cube segments
  • refresh_streaming: Refresh streaming cube segments
  • merge: Merge cube segments
  • merge_streaming: Merge streaming cube segments

Management Operations

  • delete: Delete specific cube segments
  • disable: Disable cube
  • enable: Enable cube
  • purge: Purge cube data
  • clone: Clone cube (creates {cube_name}_clone)
  • drop: Drop cube completely

Job Status Monitoring

Built-in job tracking with configurable polling intervals and error handling.

Job End Statuses: FINISHED, ERROR, DISCARDED, KILLED, SUICIDAL, STOPPED
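The tracking behavior can be sketched as a plain polling loop. This is a simplified illustration of the semantics of is_track_job, interval, timeout, and eager_error_status, not the provider's actual implementation; fetch_status is a hypothetical stand-in for KylinHook.get_job_status:

```python
import time

JOB_END_STATUSES = {"FINISHED", "ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"}
EAGER_ERROR_STATUSES = ("ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED")

def track_job(fetch_status, job_id, interval=60, timeout=86400,
              eager_error_status=EAGER_ERROR_STATUSES, sleep=time.sleep):
    """Poll fetch_status(job_id) until an end status appears or timeout expires."""
    waited = 0
    while waited < timeout:
        status = fetch_status(job_id)
        if status in eager_error_status:
            # An error status fails the task immediately, without waiting for timeout
            raise RuntimeError(f"Kylin job {job_id} ended with error status {status}")
        if status in JOB_END_STATUSES:
            return status
        sleep(interval)
        waited += interval
    raise TimeoutError(f"Kylin job {job_id} did not finish within {timeout} s")
```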

Template Fields: The following fields support Jinja2 templating:

  • project
  • cube
  • dsn
  • command
  • start_time
  • end_time
  • segment_name
  • offset_start
  • offset_end

Usage Examples

Basic Cube Build

build_task = KylinCubeOperator(
    task_id='build_cube',
    cube='sales_cube',
    command='build',
    start_time='1640995200000',  # 2022-01-01 00:00:00 UTC
    end_time='1641081600000',    # 2022-01-02 00:00:00 UTC
    dag=dag
)
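start_time and end_time are Unix epoch timestamps in milliseconds, passed as strings. A small helper for producing them from datetime objects (an illustrative convenience, not part of the provider API):

```python
from datetime import datetime, timezone

def to_kylin_ms(dt: datetime) -> str:
    """Convert a datetime to the millisecond-timestamp string Kylin expects."""
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)  # treat naive datetimes as UTC
    return str(int(dt.timestamp() * 1000))

# The segment boundaries used in the example above
start = to_kylin_ms(datetime(2022, 1, 1))  # '1640995200000'
end = to_kylin_ms(datetime(2022, 1, 2))    # '1641081600000'
```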

Streaming Cube Operations

streaming_build = KylinCubeOperator(
    task_id='build_streaming_cube',
    cube='streaming_sales_cube',
    command='build_streaming',
    offset_start='0',
    offset_end='1000',
    dag=dag
)

Job Tracking with Custom Settings

tracked_build = KylinCubeOperator(
    task_id='tracked_build',
    cube='large_cube',
    command='build',
    start_time='{{ data_interval_start.int_timestamp * 1000 }}',  # templated ms timestamp
    end_time='{{ data_interval_end.int_timestamp * 1000 }}',
    is_track_job=True,
    interval=30,  # Check every 30 seconds
    timeout=7200,  # 2 hour timeout
    dag=dag
)

Using Direct DSN Connection

dsn_task = KylinCubeOperator(
    task_id='dsn_operation',
    dsn='kylin://ADMIN:KYLIN@sandbox/learn_kylin?timeout=60&is_debug=1',
    cube='test_cube',
    command='enable',
    dag=dag
)
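The DSN is a standard URL, so its parts can be inspected with urllib.parse. This is shown only to illustrate the format; refer to kylinpy's documentation for the authoritative DSN semantics:

```python
from urllib.parse import urlparse, parse_qs

dsn = "kylin://ADMIN:KYLIN@sandbox/learn_kylin?timeout=60&is_debug=1"
parts = urlparse(dsn)

scheme = parts.scheme                  # 'kylin'
user = parts.username                  # 'ADMIN'
host = parts.hostname                  # 'sandbox'
project = parts.path.lstrip("/")       # 'learn_kylin'
extras = parse_qs(parts.query)         # {'timeout': ['60'], 'is_debug': ['1']}
```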

Connection Configuration

Configure Kylin connections in Airflow Admin UI:

  • Connection Type: kylin
  • Host: Kylin server hostname
  • Port: Kylin server port (typically 7070)
  • Login: Username
  • Password: Password
  • Schema: Default project name
  • Extra: Additional connection parameters as JSON

Example connection configuration:

{
  "timeout": 60,
  "is_debug": true,
  "verify_ssl": false
}
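Besides the Admin UI, Airflow can pick up connections from environment variables named AIRFLOW_CONN_{CONN_ID}. A sketch using placeholder credentials and hostname (replace with your own values):

```shell
# Hypothetical credentials and host; the path segment is the default project
export AIRFLOW_CONN_KYLIN_DEFAULT='kylin://admin:secret@kylin-host:7070/learn_kylin'
```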

Types

# Hook connection attributes
class KylinHook(BaseHook):
    conn_name_attr: str = "kylin_conn_id"
    default_conn_name: str = "kylin_default"
    conn_type: str = "kylin"
    hook_name: str = "Apache Kylin"

# Operator template fields for Jinja2 templating
class KylinCubeOperator(BaseOperator):
    template_fields: tuple[str, ...] = (
        "project",
        "cube", 
        "dsn",
        "command",
        "start_time",
        "end_time",
        "segment_name",
        "offset_start",
        "offset_end",
    )
    
    ui_color: str = "#E79C46"
    build_command: set[str] = {
        "fullbuild",
        "build",
        "merge", 
        "refresh",
        "build_streaming",
        "merge_streaming",
        "refresh_streaming",
    }
    jobs_end_status: set[str] = {
        "FINISHED",
        "ERROR", 
        "DISCARDED",
        "KILLED",
        "SUICIDAL",
        "STOPPED"
    }

Error Handling

The package raises AirflowException for various error conditions:

  • Invalid Commands: When command is not in supported command list
  • Kylin API Errors: Wrapped from kylinpy.exceptions.KylinError
  • Job Timeout: When job monitoring exceeds timeout duration
  • Job Failures: When job status matches eager_error_status patterns
  • Missing Parameters: When required cube name or job ID is missing

All errors include descriptive messages with context about the failing operation.
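The invalid-command check can be illustrated with a small sketch that mirrors the behavior described above (a simplified stand-in; the operator itself raises AirflowException, and the exact message wording is an assumption):

```python
# Union of build, maintenance, and management commands listed above
SUPPORTED_COMMANDS = {
    "fullbuild", "build", "merge", "refresh",
    "build_streaming", "merge_streaming", "refresh_streaming",
    "delete", "disable", "enable", "purge", "clone", "drop",
}

def validate_command(command: str) -> str:
    """Reject commands outside the supported set, case-insensitively."""
    normalized = (command or "").lower()
    if normalized not in SUPPORTED_COMMANDS:
        raise ValueError(f"Kylin operator has no command: {command}")
    return normalized
```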