Library to easily sync/diff/update 2 different data sources
—
Pluggable storage backend implementations for different persistence requirements, from in-memory storage for temporary operations to Redis-based storage for distributed scenarios.
Abstract base class defining the interface that all storage backends must implement.
class BaseStore:
"""Reference store to be implemented in different backends."""
def __init__(self, *args: Any, adapter: Optional["Adapter"] = None,
name: str = "", **kwargs: Any) -> None:
"""
Init method for BaseStore.
Args:
adapter: Associated adapter instance
name: Store name (defaults to class name)
"""
adapter: Optional["Adapter"]
name: str

Abstract methods that all storage backends must implement.
def get_all_model_names(self) -> Set[str]:
"""
Get all the model names stored.
Returns:
Set of all the model names
"""def get(self, *, model: Union[str, "DiffSyncModel", Type["DiffSyncModel"]],
identifier: Union[str, Dict]) -> "DiffSyncModel":
"""
Get one object from the data store based on its unique id.
Args:
model: DiffSyncModel class or instance, or modelname string, that defines the type of the object to retrieve
identifier: Unique ID of the object to retrieve, or dict of unique identifier keys/values
Raises:
ValueError: if obj is a str and identifier is a dict (can't convert dict into a uid str without a model class)
ObjectNotFound: if the requested object is not present
"""def get_all(self, *, model: Union[str, "DiffSyncModel", Type["DiffSyncModel"]]) -> List["DiffSyncModel"]:
"""
Get all objects of a given type.
Args:
model: DiffSyncModel class or instance, or modelname string, that defines the type of the objects to retrieve
Returns:
List of objects
"""def get_by_uids(self, *, uids: List[str],
model: Union[str, "DiffSyncModel", Type["DiffSyncModel"]]) -> List["DiffSyncModel"]:
"""
Get multiple objects from the store by their unique IDs/Keys and type.
Args:
uids: List of unique id / key identifying object in the database
model: DiffSyncModel class or instance, or modelname string, that defines the type of the objects to retrieve
Raises:
ObjectNotFound: if any of the requested UIDs are not found in the store
"""def add(self, *, obj: "DiffSyncModel") -> None:
"""
Add a DiffSyncModel object to the store.
Args:
obj: Object to store
Raises:
ObjectAlreadyExists: if a different object with the same uid is already present
"""def update(self, *, obj: "DiffSyncModel") -> None:
"""
Update a DiffSyncModel object to the store.
Args:
obj: Object to update
"""def remove(self, *, obj: "DiffSyncModel", remove_children: bool = False) -> None:
"""
Remove a DiffSyncModel object from the store.
Args:
obj: object to remove
remove_children: If True, also recursively remove any children of this object
Raises:
ObjectNotFound: if the object is not present
"""def count(self, *, model: Union[str, "DiffSyncModel", Type["DiffSyncModel"], None] = None) -> int:
"""Returns the number of elements of a specific model, or all elements in the store if not specified."""Common operations implemented in the base class using the abstract methods.
def get_or_instantiate(self, *, model: Type["DiffSyncModel"], ids: Dict,
attrs: Optional[Dict] = None) -> Tuple["DiffSyncModel", bool]:
"""
Attempt to get the object with provided identifiers or instantiate it with provided identifiers and attrs.
Args:
model: The DiffSyncModel to get or create
ids: Identifiers for the DiffSyncModel to get or create with
attrs: Attributes when creating an object if it doesn't exist. Defaults to None
Returns:
Tuple of (existing or new object, whether it was created)
"""def get_or_add_model_instance(self, obj: "DiffSyncModel") -> Tuple["DiffSyncModel", bool]:
"""
Attempt to get the object with provided obj identifiers or instantiate obj.
Args:
obj: An obj of the DiffSyncModel to get or add
Returns:
Tuple of (existing or new object, whether it was added)
"""def update_or_instantiate(self, *, model: Type["DiffSyncModel"], ids: Dict,
attrs: Dict) -> Tuple["DiffSyncModel", bool]:
"""
Attempt to update an existing object with provided ids/attrs or instantiate it with provided identifiers and attrs.
Args:
model: The DiffSyncModel to update or create
ids: Identifiers for the DiffSyncModel to update or create with
attrs: Attributes when creating/updating an object if it doesn't exist. Pass in empty dict, if no specific attrs
Returns:
Tuple of (existing or new object, whether it was created)
"""def update_or_add_model_instance(self, obj: "DiffSyncModel") -> Tuple["DiffSyncModel", bool]:
"""
Attempt to update an existing object with provided ids/attrs or instantiate obj.
Args:
obj: An instance of the DiffSyncModel to update or create
Returns:
Tuple of (existing or new object, whether it was added)
"""In-memory storage backend using Python dictionaries. Default storage backend for Adapter instances.
class LocalStore(BaseStore):
"""LocalStore class."""
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Init method for LocalStore."""
_data: Dict

LocalStore uses a nested dictionary structure (defaultdict(dict)) to organize data by model name and unique ID:
_data = {
"device": {
"router1": <Device instance>,
"switch1": <Device instance>
},
"interface": {
"router1__eth0": <Interface instance>,
"router1__eth1": <Interface instance>
}
}

from diffsync import Adapter, LocalStore
# LocalStore is the default, these are equivalent:
adapter1 = MyAdapter()
adapter2 = MyAdapter(internal_storage_engine=LocalStore)
# Both adapters will use in-memory storage
adapter1.load()
adapter2.load()
# Data is stored in memory and will be lost when the process ends
print(f"Adapter1 has {len(adapter1)} objects")
print(f"Adapter2 has {len(adapter2)} objects")Redis-based storage backend for persistent and distributed storage scenarios.
class RedisStore(BaseStore):
"""RedisStore class."""
def __init__(self, *args: Any, store_id: Optional[str] = None,
host: Optional[str] = None, port: int = 6379,
url: Optional[str] = None, db: int = 0, **kwargs: Any):
"""
Init method for RedisStore.
Args:
store_id: Optional unique identifier for this store instance
host: Redis server hostname
port: Redis server port (default: 6379)
url: Redis connection URL (alternative to host/port)
db: Redis database number (default: 0)
Raises:
ValueError: if both url and host are specified
ObjectStoreException: if Redis is unavailable
"""
_store: Redis
_store_id: str
_store_label: str

RedisStore requires the redis extra to be installed:
pip install diffsync[redis]

RedisStore organizes data using hierarchical Redis keys:
diffsync:<store_id>:<model_name>:<unique_id>

Examples:
diffsync:12345:device:router1
diffsync:12345:interface:router1__eth0

from diffsync import Adapter
from diffsync.store.redis import RedisStore
# Connect to Redis on localhost
adapter = MyAdapter(
internal_storage_engine=RedisStore(host="localhost", port=6379, db=0)
)
# Connect using Redis URL
adapter = MyAdapter(
internal_storage_engine=RedisStore(url="redis://localhost:6379/0")
)
# Use specific store ID for multiple isolated datasets
adapter1 = MyAdapter(
internal_storage_engine=RedisStore(host="localhost", store_id="dataset1")
)
adapter2 = MyAdapter(
internal_storage_engine=RedisStore(host="localhost", store_id="dataset2")
)
# Load data - will be persisted to Redis
adapter1.load()
adapter2.load()
# Data persists across process restarts
print(f"Dataset1 has {len(adapter1)} objects")
print(f"Dataset2 has {len(adapter2)} objects")# Distributed scenario - multiple processes sharing data
class SharedAdapter(Adapter):
device = Device
top_level = ["device"]
def __init__(self, process_name):
# All processes use the same store_id to share data
super().__init__(
name=process_name,
internal_storage_engine=RedisStore(
host="redis-server.example.com",
store_id="shared_network_data"
)
)
# Process 1: Load initial data
process1 = SharedAdapter("loader")
process1.load() # Loads data into Redis
# Process 2: Access the same data
process2 = SharedAdapter("consumer")
devices = process2.get_all("device") # Reads from Redis
print(f"Found {len(devices)} devices loaded by process1")
# Process 3: Sync with external source
process3 = SharedAdapter("syncer")
external_source = ExternalAdapter()
external_source.load()
# Sync will update the shared Redis data
process3.sync_from(external_source)

You can implement custom storage backends by subclassing BaseStore.
import copy
import pickle
import sqlite3

from diffsync.store import BaseStore
from diffsync.exceptions import ObjectNotFound, ObjectAlreadyExists


class SQLiteStore(BaseStore):
    """Example custom storage backend persisting DiffSync objects to SQLite.

    Objects are pickled and stored in a single table keyed by
    (model_name, unique_id).
    """

    def __init__(self, database_path, **kwargs):
        """Init method for SQLiteStore.

        Args:
            database_path: Path to the SQLite database file.
        """
        super().__init__(**kwargs)
        self.db_path = database_path
        self._init_database()

    def _init_database(self):
        """Create the backing table if it does not already exist."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS objects (
                    model_name TEXT,
                    unique_id TEXT,
                    data BLOB,
                    PRIMARY KEY (model_name, unique_id)
                )
            ''')

    def get_all_model_names(self):
        """Get all the model names stored.

        Returns:
            Set of all the model names.
        """
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("SELECT DISTINCT model_name FROM objects")
            return {row[0] for row in cursor.fetchall()}

    def get(self, *, model, identifier):
        """Get one object from the data store based on its unique id.

        Args:
            model: DiffSyncModel class/instance or modelname string.
            identifier: Unique ID of the object, or dict of identifier keys/values.

        Raises:
            ObjectNotFound: if the requested object is not present.
        """
        object_class, modelname = self._get_object_class_and_model(model)
        uid = self._get_uid(model, object_class, identifier)
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                "SELECT data FROM objects WHERE model_name = ? AND unique_id = ?",
                (modelname, uid)
            )
            row = cursor.fetchone()
            if not row:
                raise ObjectNotFound(f"{modelname} {uid} not found")
            # Deserialize object from database.
            # NOTE: pickle is only safe because this process wrote the data
            # itself — never point this store at an untrusted database file.
            obj_data = pickle.loads(row[0])
            # Re-attach the adapter reference stripped out before storage.
            obj_data.adapter = self.adapter
            return obj_data

    def add(self, *, obj):
        """Add a DiffSyncModel object to the store.

        Args:
            obj: Object to store.

        Raises:
            ObjectAlreadyExists: if a different object with the same uid is already present.
        """
        modelname = obj.get_type()
        uid = obj.get_unique_id()
        # Check if object already exists (EAFP: get() raises when absent).
        try:
            existing = self.get(model=modelname, identifier=uid)
            if existing is not obj:
                raise ObjectAlreadyExists(f"Object {uid} already exists", obj)
            return
        except ObjectNotFound:
            pass
        # Serialize and store object; detach the adapter so the pickle
        # doesn't drag the whole adapter (and its store) along with it.
        obj_copy = copy.copy(obj)
        obj_copy.adapter = None
        serialized = pickle.dumps(obj_copy)
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                "INSERT INTO objects (model_name, unique_id, data) VALUES (?, ?, ?)",
                (modelname, uid, serialized)
            )
# Implement other required methods...
# Usage
adapter = MyAdapter(
internal_storage_engine=SQLiteStore(database_path="network_data.db")
)

Console logging setup for DiffSync operations, particularly useful when debugging storage backend operations.
def enable_console_logging(verbosity: int = 0) -> None:
"""
Enable formatted logging to console with the specified verbosity.
Args:
verbosity: 0 for WARNING logs, 1 for INFO logs, 2 for DEBUG logs
"""from diffsync.logging import enable_console_logging
from diffsync import Adapter
from diffsync.store.redis import RedisStore
# Enable debug logging to see detailed storage operations
enable_console_logging(verbosity=2)
# Now all storage operations will be logged
adapter = MyAdapter(
internal_storage_engine=RedisStore(host="localhost")
)
adapter.load() # Will show detailed Redis operations
# Sync operations will show storage backend interactions
target = MyAdapter()
adapter.sync_to(target) # Will log all storage operations

from typing import Any, Dict, List, Optional, Set, Tuple, Type, Union
from redis import Redis
# Storage backend configuration types
StorageConfig = Dict[str, Any]
ModelIdentifier = Union[str, Dict]
ModelSpec = Union[str, "DiffSyncModel", Type["DiffSyncModel"]]

Install with Tessl CLI
npx tessl i tessl/pypi-diffsync