or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.mdcore-database.mddata-science.mddata-types.mderror-handling.mdindex.mdmetadata.md

data-science.mddocs/

0

# Data Science Integration

1

2

Native integration with pandas and numpy for efficient data transfer between Redshift and Python data science workflows. The redshift_connector provides optimized methods for working with DataFrames and numpy arrays, enabling seamless data analysis pipelines.

3

4

## Capabilities

5

6

### Pandas DataFrame Integration

7

8

Direct support for reading query results into pandas DataFrames and writing DataFrames to Redshift tables with automatic type conversion and optimization.

9

10

```python { .api }

11

class Cursor:

12

def fetch_dataframe(self, num: int = None) -> 'pandas.DataFrame':

13

"""

14

Fetch query results as a pandas DataFrame.

15

16

Parameters:

17

- num: Maximum number of rows to fetch (None for all remaining rows)

18

19

Returns:

20

pandas.DataFrame with query results

21

22

Raises:

23

InterfaceError: If pandas is not available

24

"""

25

26

def write_dataframe(self, df: 'pandas.DataFrame', table: str) -> None:

27

"""

28

Write a pandas DataFrame to a Redshift table.

29

30

Parameters:

31

- df: pandas DataFrame to write

32

- table: Target table name

33

34

The method automatically handles type conversion and creates

35

appropriate INSERT statements for efficient data loading.

36

37

Raises:

38

InterfaceError: If pandas is not available

39

ProgrammingError: If table doesn't exist or schema mismatch

40

"""

41

```

42

43

### NumPy Array Integration

44

45

Support for fetching query results as numpy arrays for numerical computing and scientific analysis workflows.

46

47

```python { .api }

48

class Cursor:

49

def fetch_numpy_array(self, num: int = None) -> 'numpy.ndarray':

50

"""

51

Fetch query results as a numpy ndarray.

52

53

Parameters:

54

- num: Maximum number of rows to fetch (None for all remaining rows)

55

56

Returns:

57

numpy.ndarray containing query results with appropriate dtypes

58

59

The method automatically converts Redshift data types to

60

compatible numpy dtypes for efficient numerical operations.

61

62

Raises:

63

InterfaceError: If numpy is not available

64

"""

65

```

66

67

### Installation for Data Science Features

68

69

Data science integration requires optional dependencies that can be installed separately.

70

71

```python { .api }

72

# Install with data science dependencies

73

# pip install redshift_connector[full]

74

75

# This includes:

76

# - pandas >= 1.0.0

77

# - numpy >= 1.15.0

78

79

# Error handling for missing dependencies

80

MISSING_MODULE_ERROR_MSG: str = (

81

"redshift_connector requires {module} support for this functionality. "

82

"Please install redshift_connector[full] for {module} support"

83

)

84

```

85

86

### Data Type Conversion

87

88

Automatic conversion between Redshift data types and pandas/numpy compatible types for seamless data analysis.

89

90

```python { .api }

91

# Redshift to pandas/numpy type mapping:

92

#

93

# BIGINT -> int64

94

# INTEGER -> int32

95

# SMALLINT -> int16

96

# BOOLEAN -> bool

97

# REAL -> float32

98

# DOUBLE PRECISION -> float64

99

# NUMERIC/DECIMAL -> float64 (or Decimal if numeric_to_float=False)

100

# VARCHAR/CHAR/TEXT -> object (string)

101

# DATE -> datetime64[D]

102

# TIMESTAMP -> datetime64[ns]

103

# TIME -> object (datetime.time)

104

# JSON/JSONB -> object (parsed JSON)

105

# ARRAY types -> object (Python lists)

106

107

# Configuration for numeric conversion

108

conn = redshift_connector.connect(

109

# ... connection parameters

110

numeric_to_float=True # Convert NUMERIC to float (default: False, uses Decimal)

111

)

112

```

113

114

### Usage Examples

115

116

Practical examples demonstrating data science integration patterns.

117

118

```python

119

import redshift_connector

120

import pandas as pd

121

import numpy as np

122

123

# Connect to Redshift

124

conn = redshift_connector.connect(

125

host='examplecluster.abc123xyz789.us-west-1.redshift.amazonaws.com',

126

database='dev',

127

user='awsuser',

128

password='password',

129

numeric_to_float=True # Convert NUMERIC to float for pandas compatibility

130

)

131

132

cursor = conn.cursor()

133

134

# Fetch data as DataFrame

135

cursor.execute("""

136

SELECT customer_id, order_date, total_amount, product_category

137

FROM sales_data

138

WHERE order_date >= '2023-01-01'

139

ORDER BY order_date

140

""")

141

142

# Get results as pandas DataFrame

143

df = cursor.fetch_dataframe()

144

print(f"Loaded {len(df)} rows into DataFrame")

145

print(df.dtypes)

146

print(df.head())

147

148

# Perform data analysis

149

monthly_sales = df.groupby(df['order_date'].dt.to_period('M'))['total_amount'].sum()

150

category_stats = df.groupby('product_category')['total_amount'].agg(['mean', 'sum', 'count'])

151

152

# Write processed data back to Redshift

153

# First create target table

154

cursor.execute("""

155

CREATE TABLE IF NOT EXISTS monthly_sales_summary (

156

month VARCHAR(10),

157

total_sales DECIMAL(15,2)

158

)

159

""")

160

161

# Convert Series to DataFrame for writing

162

summary_df = monthly_sales.reset_index()

163

summary_df.columns = ['month', 'total_sales']

164

summary_df['month'] = summary_df['month'].astype(str)

165

166

# Write DataFrame to table

167

cursor.write_dataframe(summary_df, 'monthly_sales_summary')

168

169

# Fetch numerical data as numpy array for analysis

170

cursor.execute("SELECT total_amount, quantity, discount FROM order_details")

171

data_array = cursor.fetch_numpy_array()

172

173

# Perform numerical analysis

174

mean_amount = np.mean(data_array[:, 0]) # total_amount column

175

correlation_matrix = np.corrcoef(data_array.T)

176

print(f"Mean order amount: {mean_amount}")

177

print("Correlation matrix:")

178

print(correlation_matrix)

179

180

cursor.close()

181

conn.close()

182

```

183

184

### Performance Considerations

185

186

Optimization strategies for efficient data transfer between Redshift and Python data science libraries.

187

188

```python { .api }

189

# Performance Tips:

190

191

# 1. Use columnar operations when possible

192

cursor.execute("SELECT col1, col2, col3 FROM large_table WHERE condition")

193

df = cursor.fetch_dataframe() # More efficient than row-by-row processing

194

195

# 2. Limit result sets for large tables

196

cursor.execute("SELECT * FROM large_table LIMIT 100000")

197

df = cursor.fetch_dataframe()

198

199

# 3. Use appropriate data types

200

conn = redshift_connector.connect(

201

# ...

202

numeric_to_float=True # Faster than Decimal for numerical analysis

203

)

204

205

# 4. Batch operations for large DataFrames

206

# Write large DataFrames in chunks if needed

207

chunk_size = 10000

208

for i in range(0, len(large_df), chunk_size):

209

chunk = large_df[i:i + chunk_size]

210

cursor.write_dataframe(chunk, 'target_table')

211

212

# 5. Use stream parameter for very large result sets

213

cursor.execute("SELECT * FROM very_large_table", stream=True)

214

# Process results in chunks rather than loading all into memory

215

```

216

217

### Error Handling for Data Science Operations

218

219

Specific error handling patterns for data science integration features.

220

221

```python { .api }

222

import redshift_connector

223

from redshift_connector import InterfaceError, ProgrammingError

224

225

try:

226

cursor = conn.cursor()

227

cursor.execute("SELECT * FROM my_table")

228

df = cursor.fetch_dataframe()

229

except InterfaceError as e:

230

if "pandas" in str(e):

231

print("pandas not available. Install with: pip install redshift_connector[full]")

232

elif "numpy" in str(e):

233

print("numpy not available. Install with: pip install redshift_connector[full]")

234

else:

235

print(f"Interface error: {e}")

236

except ProgrammingError as e:

237

print(f"SQL or schema error: {e}")

238

239

try:

240

cursor.write_dataframe(df, 'nonexistent_table')

241

except ProgrammingError as e:

242

print(f"Table write error: {e}")

243

# Consider creating table first or checking schema compatibility

244

```

245

246

### Integration with Data Science Ecosystem

247

248

The redshift_connector integrates seamlessly with the broader Python data science ecosystem.

249

250

```python

251

# Common integration patterns:

252

253

# 1. With Jupyter Notebooks

254

# %%sql magic command support (via SQLAlchemy integration)

255

import pandas as pd

256

import redshift_connector

257

258

conn = redshift_connector.connect(...)

259

df = pd.read_sql("SELECT * FROM my_table", conn)

260

261

# 2. With Matplotlib/Seaborn for visualization

262

import matplotlib.pyplot as plt

263

import seaborn as sns

264

265

cursor = conn.cursor()

266

cursor.execute("SELECT category, AVG(value) as avg_value FROM data GROUP BY category")

267

df = cursor.fetch_dataframe()

268

269

plt.figure(figsize=(10, 6))

270

sns.barplot(data=df, x='category', y='avg_value')

271

plt.title('Average Values by Category')

272

plt.show()

273

274

# 3. With Scikit-learn for machine learning

275

from sklearn.model_selection import train_test_split

276

from sklearn.linear_model import LinearRegression

277

278

cursor.execute("SELECT feature1, feature2, target FROM ml_data")

279

data = cursor.fetch_numpy_array()

280

281

X = data[:, :2] # features

282

y = data[:, 2] # target

283

284

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

285

model = LinearRegression()

286

model.fit(X_train, y_train)

287

288

# 4. With Apache Airflow for data pipelines

289

from airflow.operators.python import PythonOperator

290

291

def extract_transform_load():

292

conn = redshift_connector.connect(...)

293

cursor = conn.cursor()

294

295

# Extract

296

cursor.execute("SELECT * FROM source_table")

297

df = cursor.fetch_dataframe()

298

299

# Transform

300

df['processed_date'] = pd.to_datetime(df['date_column'])

301

df['calculated_field'] = df['field1'] * df['field2']

302

303

# Load

304

cursor.write_dataframe(df, 'target_table')

305

conn.close()

306

307

extract_task = PythonOperator(

308

task_id='extract_transform_load',

309

python_callable=extract_transform_load,

310

dag=dag

311

)

312

```