
# Apache Airflow Providers Apache Kylin

Provider package for Apache Kylin integration with Apache Airflow. It enables orchestration of Apache Kylin OLAP cube operations within Airflow workflows, providing a hook for connectivity and an operator for cube lifecycle management: building, refreshing, merging, and monitoring cubes.

## Package Information

- **Package Name**: apache-airflow-providers-apache-kylin
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-apache-kylin`
- **Dependencies**: apache-airflow (>=2.10.0), kylinpy (>2.7.0)

## Core Imports

```python
from airflow.providers.apache.kylin.hooks.kylin import KylinHook
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator
```

## Basic Usage

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator

# Define DAG
dag = DAG(
    'kylin_cube_operations',
    start_date=datetime(2023, 1, 1),
    schedule='@daily',
    catchup=False,
)

# Build a Kylin cube
build_cube = KylinCubeOperator(
    task_id='build_sales_cube',
    kylin_conn_id='kylin_default',
    project='learn_kylin',
    cube='kylin_sales_cube',
    command='build',
    start_time='1483200000000',  # Timestamp in milliseconds
    end_time='1483286400000',
    is_track_job=True,
    timeout=3600,  # 1 hour timeout
    dag=dag,
)

# Refresh a cube segment
refresh_cube = KylinCubeOperator(
    task_id='refresh_sales_cube',
    kylin_conn_id='kylin_default',
    project='learn_kylin',
    cube='kylin_sales_cube',
    command='refresh',
    start_time='1483200000000',
    end_time='1483286400000',
    is_track_job=True,
    dag=dag,
)

# Set task dependencies
build_cube >> refresh_cube
```

## Architecture

The provider follows Airflow's standard pattern with two main components:

- **KylinHook**: Manages connections to Kylin servers and provides low-level API methods for cube operations and job status monitoring
- **KylinCubeOperator**: High-level operator for executing cube operations with job tracking, timeout handling, and error management

## Capabilities

### Connection Management

Establishes and manages connections to Apache Kylin servers using Airflow's connection framework.

```python { .api }
class KylinHook(BaseHook):
    def __init__(
        self,
        kylin_conn_id: str = "kylin_default",
        project: str | None = None,
        dsn: str | None = None,
    ):
        """
        Initialize Kylin hook.

        Args:
            kylin_conn_id: Connection ID configured in Airflow
            project: Kylin project name
            dsn: Direct DSN URL (overrides kylin_conn_id)
        """

    def get_conn(self):
        """
        Get Kylin connection instance.

        Returns:
            kylinpy.Kylin: Connected Kylin instance
        """

    def cube_run(self, datasource_name: str, op: str, **op_args):
        """
        Run CubeSource command.

        Args:
            datasource_name: Name of the cube/datasource
            op: Operation command
            **op_args: Additional operation arguments

        Returns:
            dict: Response from Kylin API

        Raises:
            AirflowException: When cube operation fails
        """

    def get_job_status(self, job_id: str) -> str:
        """
        Get job status by job ID.

        Args:
            job_id: Kylin job ID

        Returns:
            str: Job status (RUNNING, FINISHED, ERROR, etc.)
        """
```
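
For cases where the operator is not a good fit, the hook can be used directly inside a task. The following is a minimal sketch, assuming a TaskFlow-style task and a job ID obtained elsewhere (for example from an upstream task via XCom):

```python
from airflow.decorators import task
from airflow.providers.apache.kylin.hooks.kylin import KylinHook


@task
def report_kylin_job_status(job_id: str) -> str:
    # Reuses the 'kylin_default' connection configured in Airflow
    hook = KylinHook(kylin_conn_id="kylin_default", project="learn_kylin")
    status = hook.get_job_status(job_id)
    print(f"Kylin job {job_id} is currently {status}")
    return status
```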

### Cube Operations

Comprehensive cube lifecycle operations including build, refresh, merge, and management commands.

```python { .api }
class KylinCubeOperator(BaseOperator):
    def __init__(
        self,
        *,
        kylin_conn_id: str = "kylin_default",
        project: str | None = None,
        cube: str | None = None,
        dsn: str | None = None,
        command: str | None = None,
        start_time: str | None = None,
        end_time: str | None = None,
        offset_start: str | None = None,
        offset_end: str | None = None,
        segment_name: str | None = None,
        is_track_job: bool = False,
        interval: int = 60,
        timeout: int = 86400,
        eager_error_status: tuple = ("ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"),
        **kwargs,
    ):
        """
        Initialize Kylin cube operator.

        Args:
            kylin_conn_id: Connection ID for Kylin server
            project: Kylin project name (overrides connection project)
            cube: Target cube name
            dsn: Direct DSN URL (overrides kylin_conn_id)
            command: Cube operation command
            start_time: Build segment start time (milliseconds timestamp)
            end_time: Build segment end time (milliseconds timestamp)
            offset_start: Streaming build segment start offset
            offset_end: Streaming build segment end offset
            segment_name: Specific segment name for operations
            is_track_job: Whether to monitor job status until completion
            interval: Job status polling interval in seconds
            timeout: Maximum wait time in seconds
            eager_error_status: Job statuses that trigger immediate failure
        """

    def execute(self, context) -> dict:
        """
        Execute the cube operation.

        Args:
            context: Airflow task context

        Returns:
            dict: Operation response from Kylin API

        Raises:
            AirflowException: When operation fails, job times out, or encounters error status
        """
```
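
As a hedged sketch of the less common constructor options, the example below merges segments while polling less frequently and failing fast only on a narrower set of statuses; the cube name, time range, and `dag` object are illustrative and reuse the Basic Usage setup:

```python
merge_segments = KylinCubeOperator(
    task_id='merge_sales_segments',
    kylin_conn_id='kylin_default',
    project='learn_kylin',
    cube='kylin_sales_cube',
    command='merge',
    start_time='1483200000000',
    end_time='1485878400000',
    is_track_job=True,
    interval=120,  # poll job status every 2 minutes
    eager_error_status=('ERROR', 'KILLED'),  # fail immediately only on these statuses
    dag=dag,
)
```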

### Supported Commands

The operator supports the following cube operations (a combined maintenance example follows the lists):

#### Build Operations
- `fullbuild`: Complete cube build
- `build`: Build cube segments for a specified time range
- `build_streaming`: Build streaming cube segments with offset parameters

#### Maintenance Operations
- `refresh`: Refresh existing cube segments
- `refresh_streaming`: Refresh streaming cube segments
- `merge`: Merge cube segments
- `merge_streaming`: Merge streaming cube segments

#### Management Operations
- `delete`: Delete specific cube segments
- `disable`: Disable a cube
- `enable`: Enable a cube
- `purge`: Purge cube data
- `clone`: Clone a cube (creates {cube_name}_clone)
- `drop`: Drop a cube completely
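
A minimal sketch of chaining management commands, assuming the `dag` and connection from Basic Usage:

```python
# Take the cube offline, clear its data, then bring it back online
disable_cube = KylinCubeOperator(
    task_id='disable_cube',
    kylin_conn_id='kylin_default',
    project='learn_kylin',
    cube='kylin_sales_cube',
    command='disable',
    dag=dag,
)

purge_cube = KylinCubeOperator(
    task_id='purge_cube',
    kylin_conn_id='kylin_default',
    project='learn_kylin',
    cube='kylin_sales_cube',
    command='purge',
    dag=dag,
)

enable_cube = KylinCubeOperator(
    task_id='enable_cube',
    kylin_conn_id='kylin_default',
    project='learn_kylin',
    cube='kylin_sales_cube',
    command='enable',
    dag=dag,
)

disable_cube >> purge_cube >> enable_cube
```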

### Job Status Monitoring

Built-in job tracking with configurable polling intervals and error handling.

**Job End Statuses**: FINISHED, ERROR, DISCARDED, KILLED, SUICIDAL, STOPPED

**Template Fields**: The following fields support Jinja2 templating (see the example after this list):
- project
- cube
- dsn
- command
- start_time
- end_time
- segment_name
- offset_start
- offset_end
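
As a hedged illustration of templating, any of the fields above can be rendered from Airflow macros or `params`; the sketch below drives `command` from a DAG run parameter (the parameter name and default value are illustrative):

```python
parameterized_cube_op = KylinCubeOperator(
    task_id='parameterized_cube_op',
    kylin_conn_id='kylin_default',
    project='learn_kylin',
    cube='kylin_sales_cube',
    command='{{ params.kylin_command }}',  # rendered at runtime, can be overridden when triggering the DAG
    start_time='1483200000000',
    end_time='1483286400000',
    params={'kylin_command': 'build'},
    dag=dag,
)
```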

### Usage Examples

#### Basic Cube Build
```python
build_task = KylinCubeOperator(
    task_id='build_cube',
    cube='sales_cube',
    command='build',
    start_time='1640995200000',  # 2022-01-01 00:00:00 UTC
    end_time='1641081600000',    # 2022-01-02 00:00:00 UTC
    dag=dag,
)
```

#### Streaming Cube Operations
```python
streaming_build = KylinCubeOperator(
    task_id='build_streaming_cube',
    cube='streaming_sales_cube',
    command='build_streaming',
    offset_start='0',
    offset_end='1000',
    dag=dag,
)
```

#### Job Tracking with Custom Settings
```python
tracked_build = KylinCubeOperator(
    task_id='tracked_build',
    cube='large_cube',
    command='build',
    # Templated segment boundaries, rendered as millisecond timestamps
    start_time='{{ data_interval_start.int_timestamp * 1000 }}',
    end_time='{{ data_interval_end.int_timestamp * 1000 }}',
    is_track_job=True,
    interval=30,    # check job status every 30 seconds
    timeout=7200,   # 2 hour timeout
    dag=dag,
)
```

#### Using Direct DSN Connection
```python
dsn_task = KylinCubeOperator(
    task_id='dsn_operation',
    dsn='kylin://ADMIN:KYLIN@sandbox/learn_kylin?timeout=60&is_debug=1',
    cube='test_cube',
    command='enable',
    dag=dag,
)
```

### Connection Configuration

Configure Kylin connections in the Airflow Admin UI:

- **Connection Type**: kylin
- **Host**: Kylin server hostname
- **Port**: Kylin server port (typically 7070)
- **Login**: Username
- **Password**: Password
- **Schema**: Default project name
- **Extra**: Additional connection parameters as JSON

Example connection configuration:
```json
{
    "timeout": 60,
    "is_debug": true,
    "verify_ssl": false
}
```
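
Connections can also be supplied without the UI. A minimal sketch, assuming Airflow's standard `AIRFLOW_CONN_<CONN_ID>` environment-variable mechanism and placeholder host and credentials:

```python
import os

# Equivalent of the 'kylin_default' connection, expressed as a URI.
# Host, credentials, and project below are placeholders.
os.environ["AIRFLOW_CONN_KYLIN_DEFAULT"] = (
    "kylin://ADMIN:KYLIN@kylin-host:7070/learn_kylin?timeout=60"
)
```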

## Types

```python { .api }
# Hook connection attributes
class KylinHook(BaseHook):
    conn_name_attr: str = "kylin_conn_id"
    default_conn_name: str = "kylin_default"
    conn_type: str = "kylin"
    hook_name: str = "Apache Kylin"

# Operator template fields for Jinja2 templating
class KylinCubeOperator(BaseOperator):
    template_fields: tuple[str, ...] = (
        "project",
        "cube",
        "dsn",
        "command",
        "start_time",
        "end_time",
        "segment_name",
        "offset_start",
        "offset_end",
    )

    ui_color: str = "#E79C46"

    build_command: set[str] = {
        "fullbuild",
        "build",
        "merge",
        "refresh",
        "build_streaming",
        "merge_streaming",
        "refresh_streaming",
    }

    jobs_end_status: set[str] = {
        "FINISHED",
        "ERROR",
        "DISCARDED",
        "KILLED",
        "SUICIDAL",
        "STOPPED",
    }
```
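
These class attributes can be inspected directly, for example to check whether a command is one of the build-style operations; a small hedged sketch:

```python
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator

# build_command contains the build/refresh/merge style operations
print("merge" in KylinCubeOperator.build_command)    # True
print("disable" in KylinCubeOperator.build_command)  # False
```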

## Error Handling

The package raises `AirflowException` for various error conditions:

- **Invalid Commands**: When the command is not in the supported command list
- **Kylin API Errors**: Wrapped from `kylinpy.exceptions.KylinError`
- **Job Timeout**: When job monitoring exceeds the timeout duration
- **Job Failures**: When the job status matches one of the `eager_error_status` values
- **Missing Parameters**: When a required cube name or job ID is missing

All errors include descriptive messages with context about the failing operation.
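
Because failures surface as `AirflowException`, the usual Airflow retry machinery applies. A minimal sketch using standard `BaseOperator` arguments (the retry count and delay are illustrative):

```python
from datetime import timedelta

resilient_build = KylinCubeOperator(
    task_id='resilient_build',
    kylin_conn_id='kylin_default',
    project='learn_kylin',
    cube='kylin_sales_cube',
    command='build',
    start_time='1483200000000',
    end_time='1483286400000',
    is_track_job=True,
    retries=3,                          # re-run the task if the build job fails
    retry_delay=timedelta(minutes=10),
    dag=dag,
)
```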