chDB is an in-process SQL OLAP Engine powered by ClickHouse
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Python UDF (User Defined Function) support enabling custom Python functions to be used within SQL queries. UDFs are registered using decorators and executed as external processes with automatic type handling and configuration management.
Register Python functions as SQL UDFs with automatic configuration generation.
def chdb_udf(return_type: str = "String"):
"""
Decorator for registering Python functions as chDB UDFs.
Parameters:
- return_type: SQL return type ("String", "Int32", "Float64", etc.)
Must be valid ClickHouse data type
Returns:
Function decorator that registers the function as UDF
Notes:
- Functions must be stateless (no UDAFs supported)
- All input parameters treated as strings (TabSeparated format)
- Function called once per input row
- Must import all required modules within function
- Uses same Python interpreter as calling script
"""Programmatically generate UDF configurations and scripts.
def generate_udf(func_name: str, args: list, return_type: str, udf_body: str):
"""
Generate UDF configuration and Python script files.
Parameters:
- func_name: Name of the UDF function
- args: List of argument names
- return_type: SQL return type string
- udf_body: Python function implementation code
Side Effects:
- Creates {func_name}.py executable script in UDF path
- Updates/creates udf_config.xml with function registration
- Sets up automatic cleanup on process exit
"""from chdb.udf import chdb_udf
from chdb import query
# Simple arithmetic UDF
@chdb_udf()
def add_numbers(a, b):
return str(int(a) + int(b))
# Use in SQL query
result = query("SELECT add_numbers('10', '20') as sum_result")
print(result) # Returns: 30from chdb.udf import chdb_udf
from chdb import query
# UDF returning integer
@chdb_udf(return_type="Int32")
def multiply_int(x, y):
return int(x) * int(y)
# UDF returning float
@chdb_udf(return_type="Float64")
def calculate_average(a, b, c):
values = [float(a), float(b), float(c)]
return sum(values) / len(values)
# UDF returning string (default)
@chdb_udf()
def format_name(first, last):
return f"{first.title()} {last.upper()}"
# Use all UDFs in queries
result1 = query("SELECT multiply_int('5', '7') as product")
result2 = query("SELECT calculate_average('10.5', '20.3', '15.7') as avg")
result3 = query("SELECT format_name('john', 'doe') as formatted_name")
print("Product:", result1)
print("Average:", result2)
print("Formatted:", result3)from chdb.udf import chdb_udf
from chdb import query
@chdb_udf()
def validate_email(email):
import re
# Email validation pattern
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if re.match(pattern, email):
return "valid"
else:
return "invalid"
@chdb_udf()
def extract_domain(email):
import re
match = re.search(r'@([a-zA-Z0-9.-]+)', email)
if match:
return match.group(1)
else:
return "unknown"
# Use UDFs with data processing
result = query("""
SELECT
email,
validate_email(email) as is_valid,
extract_domain(email) as domain
FROM (
SELECT 'user@example.com' as email
UNION ALL
SELECT 'invalid.email'
UNION ALL
SELECT 'admin@company.org'
)
""", "Pretty")
print(result)from chdb.udf import chdb_udf
from chdb import query
@chdb_udf()
def parse_json_field(json_str, field_name):
import json
try:
data = json.loads(json_str)
return str(data.get(field_name, ""))
except:
return "error"
@chdb_udf(return_type="Int32")
def count_json_keys(json_str):
import json
try:
data = json.loads(json_str)
if isinstance(data, dict):
return len(data)
else:
return 0
except:
return -1
# Query with JSON UDFs
result = query("""
SELECT
json_data,
parse_json_field(json_data, 'name') as name,
parse_json_field(json_data, 'age') as age,
count_json_keys(json_data) as key_count
FROM (
SELECT '{"name": "Alice", "age": 30, "city": "NYC"}' as json_data
UNION ALL
SELECT '{"name": "Bob", "age": 25}'
UNION ALL
SELECT 'invalid json'
)
""", "Pretty")
print(result)from chdb.udf import chdb_udf
from chdb import query
@chdb_udf()
def normalize_phone(phone):
import re
# Remove all non-digits
digits = re.sub(r'\D', '', phone)
# Format as (XXX) XXX-XXXX if US number
if len(digits) == 10:
return f"({digits[:3]}) {digits[3:6]}-{digits[6:]}"
elif len(digits) == 11 and digits[0] == '1':
return f"({digits[1:4]}) {digits[4:7]}-{digits[7:]}"
else:
return "invalid"
@chdb_udf()
def categorize_age(age_str):
try:
age = int(age_str)
if age < 18:
return "minor"
elif age < 65:
return "adult"
else:
return "senior"
except:
return "unknown"
# Apply transformations to dataset
result = query("""
SELECT
name,
normalize_phone(phone) as formatted_phone,
categorize_age(age) as age_category
FROM (
SELECT 'Alice' as name, '555-123-4567' as phone, '32' as age
UNION ALL
SELECT 'Bob', '(555) 987 6543', '17'
UNION ALL
SELECT 'Charlie', '15559876543', '67'
)
""", "Pretty")
print(result)from chdb.udf import chdb_udf
from chdb import query
@chdb_udf(return_type="Float64")
def calculate_distance(lat1, lon1, lat2, lon2):
import math
# Convert to radians
lat1, lon1, lat2, lon2 = map(math.radians, [float(lat1), float(lon1), float(lat2), float(lon2)])
# Haversine formula
dlat = lat2 - lat1
dlon = lon2 - lon1
a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
c = 2 * math.asin(math.sqrt(a))
# Earth radius in kilometers
r = 6371
return c * r
@chdb_udf()
def hash_string(input_str):
import hashlib
return hashlib.md5(input_str.encode()).hexdigest()
# Use UDFs with geographic and crypto functions
result = query("""
SELECT
city1,
city2,
calculate_distance(lat1, lon1, lat2, lon2) as distance_km,
hash_string(CONCAT(city1, '-', city2)) as route_hash
FROM (
SELECT
'New York' as city1, '40.7128' as lat1, '-74.0060' as lon1,
'Los Angeles' as city2, '34.0522' as lat2, '-118.2437' as lon2
UNION ALL
SELECT
'Chicago', '41.8781', '-87.6298',
'Houston', '29.7604', '-95.3698'
)
""", "Pretty")
print(result)from chdb.udf import chdb_udf
from chdb import query, ChdbError
@chdb_udf()
def safe_divide(a, b):
try:
num = float(a)
den = float(b)
if den == 0:
return "division_by_zero"
return str(num / den)
except ValueError:
return "invalid_input"
except Exception:
return "error"
try:
result = query("""
SELECT
a, b,
safe_divide(a, b) as result
FROM (
SELECT '10' as a, '2' as b
UNION ALL
SELECT '15', '0'
UNION ALL
SELECT 'abc', '5'
)
""", "Pretty")
print(result)
except ChdbError as e:
print(f"Query with UDF failed: {e}")from chdb.udf import chdb_udf
from chdb import query
import tempfile
import os
# Custom UDF path
custom_udf_path = tempfile.mkdtemp()
@chdb_udf()
def custom_function(x):
return f"processed_{x}"
# Use UDF with custom path
result = query(
"SELECT custom_function('test') as output",
udf_path=custom_udf_path
)
print(result)
# Cleanup custom UDF path when done
import shutil
shutil.rmtree(custom_udf_path)Install with Tessl CLI
npx tessl i tessl/pypi-chdb