tessl/pypi-diffsync

Library to easily sync/diff/update 2 different data sources

Storage Backends

Pluggable storage backend implementations for different persistence requirements, from in-memory storage for temporary operations to Redis-based storage for distributed scenarios.

Capabilities

BaseStore Interface

Abstract base class defining the interface that all storage backends must implement.

class BaseStore:
    """Reference store to be implemented in different backends."""
    
    def __init__(self, *args: Any, adapter: Optional["Adapter"] = None,
                 name: str = "", **kwargs: Any) -> None:
        """
        Init method for BaseStore.
        
        Args:
            adapter: Associated adapter instance
            name: Store name (defaults to class name)
        """
    
    adapter: Optional["Adapter"]
    name: str

Core Storage Operations

Abstract methods that all storage backends must implement.

def get_all_model_names(self) -> Set[str]:
    """
    Get all the model names stored.
    
    Returns:
        Set of all the model names
    """
def get(self, *, model: Union[str, "DiffSyncModel", Type["DiffSyncModel"]],
        identifier: Union[str, Dict]) -> "DiffSyncModel":
    """
    Get one object from the data store based on its unique id.
    
    Args:
        model: DiffSyncModel class or instance, or modelname string, that defines the type of the object to retrieve
        identifier: Unique ID of the object to retrieve, or dict of unique identifier keys/values
    
    Raises:
        ValueError: if model is a str and identifier is a dict (a dict cannot be converted into a uid str without a model class)
        ObjectNotFound: if the requested object is not present
    """
def get_all(self, *, model: Union[str, "DiffSyncModel", Type["DiffSyncModel"]]) -> List["DiffSyncModel"]:
    """
    Get all objects of a given type.
    
    Args:
        model: DiffSyncModel class or instance, or modelname string, that defines the type of the objects to retrieve
    
    Returns:
        List of objects
    """
def get_by_uids(self, *, uids: List[str],
                model: Union[str, "DiffSyncModel", Type["DiffSyncModel"]]) -> List["DiffSyncModel"]:
    """
    Get multiple objects from the store by their unique IDs/Keys and type.
    
    Args:
        uids: List of unique id / key identifying object in the database
        model: DiffSyncModel class or instance, or modelname string, that defines the type of the objects to retrieve
    
    Raises:
        ObjectNotFound: if any of the requested UIDs are not found in the store
    """
def add(self, *, obj: "DiffSyncModel") -> None:
    """
    Add a DiffSyncModel object to the store.
    
    Args:
        obj: Object to store
    
    Raises:
        ObjectAlreadyExists: if a different object with the same uid is already present
    """
def update(self, *, obj: "DiffSyncModel") -> None:
    """
    Update a DiffSyncModel object in the store.
    
    Args:
        obj: Object to update
    """
def remove(self, *, obj: "DiffSyncModel", remove_children: bool = False) -> None:
    """
    Remove a DiffSyncModel object from the store.
    
    Args:
        obj: object to remove
        remove_children: If True, also recursively remove any children of this object
    
    Raises:
        ObjectNotFound: if the object is not present
    """
def count(self, *, model: Union[str, "DiffSyncModel", Type["DiffSyncModel"], None] = None) -> int:
    """Returns the number of elements of a specific model, or all elements in the store if not specified."""

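The contract described by these signatures can be illustrated with a toy, dictionary-backed store. This is a stdlib-only sketch under stated assumptions: `ToyStore`, `Item`, and the two local exception classes are hypothetical stand-ins, not diffsync classes. A real backend would subclass BaseStore and work with DiffSyncModel objects.

```python
from typing import Dict, List, Optional


class ObjectNotFound(Exception):
    """Hypothetical local stand-in for diffsync.exceptions.ObjectNotFound."""


class ObjectAlreadyExists(Exception):
    """Hypothetical local stand-in for diffsync.exceptions.ObjectAlreadyExists."""


class Item:
    """Hypothetical stand-in for a DiffSyncModel: a model name plus a unique ID."""

    def __init__(self, modelname: str, uid: str) -> None:
        self.modelname = modelname
        self.uid = uid


class ToyStore:
    """Illustrates the add/get/get_all/remove/count contract with a dict of dicts."""

    def __init__(self) -> None:
        self._data: Dict[str, Dict[str, Item]] = {}

    def add(self, *, obj: Item) -> None:
        bucket = self._data.setdefault(obj.modelname, {})
        existing = bucket.get(obj.uid)
        # Only a *different* object under the same uid is an error.
        if existing is not None and existing is not obj:
            raise ObjectAlreadyExists(f"{obj.modelname} {obj.uid} already present")
        bucket[obj.uid] = obj

    def get(self, *, model: str, identifier: str) -> Item:
        try:
            return self._data[model][identifier]
        except KeyError:
            raise ObjectNotFound(f"{model} {identifier} not present") from None

    def get_all(self, *, model: str) -> List[Item]:
        return list(self._data.get(model, {}).values())

    def remove(self, *, obj: Item) -> None:
        if obj.uid not in self._data.get(obj.modelname, {}):
            raise ObjectNotFound(f"{obj.modelname} {obj.uid} not present")
        del self._data[obj.modelname][obj.uid]

    def count(self, *, model: Optional[str] = None) -> int:
        if model is not None:
            return len(self._data.get(model, {}))
        return sum(len(bucket) for bucket in self._data.values())


store = ToyStore()
router = Item("device", "router1")
store.add(obj=router)
store.add(obj=router)  # re-adding the identical object is a no-op
store.add(obj=Item("interface", "router1__eth0"))
```

The sketch leaves out `get_by_uids`, `update`, and the `remove_children` option, and it accepts only model-name strings where the real interface also accepts model classes and instances.
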
Convenience Methods

Common operations implemented in the base class using the abstract methods.

def get_or_instantiate(self, *, model: Type["DiffSyncModel"], ids: Dict,
                      attrs: Optional[Dict] = None) -> Tuple["DiffSyncModel", bool]:
    """
    Attempt to get the object with provided identifiers or instantiate it with provided identifiers and attrs.
    
    Args:
        model: The DiffSyncModel to get or create
        ids: Identifiers for the DiffSyncModel to get or create with
        attrs: Attributes when creating an object if it doesn't exist. Defaults to None
    
    Returns:
        Tuple of (existing or new object, whether it was created)
    """
def get_or_add_model_instance(self, obj: "DiffSyncModel") -> Tuple["DiffSyncModel", bool]:
    """
    Attempt to get the stored object matching obj's identifiers, or add obj to the store if none exists.
    
    Args:
        obj: An obj of the DiffSyncModel to get or add
    
    Returns:
        Tuple of (existing or new object, whether it was added)
    """
def update_or_instantiate(self, *, model: Type["DiffSyncModel"], ids: Dict,
                         attrs: Dict) -> Tuple["DiffSyncModel", bool]:
    """
    Attempt to update an existing object with provided ids/attrs or instantiate it with provided identifiers and attrs.
    
    Args:
        model: The DiffSyncModel to update or create
        ids: Identifiers for the DiffSyncModel to update or create with
        attrs: Attributes to apply when updating an existing object or creating a new one. Pass an empty dict if there are no specific attrs
    
    Returns:
        Tuple of (existing or new object, whether it was created)
    """
def update_or_add_model_instance(self, obj: "DiffSyncModel") -> Tuple["DiffSyncModel", bool]:
    """
    Attempt to update the stored object matching obj's identifiers with obj's attributes, or add obj to the store if none exists.
    
    Args:
        obj: An instance of the DiffSyncModel to update or create
    
    Returns:
        Tuple of (existing or new object, whether it was added)
    """

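The "get or create" and "update or create" pattern behind these methods can be sketched on top of `get` and `add` alone. The classes below are hypothetical stand-ins (`ToyStore`, `Device`, and a local `ObjectNotFound`), so this shows the shape of the logic rather than the library's own code.

```python
from typing import Dict, Optional, Tuple


class ObjectNotFound(Exception):
    """Hypothetical local stand-in for diffsync.exceptions.ObjectNotFound."""


class Device:
    """Hypothetical model stand-in: built from identifier and attribute dicts."""

    def __init__(self, ids: Dict, attrs: Optional[Dict] = None) -> None:
        self.ids = dict(ids)
        self.attrs = dict(attrs or {})


class ToyStore:
    def __init__(self) -> None:
        self._objects: Dict[Tuple, Device] = {}

    @staticmethod
    def _key(ids: Dict) -> Tuple:
        # Sort the items so the key does not depend on dict insertion order.
        return tuple(sorted(ids.items()))

    def get(self, *, ids: Dict) -> Device:
        try:
            return self._objects[self._key(ids)]
        except KeyError:
            raise ObjectNotFound(str(ids)) from None

    def add(self, *, obj: Device) -> None:
        self._objects[self._key(obj.ids)] = obj

    def get_or_instantiate(self, *, ids: Dict,
                           attrs: Optional[Dict] = None) -> Tuple[Device, bool]:
        """Return (object, created): fetch first, build and add only on a miss."""
        try:
            return self.get(ids=ids), False
        except ObjectNotFound:
            obj = Device(ids, attrs)
            self.add(obj=obj)
            return obj, True

    def update_or_instantiate(self, *, ids: Dict, attrs: Dict) -> Tuple[Device, bool]:
        """Return (object, created): update attrs on a hit, build and add on a miss."""
        try:
            obj = self.get(ids=ids)
        except ObjectNotFound:
            obj = Device(ids, attrs)
            self.add(obj=obj)
            return obj, True
        obj.attrs.update(attrs)
        return obj, False


store = ToyStore()
first, created_first = store.get_or_instantiate(ids={"name": "router1"}, attrs={"role": "core"})
second, created_second = store.get_or_instantiate(ids={"name": "router1"}, attrs={"role": "edge"})
role_after_get = second.attrs["role"]  # attrs are ignored when the object already exists
updated, created_third = store.update_or_instantiate(ids={"name": "router1"}, attrs={"role": "edge"})
```

The boolean in each returned tuple lets the caller tell a fresh object from a reused one without a separate lookup.
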
LocalStore Implementation

In-memory storage backend using Python dictionaries. Default storage backend for Adapter instances.

class LocalStore(BaseStore):
    """LocalStore class."""
    
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Init method for LocalStore."""
    
    _data: Dict

LocalStore uses a nested dictionary structure (defaultdict(dict)) to organize data by model name and unique ID:

_data = {
    "device": {
        "router1": <Device instance>,
        "switch1": <Device instance>
    },
    "interface": {
        "router1__eth0": <Interface instance>,
        "router1__eth1": <Interface instance>
    }
}

LocalStore Usage

from diffsync import Adapter, LocalStore

# MyAdapter is assumed to be an Adapter subclass defined elsewhere.
# LocalStore is the default, so these are equivalent:
adapter1 = MyAdapter()
adapter2 = MyAdapter(internal_storage_engine=LocalStore)

# Both adapters will use in-memory storage
adapter1.load()
adapter2.load()

# Data is stored in memory and will be lost when the process ends
print(f"Adapter1 has {len(adapter1)} objects")
print(f"Adapter2 has {len(adapter2)} objects")

RedisStore Implementation

Redis-based storage backend for persistent and distributed storage scenarios.

class RedisStore(BaseStore):
    """RedisStore class."""
    
    def __init__(self, *args: Any, store_id: Optional[str] = None,
                 host: Optional[str] = None, port: int = 6379,
                 url: Optional[str] = None, db: int = 0, **kwargs: Any):
        """
        Init method for RedisStore.
        
        Args:
            store_id: Optional unique identifier for this store instance
            host: Redis server hostname
            port: Redis server port (default: 6379)
            url: Redis connection URL (alternative to host/port)
            db: Redis database number (default: 0)
        
        Raises:
            ValueError: if both url and host are specified
            ObjectStoreException: if Redis is unavailable
        """
    
    _store: Redis
    _store_id: str
    _store_label: str
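
The `ValueError` rule in the docstring (url and host are mutually exclusive) can be sketched as a small validation helper. `resolve_connection` is a hypothetical function, not part of diffsync or redis-py, and its requirement that at least one of `url` or `host` be given is an assumption of this sketch.

```python
from typing import Dict, Optional, Union


def resolve_connection(host: Optional[str] = None, port: int = 6379,
                       url: Optional[str] = None,
                       db: int = 0) -> Dict[str, Union[str, int]]:
    """Hypothetical helper: choose URL-style or host/port-style settings."""
    if url and host:
        # Mirrors the documented rule: the two styles cannot be combined.
        raise ValueError("'url' and 'host' can't be used together")
    if url:
        return {"url": url, "db": db}
    if not host:
        # Assumption of this sketch, not documented above.
        raise ValueError("either 'url' or 'host' is required")
    return {"host": host, "port": port, "db": db}


by_host = resolve_connection(host="localhost")
by_url = resolve_connection(url="redis://localhost:6379/0")
```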

RedisStore requires the redis extra to be installed:

pip install diffsync[redis]

RedisStore Key Structure

RedisStore organizes data using hierarchical Redis keys:

diffsync:<store_id>:<model_name>:<unique_id>

Examples:

  • diffsync:12345:device:router1
  • diffsync:12345:interface:router1__eth0
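
The key layout can be built and taken apart with plain string handling. The helpers below (`build_key`, `parse_key`, `model_pattern`) are hypothetical names for illustration, not RedisStore methods, and they assume that the store ID and model name contain no `:` characters.

```python
from typing import Tuple

KEY_PREFIX = "diffsync"  # taken from the key layout shown above


def build_key(store_id: str, model_name: str, unique_id: str) -> str:
    """Join the parts into 'diffsync:<store_id>:<model_name>:<unique_id>'."""
    return ":".join((KEY_PREFIX, store_id, model_name, unique_id))


def parse_key(key: str) -> Tuple[str, str, str]:
    """Split a key back into (store_id, model_name, unique_id).

    maxsplit=3 keeps any ':' inside the unique ID intact.
    """
    prefix, store_id, model_name, unique_id = key.split(":", 3)
    if prefix != KEY_PREFIX:
        raise ValueError(f"not a diffsync key: {key!r}")
    return store_id, model_name, unique_id


def model_pattern(store_id: str, model_name: str) -> str:
    """Glob-style pattern matching every object of one model in one store."""
    return build_key(store_id, model_name, "*")


key = build_key("12345", "device", "router1")
parts = parse_key("diffsync:12345:interface:router1__eth0")
pattern = model_pattern("12345", "device")
```

Because the store ID is part of every key, two stores with different IDs can share one Redis database without their keys colliding.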

RedisStore Usage

from diffsync import Adapter
from diffsync.store.redis import RedisStore

# Connect to Redis on localhost
adapter = MyAdapter(
    internal_storage_engine=RedisStore(host="localhost", port=6379, db=0)
)

# Connect using Redis URL
adapter = MyAdapter(
    internal_storage_engine=RedisStore(url="redis://localhost:6379/0")
)

# Use specific store ID for multiple isolated datasets
adapter1 = MyAdapter(
    internal_storage_engine=RedisStore(host="localhost", store_id="dataset1")
)
adapter2 = MyAdapter(
    internal_storage_engine=RedisStore(host="localhost", store_id="dataset2")
)

# Load data - will be persisted to Redis
adapter1.load()
adapter2.load()

# Data persists across process restarts
print(f"Dataset1 has {len(adapter1)} objects")
print(f"Dataset2 has {len(adapter2)} objects")

RedisStore Advanced Usage

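from diffsync import Adapter
from diffsync.store.redis import RedisStore

# Distributed scenario - multiple processes sharing data.
# Device and ExternalAdapter are assumed to be defined elsewhere.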
class SharedAdapter(Adapter):
    device = Device
    top_level = ["device"]
    
    def __init__(self, process_name):
        # All processes use the same store_id to share data
        super().__init__(
            name=process_name,
            internal_storage_engine=RedisStore(
                host="redis-server.example.com",
                store_id="shared_network_data"
            )
        )

# Process 1: Load initial data
process1 = SharedAdapter("loader")
process1.load()  # Loads data into Redis

# Process 2: Access the same data
process2 = SharedAdapter("consumer")
devices = process2.get_all("device")  # Reads from Redis
print(f"Found {len(devices)} devices loaded by process1")

# Process 3: Sync with external source
process3 = SharedAdapter("syncer")
external_source = ExternalAdapter()
external_source.load()

# Sync will update the shared Redis data
process3.sync_from(external_source)

Custom Storage Backends

You can implement custom storage backends by subclassing BaseStore.

Custom Backend Example

import copy
import pickle
import sqlite3

from diffsync.store import BaseStore
from diffsync.exceptions import ObjectNotFound, ObjectAlreadyExists

class SQLiteStore(BaseStore):
    def __init__(self, database_path, **kwargs):
        super().__init__(**kwargs)
        self.db_path = database_path
        self._init_database()
    
    def _init_database(self):
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS objects (
                    model_name TEXT,
                    unique_id TEXT,
                    data BLOB,
                    PRIMARY KEY (model_name, unique_id)
                )
            ''')
    
    def get_all_model_names(self):
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("SELECT DISTINCT model_name FROM objects")
            return {row[0] for row in cursor.fetchall()}
    
    def get(self, *, model, identifier):
        object_class, modelname = self._get_object_class_and_model(model)
        uid = self._get_uid(model, object_class, identifier)
        
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                "SELECT data FROM objects WHERE model_name = ? AND unique_id = ?",
                (modelname, uid)
            )
            row = cursor.fetchone()
            if not row:
                raise ObjectNotFound(f"{modelname} {uid} not found")
            
            # Deserialize object from database
            obj_data = pickle.loads(row[0])
            obj_data.adapter = self.adapter
            return obj_data
    
    def add(self, *, obj):
        modelname = obj.get_type()
        uid = obj.get_unique_id()
        
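        # Check if object already exists. Note: get() returns a freshly
        # unpickled copy, so the identity check below is true for any stored
        # uid and re-adding an already-stored object raises ObjectAlreadyExists.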
        try:
            existing = self.get(model=modelname, identifier=uid)
            if existing is not obj:
                raise ObjectAlreadyExists(f"Object {uid} already exists", obj)
            return
        except ObjectNotFound:
            pass
        
        # Serialize and store object
        obj_copy = copy.copy(obj)
        obj_copy.adapter = None
        serialized = pickle.dumps(obj_copy)
        
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                "INSERT INTO objects (model_name, unique_id, data) VALUES (?, ?, ?)",
                (modelname, uid, serialized)
            )
    
    # Implement other required methods...

# Usage
adapter = MyAdapter(
    internal_storage_engine=SQLiteStore(database_path="network_data.db")
)
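
The methods left out of the example mostly come down to a handful of SQL statements. The sketch below shows that storage layer on its own, using only the standard library. `SQLiteRows` is a hypothetical helper, it stores JSON rather than the pickled objects used above, and it keeps one long-lived connection so that an in-memory database survives between calls.

```python
import json
import sqlite3
from typing import Dict, Optional, Set


class SQLiteRows:
    """Hypothetical helper: one JSON document per (model_name, unique_id)."""

    def __init__(self, database_path: str = ":memory:") -> None:
        # A ":memory:" database disappears when its connection closes, so
        # reconnecting on every call would lose the data.
        self._conn = sqlite3.connect(database_path)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS objects ("
            " model_name TEXT, unique_id TEXT, data TEXT,"
            " PRIMARY KEY (model_name, unique_id))"
        )

    def upsert(self, model_name: str, unique_id: str, attrs: Dict) -> None:
        with self._conn:  # commits on success, rolls back on error
            self._conn.execute(
                "INSERT OR REPLACE INTO objects VALUES (?, ?, ?)",
                (model_name, unique_id, json.dumps(attrs)),
            )

    def fetch(self, model_name: str, unique_id: str) -> Optional[Dict]:
        row = self._conn.execute(
            "SELECT data FROM objects WHERE model_name = ? AND unique_id = ?",
            (model_name, unique_id),
        ).fetchone()
        return json.loads(row[0]) if row else None

    def delete(self, model_name: str, unique_id: str) -> bool:
        with self._conn:
            cursor = self._conn.execute(
                "DELETE FROM objects WHERE model_name = ? AND unique_id = ?",
                (model_name, unique_id),
            )
        return cursor.rowcount > 0  # False when nothing matched

    def count(self, model_name: Optional[str] = None) -> int:
        if model_name is None:
            row = self._conn.execute("SELECT COUNT(*) FROM objects").fetchone()
        else:
            row = self._conn.execute(
                "SELECT COUNT(*) FROM objects WHERE model_name = ?", (model_name,)
            ).fetchone()
        return row[0]

    def model_names(self) -> Set[str]:
        rows = self._conn.execute("SELECT DISTINCT model_name FROM objects")
        return {row[0] for row in rows}


rows = SQLiteRows()
rows.upsert("device", "router1", {"role": "core"})
rows.upsert("device", "router1", {"role": "edge"})  # replaces the first row
rows.upsert("interface", "router1__eth0", {"mtu": 1500})
```

In a real backend, `update` could map onto the upsert statement, `remove` onto the delete (raising ObjectNotFound when no row matched), and `count` and `get_all_model_names` onto the two queries at the end.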

Storage Backend Selection Guide

LocalStore - Choose When:

  • Working with temporary data that doesn't need persistence
  • Single-process applications
  • Development and testing scenarios
  • Fast in-memory operations are the priority
  • No sharing of data between processes needed

RedisStore - Choose When:

  • Data needs to persist across process restarts
  • Multiple processes need to share the same dataset
  • Distributed applications with multiple nodes
  • Need atomic operations and Redis features
  • Building caching layers or session storage

Custom Backend - Choose When:

  • Need integration with existing database systems
  • Specific performance requirements (e.g., very large datasets)
  • Special serialization or encryption requirements
  • Integration with external storage services (S3, cloud databases)
  • Complex querying requirements beyond simple key-value access

Logging Configuration

Console logging setup for DiffSync operations, particularly useful when debugging storage backend operations.

def enable_console_logging(verbosity: int = 0) -> None:
    """
    Enable formatted logging to console with the specified verbosity.
    
    Args:
        verbosity: 0 for WARNING logs, 1 for INFO logs, 2 for DEBUG logs
    """

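The verbosity-to-level mapping in the docstring can be expressed with the standard `logging` module. This is a stdlib-only analogue for illustration: `verbosity_to_level` and `configure_stdlib_logging` are hypothetical names, the logger name is made up, and treating values above 2 as DEBUG is an assumption. It does not reproduce how `enable_console_logging` is implemented.

```python
import logging

# Mapping taken from the docstring above; anything above 2 is treated as
# DEBUG in this sketch (an assumption).
_LEVELS = {0: logging.WARNING, 1: logging.INFO, 2: logging.DEBUG}


def verbosity_to_level(verbosity: int = 0) -> int:
    """Translate a verbosity count into a stdlib logging level."""
    if verbosity < 0:
        raise ValueError("verbosity must be non-negative")
    return _LEVELS.get(verbosity, logging.DEBUG)


def configure_stdlib_logging(verbosity: int = 0) -> logging.Logger:
    """Hypothetical stdlib-only analogue of enable_console_logging."""
    logger = logging.getLogger("storage_demo")  # hypothetical logger name
    logger.setLevel(verbosity_to_level(verbosity))
    if not logger.handlers:
        logger.addHandler(logging.StreamHandler())  # writes to stderr
    return logger


logger = configure_stdlib_logging(verbosity=1)
logger.info("visible at verbosity 1")
logger.debug("hidden until verbosity 2")
```
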
Logging Usage

from diffsync.logging import enable_console_logging
from diffsync import Adapter
from diffsync.store.redis import RedisStore

# Enable debug logging to see detailed storage operations
enable_console_logging(verbosity=2)

# Now all storage operations will be logged
adapter = MyAdapter(
    internal_storage_engine=RedisStore(host="localhost")
)
adapter.load()  # Will show detailed Redis operations

# Sync operations will show storage backend interactions
target = MyAdapter()
adapter.sync_to(target)  # Will log all storage operations

Types

from typing import Any, Dict, List, Optional, Set, Tuple, Type, Union
from redis import Redis

# Storage backend configuration types
StorageConfig = Dict[str, Any]
ModelIdentifier = Union[str, Dict]
ModelSpec = Union[str, "DiffSyncModel", Type["DiffSyncModel"]]

Install with Tessl CLI

npx tessl i tessl/pypi-diffsync
