Apache Airflow provider package enabling MySQL database integration with hooks, operators, and transfer functionality for data workflows.
—
Quality: Pending — does it follow best practices?
Impact: Pending — no eval scenarios have been run
URI sanitization and validation for MySQL and MariaDB assets in Airflow's dataset and asset tracking system. This module checks that URIs have the required format and applies the default MySQL port (3306) when a URI does not specify one.
Sanitize and validate MySQL/MariaDB URIs for use in Airflow's asset and dataset tracking systems.
def sanitize_uri(uri: SplitResult) -> SplitResult:
    """
    Sanitize a MySQL/MariaDB URI for asset handling.

    Validates the URI format, applies the default port when none is given,
    and normalizes the scheme so that ``mysql://`` and ``mariadb://`` URIs
    refer to the same asset.

    Parameters:
        uri: ``SplitResult`` (e.g. produced by ``urllib.parse.urlsplit``)
            for the URI to sanitize.

    Returns:
        A ``SplitResult`` with the sanitized URI components.

    Raises:
        ValueError: If the URI has no host (netloc), or if its path is not
            exactly ``/database/table`` (database or table missing, or extra
            path segments present).

    URI Format Requirements:
        - Must contain a host (netloc)
        - Path must be exactly ``/database/table``
        - Port defaults to 3306 if not specified; an explicit port is kept
        - Scheme (``mysql`` or ``mariadb``) is normalized to ``mysql``
    """
from urllib.parse import urlsplit
from airflow.providers.mysql.assets.mysql import sanitize_uri

# Basic usage: parse the raw string, then hand the SplitResult to sanitize_uri.
# The input carries no port, so the sanitized form gains the default ":3306".
raw_uri = "mysql://user:pass@localhost/mydb/users"
uri_parts = urlsplit(raw_uri)
sanitized_uri = sanitize_uri(uri_parts)
print(sanitized_uri.geturl())  # -> mysql://user:pass@localhost:3306/mydb/users
from urllib.parse import urlsplit
from airflow.providers.mysql.assets.mysql import sanitize_uri

# Handle invalid URIs: each case below makes sanitize_uri raise ValueError.
for raw in (
    # Missing host -> "URI format mysql:// must contain a host"
    "mysql:///database/table",
    # Missing table name -> "URI format mysql:// must contain database and table names"
    "mysql://host/database",
):
    try:
        invalid_uri = urlsplit(raw)
        sanitize_uri(invalid_uri)
    except ValueError as e:
        print(f"Invalid URI: {e}")
from airflow import DAG, Dataset
from airflow.providers.mysql.transfers.s3_to_mysql import S3ToMySqlOperator
from datetime import datetime

# Define MySQL dataset URIs (automatically sanitized by provider)
user_dataset = Dataset("mysql://localhost/mydb/users")
orders_dataset = Dataset("mysql://localhost/mydb/orders")

# Producer DAG. It runs on a time-based schedule: if it were scheduled on
# user_dataset while one of its own tasks lists user_dataset as an outlet,
# every successful run would update the dataset and re-trigger the DAG forever.
dag = DAG(
    'mysql_data_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,  # don't create a backfill run for every day since start_date
)

# Operator that produces user_dataset
load_users = S3ToMySqlOperator(
    task_id='load_users',
    s3_source_key='data/users.csv',
    mysql_table='mydb.users',
    outlets=[user_dataset],  # Mark as producing this dataset
    dag=dag,
)

# Consumer DAG: triggered each time user_dataset is updated. This dataset
# dependency replaces a `load_users >> process_orders` edge, which Airflow
# does not allow between tasks that belong to different DAGs.
orders_dag = DAG(
    'mysql_orders_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule=[user_dataset],  # Schedule based on dataset updates
    catchup=False,
)

# Operator that runs in the consumer DAG and, in turn, produces orders_dataset
process_orders = S3ToMySqlOperator(
    task_id='process_orders',
    s3_source_key='data/orders.csv',
    mysql_table='mydb.orders',
    outlets=[orders_dataset],
    dag=orders_dag,
)
from urllib.parse import urlsplit
from airflow.providers.mysql.assets.mysql import sanitize_uri

# A mariadb:// URI is sanitized exactly like a mysql:// one.
mariadb_uri = urlsplit("mariadb://user:pass@mariadb-server/analytics/metrics")
sanitized_uri = sanitize_uri(mariadb_uri)

# The result uses the "mysql" scheme, and the missing port defaults to 3306.
print(sanitized_uri.scheme)  # -> "mysql"
print(sanitized_uri.netloc)  # -> "user:pass@mariadb-server:3306"
from urllib.parse import urlsplit
from airflow.providers.mysql.assets.mysql import sanitize_uri

# An explicit port survives sanitization unchanged.
custom_port_uri = urlsplit("mysql://localhost:3307/mydb/table")
sanitized_uri = sanitize_uri(custom_port_uri)
print(sanitized_uri.netloc)  # -> "localhost:3307"

# Without a port, sanitize_uri fills in the MySQL default, 3306.
no_port_uri = urlsplit("mysql://localhost/mydb/table")
sanitized_uri = sanitize_uri(no_port_uri)
print(sanitized_uri.netloc) # "localhost:3306"
MySQL/MariaDB asset URIs must follow this format:
mysql://[user[:password]@]host[:port]/database/table
Components:
- Scheme: mysql or mariadb (normalized to mysql)
- Host: required
- Port: optional; defaults to 3306
- Path: exactly /database/table
# URI validation requirements
URIValidationRules = {
    "host_required": True,  # URI must contain a netloc (host)
    "default_port": 3306,  # Port applied when the URI does not specify one
    # 3 counts the "/"-separated segments of "/database/table", including the
    # empty segment before the leading slash -- i.e. exactly a database and a
    # table name, no more and no fewer.
    "path_components": 3,
    "path_format": "/database/table",  # Required path structure
    "supported_schemes": ["mysql", "mariadb"],  # Accepted input schemes
    "normalized_scheme": "mysql",  # Scheme of every sanitized URI
}
# Valid URI examples
valid_uris = [
    "mysql://user:pass@localhost:3306/mydb/users",  # credentials + explicit port
    "mysql://localhost/analytics/daily_metrics",  # port will default to 3306
    "mariadb://user@mariadb-server:3307/warehouse/facts",  # scheme becomes mysql
    "mysql://10.0.1.100/inventory/products",  # IP address as host
]

# Each of these makes sanitize_uri raise ValueError:
invalid_uris = [
    # no host (empty netloc)
    "mysql:///database/table",
    # table name missing
    "mysql://localhost/database",
    # one path segment too many
    "mysql://localhost/db/table/col",
    # neither database nor table
    "mysql://localhost",
]
from airflow import Dataset
# Defining a Dataset runs its URI through the provider's sanitization.
mysql_asset = Dataset("mysql://localhost/mydb/users")

# mariadb:// is normalized to mysql://, so this names the same asset as mysql_asset.
mariadb_asset = Dataset("mariadb://localhost/mydb/users")
from airflow import DAG, Dataset
from datetime import datetime

# Datasets identified by sanitized MySQL URIs.
source_data = Dataset("mysql://localhost/raw/events")
# NOTE(review): processed_data is not referenced below; presumably meant as an
# outlet for tasks added to processing_dag -- confirm.
processed_data = Dataset("mysql://localhost/analytics/event_summary")

# This DAG has no time schedule: it runs whenever source_data is updated.
processing_dag = DAG(
    'data_processing',
    start_date=datetime(2024, 1, 1),
    schedule=[source_data],
    catchup=False,
)
from urllib.parse import SplitResult
from typing import Literal, Union

# URI parsing result type
URISplit = SplitResult
# Supported URI schemes for MySQL assets. These are string *values*, so the
# correct annotation is Literal; Union["mysql", "mariadb"] would treat them as
# forward references to (non-existent) types named `mysql` and `mariadb`.
MySQLSchemes = Literal["mysql", "mariadb"]
# URI validation error type (invalid URIs raise ValueError)
URIValidationError = ValueError
# Asset URI components: maps each field name of a urllib SplitResult to its value type.
AssetURIComponents = {
    "scheme": str,  # URI scheme (mysql/mariadb on input; mysql after sanitization)
    "netloc": str,  # Network location (user:pass@host:port)
    "path": str,  # Path (/database/table)
    "query": str,  # Query parameters (optional)
    "fragment": str  # Fragment identifier (optional)
}
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-mysql