# Apache Airflow Providers Apache Kylin

Provider package for Apache Kylin integration with Apache Airflow. This package enables orchestration of Apache Kylin OLAP cube operations within Airflow workflows, providing a hook for connectivity and an operator for cube lifecycle management, including building, refreshing, merging, and job monitoring.

## Package Information

- **Package Name**: apache-airflow-providers-apache-kylin
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-apache-kylin`
- **Dependencies**: apache-airflow (>=2.10.0), kylinpy (>2.7.0)

## Core Imports

```python
from airflow.providers.apache.kylin.hooks.kylin import KylinHook
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator
```

## Basic Usage

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator

# Define the DAG
dag = DAG(
    'kylin_cube_operations',
    start_date=datetime(2023, 1, 1),
    schedule='@daily',  # `schedule` replaces the deprecated `schedule_interval`
    catchup=False,
)

# Build a Kylin cube
build_cube = KylinCubeOperator(
    task_id='build_sales_cube',
    kylin_conn_id='kylin_default',
    project='learn_kylin',
    cube='kylin_sales_cube',
    command='build',
    start_time='1483200000000',  # timestamp in milliseconds
    end_time='1483286400000',
    is_track_job=True,
    timeout=3600,  # 1 hour timeout
    dag=dag,
)

# Refresh a cube segment
refresh_cube = KylinCubeOperator(
    task_id='refresh_sales_cube',
    kylin_conn_id='kylin_default',
    project='learn_kylin',
    cube='kylin_sales_cube',
    command='refresh',
    start_time='1483200000000',
    end_time='1483286400000',
    is_track_job=True,
    dag=dag,
)

# Set task dependencies
build_cube >> refresh_cube
```

## Architecture

The provider follows Airflow's standard pattern with two main components:

- **KylinHook**: Manages connections to Kylin servers and provides low-level API methods for cube operations and job status monitoring
- **KylinCubeOperator**: High-level operator for executing cube operations with job tracking, timeout handling, and error management

## Capabilities

### Connection Management

Establishes and manages connections to Apache Kylin servers using Airflow's connection framework.

```python { .api }
class KylinHook(BaseHook):
    def __init__(
        self,
        kylin_conn_id: str = "kylin_default",
        project: str | None = None,
        dsn: str | None = None,
    ):
        """
        Initialize the Kylin hook.

        Args:
            kylin_conn_id: Connection ID configured in Airflow
            project: Kylin project name
            dsn: Direct DSN URL (overrides kylin_conn_id)
        """

    def get_conn(self):
        """
        Get a Kylin connection instance.

        Returns:
            kylinpy.Kylin: Connected Kylin instance
        """

    def cube_run(self, datasource_name: str, op: str, **op_args):
        """
        Run a CubeSource command.

        Args:
            datasource_name: Name of the cube/datasource
            op: Operation command
            **op_args: Additional operation arguments

        Returns:
            dict: Response from the Kylin API

        Raises:
            AirflowException: When the cube operation fails
        """

    def get_job_status(self, job_id: str) -> str:
        """
        Get job status by job ID.

        Args:
            job_id: Kylin job ID

        Returns:
            str: Job status (RUNNING, FINISHED, ERROR, etc.)
        """
```

### Cube Operations

Comprehensive cube lifecycle operations including build, refresh, merge, and management commands.

```python { .api }
class KylinCubeOperator(BaseOperator):
    def __init__(
        self,
        *,
        kylin_conn_id: str = "kylin_default",
        project: str | None = None,
        cube: str | None = None,
        dsn: str | None = None,
        command: str | None = None,
        start_time: str | None = None,
        end_time: str | None = None,
        offset_start: str | None = None,
        offset_end: str | None = None,
        segment_name: str | None = None,
        is_track_job: bool = False,
        interval: int = 60,
        timeout: int = 86400,
        eager_error_status: tuple = ("ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"),
        **kwargs,
    ):
        """
        Initialize the Kylin cube operator.

        Args:
            kylin_conn_id: Connection ID for the Kylin server
            project: Kylin project name (overrides connection project)
            cube: Target cube name
            dsn: Direct DSN URL (overrides kylin_conn_id)
            command: Cube operation command
            start_time: Build segment start time (milliseconds timestamp)
            end_time: Build segment end time (milliseconds timestamp)
            offset_start: Streaming build segment start offset
            offset_end: Streaming build segment end offset
            segment_name: Specific segment name for operations
            is_track_job: Whether to monitor job status until completion
            interval: Job status polling interval in seconds
            timeout: Maximum wait time in seconds
            eager_error_status: Job statuses that trigger immediate failure
        """

    def execute(self, context) -> dict:
        """
        Execute the cube operation.

        Args:
            context: Airflow task context

        Returns:
            dict: Operation response from the Kylin API

        Raises:
            AirflowException: When the operation fails, the job times out, or the job reaches an error status
        """
```

### Supported Commands

The operator supports the following cube operations:

#### Build Operations
- `fullbuild`: Complete cube build
- `build`: Build cube segments for a specified time range
- `build_streaming`: Build streaming cube segments with offset parameters

#### Maintenance Operations
- `refresh`: Refresh existing cube segments
- `refresh_streaming`: Refresh streaming cube segments
- `merge`: Merge cube segments
- `merge_streaming`: Merge streaming cube segments

#### Management Operations
- `delete`: Delete specific cube segments
- `disable`: Disable cube
- `enable`: Enable cube
- `purge`: Purge cube data
- `clone`: Clone cube (creates {cube_name}_clone)
- `drop`: Drop cube completely

### Job Status Monitoring

Built-in job tracking with configurable polling intervals and error handling.

**Job End Statuses**: FINISHED, ERROR, DISCARDED, KILLED, SUICIDAL, STOPPED
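With `is_track_job=True`, the operator repeatedly calls the hook's `get_job_status` until the job reaches an end status or the timeout expires. The snippet below is a simplified, standalone sketch of that tracking loop; `poll_job` and `status_fn` are illustrative names, not the provider's actual internals:

```python
import time

# End statuses as documented above; the error subset mirrors the
# operator's default eager_error_status.
END_STATUS = {"FINISHED", "ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"}
ERROR_STATUS = {"ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"}

def poll_job(status_fn, interval=60, timeout=86400):
    """Poll status_fn() until the job reaches an end status or the timeout expires."""
    waited = 0
    status = status_fn()
    while status not in END_STATUS:
        if waited >= timeout:
            raise TimeoutError(f"job still {status} after {timeout}s")
        time.sleep(interval)
        waited += interval
        status = status_fn()
    if status in ERROR_STATUS:
        raise RuntimeError(f"job ended with status {status}")
    return status

# Example with a stubbed status source standing in for KylinHook.get_job_status:
statuses = iter(["RUNNING", "RUNNING", "FINISHED"])
print(poll_job(lambda: next(statuses), interval=0))  # FINISHED
```

In the real operator, `status_fn` corresponds to querying Kylin for the job ID returned by the build request, and failures surface as `AirflowException` rather than the generic exceptions used here.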

**Template Fields**: The following fields support Jinja2 templating:
- project
- cube
- dsn
- command
- start_time
- end_time
- segment_name
- offset_start
- offset_end

### Usage Examples

#### Basic Cube Build
```python
build_task = KylinCubeOperator(
    task_id='build_cube',
    cube='sales_cube',
    command='build',
    start_time='1640995200000',  # 2022-01-01 00:00:00 UTC
    end_time='1641081600000',    # 2022-01-02 00:00:00 UTC
    dag=dag,
)
```
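Computing these millisecond-epoch strings by hand is error-prone. A small helper like the one below (hypothetical, not part of the provider) converts datetimes to the format Kylin expects:

```python
from datetime import datetime, timezone

def to_kylin_ms(dt: datetime) -> str:
    """Convert a datetime to the millisecond-epoch string Kylin expects."""
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)  # assume naive datetimes are UTC
    return str(int(dt.timestamp() * 1000))

start = to_kylin_ms(datetime(2022, 1, 1))  # '1640995200000'
end = to_kylin_ms(datetime(2022, 1, 2))    # '1641081600000'
```

These values match the hardcoded timestamps in the build example above.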

#### Streaming Cube Operations
```python
streaming_build = KylinCubeOperator(
    task_id='build_streaming_cube',
    cube='streaming_sales_cube',
    command='build_streaming',
    offset_start='0',
    offset_end='1000',
    dag=dag,
)
```

#### Job Tracking with Custom Settings
```python
tracked_build = KylinCubeOperator(
    task_id='tracked_build',
    cube='large_cube',
    command='build',
    # Templated epoch-millisecond bounds for the data interval
    start_time='{{ data_interval_start.int_timestamp * 1000 }}',
    end_time='{{ data_interval_end.int_timestamp * 1000 }}',
    is_track_job=True,
    interval=30,   # check every 30 seconds
    timeout=7200,  # 2 hour timeout
    dag=dag,
)
```

#### Using a Direct DSN Connection
```python
dsn_task = KylinCubeOperator(
    task_id='dsn_operation',
    dsn='kylin://ADMIN:KYLIN@sandbox/learn_kylin?timeout=60&is_debug=1',
    cube='test_cube',
    command='enable',
    dag=dag,
)
```

### Connection Configuration

Configure Kylin connections in the Airflow Admin UI:

- **Connection Type**: kylin
- **Host**: Kylin server hostname
- **Port**: Kylin server port (typically 7070)
- **Login**: Username
- **Password**: Password
- **Schema**: Default project name
- **Extra**: Additional connection parameters as JSON

Example connection configuration:
```json
{
    "timeout": 60,
    "is_debug": true,
    "verify_ssl": false
}
```
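To see how these connection fields relate to the `kylin://` DSN format used in the Direct DSN example, the sketch below assembles one from Airflow-style fields. The `build_kylin_dsn` helper is illustrative only; in practice the provider and kylinpy handle this internally:

```python
from urllib.parse import urlencode

def build_kylin_dsn(host, port, login, password, project, **extra):
    """Assemble a kylin:// DSN from Airflow-style connection fields (illustrative)."""
    query = f"?{urlencode(extra)}" if extra else ""
    return f"kylin://{login}:{password}@{host}:{port}/{project}{query}"

dsn = build_kylin_dsn("sandbox", 7070, "ADMIN", "KYLIN", "learn_kylin",
                      timeout=60, is_debug=1)
# 'kylin://ADMIN:KYLIN@sandbox:7070/learn_kylin?timeout=60&is_debug=1'
```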

## Types

```python { .api }
# Hook connection attributes
class KylinHook(BaseHook):
    conn_name_attr: str = "kylin_conn_id"
    default_conn_name: str = "kylin_default"
    conn_type: str = "kylin"
    hook_name: str = "Apache Kylin"

# Operator template fields for Jinja2 templating
class KylinCubeOperator(BaseOperator):
    template_fields: tuple[str, ...] = (
        "project",
        "cube",
        "dsn",
        "command",
        "start_time",
        "end_time",
        "segment_name",
        "offset_start",
        "offset_end",
    )

    ui_color: str = "#E79C46"
    build_command: set[str] = {
        "fullbuild",
        "build",
        "merge",
        "refresh",
        "build_streaming",
        "merge_streaming",
        "refresh_streaming",
    }
    jobs_end_status: set[str] = {
        "FINISHED",
        "ERROR",
        "DISCARDED",
        "KILLED",
        "SUICIDAL",
        "STOPPED",
    }
```

## Error Handling

The package raises `AirflowException` for various error conditions:

- **Invalid Commands**: When the command is not in the supported command list
- **Kylin API Errors**: Wrapped from `kylinpy.exceptions.KylinError`
- **Job Timeout**: When job monitoring exceeds the timeout duration
- **Job Failures**: When the job status matches an eager_error_status entry
- **Missing Parameters**: When a required cube name or job ID is missing

All errors include descriptive messages with context about the failing operation.