# Query Partitioning

Functionality for partitioning SQL queries to enable parallel data loading across multiple threads. Query partitioning allows ConnectorX to split large queries into smaller, parallelizable chunks based on a specified column's value range.

## Capabilities

### SQL Query Partitioning

Generate multiple SQL queries by partitioning on a specified column, enabling parallel execution.

```python { .api }
def partition_sql(
    conn: str | ConnectionUrl,
    query: str,
    partition_on: str,
    partition_num: int,
    partition_range: tuple[int, int] | None = None,
) -> list[str]:
    """
    Partition a SQL query based on a column for parallel execution.

    Parameters:
    - conn: Database connection string or ConnectionUrl
    - query: Original SQL query to partition
    - partition_on: Column name to use for partitioning (must be numerical)
    - partition_num: Number of partitions to generate
    - partition_range: Optional explicit (min, max) values for partitioning

    Returns:
    List of partitioned SQL query strings
    """
```

**Usage Examples:**

```python
import connectorx as cx

postgres_url = "postgresql://username:password@server:port/database"
base_query = "SELECT * FROM lineitem"

# Automatic range detection
partitioned_queries = cx.partition_sql(
    postgres_url,
    base_query,
    partition_on="l_orderkey",
    partition_num=4,
)

# Result: a list of 4 queries with WHERE clauses covering different ranges
# ["SELECT * FROM (SELECT * FROM lineitem) WHERE l_orderkey >= 1 AND l_orderkey < 250",
#  "SELECT * FROM (SELECT * FROM lineitem) WHERE l_orderkey >= 250 AND l_orderkey < 500",
#  ...]

# Explicit range specification
partitioned_queries = cx.partition_sql(
    postgres_url,
    base_query,
    partition_on="l_orderkey",
    partition_num=3,
    partition_range=(100, 1000),
)
```

## Partitioning Strategy

### Automatic Range Detection

When `partition_range` is not specified, ConnectorX:

1. Executes `SELECT MIN(partition_column), MAX(partition_column) FROM (original_query)`
2. Calculates equal-sized ranges based on the min/max values
3. Generates queries with the appropriate WHERE clauses
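
The range-splitting step can be sketched as follows. This is a simplified illustration under the half-open `>= lo AND < hi` predicates used in the examples above, not ConnectorX's actual implementation; the helper names `split_range` and `build_queries` are hypothetical:

```python
def split_range(min_val: int, max_val: int, partition_num: int) -> list[tuple[int, int]]:
    """Divide [min_val, max_val] into partition_num half-open ranges.

    The final range is extended by 1 so that max_val itself is matched
    by a `>= lo AND < hi` predicate.
    """
    step = (max_val - min_val + 1) // partition_num or 1
    ranges, lo = [], min_val
    for i in range(partition_num):
        hi = max_val + 1 if i == partition_num - 1 else lo + step
        ranges.append((lo, hi))
        lo = hi
    return ranges

def build_queries(query: str, col: str, ranges: list[tuple[int, int]]) -> list[str]:
    """Wrap the original query and append a range filter per partition."""
    return [
        f"SELECT * FROM ({query}) WHERE {col} >= {lo} AND {col} < {hi}"
        for lo, hi in ranges
    ]
```

For example, `split_range(1, 1000, 4)` yields `[(1, 251), (251, 501), (501, 751), (751, 1001)]`, and each tuple becomes one partitioned query.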

### Manual Range Specification

When `partition_range` is provided, ConnectorX:

1. Uses the specified min/max values directly, skipping the MIN/MAX discovery query
2. Divides the range into `partition_num` equal parts
3. Generates partitioned queries within the specified bounds
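
As a worked example of the manual split, with `partition_range=(100, 1000)` and `partition_num=3` (assuming the same half-open `>= lo AND < hi` boundary handling shown in the examples above):

```python
lo, hi, n = 100, 1000, 3
step = (hi - lo + 1) // n  # 901 // 3 = 300
bounds = [
    (lo + i * step, hi + 1 if i == n - 1 else lo + (i + 1) * step)
    for i in range(n)
]
# Three equal parts covering 100..1000:
# [(100, 400), (400, 700), (700, 1001)]
```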

### Query Transformation

The original query is wrapped as a subquery and filtered on the partition column:

```sql
-- Original
SELECT * FROM lineitem WHERE l_shipdate > '1995-01-01'

-- Becomes (for partition 1 of 4)
SELECT * FROM (SELECT * FROM lineitem WHERE l_shipdate > '1995-01-01')
WHERE l_orderkey >= 1 AND l_orderkey < 1500
```

## Partitioning Requirements

### Column Requirements

The partition column must:
- Be **numerical** (integer, float, or decimal)
- **Not contain NULL values**
- Have a reasonably uniform distribution for effective partitioning
- Be one of the columns accessible in the original query

### Query Requirements

- Must be **SPJA queries** (Select, Project, Join, Aggregate)
- Complex subqueries and CTEs may not partition correctly
- Window functions and other advanced features may be incompatible

**Supported Query Types:**

```python
# ✓ Simple SELECT
"SELECT * FROM table"

# ✓ Filtered SELECT
"SELECT col1, col2 FROM table WHERE condition"

# ✓ Joins
"SELECT t1.*, t2.name FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id"

# ✓ Aggregation (with proper grouping)
"SELECT category, COUNT(*) FROM table GROUP BY category"

# ✗ Complex window functions
"SELECT *, ROW_NUMBER() OVER (PARTITION BY cat ORDER BY date) FROM table"
```

## Integration with read_sql

Partitioned queries can be used directly with `read_sql`:

```python
# Manual partitioning workflow
partitioned_queries = cx.partition_sql(
    postgres_url,
    "SELECT * FROM lineitem",
    partition_on="l_orderkey",
    partition_num=4,
)

# Execute the partitioned queries in parallel
df = cx.read_sql(postgres_url, partitioned_queries)

# Or let read_sql partition automatically
df = cx.read_sql(
    postgres_url,
    "SELECT * FROM lineitem",
    partition_on="l_orderkey",
    partition_num=4,
)
```

## Performance Considerations

### Optimal Partition Numbers

- **CPU cores**: generally 1-2x the number of available CPU cores
- **Database connections**: stay within the database's connection limit
- **Data distribution**: ensure partitions contain similar amounts of data
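
These constraints suggest a simple heuristic for picking `partition_num` (illustrative only; the cap of 8 connections is an assumed limit, not a ConnectorX default):

```python
import os

def choose_partition_num(max_connections: int = 8) -> int:
    """Pick a partition count: up to 2x the available CPU cores,
    capped by the database connection limit, and at least 1."""
    cores = os.cpu_count() or 1
    return max(1, min(2 * cores, max_connections))
```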

### Partition Column Selection

Choose columns that:
- Have a good distribution across their value range
- Are indexed for efficient filtering
- Result in roughly equal partition sizes
- Are not heavily skewed

**Good partition columns:**
- Primary keys (auto-incrementing IDs)
- Timestamp/date columns with a uniform distribution
- Well-distributed numerical columns

**Poor partition columns:**
- Columns with many duplicate values
- Heavily skewed distributions
- Columns with large gaps in values