or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

athena-queries.md, cloudwatch-logging.md, ecr-integration.md, ecs-orchestration.md, emr-processing.md, index.md, parameter-store.md, pipes-orchestration.md, rds-operations.md, redshift-integration.md, s3-storage.md, secrets-management.md

docs/redshift-integration.md

0

# Redshift Data Warehousing

1

2

Connect to and execute queries against Amazon Redshift clusters with connection pooling, query optimization, and comprehensive error handling for data warehousing operations.

3

4

## Capabilities

5

6

### Redshift Resource

7

8

Core Redshift resource providing configured database connections and query execution capabilities.

9

10

```python { .api }

11

class RedshiftResource(ResourceWithBoto3Configuration):

12

"""

13

Resource for connecting to Amazon Redshift clusters.

14

"""

15

host: str

16

port: int = 5439

17

database: str

18

user: str

19

password: str

20

21

def get_connection(self):

22

"""

23

Get database connection to Redshift cluster.

24

25

Returns:

26

Connection: Database connection object

27

"""

28

29

def execute_query(self, query: str) -> List[Dict]:

30

"""

31

Execute SQL query against Redshift.

32

33

Parameters:

34

query: SQL query to execute

35

36

Returns:

37

List[Dict]: Query results as list of dictionaries

38

"""

39

40

def redshift_resource(**kwargs) -> RedshiftResource:

41

"""

42

Factory function for Redshift resource.

43

44

Returns:

45

RedshiftResource: Configured Redshift resource

46

"""

47

```

48

49

### Redshift Client Integration

50

51

Direct Redshift client integration for advanced cluster management and query operations.

52

53

```python { .api }

54

class RedshiftClient:

55

"""

56

Client wrapper for Redshift operations.

57

"""

58

59

def __init__(self, **kwargs): ...

60

61

def execute_query(self, query: str): ...

62

def get_cluster_credentials(self, cluster_identifier: str): ...

63

64

class RedshiftClientResource(RedshiftClient, ConfigurableResource):

65

"""

66

Configurable Redshift client resource.

67

"""

68

```

69

70

### Error Handling

71

72

```python { .api }

73

class RedshiftError(Exception):

74

"""

75

Exception raised for Redshift-related errors.

76

"""

77

78

def __init__(self, message: str, query: Optional[str] = None): ...

79

```

80

81

### Testing Utilities

82

83

```python { .api }

84

class FakeRedshiftClient:

85

"""

86

Mock Redshift client for testing.

87

"""

88

89

class FakeRedshiftClientResource(FakeRedshiftClient, ConfigurableResource): ...

90

class FakeRedshiftResource(ConfigurableResource): ...

91

92

def fake_redshift_resource(**kwargs): ...

93

```

94

95

## Usage Examples

96

97

```python

98

from dagster import op, job, Definitions

99

from dagster_aws.redshift import RedshiftResource

100

101

@op(required_resource_keys={"redshift"})

102

def query_sales_data(context):

103

redshift = context.resources.redshift

104

105

query = """

106

SELECT product_id, SUM(sales_amount) as total_sales

107

FROM sales_table

108

WHERE sale_date >= CURRENT_DATE - 30

109

GROUP BY product_id

110

ORDER BY total_sales DESC

111

LIMIT 10

112

"""

113

114

results = redshift.execute_query(query)

115

context.log.info(f"Retrieved {len(results)} top products")

116

return results

117

118

@job(

119

resource_defs={

120

"redshift": RedshiftResource(

121

host="my-cluster.redshift.amazonaws.com",

122

database="analytics",

123

user="dagster_user",

124

password="secure_password"

125

)

126

}

127

)

128

def sales_analysis_job():

129

query_sales_data()

130

131

defs = Definitions(jobs=[sales_analysis_job])

132

```