CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-qdrant-client

Client library for the Qdrant vector search engine

Pending
Overview
Eval results
Files

docs/clustering-sharding.md

Clustering & Sharding

Distributed operation support including shard key management, cluster operations, and multi-tenant configurations.

Capabilities

Shard Key Management

Create and manage shard keys for data distribution and isolation.

def create_shard_key(
    self,
    collection_name: str,
    shard_key: ShardKey,
    shards_number: Optional[int] = None,
    replication_factor: Optional[int] = None,
    placement: Optional[List[int]] = None,
    timeout: Optional[int] = None,
    **kwargs
) -> bool:
    """
    Register a new shard key on a collection.

    Parameters:
    - collection_name: Collection that receives the new shard key
    - shard_key: Value of the shard key to create
    - shards_number: How many shards to allocate for this key (optional)
    - replication_factor: Replication factor applied to those shards (optional)
    - placement: Nodes the shards should be placed on (optional)
    - timeout: Timeout for the request (optional)

    Returns:
        bool: True when the shard key was created
    """

def delete_shard_key(
    self,
    collection_name: str,
    shard_key: ShardKey,
    timeout: Optional[int] = None,
    **kwargs
) -> bool:
    """
    Remove an existing shard key from a collection.

    Parameters:
    - collection_name: Collection that owns the shard key
    - shard_key: Value of the shard key to remove
    - timeout: Timeout for the request (optional)

    Returns:
        bool: True when the shard key was deleted
    """

def list_shard_keys(
    self,
    collection_name: str,
    **kwargs
) -> List[ShardKeyInfo]:
    """
    Enumerate the shard keys defined on a collection.

    Parameters:
    - collection_name: Collection whose shard keys are listed

    Returns:
        List[ShardKeyInfo]: One entry of shard key information per key
    """

Usage examples:

from qdrant_client import models

# Create shard keys for multi-tenant application.
# NOTE: shard keys can only be created on a collection that uses custom
# sharding, i.e. one created with sharding_method=models.ShardingMethod.CUSTOM.
client.create_shard_key(
    collection_name="multi_tenant_collection",
    shard_key="tenant_1",
    shards_number=2,
    replication_factor=1
)

client.create_shard_key(
    collection_name="multi_tenant_collection",
    shard_key="tenant_2",
    shards_number=3,
    replication_factor=2
)

# Use shard key in operations.
# shard_key_selector takes the shard key itself (or a list of shard keys);
# models.ShardKeySelector is a type alias, not a class that can be instantiated.
client.upsert(
    collection_name="multi_tenant_collection",
    points=[
        models.PointStruct(
            id=1,
            vector=[0.1, 0.2, 0.3],
            payload={"data": "tenant_1_data"}
        )
    ],
    shard_key_selector="tenant_1"
)

Cluster Information

Get information about cluster nodes and health.

def get_cluster_info(self, **kwargs) -> ClusterInfo:
    """
    Fetch the status of the cluster.

    Returns:
        ClusterInfo: Status of the cluster together with information about its nodes
    """

def get_node_info(self, **kwargs) -> NodeInfo:
    """
    Fetch information about the current node.

    Returns:
        NodeInfo: Status and configuration of the current node
    """

Shard Operations

Manage individual shards and their distribution.

def get_collection_cluster_info(
    self,
    collection_name: str,
    **kwargs
) -> CollectionClusterInfo:
    """
    Fetch the cluster status of a single collection.

    Parameters:
    - collection_name: Collection to inspect

    Returns:
        CollectionClusterInfo: Cluster status of that collection
    """

def update_collection_cluster_setup(
    self,
    collection_name: str,
    move_shard: Optional[MoveShard] = None,
    replicate_shard: Optional[ReplicateShard] = None,
    abort_transfer: Optional[AbortTransfer] = None,
    drop_replica: Optional[DropReplica] = None,
    timeout: Optional[int] = None,
    **kwargs
) -> bool:
    """
    Change how a collection's shards are laid out across the cluster.

    Parameters:
    - collection_name: Collection whose cluster setup is changed
    - move_shard: Request to move a shard between nodes (optional)
    - replicate_shard: Request to create a replica of a shard (optional)
    - abort_transfer: Request to abort a transfer that is in progress (optional)
    - drop_replica: Request to drop a replica of a shard (optional)
    - timeout: Timeout for the request (optional)

    Returns:
        bool: True when the operation was initiated
    """

Shard Key Types and Selectors

Shard Key Definition

# Shard key can be string or integer
ShardKey = Union[str, int]

# A selector is either one shard key or a list of shard keys. It is a plain
# type alias, not a model: pass the value(s) directly as `shard_key_selector`,
# e.g. shard_key_selector="tenant_1" or shard_key_selector=["a", "b"].
# NOTE(review): newer client releases may accept further selector variants;
# check the installed version's models module.
ShardKeySelector = Union[ShardKey, List[ShardKey]]

Shard Key Usage in Operations

Point and query operations accept a `shard_key_selector` argument that routes the request to specific shard keys:

# `shard_key_selector` takes a shard key or a list of shard keys directly;
# models.ShardKeySelector is a type alias and cannot be instantiated.

# Point operations with shard key
client.upsert(
    collection_name="collection",
    points=points,
    shard_key_selector="tenant_1"
)

client.delete(
    collection_name="collection",
    points_selector=selector,
    shard_key_selector="tenant_1"
)

# Search operations with shard key
results = client.query_points(
    collection_name="collection",
    query=query_vector,
    shard_key_selector="tenant_1"
)

# Scroll with shard key
records, next_offset = client.scroll(
    collection_name="collection",
    shard_key_selector="tenant_1"
)

Cluster Management Types

Cluster Information

class ClusterInfo(BaseModel):
    """Cluster status: the current node's peer ID, all known peers, and Raft state."""
    peer_id: int  # Current node peer ID
    peers: Dict[int, PeerInfo]  # Information about all peers, keyed by an integer (presumably the peer ID — confirm)
    raft_info: RaftInfo  # Raft consensus information

class PeerInfo(BaseModel):
    """Address and connection state of a single peer."""
    uri: str  # Peer URI
    state: PeerState  # Peer connection state

class PeerState(str, Enum):
    """Connection state of a peer; the values are the capitalized strings below."""
    ALIVE = "Alive"
    DEAD = "Dead"
    PARTIAL = "Partial"

class RaftInfo(BaseModel):
    """Raft consensus information reported as part of the cluster status."""
    term: int  # Current Raft term
    commit: int  # Last committed entry
    pending_operations: int  # Pending operations count
    leader: Optional[int] = None  # Leader peer ID; optional, defaults to None
    role: RaftRole  # Current node role

class RaftRole(str, Enum):
    """Role of a node in Raft consensus; the values are the capitalized strings below."""
    LEADER = "Leader"
    FOLLOWER = "Follower"
    CANDIDATE = "Candidate"

Collection Cluster Information

class CollectionClusterInfo(BaseModel):
    """Cluster status of one collection, reported from the current peer's point of view."""
    peer_id: int  # Current peer ID
    shard_count: int  # Total number of shards
    local_shards: List[LocalShardInfo]  # Shards on current node
    remote_shards: List[RemoteShardInfo]  # Shards on remote nodes
    shard_transfers: List[ShardTransferInfo]  # Ongoing transfers

class LocalShardInfo(BaseModel):
    """A shard held on the current node (see CollectionClusterInfo.local_shards)."""
    shard_id: int  # Shard identifier
    points_count: int  # Number of points in shard
    state: ShardState  # Shard state

class RemoteShardInfo(BaseModel):
    """A shard held on another node (see CollectionClusterInfo.remote_shards)."""
    shard_id: int  # Shard identifier
    peer_id: int  # Peer hosting the shard
    state: ShardState  # Shard state

class ShardState(str, Enum):
    """State of a shard, as carried by LocalShardInfo.state and RemoteShardInfo.state."""
    ACTIVE = "Active"
    DEAD = "Dead"
    PARTIAL = "Partial"
    INITIALIZING = "Initializing"
    LISTENER = "Listener"

Shard Operations

class MoveShard(BaseModel):
    """Request to move a shard from one peer to another."""
    shard_id: int  # Shard to move
    from_peer_id: int  # Source peer
    to_peer_id: int  # Destination peer
    method: Optional[ShardTransferMethod] = None  # Transfer method; optional, defaults to None

class ReplicateShard(BaseModel):
    """Request to create a replica of a shard on another peer."""
    shard_id: int  # Shard to replicate
    from_peer_id: int  # Source peer
    to_peer_id: int  # Destination peer
    method: Optional[ShardTransferMethod] = None  # Transfer method; optional, defaults to None

class DropReplica(BaseModel):
    """Request to drop a shard replica from a given peer."""
    shard_id: int  # Shard replica to drop
    peer_id: int  # Peer to drop replica from

class AbortTransfer(BaseModel):
    """Request to abort a shard transfer, identified by the shard and both peers."""
    shard_id: int  # Shard transfer to abort
    from_peer_id: int  # Source peer
    to_peer_id: int  # Destination peer

class ShardTransferMethod(str, Enum):
    """How shard data is transferred; used as the optional `method` of MoveShard and ReplicateShard."""
    STREAM_RECORDS = "stream_records"  # Stream individual records
    SNAPSHOT = "snapshot"  # Transfer via snapshot

Multi-Tenant Patterns

Tenant Isolation

Use shard keys to isolate tenant data:

class TenantClient:
    """Wrapper that pins calls on a shared collection to one tenant's shard key.

    The tenant ID doubles as the shard key, so every upsert and query made
    through this wrapper carries the same ``shard_key_selector``.
    """

    def __init__(self, client: QdrantClient, tenant_id: str):
        """Bind ``client`` to ``tenant_id``.

        Parameters:
        - client: Client used to perform the actual requests
        - tenant_id: Tenant identifier, used as the shard key
        """
        self.client = client
        self.tenant_id = tenant_id
        # The selector is the plain list of shard keys: models.ShardKeySelector
        # is a type alias and cannot be instantiated.
        self.shard_selector = [tenant_id]

    def upsert_points(self, collection_name: str, points: List[models.PointStruct]):
        """Upsert ``points`` into ``collection_name`` under this tenant's shard key.

        Returns:
            Whatever ``client.upsert`` returns.
        """
        return self.client.upsert(
            collection_name=collection_name,
            points=points,
            shard_key_selector=self.shard_selector
        )

    def search(self, collection_name: str, query_vector: List[float], **kwargs):
        """Query ``collection_name`` with ``query_vector``, restricted to this tenant.

        Extra keyword arguments are forwarded to ``client.query_points``.
        Passing ``collection_name``, ``query`` or ``shard_key_selector`` in
        ``kwargs`` raises TypeError (duplicate keyword), so the tenant
        restriction cannot be overridden by accident.

        Returns:
            Whatever ``client.query_points`` returns.
        """
        return self.client.query_points(
            collection_name=collection_name,
            query=query_vector,
            shard_key_selector=self.shard_selector,
            **kwargs
        )

# Usage: bind the shared client to one tenant, then go through the wrapper so
# every call carries that tenant's shard key selector.
# `client`, `points` and `query_vector` are not defined in this snippet; they
# are assumed to exist already.
tenant_client = TenantClient(client, "tenant_1")
tenant_client.upsert_points("shared_collection", points)
results = tenant_client.search("shared_collection", query_vector)

Hierarchical Sharding

Shard keys are flat values, so model a hierarchy by encoding it in the key names and selecting several keys in one request:

# Create one shard key per organization/team. The keys themselves are flat;
# the org/team hierarchy exists only in the naming convention.
client.create_shard_key(collection_name="docs", shard_key="org_1")
client.create_shard_key(collection_name="docs", shard_key="org_1_team_a")
client.create_shard_key(collection_name="docs", shard_key="org_1_team_b")

# Query across multiple related shards by passing a list of shard keys
# (models.ShardKeySelector is a type alias and cannot be instantiated).
results = client.query_points(
    collection_name="docs",
    query=query_vector,
    shard_key_selector=["org_1_team_a", "org_1_team_b"]  # Search both teams
)

Performance Considerations

Shard Key Distribution

  • Balanced sharding: Ensure even data distribution across shard keys
  • Shard key cardinality: Higher cardinality improves parallel processing
  • Query patterns: Align shard keys with common query patterns
  • Cross-shard queries: Minimize queries spanning multiple shards

Replication Strategy

# High availability setup: the key's data is split over several shards and
# each of those shards is replicated.
client.create_shard_key(
    collection_name="critical_data",
    shard_key="important_tenant",
    shards_number=3,        # Distribute across 3 shards
    replication_factor=2    # 2 replicas per shard
)

Monitoring Cluster Health

def monitor_cluster_health(client: QdrantClient):
    """Print a short health report for the cluster and its collections.

    Writes the Raft leader and pending-operation count, one line per peer,
    and one line for every collection that has shard transfers in progress.
    Collections without ongoing transfers are not mentioned. Returns None.

    Parameters:
    - client: Client used to query cluster and collection status
    """
    status = client.get_cluster_info()
    raft = status.raft_info

    print(f"Leader: {raft.leader}")
    print(f"Pending operations: {raft.pending_operations}")

    for pid, peer in status.peers.items():
        print(f"Peer {pid}: {peer.state} ({peer.uri})")

    # Per-collection pass: report only collections with transfers in flight.
    for entry in client.get_collections().collections:
        name = entry.name
        in_flight = len(client.get_collection_cluster_info(name).shard_transfers)
        if in_flight <= 0:
            continue
        print(f"Collection {name}: {in_flight} ongoing transfers")

Install with Tessl CLI

npx tessl i tessl/pypi-qdrant-client

docs

client-setup.md

clustering-sharding.md

collection-management.md

fastembed-integration.md

index.md

indexing-optimization.md

search-query.md

snapshots-backup.md

vector-operations.md

tile.json