Apache Kylin provider for Apache Airflow
npx @tessl/cli install tessl/pypi-apache-airflow-backport-providers-apache-kylin@2021.3.0

Apache Kylin provider package for Apache Airflow that enables integration with the Apache Kylin OLAP engine. This backport provider allows users to trigger Kylin cube builds, manage cube operations, and monitor job statuses within Airflow workflows.
pip install apache-airflow-backport-providers-apache-kylin

from airflow.providers.apache.kylin.hooks.kylin import KylinHook
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator

For error handling and advanced usage:
from kylinpy import kylinpy, exceptions
from airflow.exceptions import AirflowException
from airflow.utils.decorators import apply_defaults

from airflow import DAG
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'kylin_cube_build',
default_args=default_args,
description='Build Kylin cube',
schedule_interval=timedelta(days=1)
)
# Build a Kylin cube with job tracking
build_cube = KylinCubeOperator(
task_id='build_kylin_cube',
kylin_conn_id='kylin_default',
project='sales_analytics',
cube='sales_cube',
command='build',
start_time='{{ ds_nodash }}000000000',  # Start boundary derived from the execution date, zero-padded
end_time='{{ next_ds_nodash }}000000000',  # End boundary derived from the next execution date, zero-padded
is_track_job=True,
timeout=3600, # 1 hour timeout
dag=dag
)
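The kylin_conn_id used above names an Airflow connection. One way to supply it is Airflow's standard AIRFLOW_CONN_<CONN_ID> environment variable; a minimal sketch, assuming the usual URI mapping of scheme, credentials, host, and port, with placeholder values throughout:

# Placeholder host, credentials, and project; export before the scheduler starts
import os
os.environ['AIRFLOW_CONN_KYLIN_DEFAULT'] = 'kylin://ADMIN:KYLIN@kylin.example.com:7070/learn_kylin'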
Establishes and manages connections to the Kylin server.

class KylinHook(BaseHook):
def __init__(
self,
kylin_conn_id: str = 'kylin_default',
project: Optional[str] = None,
dsn: Optional[str] = None,
): ...
def get_conn(self):
"""
Establishes and returns a connection to the Kylin server.
Returns:
kylinpy.Kylin: Connection object for interacting with Kylin server.
Can be used to get datasources and manage cube operations.
"""Executes various cube operations including build, refresh, merge, and management operations.
Executes various cube operations including build, refresh, merge, and management operations.

def cube_run(self, datasource_name: str, op: str, **op_args) -> Any:
"""
Execute cube operations on the specified datasource.
Args:
datasource_name (str): Name of the datasource/cube to operate on
op (str): Command to execute (must be in supported commands)
**op_args: Additional keyword arguments for the operation
Returns:
Response from the cube operation
Raises:
AirflowException: If the cube operation encounters a KylinError
"""Retrieves and monitors the status of Kylin jobs.
Retrieves and monitors the status of Kylin jobs.

def get_job_status(self, job_id: str) -> str:
"""
Retrieve the status of a Kylin job.
Args:
job_id (str): Kylin job ID
Returns:
str: Job status
"""Airflow operator for submitting Kylin cube operations with optional job tracking.
Airflow operator for submitting Kylin cube operations with optional job tracking.

class KylinCubeOperator(BaseOperator):
def __init__(
self,
*,
kylin_conn_id: str = 'kylin_default',
project: Optional[str] = None,
cube: Optional[str] = None,
dsn: Optional[str] = None,
command: Optional[str] = None,
start_time: Optional[str] = None,
end_time: Optional[str] = None,
offset_start: Optional[str] = None,
offset_end: Optional[str] = None,
segment_name: Optional[str] = None,
is_track_job: bool = False,
interval: int = 60,
timeout: int = 60 * 60 * 24,
eager_error_status: Tuple[str, ...] = ("ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"),
**kwargs,
): ...
def execute(self, context: Dict[str, Any]) -> Any: ...

Build, refresh, and merge cube segments for batch processing.
# Full build of cube segments
fullbuild_task = KylinCubeOperator(
task_id="fullbuild_cube",
command='fullbuild',
start_time='1325347200000', # Start timestamp in milliseconds
end_time='1325433600000', # End timestamp in milliseconds
)
# Build cube segments
build_task = KylinCubeOperator(
task_id="build_cube",
command='build',
start_time='1325347200000', # Start timestamp in milliseconds
end_time='1325433600000', # End timestamp in milliseconds
)
# Refresh existing segments
refresh_task = KylinCubeOperator(
task_id="refresh_cube",
command='refresh',
start_time='1325347200000',
end_time='1325433600000',
)
# Merge segments
merge_task = KylinCubeOperator(
task_id="merge_cube",
command='merge',
start_time='1325347200000',
end_time='1325433600000',
)

Build, refresh, and merge operations for streaming cubes.
# Build streaming segments
build_streaming_task = KylinCubeOperator(
task_id="build_streaming",
command='build_streaming',
offset_start='0', # Start offset
offset_end='100000', # End offset
)
# Refresh streaming segments
refresh_streaming_task = KylinCubeOperator(
task_id="refresh_streaming",
command='refresh_streaming',
offset_start='0',
offset_end='100000',
)
# Merge streaming segments
merge_streaming_task = KylinCubeOperator(
task_id="merge_streaming",
command='merge_streaming',
offset_start='0',
offset_end='100000',
)

Enable, disable, delete, clone, drop, and purge cube operations.
# Enable cube
enable_task = KylinCubeOperator(
task_id="enable_cube",
command='enable',
)
# Disable cube
disable_task = KylinCubeOperator(
task_id="disable_cube",
command='disable',
)
# Delete segment
delete_task = KylinCubeOperator(
task_id="delete_segment",
command='delete',
segment_name='segment_20230101_20230102',
)
# Clone cube (creates {cube_name}_clone)
clone_task = KylinCubeOperator(
task_id="clone_cube",
command='clone',
)
# Drop cube
drop_task = KylinCubeOperator(
task_id="drop_cube",
command='drop',
)
# Purge cube
purge_task = KylinCubeOperator(
task_id="purge_cube",
command='purge',
)

Monitor job execution with automatic status checking and error handling.
# Track job until completion with custom timeout and interval
tracked_build = KylinCubeOperator(
task_id="tracked_build",
command='build',
start_time='{{ ds_nodash }}000000000',
end_time='{{ next_ds_nodash }}000000000',
is_track_job=True,
interval=30, # Check status every 30 seconds
timeout=7200, # 2 hour timeout
eager_error_status=("ERROR", "KILLED", "STOPPED"), # Custom error statuses
)

from typing import Optional, Dict, Any, Tuple
from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator
# Supported Kylin commands for cube operations
SUPPORTED_COMMANDS = {
'fullbuild', # Full build of cube segments
'build', # Build cube segments (batch)
'refresh', # Refresh segments (batch)
'merge', # Merge segments (batch)
'build_streaming', # Build streaming segments
'refresh_streaming', # Refresh streaming segments
'merge_streaming', # Merge streaming segments
'delete', # Delete segment (requires segment_name)
'disable', # Disable cube
'enable', # Enable cube
'purge', # Purge cube
'clone', # Clone cube
'drop', # Drop cube
}
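Per the constraint documented on cube_run (op must be in supported commands), values outside this set are rejected rather than forwarded to Kylin; a trivial sketch of the contract:

# e.g. a typo like 'rebuild' never reaches the Kylin REST API
assert 'build' in SUPPORTED_COMMANDS
assert 'rebuild' not in SUPPORTED_COMMANDS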
# Commands that trigger job tracking when is_track_job=True
BUILD_COMMANDS = {
'fullbuild',
'build',
'merge',
'refresh',
'build_streaming',
'merge_streaming',
'refresh_streaming',
}
# Job status states
JOB_END_STATUSES = {"FINISHED", "ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"}
# Template fields for Jinja templating
TEMPLATE_FIELDS = (
'project',
'cube',
'dsn',
'command',
'start_time',
'end_time',
'segment_name',
'offset_start',
'offset_end',
)
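Because these fields are templated, they render from the task context at run time; a short sketch using a hypothetical segment-name pattern:

# segment_name is rendered per run; the naming pattern here is illustrative
templated_delete = KylinCubeOperator(
    task_id='delete_old_segment',
    command='delete',
    cube='sales_cube',  # placeholder cube
    segment_name='segment_{{ macros.ds_add(ds, -30) | replace("-", "") }}',
)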
# Connection parameters
class KylinConnectionConfig:
kylin_conn_id: str # Airflow connection ID
project: Optional[str] # Kylin project name
dsn: Optional[str] # Data Source Name URL
# Operation parameters for cube operations
class CubeOperationParams:
datasource_name: str # Cube/datasource name
op: str # Operation command
start: Optional[str] # Start time/offset
end: Optional[str] # End time/offset
segment_name: Optional[str] # Target segment name

The Apache Kylin provider raises AirflowException for various error conditions:
from airflow.exceptions import AirflowException
from kylinpy import exceptions
try:
hook = KylinHook(kylin_conn_id='kylin_prod')
response = hook.cube_run('sales_cube', 'build', start='2023-01-01', end='2023-01-02')
if 'uuid' in response:
job_id = response['uuid']
status = hook.get_job_status(job_id)
if status in ['ERROR', 'KILLED', 'STOPPED']:
raise AirflowException(f"Kylin job {job_id} failed with status: {status}")
except exceptions.KylinError as kylin_err:
raise AirflowException(f"Cube operation error: {kylin_err}")
except AirflowException:
raise # Re-raise Airflow exceptions
except Exception as e:
raise AirflowException(f"Unexpected error in Kylin operation: {str(e)}")from airflow import DAG
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data_team',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=10)
}
dag = DAG(
'kylin_cube_pipeline',
default_args=default_args,
description='Complete Kylin cube processing pipeline',
schedule_interval='@daily',
catchup=False
)
# Build daily cube segment
build_daily = KylinCubeOperator(
task_id='build_daily_segment',
kylin_conn_id='prod_kylin',
project='analytics',
cube='daily_sales_cube',
command='build',
start_time='{{ ds_nodash }}000000000',
end_time='{{ next_ds_nodash }}000000000',
is_track_job=True,
timeout=1800, # 30 minutes
dag=dag
)
# Merge weekly segments on Sundays
merge_weekly = KylinCubeOperator(
task_id='merge_weekly_segments',
kylin_conn_id='prod_kylin',
project='analytics',
cube='daily_sales_cube',
command='merge',
start_time='{{ macros.ds_add(ds, -6) | replace("-", "") }}000000000',
end_time='{{ next_ds_nodash }}000000000',
is_track_job=True,
timeout=3600, # 1 hour
dag=dag
)
# Set task dependencies
build_daily >> merge_weekly

import time

from airflow.exceptions import AirflowException
from airflow.providers.apache.kylin.hooks.kylin import KylinHook
from airflow.models import BaseOperator
class CustomKylinOperator(BaseOperator):
def execute(self, context):
# Initialize hook
hook = KylinHook(
kylin_conn_id=self.kylin_conn_id,
project=self.project
)
# Execute multiple operations
try:
# Build cube
build_response = hook.cube_run(
datasource_name=self.cube_name,
op='build',
start=self.start_time,
end=self.end_time
)
# Track job if UUID returned
if 'uuid' in build_response:
job_id = build_response['uuid']
# Monitor job status
while True:
status = hook.get_job_status(job_id)
if status == 'FINISHED':
self.log.info(f"Job {job_id} completed successfully")
break
elif status in ['ERROR', 'KILLED', 'STOPPED']:
raise AirflowException(f"Job {job_id} failed with status: {status}")
time.sleep(60) # Wait 1 minute before next check
except Exception as e:
self.log.error(f"Kylin operation failed: {str(e)}")
raise

# Real-time streaming cube build
streaming_build = KylinCubeOperator(
task_id='build_streaming_cube',
kylin_conn_id='streaming_kylin',
project='realtime_analytics',
cube='events_streaming_cube',
command='build_streaming',
offset_start='{{ prev_ds_nodash }}000000000',
offset_end='{{ ds_nodash }}000000000',
is_track_job=True,
interval=30, # Check every 30 seconds
timeout=600, # 10 minute timeout for streaming
dag=dag
)

# Custom error handling with specific error statuses
robust_build = KylinCubeOperator(
task_id='robust_cube_build',
kylin_conn_id='kylin_prod',
project='critical_analytics',
cube='revenue_cube',
command='build',
start_time='{{ ds_nodash }}000000000',
end_time='{{ next_ds_nodash }}000000000',
is_track_job=True,
interval=45, # Check every 45 seconds
timeout=14400, # 4 hour timeout
eager_error_status=("ERROR", "KILLED", "STOPPED", "DISCARDED"), # All error states
on_failure_callback=lambda context: send_alert(context),  # send_alert: your own alerting helper
dag=dag
)
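The dsn parameter is an alternative to kylin_conn_id: when set, the hook builds its kylinpy client straight from the DSN URL instead of an Airflow connection. A sketch with placeholder host, credentials, and project:

dsn_build = KylinCubeOperator(
    task_id='dsn_cube_build',
    dsn='kylin://ADMIN:KYLIN@kylin.example.com:7070/learn_kylin',  # placeholder DSN
    cube='sales_cube',
    command='build',
    start_time='1325347200000',
    end_time='1325433600000',
    dag=dag,
)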