CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-diffsync

Library to easily sync/diff/update 2 different data sources

Pending
Overview
Eval results
Files

synchronization.mddocs/

Synchronization

Automated synchronization operations that apply calculated differences to update target datasets. Supports creation, modification, and deletion of records with comprehensive error handling and status tracking.

Capabilities

Sync Operations

Core methods for performing synchronization between Adapter instances.

def sync_from(self, source: "Adapter", diff_class: Type[Diff] = Diff,
              flags: DiffSyncFlags = DiffSyncFlags.NONE,
              callback: Optional[Callable[[str, int, int], None]] = None,
              diff: Optional[Diff] = None) -> Diff:
    """
    Synchronize data from the given source Adapter into the current Adapter.

    The current object is the destination of the sync; ``source`` supplies the data.

    Args:
        source: Adapter to sync data from into this one
        diff_class: Diff or subclass thereof to use to calculate the diffs to use for synchronization
        flags: Flags influencing the behavior of this sync (defaults to DiffSyncFlags.NONE)
        callback: Optional function with parameters (stage, current, total), called at intervals as sync proceeds
        diff: An existing diff to be used rather than generating a completely new diff

    Returns:
        Diff between this object (the sync destination) and source

    Raises:
        DiffClassMismatch: The provided diff's class does not match the diff_class
    """
def sync_to(self, target: "Adapter", diff_class: Type[Diff] = Diff,
            flags: DiffSyncFlags = DiffSyncFlags.NONE,
            callback: Optional[Callable[[str, int, int], None]] = None,
            diff: Optional[Diff] = None) -> Diff:
    """
    Synchronize data from the current Adapter into the given target Adapter.

    The current object supplies the data; ``target`` is the destination of the sync.

    Args:
        target: Adapter to sync data into from this one
        diff_class: Diff or subclass thereof to use to calculate the diffs to use for synchronization
        flags: Flags influencing the behavior of this sync (defaults to DiffSyncFlags.NONE)
        callback: Optional function with parameters (stage, current, total), called at intervals as sync proceeds
        diff: An existing diff that will be used when determining what needs to be synced

    Returns:
        Diff between this object (the data origin) and target

    Raises:
        DiffClassMismatch: The provided diff's class does not match the diff_class
    """

Basic Sync Example

from diffsync import Adapter, DiffSyncModel, DiffSyncFlags

# Build one adapter per side of the sync
source = SourceNetworkAdapter(name="network_source")
target = TargetNetworkAdapter(name="network_target")

# Each adapter loads its own data set
source.load()
target.load()

# Bring target in line with source; the returned diff describes what was applied
diff = target.sync_from(source)

# Report the outcome, handling the "nothing to do" case first
if not diff.has_diffs():
    print("No changes were needed")
else:
    summary = diff.summary()
    print(f"Applied {summary['create']} creates, {summary['update']} updates, {summary['delete']} deletes")

Sync Completion Callback

Hook for post-synchronization processing.

def sync_complete(self, source: "Adapter", diff: Diff,
                  flags: DiffSyncFlags = DiffSyncFlags.NONE,
                  logger: Optional[structlog.BoundLogger] = None) -> None:
    """
    Callback triggered after a sync_from operation has completed and updated the model data of this instance.

    Note that this callback is **only** triggered if the sync actually resulted in data changes.
    If there are no detected changes, this callback will **not** be called.

    The default implementation does nothing, but a subclass could use this, for example,
    to perform bulk updates to a backend (such as a file) that doesn't readily support incremental updates.

    Args:
        source: The Adapter whose data was used to update this instance
        diff: The Diff calculated prior to the sync operation
        flags: Any flags that influenced the sync (defaults to DiffSyncFlags.NONE)
        logger: Logging context for the sync; the signature allows None, so overrides should not assume a logger
    """

Sync Completion Example

class FileBasedAdapter(Adapter):
    """Adapter that rewrites its whole data set to a JSON file once a sync finishes.

    The file is written in one go from ``sync_complete`` instead of being
    updated record by record while the sync runs.
    """

    device = Device
    top_level = ["device"]

    def __init__(self, filename, **kwargs):
        """Remember the output path and forward everything else to ``Adapter``.

        Args:
            filename: Path of the JSON file written after a sync.
            **kwargs: Passed unchanged to ``Adapter.__init__``.
        """
        super().__init__(**kwargs)
        self.filename = filename

    def sync_complete(self, source, diff, flags, logger):
        """Save all objects held by this adapter to ``self.filename`` as JSON.

        Args:
            source: Adapter whose data was synced into this one (unused here).
            diff: Diff calculated for the sync; its summary is logged.
            flags: Flags that influenced the sync (unused here).
            logger: Structured logger for the sync, or None to skip logging.
        """
        # Local import so the example does not depend on a module-level import.
        import json

        # After sync is complete, save all data to file
        print(f"Sync completed, saving {len(self)} objects to {self.filename}")

        data = self.dict()
        with open(self.filename, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)

        # The callback signature allows logger to be None; the file is already
        # saved at this point, so just skip the log entry in that case.
        if logger is None:
            return

        # Log what was changed
        summary = diff.summary()
        logger.info(
            "Saved changes to file",
            created=summary['create'],
            updated=summary['update'],
            deleted=summary['delete'],
        )

Error Handling and Status Tracking

Synchronization operations include comprehensive error handling and status tracking through model status management.

Error Handling with Flags

from diffsync import DiffSyncFlags
from diffsync.enum import DiffSyncStatus
from diffsync.exceptions import ObjectNotFound

# Continue sync even if individual operations fail
diff = target.sync_from(source, flags=DiffSyncFlags.CONTINUE_ON_FAILURE)

# Check for any failures
for element in diff.get_children():
    if not element.action:
        # No action was needed for this element, so there is no status to inspect.
        continue

    # Look the synchronized object up again to read the status its model recorded.
    # Only the lookup can raise ObjectNotFound, so only the lookup is guarded.
    try:
        obj = target.get(element.type, element.keys)
    except ObjectNotFound:
        # NOTE(review): presumably this branch is also reached for objects the
        # sync deleted on purpose — confirm before treating it as a failure.
        print(f"Object {element.type} {element.name} was not found after sync")
        continue

    status, message = obj.get_status()
    if status != DiffSyncStatus.SUCCESS:
        print(f"Failed to {element.action} {element.type} {element.name}: {message}")

Custom Error Handling in Models

class NetworkDevice(DiffSyncModel):
    """Device model whose create/update/delete hooks also act on the real device.

    Each hook first lets ``DiffSyncModel`` do its own bookkeeping, then calls the
    matching network helper and records the outcome with ``set_status`` so the
    caller can inspect it after the sync.
    """

    _modelname = "device"
    _identifiers = ("name",)
    _attributes = ("ip_address", "os_version")

    name: str
    ip_address: str
    os_version: str

    @classmethod
    def create(cls, adapter, ids, attrs):
        """Create the model object, then configure the device on the network.

        Args:
            adapter: Adapter the new object belongs to.
            ids: Identifier values for the new object.
            attrs: Attribute values for the new object.

        Returns:
            The created object, with its status set to SUCCESS, ERROR (network
            problem) or FAILURE (invalid configuration).
        """
        device = super().create(adapter, ids, attrs)
        try:
            # Attempt to configure device on network
            configure_device(device.name, device.ip_address, device.os_version)
        except NetworkError as e:
            device.set_status(DiffSyncStatus.ERROR, f"Network configuration failed: {e}")
        except ValidationError as e:
            device.set_status(DiffSyncStatus.FAILURE, f"Invalid configuration: {e}")
        else:
            # Kept out of the try body so only configure_device() is guarded.
            device.set_status(DiffSyncStatus.SUCCESS, "Device configured successfully")
        return device

    def update(self, attrs):
        """Apply ``attrs`` to the model and to the real device, rolling back on error.

        Args:
            attrs: Mapping of attribute name to new value.

        Returns:
            The updated object; on failure its attributes are restored to their
            previous values and its status is set to ERROR.
        """
        # Snapshot before super().update() overwrites the attributes.
        old_values = {key: getattr(self, key) for key in attrs}
        device = super().update(attrs)

        try:
            # Apply changes to actual device
            if 'ip_address' in attrs:
                change_device_ip(self.name, old_values['ip_address'], self.ip_address)
            if 'os_version' in attrs:
                upgrade_device_os(self.name, self.os_version)
        except Exception as e:
            # Deliberately broad: any failure must trigger the rollback.
            # Only the model attributes are restored here; a change already
            # applied to the device itself is not undone by this code.
            for key, value in old_values.items():
                setattr(self, key, value)
            device.set_status(DiffSyncStatus.ERROR, f"Update failed, rolled back: {e}")
        else:
            device.set_status(DiffSyncStatus.SUCCESS, "Device updated successfully")
        return device

    def delete(self):
        """Remove the device from the network, then delete the model object.

        The model object is deleted whether or not the network removal worked;
        the status records which of the two happened.

        Returns:
            The deleted object with status SUCCESS or ERROR.
        """
        failure = None
        try:
            # Remove device from network
            remove_device(self.name)
        except Exception as e:
            failure = e

        # Called exactly once, outside the try, so an error raised by the base
        # class is not mistaken for a device-removal failure and retried.
        device = super().delete()
        if failure is None:
            device.set_status(DiffSyncStatus.SUCCESS, "Device removed successfully")
        else:
            device.set_status(DiffSyncStatus.ERROR, f"Device removal failed: {failure}")
        return device

Advanced Sync Features

Progress Monitoring

def sync_progress_callback(stage, current, total):
    """Print a progress line while the "sync" stage runs; ignore other stages.

    Args:
        stage: Name of the stage reporting progress.
        current: Number of elements processed so far.
        total: Total number of elements to process.
    """
    if stage != "sync":
        return

    # A non-positive total would make the division meaningless, so report 0%.
    if total > 0:
        percentage = (current / total) * 100
    else:
        percentage = 0
    print(f"Syncing: {current}/{total} elements ({percentage:.1f}%)")

# Monitor sync progress: the callback is called with (stage, current, total) as the sync proceeds
diff = target.sync_from(source, callback=sync_progress_callback)

Using Pre-calculated Diffs

# Work out the differences up front, without applying anything yet
diff = target.diff_from(source)

# Show the pending changes so they can be reviewed
print("About to apply these changes:")
print(diff.str())

# Ask for confirmation; anything other than "y" (case-insensitive) aborts
confirmed = input("Continue? (y/n): ").lower() == 'y'
if not confirmed:
    print("Sync cancelled")
else:
    # Reuse the diff calculated above instead of computing a new one
    result_diff = target.sync_from(source, diff=diff)

Selective Synchronization with Flags

# Skip objects that exist only in the target: new objects are still created, nothing is deleted
diff = target.sync_from(source, flags=DiffSyncFlags.SKIP_UNMATCHED_DST)

# Only update objects that exist on both sides; don't create or delete
diff = target.sync_from(source, flags=DiffSyncFlags.SKIP_UNMATCHED_BOTH)

# Flags combine with "|": continue even if some operations fail, and log unchanged records
diff = target.sync_from(source, 
                       flags=DiffSyncFlags.CONTINUE_ON_FAILURE | DiffSyncFlags.LOG_UNCHANGED_RECORDS)

Bidirectional Synchronization

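Example of running a synchronization in both directions between two data sources. Note that a plain `sync_from` makes the destination match the source, so after the first pass the second pass has nothing left to apply unless a flag such as `DiffSyncFlags.SKIP_UNMATCHED_DST` is used to keep objects that exist on one side only.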

def bidirectional_sync(adapter1, adapter2, flags=None):
    """Perform bidirectional synchronization between two adapters.

    Runs ``adapter2.sync_from(adapter1)`` followed by ``adapter1.sync_from(adapter2)``.

    With the default flags the first pass makes adapter2 match adapter1, so
    the second pass finds nothing to apply. Pass a flag that keeps objects
    existing on one side only (for example ``DiffSyncFlags.SKIP_UNMATCHED_DST``)
    if both sides should contribute objects.

    Args:
        adapter1: First adapter; source of the first pass, destination of the second.
        adapter2: Second adapter; destination of the first pass, source of the second.
        flags: Optional flags forwarded to both ``sync_from`` calls. When None,
            ``sync_from`` is called without a ``flags`` argument and uses its own default.

    Returns:
        Tuple ``(diff1to2, diff2to1)`` with the diff returned by each pass.
    """
    # Only forward flags when the caller supplied them, so the default call is
    # exactly sync_from(other).
    sync_kwargs = {} if flags is None else {"flags": flags}

    # Sync adapter1 changes to adapter2
    print("Syncing adapter1 -> adapter2")
    diff1to2 = adapter2.sync_from(adapter1, **sync_kwargs)

    # Sync adapter2 changes to adapter1
    print("Syncing adapter2 -> adapter1")
    diff2to1 = adapter1.sync_from(adapter2, **sync_kwargs)

    # Report results
    summary1to2 = diff1to2.summary()
    summary2to1 = diff2to1.summary()

    print(f"adapter1 -> adapter2: {summary1to2}")
    print(f"adapter2 -> adapter1: {summary2to1}")

    return diff1to2, diff2to1

# Example usage
# NOTE(review): DatabaseAdapter and APIAdapter are not defined in this snippet —
# presumably Adapter subclasses provided by the reader.
source_db = DatabaseAdapter(name="source_db")
target_api = APIAdapter(name="target_api")

# Load each adapter's data before syncing
source_db.load()
target_api.load()

# diffs is the (adapter1 -> adapter2, adapter2 -> adapter1) pair returned by bidirectional_sync
diffs = bidirectional_sync(source_db, target_api)

Hierarchical Synchronization

DiffSync automatically handles hierarchical data structures during synchronization.

class Site(DiffSyncModel):
    """Site model: identified by ``name``, carries ``address`` and ``contact``, and has device children."""

    _modelname = "site"
    _identifiers = ("name",)
    _attributes = ("address", "contact")
    # Child model name -> field on this model that lists those children.
    _children = {"device": "devices"}
    
    name: str
    address: str
    contact: str
    # Referenced by _children above; presumably holds the devices' unique IDs — confirm.
    devices: List[str] = []

class Device(DiffSyncModel):
    """Device model: identified by ``(site, name)``, carries ``model`` and ``ip_address``, and has interface children."""

    _modelname = "device"
    _identifiers = ("site", "name")
    _attributes = ("model", "ip_address")
    # Child model name -> field on this model that lists those children.
    _children = {"interface": "interfaces"}
    
    site: str
    name: str
    model: str
    ip_address: str
    # Referenced by _children above; presumably holds the interfaces' unique IDs — confirm.
    interfaces: List[str] = []

class Interface(DiffSyncModel):
    """Interface model: identified by ``(device_site, device_name, name)``; declares no children."""

    _modelname = "interface"
    # The first two identifiers presumably mirror the parent Device's (site, name) — confirm.
    _identifiers = ("device_site", "device_name", "name")
    _attributes = ("description", "vlan")
    
    device_site: str
    device_name: str
    name: str
    description: str
    vlan: int

# Synchronization will automatically handle the hierarchy:
# 1. Sync sites first
# 2. Then sync devices within each site
# 3. Finally sync interfaces within each device
# NOTE(review): presumably this relies on both adapters declaring "site" in
# their top_level list — confirm against the adapter definitions.
diff = target.sync_from(source)

Types

from typing import Optional, Callable, Type
from diffsync.diff import Diff
from diffsync.enum import DiffSyncFlags
import structlog

# Progress callback function type: called as callback(stage, current, total) and returns nothing
ProgressCallback = Callable[[str, int, int], None]

# Logger type for sync completion callbacks (the `logger` argument of sync_complete)
SyncLogger = structlog.BoundLogger

Install with Tessl CLI

npx tessl i tessl/pypi-diffsync

docs

data-management.md

diff-calculation.md

flags-configuration.md

index.md

model-definition.md

storage-backends.md

synchronization.md

tile.json