A Python implementation of Nacos OpenAPI for service discovery, configuration management, and service management
—
Modern asynchronous API for Nacos service discovery and registration operations with advanced features including GRPC support, batch operations, comprehensive data models, health monitoring, and subscription management.
Create and initialize the asynchronous naming service.
class NacosNamingService:
@staticmethod
async def create_naming_service(client_config: ClientConfig) -> 'NacosNamingService':
"""
Create and initialize a Nacos naming service.
Args:
client_config (ClientConfig): Client configuration containing server details,
authentication, and operational parameters
Returns:
NacosNamingService: Initialized naming service instance
Raises:
NacosException: If client_config is invalid or initialization fails
"""Usage example:
import asyncio
from v2.nacos import ClientConfig, NacosNamingService
async def main():
# Create client configuration
client_config = ClientConfig(
server_addresses="127.0.0.1:8848",
namespace_id="production",
username="nacos",
password="nacos"
)
# Create naming service
naming_service = await NacosNamingService.create_naming_service(client_config)
# Use the service...
# Always shutdown when done
await naming_service.shutdown()
asyncio.run(main())Register service instances with comprehensive metadata and configuration options.
async def register_instance(self, request: RegisterInstanceParam) -> bool:
"""
Register a service instance.
Args:
request (RegisterInstanceParam): Instance registration parameters
Returns:
bool: True if registration successful
Raises:
NacosException: If service_name is empty or invalid
"""Usage example:
from v2.nacos import RegisterInstanceParam
# Basic instance registration
register_param = RegisterInstanceParam(
service_name="user-service",
ip="192.168.1.100",
port=8080
)
success = await naming_service.register_instance(register_param)
print(f"Registration successful: {success}")
# Advanced registration with metadata
register_param = RegisterInstanceParam(
service_name="user-service",
ip="192.168.1.100",
port=8080,
weight=1.5,
cluster_name="production",
group_name="microservices",
ephemeral=True,
enabled=True,
healthy=True,
metadata={
"version": "1.2.0",
"region": "us-west-1",
"zone": "us-west-1a",
"protocol": "http",
"management.port": "8081"
}
)
await naming_service.register_instance(register_param)
# Register with custom health check intervals
register_param = RegisterInstanceParam(
service_name="critical-service",
ip="192.168.1.200",
port=9090,
metadata={
"preserved.heart.beat.timeout": "10000", # 10 seconds
"preserved.heart.beat.interval": "3000", # 3 seconds
"preserved.ip.delete.timeout": "30000" # 30 seconds
}
)
await naming_service.register_instance(register_param)Register multiple service instances in a single operation.
async def batch_register_instances(self, request: BatchRegisterInstanceParam) -> bool:
"""
Register multiple service instances in batch.
Args:
request (BatchRegisterInstanceParam): Batch registration parameters
Returns:
bool: True if batch registration successful
Raises:
NacosException: If service_name is empty or instances list is empty
"""
async def batch_deregister_instances(self, request: BatchRegisterInstanceParam) -> bool:
"""
Deregister multiple service instances in batch.
Args:
request (BatchRegisterInstanceParam): Batch deregistration parameters
Returns:
bool: True if batch deregistration successful
"""Usage example:
from v2.nacos import BatchRegisterInstanceParam, RegisterInstanceParam
# Create multiple instances
instances = [
RegisterInstanceParam(
service_name="user-service",
ip="192.168.1.100",
port=8080,
cluster_name="production",
metadata={"node": "node1"}
),
RegisterInstanceParam(
service_name="user-service",
ip="192.168.1.101",
port=8080,
cluster_name="production",
metadata={"node": "node2"}
),
RegisterInstanceParam(
service_name="user-service",
ip="192.168.1.102",
port=8080,
cluster_name="production",
metadata={"node": "node3"}
)
]
# Batch register instances
batch_param = BatchRegisterInstanceParam(
service_name="user-service",
group_name="microservices",
instances=instances
)
success = await naming_service.batch_register_instances(batch_param)
print(f"Batch registration successful: {success}")
# Batch deregister instances
success = await naming_service.batch_deregister_instances(batch_param)
print(f"Batch deregistration successful: {success}")Remove service instances from Nacos registry.
async def deregister_instance(self, request: DeregisterInstanceParam) -> bool:
"""
Deregister a service instance.
Args:
request (DeregisterInstanceParam): Instance deregistration parameters
Returns:
bool: True if deregistration successful
Raises:
NacosException: If service_name is empty or invalid
"""Usage example:
from v2.nacos import DeregisterInstanceParam
# Basic instance deregistration
deregister_param = DeregisterInstanceParam(
service_name="user-service",
ip="192.168.1.100",
port=8080
)
success = await naming_service.deregister_instance(deregister_param)
print(f"Deregistration successful: {success}")
# Deregister from specific cluster and group
deregister_param = DeregisterInstanceParam(
service_name="user-service",
ip="192.168.1.100",
port=8080,
cluster_name="production",
group_name="microservices",
ephemeral=True
)
await naming_service.deregister_instance(deregister_param)Update existing service instance properties.
async def update_instance(self, request: RegisterInstanceParam) -> bool:
"""
Update a service instance. Uses the same parameters as registration.
Args:
request (RegisterInstanceParam): Instance update parameters
Returns:
bool: True if update successful
Raises:
NacosException: If service_name is empty or invalid
"""Usage example:
# Update instance weight and metadata
update_param = RegisterInstanceParam(
service_name="user-service",
ip="192.168.1.100",
port=8080,
weight=2.0, # Increased weight
metadata={
"version": "1.3.0", # Updated version
"status": "optimized",
"cpu_usage": "low"
}
)
success = await naming_service.update_instance(update_param)
print(f"Update successful: {success}")
# Toggle instance availability
update_param = RegisterInstanceParam(
service_name="user-service",
ip="192.168.1.100",
port=8080,
enabled=False, # Disable instance
healthy=False # Mark as unhealthy
)
await naming_service.update_instance(update_param)Retrieve lists of service instances with filtering options.
async def list_instances(self, request: ListInstanceParam) -> List[Instance]:
"""
List service instances.
Args:
request (ListInstanceParam): Instance listing parameters
Returns:
List[Instance]: List of service instances
Raises:
NacosException: If service_name is empty or invalid
"""Usage example:
from v2.nacos import ListInstanceParam
# List all instances
list_param = ListInstanceParam(service_name="user-service")
instances = await naming_service.list_instances(list_param)
for instance in instances:
print(f"Instance: {instance.ip}:{instance.port} - healthy: {instance.healthy}")
# List healthy instances only
list_param = ListInstanceParam(
service_name="user-service",
healthy_only=True
)
healthy_instances = await naming_service.list_instances(list_param)
# List instances from specific clusters
list_param = ListInstanceParam(
service_name="user-service",
clusters=["production", "staging"],
group_name="microservices"
)
instances = await naming_service.list_instances(list_param)
# List with subscription (enables real-time updates)
list_param = ListInstanceParam(
service_name="user-service",
subscribe=True, # Enable real-time updates
healthy_only=True
)
instances = await naming_service.list_instances(list_param)Get detailed service metadata and configuration.
async def get_service(self, request: GetServiceParam) -> Service:
"""
Get service information.
Args:
request (GetServiceParam): Service query parameters
Returns:
Service: Service information including metadata and instances
Raises:
NacosException: If service_name is empty or invalid
"""
async def list_services(self, request: ListServiceParam) -> ServiceList:
"""
List services with pagination.
Args:
request (ListServiceParam): Service listing parameters
Returns:
ServiceList: Paginated list of services
"""Usage example:
from v2.nacos import GetServiceParam, ListServiceParam
# Get detailed service information
service_param = GetServiceParam(
service_name="user-service",
group_name="microservices"
)
service = await naming_service.get_service(service_param)
print(f"Service: {service.name}")
print(f"Clusters: {service.clusters}")
print(f"Instance count: {len(service.hosts)}")
# Get service from specific clusters
service_param = GetServiceParam(
service_name="user-service",
clusters=["production"]
)
service = await naming_service.get_service(service_param)
# List all services with pagination
list_param = ListServiceParam(
namespace_id="production",
group_name="microservices",
page_no=1,
page_size=20
)
service_list = await naming_service.list_services(list_param)
print(f"Total services: {service_list.count}")
for service_name in service_list.doms:
print(f"Service: {service_name}")
# List services from different namespace
list_param = ListServiceParam(
namespace_id="staging",
page_no=1,
page_size=50
)
staging_services = await naming_service.list_services(list_param)Subscribe to service change notifications for real-time updates.
async def subscribe(self, request: SubscribeServiceParam) -> None:
"""
Subscribe to service changes.
Args:
request (SubscribeServiceParam): Service subscription parameters
"""
async def unsubscribe(self, request: SubscribeServiceParam) -> None:
"""
Unsubscribe from service changes.
Args:
request (SubscribeServiceParam): Service subscription parameters to remove
"""Usage example:
from v2.nacos import SubscribeServiceParam
# Define async callback for service changes
async def service_change_callback(service_info):
print(f"Service changed: {service_info.name}")
print(f"Instance count: {len(service_info.hosts)}")
# Process each instance
for instance in service_info.hosts:
status = "healthy" if instance.healthy else "unhealthy"
print(f" {instance.ip}:{instance.port} - {status}")
# Handle service changes asynchronously
await update_load_balancer(service_info)
async def update_load_balancer(service_info):
# Your async service update logic
print(f"Updating load balancer for {service_info.name}")
# Subscribe to service changes
subscribe_param = SubscribeServiceParam(
service_name="user-service",
group_name="microservices",
subscribe_callback=service_change_callback
)
await naming_service.subscribe(subscribe_param)
# Subscribe to specific clusters
subscribe_param = SubscribeServiceParam(
service_name="user-service",
clusters=["production", "staging"],
subscribe_callback=service_change_callback
)
await naming_service.subscribe(subscribe_param)
# Unsubscribe from service changes
unsubscribe_param = SubscribeServiceParam(
service_name="user-service",
group_name="microservices"
)
await naming_service.unsubscribe(unsubscribe_param)Check service and server health status.
async def server_health(self) -> bool:
"""
Check if Nacos server is healthy.
Returns:
bool: True if server is healthy
"""Usage example:
# Check server health
is_healthy = await naming_service.server_health()
if is_healthy:
print("Nacos server is healthy")
else:
print("Nacos server is unhealthy")
# Health monitoring loop
async def health_monitor():
while True:
try:
healthy = await naming_service.server_health()
print(f"Server health: {'OK' if healthy else 'FAIL'}")
if not healthy:
# Handle unhealthy server
await handle_server_unhealthy()
await asyncio.sleep(30) # Check every 30 seconds
except Exception as e:
print(f"Health check failed: {e}")
await asyncio.sleep(10)
async def handle_server_unhealthy():
print("Handling unhealthy server...")
# Implement fallback logic
# Start health monitoring
asyncio.create_task(health_monitor())Properly shutdown the naming service and cleanup resources.
async def shutdown(self) -> None:
"""
Shutdown the naming service and cleanup resources.
This should always be called when the service is no longer needed.
"""Usage example:
async def main():
naming_service = None
try:
# Create service
client_config = ClientConfig(server_addresses="127.0.0.1:8848")
naming_service = await NacosNamingService.create_naming_service(client_config)
# Register instance
register_param = RegisterInstanceParam(
service_name="test-service",
ip="127.0.0.1",
port=8080
)
await naming_service.register_instance(register_param)
except Exception as e:
print(f"Error: {e}")
finally:
# Always shutdown
if naming_service:
await naming_service.shutdown()class Instance(BaseModel):
instanceId: str = ''
ip: str
port: int
weight: float = 1.0
healthy: bool = True
enabled: bool = True
ephemeral: bool = True
clusterName: str = ''
serviceName: str = ''
metadata: dict = {}
def to_inet_addr(self) -> str: ...
def is_ephemeral(self) -> bool: ...
def get_weight(self) -> float: ...
def add_metadata(self, key: str, value: str) -> None: ...
def contains_metadata(self, key: str) -> bool: ...
def check_instance_is_legal(self): ...class RegisterInstanceParam(BaseModel):
ip: str
port: int
weight: float = 1.0
enabled: bool = True
healthy: bool = True
metadata: Dict[str, str] = {}
clusterName: str = ''
serviceName: str
groupName: str = 'DEFAULT_GROUP'
ephemeral: bool = True
class DeregisterInstanceParam(BaseModel):
ip: str
port: int
clusterName: str = ''
serviceName: str
groupName: str = 'DEFAULT_GROUP'
ephemeral: bool = True
class ListInstanceParam(BaseModel):
serviceName: str
groupName: str = 'DEFAULT_GROUP'
clusters: List[str] = []
subscribe: bool = True
healthyOnly: Optional[bool] = None
class SubscribeServiceParam(BaseModel):
serviceName: str
groupName: str = 'DEFAULT_GROUP'
clusters: List[str] = []
subscribeCallback: Optional[Callable] = None
class GetServiceParam(BaseModel):
serviceName: str
groupName: str = 'DEFAULT_GROUP'
clusters: List[str] = []
class ListServiceParam(BaseModel):
namespaceId: str = 'public'
groupName: str = 'DEFAULT_GROUP'
pageNo: int = 1
pageSize: int = 10class Service(BaseModel):
name: str
groupName: str
clusters: str
cacheMillis: int
hosts: List[Instance]
lastRefTime: int
checksum: str
allIPs: bool
reachProtectionThreshold: bool
class ServiceList(BaseModel):
count: int
services: List[str]The V2 API includes automatic service information updates:
# Enable automatic service updates in ClientConfig
client_config = ClientConfig(
server_addresses="127.0.0.1:8848",
async_update_service=True,
update_thread_num=10 # Number of update threads
)Instances support automatic health check intervals through metadata:
# Configure health check intervals
health_metadata = {
"preserved.heart.beat.interval": "5000", # 5 seconds
"preserved.heart.beat.timeout": "15000", # 15 seconds
"preserved.ip.delete.timeout": "30000" # 30 seconds
}
register_param = RegisterInstanceParam(
service_name="health-monitored-service",
ip="192.168.1.100",
port=8080,
metadata=health_metadata
)The V2 API provides intelligent caching for improved performance:
Install with Tessl CLI
npx tessl i tessl/pypi-nacos-sdk-python