# Redshift Data Warehouse

Amazon Redshift integration for data warehouse operations, including cluster management, SQL execution, and data loading. Supports both traditional Redshift database connections and the Redshift Data API for serverless SQL execution.

## Capabilities

### Redshift SQL Hook

Hook for executing SQL operations against Redshift clusters using traditional database connections.

```python { .api }
class RedshiftSqlHook(AwsBaseHook):
    def __init__(self, redshift_conn_id: str = 'redshift_default', **kwargs):
        """
        Initialize Redshift SQL Hook.

        Parameters:
        - redshift_conn_id: Redshift connection ID
        """

    def run(self, sql: str, autocommit: bool = False, parameters: dict = None, handler: callable = None) -> Any:
        """
        Execute SQL statement.

        Parameters:
        - sql: SQL statement to execute
        - autocommit: Enable autocommit mode
        - parameters: Query parameters
        - handler: Result handler function

        Returns:
        Query results
        """

    def get_records(self, sql: str, parameters: dict = None) -> list:
        """
        Execute SQL and return records.

        Parameters:
        - sql: SQL query to execute
        - parameters: Query parameters

        Returns:
        List of result records
        """

    def get_first(self, sql: str, parameters: dict = None) -> Any:
        """
        Execute SQL and return first result.

        Parameters:
        - sql: SQL query to execute
        - parameters: Query parameters

        Returns:
        First result record
        """

    def get_pandas_df(self, sql: str, parameters: dict = None, **kwargs) -> Any:
        """
        Execute SQL and return pandas DataFrame.

        Parameters:
        - sql: SQL query to execute
        - parameters: Query parameters

        Returns:
        pandas DataFrame with results
        """

    def test_connection(self) -> tuple:
        """
        Test Redshift connection.

        Returns:
        Connection test result tuple (success, message)
        """
```
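
As a quick illustration, here is a minimal sketch of calling the hook directly from a TaskFlow task. The import path, class casing, connection ID, and table name are assumptions based on the reference above and the examples later in this document; adjust them to your provider version.

```python
from airflow.decorators import task


@task
def revenue_by_region():
    # Import path and class name follow the reference above (assumption);
    # provider releases may expose this hook under a different module or casing.
    from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSqlHook

    hook = RedshiftSqlHook(redshift_conn_id='redshift_prod')

    # Parameterized query returning a list of records; the placeholder style
    # depends on the underlying DB-API driver.
    records = hook.get_records(
        sql="""
            SELECT region, SUM(quantity * price) AS revenue
            FROM fact_sales
            WHERE transaction_date >= %(start)s
            GROUP BY region
        """,
        parameters={'start': '2023-01-01'},
    )
    return {region: float(revenue) for region, revenue in records}
```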

### Redshift Data Hook

Hook for serverless SQL execution using the Redshift Data API.

```python { .api }
class RedshiftDataHook(AwsBaseHook):
    def __init__(self, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Initialize Redshift Data Hook.

        Parameters:
        - aws_conn_id: AWS connection ID
        """

    def execute_query(self, database: str, sql: str, cluster_identifier: str = None, db_user: str = None, parameters: list = None, secret_arn: str = None, statement_name: str = None, with_event: bool = False, wait_for_completion: bool = True, poll_interval: int = 10) -> str:
        """
        Execute SQL using Redshift Data API.

        Parameters:
        - database: Database name
        - sql: SQL statement to execute
        - cluster_identifier: Redshift cluster identifier
        - db_user: Database user name
        - parameters: SQL parameters
        - secret_arn: AWS Secrets Manager ARN for credentials
        - statement_name: Statement name for identification
        - with_event: Enable event notifications
        - wait_for_completion: Wait for query completion
        - poll_interval: Polling interval in seconds

        Returns:
        Statement ID
        """

    def describe_statement(self, id: str) -> dict:
        """
        Get statement execution details.

        Parameters:
        - id: Statement ID

        Returns:
        Statement details
        """

    def get_statement_result(self, id: str, next_token: str = None) -> dict:
        """
        Get statement execution results.

        Parameters:
        - id: Statement ID
        - next_token: Pagination token

        Returns:
        Query results
        """

    def cancel_statement(self, id: str) -> bool:
        """
        Cancel running statement.

        Parameters:
        - id: Statement ID

        Returns:
        Cancellation success status
        """

    def list_statements(self, status: str = None, statement_name: str = None, max_items: int = 100, next_token: str = None) -> dict:
        """
        List executed statements.

        Parameters:
        - status: Filter by statement status
        - statement_name: Filter by statement name
        - max_items: Maximum items to return
        - next_token: Pagination token

        Returns:
        List of statements
        """
```
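
A minimal sketch of running a statement through the Data API hook and returning its execution status; the import path and the materialized view name are assumptions, and the method names follow the reference above.

```python
from airflow.decorators import task


@task
def refresh_daily_rollup():
    # Import path follows the reference above (assumption).
    from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook

    hook = RedshiftDataHook(aws_conn_id='aws_default')

    # With wait_for_completion=True the call blocks until the statement
    # finishes and returns the statement ID.
    statement_id = hook.execute_query(
        database='analytics',
        cluster_identifier='analytics-cluster',
        sql='REFRESH MATERIALIZED VIEW analytics.daily_sales_rollup;',  # hypothetical view
        statement_name='refresh_daily_rollup',
        wait_for_completion=True,
        poll_interval=15,
    )

    # Return a small, serializable summary of the execution metadata.
    details = hook.describe_statement(id=statement_id)
    return {'statement_id': statement_id, 'status': details.get('Status')}
```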

### Redshift Cluster Hook

Hook for Redshift cluster lifecycle management.

```python { .api }
class RedshiftHook(AwsBaseHook):
    def __init__(self, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Initialize Redshift Cluster Hook.

        Parameters:
        - aws_conn_id: AWS connection ID
        """

    def create_cluster(self, cluster_identifier: str, node_type: str, master_username: str, master_user_password: str, **kwargs) -> dict:
        """
        Create Redshift cluster.

        Parameters:
        - cluster_identifier: Unique cluster identifier
        - node_type: Node type (e.g., 'dc2.large')
        - master_username: Master username
        - master_user_password: Master password

        Returns:
        Cluster configuration
        """

    def delete_cluster(self, cluster_identifier: str, skip_final_cluster_snapshot: bool = False, final_cluster_snapshot_identifier: str = None) -> dict:
        """
        Delete Redshift cluster.

        Parameters:
        - cluster_identifier: Cluster identifier
        - skip_final_cluster_snapshot: Skip final snapshot
        - final_cluster_snapshot_identifier: Final snapshot identifier

        Returns:
        Deletion response
        """

    def describe_clusters(self, cluster_identifier: str = None) -> dict:
        """
        Describe Redshift clusters.

        Parameters:
        - cluster_identifier: Specific cluster identifier

        Returns:
        Cluster descriptions
        """

    def pause_cluster(self, cluster_identifier: str) -> dict:
        """
        Pause Redshift cluster.

        Parameters:
        - cluster_identifier: Cluster identifier

        Returns:
        Pause response
        """

    def resume_cluster(self, cluster_identifier: str) -> dict:
        """
        Resume paused Redshift cluster.

        Parameters:
        - cluster_identifier: Cluster identifier

        Returns:
        Resume response
        """

    def get_cluster_status(self, cluster_identifier: str) -> str:
        """
        Get cluster status.

        Parameters:
        - cluster_identifier: Cluster identifier

        Returns:
        Current cluster status
        """
```
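
As an illustration, a small guard task that pauses a cluster only when it is actually available might look like the sketch below; the import path is an assumption and the cluster identifier is a placeholder.

```python
from airflow.decorators import task


@task
def pause_if_available(cluster_identifier: str = 'analytics-cluster'):
    # Import path follows the reference above (assumption).
    from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook

    hook = RedshiftHook(aws_conn_id='aws_default')

    # Pause only clusters that are currently 'available'; clusters that are
    # already paused or in a transitional state are left untouched.
    status = hook.get_cluster_status(cluster_identifier=cluster_identifier)
    if status == 'available':
        hook.pause_cluster(cluster_identifier=cluster_identifier)
        return 'pause requested'
    return f'no action; cluster status is {status!r}'
```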

### Redshift Operators

Task implementations for Redshift operations.

```python { .api }
class RedshiftSqlOperator(BaseOperator):
    def __init__(self, sql: str, redshift_conn_id: str = 'redshift_default', parameters: dict = None, autocommit: bool = True, **kwargs):
        """
        Execute SQL on Redshift cluster.

        Parameters:
        - sql: SQL statement to execute
        - redshift_conn_id: Redshift connection ID
        - parameters: SQL parameters
        - autocommit: Enable autocommit mode
        """

class RedshiftDataOperator(BaseOperator):
    def __init__(self, database: str, sql: str, cluster_identifier: str = None, db_user: str = None, secret_arn: str = None, statement_name: str = None, parameters: list = None, poll_interval: int = 10, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Execute SQL using Redshift Data API.

        Parameters:
        - database: Database name
        - sql: SQL statement to execute
        - cluster_identifier: Redshift cluster identifier
        - db_user: Database user name
        - secret_arn: AWS Secrets Manager ARN for credentials
        - statement_name: Statement name
        - parameters: SQL parameters
        - poll_interval: Polling interval in seconds
        - aws_conn_id: AWS connection ID
        """

class RedshiftCreateClusterOperator(BaseOperator):
    def __init__(self, cluster_identifier: str, node_type: str, master_username: str, master_user_password: str, publicly_accessible: bool = True, port: int = 5439, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Create Redshift cluster.

        Parameters:
        - cluster_identifier: Cluster identifier
        - node_type: Node type
        - master_username: Master username
        - master_user_password: Master password
        - publicly_accessible: Public accessibility
        - port: Database port
        - aws_conn_id: AWS connection ID
        """

class RedshiftDeleteClusterOperator(BaseOperator):
    def __init__(self, cluster_identifier: str, skip_final_cluster_snapshot: bool = False, final_cluster_snapshot_identifier: str = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Delete Redshift cluster.

        Parameters:
        - cluster_identifier: Cluster identifier
        - skip_final_cluster_snapshot: Skip final snapshot
        - final_cluster_snapshot_identifier: Final snapshot identifier
        - aws_conn_id: AWS connection ID
        """

class RedshiftPauseClusterOperator(BaseOperator):
    def __init__(self, cluster_identifier: str, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Pause Redshift cluster.

        Parameters:
        - cluster_identifier: Cluster identifier
        - aws_conn_id: AWS connection ID
        """

class RedshiftResumeClusterOperator(BaseOperator):
    def __init__(self, cluster_identifier: str, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Resume Redshift cluster.

        Parameters:
        - cluster_identifier: Cluster identifier
        - aws_conn_id: AWS connection ID
        """
```

### Redshift Sensors

Monitoring tasks for Redshift cluster states and query execution.

```python { .api }
class RedshiftClusterSensor(BaseSensorOperator):
    def __init__(self, cluster_identifier: str, target_status: str = 'available', aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for Redshift cluster to reach target status.

        Parameters:
        - cluster_identifier: Cluster identifier
        - target_status: Target cluster status
        - aws_conn_id: AWS connection ID
        """
```

## Usage Examples

### Data Warehouse Operations

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSqlOperator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

dag = DAG('redshift_analytics', start_date=datetime(2023, 1, 1))

# Create staging tables
create_staging = RedshiftSqlOperator(
    task_id='create_staging_tables',
    redshift_conn_id='redshift_prod',
    sql="""
        CREATE TABLE IF NOT EXISTS staging.sales_data (
            transaction_id VARCHAR(50),
            customer_id INTEGER,
            product_id INTEGER,
            quantity INTEGER,
            price DECIMAL(10,2),
            transaction_date DATE,
            region VARCHAR(50)
        );

        TRUNCATE TABLE staging.sales_data;
    """,
    dag=dag
)

# Load data from S3
load_data = S3ToRedshiftOperator(
    task_id='load_from_s3',
    schema='staging',
    table='sales_data',
    s3_bucket='data-warehouse-staging',
    s3_key='sales/{{ ds }}/',
    redshift_conn_id='redshift_prod',
    copy_options=[
        "CSV",
        "IGNOREHEADER 1",
        "DATEFORMAT 'YYYY-MM-DD'",
        "TRUNCATECOLUMNS"
    ],
    dag=dag
)

# Transform and load to production tables
transform_data = RedshiftSqlOperator(
    task_id='transform_and_load',
    redshift_conn_id='redshift_prod',
    sql="""
        -- Update dimension tables
        INSERT INTO dim_customers (customer_id, region)
        SELECT DISTINCT customer_id, region
        FROM staging.sales_data s
        WHERE NOT EXISTS (
            SELECT 1 FROM dim_customers d
            WHERE d.customer_id = s.customer_id
        );

        -- Insert fact data
        INSERT INTO fact_sales (
            transaction_id, customer_id, product_id,
            quantity, price, transaction_date
        )
        SELECT
            transaction_id, customer_id, product_id,
            quantity, price, transaction_date
        FROM staging.sales_data;

        -- Update statistics
        ANALYZE fact_sales;
        ANALYZE dim_customers;
    """,
    dag=dag
)

create_staging >> load_data >> transform_data
```

### Redshift Data API Usage

```python
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator

# Execute query using Data API
analyze_sales = RedshiftDataOperator(
    task_id='analyze_sales_data',
    database='analytics',
    cluster_identifier='analytics-cluster',
    sql="""
        SELECT
            region,
            DATE_TRUNC('month', transaction_date) AS month,
            SUM(quantity * price) AS revenue,
            COUNT(*) AS transaction_count
        FROM fact_sales
        WHERE transaction_date >= CURRENT_DATE - INTERVAL '3 months'
        GROUP BY region, month
        ORDER BY region, month;
    """,
    statement_name='monthly_sales_analysis',
    aws_conn_id='aws_default',
    dag=dag
)
```

### Cluster Lifecycle Management

```python
from airflow.providers.amazon.aws.operators.redshift_cluster import (
    RedshiftCreateClusterOperator,
    RedshiftPauseClusterOperator,
    RedshiftResumeClusterOperator
)
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSqlOperator
from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftClusterSensor

# Resume cluster for processing
resume_cluster = RedshiftResumeClusterOperator(
    task_id='resume_cluster',
    cluster_identifier='analytics-cluster',
    dag=dag
)

# Wait for cluster to be available
wait_for_cluster = RedshiftClusterSensor(
    task_id='wait_for_available',
    cluster_identifier='analytics-cluster',
    target_status='available',
    timeout=1800,  # 30 minutes
    dag=dag
)

# Run analytics workload
run_analytics = RedshiftSqlOperator(
    task_id='run_analytics',
    sql='CALL analytics.run_monthly_reports();',
    redshift_conn_id='redshift_prod',
    dag=dag
)

# Pause cluster to save costs
pause_cluster = RedshiftPauseClusterOperator(
    task_id='pause_cluster',
    cluster_identifier='analytics-cluster',
    dag=dag
)

resume_cluster >> wait_for_cluster >> run_analytics >> pause_cluster
```

## Types

```python { .api }
# Redshift cluster states
class RedshiftClusterState:
    AVAILABLE = 'available'
    CREATING = 'creating'
    DELETING = 'deleting'
    FINAL_SNAPSHOT = 'final-snapshot'
    HARDWARE_FAILURE = 'hardware-failure'
    INCOMPATIBLE_HSM = 'incompatible-hsm'
    INCOMPATIBLE_NETWORK = 'incompatible-network'
    INCOMPATIBLE_PARAMETERS = 'incompatible-parameters'
    INCOMPATIBLE_RESTORE = 'incompatible-restore'
    MODIFYING = 'modifying'
    PAUSED = 'paused'
    REBOOTING = 'rebooting'
    RENAMING = 'renaming'
    RESIZING = 'resizing'
    ROTATING_KEYS = 'rotating-keys'
    STORAGE_FULL = 'storage-full'
    UPDATING_HSM = 'updating-hsm'

# Redshift node types
class RedshiftNodeType:
    DC2_LARGE = 'dc2.large'
    DC2_8XLARGE = 'dc2.8xlarge'
    DS2_XLARGE = 'ds2.xlarge'
    DS2_8XLARGE = 'ds2.8xlarge'
    RA3_XLPLUS = 'ra3.xlplus'
    RA3_4XLARGE = 'ra3.4xlarge'
    RA3_16XLARGE = 'ra3.16xlarge'

# Statement status for Data API
class StatementStatus:
    SUBMITTED = 'SUBMITTED'
    PICKED = 'PICKED'
    STARTED = 'STARTED'
    FINISHED = 'FINISHED'
    ABORTED = 'ABORTED'
    FAILED = 'FAILED'

# Cluster configuration
class RedshiftClusterConfig:
    cluster_identifier: str
    node_type: str
    master_username: str
    db_name: str = None
    port: int = 5439
    cluster_type: str = 'multi-node'
    number_of_nodes: int = 1
    publicly_accessible: bool = True
    encrypted: bool = False
    hsm_client_certificate_identifier: str = None
    hsm_configuration_identifier: str = None
    elastic_ip: str = None
    cluster_parameter_group_name: str = None
    cluster_subnet_group_name: str = None
    availability_zone: str = None
    preferred_maintenance_window: str = None
    cluster_version: str = None
    allow_version_upgrade: bool = True
    automated_snapshot_retention_period: int = 1
    manual_snapshot_retention_period: int = None
    snapshot_identifier: str = None
    snapshot_cluster_identifier: str = None
    owner_account: str = None
    additional_info: str = None
    kms_key_id: str = None
    enhanced_vpc_routing: bool = False
    iam_roles: list = None
    maintenance_track_name: str = None
    snapshot_schedule_identifier: str = None
    aqua_configuration_status: str = None
    default_iam_role_arn: str = None
    tags: list = None
```
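
When `execute_query` is called with `wait_for_completion=False`, the `StatementStatus` values above can drive a manual polling loop. A minimal sketch, assuming the `RedshiftDataHook.describe_statement` signature shown earlier:

```python
import time


def wait_for_statement(hook, statement_id: str, poll_interval: int = 10, timeout: int = 600) -> dict:
    """Poll the Data API until a statement reaches a terminal status.

    `hook` is expected to expose the describe_statement(id=...) method
    sketched in the Redshift Data Hook reference above (assumption).
    """
    terminal = {StatementStatus.FINISHED, StatementStatus.ABORTED, StatementStatus.FAILED}
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        details = hook.describe_statement(id=statement_id)
        if details.get('Status') in terminal:
            return details
        time.sleep(poll_interval)
    raise TimeoutError(f'statement {statement_id} did not finish within {timeout} seconds')
```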