tessl/pypi-nacos-sdk-python

A Python implementation of Nacos OpenAPI for service discovery, configuration management, and service management

V2 Service Discovery (Asynchronous)

The V2 API is the asynchronous interface for Nacos service discovery and registration. It adds gRPC support, batch operations, typed data models, server health checks, and subscription management.

Capabilities

Service Creation and Initialization

Create and initialize the asynchronous naming service.

class NacosNamingService:
    @staticmethod
    async def create_naming_service(client_config: ClientConfig) -> 'NacosNamingService':
        """
        Create and initialize a Nacos naming service.
        
        Args:
            client_config (ClientConfig): Client configuration containing server details,
                                        authentication, and operational parameters
                                        
        Returns:
            NacosNamingService: Initialized naming service instance
            
        Raises:
            NacosException: If client_config is invalid or initialization fails
        """

Usage example:

import asyncio
from v2.nacos import ClientConfig, NacosNamingService

async def main():
    # Create client configuration
    client_config = ClientConfig(
        server_addresses="127.0.0.1:8848",
        namespace_id="production",
        username="nacos",
        password="nacos"
    )
    
    # Create naming service
    naming_service = await NacosNamingService.create_naming_service(client_config)
    
    # Use the service...
    
    # Always shut down when done
    await naming_service.shutdown()

asyncio.run(main())

Service Instance Registration

Register service instances with comprehensive metadata and configuration options.

async def register_instance(self, request: RegisterInstanceParam) -> bool:
    """
    Register a service instance.
    
    Args:
        request (RegisterInstanceParam): Instance registration parameters
        
    Returns:
        bool: True if registration successful
        
    Raises:
        NacosException: If service_name is empty or invalid
    """

Usage example:

from v2.nacos import RegisterInstanceParam

# Basic instance registration
register_param = RegisterInstanceParam(
    service_name="user-service",
    ip="192.168.1.100",
    port=8080
)
success = await naming_service.register_instance(register_param)
print(f"Registration successful: {success}")

# Advanced registration with metadata
register_param = RegisterInstanceParam(
    service_name="user-service",
    ip="192.168.1.100",
    port=8080,
    weight=1.5,
    cluster_name="production",
    group_name="microservices",
    ephemeral=True,
    enabled=True,
    healthy=True,
    metadata={
        "version": "1.2.0",
        "region": "us-west-1",
        "zone": "us-west-1a",
        "protocol": "http",
        "management.port": "8081"
    }
)
await naming_service.register_instance(register_param)

# Register with custom health check intervals
register_param = RegisterInstanceParam(
    service_name="critical-service",
    ip="192.168.1.200",
    port=9090,
    metadata={
        "preserved.heart.beat.timeout": "10000",  # 10 seconds
        "preserved.heart.beat.interval": "3000",   # 3 seconds
        "preserved.ip.delete.timeout": "30000"     # 30 seconds
    }
)
await naming_service.register_instance(register_param)

Batch Service Instance Registration

Register multiple service instances in a single operation.

async def batch_register_instances(self, request: BatchRegisterInstanceParam) -> bool:
    """
    Register multiple service instances in batch.
    
    Args:
        request (BatchRegisterInstanceParam): Batch registration parameters
        
    Returns:
        bool: True if batch registration successful
        
    Raises:
        NacosException: If service_name is empty or instances list is empty
    """

async def batch_deregister_instances(self, request: BatchRegisterInstanceParam) -> bool:
    """
    Deregister multiple service instances in batch.
    
    Args:
        request (BatchRegisterInstanceParam): Batch deregistration parameters
        
    Returns:
        bool: True if batch deregistration successful
    """

Usage example:

from v2.nacos import BatchRegisterInstanceParam, RegisterInstanceParam

# Create multiple instances
instances = [
    RegisterInstanceParam(
        service_name="user-service",
        ip="192.168.1.100",
        port=8080,
        cluster_name="production",
        metadata={"node": "node1"}
    ),
    RegisterInstanceParam(
        service_name="user-service", 
        ip="192.168.1.101",
        port=8080,
        cluster_name="production",
        metadata={"node": "node2"}
    ),
    RegisterInstanceParam(
        service_name="user-service",
        ip="192.168.1.102", 
        port=8080,
        cluster_name="production",
        metadata={"node": "node3"}
    )
]

# Batch register instances
batch_param = BatchRegisterInstanceParam(
    service_name="user-service",
    group_name="microservices",
    instances=instances
)
success = await naming_service.batch_register_instances(batch_param)
print(f"Batch registration successful: {success}")

# Batch deregister instances
success = await naming_service.batch_deregister_instances(batch_param)
print(f"Batch deregistration successful: {success}")
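
When the instances in a batch differ only by address, the list can be generated instead of written out by hand. The sketch below builds plain keyword-argument dictionaries using the field names from the example above. `build_instance_specs` is a hypothetical helper, not part of the SDK, and the `node1`, `node2` naming simply mirrors the example.

```python
from typing import Dict, List


def build_instance_specs(service_name: str, ips: List[str], port: int,
                         cluster_name: str = "") -> List[Dict[str, object]]:
    """Build one keyword-argument dict per IP address (hypothetical helper)."""
    specs = []
    for index, ip in enumerate(ips, start=1):
        specs.append({
            "service_name": service_name,
            "ip": ip,
            "port": port,
            "cluster_name": cluster_name,
            "metadata": {"node": f"node{index}"},
        })
    return specs


specs = build_instance_specs(
    "user-service",
    ["192.168.1.100", "192.168.1.101", "192.168.1.102"],
    port=8080,
    cluster_name="production",
)
for spec in specs:
    print(spec["ip"], spec["metadata"])
```

Each dictionary could then be turned into a parameter object with `instances = [RegisterInstanceParam(**spec) for spec in specs]` before building the `BatchRegisterInstanceParam`.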

Service Instance Deregistration

Remove service instances from the Nacos registry.

async def deregister_instance(self, request: DeregisterInstanceParam) -> bool:
    """
    Deregister a service instance.
    
    Args:
        request (DeregisterInstanceParam): Instance deregistration parameters
        
    Returns:
        bool: True if deregistration successful
        
    Raises:
        NacosException: If service_name is empty or invalid
    """

Usage example:

from v2.nacos import DeregisterInstanceParam

# Basic instance deregistration
deregister_param = DeregisterInstanceParam(
    service_name="user-service",
    ip="192.168.1.100",
    port=8080
)
success = await naming_service.deregister_instance(deregister_param)
print(f"Deregistration successful: {success}")

# Deregister from specific cluster and group
deregister_param = DeregisterInstanceParam(
    service_name="user-service",
    ip="192.168.1.100",
    port=8080,
    cluster_name="production",
    group_name="microservices",
    ephemeral=True
)
await naming_service.deregister_instance(deregister_param)

Service Instance Updates

Update existing service instance properties.

async def update_instance(self, request: RegisterInstanceParam) -> bool:
    """
    Update a service instance. Uses the same parameters as registration.
    
    Args:
        request (RegisterInstanceParam): Instance update parameters
        
    Returns:
        bool: True if update successful
        
    Raises:
        NacosException: If service_name is empty or invalid
    """

Usage example:

# Update instance weight and metadata
update_param = RegisterInstanceParam(
    service_name="user-service",
    ip="192.168.1.100",
    port=8080,
    weight=2.0,  # Increased weight
    metadata={
        "version": "1.3.0",  # Updated version
        "status": "optimized",
        "cpu_usage": "low"
    }
)
success = await naming_service.update_instance(update_param)
print(f"Update successful: {success}")

# Toggle instance availability
update_param = RegisterInstanceParam(
    service_name="user-service",
    ip="192.168.1.100",
    port=8080,
    enabled=False,  # Disable instance
    healthy=False   # Mark as unhealthy
)
await naming_service.update_instance(update_param)

Service Instance Listing

Retrieve lists of service instances with filtering options.

async def list_instances(self, request: ListInstanceParam) -> List[Instance]:
    """
    List service instances.
    
    Args:
        request (ListInstanceParam): Instance listing parameters
        
    Returns:
        List[Instance]: List of service instances
        
    Raises:
        NacosException: If service_name is empty or invalid
    """

Usage example:

from v2.nacos import ListInstanceParam

# List all instances
list_param = ListInstanceParam(service_name="user-service")
instances = await naming_service.list_instances(list_param)
for instance in instances:
    print(f"Instance: {instance.ip}:{instance.port} - healthy: {instance.healthy}")

# List healthy instances only
list_param = ListInstanceParam(
    service_name="user-service",
    healthy_only=True
)
healthy_instances = await naming_service.list_instances(list_param)

# List instances from specific clusters
list_param = ListInstanceParam(
    service_name="user-service",
    clusters=["production", "staging"],
    group_name="microservices"
)
instances = await naming_service.list_instances(list_param)

# List with subscription (enables real-time updates)
list_param = ListInstanceParam(
    service_name="user-service",
    subscribe=True,  # Enable real-time updates
    healthy_only=True
)
instances = await naming_service.list_instances(list_param)
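
Once a list of instances is available, a caller usually has to pick one of them. The following is a minimal sketch of client-side weighted random selection, assuming the `weight`, `healthy`, and `enabled` fields of the Instance model. `InstanceStub` and `pick_instance` are hypothetical names introduced only so the example runs without a Nacos server; they are not SDK APIs.

```python
import random
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class InstanceStub:
    """Stand-in carrying the Instance fields used here (not the SDK class)."""
    ip: str
    port: int
    weight: float = 1.0
    healthy: bool = True
    enabled: bool = True


def pick_instance(instances: List[InstanceStub],
                  rng: Optional[random.Random] = None) -> Optional[InstanceStub]:
    """Pick one healthy, enabled instance with probability proportional to weight."""
    rng = rng or random.Random()
    candidates = [i for i in instances if i.healthy and i.enabled and i.weight > 0]
    if not candidates:
        return None  # nothing eligible to route to
    weights = [i.weight for i in candidates]
    return rng.choices(candidates, weights=weights, k=1)[0]


instances = [
    InstanceStub("192.168.1.100", 8080, weight=2.0),
    InstanceStub("192.168.1.101", 8080, weight=1.0),
    InstanceStub("192.168.1.102", 8080, healthy=False),
]
chosen = pick_instance(instances)
print(f"Routing to {chosen.ip}:{chosen.port}")
```

The same function should work on the objects returned by `list_instances`, provided they expose the same three attributes.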

Service Information Retrieval

Get detailed service metadata and configuration.

async def get_service(self, request: GetServiceParam) -> Service:
    """
    Get service information.
    
    Args:
        request (GetServiceParam): Service query parameters
        
    Returns:
        Service: Service information including metadata and instances
        
    Raises:
        NacosException: If service_name is empty or invalid
    """

async def list_services(self, request: ListServiceParam) -> ServiceList:
    """
    List services with pagination.
    
    Args:
        request (ListServiceParam): Service listing parameters
        
    Returns:
        ServiceList: Paginated list of services
    """

Usage example:

from v2.nacos import GetServiceParam, ListServiceParam

# Get detailed service information
service_param = GetServiceParam(
    service_name="user-service",
    group_name="microservices"
)
service = await naming_service.get_service(service_param)
print(f"Service: {service.name}")
print(f"Clusters: {service.clusters}")
print(f"Instance count: {len(service.hosts)}")

# Get service from specific clusters
service_param = GetServiceParam(
    service_name="user-service",
    clusters=["production"]
)
service = await naming_service.get_service(service_param)

# List all services with pagination
list_param = ListServiceParam(
    namespace_id="production",
    group_name="microservices",
    page_no=1,
    page_size=20
)
service_list = await naming_service.list_services(list_param)
print(f"Total services: {service_list.count}")
for service_name in service_list.services:
    print(f"Service: {service_name}")

# List services from different namespace
list_param = ListServiceParam(
    namespace_id="staging",
    page_no=1,
    page_size=50
)
staging_services = await naming_service.list_services(list_param)

Service Subscription Management

Subscribe to service change notifications for real-time updates.

async def subscribe(self, request: SubscribeServiceParam) -> None:
    """
    Subscribe to service changes.
    
    Args:
        request (SubscribeServiceParam): Service subscription parameters
    """

async def unsubscribe(self, request: SubscribeServiceParam) -> None:
    """
    Unsubscribe from service changes.
    
    Args:
        request (SubscribeServiceParam): Service subscription parameters to remove
    """

Usage example:

from v2.nacos import SubscribeServiceParam

# Define async callback for service changes
async def service_change_callback(service_info):
    print(f"Service changed: {service_info.name}")
    print(f"Instance count: {len(service_info.hosts)}")
    
    # Process each instance
    for instance in service_info.hosts:
        status = "healthy" if instance.healthy else "unhealthy"
        print(f"  {instance.ip}:{instance.port} - {status}")
    
    # Handle service changes asynchronously
    await update_load_balancer(service_info)

async def update_load_balancer(service_info):
    # Your async service update logic
    print(f"Updating load balancer for {service_info.name}")

# Subscribe to service changes
subscribe_param = SubscribeServiceParam(
    service_name="user-service",
    group_name="microservices", 
    subscribe_callback=service_change_callback
)
await naming_service.subscribe(subscribe_param)

# Subscribe to specific clusters
subscribe_param = SubscribeServiceParam(
    service_name="user-service",
    clusters=["production", "staging"],
    subscribe_callback=service_change_callback
)
await naming_service.subscribe(subscribe_param)

# Unsubscribe from service changes
unsubscribe_param = SubscribeServiceParam(
    service_name="user-service",
    group_name="microservices"
)
await naming_service.unsubscribe(unsubscribe_param)
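
A subscription callback receives the full current state of the service, so working out what actually changed is left to the caller. The sketch below keeps the previously seen healthy addresses and reports additions and removals. `InstanceDiffTracker` is a hypothetical helper, and the `SimpleNamespace` objects stand in for the `service_info` argument, assuming it exposes `name` and `hosts` as in the callback above.

```python
import asyncio
from types import SimpleNamespace
from typing import Set, Tuple


class InstanceDiffTracker:
    """Remember the last seen healthy addresses and report what changed."""

    def __init__(self) -> None:
        self._known: Set[str] = set()

    def update(self, service_info) -> Tuple[Set[str], Set[str]]:
        """Return (added, removed) addresses relative to the previous call."""
        current = {f"{h.ip}:{h.port}" for h in service_info.hosts if h.healthy}
        added = current - self._known
        removed = self._known - current
        self._known = current
        return added, removed


tracker = InstanceDiffTracker()


async def service_change_callback(service_info):
    added, removed = tracker.update(service_info)
    print(f"{service_info.name}: added={sorted(added)} removed={sorted(removed)}")


def _host(ip, port, healthy=True):
    # Stand-in for an Instance object, carrying only the fields used above.
    return SimpleNamespace(ip=ip, port=port, healthy=healthy)


first = SimpleNamespace(name="user-service",
                        hosts=[_host("192.168.1.100", 8080), _host("192.168.1.101", 8080)])
second = SimpleNamespace(name="user-service",
                         hosts=[_host("192.168.1.101", 8080), _host("192.168.1.102", 8080)])

asyncio.run(service_change_callback(first))
asyncio.run(service_change_callback(second))
```

A callback of this shape could be passed as `subscribe_callback`, so that a load balancer is only touched for addresses that were added or removed.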

Health Monitoring

Check the health status of the Nacos server.

async def server_health(self) -> bool:
    """
    Check if Nacos server is healthy.
    
    Returns:
        bool: True if server is healthy
    """

Usage example:

# Check server health
is_healthy = await naming_service.server_health()
if is_healthy:
    print("Nacos server is healthy")
else:
    print("Nacos server is unhealthy")

# Health monitoring loop
async def health_monitor():
    while True:
        try:
            healthy = await naming_service.server_health()
            print(f"Server health: {'OK' if healthy else 'FAIL'}")
            
            if not healthy:
                # Handle unhealthy server
                await handle_server_unhealthy()
                
            await asyncio.sleep(30)  # Check every 30 seconds
        except Exception as e:
            print(f"Health check failed: {e}")
            await asyncio.sleep(10)

async def handle_server_unhealthy():
    print("Handling unhealthy server...")
    # Implement fallback logic

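# Start health monitoring from inside a running event loop (for example in main()).
# Keep a reference so the task is not garbage-collected, and cancel it on shutdown.
monitor_task = asyncio.create_task(health_monitor())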

Service Shutdown

Shut down the naming service and clean up its resources.

async def shutdown(self) -> None:
    """
    Shut down the naming service and clean up resources.
    Always call this when the service is no longer needed.
    """

Usage example:

import asyncio
from v2.nacos import ClientConfig, NacosNamingService, RegisterInstanceParam

async def main():
    naming_service = None
    try:
        # Create service
        client_config = ClientConfig(server_addresses="127.0.0.1:8848")
        naming_service = await NacosNamingService.create_naming_service(client_config)
        
        # Register instance
        register_param = RegisterInstanceParam(
            service_name="test-service",
            ip="127.0.0.1",
            port=8080
        )
        await naming_service.register_instance(register_param)
        
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Always shut down, even if creation or registration failed part-way
        if naming_service:
            await naming_service.shutdown()

asyncio.run(main())
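
The try/finally pattern above can also be packaged as an async context manager, so that shutdown cannot be forgotten. This is a sketch under stated assumptions: `managed_service` is a hypothetical helper, and `FakeNamingService` is a stand-in that exposes only `shutdown()` so the example runs without a Nacos server.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Awaitable, Callable


@asynccontextmanager
async def managed_service(create: Callable[[], Awaitable]) -> AsyncIterator:
    """Create a service via `create()` and call its shutdown() on exit."""
    service = await create()
    try:
        yield service
    finally:
        # Runs on normal exit and when the body raises.
        await service.shutdown()


class FakeNamingService:
    """Stand-in exposing only shutdown(); not the SDK class."""

    def __init__(self) -> None:
        self.closed = False

    async def shutdown(self) -> None:
        self.closed = True


async def create_fake() -> FakeNamingService:
    return FakeNamingService()


async def main() -> FakeNamingService:
    async with managed_service(create_fake) as service:
        print("Inside the block, closed =", service.closed)
    return service


service = asyncio.run(main())
print("After the block, closed =", service.closed)
```

With the real client, `create` could be `lambda: NacosNamingService.create_naming_service(client_config)`.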

Data Models

Instance Model

class Instance(BaseModel):
    instanceId: str = ''
    ip: str
    port: int
    weight: float = 1.0
    healthy: bool = True
    enabled: bool = True
    ephemeral: bool = True
    clusterName: str = ''
    serviceName: str = ''
    metadata: dict = {}
    
    def to_inet_addr(self) -> str: ...
    def is_ephemeral(self) -> bool: ...
    def get_weight(self) -> float: ...
    def add_metadata(self, key: str, value: str) -> None: ...
    def contains_metadata(self, key: str) -> bool: ...
    def check_instance_is_legal(self): ...

Parameter Models

class RegisterInstanceParam(BaseModel):
    ip: str
    port: int
    weight: float = 1.0
    enabled: bool = True
    healthy: bool = True
    metadata: Dict[str, str] = {}
    cluster_name: str = ''
    service_name: str
    group_name: str = 'DEFAULT_GROUP'
    ephemeral: bool = True

class DeregisterInstanceParam(BaseModel):
    ip: str
    port: int
    cluster_name: str = ''
    service_name: str
    group_name: str = 'DEFAULT_GROUP'
    ephemeral: bool = True

class ListInstanceParam(BaseModel):
    service_name: str
    group_name: str = 'DEFAULT_GROUP'
    clusters: List[str] = []
    subscribe: bool = True
    healthy_only: Optional[bool] = None

class SubscribeServiceParam(BaseModel):
    service_name: str
    group_name: str = 'DEFAULT_GROUP'
    clusters: List[str] = []
    subscribe_callback: Optional[Callable] = None

class GetServiceParam(BaseModel):
    service_name: str
    group_name: str = 'DEFAULT_GROUP'
    clusters: List[str] = []

class ListServiceParam(BaseModel):
    namespace_id: str = 'public'
    group_name: str = 'DEFAULT_GROUP'
    page_no: int = 1
    page_size: int = 10

Service Models

class Service(BaseModel):
    name: str
    groupName: str
    clusters: str
    cacheMillis: int
    hosts: List[Instance]
    lastRefTime: int
    checksum: str
    allIPs: bool
    reachProtectionThreshold: bool

class ServiceList(BaseModel):
    count: int
    services: List[str]

Advanced Features

Automatic Service Updates

The V2 API can update service information automatically. Enable this through ClientConfig:

# Enable automatic service updates in ClientConfig
client_config = ClientConfig(
    server_addresses="127.0.0.1:8848",
    async_update_service=True,
    update_thread_num=10  # Number of update threads
)

Instance Health Management

Heartbeat interval and timeout values for an instance are set through reserved metadata keys, with values given in milliseconds as strings:

# Configure health check intervals
health_metadata = {
    "preserved.heart.beat.interval": "5000",    # 5 seconds
    "preserved.heart.beat.timeout": "15000",    # 15 seconds  
    "preserved.ip.delete.timeout": "30000"      # 30 seconds
}

register_param = RegisterInstanceParam(
    service_name="health-monitored-service",
    ip="192.168.1.100",
    port=8080,
    metadata=health_metadata
)
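
Since the metadata values are millisecond strings, it is easy to mistype a magnitude. The helper below is a hedged sketch that builds the three entries from seconds. `heartbeat_metadata` is a hypothetical name, and the ordering check (interval shorter than the heartbeat timeout, which is no longer than the delete timeout) is an assumed sanity rule rather than something the SDK is documented to enforce.

```python
from typing import Dict


def heartbeat_metadata(interval_s: float, timeout_s: float,
                       delete_timeout_s: float) -> Dict[str, str]:
    """Build the heartbeat metadata entries from values given in seconds."""
    # Assumed sanity rule: interval < heartbeat timeout <= delete timeout.
    if not 0 < interval_s < timeout_s <= delete_timeout_s:
        raise ValueError("expected 0 < interval < timeout <= delete timeout")
    return {
        "preserved.heart.beat.interval": str(round(interval_s * 1000)),
        "preserved.heart.beat.timeout": str(round(timeout_s * 1000)),
        "preserved.ip.delete.timeout": str(round(delete_timeout_s * 1000)),
    }


health_metadata = heartbeat_metadata(5, 15, 30)
print(health_metadata)
```

The resulting dictionary can be passed as the `metadata` argument of `RegisterInstanceParam`, as in the example above.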

Service Instance Caching

The V2 API caches service information on the client to improve performance:

  • Service Info Cache: Automatic caching of service instance lists
  • Cache Updates: Real-time cache updates through subscriptions
  • Cache Persistence: Optional persistent caching for offline scenarios
  • Cache Invalidation: Automatic cache invalidation on service changes

Install with Tessl CLI

npx tessl i tessl/pypi-nacos-sdk-python
