# Glue Data Processing

AWS Glue integration for serverless ETL (Extract, Transform, Load) operations and data catalog management. Provides job execution, crawler management, and data preparation capabilities for building scalable data processing workflows.

## Capabilities

### Glue Job Hook

Core Glue client providing job execution and monitoring functionality.

```python { .api }
class GlueJobHook(AwsBaseHook):
    def __init__(
        self,
        job_name: str | None = None,
        desc: str | None = None,
        concurrent_run_limit: int = 1,
        script_location: str | None = None,
        retry_limit: int = 0,
        num_of_dpus: int | None = None,
        aws_conn_id: str = 'aws_default',
        region_name: str | None = None,
        s3_bucket: str | None = None,
        iam_role_name: str | None = None,
        create_job_kwargs: dict | None = None,
        **kwargs,
    ):
        """
        Initialize Glue Job Hook.

        Parameters:
        - job_name: Name of the Glue job
        - desc: Job description
        - concurrent_run_limit: Maximum number of concurrent runs
        - script_location: S3 location of the job script
        - retry_limit: Number of retries on failure
        - num_of_dpus: Number of DPUs allocated to the job
        - aws_conn_id: AWS connection ID
        - region_name: AWS region name
        - s3_bucket: S3 bucket for job artifacts
        - iam_role_name: IAM role for job execution
        - create_job_kwargs: Additional job creation parameters
        """

    def list_jobs(self) -> list:
        """
        List all Glue jobs.

        Returns:
        List of job names
        """

    def get_job_state(self, job_name: str, run_id: str) -> str:
        """
        Get the state of a specific Glue job run.

        Parameters:
        - job_name: Name of the Glue job
        - run_id: Job run ID

        Returns:
        Current job run state
        """

    def initialize_job(self, job_name: str, arguments: dict | None = None) -> dict:
        """
        Initialize and start a Glue job.

        Parameters:
        - job_name: Name of the Glue job
        - arguments: Job arguments dictionary

        Returns:
        Job run information
        """

    def get_or_create_glue_job(self) -> str:
        """
        Get an existing Glue job or create a new one.

        Returns:
        Job name
        """

    def get_job_run(self, run_id: str, job_name: str) -> dict:
        """
        Get details of a specific job run.

        Parameters:
        - run_id: Job run ID
        - job_name: Name of the Glue job

        Returns:
        Job run details
        """
```
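
For orientation, here is a minimal sketch of driving the hook directly. The job, bucket, and role names are placeholders, and it assumes the dict returned by `initialize_job` carries the boto3 `JobRunId` key:

```python
from airflow.providers.amazon.aws.hooks.glue import GlueJobHook

# Placeholder names; replace with your own job, bucket, and role.
hook = GlueJobHook(
    job_name='customer-data-processing',
    s3_bucket='my-glue-artifacts',
    iam_role_name='GlueServiceRole',
)

# Start a run with runtime arguments, then poll its state.
job_run = hook.initialize_job(
    'customer-data-processing',
    arguments={'--input_path': 's3://raw-data/customers/2023-01-01/'},
)
state = hook.get_job_state('customer-data-processing', job_run['JobRunId'])
print(f"Run {job_run['JobRunId']} is {state}")
```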

### Glue Crawler Hook

Hook for managing Glue crawlers that discover and catalog data.

```python { .api }
class GlueCrawlerHook(AwsBaseHook):
    def __init__(self, crawler_name: str, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Initialize Glue Crawler Hook.

        Parameters:
        - crawler_name: Name of the Glue crawler
        - aws_conn_id: AWS connection ID
        """

    def get_crawler(self, name: str) -> dict:
        """
        Get crawler configuration.

        Parameters:
        - name: Crawler name

        Returns:
        Crawler configuration and state
        """

    def start_crawler(self, name: str) -> dict:
        """
        Start a Glue crawler.

        Parameters:
        - name: Crawler name

        Returns:
        Start crawler response
        """

    def stop_crawler(self, name: str) -> dict:
        """
        Stop a running crawler.

        Parameters:
        - name: Crawler name

        Returns:
        Stop crawler response
        """

    def get_crawler_metrics(self, crawler_names: list | None = None) -> dict:
        """
        Get crawler metrics.

        Parameters:
        - crawler_names: List of crawler names

        Returns:
        Crawler metrics
        """
```
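
As a sketch of the crawler hook in use — the crawler name is a placeholder, and it assumes `get_crawler` exposes the boto3 `Crawler` fields such as `State`:

```python
from airflow.providers.amazon.aws.hooks.glue_crawler import GlueCrawlerHook

hook = GlueCrawlerHook(crawler_name='s3-data-crawler')

# Only start the crawler if it is idle; 'State' is assumed to follow boto3.
if hook.get_crawler('s3-data-crawler')['State'] == 'READY':
    hook.start_crawler('s3-data-crawler')

# Run statistics such as tables created or updated.
metrics = hook.get_crawler_metrics(['s3-data-crawler'])
```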

### Glue Operators

Task implementations for Glue job and crawler operations; see the usage examples below for both in context.

```python { .api }
class GlueJobOperator(BaseOperator):
    def __init__(
        self,
        job_name: str = 'aws_glue_default_job',
        job_desc: str = 'AWS Glue Job with Airflow',
        script_location: str | None = None,
        concurrent_run_limit: int = 1,
        script_args: dict | None = None,
        retry_limit: int = 0,
        num_of_dpus: int = 10,
        aws_conn_id: str = 'aws_default',
        region_name: str | None = None,
        s3_bucket: str | None = None,
        iam_role_name: str | None = None,
        create_job_kwargs: dict | None = None,
        **kwargs,
    ):
        """
        Execute an AWS Glue job.

        Parameters:
        - job_name: Name of the Glue job
        - job_desc: Job description
        - script_location: S3 location of the job script
        - concurrent_run_limit: Maximum number of concurrent runs
        - script_args: Arguments passed to the job script
        - retry_limit: Number of retries on failure
        - num_of_dpus: Number of DPUs allocated to the job
        - aws_conn_id: AWS connection ID
        - region_name: AWS region name
        - s3_bucket: S3 bucket for job artifacts
        - iam_role_name: IAM role for job execution
        - create_job_kwargs: Additional job creation parameters
        """

class GlueCrawlerOperator(BaseOperator):
    def __init__(self, crawler_name: str, aws_conn_id: str = 'aws_default', poll_interval: int = 5, **kwargs):
        """
        Run an AWS Glue crawler.

        Parameters:
        - crawler_name: Name of the Glue crawler
        - aws_conn_id: AWS connection ID
        - poll_interval: Polling interval in seconds
        """
```

### Glue Sensors

Monitoring tasks for Glue job and crawler completion.

```python { .api }
class GlueJobSensor(BaseSensorOperator):
    def __init__(self, job_name: str, run_id: str, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for Glue job completion.

        Parameters:
        - job_name: Name of the Glue job
        - run_id: Job run ID to monitor
        - aws_conn_id: AWS connection ID
        """

class GlueCrawlerSensor(BaseSensorOperator):
    def __init__(self, crawler_name: str, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for Glue crawler completion.

        Parameters:
        - crawler_name: Name of the Glue crawler
        - aws_conn_id: AWS connection ID
        """
```
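
The job sensor pairs naturally with `GlueJobOperator`. A self-contained sketch, assuming the operator pushes the started run ID as its XCom return value (DAG, task, and job names are placeholders):

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor

with DAG('glue_job_with_sensor', start_date=datetime(2023, 1, 1)) as dag:
    submit_job = GlueJobOperator(
        task_id='submit_etl_job',
        job_name='customer-data-processing',
        script_location='s3://my-glue-scripts/customer_etl.py',
    )

    # Pull the run ID pushed by the operator and wait for that run to finish.
    wait_for_job = GlueJobSensor(
        task_id='wait_for_etl_job',
        job_name='customer-data-processing',
        run_id="{{ task_instance.xcom_pull(task_ids='submit_etl_job') }}",
    )

    submit_job >> wait_for_job
```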

### Glue Triggers

Asynchronous triggers for Glue operations.

```python { .api }
class GlueJobTrigger(BaseTrigger):
    def __init__(self, job_name: str, run_id: str, aws_conn_id: str = 'aws_default', poll_interval: int = 60, **kwargs):
        """
        Asynchronous trigger for Glue job monitoring.

        Parameters:
        - job_name: Name of the Glue job
        - run_id: Job run ID to monitor
        - aws_conn_id: AWS connection ID
        - poll_interval: Polling interval in seconds
        """
```
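
Triggers are handed off from a task via Airflow's standard deferral mechanism (`self.defer`). A minimal sketch of a hypothetical deferring operator — the `WaitForGlueJob` class and the shape of the completion `event` are assumptions, not a shipped API:

```python
from airflow.models.baseoperator import BaseOperator
from airflow.providers.amazon.aws.triggers.glue import GlueJobTrigger

class WaitForGlueJob(BaseOperator):
    """Hypothetical operator that defers until a Glue job run finishes."""

    def __init__(self, job_name: str, run_id: str, **kwargs):
        super().__init__(**kwargs)
        self.job_name = job_name
        self.run_id = run_id

    def execute(self, context):
        # Release the worker slot; the triggerer polls the run asynchronously.
        self.defer(
            trigger=GlueJobTrigger(
                job_name=self.job_name,
                run_id=self.run_id,
                poll_interval=60,
            ),
            method_name='execute_complete',
        )

    def execute_complete(self, context, event=None):
        # Invoked when the trigger fires; the event payload is an assumption.
        if event and event.get('status') != 'success':
            raise RuntimeError(f'Glue job run did not succeed: {event}')
```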

## Usage Examples

### Basic Glue Job Execution

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

dag = DAG('glue_etl_job', start_date=datetime(2023, 1, 1))

# Run Glue ETL job
run_glue_job = GlueJobOperator(
    task_id='run_etl_job',
    job_name='customer-data-processing',
    script_location='s3://my-glue-scripts/customer_etl.py',
    script_args={
        '--input_path': 's3://raw-data/customers/{{ ds }}/',
        '--output_path': 's3://processed-data/customers/{{ ds }}/',
        '--database_name': 'analytics_db',
        '--table_name': 'customer_dim'
    },
    retry_limit=2,
    num_of_dpus=10,
    aws_conn_id='aws_default',
    dag=dag
)
```

### Glue Crawler for Data Discovery

```python
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor

# Run crawler to discover new data
discover_data = GlueCrawlerOperator(
    task_id='discover_new_data',
    crawler_name='s3-data-crawler',
    aws_conn_id='aws_default',
    dag=dag
)

# Wait for crawler completion
wait_for_crawler = GlueCrawlerSensor(
    task_id='wait_for_discovery',
    crawler_name='s3-data-crawler',
    timeout=1800,  # 30 minutes
    dag=dag
)

discover_data >> wait_for_crawler
```

### Data Pipeline with Glue DataBrew

```python
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_databrew import GlueDataBrewStartJobRunOperator

# Data preparation with DataBrew
prepare_data = GlueDataBrewStartJobRunOperator(
    task_id='prepare_customer_data',
    job_name='customer-data-preparation',
    aws_conn_id='aws_default',
    dag=dag
)

# Process prepared data with Glue job
process_data = GlueJobOperator(
    task_id='process_prepared_data',
    job_name='customer-analytics-job',
    script_location='s3://my-glue-scripts/analytics.py',
    script_args={
        '--input_path': 's3://prepared-data/customers/{{ ds }}/',
        '--output_path': 's3://analytics-data/customers/{{ ds }}/'
    },
    dag=dag
)

prepare_data >> process_data
```

## Types

```python { .api }
# Glue job states
class GlueJobState:
    STARTING = 'STARTING'
    RUNNING = 'RUNNING'
    STOPPING = 'STOPPING'
    STOPPED = 'STOPPED'
    SUCCEEDED = 'SUCCEEDED'
    FAILED = 'FAILED'
    TIMEOUT = 'TIMEOUT'

# Glue crawler states
class GlueCrawlerState:
    READY = 'READY'
    RUNNING = 'RUNNING'
    STOPPING = 'STOPPING'

# Glue job configuration
class GlueJobConfig:
    name: str
    description: str
    role: str
    command: dict
    default_arguments: dict | None = None
    connections: dict | None = None
    max_retries: int = 0
    allocated_capacity: int | None = None
    timeout: int | None = None
    max_capacity: float | None = None
    security_configuration: str | None = None
    tags: dict | None = None
    notification_property: dict | None = None
    glue_version: str | None = None
    number_of_workers: int | None = None
    worker_type: str | None = None
    code_gen_configuration_nodes: dict | None = None

# Glue crawler configuration
class GlueCrawlerConfig:
    name: str
    role: str
    database_name: str
    targets: dict
    description: str | None = None
    classifiers: list | None = None
    table_prefix: str | None = None
    schema_change_policy: dict | None = None
    recrawl_policy: dict | None = None
    lineage_configuration: dict | None = None
    lake_formation_configuration: dict | None = None
    configuration: str | None = None
    crawler_security_configuration: str | None = None
    tags: dict | None = None

# Worker types for Glue 2.0+
class GlueWorkerType:
    STANDARD = 'Standard'
    G_1X = 'G.1X'
    G_2X = 'G.2X'
    G_025X = 'G.025X'

# Glue versions
class GlueVersion:
    VERSION_1_0 = '1.0'
    VERSION_2_0 = '2.0'
    VERSION_3_0 = '3.0'
    VERSION_4_0 = '4.0'
```
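
The worker-type and version constants map onto the boto3 Glue `CreateJob` parameters, which can be forwarded through the operator's `create_job_kwargs`. A sketch, assuming an existing `dag` and placeholder job, script, and role names:

```python
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

# Values mirror GlueVersion.VERSION_4_0 and GlueWorkerType.G_2X above;
# the dict keys follow the boto3 Glue CreateJob API.
tuned_job = GlueJobOperator(
    task_id='run_tuned_job',
    job_name='customer-data-processing',
    script_location='s3://my-glue-scripts/customer_etl.py',
    iam_role_name='GlueServiceRole',
    create_job_kwargs={
        'GlueVersion': '4.0',
        'WorkerType': 'G.2X',
        'NumberOfWorkers': 10,
    },
    dag=dag,
)
```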