# Redshift Data Warehousing

Connect to and execute queries against Amazon Redshift clusters with connection pooling, query optimization, and comprehensive error handling for data warehousing operations.

## Capabilities

### Redshift Resource

Core Redshift resource providing configured database connections and query execution capabilities.

```python { .api }
class RedshiftResource(ResourceWithBoto3Configuration):
    """
    Resource for connecting to Amazon Redshift clusters.
    """
    host: str
    port: int = 5439
    database: str
    user: str
    password: str

    def get_connection(self):
        """
        Get database connection to Redshift cluster.

        Returns:
            Connection: Database connection object
        """

    def execute_query(self, query: str) -> List[Dict]:
        """
        Execute SQL query against Redshift.

        Parameters:
            query: SQL query to execute

        Returns:
            List[Dict]: Query results as list of dictionaries
        """

def redshift_resource(**kwargs) -> RedshiftResource:
    """
    Factory function for Redshift resource.

    Returns:
        ResourceDefinition: Configured Redshift resource
    """
```
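
`execute_query` is listed above as returning `List[Dict]`. The sketch below illustrates one way such a result could be produced from a DB-API cursor, by pairing each row with the column names in `cursor.description`. It is not the library's implementation: the helpers `rows_as_dicts` and `execute_query` are hypothetical, and an in-memory `sqlite3` database stands in for a Redshift connection so the code runs without a cluster.

```python
import sqlite3
from typing import Any, Dict, List


def rows_as_dicts(cursor) -> List[Dict[str, Any]]:
    """Convert the current result set of a DB-API cursor into a list of dicts."""
    # cursor.description holds one sequence per column; item 0 is the column name.
    columns = [col[0] for col in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]


def execute_query(connection, query: str) -> List[Dict[str, Any]]:
    """Hypothetical helper: run `query` and return rows keyed by column name."""
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        return rows_as_dicts(cursor)
    finally:
        cursor.close()


# Assumption: sqlite3 stands in for a Redshift connection in this sketch.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales_table (product_id INTEGER, sales_amount REAL)")
conn.executemany(
    "INSERT INTO sales_table VALUES (?, ?)",
    [(1, 10.0), (1, 5.0), (2, 7.5)],
)
rows = execute_query(
    conn,
    "SELECT product_id, SUM(sales_amount) AS total_sales "
    "FROM sales_table GROUP BY product_id ORDER BY total_sales DESC",
)
print(rows)
```

This helper only suits statements that return rows; for statements such as `INSERT`, `cursor.description` is `None` and a real implementation would need to handle that case.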

### Redshift Client Integration

Direct Redshift client integration for advanced cluster management and query operations.

```python { .api }
class RedshiftClient:
    """
    Client wrapper for Redshift operations.
    """

    def __init__(self, **kwargs): ...

    def execute_query(self, query: str): ...
    def get_cluster_credentials(self, cluster_identifier: str): ...

class RedshiftClientResource(RedshiftClient, ConfigurableResource):
    """
    Configurable Redshift client resource.
    """
```

### Error Handling

```python { .api }
class RedshiftError(Exception):
    """
    Exception raised for Redshift-related errors.
    """

    def __init__(self, message: str, query: Optional[str] = None): ...
```
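
The constructor above accepts an optional `query`, which suggests the failing SQL can travel with the exception. The following is a minimal sketch of that pattern, not the library's code: `SketchRedshiftError` is a local stand-in with the same constructor signature, `run_query` is a hypothetical wrapper, and `sqlite3` again stands in for a Redshift connection.

```python
import sqlite3
from typing import Optional


class SketchRedshiftError(Exception):
    """Local stand-in (not imported from dagster_aws) with the listed signature."""

    def __init__(self, message: str, query: Optional[str] = None):
        super().__init__(message)
        # Assumption: the SQL is kept on the exception so callers can log it.
        self.query = query


def run_query(connection, query: str):
    """Hypothetical wrapper: re-raise driver errors with the SQL attached."""
    try:
        return connection.execute(query).fetchall()
    except sqlite3.Error as exc:
        # `from exc` preserves the original driver traceback.
        raise SketchRedshiftError(str(exc), query=query) from exc


conn = sqlite3.connect(":memory:")
failed_query = None
try:
    run_query(conn, "SELECT * FROM missing_table")
except SketchRedshiftError as err:
    failed_query = err.query
    print(f"query failed: {err} (sql: {failed_query})")
```

Attaching the query to the exception keeps the log message and the offending SQL together, which can help when several statements run in one op.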

### Testing Utilities

```python { .api }
class FakeRedshiftClient:
    """
    Mock Redshift client for testing.
    """

class FakeRedshiftClientResource(FakeRedshiftClient, ConfigurableResource): ...
class FakeRedshiftResource(ConfigurableResource): ...

def fake_redshift_resource(**kwargs): ...
```
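
The listing above does not show how the fakes behave, so the sketch below illustrates the general idea with a hand-written test double rather than the library classes. `RecordingFakeClient` and `top_products` are hypothetical names; the only assumption about the real client is that it exposes `execute_query(query)`.

```python
from typing import Any, Dict, List


class RecordingFakeClient:
    """Hypothetical test double: records SQL and returns canned rows."""

    def __init__(self, canned_rows: List[Dict[str, Any]]):
        self.canned_rows = canned_rows
        self.queries: List[str] = []

    def execute_query(self, query: str) -> List[Dict[str, Any]]:
        # Record the SQL instead of sending it to a cluster.
        self.queries.append(query)
        return list(self.canned_rows)


def top_products(client, limit: int = 10) -> List[Dict[str, Any]]:
    """Logic under test; depends only on the client's `execute_query` method."""
    query = (
        "SELECT product_id, SUM(sales_amount) AS total_sales "
        "FROM sales_table GROUP BY product_id "
        f"ORDER BY total_sales DESC LIMIT {int(limit)}"
    )
    return client.execute_query(query)


fake = RecordingFakeClient([{"product_id": 1, "total_sales": 15.0}])
result = top_products(fake, limit=5)
print(result, fake.queries)
```

Because the function under test receives the client as an argument, a test can assert on both the returned rows and the SQL that would have been sent.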

## Usage Examples

```python
from dagster import op, job, Definitions
from dagster_aws.redshift import RedshiftResource

@op(required_resource_keys={"redshift"})
def query_sales_data(context):
    redshift = context.resources.redshift

    # Top 10 products by total sales over the last 30 days.
    query = """
        SELECT product_id, SUM(sales_amount) as total_sales
        FROM sales_table
        WHERE sale_date >= CURRENT_DATE - 30
        GROUP BY product_id
        ORDER BY total_sales DESC
        LIMIT 10
    """

    results = redshift.execute_query(query)
    context.log.info(f"Retrieved {len(results)} top products")
    return results

@job(
    resource_defs={
        "redshift": RedshiftResource(
            host="my-cluster.redshift.amazonaws.com",
            database="analytics",
            user="dagster_user",
            # Placeholder value; avoid committing real credentials to source.
            password="secure_password"
        )
    }
)
def sales_analysis_job():
    query_sales_data()

defs = Definitions(jobs=[sales_analysis_job])
```
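
The example above passes the password as a string literal. As a hedged alternative, the connection settings could be read from environment variables. In the sketch below, `redshift_config_from_env` is a hypothetical helper and the `REDSHIFT_*` variable names are assumptions; only the field names and the default port 5439 come from the resource definition.

```python
import os
from typing import Any, Dict, Mapping


def redshift_config_from_env(environ: Mapping[str, str] = os.environ) -> Dict[str, Any]:
    """Build keyword arguments for the resource from environment variables.

    The REDSHIFT_* variable names are assumptions made for this sketch.
    """
    return {
        "host": environ["REDSHIFT_HOST"],
        # 5439 is the default port shown in the resource definition.
        "port": int(environ.get("REDSHIFT_PORT", "5439")),
        "database": environ["REDSHIFT_DATABASE"],
        "user": environ["REDSHIFT_USER"],
        "password": environ["REDSHIFT_PASSWORD"],
    }


# A plain dict is passed here so the sketch runs without real environment variables.
example_env = {
    "REDSHIFT_HOST": "my-cluster.redshift.amazonaws.com",
    "REDSHIFT_DATABASE": "analytics",
    "REDSHIFT_USER": "dagster_user",
    "REDSHIFT_PASSWORD": "example-only",
}
config = redshift_config_from_env(example_env)
# Print everything except the password.
print({key: value for key, value in config.items() if key != "password"})
```

The resulting dictionary could then be unpacked as `RedshiftResource(**config)`, assuming the resource accepts these fields as keyword arguments in the same way as the example above.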