# Apache Airflow Backport Providers Apache Sqoop

Apache Airflow backport provider package for Apache Sqoop integration. It provides `SqoopHook` and `SqoopOperator` for bulk data transfer between relational databases and Hadoop, so Airflow workflows can run Sqoop import and export jobs in several data formats.

## Package Information

- **Package Name**: apache-airflow-backport-providers-apache-sqoop
- **Package Type**: pip
- **Language**: Python
- **Installation**: `pip install apache-airflow-backport-providers-apache-sqoop`

## Core Imports

```python
from airflow.providers.apache.sqoop.hooks.sqoop import SqoopHook
from airflow.providers.apache.sqoop.operators.sqoop import SqoopOperator
```

## Basic Usage

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.apache.sqoop.operators.sqoop import SqoopOperator

# Define DAG
dag = DAG(
    'sqoop_example',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2023, 1, 1),
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    schedule_interval=timedelta(days=1),
)

# Import table from database to HDFS
import_task = SqoopOperator(
    task_id='import_table',
    conn_id='sqoop_default',
    cmd_type='import',
    table='customers',
    target_dir='/user/hive/warehouse/customers',
    file_type='text',
    num_mappers=4,
    dag=dag,
)

# Export data from HDFS to database
export_task = SqoopOperator(
    task_id='export_data',
    conn_id='sqoop_default',
    cmd_type='export',
    table='processed_customers',
    export_dir='/user/hive/warehouse/processed_customers',
    dag=dag,
)

import_task >> export_task
```

## Architecture

The package provides two main components:

- **SqoopHook**: Low-level interface for executing Sqoop commands, handling connection management, command construction, and subprocess execution
- **SqoopOperator**: High-level Airflow operator that wraps SqoopHook functionality for integration into Airflow DAGs

Both components support the full range of Sqoop operations, including table imports, query-based imports, table exports, and various data format options (text, Avro, Sequence, Parquet).

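
When the operator's fixed parameter surface is not enough (for example, when import arguments must be computed at runtime), the hook can be called directly from a Python callable and wired into a DAG with a `PythonOperator`. A minimal sketch, assuming a configured `sqoop_default` connection; the table name and HDFS path are placeholders:

```python
from airflow.providers.apache.sqoop.hooks.sqoop import SqoopHook

def import_customers() -> None:
    # Instantiate the hook against an existing 'sqoop_default' connection
    # and run a four-way parallel import into HDFS.
    hook = SqoopHook(conn_id='sqoop_default', num_mappers=4, verbose=True)
    hook.import_table(
        table='customers',                            # source table in the RDBMS (placeholder)
        target_dir='/user/hive/warehouse/customers',  # HDFS destination directory
        file_type='parquet',                          # 'text', 'avro', 'sequence', or 'parquet'
    )
```
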
## Capabilities

### SqoopHook - Connection and Command Execution

Core hook class that manages Sqoop connections and executes Sqoop commands through subprocess calls.

```python { .api }
class SqoopHook(BaseHook):
    """
    Hook for executing Apache Sqoop commands.

    Args:
        conn_id (str): Reference to the sqoop connection (default: 'sqoop_default')
        verbose (bool): Set sqoop to verbose mode (default: False)
        num_mappers (Optional[int]): Number of map tasks to import in parallel (default: None)
        hcatalog_database (Optional[str]): HCatalog database name (default: None)
        hcatalog_table (Optional[str]): HCatalog table name (default: None)
        properties (Optional[Dict[str, Any]]): Properties to set via -D argument (default: None)
    """

    conn_name_attr: str = 'conn_id'
    default_conn_name: str = 'sqoop_default'
    conn_type: str = 'sqoop'
    hook_name: str = 'Sqoop'

    def __init__(
        self,
        conn_id: str = default_conn_name,
        verbose: bool = False,
        num_mappers: Optional[int] = None,
        hcatalog_database: Optional[str] = None,
        hcatalog_table: Optional[str] = None,
        properties: Optional[Dict[str, Any]] = None,
    ) -> None: ...

    def get_conn(self) -> Any:
        """Returns the connection object."""

    def cmd_mask_password(self, cmd_orig: List[str]) -> List[str]:
        """
        Mask command password for safety.

        Args:
            cmd_orig (List[str]): Original command list

        Returns:
            List[str]: Command with password masked
        """

    def popen(self, cmd: List[str], **kwargs: Any) -> None:
        """
        Execute remote command via subprocess.

        Args:
            cmd (List[str]): Command to remotely execute
            **kwargs: Extra arguments to Popen (see subprocess.Popen)

        Raises:
            AirflowException: If sqoop command fails
        """
```

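
The lower-level methods can also be driven directly. The sketch below is illustrative only: it assumes a configured `sqoop_default` connection and the `sqoop` binary on the worker's PATH; in normal use `import_table`, `import_query`, and `export_table` build and run the command for you.

```python
from airflow.providers.apache.sqoop.hooks.sqoop import SqoopHook

# Requires a configured 'sqoop_default' connection.
hook = SqoopHook(conn_id='sqoop_default', verbose=True)

# Hand-built command, purely for illustration.
cmd = ['sqoop', 'version']

# Mask any password arguments before logging the command.
print(hook.cmd_mask_password(cmd))

# Run the command; raises AirflowException on a non-zero exit code.
hook.popen(cmd)
```
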
### SqoopHook - Table Import Operations

Methods for importing data from relational databases to HDFS.

```python { .api }
def import_table(
    self,
    table: str,
    target_dir: Optional[str] = None,
    append: bool = False,
    file_type: str = "text",
    columns: Optional[str] = None,
    split_by: Optional[str] = None,
    where: Optional[str] = None,
    direct: bool = False,
    driver: Any = None,
    extra_import_options: Optional[Dict[str, Any]] = None,
) -> Any:
    """
    Import table from remote database to HDFS.

    Args:
        table (str): Table to read
        target_dir (Optional[str]): HDFS destination directory
        append (bool): Append data to existing dataset in HDFS (default: False)
        file_type (str): Output format - 'avro', 'sequence', 'text', or 'parquet' (default: 'text')
        columns (Optional[str]): Comma-separated columns to import from table
        split_by (Optional[str]): Column of the table used to split work units
        where (Optional[str]): WHERE clause to use during import
        direct (bool): Use direct connector if one exists for the database (default: False)
        driver (Any): Manually specify JDBC driver class to use
        extra_import_options (Optional[Dict[str, Any]]): Extra import options as dict

    Returns:
        Any: Import operation result
    """

def import_query(
    self,
    query: str,
    target_dir: Optional[str] = None,
    append: bool = False,
    file_type: str = "text",
    split_by: Optional[str] = None,
    direct: Optional[bool] = None,
    driver: Optional[Any] = None,
    extra_import_options: Optional[Dict[str, Any]] = None,
) -> Any:
    """
    Import specific query results from RDBMS to HDFS.

    Args:
        query (str): Free format query to run
        target_dir (Optional[str]): HDFS destination directory
        append (bool): Append data to existing dataset in HDFS (default: False)
        file_type (str): Output format - 'avro', 'sequence', 'text', or 'parquet' (default: 'text')
        split_by (Optional[str]): Column of the table used to split work units
        direct (Optional[bool]): Use direct import fast path
        driver (Optional[Any]): Manually specify JDBC driver class to use
        extra_import_options (Optional[Dict[str, Any]]): Extra import options as dict

    Returns:
        Any: Import operation result
    """
```

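
A usage sketch of both import paths via the hook; the connection ID, table, query, and HDFS paths are placeholders:

```python
from airflow.providers.apache.sqoop.hooks.sqoop import SqoopHook

hook = SqoopHook(conn_id='sqoop_default', num_mappers=4)

# Whole-table import, restricted to two columns and recent rows.
hook.import_table(
    table='orders',
    target_dir='/data/raw/orders',
    file_type='avro',
    columns='order_id,order_date',
    where="order_date >= '2023-01-01'",
)

# Free-form query import; Sqoop substitutes $CONDITIONS with per-mapper
# split predicates, and split_by tells it which column to split on.
hook.import_query(
    query=(
        "SELECT o.order_id, c.name FROM orders o "
        "JOIN customers c ON o.customer_id = c.id WHERE $CONDITIONS"
    ),
    target_dir='/data/raw/order_names',
    split_by='order_id',
)
```
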
### SqoopHook - Table Export Operations

Methods for exporting data from HDFS to relational databases.

```python { .api }
def export_table(
    self,
    table: str,
    export_dir: Optional[str] = None,
    input_null_string: Optional[str] = None,
    input_null_non_string: Optional[str] = None,
    staging_table: Optional[str] = None,
    clear_staging_table: bool = False,
    enclosed_by: Optional[str] = None,
    escaped_by: Optional[str] = None,
    input_fields_terminated_by: Optional[str] = None,
    input_lines_terminated_by: Optional[str] = None,
    input_optionally_enclosed_by: Optional[str] = None,
    batch: bool = False,
    relaxed_isolation: bool = False,
    extra_export_options: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Export Hive table to remote database.

    Args:
        table (str): Destination table in the remote database
        export_dir (Optional[str]): HDFS directory (e.g. Hive table location) to export
        input_null_string (Optional[str]): String to be interpreted as null for string columns
        input_null_non_string (Optional[str]): String to be interpreted as null for non-string columns
        staging_table (Optional[str]): Table for staging data before insertion
        clear_staging_table (bool): Indicate that staging table data can be deleted (default: False)
        enclosed_by (Optional[str]): Sets required field enclosing character
        escaped_by (Optional[str]): Sets the escape character
        input_fields_terminated_by (Optional[str]): Sets the field separator character
        input_lines_terminated_by (Optional[str]): Sets the end-of-line character
        input_optionally_enclosed_by (Optional[str]): Sets field enclosing character
        batch (bool): Use batch mode for underlying statement execution (default: False)
        relaxed_isolation (bool): Transaction isolation to read uncommitted for mappers (default: False)
        extra_export_options (Optional[Dict[str, Any]]): Extra export options as dict
    """
```

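
A minimal export sketch, assuming the HDFS directory holds comma-delimited text files and that both the destination and staging tables already exist:

```python
from airflow.providers.apache.sqoop.hooks.sqoop import SqoopHook

hook = SqoopHook(conn_id='sqoop_default', num_mappers=2)

hook.export_table(
    table='customer_summary',                   # destination table in the database
    export_dir='/data/marts/customer_summary',  # HDFS directory holding the data
    input_fields_terminated_by=',',             # delimiter used by the HDFS files
    staging_table='customer_summary_staging',   # stage rows here before the final insert
    clear_staging_table=True,                   # allow Sqoop to empty the staging table first
    batch=True,                                 # batch the underlying JDBC statements
)
```
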
### SqoopOperator - Airflow Integration

High-level operator for integrating Sqoop operations into Airflow DAGs.

```python { .api }
class SqoopOperator(BaseOperator):
    """
    Execute a Sqoop job within an Airflow DAG.

    Args:
        conn_id (str): Connection ID (default: 'sqoop_default')
        cmd_type (str): Command type - 'export' or 'import' (default: 'import')
        table (Optional[str]): Table to read
        query (Optional[str]): Import result of arbitrary SQL query
        target_dir (Optional[str]): HDFS destination directory
        append (bool): Append data to existing dataset in HDFS (default: False)
        file_type (str): Output format - 'avro', 'sequence', 'text', or 'parquet' (default: 'text')
        columns (Optional[str]): Comma-separated columns to import
        num_mappers (Optional[int]): Number of mapper tasks for parallel processing
        split_by (Optional[str]): Column used to split work units
        where (Optional[str]): WHERE clause for import
        export_dir (Optional[str]): HDFS directory (e.g. Hive warehouse path) to export to the database
        input_null_string (Optional[str]): String interpreted as null for string columns
        input_null_non_string (Optional[str]): String interpreted as null for non-string columns
        staging_table (Optional[str]): Table for staging data before insertion
        clear_staging_table (bool): Clear staging table data (default: False)
        enclosed_by (Optional[str]): Required field enclosing character
        escaped_by (Optional[str]): Escape character
        input_fields_terminated_by (Optional[str]): Input field separator
        input_lines_terminated_by (Optional[str]): Input end-of-line character
        input_optionally_enclosed_by (Optional[str]): Field enclosing character
        batch (bool): Use batch mode for statement execution (default: False)
        direct (bool): Use direct export fast path (default: False)
        driver (Optional[Any]): Manually specify JDBC driver class
        verbose (bool): Switch to verbose logging for debug purposes (default: False)
        relaxed_isolation (bool): Use read uncommitted isolation level (default: False)
        hcatalog_database (Optional[str]): HCatalog database name
        hcatalog_table (Optional[str]): HCatalog table name
        create_hcatalog_table (bool): Have sqoop create the hcatalog table (default: False)
        properties (Optional[Dict[str, Any]]): Additional JVM properties passed to sqoop
        extra_import_options (Optional[Dict[str, Any]]): Extra import options as dict
        extra_export_options (Optional[Dict[str, Any]]): Extra export options as dict
        **kwargs: Additional arguments passed to BaseOperator
    """

    template_fields = (
        'conn_id', 'cmd_type', 'table', 'query', 'target_dir', 'file_type',
        'columns', 'split_by', 'where', 'export_dir', 'input_null_string',
        'input_null_non_string', 'staging_table', 'enclosed_by', 'escaped_by',
        'input_fields_terminated_by', 'input_lines_terminated_by',
        'input_optionally_enclosed_by', 'properties', 'extra_import_options',
        'driver', 'extra_export_options', 'hcatalog_database', 'hcatalog_table',
    )
    ui_color = '#7D8CA4'

    @apply_defaults
    def __init__(
        self,
        *,
        conn_id: str = 'sqoop_default',
        cmd_type: str = 'import',
        table: Optional[str] = None,
        query: Optional[str] = None,
        target_dir: Optional[str] = None,
        append: bool = False,
        file_type: str = 'text',
        columns: Optional[str] = None,
        num_mappers: Optional[int] = None,
        split_by: Optional[str] = None,
        where: Optional[str] = None,
        export_dir: Optional[str] = None,
        input_null_string: Optional[str] = None,
        input_null_non_string: Optional[str] = None,
        staging_table: Optional[str] = None,
        clear_staging_table: bool = False,
        enclosed_by: Optional[str] = None,
        escaped_by: Optional[str] = None,
        input_fields_terminated_by: Optional[str] = None,
        input_lines_terminated_by: Optional[str] = None,
        input_optionally_enclosed_by: Optional[str] = None,
        batch: bool = False,
        direct: bool = False,
        driver: Optional[Any] = None,
        verbose: bool = False,
        relaxed_isolation: bool = False,
        properties: Optional[Dict[str, Any]] = None,
        hcatalog_database: Optional[str] = None,
        hcatalog_table: Optional[str] = None,
        create_hcatalog_table: bool = False,
        extra_import_options: Optional[Dict[str, Any]] = None,
        extra_export_options: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None: ...

    def execute(self, context: Dict[str, Any]) -> None:
        """Execute sqoop job based on cmd_type (import or export)."""

    def on_kill(self) -> None:
        """Handle task termination by sending SIGTERM to the sqoop subprocess."""
```

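
Because fields such as `target_dir`, `where`, and `query` appear in `template_fields`, they can carry Jinja templates that Airflow renders for each run. A sketch with placeholder table and path values (DAG boilerplate omitted):

```python
from airflow.providers.apache.sqoop.operators.sqoop import SqoopOperator

daily_import = SqoopOperator(
    task_id='import_daily_orders',
    conn_id='sqoop_default',
    cmd_type='import',
    table='orders',
    # Rendered per run, e.g. /data/raw/orders/2023-01-01
    target_dir='/data/raw/orders/{{ ds }}',
    # Only pull the rows for the logical date of the run.
    where="order_date = '{{ ds }}'",
    file_type='parquet',
    num_mappers=4,
    dag=dag,  # assumes a DAG object defined elsewhere
)
```
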
## Types

```python { .api }
from typing import Any, Dict, List, Optional, Tuple
from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator
from airflow.exceptions import AirflowException
from airflow.utils.decorators import apply_defaults

# Connection parameter types used in the extra JSON field
ConnectionExtraParams = Dict[str, Any]  # Includes job_tracker, namenode, libjars, files, archives, password_file

# File format options for import/export operations
FileType = str  # 'text', 'avro', 'sequence', 'parquet'

# Command type options for SqoopOperator
CommandType = str  # 'import', 'export'

# Extra options dictionaries for additional Sqoop parameters
ExtraOptions = Dict[str, Any]  # Key-value pairs for additional Sqoop command options
```

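
The `ExtraOptions` dictionaries map Sqoop CLI option names (without the leading dashes) to their values. A hedged sketch of passing them through the hook; the option names and values are examples, and the empty-value behavior noted in the comment is an assumption to verify against your Sqoop setup:

```python
from airflow.providers.apache.sqoop.hooks.sqoop import SqoopHook

# Keys mirror Sqoop CLI option names; values are passed through verbatim.
extra_import_options = {
    'compress': '',            # assumption: an empty value emits a bare --compress flag
    'fetch-size': '1000',
    'compression-codec': 'org.apache.hadoop.io.compress.SnappyCodec',
}

hook = SqoopHook(conn_id='sqoop_default')
hook.import_table(
    table='events',
    target_dir='/data/raw/events',
    extra_import_options=extra_import_options,
)
```
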
## Connection Configuration

The package uses Airflow connections with the following configuration:

**Connection Parameters:**

- `host`: Database host
- `port`: Database port
- `schema`: Database schema/name
- `login`: Database username
- `password`: Database password

**Extra JSON Parameters:**

- `job_tracker`: Job tracker (`local` or `jobtracker:port`)
- `namenode`: Namenode configuration
- `libjars`: Comma-separated jar files to include in the classpath
- `files`: Comma-separated files to be copied to the MapReduce cluster
- `archives`: Comma-separated archives to be unarchived on the compute machines
- `password_file`: Path to a file containing the password

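
A hedged sketch of what such a connection could look like, expressed with Airflow's `Connection` model. It is illustrative only: connections are normally created through the Airflow UI, the CLI, or an `AIRFLOW_CONN_*` environment variable, and the host format and extra values shown are assumptions to adapt to your environment.

```python
import json

from airflow.models import Connection

# Illustrative values only; not a registration API.
sqoop_conn = Connection(
    conn_id='sqoop_default',
    conn_type='sqoop',
    host='jdbc:mysql://dbhost',  # assumption: host carries the JDBC connect prefix
    port=3306,
    schema='sales',
    login='etl_user',
    password='etl_password',     # alternatively, set password_file in the extras
    extra=json.dumps({
        'namenode': 'hdfs://namenode:8020',
        'job_tracker': 'local',
        'libjars': '/opt/jdbc/mysql-connector-java.jar',
        'files': '/etc/sqoop/conf/sqoop-site.xml',
    }),
)
```
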
## Error Handling

The package raises `AirflowException` in the following cases (a handling sketch follows the list):

- Sqoop command execution failures (when the subprocess returns a non-zero exit code)
- Invalid command type (`cmd_type` must be 'import' or 'export')
- Invalid file type specification (must be 'avro', 'sequence', 'text', or 'parquet')
- Both table and query specified for import operations (they are mutually exclusive)
- Missing required parameters for import operations (must provide either table or query)
- Missing required table parameter for export operations

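
A minimal sketch of handling such a failure around a direct hook call; the connection, table, and path are placeholders:

```python
from airflow.exceptions import AirflowException
from airflow.providers.apache.sqoop.hooks.sqoop import SqoopHook

hook = SqoopHook(conn_id='sqoop_default')

try:
    hook.import_table(table='customers', target_dir='/data/raw/customers')
except AirflowException as err:
    # Raised when the sqoop subprocess exits non-zero or the arguments are invalid.
    print(f'Sqoop import failed: {err}')
    raise
```
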
## Usage Examples

### Basic Table Import

```python
from airflow.providers.apache.sqoop.operators.sqoop import SqoopOperator

# Import entire table
import_task = SqoopOperator(
    task_id='import_customers',
    conn_id='mysql_default',
    cmd_type='import',
    table='customers',
    target_dir='/user/hive/warehouse/customers',
    file_type='avro',
    num_mappers=4,
)
```

### Query-Based Import with Conditions

```python
# Import with custom query and conditions; Sqoop replaces the literal
# $CONDITIONS token with split boundary predicates for each mapper.
import_query_task = SqoopOperator(
    task_id='import_filtered_orders',
    conn_id='postgres_default',
    cmd_type='import',
    query="SELECT * FROM orders WHERE order_date >= '2023-01-01' AND $CONDITIONS",
    target_dir='/user/hive/warehouse/recent_orders',
    file_type='parquet',
    split_by='order_id',
    num_mappers=8,
)
```

### Data Export to Database

```python
# Export processed data back to database
export_task = SqoopOperator(
    task_id='export_aggregated_data',
    conn_id='mysql_default',
    cmd_type='export',
    table='customer_summary',
    export_dir='/user/hive/warehouse/processed_customers',
    input_fields_terminated_by=',',
    batch=True,
)
```

### Advanced Import with HCatalog Integration

```python
# Import with HCatalog table creation
hcatalog_import = SqoopOperator(
    task_id='import_to_hcatalog',
    conn_id='oracle_default',
    cmd_type='import',
    table='products',
    hcatalog_database='retail',
    hcatalog_table='products',
    create_hcatalog_table=True,
    file_type='avro',
    extra_import_options={
        'map-column-java': 'price=String',
        # Hive represents NULL as \N; the double escaping mirrors the
        # --null-string '\\N' form recommended by the Sqoop user guide.
        'null-string': '\\\\N',
        'null-non-string': '\\\\N',
    },
)
```