Load data from databases to dataframes, the fastest way.
86
Quality
Pending
Does it follow best practices?
Impact
86%
1.04x
Average score across 10 eval scenarios
Execute queries across multiple databases in a single statement, with automatic join optimization and query rewriting. Federated queries enable ConnectorX to join tables from different database sources, providing a unified query interface across heterogeneous data sources.
Execute SQL queries that span multiple databases using a single query statement.
def read_sql(
    conn: dict[str, str] | dict[str, ConnectionUrl],
    query: str,
    *,
    return_type: Literal["pandas", "polars", "arrow"] = "pandas",
    strategy: str | None = None,
    **kwargs,
) -> pd.DataFrame | pl.DataFrame | pa.Table:
    """
    Execute a federated query across multiple databases.

    Parameters:
    - conn: Dictionary mapping database aliases to connection strings or
      ConnectionUrl objects. Each key is the alias that prefixes table
      names in ``query``.
    - query: SQL query referencing tables with database aliases
      (``db_alias.table_name``)
    - return_type: Output format ("pandas", "polars", "arrow");
      defaults to "pandas"
    - strategy: Query rewriting strategy for join pushdown optimization
    - **kwargs: Other read_sql options. Partitioning and protocol options
      are not supported for federated queries (see Note).

    Returns:
    DataFrame in the specified format with joined data from multiple sources

    Note: Federated queries do not support partitioning or protocol specification
    """


import connectorx as cx
# Define multiple database connections; the keys are the aliases that
# prefix table names in the query.
connections = {
    "db1": "postgresql://user1:pass1@server1:5432/database1",
    "db2": "postgresql://user2:pass2@server2:5432/database2",
}
# Query across databases using aliases (db1.nation joined to db2.region)
federated_query = """
SELECT
n.n_name as nation_name,
r.r_name as region_name,
n.n_comment
FROM db1.nation n
JOIN db2.region r ON n.n_regionkey = r.r_regionkey
"""
df = cx.read_sql(connections, federated_query)

# Connect to different database types
connections = {
    "sales_db": "postgresql://user:pass@sales-server:5432/sales",
    "customer_db": "mysql://user:pass@customer-server:3306/customers",
    "inventory_db": "mssql://user:pass@inventory-server:1433/inventory",
}
# Complex federated analytics query: a three-way join across the
# PostgreSQL, MySQL and MSSQL sources above, filtered by order date and
# customer segment.
analytics_query = """
SELECT
c.customer_name,
c.customer_segment,
s.order_date,
s.total_amount,
i.product_name,
i.category
FROM customer_db.customers c
JOIN sales_db.orders s ON c.customer_id = s.customer_id
JOIN inventory_db.products i ON s.product_id = i.product_id
WHERE s.order_date >= '2023-01-01'
AND c.customer_segment = 'Enterprise'
"""
result_df = cx.read_sql(connections, analytics_query)

from connectorx import ConnectionUrl
# Describe each connection with named fields (backend, username, password,
# server, port, database) through ConnectionUrl instead of a hand-written
# URL string.
db1_conn = ConnectionUrl(
    backend="postgresql",
    username="analytics_user",
    password="secure_pass",
    server="analytics.company.com",
    port=5432,
    database="warehouse",
)
db2_conn = ConnectionUrl(
    backend="mysql",
    username="reporting_user",
    password="report_pass",
    server="mysql.company.com",
    port=3306,
    database="reporting",
)
# The keys become the table prefixes used in the query below
# (warehouse.fact_table, reports.report_metadata).
connections = dict(warehouse=db1_conn, reports=db2_conn)
query = """
SELECT
w.fact_id,
w.metric_value,
r.report_name,
r.created_date
FROM warehouse.fact_table w
JOIN reports.report_metadata r ON w.report_id = r.report_id
"""
df = cx.read_sql(connections, query)

Reference tables using the format `database_alias.table_name`:
-- Correct federated query syntax: every table is qualified with the alias
-- of its database (the key used for it in the `conn` dictionary).
SELECT t1.col1, t2.col2
FROM db1.table1 t1
JOIN db2.table2 t2 ON t1.id = t2.foreign_id
-- Separate example, meant to be run on its own (not appended to the query
-- above): standard table aliases (n, r) still work on alias-qualified tables.
SELECT n.nation_name, r.region_name
FROM db1.nation n
JOIN db2.region r ON n.regionkey = r.regionkey

Supported Operations:
Current Limitations: federated queries do not support partitioning or specifying a `protocol`.
ConnectorX automatically optimizes federated queries by pushing joins down to individual databases when possible.
# Specify a join pushdown strategy for the federated query rewriter.
# NOTE(review): "push_down_joins" reads as a placeholder; confirm which
# strategy names the installed connectorx version actually accepts.
# NOTE(review): if the snippets on this page run in order, `connections`
# was last bound to the warehouse/reports dict, whose aliases do not match
# the db1/db2 aliases in `federated_query`; pass the db1/db2 dict here.
df = cx.read_sql(
connections,
federated_query,
strategy="push_down_joins" # or other optimization strategies
)

By default, ConnectorX:
Best Practices:
Example Optimized Query:
-- NOTE(review): the "Good" and "Less optimal" queries below contain the
-- same joins and the same WHERE predicates and differ only in line layout,
-- so as written they do not show a difference in where filtering happens.
-- Whether predicates are evaluated at each source is up to the federated
-- query rewriter; confirm before relying on this comparison.
-- Good: Filters applied before join
SELECT c.name, o.total
FROM customer_db.customers c
JOIN sales_db.orders o ON c.id = o.customer_id
WHERE c.active = true -- Filter applied at source
AND o.order_date >= '2023-01-01' -- Filter applied at source
-- Less optimal: Post-join filtering
SELECT c.name, o.total
FROM customer_db.customers c
JOIN sales_db.orders o ON c.id = o.customer_id
WHERE c.active = true AND o.order_date >= '2023-01-01'

Federated queries support multiple output formats:
# return_type defaults to "pandas", so this returns a pandas DataFrame;
# the default is spelled out here to make the choice explicit.
df = cx.read_sql(connections, query, return_type="pandas")
# return_type="arrow" returns a PyArrow Table
arrow_table = cx.read_sql(connections, query, return_type="arrow")
# return_type="polars" returns a Polars DataFrame
polars_df = cx.read_sql(connections, query, return_type="polars")

Common federated query errors:
# Classify a failed federated query by keywords in its error message.
# NOTE(review): this catches every Exception and only prints, so the
# failure is swallowed and `df` is not (re)assigned; in real code, log the
# message and re-raise (bare `raise`) after reporting.
# NOTE(review): as with the strategy example, `connections` may hold the
# warehouse/reports aliases here, which don't match `federated_query`.
try:
df = cx.read_sql(connections, federated_query)
except Exception as e:
# Heuristic: matches on the wording of the error text, not on exception
# types, so the branch taken depends on how the driver phrases the error.
if "connection" in str(e).lower():
print("Check database connectivity")
elif "permission" in str(e).lower():
print("Verify database permissions")
elif "column" in str(e).lower():
print("Check column names and types")
else:
print(f"Federated query failed: {e}")

For optimal federated query performance:
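ConnectorX uses a federated query rewriter JAR file; its location is set by the `CX_REWRITER_PATH` environment variable.

Partitioning options such as `partition_on` are not supported for federated queries.

For unsupported features, fall back to manual federation: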
# Workaround: Manual federation for complex cases.
# Query each database separately with its own single connection string
# (not an alias dict), then join the two results locally with pandas.
import connectorx as cx
import pandas as pd

# Example connection strings; substitute your own.
conn1 = "postgresql://user1:pass1@server1:5432/database1"
conn2 = "postgresql://user2:pass2@server2:5432/database2"
# Replace `condition` with a real predicate so each source returns only
# the rows needed for the join.
db1_data = cx.read_sql(conn1, "SELECT * FROM table1 WHERE condition")
db2_data = cx.read_sql(conn2, "SELECT * FROM table2 WHERE condition")
# Join in pandas
result = pd.merge(db1_data, db2_data, on='common_column')

Install with Tessl CLI:
npx tessl i tessl/pypi-connectorx
docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10