# Data Loading

Primary functionality for executing SQL queries and loading data into various dataframe formats. ConnectorX's data loading provides high-performance, zero-copy data transfer from databases to Python dataframes, with support for multiple output formats and parallel processing.

## Capabilities

### Primary Data Loading Function

The main function for loading data from databases into dataframes, with extensive customization options.

```python { .api }
def read_sql(
    conn: str | ConnectionUrl | dict[str, str] | dict[str, ConnectionUrl],
    query: list[str] | str,
    *,
    return_type: Literal["pandas", "polars", "arrow", "modin", "dask", "arrow_stream"] = "pandas",
    protocol: Protocol | None = None,
    partition_on: str | None = None,
    partition_range: tuple[int, int] | None = None,
    partition_num: int | None = None,
    index_col: str | None = None,
    strategy: str | None = None,
    pre_execution_query: list[str] | str | None = None,
    batch_size: int = 10000,
    **kwargs
) -> pd.DataFrame | mpd.DataFrame | dd.DataFrame | pl.DataFrame | pa.Table | pa.RecordBatchReader:
    """
    Run a SQL query and download data from the database into a dataframe.

    Parameters:
    - conn: Connection string, ConnectionUrl, or dict of named connections for federated queries
    - query: SQL query string or list of SQL queries
    - return_type: Output format ("pandas", "polars", "arrow", "modin", "dask", "arrow_stream")
    - protocol: Backend-specific transfer protocol ("csv", "binary", "cursor", "simple", "text")
    - partition_on: Column name used to partition the result for parallel loading
    - partition_range: (min, max) value range of the partition column
    - partition_num: Number of partitions to generate for parallel loading
    - index_col: Index column to set (pandas/modin/dask only)
    - strategy: Strategy for federated query rewriting
    - pre_execution_query: SQL statement(s) to execute before the main query
    - batch_size: Maximum batch size when return_type is "arrow_stream"

    Returns:
    DataFrame in the specified format
    """
```
**Usage Examples:**

```python
import connectorx as cx

# Basic usage - single threaded
postgres_url = "postgresql://username:password@server:port/database"
query = "SELECT * FROM lineitem"
df = cx.read_sql(postgres_url, query)

# Parallel loading with automatic partitioning
df = cx.read_sql(
    postgres_url,
    query,
    partition_on="l_orderkey",
    partition_num=10,
)

# Explicit partition range
df = cx.read_sql(
    postgres_url,
    query,
    partition_on="l_orderkey",
    partition_range=(1, 1000),
    partition_num=4,
)

# Multiple return types
arrow_table = cx.read_sql(postgres_url, query, return_type="arrow")
polars_df = cx.read_sql(postgres_url, query, return_type="polars")
modin_df = cx.read_sql(postgres_url, query, return_type="modin")

# Pre-execution queries for configuration
df = cx.read_sql(
    postgres_url,
    query,
    pre_execution_query="SET work_mem = '1GB'",
)

# Arrow streaming for large datasets
record_reader = cx.read_sql(
    postgres_url,
    query,
    return_type="arrow_stream",
    batch_size=50000,
)
```
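The `arrow_stream` reader above is consumed incrementally; a minimal sketch, assuming standard PyArrow semantics where iterating a `RecordBatchReader` yields `RecordBatch` objects of at most `batch_size` rows (`handle_batch` is a hypothetical placeholder):

```python
# Sketch: stream batches instead of materializing the full result,
# so peak memory stays bounded by roughly one batch at a time.
for batch in record_reader:
    handle_batch(batch)  # each batch is a pyarrow.RecordBatch
```

For federated queries, `conn` is a dict mapping names to connection strings and the query references tables as `name.table`. A sketch with placeholder names and URLs:

```python
# Sketch of a federated query spanning two databases; the names "db1"/"db2"
# and both URLs are illustrative placeholders.
df = cx.read_sql(
    {
        "db1": "postgresql://username:password@server1:port/database1",
        "db2": "postgresql://username:password@server2:port/database2",
    },
    "SELECT o.o_orderkey, l.l_quantity "
    "FROM db1.orders o JOIN db2.lineitem l ON o.o_orderkey = l.l_orderkey",
)
```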
### Pandas-Compatible Function

Convenience function with a pandas.read_sql compatible interface.

```python { .api }
def read_sql_pandas(
    sql: list[str] | str,
    con: str | ConnectionUrl | dict[str, str] | dict[str, ConnectionUrl],
    index_col: str | None = None,
    protocol: Protocol | None = None,
    partition_on: str | None = None,
    partition_range: tuple[int, int] | None = None,
    partition_num: int | None = None,
    pre_execution_queries: list[str] | str | None = None,
) -> pd.DataFrame:
    """
    Run a SQL query with a pandas.read_sql compatible interface.

    Parameters are in the same order as pandas.read_sql for easy migration.

    Returns:
    pandas DataFrame
    """
```

**Usage Example:**

```python
# Drop-in replacement for pandas.read_sql
# from pandas import read_sql
from connectorx import read_sql_pandas as read_sql

postgres_url = "postgresql://username:password@server:port/database"
query = "SELECT * FROM lineitem"
df = read_sql(query, postgres_url)
```
## Output Format Support

### DataFrame Libraries

- **pandas**: Default format, full compatibility with the pandas ecosystem
- **polars**: High-performance DataFrame library with lazy evaluation
- **arrow**: PyArrow tables for interoperability and columnar processing
- **modin**: Distributed pandas for large datasets
- **dask**: Distributed computing with a familiar pandas interface
- **arrow_stream**: PyArrow RecordBatchReader for streaming large datasets
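For the pandas-family formats (`pandas`, `modin`, `dask`), `index_col` sets the index during loading. A brief sketch, reusing `postgres_url` and `query` from the examples above:

```python
# Load into a dask DataFrame and set l_orderkey as the index at load time;
# index_col applies only to the pandas, modin, and dask return types.
dask_df = cx.read_sql(
    postgres_url,
    query,
    return_type="dask",
    index_col="l_orderkey",
)
```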
### Protocol Options

Backend-specific protocols for optimal performance; the default can be overridden per call, as shown after the list:

- **binary**: Default high-performance protocol (most databases)
- **cursor**: Server-side cursors (Redshift default)
- **text**: Text-based transfer (ClickHouse default)
- **csv**: CSV format transfer
- **simple**: Simple query protocol
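Overriding the default is a single keyword argument; for example, forcing the CSV protocol (whether this helps is workload- and backend-dependent):

```python
# Request the CSV transfer protocol instead of the backend's default.
df = cx.read_sql(
    postgres_url,
    query,
    protocol="csv",
)
```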
## Parallel Processing

### Automatic Partitioning

When `partition_on` is specified, ConnectorX automatically:

1. Determines the min/max values of the partition column (skipped when `partition_range` is given)
2. Splits the range into `partition_num` equal partitions
3. Executes the partitioned queries in parallel across multiple threads
4. Combines the results into a single dataframe

Conceptually, the generated per-partition queries resemble the sketch below.
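This is a minimal illustration, not ConnectorX's internal code; the function and argument names are invented for the example:

```python
def partition_queries(table: str, col: str, lo: int, hi: int, n: int) -> list[str]:
    """Illustrative only: split [lo, hi] into n contiguous BETWEEN ranges."""
    step = (hi - lo + n) // n  # ceiling division so the ranges cover [lo, hi]
    queries = []
    for i in range(n):
        start = lo + i * step
        end = min(start + step - 1, hi)
        queries.append(f"SELECT * FROM {table} WHERE {col} BETWEEN {start} AND {end}")
    return queries

# partition_queries("lineitem", "l_orderkey", 1, 1000, 4) yields four queries
# covering l_orderkey 1-250, 251-500, 501-750, and 751-1000.
```

ConnectorX's actual boundary arithmetic may differ slightly, but the idea is the same: each partition becomes an independent query executed on its own thread.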
### Manual Query Lists

For custom partitioning logic, provide a list of queries; each query in the list is executed in parallel and the results are combined:

```python
queries = [
    "SELECT * FROM lineitem WHERE l_orderkey <= 1000",
    "SELECT * FROM lineitem WHERE l_orderkey > 1000 AND l_orderkey <= 2000",
    "SELECT * FROM lineitem WHERE l_orderkey > 2000",
]
df = cx.read_sql(postgres_url, queries)
```
## Error Handling

ConnectorX surfaces common database connection and query errors:

- **Connection errors**: database unreachable, authentication failures
- **Query errors**: invalid SQL syntax, missing tables or columns
- **Partition errors**: invalid partition column or partition settings
- **Memory errors**: out-of-memory conditions on large result sets (the `arrow_stream` return type can bound peak memory)

For federated queries, additional validation ensures query compatibility across the participating databases.
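In application code, failures arrive as ordinary Python exceptions; the exact exception types vary by backend, so the following defensive pattern is a sketch rather than prescribed usage:

```python
import connectorx as cx

try:
    df = cx.read_sql(postgres_url, "SELECT * FROM lineitem")
except Exception as exc:  # exact exception types vary by backend/driver
    # Connection, SQL, and partition problems all surface here; inspect the
    # message to distinguish, e.g. authentication vs. missing-table errors.
    print(f"Query failed: {exc}")
    raise
```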