
tessl/pypi-apache-airflow-providers-mysql

Apache Airflow provider package enabling MySQL database integration with hooks, operators, and transfer functionality for data workflows.

Asset URI Handling

This module validates and sanitizes the MySQL and MariaDB URIs used by Airflow's dataset (asset) tracking. It requires every URI to name a host, a database, and a table, adds the default port 3306 when no port is given, and normalizes the scheme to mysql.

Capabilities

URI Sanitization

Sanitize and validate MySQL/MariaDB URIs for use in Airflow's asset and dataset tracking systems.

def sanitize_uri(uri: SplitResult) -> SplitResult:
    """
    Sanitize MySQL/MariaDB URI for asset handling.
    
    Validates URI format and applies default port configuration.
    Ensures URI contains required components for MySQL asset tracking.
    
    Parameters:
    - uri: SplitResult object representing the URI to sanitize
    
    Returns:
    SplitResult object with sanitized URI components
    
    Raises:
    ValueError: If URI format is invalid (missing host, database, or table)
    
    URI Format Requirements:
    - Must contain a host (netloc)
    - Must contain database and table names in path
    - Port defaults to 3306 if not specified
    - Scheme is normalized to "mysql"
    """
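The documented rules can be expressed as a short standalone function. The sketch below is written from the docstring above, not copied from the provider's source; the names `sanitize_mysql_uri_sketch` and `DEFAULT_MYSQL_PORT` are hypothetical, and the exact checks inside `airflow.providers.mysql.assets.mysql` may differ in detail.

```python
from urllib.parse import SplitResult, urlsplit

DEFAULT_MYSQL_PORT = 3306  # hypothetical constant holding the documented default port


def sanitize_mysql_uri_sketch(uri: SplitResult) -> SplitResult:
    """Approximate the documented sanitize_uri rules (illustrative only)."""
    # Rule 1: a host (netloc) is required.
    if not uri.netloc:
        raise ValueError("URI format mysql:// must contain a host")
    # Rule 2: add the default port when none is given; an explicit port is kept.
    if uri.port is None:
        host = uri.netloc.rstrip(":")  # tolerate a dangling "host:" with no digits
        uri = uri._replace(netloc=f"{host}:{DEFAULT_MYSQL_PORT}")
    # Rule 3: the path must be exactly /database/table.
    # "/db/table".split("/") == ["", "db", "table"], i.e. three parts.
    if len(uri.path.split("/")) != 3:
        raise ValueError("URI format mysql:// must contain database and table names")
    # Rule 4: mysql and mariadb inputs share one output scheme.
    return uri._replace(scheme="mysql")


print(sanitize_mysql_uri_sketch(urlsplit("mariadb://u:p@db.internal/shop/orders")).geturl())
# mysql://u:p@db.internal:3306/shop/orders
```

Because `SplitResult` is an immutable named tuple, each step returns a modified copy via `_replace` rather than mutating the input.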

Usage Examples

Basic URI Sanitization

from urllib.parse import urlsplit
from airflow.providers.mysql.assets.mysql import sanitize_uri

# Sanitize a MySQL URI
raw_uri = "mysql://user:pass@localhost/mydb/users"
uri_parts = urlsplit(raw_uri)
sanitized_uri = sanitize_uri(uri_parts)

print(sanitized_uri.geturl())  # mysql://user:pass@localhost:3306/mydb/users

URI Validation and Error Handling

from urllib.parse import urlsplit
from airflow.providers.mysql.assets.mysql import sanitize_uri

# Handle invalid URIs
try:
    # Missing host
    invalid_uri = urlsplit("mysql:///database/table")
    sanitize_uri(invalid_uri)
except ValueError as e:
    print(f"Invalid URI: {e}")  # "URI format mysql:// must contain a host"

try:
    # Missing table name  
    invalid_uri = urlsplit("mysql://host/database")
    sanitize_uri(invalid_uri)
except ValueError as e:
    print(f"Invalid URI: {e}")  # "URI format mysql:// must contain database and table names"

Asset Registration in DAGs

from datetime import datetime

from airflow import DAG, Dataset
from airflow.operators.empty import EmptyOperator
from airflow.providers.mysql.transfers.s3_to_mysql import S3ToMySqlOperator

# Define MySQL dataset URIs (automatically sanitized by provider)
user_dataset = Dataset("mysql://localhost/mydb/users")
orders_dataset = Dataset("mysql://localhost/mydb/orders")

# Producer DAG: runs on a time schedule and updates both datasets.
# A DAG that produces a dataset should not also be scheduled on it,
# or each run would trigger the next one.
with DAG(
    'mysql_data_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
) as producer_dag:
    load_users = S3ToMySqlOperator(
        task_id='load_users',
        s3_source_key='data/users.csv',
        mysql_table='mydb.users',
        outlets=[user_dataset],  # Mark as producing this dataset
    )

    process_orders = S3ToMySqlOperator(
        task_id='process_orders',
        s3_source_key='data/orders.csv',
        mysql_table='mydb.orders',
        outlets=[orders_dataset],  # Also a producer, not a consumer
    )

    load_users >> process_orders

# Consumer DAG (illustrative): triggered once both datasets are updated
with DAG(
    'mysql_orders_report',
    start_date=datetime(2024, 1, 1),
    schedule=[user_dataset, orders_dataset],
    catchup=False,
) as consumer_dag:
    EmptyOperator(task_id='build_report')

MariaDB URI Handling

from urllib.parse import urlsplit
from airflow.providers.mysql.assets.mysql import sanitize_uri

# MariaDB URIs are handled identically to MySQL
mariadb_uri = urlsplit("mariadb://user:pass@mariadb-server/analytics/metrics")
sanitized_uri = sanitize_uri(mariadb_uri)

# Scheme is normalized to "mysql" for consistency
print(sanitized_uri.scheme)  # "mysql"
print(sanitized_uri.netloc)  # "user:pass@mariadb-server:3306"
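Because both schemes end up as mysql, a MySQL URI and a MariaDB URI that name the same server, database, and table become identical after sanitization. The sketch below illustrates this with a hypothetical `normalize_scheme` helper that performs only the scheme and default-port steps; it does not call the provider and does not validate the path.

```python
from urllib.parse import SplitResult, urlsplit


def normalize_scheme(uri: SplitResult) -> SplitResult:
    """Hypothetical helper: map mariadb:// to mysql:// and add the default port."""
    if uri.scheme not in ("mysql", "mariadb"):
        raise ValueError(f"unsupported scheme: {uri.scheme!r}")
    if uri.port is None:
        uri = uri._replace(netloc=f"{uri.netloc}:3306")
    return uri._replace(scheme="mysql")


candidates = [
    "mysql://mariadb-server/analytics/metrics",
    "mariadb://mariadb-server/analytics/metrics",
    "MariaDB://mariadb-server:3306/analytics/metrics",  # urlsplit lowercases the scheme
]

# Collapse equivalent URIs into a single key, as an asset registry might.
unique = {normalize_scheme(urlsplit(u)).geturl() for u in candidates}
print(unique)  # {'mysql://mariadb-server:3306/analytics/metrics'}
```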

Custom Port Configuration

from urllib.parse import urlsplit
from airflow.providers.mysql.assets.mysql import sanitize_uri

# URI with custom port (preserved)
custom_port_uri = urlsplit("mysql://localhost:3307/mydb/table")
sanitized_uri = sanitize_uri(custom_port_uri)
print(sanitized_uri.netloc)  # "localhost:3307"

# URI without port (default 3306 added)
no_port_uri = urlsplit("mysql://localhost/mydb/table") 
sanitized_uri = sanitize_uri(no_port_uri)
print(sanitized_uri.netloc)  # "localhost:3306"
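Port handling rests on `SplitResult.port` from Python's standard library: it is `None` when the URI has no port (including an empty port after the colon), an `int` when one is given, and accessing it raises `ValueError` for a non-numeric or out-of-range port. The snippet below shows only these standard-library behaviors through a hypothetical `describe_port` helper; whether `sanitize_uri` surfaces the same `ValueError` for such URIs is an assumption worth testing against your provider version.

```python
from urllib.parse import urlsplit


def describe_port(raw: str) -> str:
    """Report what SplitResult.port yields for a raw URI string."""
    try:
        port = urlsplit(raw).port
    except ValueError as exc:  # non-numeric or out-of-range port
        return f"error: {exc}"
    return "no port (default 3306 applies)" if port is None else f"explicit port {port}"


for raw in [
    "mysql://localhost/mydb/table",        # no port
    "mysql://localhost:3307/mydb/table",   # explicit port is preserved
    "mysql://localhost:/mydb/table",       # empty port is treated as missing
    "mysql://localhost:abc/mydb/table",    # non-numeric port
    "mysql://localhost:70000/mydb/table",  # out of range
]:
    print(raw, "->", describe_port(raw))
```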

Asset URI Format Specification

Valid URI Components

MySQL/MariaDB asset URIs must follow this format:

mysql://[user[:password]@]host[:port]/database/table

Components:

  • Scheme: mysql or mariadb (normalized to mysql)
  • User: Optional database username
  • Password: Optional database password
  • Host: Required MySQL/MariaDB server hostname or IP
  • Port: Optional port number (defaults to 3306)
  • Database: Required database name
  • Table: Required table name
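Each component in the list corresponds to an attribute of the `SplitResult` returned by `urlsplit` (`username`, `password`, `hostname`, `port`, `path`). The sketch below collects them into a hypothetical `MySqlAssetParts` dataclass; the class and the `parse_parts` function are illustrative and not part of the provider.

```python
from dataclasses import dataclass
from typing import Optional
from urllib.parse import urlsplit


@dataclass(frozen=True)
class MySqlAssetParts:
    """Hypothetical container for the components of a MySQL/MariaDB asset URI."""
    scheme: str
    user: Optional[str]
    password: Optional[str]
    host: str
    port: int
    database: str
    table: str


def parse_parts(raw: str) -> MySqlAssetParts:
    uri = urlsplit(raw)
    if uri.scheme not in ("mysql", "mariadb"):
        raise ValueError(f"unsupported scheme: {uri.scheme!r}")
    if not uri.hostname:
        raise ValueError("URI must contain a host")
    parts = uri.path.split("/")  # "/db/table" -> ["", "db", "table"]
    if len(parts) != 3:
        raise ValueError("URI must contain database and table names")
    _, database, table = parts
    return MySqlAssetParts(
        scheme="mysql",  # mariadb is normalized to mysql
        user=uri.username,
        password=uri.password,
        host=uri.hostname,
        port=uri.port if uri.port is not None else 3306,
        database=database,
        table=table,
    )


print(parse_parts("mariadb://user@mariadb-server:3307/warehouse/facts"))
# MySqlAssetParts(scheme='mysql', user='user', password=None, host='mariadb-server', port=3307, database='warehouse', table='facts')
```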

URI Validation Rules

# Descriptive summary of the URI validation rules
URIValidationRules = {
    "host_required": True,                      # URI must contain a netloc (host)
    "default_port": 3306,                       # Port applied when none is specified
    "path_components": 3,                       # path.split("/") must yield 3 parts: "", database, table
    "path_format": "/database/table",           # Required path structure
    "supported_schemes": ["mysql", "mariadb"],  # Accepted input schemes
    "normalized_scheme": "mysql",               # Scheme of the sanitized URI
}
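The `path_components` count of 3 is easiest to see by splitting paths on `/`: the leading slash produces an empty first element, so `/database/table` yields exactly three parts. The sketch below uses a hypothetical `path_part_count` helper to show the count for several paths. One consequence of a pure count check, assuming the rule is applied exactly as summarized: a trailing-slash path such as `/db/` also yields three parts (with an empty table name), so the count alone would not reject it.

```python
def path_part_count(path: str) -> int:
    """Number of elements produced by splitting a URI path on "/"."""
    return len(path.split("/"))


for path in ["/mydb/users", "/database", "", "/db/table/col", "/db/"]:
    print(f"{path!r:18} -> {path.split('/')} ({path_part_count(path)} parts)")
```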

Common URI Patterns

# Valid URI examples
valid_uris = [
    "mysql://user:pass@localhost:3306/mydb/users",
    "mysql://localhost/analytics/daily_metrics", 
    "mariadb://user@mariadb-server:3307/warehouse/facts",
    "mysql://10.0.1.100/inventory/products"
]

# Invalid URI examples (will raise ValueError)
invalid_uris = [
    "mysql:///database/table",        # Missing host
    "mysql://localhost/database",     # Missing table name
    "mysql://localhost/db/table/col", # Too many path components
    "mysql://localhost",              # Missing database and table
]
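The two lists above can double as test cases. The sketch below classifies them with a minimal stand-in, `is_valid_asset_uri` (a hypothetical name), that applies only the host and path-count rules; in a real test suite you would call `sanitize_uri` from the provider instead and treat a raised `ValueError` as "invalid".

```python
from urllib.parse import urlsplit

valid_uris = [
    "mysql://user:pass@localhost:3306/mydb/users",
    "mysql://localhost/analytics/daily_metrics",
    "mariadb://user@mariadb-server:3307/warehouse/facts",
    "mysql://10.0.1.100/inventory/products",
]
invalid_uris = [
    "mysql:///database/table",         # Missing host
    "mysql://localhost/database",      # Missing table name
    "mysql://localhost/db/table/col",  # Too many path components
    "mysql://localhost",               # Missing database and table
]


def is_valid_asset_uri(raw: str) -> bool:
    """Minimal stand-in for sanitize_uri: host present and path is /database/table."""
    uri = urlsplit(raw)
    return bool(uri.netloc) and len(uri.path.split("/")) == 3


for raw in valid_uris + invalid_uris:
    print(f"{raw:52} valid={is_valid_asset_uri(raw)}")
```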

Integration with Airflow Assets

Asset Definition

from airflow import Dataset  # Airflow 2.x name; Airflow 3 renames it to Asset (from airflow.sdk import Asset)

# Assets are automatically sanitized when defined
mysql_asset = Dataset("mysql://localhost/mydb/users")

# Equivalent MariaDB asset (normalized to mysql://)
mariadb_asset = Dataset("mariadb://localhost/mydb/users")

Asset-Aware DAG Scheduling

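from datetime import datetime

from airflow import DAG, Dataset
from airflow.decorators import task

# Define datasets
source_data = Dataset("mysql://localhost/raw/events")
processed_data = Dataset("mysql://localhost/analytics/event_summary")

# DAG scheduled on dataset updates
with DAG(
    'data_processing',
    start_date=datetime(2024, 1, 1),
    schedule=[source_data],  # Triggered when source_data is updated
    catchup=False,
) as processing_dag:

    @task(outlets=[processed_data])  # Marks processed_data as updated on success
    def summarize_events():
        ...  # Placeholder: aggregate raw.events into analytics.event_summary

    summarize_events()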

Type Definitions

from typing import Literal, TypedDict
from urllib.parse import SplitResult

# URI parsing result type
URISplit = SplitResult

# Supported URI schemes for MySQL assets
MySQLSchemes = Literal["mysql", "mariadb"]

# URI validation error type
URIValidationError = ValueError

# Asset URI components (the fields of SplitResult)
class AssetURIComponents(TypedDict):
    scheme: str    # URI scheme (mysql/mariadb)
    netloc: str    # Network location (user:pass@host:port)
    path: str      # Path (/database/table)
    query: str     # Query string (empty if absent)
    fragment: str  # Fragment identifier (empty if absent)
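Scheme and component types written with `typing.Literal` and `typing.TypedDict` can also be used at runtime: `typing.get_args` recovers the allowed scheme strings, and `SplitResult._asdict()` (it is a named tuple) produces exactly the five component keys. The sketch below restates the two types so it runs on its own; `split_components` is a hypothetical helper, not a provider function.

```python
from typing import Literal, TypedDict, get_args
from urllib.parse import urlsplit

MySQLSchemes = Literal["mysql", "mariadb"]


class AssetURIComponents(TypedDict):
    scheme: str
    netloc: str
    path: str
    query: str
    fragment: str


def split_components(raw: str) -> AssetURIComponents:
    uri = urlsplit(raw)
    # get_args(MySQLSchemes) == ("mysql", "mariadb")
    if uri.scheme not in get_args(MySQLSchemes):
        raise ValueError(f"unsupported scheme: {uri.scheme!r}")
    # SplitResult is a named tuple, so _asdict() yields exactly these five keys.
    return AssetURIComponents(**uri._asdict())


print(split_components("mariadb://user@mariadb-server:3307/warehouse/facts"))
# {'scheme': 'mariadb', 'netloc': 'user@mariadb-server:3307', 'path': '/warehouse/facts', 'query': '', 'fragment': ''}
```

Note that this step only splits and checks the scheme; the host, port, and path rules are applied by `sanitize_uri`.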

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-mysql