tessl/pypi-apache-airflow-backport-providers-apache-sqoop

Apache Airflow backport provider package for Apache Sqoop integration, providing SqoopHook and SqoopOperator for data import/export between relational databases and Hadoop.

Describes: pypipkg:pypi/apache-airflow-backport-providers-apache-sqoop@2021.3.x

To install, run:

npx @tessl/cli install tessl/pypi-apache-airflow-backport-providers-apache-sqoop@2021.3.0

# Apache Airflow Backport Providers Apache Sqoop

Apache Airflow backport provider package for Apache Sqoop integration, providing SqoopHook and SqoopOperator for data import/export between relational databases and Hadoop. This package enables efficient bulk data transfer using Apache Sqoop within Airflow workflows, supporting various data formats and import/export operations.

## Package Information

- **Package Name**: apache-airflow-backport-providers-apache-sqoop
- **Package Type**: pip
- **Language**: Python
- **Installation**: `pip install apache-airflow-backport-providers-apache-sqoop`

## Core Imports

```python
from airflow.providers.apache.sqoop.hooks.sqoop import SqoopHook
from airflow.providers.apache.sqoop.operators.sqoop import SqoopOperator
```

## Basic Usage

```python
from airflow import DAG
from airflow.providers.apache.sqoop.operators.sqoop import SqoopOperator
from datetime import datetime, timedelta

# Define DAG
dag = DAG(
    'sqoop_example',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2023, 1, 1),
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    schedule_interval=timedelta(days=1)
)

# Import table from database to HDFS
import_task = SqoopOperator(
    task_id='import_table',
    conn_id='sqoop_default',
    cmd_type='import',
    table='customers',
    target_dir='/user/hive/warehouse/customers',
    file_type='text',
    num_mappers=4,
    dag=dag
)

# Export data from HDFS to database
export_task = SqoopOperator(
    task_id='export_data',
    conn_id='sqoop_default',
    cmd_type='export',
    table='processed_customers',
    export_dir='/user/hive/warehouse/processed_customers',
    dag=dag
)

import_task >> export_task
```

## Architecture

The package provides two main components:

- **SqoopHook**: Low-level interface for executing Sqoop commands, handling connection management, command construction, and subprocess execution
- **SqoopOperator**: High-level Airflow operator that wraps SqoopHook functionality for integration into Airflow DAGs

Both components support the full range of Sqoop operations, including table imports, query-based imports, table exports, and various data format options (text, Avro, Sequence, Parquet).
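
As a minimal sketch of how the two layers relate, the same import can be driven from the hook directly (for example from a test or an ad-hoc script) rather than through the operator; this assumes a working Sqoop installation and a configured `sqoop_default` connection:

```python
from airflow.providers.apache.sqoop.hooks.sqoop import SqoopHook

# Low-level path: call the hook directly instead of using SqoopOperator.
# Assumes the `sqoop` binary is available and the `sqoop_default`
# connection exists; table and path names are illustrative.
hook = SqoopHook(conn_id='sqoop_default', verbose=True, num_mappers=4)
hook.import_table(
    table='customers',
    target_dir='/user/hive/warehouse/customers',
    file_type='text',
)
```

Inside a DAG, the operator form shown under Basic Usage is the usual choice, since it wraps this same hook functionality.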

## Capabilities

### SqoopHook - Connection and Command Execution

Core hook class that manages Sqoop connections and executes Sqoop commands through subprocess calls.

```python { .api }
class SqoopHook(BaseHook):
    """
    Hook for executing Apache Sqoop commands.

    Args:
        conn_id (str): Reference to the sqoop connection (default: 'sqoop_default')
        verbose (bool): Set sqoop to verbose mode (default: False)
        num_mappers (Optional[int]): Number of map tasks to import in parallel (default: None)
        hcatalog_database (Optional[str]): HCatalog database name (default: None)
        hcatalog_table (Optional[str]): HCatalog table name (default: None)
        properties (Optional[Dict[str, Any]]): Properties to set via -D argument (default: None)
    """

    conn_name_attr: str = 'conn_id'
    default_conn_name: str = 'sqoop_default'
    conn_type: str = 'sqoop'
    hook_name: str = 'Sqoop'

    def __init__(
        self,
        conn_id: str = default_conn_name,
        verbose: bool = False,
        num_mappers: Optional[int] = None,
        hcatalog_database: Optional[str] = None,
        hcatalog_table: Optional[str] = None,
        properties: Optional[Dict[str, Any]] = None,
    ) -> None: ...

    def get_conn(self) -> Any:
        """Returns the connection object."""

    def cmd_mask_password(self, cmd_orig: List[str]) -> List[str]:
        """
        Mask command password for safety.

        Args:
            cmd_orig (List[str]): Original command list

        Returns:
            List[str]: Command with password masked
        """

    def popen(self, cmd: List[str], **kwargs: Any) -> None:
        """
        Execute remote command via subprocess.

        Args:
            cmd (List[str]): Command to remotely execute
            **kwargs: Extra arguments to Popen (see subprocess.Popen)

        Raises:
            AirflowException: If sqoop command fails
        """
```
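
A hedged sketch of constructing the hook and using its helpers; the connection name and property key are illustrative:

```python
from airflow.providers.apache.sqoop.hooks.sqoop import SqoopHook

# Assumes an Airflow connection named 'sqoop_default' has been configured
# (see "Connection Configuration" below).
hook = SqoopHook(
    conn_id='sqoop_default',
    verbose=True,
    num_mappers=4,
    properties={'mapred.job.queue.name': 'default'},  # passed to Sqoop via -D
)

conn = hook.get_conn()  # the underlying Airflow connection object

# Password masking is useful when logging a command that embeds credentials.
masked = hook.cmd_mask_password(['sqoop', 'import', '--password', 'secret'])
print(' '.join(masked))  # the password value is replaced with a mask
```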

### SqoopHook - Table Import Operations

Methods for importing data from relational databases to HDFS.

```python { .api }
def import_table(
    self,
    table: str,
    target_dir: Optional[str] = None,
    append: bool = False,
    file_type: str = "text",
    columns: Optional[str] = None,
    split_by: Optional[str] = None,
    where: Optional[str] = None,
    direct: bool = False,
    driver: Any = None,
    extra_import_options: Optional[Dict[str, Any]] = None,
) -> Any:
    """
    Import table from remote database to HDFS.

    Args:
        table (str): Table to read
        target_dir (Optional[str]): HDFS destination directory
        append (bool): Append data to existing dataset in HDFS (default: False)
        file_type (str): Output format - 'avro', 'sequence', 'text', or 'parquet' (default: 'text')
        columns (Optional[str]): Comma-separated columns to import from table
        split_by (Optional[str]): Column of the table used to split work units
        where (Optional[str]): WHERE clause to use during import
        direct (bool): Use the direct connector if one exists for the database (default: False)
        driver (Any): Manually specify JDBC driver class to use
        extra_import_options (Optional[Dict[str, Any]]): Extra import options as dict

    Returns:
        Any: Import operation result
    """

def import_query(
    self,
    query: str,
    target_dir: Optional[str] = None,
    append: bool = False,
    file_type: str = "text",
    split_by: Optional[str] = None,
    direct: Optional[bool] = None,
    driver: Optional[Any] = None,
    extra_import_options: Optional[Dict[str, Any]] = None,
) -> Any:
    """
    Import specific query results from RDBMS to HDFS.

    Args:
        query (str): Free format query to run
        target_dir (Optional[str]): HDFS destination directory
        append (bool): Append data to existing dataset in HDFS (default: False)
        file_type (str): Output format - 'avro', 'sequence', 'text', or 'parquet' (default: 'text')
        split_by (Optional[str]): Column of the table used to split work units
        direct (Optional[bool]): Use direct import fast path
        driver (Optional[Any]): Manually specify JDBC driver class to use
        extra_import_options (Optional[Dict[str, Any]]): Extra import options as dict

    Returns:
        Any: Import operation result
    """
```
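
A hedged sketch of a selective import using `import_table`; table, column, path, and option names are illustrative and assume a configured `sqoop_default` connection:

```python
from airflow.providers.apache.sqoop.hooks.sqoop import SqoopHook

hook = SqoopHook(conn_id='sqoop_default', num_mappers=8)

# Selective import: two columns, rows matching a WHERE clause, parallelised
# on order_id, written as Avro files.
hook.import_table(
    table='orders',
    target_dir='/user/hive/warehouse/orders_2023',
    file_type='avro',
    columns='order_id,order_date',
    where="order_date >= '2023-01-01'",
    split_by='order_id',
    extra_import_options={'compress': ''},  # extra Sqoop option; empty value passes it as a bare flag
)
```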

### SqoopHook - Table Export Operations

Methods for exporting data from HDFS to relational databases.

```python { .api }
def export_table(
    self,
    table: str,
    export_dir: Optional[str] = None,
    input_null_string: Optional[str] = None,
    input_null_non_string: Optional[str] = None,
    staging_table: Optional[str] = None,
    clear_staging_table: bool = False,
    enclosed_by: Optional[str] = None,
    escaped_by: Optional[str] = None,
    input_fields_terminated_by: Optional[str] = None,
    input_lines_terminated_by: Optional[str] = None,
    input_optionally_enclosed_by: Optional[str] = None,
    batch: bool = False,
    relaxed_isolation: bool = False,
    extra_export_options: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Export Hive table to remote database.

    Args:
        table (str): Target table in the remote database
        export_dir (Optional[str]): HDFS directory containing the Hive table data to export
        input_null_string (Optional[str]): String to be interpreted as null for string columns
        input_null_non_string (Optional[str]): String to be interpreted as null for non-string columns
        staging_table (Optional[str]): Table for staging data before insertion
        clear_staging_table (bool): Indicate that staging table data can be deleted (default: False)
        enclosed_by (Optional[str]): Sets a required field enclosing character
        escaped_by (Optional[str]): Sets the escape character
        input_fields_terminated_by (Optional[str]): Sets the field separator character
        input_lines_terminated_by (Optional[str]): Sets the end-of-line character
        input_optionally_enclosed_by (Optional[str]): Sets a field enclosing character
        batch (bool): Use batch mode for underlying statement execution (default: False)
        relaxed_isolation (bool): Use read-uncommitted transaction isolation for mappers (default: False)
        extra_export_options (Optional[Dict[str, Any]]): Extra export options as dict
    """
```
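
A hedged sketch of an export with staging and null handling; every argument is passed explicitly, and the table and path names are illustrative:

```python
from airflow.providers.apache.sqoop.hooks.sqoop import SqoopHook

hook = SqoopHook(conn_id='sqoop_default')

# Export comma-delimited text files from HDFS into a database table, staging
# rows first so a failed export does not leave a half-written target table.
hook.export_table(
    table='customer_summary',
    export_dir='/user/hive/warehouse/customer_summary',
    input_null_string='\\N',
    input_null_non_string='\\N',
    staging_table='customer_summary_staging',
    clear_staging_table=True,
    enclosed_by=None,
    escaped_by=None,
    input_fields_terminated_by=',',
    input_lines_terminated_by=None,
    input_optionally_enclosed_by=None,
    batch=True,
    relaxed_isolation=False,
    extra_export_options=None,
)
```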

### SqoopOperator - Airflow Integration

High-level operator for integrating Sqoop operations into Airflow DAGs.

```python { .api }
class SqoopOperator(BaseOperator):
    """
    Execute a Sqoop job within an Airflow DAG.

    Args:
        conn_id (str): Connection ID (default: 'sqoop_default')
        cmd_type (str): Command type - 'export' or 'import' (default: 'import')
        table (Optional[str]): Table to read
        query (Optional[str]): Import result of arbitrary SQL query
        target_dir (Optional[str]): HDFS destination directory
        append (bool): Append data to existing dataset in HDFS (default: False)
        file_type (str): Output format - 'avro', 'sequence', 'text', or 'parquet' (default: 'text')
        columns (Optional[str]): Comma-separated columns to import
        num_mappers (Optional[int]): Number of mapper tasks for parallel processing
        split_by (Optional[str]): Column used to split work units
        where (Optional[str]): WHERE clause for import
        export_dir (Optional[str]): HDFS Hive database directory to export
        input_null_string (Optional[str]): String interpreted as null for string columns
        input_null_non_string (Optional[str]): String interpreted as null for non-string columns
        staging_table (Optional[str]): Table for staging data before insertion
        clear_staging_table (bool): Clear staging table data (default: False)
        enclosed_by (Optional[str]): Required field enclosing character
        escaped_by (Optional[str]): Escape character
        input_fields_terminated_by (Optional[str]): Input field separator
        input_lines_terminated_by (Optional[str]): Input end-of-line character
        input_optionally_enclosed_by (Optional[str]): Field enclosing character
        batch (bool): Use batch mode for statement execution (default: False)
        direct (bool): Use direct export fast path (default: False)
        driver (Optional[Any]): Manually specify JDBC driver class
        verbose (bool): Switch to verbose logging for debug purposes (default: False)
        relaxed_isolation (bool): Use read uncommitted isolation level (default: False)
        hcatalog_database (Optional[str]): HCatalog database name
        hcatalog_table (Optional[str]): HCatalog table name
        create_hcatalog_table (bool): Have sqoop create the hcatalog table (default: False)
        properties (Optional[Dict[str, Any]]): Additional JVM properties passed to sqoop
        extra_import_options (Optional[Dict[str, Any]]): Extra import options as dict
        extra_export_options (Optional[Dict[str, Any]]): Extra export options as dict
        **kwargs: Additional arguments passed to BaseOperator
    """

    template_fields = (
        'conn_id', 'cmd_type', 'table', 'query', 'target_dir', 'file_type',
        'columns', 'split_by', 'where', 'export_dir', 'input_null_string',
        'input_null_non_string', 'staging_table', 'enclosed_by', 'escaped_by',
        'input_fields_terminated_by', 'input_lines_terminated_by',
        'input_optionally_enclosed_by', 'properties', 'extra_import_options',
        'driver', 'extra_export_options', 'hcatalog_database', 'hcatalog_table'
    )
    ui_color = '#7D8CA4'

    @apply_defaults
    def __init__(
        self,
        *,
        conn_id: str = 'sqoop_default',
        cmd_type: str = 'import',
        table: Optional[str] = None,
        query: Optional[str] = None,
        target_dir: Optional[str] = None,
        append: bool = False,
        file_type: str = 'text',
        columns: Optional[str] = None,
        num_mappers: Optional[int] = None,
        split_by: Optional[str] = None,
        where: Optional[str] = None,
        export_dir: Optional[str] = None,
        input_null_string: Optional[str] = None,
        input_null_non_string: Optional[str] = None,
        staging_table: Optional[str] = None,
        clear_staging_table: bool = False,
        enclosed_by: Optional[str] = None,
        escaped_by: Optional[str] = None,
        input_fields_terminated_by: Optional[str] = None,
        input_lines_terminated_by: Optional[str] = None,
        input_optionally_enclosed_by: Optional[str] = None,
        batch: bool = False,
        direct: bool = False,
        driver: Optional[Any] = None,
        verbose: bool = False,
        relaxed_isolation: bool = False,
        properties: Optional[Dict[str, Any]] = None,
        hcatalog_database: Optional[str] = None,
        hcatalog_table: Optional[str] = None,
        create_hcatalog_table: bool = False,
        extra_import_options: Optional[Dict[str, Any]] = None,
        extra_export_options: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None: ...

    def execute(self, context: Dict[str, Any]) -> None:
        """Execute sqoop job based on cmd_type (import or export)."""

    def on_kill(self) -> None:
        """Handle task termination by sending SIGTERM to the sqoop subprocess."""
```
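
Because fields such as `target_dir` and `where` appear in `template_fields`, they are rendered with Jinja at run time. A minimal sketch, assuming illustrative table and path names:

```python
from datetime import datetime
from airflow import DAG
from airflow.providers.apache.sqoop.operators.sqoop import SqoopOperator

with DAG(
    dag_id='sqoop_templated_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
) as dag:
    # `where` and `target_dir` are templated, so {{ ds }} resolves to the
    # logical date of each DAG run.
    daily_import = SqoopOperator(
        task_id='daily_orders_import',
        conn_id='sqoop_default',
        cmd_type='import',
        table='orders',
        where="order_date = '{{ ds }}'",
        target_dir='/user/hive/warehouse/orders/{{ ds }}',
        file_type='parquet',
        num_mappers=4,
    )
```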

## Types

```python { .api }
from typing import Any, Dict, List, Optional, Tuple

from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator
from airflow.exceptions import AirflowException
from airflow.utils.decorators import apply_defaults

# Connection parameter types used in the extra JSON field
ConnectionExtraParams = Dict[str, Any]  # Includes job_tracker, namenode, libjars, files, archives, password_file

# File format options for import/export operations
FileType = str  # 'text', 'avro', 'sequence', 'parquet'

# Command type options for SqoopOperator
CommandType = str  # 'import', 'export'

# Extra options dictionaries for additional Sqoop parameters
ExtraOptions = Dict[str, Any]  # Key-value pairs for additional Sqoop command options
```

## Connection Configuration

The package uses Airflow connections with the following configuration (a sketch of defining such a connection follows the lists below):

**Connection Parameters:**
- `host`: Database host
- `port`: Database port
- `schema`: Database schema/name
- `login`: Database username
- `password`: Database password

**Extra JSON Parameters:**
- `job_tracker`: Job tracker, as `local` or `jobtracker:port`
- `namenode`: Namenode configuration
- `libjars`: Comma-separated jar files to include in the classpath
- `files`: Comma-separated files to be copied to the MapReduce cluster
- `archives`: Comma-separated archives to be unarchived on the compute machines
- `password_file`: Path to a file containing the password
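
A minimal sketch of such a connection defined programmatically; host, credentials, and extra values are placeholders:

```python
import json
from airflow.models import Connection

# Illustrative 'sqoop_default' connection: the hook combines host, port and
# schema into the JDBC connect string, so the JDBC prefix belongs in `host`.
sqoop_conn = Connection(
    conn_id='sqoop_default',
    conn_type='sqoop',
    host='jdbc:mysql://db.example.com',
    port=3306,
    schema='retail',
    login='sqoop_user',
    password='sqoop_password',
    extra=json.dumps({
        'job_tracker': 'local',
        'namenode': 'hdfs://namenode:8020',
        'libjars': '/opt/jars/mysql-connector.jar',
        'password_file': '/user/airflow/.sqoop_password',
    }),
)
```

The same values can equally be supplied through the Airflow UI or the `airflow connections` CLI so that `conn_id='sqoop_default'` resolves to this connection.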

## Error Handling

The package raises `AirflowException` in the following cases (a short sketch follows the list):
- Sqoop command execution failures (when the subprocess returns a non-zero exit code)
- Invalid command type (`cmd_type` must be 'import' or 'export')
- Invalid file type specification (must be 'avro', 'sequence', 'text', or 'parquet')
- Both `table` and `query` specified for import operations (they are mutually exclusive)
- Missing required parameters for import operations (either `table` or `query` must be provided)
- Missing required `table` parameter for export operations
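
A hedged sketch of the validation behaviour described above; it assumes a configured `sqoop_default` connection, and the table and path names are illustrative:

```python
from airflow.exceptions import AirflowException
from airflow.providers.apache.sqoop.hooks.sqoop import SqoopHook

hook = SqoopHook(conn_id='sqoop_default')

try:
    hook.import_table(
        table='customers',
        target_dir='/tmp/customers',
        file_type='csv',  # not one of 'avro', 'sequence', 'text', 'parquet'
    )
except AirflowException as err:
    print(f"Sqoop import rejected: {err}")
```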

## Usage Examples

### Basic Table Import

```python
from airflow.providers.apache.sqoop.operators.sqoop import SqoopOperator

# Import entire table
import_task = SqoopOperator(
    task_id='import_customers',
    conn_id='mysql_default',
    cmd_type='import',
    table='customers',
    target_dir='/user/hive/warehouse/customers',
    file_type='avro',
    num_mappers=4
)
```

### Query-Based Import with Conditions

```python
# Import with custom query and conditions
import_query_task = SqoopOperator(
    task_id='import_filtered_orders',
    conn_id='postgres_default',
    cmd_type='import',
    query="SELECT * FROM orders WHERE order_date >= '2023-01-01' AND \\$CONDITIONS",
    target_dir='/user/hive/warehouse/recent_orders',
    file_type='parquet',
    split_by='order_id',
    num_mappers=8
)
```

### Data Export to Database

```python
# Export processed data back to database
export_task = SqoopOperator(
    task_id='export_aggregated_data',
    conn_id='mysql_default',
    cmd_type='export',
    table='customer_summary',
    export_dir='/user/hive/warehouse/processed_customers',
    input_fields_terminated_by=',',
    batch=True
)
```

### Advanced Import with HCatalog Integration

```python
# Import with HCatalog table creation
hcatalog_import = SqoopOperator(
    task_id='import_to_hcatalog',
    conn_id='oracle_default',
    cmd_type='import',
    table='products',
    hcatalog_database='retail',
    hcatalog_table='products',
    create_hcatalog_table=True,
    file_type='avro',
    extra_import_options={
        'map-column-java': 'price=String',
        'null-string': '\\\\N',
        'null-non-string': '\\\\N'
    }
)
```