# Utilities
Core utilities for data processing, caching, JSON serialization, time handling, database management, and Celery integration. These helpers provide the common operations and system-integration functionality used throughout the Superset application.
## Capabilities
### Data Processing Utilities
Core functions for data transformation, serialization, and user interface operations.
```python { .api }
def flasher(msg, severity=None):
    """
    Flash message utility for user notifications.
    Integrates with Flask's flash message system for UI feedback.

    Parameters:
    - msg: str, message text to display to user
    - severity: str, optional message severity ('info', 'warning', 'error', 'success')

    Usage:
    Used throughout the application to provide user feedback
    for operations, errors, and status updates.
    """

def parse_human_datetime(s):
    """
    Parse human-readable datetime strings.
    Supports natural language date expressions and ISO formats.

    Parameters:
    - s: str, datetime string in human-readable format

    Returns:
    datetime object parsed from input string

    Examples:
    - '2023-01-01' -> datetime(2023, 1, 1)
    - 'yesterday' -> datetime for previous day
    - '1 week ago' -> datetime for one week prior
    """

def datetime_f(dttm):
    """
    Format datetime objects for display.
    Standardized datetime formatting for UI consistency.

    Parameters:
    - dttm: datetime, datetime object to format

    Returns:
    str, formatted datetime string for display
    """

def base_json_conv(obj):
    """
    JSON serialization converter for complex objects.
    Handles datetime, Decimal, and other non-serializable types.

    Parameters:
    - obj: any, object to convert for JSON serialization

    Returns:
    JSON-serializable representation of object

    Usage:
    Used as default converter in json_dumps() for complex data types.
    """

def json_iso_dttm_ser(dttm, pessimistic=False):
    """
    ISO datetime serialization for JSON APIs.

    Parameters:
    - dttm: datetime, datetime object to serialize
    - pessimistic: bool, whether to use pessimistic timezone handling

    Returns:
    str, ISO 8601 formatted datetime string
    """

def json_int_dttm_ser(dttm):
    """
    Integer timestamp serialization for JavaScript compatibility.

    Parameters:
    - dttm: datetime, datetime object to serialize

    Returns:
    int, Unix timestamp in milliseconds for JavaScript Date()
    """

def json_dumps(obj, default=None, ignore_nan=False, encoding=None, sort_keys=False):
    """
    Enhanced JSON serialization with Superset-specific handling.

    Parameters:
    - obj: any, object to serialize to JSON
    - default: callable, custom serialization function for complex types
    - ignore_nan: bool, whether to ignore NaN values in numeric data
    - encoding: str, character encoding for string data
    - sort_keys: bool, whether to sort dictionary keys in output

    Returns:
    str, JSON string representation of object

    Features:
    - Handles pandas DataFrames and Series
    - Processes datetime objects with timezone awareness
    - Manages NaN and infinity values appropriately
    - Supports custom serialization handlers
    """
```
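
A short sketch of how the JSON helpers compose with the standard library. The call signatures follow the docstrings above; treat the exact values as illustrative.

```python
import json
from datetime import datetime, timezone

record = {'created': datetime(2023, 1, 1, tzinfo=timezone.utc), 'count': 42}

# base_json_conv handles the datetime that json.dumps would otherwise reject
payload = json.dumps(record, default=base_json_conv)

# Epoch milliseconds for charting code that feeds JavaScript's Date()
ts_ms = json_int_dttm_ser(record['created'])  # 1672531200000
```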
### Database Utilities
Database connection management and configuration functions.
```python { .api }
def pessimistic_connection_handling(engine):
    """
    Configure pessimistic disconnect handling for database connections.
    Improves connection reliability in unstable network environments.

    Parameters:
    - engine: SQLAlchemy Engine, database engine to configure

    Side Effects:
    Configures engine event listeners for connection validation
    and automatic reconnection on disconnect detection.
    """

def setup_cache(app, cache_config):
    """
    Initialize application cache configuration.
    Sets up Flask-Caching with specified backend and options.

    Parameters:
    - app: Flask application instance
    - cache_config: dict, cache configuration parameters

    Returns:
    Cache instance configured for the application

    Supported Backends:
    - Redis: High-performance distributed caching
    - Memcached: Memory-based caching system
    - Simple: In-memory Python dictionary cache
    - FileSystem: File-based cache storage
    """

def get_or_create_main_db():
    """
    Get or create main database connection instance.
    Ensures Superset has a configured main database for metadata storage.

    Returns:
    Database instance for Superset's main metadata database

    Usage:
    Called during application initialization to establish
    the primary database connection for application metadata.
    """

def get_main_database(session):
    """
    Retrieve main database instance from session.

    Parameters:
    - session: SQLAlchemy session for database operations

    Returns:
    Database instance representing the main Superset database
    """

def get_update_perms_flag():
    """
    Get permission update flag from configuration.
    Controls whether permissions are automatically updated during startup.

    Returns:
    bool, True if permissions should be updated automatically
    """
```
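
A sketch of typical startup usage, under stated assumptions: `db.session` and `sync_permissions()` are hypothetical stand-ins for whatever session handle and permission-sync routine the application actually uses.

```python
# At application startup: make sure the metadata database exists,
# then decide whether to sync permissions (per get_update_perms_flag()).
main_db = get_or_create_main_db()
if get_update_perms_flag():
    sync_permissions()  # hypothetical helper; actual sync logic lives elsewhere

# Later, with an active session, look the same database back up
main_db = get_main_database(db.session)  # db.session assumed available
```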
### Query Processing Utilities
Functions for processing and manipulating query parameters and filters.
```python { .api }
def merge_extra_filters(form_data, extra_filters):
    """
    Merge additional filters into form data.
    Combines dashboard-level filters with chart-specific filters.

    Parameters:
    - form_data: dict, chart configuration and existing filters
    - extra_filters: list, additional filters to apply

    Returns:
    dict, updated form data with merged filters

    Usage:
    Used when dashboard filters need to be applied to individual charts
    for consistent filtering across dashboard components.
    """

def merge_request_params(form_data, params):
    """
    Merge HTTP request parameters into form data.
    Incorporates URL parameters and form submissions into chart configuration.

    Parameters:
    - form_data: dict, existing chart configuration
    - params: dict, HTTP request parameters to merge

    Returns:
    dict, updated form data with request parameters
    """

def get_since_until(time_range=None, since=None, until=None, time_shift=None, relative_start=None, relative_end=None):
    """
    Parse and process time range parameters for queries.
    Handles various time range specifications and converts to absolute timestamps.

    Parameters:
    - time_range: str, natural language time range ('Last week', '30 days ago', etc.)
    - since: str, start time specification
    - until: str, end time specification
    - time_shift: str, time shift offset for comparisons
    - relative_start: str, relative start time specification
    - relative_end: str, relative end time specification

    Returns:
    tuple, (since_datetime, until_datetime) with processed time boundaries

    Features:
    - Natural language time range parsing
    - Relative time calculations
    - Time zone handling and conversion
    - Support for rolling time windows
    """

def add_ago_to_kwargs(kwargs, time_ago):
    """
    Add time offset to query parameters for temporal comparisons.

    Parameters:
    - kwargs: dict, query parameters to modify
    - time_ago: str, time offset specification ('1 week ago', '30 days', etc.)

    Returns:
    dict, modified parameters with time offset applied

    Usage:
    Used for period-over-period comparisons and trend analysis
    where historical data needs to be queried with time shifts.
    """
```
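
The Usage Examples section below covers merge_extra_filters and get_since_until; here is a hedged sketch of the other two helpers. The return-value convention follows the docstrings above; in some versions these helpers may mutate their arguments in place.

```python
# Fold request parameters into an existing chart configuration
# (the parameter name below is illustrative)
form_data = {'metrics': ['count'], 'granularity': 'day'}
form_data = merge_request_params(form_data, {'custom_param': 'value'})

# Shift an existing query window back one week for a period-over-period view
kwargs = {'since': '2023-01-08', 'until': '2023-01-15'}
kwargs = add_ago_to_kwargs(kwargs, '1 week ago')
```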
### Security and Validation Utilities
Functions for data validation, compression, and security operations.
```python { .api }
def zlib_compress(data):
    """
    Compress data using zlib compression algorithm.

    Parameters:
    - data: bytes or str, data to compress

    Returns:
    bytes, compressed data suitable for storage or transmission

    Usage:
    Used for compressing large query results and cached data
    to reduce storage requirements and network bandwidth.
    """

def zlib_decompress(data):
    """
    Decompress zlib-compressed data.

    Parameters:
    - data: bytes, compressed data to decompress

    Returns:
    bytes, original uncompressed data

    Usage:
    Companion function to zlib_compress() for retrieving
    compressed cached data and query results.
    """

def validate_json(obj):
    """
    Validate JSON structure and content.

    Parameters:
    - obj: any, object to validate for JSON compliance

    Raises:
    ValueError if the input is not valid JSON

    Usage:
    Used throughout the application to validate configuration
    parameters, API inputs, and stored JSON data.
    """
```
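
A minimal round-trip sketch of the compression helpers and the validator; the error-handler name is hypothetical.

```python
# Compress a large JSON payload before caching it
blob = json_dumps({'rows': list(range(10000))})
compressed = zlib_compress(blob)
restored = zlib_decompress(compressed)

# Reject malformed configuration early
try:
    validate_json('{"key": "value"}')
except ValueError:
    handle_invalid_config()  # hypothetical error handler
```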
### Caching Utilities
Memoization and caching functionality for performance optimization.
```python { .api }
class memoized:
    """
    Memoization decorator for function result caching.

    Properties:
    - watch: list, instance variables to monitor for cache invalidation

    Usage:
    Decorator that caches function results based on arguments.
    Automatically invalidates cache when watched instance variables change.

    Example:
    @memoized
    def expensive_calculation(self, param1, param2):
        return complex_computation(param1, param2)

    @memoized(watch=('config', 'settings'))
    def config_dependent_function(self):
        return process_configuration(self.config)
    """
```
### Time and Date Utilities
Constants and functions for time-based operations and calculations.
```python { .api }
def now_as_float():
    """
    Get current timestamp as floating point number.

    Returns:
    float, current time as Unix timestamp with millisecond precision

    Usage:
    Used for performance timing, cache key generation,
    and high-precision timestamp requirements.
    """

DTTM_ALIAS: str = '__timestamp'
"""
Standard alias for datetime columns in queries.
Consistent column name used across visualizations for time-based data.
"""

EPOCH: datetime
"""
Unix epoch datetime object (1970-01-01 00:00:00 UTC).
Reference point for timestamp calculations and conversions.
"""

JS_MAX_INTEGER: int = 9007199254740991  # 2^53-1
"""
Maximum safe integer value for JavaScript compatibility.
Used to prevent precision loss when sending large integers to frontend.
"""
```
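
A sketch of how these constants typically combine when preparing rows for the frontend; the helper name `to_js_timestamp` is illustrative, not part of the module.

```python
from datetime import datetime

def to_js_timestamp(dttm):
    # Milliseconds since EPOCH, the form JavaScript's Date() expects
    ms = int((dttm - EPOCH).total_seconds() * 1000)
    # Guard against silent precision loss in the browser
    assert ms <= JS_MAX_INTEGER
    return ms

row = {DTTM_ALIAS: to_js_timestamp(datetime(2023, 1, 1))}
```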
### Data Types and Extensions
Custom SQLAlchemy types and database-specific extensions.
```python { .api }
class MediumText:
    """
    Extended text column type for MySQL databases.
    Provides larger text storage capacity than standard TEXT type.

    Features:
    - Supports up to 16MB of text data
    - MySQL-specific optimization
    - Automatic fallback for other database engines
    """

# Custom SQLAlchemy Types
"""
Various custom column types for specialized data storage:
- JSON columns for configuration data
- Encrypted columns for sensitive information
- Compressed columns for large text data
- Custom numeric types for specialized calculations
"""
```
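
A sketch of declaring a model column with MediumText, assuming the usual SQLAlchemy declarative setup; the model and table names are illustrative.

```python
from sqlalchemy import Column, Integer
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class ChartConfig(Base):  # hypothetical model for illustration
    __tablename__ = 'chart_config'
    id = Column(Integer, primary_key=True)
    # Stores large text blobs; falls back to TEXT on non-MySQL engines
    params = Column(MediumText())
```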
### Celery Integration
Celery application management for asynchronous task processing.
```python { .api }
def get_celery_app(config):
    """
    Get or create Celery application instance.

    Parameters:
    - config: dict or object, Celery configuration parameters

    Returns:
    Celery application instance configured for Superset tasks

    Features:
    - Automatic configuration from Superset settings
    - Task routing and queue management
    - Result backend configuration
    - Worker process management

    Usage:
    Used to initialize Celery for asynchronous query processing,
    email notifications, and background task execution.
    """
```
### Query Status and Enumerations
Status tracking and enumeration constants for query lifecycle management.
```python { .api }
class QueryStatus:
    """
    Query execution status enumeration.
    Defines standardized status values for tracking query lifecycle.
    """

    STOPPED = 'stopped'
    """Query execution was manually stopped or cancelled."""

    FAILED = 'failed'
    """Query execution failed due to error or exception."""

    PENDING = 'pending'
    """Query is queued and waiting for execution."""

    RUNNING = 'running'
    """Query is currently executing on database."""

    SCHEDULED = 'scheduled'
    """Query is scheduled for future execution."""

    SUCCESS = 'success'
    """Query completed successfully with results available."""

    TIMED_OUT = 'timed_out'
    """Query exceeded maximum allowed execution time."""
```
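
A brief sketch of status handling; the query object with a `.status` attribute is assumed for illustration.

```python
# States from which a query can no longer progress
TERMINAL_STATES = {
    QueryStatus.SUCCESS,
    QueryStatus.FAILED,
    QueryStatus.STOPPED,
    QueryStatus.TIMED_OUT,
}

def is_finished(query):
    # query.status is assumed to hold one of the QueryStatus values
    return query.status in TERMINAL_STATES
```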
### Adhoc Metrics
Dynamic metric creation and processing utilities.
```python { .api }
468
ADHOC_METRIC_EXPRESSION_TYPES = {
469
'SIMPLE': 'SIMPLE',
470
'SQL': 'SQL'
471
}
472
"""
473
Adhoc metric expression type constants.
474
475
- SIMPLE: Basic aggregation functions (SUM, AVG, COUNT, etc.)
476
- SQL: Custom SQL expressions for complex calculations
477
"""
478
479
def to_adhoc(fds, metric, label=None):
480
"""
481
Convert metric definition to adhoc metric format.
482
483
Parameters:
484
- fds: dict, form data structure containing metric context
485
- metric: str or dict, metric name or definition to convert
486
- label: str, optional custom label for the metric
487
488
Returns:
489
dict, adhoc metric definition suitable for query processing
490
491
Usage:
492
Used to standardize metric definitions from various sources
493
into a consistent format for query generation and visualization.
494
"""
495
```
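
A hedged sketch following the signature above; the exact shape of the returned dictionary varies by version, so the commented result is indicative only.

```python
# Normalize a plain metric name into the adhoc structure
form_data = {'datasource': '1__table'}  # illustrative form data
adhoc_metric = to_adhoc(form_data, 'count', label='Row count')
# Indicatively: {'expressionType': 'SIMPLE', ..., 'label': 'Row count'}
```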
## Usage Examples
### Data Processing
```python
from datetime import datetime

# Parse natural language dates
start_date = parse_human_datetime('30 days ago')
end_date = parse_human_datetime('today')

# Format for display
formatted_date = datetime_f(start_date)

# JSON serialization with complex types
data = {
    'timestamp': datetime.now(),
    'values': [1.5, 2.7, float('nan')],
    'metadata': {'source': 'database'}
}
json_string = json_dumps(data, ignore_nan=True)
```
### Caching and Memoization
```python
class DataProcessor:
    def __init__(self):
        self.config = {}

    @memoized(watch=['config'])
    def process_data(self, dataset_id):
        """Expensive data processing with configuration dependency."""
        return expensive_calculation(dataset_id, self.config)

    @memoized
    def get_metadata(self, table_name):
        """Cached metadata retrieval."""
        return fetch_table_metadata(table_name)
```
### Query Processing
```python
# Merge dashboard filters with chart filters
chart_data = merge_extra_filters(
    form_data={'metrics': ['count'], 'groupby': ['category']},
    extra_filters=[{'col': 'status', 'op': '==', 'val': 'active'}]
)

# Process time range parameters
since, until = get_since_until(
    time_range='Last 30 days',
    time_shift='1 week ago'
)
```
### Database Operations
```python
from flask import Flask
from sqlalchemy import create_engine

# Set up the application cache
app = Flask(__name__)
cache = setup_cache(app, {
    'CACHE_TYPE': 'redis',
    'CACHE_REDIS_URL': 'redis://localhost:6379/0'
})

# Configure database connection handling (database_url defined elsewhere)
engine = create_engine(database_url)
pessimistic_connection_handling(engine)
```
### Celery Task Management
```python
# Initialize Celery application
celery_config = {
    'broker_url': 'redis://localhost:6379/0',
    'result_backend': 'redis://localhost:6379/0'
}
celery_app = get_celery_app(celery_config)

# Define an async task
@celery_app.task
def process_large_query(query_id):
    return execute_sql_query(query_id)
```
The utilities module supports every aspect of Superset operation, from data processing and caching to security and asynchronous task management, enabling robust and performant data visualization and exploration.