A SQL query builder API for Python that provides programmatic interface for constructing SQL queries.
Quality: pending review — does it follow best practices?
Impact: pending — no eval scenarios have been run yet.
Database-specific query builders providing specialized features and syntax for different SQL databases. Each dialect extends the base Query functionality with database-specific optimizations, functions, and SQL syntax variations.
MySQL-specific query builder with support for MySQL syntax and features.
class MySQLQuery(Query):
"""MySQL-specific query builder."""
def on_duplicate_key_update(self, **kwargs) -> MySQLQuery:
"""Add ON DUPLICATE KEY UPDATE clause for INSERT statements."""
def on_duplicate_key_ignore(self) -> MySQLQuery:
"""Add ON DUPLICATE KEY IGNORE clause for INSERT statements."""
Usage Examples:
from pypika import MySQLQuery, Table
users = Table('users')
# ON DUPLICATE KEY UPDATE
query = (MySQLQuery.into(users)
.columns(users.id, users.name, users.email)
.insert(1, 'John Doe', 'john@example.com')
.on_duplicate_key_update(
name='John Doe Updated',
email='john.updated@example.com'
))
# ON DUPLICATE KEY IGNORE
query = (MySQLQuery.into(users)
.columns(users.name, users.email)
.insert('Jane Doe', 'jane@example.com')
.on_duplicate_key_ignore())
# MySQL-specific functions
from pypika.functions import Concat
query = MySQLQuery.from_(users).select(
Concat(users.first_name, ' ', users.last_name).as_('full_name')
)
PostgreSQL-specific query builder with advanced PostgreSQL features.
class PostgreSQLQuery(Query):
"""PostgreSQL-specific query builder."""
def on_conflict(self, *target_fields) -> OnConflictQueryBuilder:
"""Add ON CONFLICT clause for INSERT statements."""
def returning(self, *terms) -> PostgreSQLQuery:
"""Add RETURNING clause."""
def distinct_on(self, *terms) -> PostgreSQLQuery:
"""Add DISTINCT ON clause."""
class OnConflictQueryBuilder:
def do_nothing(self) -> PostgreSQLQuery:
"""Add DO NOTHING to ON CONFLICT."""
def do_update(self, **kwargs) -> PostgreSQLQuery:
"""Add DO UPDATE to ON CONFLICT."""
Usage Examples:
from pypika import PostgreSQLQuery, Table
users = Table('users')
orders = Table('orders')
# ON CONFLICT DO NOTHING
query = (PostgreSQLQuery.into(users)
.columns(users.email, users.name)
.insert('john@example.com', 'John Doe')
.on_conflict(users.email)
.do_nothing())
# ON CONFLICT DO UPDATE
query = (PostgreSQLQuery.into(users)
.columns(users.email, users.name, users.last_login)
.insert('john@example.com', 'John Doe', 'NOW()')
.on_conflict(users.email)
.do_update(
name='John Doe Updated',
last_login='NOW()'
))
# RETURNING clause
query = (PostgreSQLQuery.into(users)
.columns(users.name, users.email)
.insert('John Doe', 'john@example.com')
.returning(users.id, users.created_at))
# UPDATE with RETURNING
query = (PostgreSQLQuery.update(users)
.set(users.last_login, 'NOW()')
.where(users.id == 1)
.returning(users.id, users.last_login))
# DISTINCT ON
query = (PostgreSQLQuery.from_(orders)
.distinct_on(orders.customer_id)
.select(orders.customer_id, orders.order_date, orders.total)
.orderby(orders.customer_id, orders.order_date.desc()))
# JSON operations (PostgreSQL specific)
from pypika import JSON
profile = users.profile # JSON field
query = (PostgreSQLQuery.from_(users)
.select(
users.id,
profile.get_text_value('name').as_('profile_name'),
profile.get_path_json_value(['address', 'city']).as_('city')
)
.where(profile.has_key('email')))
Oracle Database-specific query builder with Oracle SQL features.
class OracleQuery(Query):
"""Oracle Database-specific query builder."""
Usage Examples:
from pypika import OracleQuery, Table
from pypika.pseudocolumns import RowNum, SysDate
users = Table('users')
# Oracle-specific pseudocolumns
query = (OracleQuery.from_(users)
.select(users.name, users.email, RowNum.as_('row_number'))
.where(RowNum <= 10))
# Using SYSDATE
query = (OracleQuery.from_(users)
.select('*')
.where(users.created_at > SysDate - 30))
# Oracle date formatting
from pypika.functions import ToChar
query = (OracleQuery.from_(users)
.select(
users.name,
ToChar(users.created_at, 'YYYY-MM-DD').as_('created_date')
))
MSSQL-specific query builder with SQL Server features.
class MSSQLQuery(Query):
"""Microsoft SQL Server-specific query builder."""
def top(self, limit) -> MSSQLQuery:
"""Add TOP clause."""
Usage Examples:
from pypika import MSSQLQuery, Table
users = Table('users')
orders = Table('orders')
# TOP clause (SQL Server style)
query = (MSSQLQuery.from_(users)
.top(10)
.select('*')
.orderby(users.created_at.desc()))
# WITH clause and CTE
user_stats = (MSSQLQuery.from_(orders)
.select(orders.user_id, Count('*').as_('order_count'))
.groupby(orders.user_id))
query = (MSSQLQuery.with_(user_stats, 'user_stats')
.from_(users)
.join('user_stats').on(users.id == 'user_stats.user_id')
.select(users.name, 'user_stats.order_count'))
ClickHouse-specific query builder with ClickHouse optimizations (ClickHouse originated at Yandex).
class ClickHouseQuery(Query):
"""ClickHouse-specific query builder."""
Usage Examples:
from pypika import ClickHouseQuery, Table
events = Table('events')
# ClickHouse-specific functions
from pypika.clickhouse.dates_and_times import ToYYYYMM, AddDays
from pypika.clickhouse.array import HasAny, Length
from pypika.clickhouse.type_conversion import ToString, ToInt64
# Date/time functions
query = (ClickHouseQuery.from_(events)
.select(
ToYYYYMM(events.created_at).as_('year_month'),
AddDays(events.created_at, 30).as_('plus_30_days')
))
# Array functions
query = (ClickHouseQuery.from_(events)
.select(
events.user_id,
Length(events.tags).as_('tag_count')
)
.where(HasAny(events.tags, ['important', 'urgent'])))
# Type conversion functions
query = (ClickHouseQuery.from_(events)
.select(
ToString(events.user_id).as_('user_id_str'),
ToInt64(events.score).as_('score_int')
))
# Conditional functions
from pypika.clickhouse.condition import If, MultiIf
query = (ClickHouseQuery.from_(events)
.select(
events.user_id,
If(events.score > 100, 'high', 'low').as_('score_category'),
MultiIf(
events.score > 200, 'very_high',
events.score > 100, 'high',
events.score > 50, 'medium',
'low'
).as_('detailed_category')
))
SQLite-specific query builder with SQLite features and limitations.
class SQLLiteQuery(Query):
"""SQLite-specific query builder."""
Usage Examples:
from pypika import SQLLiteQuery, Table
users = Table('users')
# SQLite-specific date handling
from pypika.functions import Date, DateTime
query = (SQLLiteQuery.from_(users)
.select(
users.name,
Date(users.created_at).as_('created_date'),
DateTime(users.created_at, '+1 month').as_('next_month')
))
# SQLite LIMIT/OFFSET
query = (SQLLiteQuery.from_(users)
.select('*')
.orderby(users.id)
.limit(20)
.offset(100))
# SQLite aggregate functions
from pypika.functions import Count, Sum, Avg
query = (SQLLiteQuery.from_(users)
.select(
Count('*').as_('total_users'),
Avg(users.age).as_('avg_age')
))
Amazon Redshift-specific query builder with Redshift optimizations.
class RedshiftQuery(Query):
"""Amazon Redshift-specific query builder."""
Usage Examples:
from pypika import RedshiftQuery, Table
events = Table('events')
users = Table('users')
# Redshift-specific window functions
from pypika.analytics import RowNumber, Rank
query = (RedshiftQuery.from_(events)
.select(
events.user_id,
events.event_time,
RowNumber().over(events.user_id).orderby(events.event_time).as_('event_sequence'),
Rank().over().orderby(events.score.desc()).as_('global_rank')
))
# Redshift date functions
from pypika.functions import DateDiff, DateAdd
query = (RedshiftQuery.from_(users)
.select(
users.id,
DateDiff('day', users.created_at, 'CURRENT_DATE').as_('account_age_days'),
DateAdd('month', 1, users.created_at).as_('one_month_later')
))Vertica-specific query builder with Vertica analytical features.
class VerticaQuery(Query):
"""Vertica-specific query builder."""
Usage Examples:
from pypika import VerticaQuery, Table
sales = Table('sales')
products = Table('products')
# Vertica analytical functions
from pypika.analytics import Sum, Avg, FirstValue, LastValue
query = (VerticaQuery.from_(sales)
.select(
sales.product_id,
sales.sale_date,
sales.amount,
Sum(sales.amount).over(sales.product_id).orderby(sales.sale_date).as_('running_total'),
Avg(sales.amount).over().orderby(sales.sale_date).rows_between(-6, 0).as_('7_day_avg'),
FirstValue(sales.amount).over(sales.product_id).orderby(sales.sale_date).as_('first_sale')
))
# Vertica time series functions
from pypika.functions import TimestampAdd
query = (VerticaQuery.from_(sales)
.select(
sales.product_id,
TimestampAdd('hour', 1, sales.sale_timestamp).as_('plus_one_hour')
))Snowflake-specific query builder with cloud data warehouse optimizations.
class SnowflakeQuery(Query):
"""Snowflake-specific query builder."""
Usage Examples:
from pypika import SnowflakeQuery, Table
users = Table('users')
orders = Table('orders')
# Basic Snowflake query
query = (SnowflakeQuery.from_(users)
.select(users.id, users.name, users.email)
.where(users.active == True))
# Snowflake-specific date functions
from pypika.functions import DateDiff, DateAdd
query = (SnowflakeQuery.from_(users)
.select(
users.id,
DateDiff('day', users.created_at, 'CURRENT_DATE').as_('account_age_days'),
DateAdd('month', 1, users.created_at).as_('one_month_later')
))
# Window functions with Snowflake syntax
from pypika.analytics import RowNumber, Avg
query = (SnowflakeQuery.from_(orders)
.select(
orders.customer_id,
orders.order_date,
orders.amount,
RowNumber().over(orders.customer_id).orderby(orders.order_date).as_('order_sequence'),
Avg(orders.amount).over().orderby(orders.order_date).rows_between(-2, 0).as_('3_order_avg')
))
Choosing the appropriate dialect based on your database system.
Usage Examples:
from pypika import (
MySQLQuery, PostgreSQLQuery, OracleQuery, MSSQLQuery,
ClickHouseQuery, RedshiftQuery, SQLLiteQuery, VerticaQuery,
Dialects
)
# Dialect-specific table creation
def create_user_table(dialect='postgresql'):
users = Table('users')
if dialect == 'postgresql':
return (PostgreSQLQuery.create_table(users)
.columns(
Column('id', 'SERIAL', primary_key=True),
Column('email', 'VARCHAR(255)', nullable=False, unique=True),
Column('profile', 'JSONB'),
Column('created_at', 'TIMESTAMP', default='NOW()')
))
elif dialect == 'mysql':
return (MySQLQuery.create_table(users)
.columns(
Column('id', 'INT AUTO_INCREMENT', primary_key=True),
Column('email', 'VARCHAR(255)', nullable=False, unique=True),
Column('profile', 'JSON'),
Column('created_at', 'TIMESTAMP', default='CURRENT_TIMESTAMP')
))
elif dialect == 'sqlite':
return (SQLLiteQuery.create_table(users)
.columns(
Column('id', 'INTEGER', primary_key=True, autoincrement=True),
Column('email', 'TEXT', nullable=False, unique=True),
Column('profile', 'TEXT'),
Column('created_at', 'DATETIME', default='CURRENT_TIMESTAMP')
))
# Dynamic dialect selection
def get_query_class(database_type):
dialect_map = {
'postgresql': PostgreSQLQuery,
'mysql': MySQLQuery,
'oracle': OracleQuery,
'mssql': MSSQLQuery,
'clickhouse': ClickHouseQuery,
'redshift': RedshiftQuery,
'sqlite': SQLLiteQuery,
'vertica': VerticaQuery
}
return dialect_map.get(database_type, Query)
# Usage with dynamic selection
database_type = 'postgresql' # From configuration
QueryClass = get_query_class(database_type)
users = Table('users')
query = QueryClass.from_(users).select('*').where(users.active == True)
class Dialects:
VERTICA: str
CLICKHOUSE: str
ORACLE: str
MSSQL: str
MYSQL: str
POSTGRESQL: str
REDSHIFT: str
SQLLITE: str
SNOWFLAKE: str
Usage Examples:
from pypika import Dialects, Interval
# Using dialect enumeration
current_dialect = Dialects.POSTGRESQL
# Dialect-specific interval formatting
if current_dialect == Dialects.POSTGRESQL:
interval = Interval(days=30, dialect=Dialects.POSTGRESQL)
elif current_dialect == Dialects.MYSQL:
interval = Interval(days=30, dialect=Dialects.MYSQL)
# Dialect-aware function selection
def get_date_function(dialect):
if dialect == Dialects.POSTGRESQL:
return 'NOW()'
elif dialect == Dialects.MYSQL:
return 'NOW()'
elif dialect == Dialects.ORACLE:
return 'SYSDATE'
elif dialect == Dialects.MSSQL:
return 'GETDATE()'
else:
return 'CURRENT_TIMESTAMP'
Install with Tessl CLI
npx tessl i tessl/pypi-pypika