Client library for the Qdrant vector search engine.

Distributed operation support, including shard key management, cluster operations, and multi-tenant configurations. The following API lets you create and manage shard keys for data distribution and tenant isolation.
def create_shard_key(
    self,
    collection_name: str,
    shard_key: ShardKey,
    shards_number: Optional[int] = None,
    replication_factor: Optional[int] = None,
    placement: Optional[List[int]] = None,
    timeout: Optional[int] = None,
    **kwargs
) -> bool:
    """
    Create a shard key for a collection.

    A shard key partitions the collection's data so that subsequent
    point operations can be routed to (and isolated within) the shards
    belonging to that key.

    Parameters:
    - collection_name: Name of the collection
    - shard_key: Shard key value (string or integer)
    - shards_number: Number of shards to create for this key
    - replication_factor: Replication factor for this key's shards
    - placement: Peer IDs on which to place the shards
    - timeout: Request timeout
    Returns:
    - bool: True if the shard key was created successfully
    """
def delete_shard_key(
    self,
    collection_name: str,
    shard_key: ShardKey,
    timeout: Optional[int] = None,
    **kwargs
) -> bool:
    """
    Delete a shard key from a collection.

    Removes the shards associated with the key; data stored under the
    key becomes unavailable.

    Parameters:
    - collection_name: Name of the collection
    - shard_key: Shard key value to delete
    - timeout: Request timeout
    Returns:
    - bool: True if the shard key was deleted successfully
    """
def list_shard_keys(
    self,
    collection_name: str,
    **kwargs
) -> List[ShardKeyInfo]:
    """
    List all shard keys configured for a collection.

    Parameters:
    - collection_name: Name of the collection
    Returns:
    - List[ShardKeyInfo]: Information about each configured shard key
    """

Usage examples:
from qdrant_client import models

# Create shard keys for a multi-tenant application: each tenant gets
# its own key, so its data lives in dedicated shards.
client.create_shard_key(
    collection_name="multi_tenant_collection",
    shard_key="tenant_1",
    shards_number=2,
    replication_factor=1
)
client.create_shard_key(
    collection_name="multi_tenant_collection",
    shard_key="tenant_2",
    shards_number=3,
    replication_factor=2
)

# Use the shard key selector to route a write to one tenant's shards only.
client.upsert(
    collection_name="multi_tenant_collection",
    points=[
        models.PointStruct(
            id=1,
            vector=[0.1, 0.2, 0.3],
            payload={"data": "tenant_1_data"}
        )
    ],
    shard_key_selector=models.ShardKeySelector(shard_keys=["tenant_1"])
)

Get information about cluster nodes and health.
def get_cluster_info(self, **kwargs) -> ClusterInfo:
    """
    Get cluster information and status.

    Returns:
    - ClusterInfo: Cluster status, peer list, and Raft consensus state
    """

def get_node_info(self, **kwargs) -> NodeInfo:
    """
    Get information about the current node.

    Returns:
    - NodeInfo: Current node status and configuration
    """

Manage individual shards and their distribution.
def get_collection_cluster_info(
    self,
    collection_name: str,
    **kwargs
) -> CollectionClusterInfo:
    """
    Get cluster information for a specific collection.

    Parameters:
    - collection_name: Name of the collection
    Returns:
    - CollectionClusterInfo: Shard layout and ongoing transfers for the
      collection
    """
def update_collection_cluster_setup(
    self,
    collection_name: str,
    move_shard: Optional[MoveShard] = None,
    replicate_shard: Optional[ReplicateShard] = None,
    abort_transfer: Optional[AbortTransfer] = None,
    drop_replica: Optional[DropReplica] = None,
    timeout: Optional[int] = None,
    **kwargs
) -> bool:
    """
    Update the cluster configuration of a collection.

    Exactly one of the operation parameters is expected per call;
    the operation is initiated asynchronously on the cluster.

    Parameters:
    - collection_name: Name of the collection
    - move_shard: Move a shard between nodes
    - replicate_shard: Create a replica of a shard on another node
    - abort_transfer: Abort an ongoing shard transfer
    - drop_replica: Drop a shard replica from a node
    - timeout: Request timeout
    Returns:
    - bool: True if the operation was initiated successfully
    """

# Shard key can be string or integer
# A shard key may be either a string or an integer.
ShardKey = Union[str, int]

class ShardKeySelector(BaseModel):
    shard_keys: List[ShardKey]  # Shard keys an operation should be routed to

All data operations support shard key routing:
# Point operations with shard key routing
client.upsert(
    collection_name="collection",
    points=points,
    shard_key_selector=models.ShardKeySelector(shard_keys=["tenant_1"])
)
client.delete(
    collection_name="collection",
    points_selector=selector,
    shard_key_selector=models.ShardKeySelector(shard_keys=["tenant_1"])
)

# Search operations with shard key routing
results = client.query_points(
    collection_name="collection",
    query=query_vector,
    shard_key_selector=models.ShardKeySelector(shard_keys=["tenant_1"])
)

# Scroll restricted to one shard key
records, next_offset = client.scroll(
    collection_name="collection",
    shard_key_selector=models.ShardKeySelector(shard_keys=["tenant_1"])
)

class ClusterInfo(BaseModel):
    peer_id: int                # Peer ID of the node answering the request
    peers: Dict[int, PeerInfo]  # All known peers, keyed by peer ID
    raft_info: RaftInfo         # Raft consensus state

class PeerInfo(BaseModel):
    uri: str          # URI at which the peer is reachable
    state: PeerState  # Connection state of the peer

class PeerState(str, Enum):
    ALIVE = "Alive"
    DEAD = "Dead"
    PARTIAL = "Partial"

class RaftInfo(BaseModel):
    term: int                     # Current Raft term
    commit: int                   # Index of the last committed entry
    pending_operations: int       # Operations not yet applied
    leader: Optional[int] = None  # Peer ID of the current leader, if known
    role: RaftRole                # Role of this node in the consensus

class RaftRole(str, Enum):
    LEADER = "Leader"
    FOLLOWER = "Follower"
    CANDIDATE = "Candidate"

class CollectionClusterInfo(BaseModel):
    peer_id: int                               # Peer ID of the answering node
    shard_count: int                           # Total number of shards
    local_shards: List[LocalShardInfo]         # Shards hosted on this node
    remote_shards: List[RemoteShardInfo]       # Shards hosted on other nodes
    shard_transfers: List[ShardTransferInfo]   # Transfers currently in progress

class LocalShardInfo(BaseModel):
    shard_id: int      # Shard identifier
    points_count: int  # Number of points stored in the shard
    state: ShardState  # Current state of the shard

class RemoteShardInfo(BaseModel):
    shard_id: int      # Shard identifier
    peer_id: int       # Peer hosting the shard
    state: ShardState  # Current state of the shard

class ShardState(str, Enum):
    ACTIVE = "Active"
    DEAD = "Dead"
    PARTIAL = "Partial"
    INITIALIZING = "Initializing"
    LISTENER = "Listener"

class MoveShard(BaseModel):
    shard_id: int      # Shard to move
    from_peer_id: int  # Source peer
    to_peer_id: int    # Destination peer
    method: Optional[ShardTransferMethod] = None  # Transfer method to use

class ReplicateShard(BaseModel):
    shard_id: int      # Shard to replicate
    from_peer_id: int  # Source peer
    to_peer_id: int    # Destination peer
    method: Optional[ShardTransferMethod] = None  # Transfer method to use

class DropReplica(BaseModel):
    shard_id: int  # Shard whose replica should be dropped
    peer_id: int   # Peer to drop the replica from

class AbortTransfer(BaseModel):
    shard_id: int      # Shard whose transfer should be aborted
    from_peer_id: int  # Source peer of the transfer
    to_peer_id: int    # Destination peer of the transfer

class ShardTransferMethod(str, Enum):
    STREAM_RECORDS = "stream_records"  # Stream individual records
    SNAPSHOT = "snapshot"              # Transfer via snapshot

Use shard keys to isolate tenant data:
class TenantClient:
    """Wrapper that scopes every operation to a single tenant's shard key."""

    def __init__(self, client: QdrantClient, tenant_id: str):
        self.client = client
        self.tenant_id = tenant_id
        # Reused on every call so operations only touch this tenant's shards.
        self.shard_selector = models.ShardKeySelector(shard_keys=[tenant_id])

    def upsert_points(self, collection_name: str, points: List[models.PointStruct]):
        """Upsert points into the tenant's shards only."""
        return self.client.upsert(
            collection_name=collection_name,
            points=points,
            shard_key_selector=self.shard_selector
        )

    def search(self, collection_name: str, query_vector: List[float], **kwargs):
        """Search restricted to the tenant's shards."""
        return self.client.query_points(
            collection_name=collection_name,
            query=query_vector,
            shard_key_selector=self.shard_selector,
            **kwargs
        )

# Usage
tenant_client = TenantClient(client, "tenant_1")
tenant_client.upsert_points("shared_collection", points)
results = tenant_client.search("shared_collection", query_vector)

Use multiple shard keys for hierarchical data organization:
# Create nested shard keys that mirror an organization/team structure.
client.create_shard_key(collection_name="docs", shard_key="org_1")
client.create_shard_key(collection_name="docs", shard_key="org_1_team_a")
client.create_shard_key(collection_name="docs", shard_key="org_1_team_b")

# A selector may list several keys, querying related shards in one request.
results = client.query_points(
    collection_name="docs",
    query=query_vector,
    shard_key_selector=models.ShardKeySelector(
        shard_keys=["org_1_team_a", "org_1_team_b"]  # Search both teams
    )
)

# High availability setup
client.create_shard_key(
    collection_name="critical_data",
    shard_key="important_tenant",
    shards_number=3,      # Distribute this key's data across 3 shards
    replication_factor=2  # Keep 2 replicas of each shard
)

def monitor_cluster_health(client: QdrantClient):
    """Print a health summary of the cluster and of every collection."""
    cluster_info = client.get_cluster_info()
    print(f"Leader: {cluster_info.raft_info.leader}")
    print(f"Pending operations: {cluster_info.raft_info.pending_operations}")
    for peer_id, peer_info in cluster_info.peers.items():
        print(f"Peer {peer_id}: {peer_info.state} ({peer_info.uri})")
    # Report collections with shard transfers still in flight.
    for collection in client.get_collections().collections:
        coll_cluster = client.get_collection_cluster_info(collection.name)
        transfers = len(coll_cluster.shard_transfers)
        if transfers > 0:
            print(f"Collection {collection.name}: {transfers} ongoing transfers")

Install with Tessl CLI
npx tessl i tessl/pypi-qdrant-client