Library to easily sync/diff/update 2 different data sources
—
Automated synchronization operations that apply calculated differences to update target datasets. Supports creation, modification, and deletion of records with comprehensive error handling and status tracking.
Core methods for performing synchronization between Adapter instances.
# API signature only; the implementation is not shown here.
def sync_from(self, source: "Adapter", diff_class: Type[Diff] = Diff,
              flags: DiffSyncFlags = DiffSyncFlags.NONE,
              callback: Optional[Callable[[str, int, int], None]] = None,
              diff: Optional[Diff] = None) -> Diff:
    """
    Synchronize data from the given source DiffSync object into the current DiffSync object.

    Args:
        source: Object to sync data from into this one
        diff_class: Diff or subclass thereof to use to calculate the diffs to use for synchronization
        flags: Flags influencing the behavior of this sync
        callback: Function with parameters (stage, current, total), called at intervals as sync proceeds
        diff: An existing diff to be used rather than generating a completely new diff

    Returns:
        Diff between origin object and source

    Raises:
        DiffClassMismatch: The provided diff's class does not match the diff_class
    """


# API signature only; the implementation is not shown here.
def sync_to(self, target: "Adapter", diff_class: Type[Diff] = Diff,
            flags: DiffSyncFlags = DiffSyncFlags.NONE,
            callback: Optional[Callable[[str, int, int], None]] = None,
            diff: Optional[Diff] = None) -> Diff:
    """
    Synchronize data from the current DiffSync object into the given target DiffSync object.

    Args:
        target: Object to sync data into from this one
        diff_class: Diff or subclass thereof to use to calculate the diffs to use for synchronization
        flags: Flags influencing the behavior of this sync
        callback: Function with parameters (stage, current, total), called at intervals as sync proceeds
        diff: An existing diff that will be used when determining what needs to be synced

    Returns:
        Diff between origin object and target

    Raises:
        DiffClassMismatch: The provided diff's class does not match the diff_class
    """


from diffsync import Adapter, DiffSyncModel, DiffSyncFlags
# Create source and target adapters
source = SourceNetworkAdapter(name="network_source")
target = TargetNetworkAdapter(name="network_target")
# Load their respective data
source.load()
target.load()
# Synchronize target to match source
diff = target.sync_from(source)
# Check results
if diff.has_diffs():
summary = diff.summary()
print(f"Applied {summary['create']} creates, {summary['update']} updates, {summary['delete']} deletes")
else:
print("No changes were needed")Hook for post-synchronization processing.
def sync_complete(self, source: "Adapter", diff: Diff,
flags: DiffSyncFlags = DiffSyncFlags.NONE,
logger: Optional[structlog.BoundLogger] = None) -> None:
"""
Callback triggered after a sync_from operation has completed and updated the model data of this instance.
Note that this callback is **only** triggered if the sync actually resulted in data changes.
If there are no detected changes, this callback will **not** be called.
The default implementation does nothing, but a subclass could use this, for example,
to perform bulk updates to a backend (such as a file) that doesn't readily support incremental updates.
Args:
source: The DiffSync whose data was used to update this instance
diff: The Diff calculated prior to the sync operation
flags: Any flags that influenced the sync
logger: Logging context for the sync
"""class FileBasedAdapter(Adapter):
device = Device
top_level = ["device"]
def __init__(self, filename, **kwargs):
super().__init__(**kwargs)
self.filename = filename
def sync_complete(self, source, diff, flags, logger):
# After sync is complete, save all data to file
print(f"Sync completed, saving {len(self)} objects to {self.filename}")
data = self.dict()
with open(self.filename, 'w') as f:
json.dump(data, f, indent=2)
# Log what was changed
summary = diff.summary()
logger.info(f"Saved changes to file",
created=summary['create'],
updated=summary['update'],
deleted=summary['delete'])Synchronization operations include comprehensive error handling and status tracking through model status management.
from diffsync import DiffSyncFlags
from diffsync.enum import DiffSyncActions, DiffSyncStatus
from diffsync.exceptions import ObjectNotFound

# Continue sync even if individual operations fail
diff = target.sync_from(source, flags=DiffSyncFlags.CONTINUE_ON_FAILURE)

# Check for any failures
for element in diff.get_children():
    if not element.action:
        continue
    # Get the synchronized object to check its status
    try:
        obj = target.get(element.type, element.keys)
    except ObjectNotFound:
        # A deleted object is expected to be absent; only other actions make this a problem
        if element.action != DiffSyncActions.DELETE:
            print(f"Object {element.type} {element.name} was not found after sync")
        continue
    status, message = obj.get_status()
    if status != DiffSyncStatus.SUCCESS:
        print(f"Failed to {element.action} {element.type} {element.name}: {message}")


class NetworkDevice(DiffSyncModel):
    """Example model that pushes each create/update/delete to a real device and records the outcome."""

    _modelname = "device"
    _identifiers = ("name",)
    _attributes = ("ip_address", "os_version")

    name: str
    ip_address: str
    os_version: str

    @classmethod
    def create(cls, adapter, ids, attrs):
        """Create the model, configure the device, and store the result as the model status.

        Args:
            adapter: Adapter the new object belongs to.
            ids: Identifier values for the new object.
            attrs: Attribute values for the new object.

        Returns:
            The created object, with its status set to SUCCESS, ERROR or FAILURE.
        """
        device = super().create(adapter, ids, attrs)
        try:
            # Attempt to configure device on network
            configure_device(device.name, device.ip_address, device.os_version)
        except NetworkError as e:
            device.set_status(DiffSyncStatus.ERROR, f"Network configuration failed: {e}")
        except ValidationError as e:
            device.set_status(DiffSyncStatus.FAILURE, f"Invalid configuration: {e}")
        else:
            device.set_status(DiffSyncStatus.SUCCESS, "Device configured successfully")
        return device

    def update(self, attrs):
        """Apply changed attributes to the device, restoring the old values if that fails.

        Args:
            attrs: Mapping of attribute name to new value.

        Returns:
            The object returned by the parent ``update``, with its status set.
        """
        # Snapshot taken before the parent update overwrites the attributes
        old_values = {k: getattr(self, k) for k in attrs}
        device = super().update(attrs)
        try:
            # Apply changes to actual device
            if 'ip_address' in attrs:
                change_device_ip(self.name, old_values['ip_address'], self.ip_address)
            if 'os_version' in attrs:
                upgrade_device_os(self.name, self.os_version)
        except Exception as e:
            # Rollback changes
            for key, value in old_values.items():
                setattr(self, key, value)
            device.set_status(DiffSyncStatus.ERROR, f"Update failed, rolled back: {e}")
        else:
            device.set_status(DiffSyncStatus.SUCCESS, "Device updated successfully")
        return device

    def delete(self):
        """Remove the device from the network, then delete the model and record the outcome.

        Returns:
            The object returned by the parent ``delete``, with status SUCCESS
            or ERROR depending on whether the device removal raised.
        """
        try:
            # Remove device from network
            remove_device(self.name)
        except Exception as e:
            status, message = DiffSyncStatus.ERROR, f"Device removal failed: {e}"
        else:
            status, message = DiffSyncStatus.SUCCESS, "Device removed successfully"
        # The model is deleted in both cases; only the recorded status differs
        device = super().delete()
        device.set_status(status, message)
        return device


def sync_progress_callback(stage, current, total):
    """Print a progress line for the "sync" stage; other stages are ignored.

    Args:
        stage: Name of the stage being reported.
        current: Number of elements processed so far.
        total: Total number of elements to process.
    """
    if stage == "sync":
        # Guard against division by zero when there is nothing to sync
        percentage = (current / total) * 100 if total > 0 else 0
        print(f"Syncing: {current}/{total} elements ({percentage:.1f}%)")


# Monitor sync progress
diff = target.sync_from(source, callback=sync_progress_callback)

# Calculate diff first
diff = target.diff_from(source)
# Review and potentially modify diff
print("About to apply these changes:")
print(diff.str())
if input("Continue? (y/n): ").lower() == 'y':
# Apply the pre-calculated diff
result_diff = target.sync_from(source, diff=diff)
else:
print("Sync cancelled")# Only create new objects, don't delete existing ones
diff = target.sync_from(source, flags=DiffSyncFlags.SKIP_UNMATCHED_DST)
# Only update existing objects, don't create or delete
diff = target.sync_from(source, flags=DiffSyncFlags.SKIP_UNMATCHED_BOTH)
# Continue even if some operations fail, and log unchanged records
diff = target.sync_from(source,
flags=DiffSyncFlags.CONTINUE_ON_FAILURE | DiffSyncFlags.LOG_UNCHANGED_RECORDS)Example of implementing bidirectional synchronization between two data sources.
def bidirectional_sync(adapter1, adapter2, flags=None):
    """Perform bidirectional synchronization between two adapters.

    A plain ``sync_from`` makes the destination mirror the source, which
    deletes whatever exists only in the destination. Running that first would
    erase adapter2-only records before the reverse pass could copy them to
    adapter1, leaving the reverse pass with nothing to do. Both passes
    therefore skip destination-only objects by default.

    When both adapters hold the same object with different attributes,
    adapter1's values win, because the adapter1 -> adapter2 pass runs first.

    Args:
        adapter1: First adapter; synced into adapter2 first.
        adapter2: Second adapter; synced back into adapter1 afterwards.
        flags: Flags passed to both ``sync_from`` calls. Defaults to
            ``DiffSyncFlags.SKIP_UNMATCHED_DST``.

    Returns:
        Tuple ``(diff1to2, diff2to1)`` with the diff applied in each direction.
    """
    if flags is None:
        # Imported here so the helper does not rely on a module-level import
        from diffsync import DiffSyncFlags

        flags = DiffSyncFlags.SKIP_UNMATCHED_DST

    # Sync adapter1 changes to adapter2
    print("Syncing adapter1 -> adapter2")
    diff1to2 = adapter2.sync_from(adapter1, flags=flags)

    # Sync adapter2 changes to adapter1
    print("Syncing adapter2 -> adapter1")
    diff2to1 = adapter1.sync_from(adapter2, flags=flags)

    # Report results
    summary1to2 = diff1to2.summary()
    summary2to1 = diff2to1.summary()
    print(f"adapter1 -> adapter2: {summary1to2}")
    print(f"adapter2 -> adapter1: {summary2to1}")
    return diff1to2, diff2to1
# Example usage: build and load both adapters, then sync them in both directions
source_db = DatabaseAdapter(name="source_db")
target_api = APIAdapter(name="target_api")
source_db.load()
target_api.load()
# bidirectional_sync returns the pair (diff1to2, diff2to1)
diffs = bidirectional_sync(source_db, target_api)

DiffSync automatically handles hierarchical data structures during synchronization.
class Site(DiffSyncModel):
    """Top level of the example hierarchy: a site that lists the devices it contains."""

    _modelname = "site"
    _identifiers = ("name",)
    _attributes = ("address", "contact")
    # Child model name -> field on this class that tracks those children
    # (per DiffSync model conventions; confirm against the library docs)
    _children = {"device": "devices"}

    name: str
    address: str
    contact: str
    devices: List[str] = []
class Device(DiffSyncModel):
    """Middle level of the example hierarchy: a device within a site, listing its interfaces."""

    _modelname = "device"
    # A device is identified by its site together with its own name
    _identifiers = ("site", "name")
    _attributes = ("model", "ip_address")
    # Child model name -> field on this class that tracks those children
    # (per DiffSync model conventions; confirm against the library docs)
    _children = {"interface": "interfaces"}

    site: str
    name: str
    model: str
    ip_address: str
    interfaces: List[str] = []
class Interface(DiffSyncModel):
    """Lowest level of the example hierarchy: an interface on a device; declares no children."""

    _modelname = "interface"
    # Identified by the owning device's site and name plus the interface name
    _identifiers = ("device_site", "device_name", "name")
    _attributes = ("description", "vlan")

    device_site: str
    device_name: str
    name: str
    description: str
    vlan: int
# Synchronization will automatically handle the hierarchy:
# 1. Sync sites first
# 2. Then sync devices within each site
# 3. Finally sync interfaces within each device
diff = target.sync_from(source)

from typing import Optional, Callable, Type
from diffsync.diff import Diff
from diffsync.enum import DiffSyncFlags
import structlog
# Progress callback function type
ProgressCallback = Callable[[str, int, int], None]
# Logger type for sync completion callbacks
SyncLogger = structlog.BoundLogger

Install with Tessl CLI
npx tessl i tessl/pypi-diffsync