# Apache Kylin Provider for Apache Airflow

Apache Kylin provider package for Apache Airflow that enables integration with the Apache Kylin OLAP engine. This backport provider lets users trigger Kylin cube builds, manage cube operations, and monitor job statuses from within Airflow workflows.

## Package Information

- **Package Name**: apache-airflow-backport-providers-apache-kylin
- **Package Type**: PyPI
- **Language**: Python
- **Installation**: `pip install apache-airflow-backport-providers-apache-kylin`
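
The operator and hook resolve `kylin_conn_id` through Airflow's connection store. A minimal sketch of supplying the default connection through an environment variable — the host, credentials, and project (stored in the connection's `schema` field) are placeholders to adapt to your cluster:

```python
import os

# Hypothetical cluster details; Airflow parses AIRFLOW_CONN_* URIs into Connection objects.
os.environ['AIRFLOW_CONN_KYLIN_DEFAULT'] = 'kylin://ADMIN:KYLIN@kylin-host:7070/sales_analytics'
```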

## Core Imports

```python
from airflow.providers.apache.kylin.hooks.kylin import KylinHook
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator
```

For error handling and advanced usage:

```python
from kylinpy import kylinpy, exceptions
from airflow.exceptions import AirflowException
from airflow.utils.decorators import apply_defaults
```

## Basic Usage

```python
from airflow import DAG
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'kylin_cube_build',
    default_args=default_args,
    description='Build Kylin cube',
    schedule_interval=timedelta(days=1),
)

# Build a Kylin cube with job tracking. Kylin expects segment boundaries as
# epoch-millisecond strings, so the interval bounds are rendered from the
# execution date rather than from date strings like ds_nodash.
build_cube = KylinCubeOperator(
    task_id='build_kylin_cube',
    kylin_conn_id='kylin_default',
    project='sales_analytics',
    cube='sales_cube',
    command='build',
    start_time='{{ execution_date.int_timestamp * 1000 }}',       # Interval start, epoch ms
    end_time='{{ next_execution_date.int_timestamp * 1000 }}',    # Interval end, epoch ms
    is_track_job=True,
    timeout=3600,  # 1 hour timeout
    dag=dag,
)
```

## Capabilities

### Connection Management

Establishes and manages connections to the Kylin server.

```python { .api }
class KylinHook(BaseHook):
    def __init__(
        self,
        kylin_conn_id: str = 'kylin_default',
        project: Optional[str] = None,
        dsn: Optional[str] = None,
    ): ...

    def get_conn(self):
        """
        Establishes and returns a connection to the Kylin server.

        Returns:
            kylinpy.Kylin: Connection object for interacting with the Kylin server.
                Can be used to get datasources and manage cube operations.
        """
```
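
For example, the kylinpy client returned by `get_conn()` can fetch a cube datasource directly — the connection ID and cube name below are illustrative:

```python
hook = KylinHook(kylin_conn_id='kylin_default', project='sales_analytics')
client = hook.get_conn()                           # kylinpy.Kylin client
cube_source = client.get_datasource('sales_cube')  # datasource the hook uses for cube commands
```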

### Cube Operations

Executes cube operations, including build, refresh, merge, and management commands.

```python { .api }
def cube_run(self, datasource_name: str, op: str, **op_args) -> Any:
    """
    Execute a cube operation on the specified datasource.

    Args:
        datasource_name (str): Name of the datasource/cube to operate on
        op (str): Command to execute (must be one of the supported commands)
        **op_args: Additional keyword arguments for the operation

    Returns:
        Response from the cube operation

    Raises:
        AirflowException: If the cube operation encounters a KylinError
    """
```
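
A sketch of calling `cube_run` directly. When bypassing the operator, build boundaries are passed as `datetime` objects — the operator converts its millisecond strings to datetimes before delegating. The connection ID and cube name are illustrative:

```python
from datetime import datetime

hook = KylinHook(kylin_conn_id='kylin_default', project='sales_analytics')
response = hook.cube_run(
    'sales_cube',                # datasource_name
    'build',                     # op
    start=datetime(2023, 1, 1),  # segment start
    end=datetime(2023, 1, 2),    # segment end
)
job_id = response.get('uuid')    # UUID of the submitted build job
```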

### Job Status Monitoring

Retrieves and monitors the status of Kylin jobs.

```python { .api }
def get_job_status(self, job_id: str) -> str:
    """
    Retrieve the status of a Kylin job.

    Args:
        job_id (str): Kylin job ID

    Returns:
        str: Job status
    """
```
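
Continuing the earlier sketch, the returned UUID can be polled for progress (the possible status strings are listed under Types below):

```python
status = hook.get_job_status(job_id)
print(f"Kylin job {job_id} is {status}")  # e.g. RUNNING, FINISHED, ERROR
```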

### Cube Build Operator

Airflow operator for submitting Kylin cube operations with optional job tracking.

```python { .api }
class KylinCubeOperator(BaseOperator):
    def __init__(
        self,
        *,
        kylin_conn_id: str = 'kylin_default',
        project: Optional[str] = None,
        cube: Optional[str] = None,
        dsn: Optional[str] = None,
        command: Optional[str] = None,
        start_time: Optional[str] = None,
        end_time: Optional[str] = None,
        offset_start: Optional[str] = None,
        offset_end: Optional[str] = None,
        segment_name: Optional[str] = None,
        is_track_job: bool = False,
        interval: int = 60,
        timeout: int = 60 * 60 * 24,
        eager_error_status: Tuple[str, ...] = ("ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"),
        **kwargs,
    ): ...

    def execute(self, context: Dict[str, Any]) -> Any: ...
```

### Batch Operations

Build, refresh, and merge cube segments for batch processing. Segment boundaries are passed as epoch-millisecond strings.

```python { .api }
# Full build of cube segments
fullbuild_task = KylinCubeOperator(
    task_id="fullbuild_cube",
    command='fullbuild',
    start_time='1325347200000',  # Start timestamp, epoch milliseconds
    end_time='1325433600000',    # End timestamp, epoch milliseconds
)

# Build cube segments
build_task = KylinCubeOperator(
    task_id="build_cube",
    command='build',
    start_time='1325347200000',
    end_time='1325433600000',
)

# Refresh existing segments
refresh_task = KylinCubeOperator(
    task_id="refresh_cube",
    command='refresh',
    start_time='1325347200000',
    end_time='1325433600000',
)

# Merge segments
merge_task = KylinCubeOperator(
    task_id="merge_cube",
    command='merge',
    start_time='1325347200000',
    end_time='1325433600000',
)
```
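
The literal boundaries above happen to correspond to midnight of 2012-01-01 and 2012-01-02 in UTC+8. A small helper for deriving such strings — a sketch, with the UTC+8 timezone an assumption matching these particular values that you should adapt to your data:

```python
from datetime import datetime, timedelta, timezone

def day_to_ms(year: int, month: int, day: int, tz=timezone(timedelta(hours=8))) -> str:
    """Midnight of the given day, in the given timezone, as an epoch-millisecond string."""
    return str(int(datetime(year, month, day, tzinfo=tz).timestamp() * 1000))

assert day_to_ms(2012, 1, 1) == '1325347200000'
assert day_to_ms(2012, 1, 2) == '1325433600000'
```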

### Streaming Operations

Build, refresh, and merge operations for streaming cubes. Boundaries are offsets in the streaming source rather than timestamps.

```python { .api }
# Build streaming segments
build_streaming_task = KylinCubeOperator(
    task_id="build_streaming",
    command='build_streaming',
    offset_start='0',     # Start offset in the streaming source
    offset_end='100000',  # End offset in the streaming source
)

# Refresh streaming segments
refresh_streaming_task = KylinCubeOperator(
    task_id="refresh_streaming",
    command='refresh_streaming',
    offset_start='0',
    offset_end='100000',
)

# Merge streaming segments
merge_streaming_task = KylinCubeOperator(
    task_id="merge_streaming",
    command='merge_streaming',
    offset_start='0',
    offset_end='100000',
)
```

### Cube Management Operations

Enable, disable, delete, clone, drop, and purge cube operations.

```python { .api }
# Enable cube
enable_task = KylinCubeOperator(
    task_id="enable_cube",
    command='enable',
)

# Disable cube
disable_task = KylinCubeOperator(
    task_id="disable_cube",
    command='disable',
)

# Delete a segment (requires segment_name)
delete_task = KylinCubeOperator(
    task_id="delete_segment",
    command='delete',
    segment_name='segment_20230101_20230102',
)

# Clone cube (creates {cube_name}_clone)
clone_task = KylinCubeOperator(
    task_id="clone_cube",
    command='clone',
)

# Drop cube
drop_task = KylinCubeOperator(
    task_id="drop_cube",
    command='drop',
)

# Purge cube
purge_task = KylinCubeOperator(
    task_id="purge_cube",
    command='purge',
)
```

### Job Tracking

Monitor job execution with automatic status checking and error handling.

```python { .api }
# Track the job until completion with a custom timeout and polling interval
tracked_build = KylinCubeOperator(
    task_id="tracked_build",
    command='build',
    start_time='{{ execution_date.int_timestamp * 1000 }}',     # Interval start, epoch ms
    end_time='{{ next_execution_date.int_timestamp * 1000 }}',  # Interval end, epoch ms
    is_track_job=True,
    interval=30,   # Check status every 30 seconds
    timeout=7200,  # 2 hour timeout
    eager_error_status=("ERROR", "KILLED", "STOPPED"),  # Custom error statuses
)
```
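
Conceptually, tracking behaves like the loop below: poll `get_job_status` every `interval` seconds, fail immediately on any status in `eager_error_status`, and fail once `timeout` elapses. This is a simplified sketch of the behavior described above, not the provider's literal source:

```python
import time

from airflow.exceptions import AirflowException

def track_job(hook, job_id, interval=60, timeout=60 * 60 * 24,
              eager_error_status=("ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED")):
    """Poll a Kylin job until it finishes, errors out, or times out."""
    started_at = time.monotonic()
    while time.monotonic() - started_at < timeout:
        status = hook.get_job_status(job_id)
        if status == "FINISHED":
            return
        if status in eager_error_status:
            raise AirflowException(f"Kylin job {job_id} ended with status {status}")
        time.sleep(interval)
    raise AirflowException(f"Timed out waiting for Kylin job {job_id}")
```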

## Types

```python { .api }
from typing import Optional, Dict, Any, Tuple
from airflow.hooks.base_hook import BaseHook  # airflow.hooks.base in Airflow 2.0+
from airflow.models import BaseOperator

# Supported Kylin commands for cube operations
SUPPORTED_COMMANDS = {
    'fullbuild',          # Full build of cube segments
    'build',              # Build cube segments (batch)
    'refresh',            # Refresh segments (batch)
    'merge',              # Merge segments (batch)
    'build_streaming',    # Build streaming segments
    'refresh_streaming',  # Refresh streaming segments
    'merge_streaming',    # Merge streaming segments
    'delete',             # Delete segment (requires segment_name)
    'disable',            # Disable cube
    'enable',             # Enable cube
    'purge',              # Purge cube
    'clone',              # Clone cube
    'drop',               # Drop cube
}

# Commands that trigger job tracking when is_track_job=True
BUILD_COMMANDS = {
    'fullbuild',
    'build',
    'merge',
    'refresh',
    'build_streaming',
    'merge_streaming',
    'refresh_streaming',
}

# Terminal job statuses
JOB_END_STATUSES = {"FINISHED", "ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"}

# Template fields for Jinja templating
TEMPLATE_FIELDS = (
    'project',
    'cube',
    'dsn',
    'command',
    'start_time',
    'end_time',
    'segment_name',
    'offset_start',
    'offset_end',
)

# Connection parameters
class KylinConnectionConfig:
    kylin_conn_id: str      # Airflow connection ID
    project: Optional[str]  # Kylin project name
    dsn: Optional[str]      # Data Source Name URL

# Operation parameters for cube operations
class CubeOperationParams:
    datasource_name: str         # Cube/datasource name
    op: str                      # Operation command
    start: Optional[str]         # Start time/offset
    end: Optional[str]           # End time/offset
    segment_name: Optional[str]  # Target segment name
```
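
A sketch of how these sets drive operator behavior — simplified, with `SUPPORTED_COMMANDS` and `BUILD_COMMANDS` mirroring the tables above rather than the provider's literal identifiers:

```python
from airflow.exceptions import AirflowException

def should_track(command: str, is_track_job: bool) -> bool:
    """Validate a command and decide whether the resulting job gets tracked."""
    if command.lower() not in SUPPORTED_COMMANDS:
        raise AirflowException(f"Unsupported Kylin command: {command}")
    # Only build-style commands submit a job whose UUID can be polled
    return is_track_job and command.lower() in BUILD_COMMANDS
```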

## Error Handling

The Apache Kylin provider raises `AirflowException` for various error conditions:

```python { .api }
from datetime import datetime

from airflow.exceptions import AirflowException
from kylinpy import exceptions

try:
    hook = KylinHook(kylin_conn_id='kylin_prod')
    # Build boundaries are datetime objects when calling the hook directly
    response = hook.cube_run('sales_cube', 'build', start=datetime(2023, 1, 1), end=datetime(2023, 1, 2))

    if 'uuid' in response:
        job_id = response['uuid']
        status = hook.get_job_status(job_id)

        if status in ['ERROR', 'KILLED', 'STOPPED']:
            raise AirflowException(f"Kylin job {job_id} failed with status: {status}")

except exceptions.KylinError as kylin_err:
    raise AirflowException(f"Cube operation error: {kylin_err}")
except AirflowException:
    raise  # Re-raise Airflow exceptions
except Exception as e:
    raise AirflowException(f"Unexpected error in Kylin operation: {e}")
```

## Usage Examples

### Complete DAG with Multiple Operations

```python
from airflow import DAG
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=10),
}

dag = DAG(
    'kylin_cube_pipeline',
    default_args=default_args,
    description='Complete Kylin cube processing pipeline',
    schedule_interval='@daily',
    catchup=False,
)

# Build the daily cube segment (boundaries in epoch milliseconds)
build_daily = KylinCubeOperator(
    task_id='build_daily_segment',
    kylin_conn_id='prod_kylin',
    project='analytics',
    cube='daily_sales_cube',
    command='build',
    start_time='{{ execution_date.int_timestamp * 1000 }}',
    end_time='{{ next_execution_date.int_timestamp * 1000 }}',
    is_track_job=True,
    timeout=1800,  # 30 minutes
    dag=dag,
)

# Merge the trailing week of segments
merge_weekly = KylinCubeOperator(
    task_id='merge_weekly_segments',
    kylin_conn_id='prod_kylin',
    project='analytics',
    cube='daily_sales_cube',
    command='merge',
    start_time='{{ (execution_date - macros.timedelta(days=6)).int_timestamp * 1000 }}',
    end_time='{{ next_execution_date.int_timestamp * 1000 }}',
    is_track_job=True,
    timeout=3600,  # 1 hour
    dag=dag,
)

# Set task dependencies
build_daily >> merge_weekly
```

### Using KylinHook Directly in Custom Operator

```python
import time

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.apache.kylin.hooks.kylin import KylinHook


class CustomKylinOperator(BaseOperator):
    def __init__(self, *, kylin_conn_id='kylin_default', project=None,
                 cube_name=None, start_time=None, end_time=None, **kwargs):
        super().__init__(**kwargs)
        self.kylin_conn_id = kylin_conn_id
        self.project = project
        self.cube_name = cube_name
        self.start_time = start_time  # datetime objects when calling the hook directly
        self.end_time = end_time

    def execute(self, context):
        # Initialize hook
        hook = KylinHook(
            kylin_conn_id=self.kylin_conn_id,
            project=self.project,
        )

        try:
            # Build cube
            build_response = hook.cube_run(
                datasource_name=self.cube_name,
                op='build',
                start=self.start_time,
                end=self.end_time,
            )

            # Track job if a UUID was returned
            if 'uuid' in build_response:
                job_id = build_response['uuid']

                # Monitor job status
                while True:
                    status = hook.get_job_status(job_id)

                    if status == 'FINISHED':
                        self.log.info("Job %s completed successfully", job_id)
                        break
                    if status in ['ERROR', 'KILLED', 'STOPPED']:
                        raise AirflowException(f"Job {job_id} failed with status: {status}")

                    time.sleep(60)  # Wait 1 minute before the next check

        except Exception as e:
            self.log.error("Kylin operation failed: %s", e)
            raise
```

### Streaming Cube Operations

```python
# Real-time streaming cube build. Offsets are positions in the streaming
# source (e.g. Kafka), not timestamps; the values below are illustrative.
streaming_build = KylinCubeOperator(
    task_id='build_streaming_cube',
    kylin_conn_id='streaming_kylin',
    project='realtime_analytics',
    cube='events_streaming_cube',
    command='build_streaming',
    offset_start='0',
    offset_end='100000',
    is_track_job=True,
    interval=30,  # Check every 30 seconds
    timeout=600,  # 10 minute timeout for streaming
    dag=dag,
)
```

### Error Handling and Monitoring

```python
# Custom error handling with specific error statuses.
# send_alert is assumed to be an alerting helper defined elsewhere.
robust_build = KylinCubeOperator(
    task_id='robust_cube_build',
    kylin_conn_id='kylin_prod',
    project='critical_analytics',
    cube='revenue_cube',
    command='build',
    start_time='{{ execution_date.int_timestamp * 1000 }}',     # Interval start, epoch ms
    end_time='{{ next_execution_date.int_timestamp * 1000 }}',  # Interval end, epoch ms
    is_track_job=True,
    interval=45,     # Check every 45 seconds
    timeout=14400,   # 4 hour timeout
    eager_error_status=("ERROR", "KILLED", "STOPPED", "DISCARDED"),  # All error states
    on_failure_callback=lambda context: send_alert(context),
    dag=dag,
)
```