
# Apache Kylin Provider for Apache Airflow

A backport provider package that integrates Apache Airflow with the Apache Kylin OLAP engine. It allows users to trigger Kylin cube builds, manage cube operations, and monitor job statuses from within Airflow workflows.

## Package Information

- **Package Name**: apache-airflow-backport-providers-apache-kylin
- **Package Type**: pypi
- **Language**: Python
- **Installation**: `pip install apache-airflow-backport-providers-apache-kylin`

## Core Imports

```python
from airflow.providers.apache.kylin.hooks.kylin import KylinHook
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator
```

For error handling and advanced usage:

```python
from kylinpy import kylinpy, exceptions
from airflow.exceptions import AirflowException
from airflow.utils.decorators import apply_defaults
```

## Basic Usage

```python
from airflow import DAG
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'kylin_cube_build',
    default_args=default_args,
    description='Build Kylin cube',
    schedule_interval=timedelta(days=1),
)

# Build a Kylin cube and track the resulting job until it finishes
build_cube = KylinCubeOperator(
    task_id='build_kylin_cube',
    kylin_conn_id='kylin_default',
    project='sales_analytics',
    cube='sales_cube',
    command='build',
    start_time='{{ ds_nodash }}000000000',  # templated segment start
    end_time='{{ next_ds_nodash }}000000000',  # templated segment end
    is_track_job=True,
    timeout=3600,  # give up after 1 hour of tracking
    dag=dag,
)
```
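
The example assumes the `kylin_default` connection already exists in Airflow. As a minimal sketch of one way to provide it, using Airflow's standard `AIRFLOW_CONN_<CONN_ID>` environment-variable mechanism (the credentials, host, port, and schema below are placeholders):

```python
import os

# Hypothetical connection URI; the trailing schema component is the default
# Kylin project. Adjust user, password, host, and port to your deployment.
os.environ["AIRFLOW_CONN_KYLIN_DEFAULT"] = "kylin://ADMIN:KYLIN@kylin-host:7070/sales_analytics"
```

In production deployments, connections are usually created through the Airflow UI or CLI instead.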

## Capabilities

### Connection Management

Establishes and manages connections to the Kylin server.

```python { .api }
class KylinHook(BaseHook):
    def __init__(
        self,
        kylin_conn_id: str = 'kylin_default',
        project: Optional[str] = None,
        dsn: Optional[str] = None,
    ): ...

    def get_conn(self):
        """
        Establishes and returns a connection to the Kylin server.

        Returns:
            kylinpy.Kylin: Connection object for interacting with Kylin server.
                Can be used to get datasources and manage cube operations.
        """
```
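
A short usage sketch (the connection ID and project name are illustrative): the hook can also be used outside of the operator to obtain the underlying kylinpy client.

```python
from airflow.providers.apache.kylin.hooks.kylin import KylinHook

# Placeholder connection ID and project name.
hook = KylinHook(kylin_conn_id='kylin_prod', project='sales_analytics')
kylin_client = hook.get_conn()  # kylinpy.Kylin object, per the API above
```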

### Cube Operations

Executes cube operations, including build, refresh, merge, and management commands.

```python { .api }
def cube_run(self, datasource_name: str, op: str, **op_args) -> Any:
    """
    Execute a cube operation on the specified datasource.

    Args:
        datasource_name (str): Name of the datasource/cube to operate on
        op (str): Command to execute (must be one of the supported commands)
        **op_args: Additional keyword arguments for the operation

    Returns:
        Response from the cube operation

    Raises:
        AirflowException: If the cube operation encounters a KylinError
    """
```
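
Continuing the hook sketch above, a build can be issued directly; per the parameter listing under Types below, batch commands take `start` and `end` keyword arguments (the cube name and timestamps are illustrative):

```python
# Trigger a batch build; timestamps are epoch milliseconds.
response = hook.cube_run(
    'sales_cube',
    'build',
    start='1325347200000',
    end='1325433600000',
)
```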

### Job Status Monitoring

Retrieves and monitors the status of Kylin jobs.

```python { .api }
def get_job_status(self, job_id: str) -> str:
    """
    Retrieve the status of a Kylin job.

    Args:
        job_id (str): Kylin job ID

    Returns:
        str: Job status
    """
```
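
A minimal polling loop built on this method (the interval default and the terminal-status set mirror the constants listed under Types):

```python
import time

JOB_END_STATUSES = {"FINISHED", "ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"}

def wait_for_job(hook, job_id, interval=60):
    """Poll a Kylin job until it reaches a terminal status and return it."""
    while True:
        status = hook.get_job_status(job_id)
        if status in JOB_END_STATUSES:
            return status
        time.sleep(interval)
```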

### Cube Build Operator

Airflow operator for submitting Kylin cube operations with optional job tracking.

```python { .api }
class KylinCubeOperator(BaseOperator):
    def __init__(
        self,
        *,
        kylin_conn_id: str = 'kylin_default',
        project: Optional[str] = None,
        cube: Optional[str] = None,
        dsn: Optional[str] = None,
        command: Optional[str] = None,
        start_time: Optional[str] = None,
        end_time: Optional[str] = None,
        offset_start: Optional[str] = None,
        offset_end: Optional[str] = None,
        segment_name: Optional[str] = None,
        is_track_job: bool = False,
        interval: int = 60,
        timeout: int = 60 * 60 * 24,
        eager_error_status: Tuple[str, ...] = ("ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"),
        **kwargs,
    ): ...

    def execute(self, context: Dict[str, Any]) -> Any: ...
```
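
As an alternative to an Airflow connection, the `dsn` parameter accepts a kylinpy-style data source name; a sketch under that assumption (all URI components are placeholders):

```python
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator

# Hypothetical DSN following kylinpy conventions:
# scheme://user:password@host:port/project
dsn_build = KylinCubeOperator(
    task_id='dsn_build',
    dsn='kylin://ADMIN:KYLIN@kylin-host:7070/sales_analytics',
    cube='sales_cube',
    command='build',
    start_time='1325347200000',
    end_time='1325433600000',
)
```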

### Batch Operations

Build, refresh, and merge cube segments for batch processing.

```python { .api }
# Full build of cube segments
fullbuild_task = KylinCubeOperator(
    task_id="fullbuild_cube",
    command='fullbuild',
    start_time='1325347200000',  # start timestamp in epoch milliseconds
    end_time='1325433600000',  # end timestamp in epoch milliseconds
)

# Build cube segments
build_task = KylinCubeOperator(
    task_id="build_cube",
    command='build',
    start_time='1325347200000',
    end_time='1325433600000',
)

# Refresh existing segments
refresh_task = KylinCubeOperator(
    task_id="refresh_cube",
    command='refresh',
    start_time='1325347200000',
    end_time='1325433600000',
)

# Merge segments
merge_task = KylinCubeOperator(
    task_id="merge_cube",
    command='merge',
    start_time='1325347200000',
    end_time='1325433600000',
)
```

### Streaming Operations

Build, refresh, and merge operations for streaming cubes.

```python { .api }
# Build streaming segments
build_streaming_task = KylinCubeOperator(
    task_id="build_streaming",
    command='build_streaming',
    offset_start='0',  # start offset
    offset_end='100000',  # end offset
)

# Refresh streaming segments
refresh_streaming_task = KylinCubeOperator(
    task_id="refresh_streaming",
    command='refresh_streaming',
    offset_start='0',
    offset_end='100000',
)

# Merge streaming segments
merge_streaming_task = KylinCubeOperator(
    task_id="merge_streaming",
    command='merge_streaming',
    offset_start='0',
    offset_end='100000',
)
```

### Cube Management Operations

Enable, disable, delete, clone, drop, and purge cube operations.

```python { .api }
# Enable cube
enable_task = KylinCubeOperator(
    task_id="enable_cube",
    command='enable',
)

# Disable cube
disable_task = KylinCubeOperator(
    task_id="disable_cube",
    command='disable',
)

# Delete segment
delete_task = KylinCubeOperator(
    task_id="delete_segment",
    command='delete',
    segment_name='segment_20230101_20230102',
)

# Clone cube (creates {cube_name}_clone)
clone_task = KylinCubeOperator(
    task_id="clone_cube",
    command='clone',
)

# Drop cube
drop_task = KylinCubeOperator(
    task_id="drop_cube",
    command='drop',
)

# Purge cube
purge_task = KylinCubeOperator(
    task_id="purge_cube",
    command='purge',
)
```

### Job Tracking

Monitor job execution with automatic status checking and error handling.

```python { .api }
# Track the job until completion, with a custom timeout and polling interval
tracked_build = KylinCubeOperator(
    task_id="tracked_build",
    command='build',
    start_time='{{ ds_nodash }}000000000',
    end_time='{{ next_ds_nodash }}000000000',
    is_track_job=True,
    interval=30,  # check status every 30 seconds
    timeout=7200,  # fail after 2 hours
    eager_error_status=("ERROR", "KILLED", "STOPPED"),  # custom error statuses
)
```

## Types

```python { .api }
from typing import Optional, Dict, Any, Tuple
from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator

# Supported Kylin commands for cube operations
SUPPORTED_COMMANDS = {
    'fullbuild',          # Full build of cube segments
    'build',              # Build cube segments (batch)
    'refresh',            # Refresh segments (batch)
    'merge',              # Merge segments (batch)
    'build_streaming',    # Build streaming segments
    'refresh_streaming',  # Refresh streaming segments
    'merge_streaming',    # Merge streaming segments
    'delete',             # Delete segment (requires segment_name)
    'disable',            # Disable cube
    'enable',             # Enable cube
    'purge',              # Purge cube
    'clone',              # Clone cube
    'drop',               # Drop cube
}

# Commands that trigger job tracking when is_track_job=True
BUILD_COMMANDS = {
    'fullbuild',
    'build',
    'merge',
    'refresh',
    'build_streaming',
    'merge_streaming',
    'refresh_streaming',
}

# Terminal job statuses
JOB_END_STATUSES = {"FINISHED", "ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"}

# Template fields for Jinja templating
TEMPLATE_FIELDS = (
    'project',
    'cube',
    'dsn',
    'command',
    'start_time',
    'end_time',
    'segment_name',
    'offset_start',
    'offset_end',
)

# Connection parameters
class KylinConnectionConfig:
    kylin_conn_id: str      # Airflow connection ID
    project: Optional[str]  # Kylin project name
    dsn: Optional[str]      # Data Source Name URL

# Operation parameters for cube operations
class CubeOperationParams:
    datasource_name: str         # Cube/datasource name
    op: str                      # Operation command
    start: Optional[str]         # Start time/offset
    end: Optional[str]           # End time/offset
    segment_name: Optional[str]  # Target segment name
```
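
As an illustrative guard (not part of the provider), a DAG file can treat the command sets above as plain Python sets and validate a command before constructing the operator:

```python
def make_kylin_task(task_id, command, **kwargs):
    # Reject unsupported commands at DAG-parse time rather than at run time.
    if command not in SUPPORTED_COMMANDS:
        raise ValueError(f"Unsupported Kylin command: {command}")
    # Only build-style commands return a job that can be tracked.
    return KylinCubeOperator(
        task_id=task_id,
        command=command,
        is_track_job=command in BUILD_COMMANDS,
        **kwargs,
    )
```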

## Error Handling

The Apache Kylin provider raises `AirflowException` for various error conditions:

```python { .api }
from airflow.exceptions import AirflowException
from airflow.providers.apache.kylin.hooks.kylin import KylinHook
from kylinpy import exceptions

try:
    hook = KylinHook(kylin_conn_id='kylin_prod')
    response = hook.cube_run('sales_cube', 'build', start='2023-01-01', end='2023-01-02')

    if 'uuid' in response:
        job_id = response['uuid']
        status = hook.get_job_status(job_id)

        if status in ['ERROR', 'KILLED', 'STOPPED']:
            raise AirflowException(f"Kylin job {job_id} failed with status: {status}")

except exceptions.KylinError as kylin_err:
    raise AirflowException(f"Cube operation error: {kylin_err}")
except AirflowException:
    raise  # re-raise Airflow exceptions
except Exception as e:
    raise AirflowException(f"Unexpected error in Kylin operation: {str(e)}")
```

## Usage Examples

### Complete DAG with Multiple Operations

```python
from airflow import DAG
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=10),
}

dag = DAG(
    'kylin_cube_pipeline',
    default_args=default_args,
    description='Complete Kylin cube processing pipeline',
    schedule_interval='@daily',
    catchup=False,
)

# Build the daily cube segment
build_daily = KylinCubeOperator(
    task_id='build_daily_segment',
    kylin_conn_id='prod_kylin',
    project='analytics',
    cube='daily_sales_cube',
    command='build',
    start_time='{{ ds_nodash }}000000000',
    end_time='{{ next_ds_nodash }}000000000',
    is_track_job=True,
    timeout=1800,  # 30 minutes
    dag=dag,
)

# Merge the trailing week of segments
merge_weekly = KylinCubeOperator(
    task_id='merge_weekly_segments',
    kylin_conn_id='prod_kylin',
    project='analytics',
    cube='daily_sales_cube',
    command='merge',
    start_time='{{ macros.ds_add(ds, -6) | replace("-", "") }}000000000',
    end_time='{{ next_ds_nodash }}000000000',
    is_track_job=True,
    timeout=3600,  # 1 hour
    dag=dag,
)

# Set task dependencies
build_daily >> merge_weekly
```

### Using KylinHook Directly in a Custom Operator

```python
import time

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.apache.kylin.hooks.kylin import KylinHook


class CustomKylinOperator(BaseOperator):
    def __init__(self, kylin_conn_id, project, cube_name, start_time, end_time, **kwargs):
        super().__init__(**kwargs)
        self.kylin_conn_id = kylin_conn_id
        self.project = project
        self.cube_name = cube_name
        self.start_time = start_time
        self.end_time = end_time

    def execute(self, context):
        # Initialize the hook
        hook = KylinHook(
            kylin_conn_id=self.kylin_conn_id,
            project=self.project,
        )

        try:
            # Build the cube
            build_response = hook.cube_run(
                datasource_name=self.cube_name,
                op='build',
                start=self.start_time,
                end=self.end_time,
            )

            # Track the job if a UUID was returned
            if 'uuid' in build_response:
                job_id = build_response['uuid']

                # Monitor job status until a terminal state
                while True:
                    status = hook.get_job_status(job_id)

                    if status == 'FINISHED':
                        self.log.info(f"Job {job_id} completed successfully")
                        break
                    elif status in ['ERROR', 'KILLED', 'STOPPED']:
                        raise AirflowException(f"Job {job_id} failed with status: {status}")

                    time.sleep(60)  # wait 1 minute before the next check

        except Exception as e:
            self.log.error(f"Kylin operation failed: {str(e)}")
            raise
```

481

482

### Streaming Cube Operations

483

484

```python

485

# Real-time streaming cube build

486

streaming_build = KylinCubeOperator(

487

task_id='build_streaming_cube',

488

kylin_conn_id='streaming_kylin',

489

project='realtime_analytics',

490

cube='events_streaming_cube',

491

command='build_streaming',

492

offset_start='{{ prev_ds_nodash }}000000000',

493

offset_end='{{ ds_nodash }}000000000',

494

is_track_job=True,

495

interval=30, # Check every 30 seconds

496

timeout=600, # 10 minute timeout for streaming

497

dag=dag

498

)

499

```

500

501

### Error Handling and Monitoring

502

503

```python

504

# Custom error handling with specific error statuses

505

robust_build = KylinCubeOperator(

506

task_id='robust_cube_build',

507

kylin_conn_id='kylin_prod',

508

project='critical_analytics',

509

cube='revenue_cube',

510

command='build',

511

start_time='{{ ds_nodash }}000000000',

512

end_time='{{ next_ds_nodash }}000000000',

513

is_track_job=True,

514

interval=45, # Check every 45 seconds

515

timeout=14400, # 4 hour timeout

516

eager_error_status=("ERROR", "KILLED", "STOPPED", "DISCARDED"), # All error states

517

on_failure_callback=lambda context: send_alert(context),

518

dag=dag

519

)

520

```