```bash
tessl install github:jeremylongshore/claude-code-plugins-plus-skills --skill databricks-migration-deep-dive
```

Execute comprehensive platform migrations to Databricks from legacy systems. Use when migrating from on-premises Hadoop, other cloud platforms, or legacy data warehouses to Databricks. Trigger with phrases like "migrate to databricks", "hadoop migration", "snowflake to databricks", "legacy migration", "data warehouse migration".

| Review Score | Validation Score | Implementation Score | Activation Score |
|---|---|---|---|
| 77% | 11/16 | 65% | 90% |

Comprehensive migration strategies for moving to Databricks from legacy systems.
| Source System | Migration Pattern | Complexity | Timeline |
|---|---|---|---|
| On-prem Hadoop | Lift-and-shift + modernize | High | 6-12 months |
| Snowflake | Parallel run + cutover | Medium | 3-6 months |
| AWS Redshift | ETL rewrite + data copy | Medium | 3-6 months |
| Azure Synapse | Delta Lake conversion | Low | 1-3 months |
| Legacy DW (Oracle/Teradata) | Full rebuild | High | 12-18 months |
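
This matrix can be encoded directly in the assessment tooling so later scripts can look up a default approach per source system. A minimal sketch (the `MIGRATION_PATTERNS` dict and `recommended_pattern` helper are illustrative, not part of the scripts below):

```python
# Hypothetical lookup of the default approach per source system;
# keys and values mirror the table above.
MIGRATION_PATTERNS = {
    "hadoop_onprem": {"pattern": "lift-and-shift + modernize", "complexity": "high", "timeline_months": (6, 12)},
    "snowflake": {"pattern": "parallel run + cutover", "complexity": "medium", "timeline_months": (3, 6)},
    "redshift": {"pattern": "ETL rewrite + data copy", "complexity": "medium", "timeline_months": (3, 6)},
    "synapse": {"pattern": "Delta Lake conversion", "complexity": "low", "timeline_months": (1, 3)},
    "legacy_dw": {"pattern": "full rebuild", "complexity": "high", "timeline_months": (12, 18)},
}

def recommended_pattern(source_system: str) -> dict:
    """Return the default migration pattern for a known source system."""
    return MIGRATION_PATTERNS[source_system]
```
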
```python
# scripts/migration_assessment.py
from dataclasses import dataclass
from typing import List, Dict
import pandas as pd
@dataclass
class SourceTableInfo:
"""Information about source table for migration planning."""
database: str
schema: str
table: str
row_count: int
size_gb: float
column_count: int
partition_columns: List[str]
dependencies: List[str]
access_frequency: str # high, medium, low
    data_classification: str  # pii, confidential, internal, public
def assess_hadoop_cluster(spark, hive_metastore: str) -> List[SourceTableInfo]:
"""
Assess Hadoop/Hive cluster for migration planning.
Returns inventory of all tables with metadata.
"""
tables = []
# Get all databases
databases = spark.sql("SHOW DATABASES").collect()
for db_row in databases:
db = db_row.databaseName
if db in ['default', 'sys']: # Skip system DBs
continue
# Get tables in database
spark.sql(f"USE {db}")
table_list = spark.sql("SHOW TABLES").collect()
for table_row in table_list:
table_name = table_row.tableName
try:
# Get table details
desc = spark.sql(f"DESCRIBE EXTENDED {db}.{table_name}")
detail_df = desc.toPandas()
# Extract partition info
partition_cols = []
in_partition_section = False
                for _, row in detail_df.iterrows():
                    name = str(row['col_name']).strip()
                    if name == '# Partition Information':
                        in_partition_section = True
                    elif in_partition_section and name == '# Detailed Table Information':
                        break  # end of the partition column list
                    elif in_partition_section and name and not name.startswith('#'):
                        partition_cols.append(name)
                # Row count and size would come from table statistics
                # (e.g. ANALYZE TABLE ... COMPUTE STATISTICS, then parse the
                # DESCRIBE EXTENDED output above)
tables.append(SourceTableInfo(
database=db,
schema=db,
table=table_name,
row_count=0, # Would query actual count
size_gb=0, # From statistics
column_count=len(detail_df),
partition_columns=partition_cols,
dependencies=[], # Would analyze queries
access_frequency='medium',
data_classification='internal',
))
except Exception as e:
print(f"Error processing {db}.{table_name}: {e}")
return tables
def generate_migration_plan(tables: List[SourceTableInfo]) -> pd.DataFrame:
"""Generate migration plan with prioritization."""
plan_data = []
for table in tables:
# Calculate complexity score
complexity = 0
complexity += 1 if table.size_gb > 100 else 0
complexity += 1 if len(table.partition_columns) > 2 else 0
complexity += 1 if len(table.dependencies) > 5 else 0
complexity += 2 if table.data_classification == 'pii' else 0
# Calculate priority
priority = 0
priority += 3 if table.access_frequency == 'high' else 1
priority += 2 if table.data_classification == 'pii' else 0
plan_data.append({
'source_table': f"{table.database}.{table.table}",
'target_table': f"migrated.{table.schema}.{table.table}",
'size_gb': table.size_gb,
'complexity_score': complexity,
'priority_score': priority,
'estimated_hours': max(1, table.size_gb / 10 + complexity * 2),
'migration_wave': 1 if priority > 3 else (2 if priority > 1 else 3),
})
    return pd.DataFrame(plan_data).sort_values(['migration_wave', 'priority_score'], ascending=[True, False])
```
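
A typical driver for the assessment might look like the following sketch, run from a Databricks notebook where `spark` is already available; the import path, metastore URI, and target table name are assumptions for illustration:

```python
# Hypothetical driver for the assessment script above
from scripts.migration_assessment import assess_hadoop_cluster, generate_migration_plan

inventory = assess_hadoop_cluster(spark, hive_metastore="thrift://metastore-host:9083")
plan = generate_migration_plan(inventory)

# Persist the plan so each migration wave can be tracked over time
spark.createDataFrame(plan).write.format("delta").mode("overwrite") \
    .saveAsTable("migration_ops.assessment_plan")
```
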
```python
# scripts/schema_migration.py
from pyspark.sql import SparkSession
from pyspark.sql.types import *
def convert_hive_to_delta_schema(spark: SparkSession, hive_table: str) -> StructType:
"""
Convert Hive table schema to Delta Lake compatible schema.
Handles type conversions and incompatibilities.
"""
hive_schema = spark.table(hive_table).schema
# Type mappings for problematic types
type_conversions = {
'decimal(38,0)': DecimalType(38, 10), # Add scale
'char': StringType(),
'varchar': StringType(),
        'tinyint': IntegerType(),  # widen tinyint (ByteType) to int for downstream compatibility
}
new_fields = []
for field in hive_schema.fields:
new_type = field.dataType
        type_str = field.dataType.simpleString().lower()  # e.g. 'decimal(38,0)', 'varchar(10)', 'tinyint'
for pattern, replacement in type_conversions.items():
if pattern in type_str:
new_type = replacement
break
new_fields.append(StructField(
field.name,
new_type,
field.nullable,
field.metadata
))
return StructType(new_fields)
def migrate_table_schema(
spark: SparkSession,
source_table: str,
target_table: str,
catalog: str = "migrated",
) -> dict:
"""
Migrate table schema from Hive to Delta Lake.
Returns migration result with any schema changes.
"""
# Get source schema
source_df = spark.table(source_table)
source_schema = source_df.schema
# Convert schema
target_schema = convert_hive_to_delta_schema(spark, source_table)
# Create target table
schema_ddl = ", ".join([
f"`{f.name}` {f.dataType.simpleString()}"
for f in target_schema.fields
])
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog}.{target_table} (
{schema_ddl}
)
USING DELTA
TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true'
)
""")
# Track schema changes
changes = []
for i, (src, tgt) in enumerate(zip(source_schema.fields, target_schema.fields)):
if str(src.dataType) != str(tgt.dataType):
changes.append({
'column': src.name,
'source_type': str(src.dataType),
'target_type': str(tgt.dataType),
})
return {
'source_table': source_table,
'target_table': f"{catalog}.{target_table}",
'column_count': len(target_schema.fields),
'schema_changes': changes,
    }
```
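
As a rough usage sketch (table names are placeholders), the schema conversion can be applied to a batch of Hive tables before any data is copied:

```python
# Hypothetical usage: create Delta targets for a few Hive tables up front
tables_to_migrate = ["sales.orders", "sales.order_items"]

for source in tables_to_migrate:
    result = migrate_table_schema(spark, source_table=source, target_table=source.split(".")[-1])
    if result["schema_changes"]:
        print(f"{result['source_table']}: {len(result['schema_changes'])} type conversions applied")
```
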
```python
# scripts/data_migration.py
from pyspark.sql import SparkSession, DataFrame
from datetime import datetime
import time
class DataMigrator:
"""Handle data migration from legacy systems to Delta Lake."""
def __init__(self, spark: SparkSession, target_catalog: str):
self.spark = spark
self.target_catalog = target_catalog
def migrate_table(
self,
source_table: str,
target_table: str,
batch_size: int = 1000000,
partition_columns: list[str] = None,
incremental_column: str = None,
) -> dict:
"""
Migrate table data with batching and checkpointing.
Args:
source_table: Source table (Hive, JDBC, etc.)
target_table: Target Delta table
batch_size: Rows per batch
partition_columns: Columns for partitioning
incremental_column: Column for incremental loads
Returns:
Migration statistics
"""
start_time = time.time()
stats = {
'source_table': source_table,
'target_table': f"{self.target_catalog}.{target_table}",
'batches': 0,
'total_rows': 0,
'errors': [],
}
# Get source data
source_df = self.spark.table(source_table)
# For large tables, process in batches by partition
if partition_columns and len(partition_columns) > 0:
# Get distinct partition values
partitions = source_df.select(partition_columns).distinct().collect()
for partition_row in partitions:
# Build filter condition
conditions = [
f"{col} = '{partition_row[col]}'"
for col in partition_columns
]
filter_expr = " AND ".join(conditions)
batch_df = source_df.filter(filter_expr)
self._write_batch(batch_df, target_table, partition_columns)
stats['batches'] += 1
stats['total_rows'] += batch_df.count()
else:
# Single batch for small tables
self._write_batch(source_df, target_table, partition_columns)
stats['batches'] = 1
stats['total_rows'] = source_df.count()
stats['duration_seconds'] = time.time() - start_time
stats['rows_per_second'] = stats['total_rows'] / stats['duration_seconds']
return stats
def _write_batch(
self,
df: DataFrame,
target_table: str,
partition_columns: list[str] = None,
):
"""Write a batch to Delta table."""
writer = df.write.format("delta").mode("append")
if partition_columns:
writer = writer.partitionBy(*partition_columns)
writer.saveAsTable(f"{self.target_catalog}.{target_table}")
def validate_migration(
self,
source_table: str,
target_table: str,
) -> dict:
"""Validate migrated data matches source."""
source_df = self.spark.table(source_table)
target_df = self.spark.table(f"{self.target_catalog}.{target_table}")
validation = {
'source_count': source_df.count(),
'target_count': target_df.count(),
'count_match': False,
'schema_match': False,
'sample_match': False,
}
# Count validation
validation['count_match'] = (
validation['source_count'] == validation['target_count']
)
# Schema validation (column names)
source_cols = set(source_df.columns)
target_cols = set(target_df.columns)
validation['schema_match'] = source_cols == target_cols
# Sample data validation
source_sample = source_df.limit(100).toPandas()
target_sample = target_df.limit(100).toPandas()
# Compare samples (simplified)
validation['sample_match'] = len(source_sample) == len(target_sample)
        return validation
```
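
A minimal end-to-end sketch for one table, assuming the `migrated` catalog already exists and the table and partition names are placeholders:

```python
# Hypothetical usage: copy one partitioned Hive table and validate the result
migrator = DataMigrator(spark, target_catalog="migrated")

stats = migrator.migrate_table(
    source_table="sales.orders",
    target_table="sales_orders",
    partition_columns=["order_date"],
)
print(f"Copied {stats['total_rows']} rows in {stats['batches']} batches")

validation = migrator.validate_migration("sales.orders", "sales_orders")
assert validation["count_match"], "Row counts diverged - investigate before cutover"
```
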
```python
# scripts/pipeline_migration.py
from datetime import datetime

def convert_spark_job_to_databricks(
source_code: str,
source_type: str = "spark-submit",
) -> str:
"""
Convert legacy Spark job to Databricks job.
Handles common patterns from spark-submit, Oozie, Airflow.
"""
# Common replacements
replacements = {
# SparkSession changes
'SparkSession.builder.master("yarn")': 'SparkSession.builder',
'.master("local[*]")': '',
# Path changes
'hdfs://namenode:8020/': '/mnt/data/',
's3a://': 's3://',
# Hive changes
'.enableHiveSupport()': '', # Unity Catalog handles this
'hive_metastore.': '', # Direct table access
# Config changes
'.config("spark.sql.warehouse.dir"': '# Removed: .config("spark.sql.warehouse.dir"',
}
converted = source_code
for old, new in replacements.items():
converted = converted.replace(old, new)
# Add Databricks-specific imports
header = '''
# Converted for Databricks
# Original source: {source_type}
# Conversion date: {date}
from pyspark.sql import SparkSession
# SparkSession is pre-configured in Databricks
spark = SparkSession.builder.getOrCreate()
'''.format(source_type=source_type, date=datetime.now().isoformat())
return header + converted
# Convert Oozie workflow to Databricks job
def convert_oozie_to_databricks_job(oozie_xml: str) -> dict:
"""Convert Oozie workflow XML to Databricks job definition."""
import xml.etree.ElementTree as ET
root = ET.fromstring(oozie_xml)
tasks = []
for action in root.findall('.//action'):
action_name = action.get('name')
# Handle different action types
spark_action = action.find('spark')
if spark_action is not None:
jar = spark_action.find('jar').text
main_class = spark_action.find('class').text
tasks.append({
'task_key': action_name,
'spark_jar_task': {
'main_class_name': main_class,
'parameters': [],
},
'libraries': [{'jar': jar}],
})
shell_action = action.find('shell')
if shell_action is not None:
# Convert to notebook task or skip
pass
# Build job definition
job_definition = {
'name': f"migrated-{root.get('name')}",
'tasks': tasks,
'job_clusters': [{
'job_cluster_key': 'migration_cluster',
'new_cluster': {
'spark_version': '14.3.x-scala2.12',
'node_type_id': 'Standard_DS3_v2',
'num_workers': 2,
}
}],
}
    return job_definition
```
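
A hedged usage sketch: convert an exported workflow definition and write the job spec out as JSON for review before creating the job (file paths are placeholders; the job could then be created with the Databricks CLI or Jobs API):

```python
# Hypothetical usage: convert an exported Oozie workflow to a reviewable job spec
import json

with open("workflows/daily_ingest.xml") as f:
    job_def = convert_oozie_to_databricks_job(f.read())

with open("jobs/daily_ingest.job.json", "w") as f:
    json.dump(job_def, f, indent=2)
```
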
```python
# scripts/cutover_plan.py
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List
@dataclass
class CutoverStep:
"""Individual step in cutover plan."""
order: int
name: str
duration_minutes: int
owner: str
rollback_procedure: str
verification: str
def generate_cutover_plan(
migration_wave: int,
tables: List[str],
cutover_date: datetime,
) -> List[CutoverStep]:
"""Generate detailed cutover plan."""
steps = [
CutoverStep(
order=1,
name="Pre-cutover validation",
duration_minutes=60,
owner="Data Engineer",
rollback_procedure="N/A - no changes made",
verification="Run validation queries on all tables",
),
CutoverStep(
order=2,
name="Disable source data pipelines",
duration_minutes=15,
owner="Platform Admin",
rollback_procedure="Re-enable pipelines in source system",
verification="Verify no new data in source",
),
CutoverStep(
order=3,
name="Final incremental sync",
duration_minutes=120,
owner="Data Engineer",
rollback_procedure="N/A",
verification="Row counts match source",
),
CutoverStep(
order=4,
name="Enable Databricks pipelines",
duration_minutes=30,
owner="Data Engineer",
rollback_procedure="Disable Databricks pipelines, re-enable source",
verification="Jobs running successfully",
),
CutoverStep(
order=5,
name="Update downstream applications",
duration_minutes=60,
owner="Application Team",
rollback_procedure="Revert connection strings",
verification="Applications reading from Databricks",
),
CutoverStep(
order=6,
name="Monitor and validate",
duration_minutes=240,
owner="Data Engineer",
rollback_procedure="Execute full rollback if issues",
verification="All metrics within acceptable range",
),
]
# Calculate timings
current_time = cutover_date
for step in steps:
step.start_time = current_time
step.end_time = current_time + timedelta(minutes=step.duration_minutes)
current_time = step.end_time
    return steps
```
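
For example, the plan for a single wave can be rendered as a timed runbook (dates and table names below are placeholders):

```python
# Hypothetical usage: print a timed runbook for wave 1 of the cutover
from datetime import datetime

steps = generate_cutover_plan(
    migration_wave=1,
    tables=["migrated.sales.orders", "migrated.sales.order_items"],
    cutover_date=datetime(2025, 6, 14, 22, 0),
)
for step in steps:
    print(f"{step.order}. {step.name} "
          f"({step.start_time:%H:%M}-{step.end_time:%H:%M}, owner: {step.owner})")
```
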
| Issue | Cause | Solution |
|---|---|---|
| Schema incompatibility | Unsupported types | Use type conversion mappings |
| Data loss | Truncation | Validate counts at each step |
| Performance issues | Large tables | Use partitioned migration |
| Dependency conflicts | Wrong migration order | Analyze dependencies first |
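
These checks can also be scripted across a whole wave. A rough sketch that reuses `DataMigrator.validate_migration` and the assessment `plan` DataFrame from the earlier examples (both are assumptions here, not part of this skill's scripts):

```python
# Hypothetical sweep: validate every wave-1 table and surface failures
failures = []
for row in plan[plan["migration_wave"] == 1].itertuples():
    source = row.source_table                     # e.g. "sales.orders"
    target = row.target_table.split(".", 1)[1]    # strip the "migrated." catalog prefix
    result = migrator.validate_migration(source, target)
    if not (result["count_match"] and result["schema_match"]):
        failures.append((source, result))

if failures:
    raise RuntimeError(f"{len(failures)} tables failed validation")
```
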
```sql
-- Compare source and target counts
SELECT
'source' as system, COUNT(*) as row_count
FROM hive_metastore.db.table
UNION ALL
SELECT
'target' as system, COUNT(*) as row_count
FROM migrated.db.table;
```

This skill pack provides comprehensive coverage for Databricks platform operations.