# Data Science Integration

Native integration with pandas and NumPy for transferring data between Redshift and Python data science workflows. The redshift_connector cursor provides methods that return query results as DataFrames or NumPy arrays and that write DataFrames back to Redshift tables.

## Capabilities

### Pandas DataFrame Integration

Direct support for reading query results into pandas DataFrames and writing DataFrames to existing Redshift tables, with automatic type conversion.

```python { .api }
class Cursor:
    def fetch_dataframe(self, num: int = None) -> 'pandas.DataFrame':
        """
        Fetch query results as a pandas DataFrame.

        Parameters:
        - num: Maximum number of rows to fetch (None for all remaining rows)

        Returns:
        pandas.DataFrame with query results

        Raises:
        InterfaceError: If pandas is not available
        """

    def write_dataframe(self, df: 'pandas.DataFrame', table: str) -> None:
        """
        Write a pandas DataFrame to a Redshift table.

        Parameters:
        - df: pandas DataFrame to write
        - table: Target table name

        The method automatically handles type conversion and creates
        appropriate INSERT statements for efficient data loading.

        Raises:
        InterfaceError: If pandas is not available
        ProgrammingError: If table doesn't exist or schema mismatch
        """
```
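
The docstring above says `write_dataframe` builds INSERT statements from the DataFrame. As a rough illustration of what a multi-row parameterized INSERT could look like, here is a minimal sketch in plain Python. The helper `build_insert` is hypothetical and is not part of redshift_connector; the `%s` placeholders assume the DB-API `format` paramstyle, and the statement the connector actually generates may differ.

```python
from typing import Any, List, Sequence, Tuple


def build_insert(
    table: str, columns: Sequence[str], rows: Sequence[Sequence[Any]]
) -> Tuple[str, List[Any]]:
    """Build one multi-row INSERT statement plus a flat parameter list.

    Hypothetical helper for illustration only; not redshift_connector code.
    """
    if not rows:
        raise ValueError("rows must not be empty")
    if any(len(row) != len(columns) for row in rows):
        raise ValueError("every row must have one value per column")

    # One "(%s, %s, ...)" group per row; values travel separately as parameters.
    group = "(" + ", ".join(["%s"] * len(columns)) + ")"
    sql = "INSERT INTO {} ({}) VALUES {}".format(
        table, ", ".join(columns), ", ".join([group] * len(rows))
    )
    # Flatten the rows in order so each value lines up with its placeholder.
    params = [value for row in rows for value in row]
    return sql, params


sql, params = build_insert(
    "monthly_sales_summary",
    ["month", "total_sales"],
    [("2023-01", 1200.50), ("2023-02", 980.00)],
)
print(sql)
print(params)
```

Table and column names are interpolated directly into the statement in this sketch, so they should come from trusted code rather than user input; only the row values are passed as parameters.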

### NumPy Array Integration

Support for fetching query results as NumPy arrays for numerical computing and scientific analysis workflows.

```python { .api }
class Cursor:
    def fetch_numpy_array(self, num: int = None) -> 'numpy.ndarray':
        """
        Fetch query results as a numpy ndarray.

        Parameters:
        - num: Maximum number of rows to fetch (None for all remaining rows)

        Returns:
        numpy.ndarray containing query results with appropriate dtypes

        The method automatically converts Redshift data types to
        compatible numpy dtypes for efficient numerical operations.

        Raises:
        InterfaceError: If numpy is not available
        """
```

### Installation for Data Science Features

Data science integration requires the optional pandas and numpy dependencies, which are installed through the `full` extra.

```python { .api }
# Install with data science dependencies
# pip install redshift_connector[full]

# This includes:
# - pandas >= 1.0.0
# - numpy >= 1.15.0

# Error handling for missing dependencies
MISSING_MODULE_ERROR_MSG: str = (
    "redshift_connector requires {module} support for this functionality. "
    "Please install redshift_connector[full] for {module} support"
)
```
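
Since the data science methods fail when pandas or numpy is missing, it can help to check for them up front. The following is a minimal sketch using only the standard library; `check_optional_modules` is a hypothetical helper, not part of redshift_connector, and the message template is copied from the block above.

```python
import importlib.util
from typing import Dict, Iterable

MISSING_MODULE_ERROR_MSG = (
    "redshift_connector requires {module} support for this functionality. "
    "Please install redshift_connector[full] for {module} support"
)


def check_optional_modules(
    modules: Iterable[str] = ("pandas", "numpy"),
) -> Dict[str, bool]:
    """Report which top-level modules can be found, without importing them."""
    return {name: importlib.util.find_spec(name) is not None for name in modules}


for name, available in check_optional_modules().items():
    if available:
        print(f"{name}: available")
    else:
        print(MISSING_MODULE_ERROR_MSG.format(module=name))
```

Running the check at application startup surfaces a missing extra before any query is executed.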

### Data Type Conversion

Automatic conversion between Redshift data types and pandas/numpy-compatible types.

```python { .api }
# Redshift to pandas/numpy type mapping:
#
# BIGINT            -> int64
# INTEGER           -> int32
# SMALLINT          -> int16
# BOOLEAN           -> bool
# REAL              -> float32
# DOUBLE PRECISION  -> float64
# NUMERIC/DECIMAL   -> float64 (or Decimal if numeric_to_float=False)
# VARCHAR/CHAR/TEXT -> object (string)
# DATE              -> datetime64[D]
# TIMESTAMP         -> datetime64[ns]
# TIME              -> object (datetime.time)
# JSON/JSONB        -> object (parsed JSON)
# ARRAY types       -> object (Python lists)

# Configuration for numeric conversion
conn = redshift_connector.connect(
    # ... connection parameters
    numeric_to_float=True  # Convert NUMERIC to float (default: False, uses Decimal)
)
```
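
The choice between `Decimal` and float for NUMERIC columns is a trade-off between exactness and speed. The sketch below uses only the standard library and made-up sample values to show the rounding difference; it does not touch the connector.

```python
from decimal import Decimal

# Sample values as they might arrive from a NUMERIC(10,2) column (made up).
as_decimal = [Decimal("0.10"), Decimal("0.20")]
as_float = [float(d) for d in as_decimal]

exact_sum = sum(as_decimal)  # Decimal arithmetic stays exact
float_sum = sum(as_float)    # binary floating point introduces rounding error

print(exact_sum == Decimal("0.30"))
print(float_sum == 0.3)
print(abs(float_sum - 0.3) < 1e-9)
```

For statistical analysis the float error is usually negligible, while for monetary totals that must reconcile exactly, keeping `Decimal` may be the safer choice.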

### Usage Examples

Practical examples demonstrating data science integration patterns.

```python
import redshift_connector
import pandas as pd
import numpy as np

# Connect to Redshift
conn = redshift_connector.connect(
    host='examplecluster.abc123xyz789.us-west-1.redshift.amazonaws.com',
    database='dev',
    user='awsuser',
    password='password',
    numeric_to_float=True  # Convert NUMERIC to float for pandas compatibility
)

cursor = conn.cursor()

# Fetch data as DataFrame
cursor.execute("""
    SELECT customer_id, order_date, total_amount, product_category
    FROM sales_data
    WHERE order_date >= '2023-01-01'
    ORDER BY order_date
""")

# Get results as pandas DataFrame
df = cursor.fetch_dataframe()
print(f"Loaded {len(df)} rows into DataFrame")
print(df.dtypes)
print(df.head())

# Perform data analysis
# Make sure order_date is datetime64 so the .dt accessor is available
df['order_date'] = pd.to_datetime(df['order_date'])
monthly_sales = df.groupby(df['order_date'].dt.to_period('M'))['total_amount'].sum()
category_stats = df.groupby('product_category')['total_amount'].agg(['mean', 'sum', 'count'])

# Write processed data back to Redshift
# First create target table
cursor.execute("""
    CREATE TABLE IF NOT EXISTS monthly_sales_summary (
        month VARCHAR(10),
        total_sales DECIMAL(15,2)
    )
""")

# Convert Series to DataFrame for writing
summary_df = monthly_sales.reset_index()
summary_df.columns = ['month', 'total_sales']
summary_df['month'] = summary_df['month'].astype(str)

# Write DataFrame to table, then commit (autocommit is off by default)
cursor.write_dataframe(summary_df, 'monthly_sales_summary')
conn.commit()

# Fetch numerical data as numpy array for analysis
cursor.execute("SELECT total_amount, quantity, discount FROM order_details")
# Cast to float so NumPy routines work even if the array has object dtype
data_array = cursor.fetch_numpy_array().astype(float)

# Perform numerical analysis
mean_amount = np.mean(data_array[:, 0])  # total_amount column
correlation_matrix = np.corrcoef(data_array.T)
print(f"Mean order amount: {mean_amount}")
print("Correlation matrix:")
print(correlation_matrix)

cursor.close()
conn.close()
```

### Performance Considerations

Strategies for keeping data transfer between Redshift and Python data science libraries efficient.

```python { .api }
# Performance Tips:

# 1. Use columnar operations when possible
cursor.execute("SELECT col1, col2, col3 FROM large_table WHERE condition")
df = cursor.fetch_dataframe()  # More efficient than row-by-row processing

# 2. Limit result sets for large tables
cursor.execute("SELECT * FROM large_table LIMIT 100000")
df = cursor.fetch_dataframe()

# 3. Use appropriate data types
conn = redshift_connector.connect(
    # ...
    numeric_to_float=True  # Faster than Decimal for numerical analysis
)

# 4. Batch operations for large DataFrames
# Write large DataFrames in chunks if needed
chunk_size = 10000
for i in range(0, len(large_df), chunk_size):
    chunk = large_df.iloc[i:i + chunk_size]
    cursor.write_dataframe(chunk, 'target_table')

# 5. Process very large result sets in chunks
# Pass num to fetch_dataframe so each DataFrame holds at most chunk_size rows
cursor.execute("SELECT * FROM very_large_table")
while True:
    chunk = cursor.fetch_dataframe(chunk_size)
    if chunk is None or chunk.empty:
        break
    process(chunk)  # placeholder for your own per-chunk logic
```
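
The chunked write in tip 4 walks a DataFrame in fixed-size steps. The arithmetic behind those steps can be sketched on its own, without pandas or a database connection; `chunk_bounds` is a hypothetical helper introduced here for illustration.

```python
from typing import Iterator, Tuple


def chunk_bounds(total_rows: int, chunk_size: int) -> Iterator[Tuple[int, int]]:
    """Yield (start, stop) row offsets covering total_rows in chunk_size steps."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for start in range(0, total_rows, chunk_size):
        # The final chunk is clipped so it never runs past the last row.
        yield start, min(start + chunk_size, total_rows)


bounds = list(chunk_bounds(total_rows=25_000, chunk_size=10_000))
print(bounds)
```

Each `(start, stop)` pair can be used as `large_df.iloc[start:stop]` before calling `write_dataframe`, which keeps the size of every INSERT batch bounded.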

### Error Handling for Data Science Operations

Error handling patterns for the DataFrame and NumPy methods.

```python { .api }
import redshift_connector
from redshift_connector import InterfaceError, ProgrammingError

try:
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM my_table")
    df = cursor.fetch_dataframe()
except InterfaceError as e:
    if "pandas" in str(e):
        print("pandas not available. Install with: pip install redshift_connector[full]")
    elif "numpy" in str(e):
        print("numpy not available. Install with: pip install redshift_connector[full]")
    else:
        print(f"Interface error: {e}")
except ProgrammingError as e:
    print(f"SQL or schema error: {e}")

try:
    cursor.write_dataframe(df, 'nonexistent_table')
except ProgrammingError as e:
    print(f"Table write error: {e}")
    # Consider creating table first or checking schema compatibility
```

### Integration with Data Science Ecosystem

Because results arrive as DataFrames or NumPy arrays, redshift_connector output can be passed directly to other libraries in the Python data science ecosystem.

```python
# Common integration patterns:

# 1. With Jupyter Notebooks
# %%sql magic command support (via SQLAlchemy integration)
import pandas as pd
import redshift_connector

conn = redshift_connector.connect(...)
df = pd.read_sql("SELECT * FROM my_table", conn)

# 2. With Matplotlib/Seaborn for visualization
import matplotlib.pyplot as plt
import seaborn as sns

cursor = conn.cursor()
cursor.execute("SELECT category, AVG(value) as avg_value FROM data GROUP BY category")
df = cursor.fetch_dataframe()

plt.figure(figsize=(10, 6))
sns.barplot(data=df, x='category', y='avg_value')
plt.title('Average Values by Category')
plt.show()

# 3. With Scikit-learn for machine learning
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression

cursor.execute("SELECT feature1, feature2, target FROM ml_data")
data = cursor.fetch_numpy_array()

X = data[:, :2]  # features
y = data[:, 2]   # target

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
model = LinearRegression()
model.fit(X_train, y_train)

# 4. With Apache Airflow for data pipelines
from airflow.operators.python import PythonOperator

def extract_transform_load():
    conn = redshift_connector.connect(...)
    cursor = conn.cursor()

    # Extract
    cursor.execute("SELECT * FROM source_table")
    df = cursor.fetch_dataframe()

    # Transform
    df['processed_date'] = pd.to_datetime(df['date_column'])
    df['calculated_field'] = df['field1'] * df['field2']

    # Load, then commit (autocommit is off by default)
    cursor.write_dataframe(df, 'target_table')
    conn.commit()
    conn.close()

extract_task = PythonOperator(
    task_id='extract_transform_load',
    python_callable=extract_transform_load,
    dag=dag  # assumes a DAG object defined elsewhere
)
```