Provider package for Apache Kylin integration with Apache Airflow
npx @tessl/cli install tessl/pypi-apache-airflow-providers-apache-kylin@3.9.0

Provider package for Apache Kylin integration with Apache Airflow. This package enables orchestration of Apache Kylin OLAP cube operations within Airflow workflows, providing hooks for connectivity and operators for cube lifecycle management, including building, refreshing, merging, and monitoring cubes.
pip install apache-airflow-providers-apache-kylin

from airflow.providers.apache.kylin.hooks.kylin import KylinHook
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator

from datetime import datetime
from airflow import DAG
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator
# Define DAG
dag = DAG(
'kylin_cube_operations',
start_date=datetime(2023, 1, 1),
schedule_interval='@daily',
catchup=False
)
# Build a Kylin cube
build_cube = KylinCubeOperator(
task_id='build_sales_cube',
kylin_conn_id='kylin_default',
project='learn_kylin',
cube='kylin_sales_cube',
command='build',
start_time='1483200000000', # Timestamp in milliseconds
end_time='1483286400000',
is_track_job=True,
timeout=3600, # 1 hour timeout
dag=dag
)
# Refresh a cube segment
refresh_cube = KylinCubeOperator(
task_id='refresh_sales_cube',
kylin_conn_id='kylin_default',
project='learn_kylin',
cube='kylin_sales_cube',
command='refresh',
start_time='1483200000000',
end_time='1483286400000',
is_track_job=True,
dag=dag
)
# Set task dependencies
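build_cube >> refresh_cube

The provider follows Airflow's standard pattern with two main components: a hook (KylinHook) that manages connectivity and an operator (KylinCubeOperator) that runs cube operations.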
Establishes and manages connections to Apache Kylin servers using Airflow's connection framework.
class KylinHook(BaseHook):
def __init__(
self,
kylin_conn_id: str = "kylin_default",
project: str | None = None,
dsn: str | None = None,
):
"""
Initialize Kylin hook.
Args:
kylin_conn_id: Connection ID configured in Airflow
project: Kylin project name
dsn: Direct DSN URL (overrides kylin_conn_id)
"""
def get_conn(self):
"""
Get Kylin connection instance.
Returns:
kylinpy.Kylin: Connected Kylin instance
"""
def cube_run(self, datasource_name: str, op: str, **op_args):
"""
Run CubeSource command.
Args:
datasource_name: Name of the cube/datasource
op: Operation command
**op_args: Additional operation arguments
Returns:
dict: Response from Kylin API
Raises:
AirflowException: When cube operation fails
"""
def get_job_status(self, job_id: str) -> str:
"""
Get job status by job ID.
Args:
job_id: Kylin job ID
Returns:
str: Job status (RUNNING, FINISHED, ERROR, etc.)
"""

Comprehensive cube lifecycle operations including build, refresh, merge, and management commands.
class KylinCubeOperator(BaseOperator):
def __init__(
self,
*,
kylin_conn_id: str = "kylin_default",
project: str | None = None,
cube: str | None = None,
dsn: str | None = None,
command: str | None = None,
start_time: str | None = None,
end_time: str | None = None,
offset_start: str | None = None,
offset_end: str | None = None,
segment_name: str | None = None,
is_track_job: bool = False,
interval: int = 60,
timeout: int = 86400,
eager_error_status: tuple = ("ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"),
**kwargs,
):
"""
Initialize Kylin cube operator.
Args:
kylin_conn_id: Connection ID for Kylin server
project: Kylin project name (overrides connection project)
cube: Target cube name
dsn: Direct DSN URL (overrides kylin_conn_id)
command: Cube operation command
start_time: Build segment start time (milliseconds timestamp)
end_time: Build segment end time (milliseconds timestamp)
offset_start: Streaming build segment start offset
offset_end: Streaming build segment end offset
segment_name: Specific segment name for operations
is_track_job: Whether to monitor job status until completion
interval: Job status polling interval in seconds
timeout: Maximum wait time in seconds
eager_error_status: Job statuses that trigger immediate failure
"""
def execute(self, context) -> dict:
"""
Execute the cube operation.
Args:
context: Airflow task context
Returns:
dict: Operation response from Kylin API
Raises:
AirflowException: When operation fails, job times out, or encounters error status
"""

The operator supports the following cube operations:
- fullbuild: Complete cube build
- build: Build cube segments for specified time range
- build_streaming: Build streaming cube segments with offset parameters
- refresh: Refresh existing cube segments
- refresh_streaming: Refresh streaming cube segments
- merge: Merge cube segments
- merge_streaming: Merge streaming cube segments
- delete: Delete specific cube segments
- disable: Disable cube
- enable: Enable cube
- purge: Purge cube data
- clone: Clone cube (creates {cube_name}_clone)
- drop: Drop cube completely

Built-in job tracking with configurable polling intervals and error handling.
Job End Statuses: FINISHED, ERROR, DISCARDED, KILLED, SUICIDAL, STOPPED
Template Fields: The following fields support Jinja2 templating: project, cube, dsn, command, start_time, end_time, segment_name, offset_start, offset_end.
build_task = KylinCubeOperator(
task_id='build_cube',
cube='sales_cube',
command='build',
start_time='1640995200000', # 2022-01-01 00:00:00 UTC
end_time='1641081600000', # 2022-01-02 00:00:00 UTC
dag=dag
)

streaming_build = KylinCubeOperator(
task_id='build_streaming_cube',
cube='streaming_sales_cube',
command='build_streaming',
offset_start='0',
offset_end='1000',
dag=dag
)

tracked_build = KylinCubeOperator(
task_id='tracked_build',
cube='large_cube',
command='build',
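start_time='{{ ds_nodash }}000000', # Templated start time (NOTE: verify the rendered value is a millisecond timestamp, as start_time requires)
end_time='{{ next_ds_nodash }}000000', # Templated end time (NOTE: verify the rendered value is a millisecond timestamp, as end_time requires)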
is_track_job=True,
interval=30, # Check every 30 seconds
timeout=7200, # 2 hour timeout
dag=dag
)

dsn_task = KylinCubeOperator(
task_id='dsn_operation',
dsn='kylin://ADMIN:KYLIN@sandbox/learn_kylin?timeout=60&is_debug=1',
cube='test_cube',
command='enable',
dag=dag
)

Configure Kylin connections in the Airflow Admin UI:
Example connection configuration:
{
"timeout": 60,
"is_debug": true,
"verify_ssl": false
}

# Hook connection attributes
class KylinHook(BaseHook):
conn_name_attr: str = "kylin_conn_id"
default_conn_name: str = "kylin_default"
conn_type: str = "kylin"
hook_name: str = "Apache Kylin"
# Operator template fields for Jinja2 templating
class KylinCubeOperator(BaseOperator):
template_fields: tuple[str, ...] = (
"project",
"cube",
"dsn",
"command",
"start_time",
"end_time",
"segment_name",
"offset_start",
"offset_end",
)
ui_color: str = "#E79C46"
build_command: set[str] = {
"fullbuild",
"build",
"merge",
"refresh",
"build_streaming",
"merge_streaming",
"refresh_streaming",
}
jobs_end_status: set[str] = {
"FINISHED",
"ERROR",
"DISCARDED",
"KILLED",
"SUICIDAL",
"STOPPED"
}

The package raises AirflowException for various error conditions:
kylinpy.exceptions.KylinError

All errors include descriptive messages with context about the failing operation.