# Data Utilities

Utility functions for data conversion and integration with the Python data science ecosystem, including pandas DataFrame conversion and database management functions.

## Capabilities

### Pandas Integration

Convert query results to pandas DataFrames for seamless integration with the Python data science stack.

```python { .api }
def as_pandas(cursor, coerce_float=False):
    """
    Convert cursor results to a pandas DataFrame.

    This function converts the results from an executed cursor into a pandas
    DataFrame, enabling easy integration with the Python data science ecosystem,
    including scikit-learn, matplotlib, and other analysis libraries.

    Parameters:
        cursor: Active cursor object with executed query results
        coerce_float (bool): Coerce numeric columns to float type. Default is False.

    Returns:
        pandas.DataFrame: Query results as a DataFrame with proper column names

    Raises:
        ImportError: If pandas is not installed
        ValueError: If cursor has no results or is not executed
    """
```

### Database Management

Administrative functions for managing databases and performing maintenance operations.

```python { .api }
def force_drop_impala_database(cursor, database_name):
    """
    Force drop an Impala database and all of its tables.

    This is a utility function for development and testing that forcibly
    drops a database even if it contains tables.

    Parameters:
        cursor: Active cursor object
        database_name (str): Name of the database to drop

    Warning:
        This operation is destructive and cannot be undone.
    """

def force_drop_hive_database(cursor, database_name):
    """
    Force drop a Hive database and all of its tables.

    Similar to force_drop_impala_database, but uses Hive-specific
    metadata handling.

    Parameters:
        cursor: Active cursor object
        database_name (str): Name of the database to drop

    Warning:
        This operation is destructive and cannot be undone.
    """
```

### Logging and Debugging

Utilities for logging and debugging database operations.

```python { .api }
def get_logger_and_init_null():
    """
    Get logger instance with null handler initialization.

    Returns a properly configured logger for impyla operations with
    a null handler to prevent unwanted log output unless explicitly
    configured by the user.

    Returns:
        logging.Logger: Configured logger instance
    """
```

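
Because the library initializes its loggers with a null handler, nothing is printed until the application opts in. A minimal sketch of enabling that output with the standard `logging` module (the logger name `'impala'` is an assumption here; use the name of the module you actually want to trace):

```python
import logging

# A library logger with a NullHandler discards records unless the
# application configures logging itself. Opt in by configuring the
# root logger and lowering the library logger's level.
logging.basicConfig(level=logging.INFO)
logging.getLogger('impala').setLevel(logging.DEBUG)

log = logging.getLogger('impala')
log.debug('connection parameters validated')  # now reaches the root handler
```
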
### Internal Utilities

Internal utility functions used by the impyla library for various operations.

```python { .api }
def _random_id(prefix='', length=8):
    """
    Generate a random identifier.

    Used internally for generating unique identifiers for operations
    and temporary objects.

    Parameters:
        prefix (str): Optional prefix for the identifier
        length (int): Length of the random part (default 8)

    Returns:
        str: Random identifier string
    """

def _get_table_schema_hack(cursor, table):
    """
    Get table schema information using internal methods.

    This is an internal utility for extracting table schema information
    when standard methods are not available.

    Parameters:
        cursor: Active cursor object
        table (str): Name of the table

    Returns:
        list: Schema information as a list of column descriptors
    """

def _gen_safe_random_table_name(cursor, prefix='tmp'):
    """
    Generate a safe random table name that doesn't conflict with existing tables.

    Parameters:
        cursor: Active cursor object
        prefix (str): Prefix for the table name (default 'tmp')

    Returns:
        str: Safe random table name
    """

def compute_result_schema(cursor, query_string):
    """
    Compute the result schema for a query without executing it.

    Parameters:
        cursor: Active cursor object
        query_string (str): SQL query to analyze

    Returns:
        list: Schema information for the query result
    """

def get_basic_credentials_for_request_headers(user, password):
    """
    Generate basic authentication credentials for HTTP request headers.

    Parameters:
        user (str): Username
        password (str): Password

    Returns:
        str: Base64-encoded credentials for the Authorization header
    """
```

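
As a rough illustration of what the identifier and credential helpers produce, here is a self-contained sketch. The names `make_basic_credentials` and `make_random_id` are hypothetical stand-ins written for this example, not the library's internals:

```python
import base64
import random
import string

def make_basic_credentials(user, password):
    # Sketch of a basic-auth helper: the base64-encoded "user:password"
    # pair that goes into an "Authorization: Basic <credentials>" header.
    token = '%s:%s' % (user, password)
    return base64.b64encode(token.encode('utf-8')).decode('ascii')

def make_random_id(prefix='', length=8):
    # Sketch of a random-identifier helper: an optional prefix followed
    # by `length` random alphanumeric characters.
    alphabet = string.ascii_lowercase + string.digits
    suffix = ''.join(random.choice(alphabet) for _ in range(length))
    return prefix + suffix

print(make_basic_credentials('alice', 'secret'))  # YWxpY2U6c2VjcmV0
print(make_random_id(prefix='tmp_'))              # e.g. tmp_k3x9q1z7
```
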
### HTTP and Cookie Utilities

Utilities for handling HTTP transport and cookie management.

```python { .api }
def cookie_matches_path(c, path):
    """
    Check if a cookie matches a given path.

    Parameters:
        c: Cookie object
        path (str): URL path to check

    Returns:
        bool: True if the cookie matches the path
    """

def get_cookie_expiry(c):
    """
    Get the expiry time of a cookie.

    Parameters:
        c: Cookie object

    Returns:
        datetime or None: Cookie expiry time
    """

def get_cookies(resp_headers):
    """
    Extract cookies from HTTP response headers.

    Parameters:
        resp_headers: HTTP response headers

    Returns:
        list: List of cookie objects
    """

def get_all_cookies(path, resp_headers):
    """
    Get all cookies from HTTP response headers for a given path.

    Parameters:
        path (str): URL path
        resp_headers: HTTP response headers

    Returns:
        list: List of cookie objects matching the path
    """

def get_all_matching_cookies(cookie_names, path, resp_headers):
    """
    Get cookies matching specific names from HTTP response headers.

    Parameters:
        cookie_names (list): List of cookie names to match
        path (str): URL path
        resp_headers: HTTP response headers

    Returns:
        list: List of matching cookie objects
    """
```

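
The behavior these helpers describe can be approximated with the standard library's `http.cookies`. A minimal sketch, assuming simple prefix-based path matching; the `parse_set_cookie` and `matches_path` names are illustrative, not impyla's API:

```python
from http.cookies import SimpleCookie

def parse_set_cookie(header_value):
    # Parse a single Set-Cookie header value into a cookie jar.
    jar = SimpleCookie()
    jar.load(header_value)
    return jar

def matches_path(morsel, request_path):
    # A cookie applies to a request path when the cookie's Path
    # attribute is a prefix of it (an empty Path matches everything).
    cookie_path = morsel['path'] or '/'
    return request_path.startswith(cookie_path)

jar = parse_set_cookie('impala.auth=abc123; Path=/cliservice; HttpOnly')
morsel = jar['impala.auth']
print(morsel.value)                               # abc123
print(matches_path(morsel, '/cliservice/query'))  # True
print(matches_path(morsel, '/other'))             # False
```
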
### Deprecation and Warning Utilities

Utilities for handling deprecation warnings and protocol warnings.

```python { .api }
def warn_protocol_param():
    """
    Issue a warning about deprecated protocol parameters.
    """

def warn_deprecate(functionality='This', alternative=None):
    """
    Issue a deprecation warning for functionality.

    Parameters:
        functionality (str): Description of the deprecated functionality
        alternative (str): Suggested alternative (optional)
    """

def warn_nontls_jwt():
    """
    Issue a warning about using JWT authentication without TLS.
    """
```

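
Helpers like these are conventionally thin wrappers around the standard `warnings` module. A hedged sketch of that pattern (`warn_deprecate_sketch` is an illustrative stand-in and the message text is invented, not the library's wording):

```python
import warnings

def warn_deprecate_sketch(functionality='This', alternative=None):
    # Compose a deprecation message and emit it as a DeprecationWarning.
    # stacklevel=2 attributes the warning to the caller, not this helper.
    msg = '%s functionality is deprecated and will be removed.' % functionality
    if alternative:
        msg += ' Use %s instead.' % alternative
    warnings.warn(msg, DeprecationWarning, stacklevel=2)

# Callers see the warning unless the warnings filter suppresses it.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')
    warn_deprecate_sketch('The protocol parameter', alternative='use_http_transport')
    print(caught[0].category.__name__)  # DeprecationWarning
```
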
## Usage Examples

### Basic Pandas Conversion

```python
from impala.dbapi import connect
from impala.util import as_pandas

# Connect and execute a query
conn = connect(host='impala-host', port=21050)
cursor = conn.cursor()

cursor.execute("""
    SELECT
        customer_id,
        order_date,
        total_amount,
        status
    FROM orders
    WHERE order_date >= '2023-01-01'
    LIMIT 1000
""")

# Convert to a pandas DataFrame
df = as_pandas(cursor)

print(f"DataFrame shape: {df.shape}")
print("\nColumn types:")
print(df.dtypes)

print("\nFirst few rows:")
print(df.head())

cursor.close()
conn.close()
```

284
285
### Data Analysis with Pandas
286
287
```python
288
from impala.dbapi import connect
289
from impala.util import as_pandas
290
import matplotlib.pyplot as plt
291
292
conn = connect(host='impala-host', port=21050)
293
cursor = conn.cursor()
294
295
# Query sales data
296
cursor.execute("""
297
SELECT
298
DATE_TRUNC('month', order_date) as month,
299
SUM(total_amount) as monthly_sales,
300
COUNT(*) as order_count,
301
AVG(total_amount) as avg_order_value
302
FROM orders
303
WHERE order_date >= '2023-01-01'
304
GROUP BY DATE_TRUNC('month', order_date)
305
ORDER BY month
306
""")
307
308
# Convert to DataFrame
309
df = as_pandas(cursor, coerce_float=True)
310
311
# Perform analysis
312
print("Monthly Sales Summary:")
313
print(df.describe())
314
315
# Plot monthly trends
316
plt.figure(figsize=(12, 6))
317
plt.subplot(1, 2, 1)
318
plt.plot(df['month'], df['monthly_sales'])
319
plt.title('Monthly Sales')
320
plt.xticks(rotation=45)
321
322
plt.subplot(1, 2, 2)
323
plt.plot(df['month'], df['avg_order_value'])
324
plt.title('Average Order Value')
325
plt.xticks(rotation=45)
326
327
plt.tight_layout()
328
plt.show()
329
330
cursor.close()
331
conn.close()
332
```
### Working with Complex Data Types

```python
from impala.dbapi import connect
from impala.util import as_pandas
import json
from collections import Counter

conn = connect(host='impala-host', port=21050)
cursor = conn.cursor()

# Query with complex data types (arrays, structs)
cursor.execute("""
    SELECT
        user_id,
        profile.name,
        profile.email,
        tags,
        metadata
    FROM user_profiles
    LIMIT 100
""")

df = as_pandas(cursor)

# Handle complex types
print("Data types:")
print(df.dtypes)

# Work with array columns
if 'tags' in df.columns:
    # Convert string representations of arrays to Python lists
    df['tags_list'] = df['tags'].apply(
        lambda x: json.loads(x) if x and x != 'NULL' else []
    )

    # Analyze tag frequency
    all_tags = []
    for tags in df['tags_list']:
        all_tags.extend(tags)

    tag_counts = Counter(all_tags)
    print("\nTop 10 most common tags:")
    for tag, count in tag_counts.most_common(10):
        print(f"{tag}: {count}")

cursor.close()
conn.close()
```

### Database Management Operations

```python
from impala.dbapi import connect
from impala.util import force_drop_impala_database

# Connect with administrative privileges
conn = connect(
    host='impala-host',
    port=21050,
    auth_mechanism='GSSAPI'  # Kerberos-authenticated account with admin rights
)
cursor = conn.cursor()

# Create a test database
cursor.execute("CREATE DATABASE IF NOT EXISTS test_analytics")

# Create some test tables
cursor.execute("""
    CREATE TABLE IF NOT EXISTS test_analytics.sample_data (
        id INT,
        name STRING,
        value DOUBLE
    )
""")

cursor.execute("""
    INSERT INTO test_analytics.sample_data
    VALUES (1, 'test', 123.45)
""")

# List tables in the database
cursor.execute("SHOW TABLES IN test_analytics")
tables = cursor.fetchall()
print(f"Tables in test_analytics: {[t[0] for t in tables]}")

# Force drop the entire database (use with caution!)
try:
    force_drop_impala_database(cursor, 'test_analytics')
    print("Database test_analytics dropped successfully")
except Exception as e:
    print(f"Error dropping database: {e}")

cursor.close()
conn.close()
```

### Custom Data Processing Pipeline

```python
from impala.dbapi import connect
from impala.util import as_pandas
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier

def create_ml_pipeline(host, query, target_column):
    """Create a machine learning pipeline from Impala data."""

    # Connect and fetch data
    conn = connect(host=host, port=21050)
    cursor = conn.cursor()

    cursor.execute(query)
    df = as_pandas(cursor, coerce_float=True)

    print(f"Loaded {len(df)} rows from Impala")

    # Prepare features and target
    feature_columns = [col for col in df.columns if col != target_column]
    X = df[feature_columns]
    y = df[target_column]

    # Handle missing values
    X = X.fillna(X.mean())

    # Scale features
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # Train model
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_scaled, y)

    print(f"Training accuracy: {model.score(X_scaled, y):.3f}")

    # Feature importance
    feature_importance = pd.DataFrame({
        'feature': feature_columns,
        'importance': model.feature_importances_
    }).sort_values('importance', ascending=False)

    print("\nTop 5 important features:")
    print(feature_importance.head())

    cursor.close()
    conn.close()

    return model, scaler, feature_importance

# Usage
ml_query = """
    SELECT
        age, income, education_years, experience_years,
        CASE WHEN salary > 50000 THEN 1 ELSE 0 END AS high_earner
    FROM employee_data
    WHERE age IS NOT NULL
      AND income IS NOT NULL
"""

model, scaler, importance = create_ml_pipeline(
    'impala-host',
    ml_query,
    'high_earner'
)
```