A Python implementation of Nacos OpenAPI for service discovery, configuration management, and service management
—
Modern asynchronous API for Nacos configuration operations with advanced features including gRPC support, encryption, filtering, caching, and comprehensive error handling. This API provides type-safe operations using Pydantic models.
Create and initialize the asynchronous configuration service.
class NacosConfigService:
@staticmethod
async def create_config_service(client_config: ClientConfig) -> 'NacosConfigService':
"""
Create and initialize a Nacos configuration service.
Args:
client_config (ClientConfig): Client configuration containing server details,
authentication, and operational parameters
Returns:
NacosConfigService: Initialized configuration service instance
Raises:
NacosException: If client_config is invalid or initialization fails
"""

Usage example:
import asyncio
from v2.nacos import ClientConfig, NacosConfigService
async def main():
# Create client configuration
client_config = ClientConfig(
server_addresses="127.0.0.1:8848",
namespace_id="production",
username="nacos",
password="nacos"
)
# Create configuration service
config_service = await NacosConfigService.create_config_service(client_config)
# Use the service...
# Always shutdown when done
await config_service.shutdown()
asyncio.run(main())

Retrieve configuration values with automatic caching and fallback support.
async def get_config(self, param: ConfigParam) -> str:
"""
Get configuration from Nacos server with caching and failover support.
Args:
param (ConfigParam): Configuration parameters including data_id and group
Returns:
str: Configuration content
Raises:
NacosException: If data_id is empty or invalid
"""

Usage example:
from v2.nacos import ConfigParam
# Basic configuration retrieval
param = ConfigParam(dataId="database.config", group="DEFAULT_GROUP")
config = await config_service.get_config(param)
print(f"Database config: {config}")
# Get configuration with custom group
param = ConfigParam(dataId="redis.config", group="CACHE_GROUP")
config = await config_service.get_config(param)
# Get configuration from specific namespace (set in ClientConfig)
param = ConfigParam(dataId="app.properties") # group defaults to DEFAULT_GROUP
config = await config_service.get_config(param)

Publish or update configuration values with optional encryption and filtering.
async def publish_config(self, param: ConfigParam) -> bool:
"""
Publish configuration to Nacos server with encryption and filtering support.
Args:
param (ConfigParam): Configuration parameters including data_id, group, and content
Returns:
bool: True if publication successful
Raises:
NacosException: If data_id is empty or invalid
"""

Usage example:
from v2.nacos import ConfigParam
# Basic configuration publishing
param = ConfigParam(
data_id="database.config",
group="DEFAULT_GROUP",
content="host=localhost\nport=5432\ndb=myapp"
)
success = await config_service.publish_config(param)
print(f"Publish successful: {success}")
# Publish JSON configuration with type hint
import json
config_data = {
"database": {
"host": "localhost",
"port": 5432,
"name": "myapp"
}
}
param = ConfigParam(
data_id="database.json",
group="DEFAULT_GROUP",
content=json.dumps(config_data),
config_type="json"
)
await config_service.publish_config(param)
# Publish with application context
param = ConfigParam(
data_id="app.properties",
group="DEFAULT_GROUP",
content="app.name=MyApplication\napp.version=1.0.0",
app_name="MyApplication",
config_type="properties"
)
await config_service.publish_config(param)

Remove configuration from Nacos server.
async def remove_config(self, param: ConfigParam):
"""
Remove configuration from Nacos server.
Args:
param (ConfigParam): Configuration parameters with data_id and group
Raises:
NacosException: If data_id is empty or invalid
"""

Usage example:
# Remove configuration
param = ConfigParam(dataId="old.config", group="DEFAULT_GROUP")
await config_service.remove_config(param)
print("Configuration removed")
# Remove from custom group
param = ConfigParam(dataId="temp.config", group="TEMP_GROUP")
await config_service.remove_config(param)

Add and manage asynchronous configuration change listeners.
async def add_listener(self, data_id: str, group: str, listener: Callable) -> None:
"""
Add configuration change listener.
Args:
data_id (str): Configuration data ID
group (str): Configuration group
listener (Callable): Async callback function for configuration changes
Raises:
NacosException: If data_id is empty or invalid
"""
async def remove_listener(self, data_id: str, group: str, listener: Callable):
"""
Remove configuration change listener.
Args:
data_id (str): Configuration data ID
group (str): Configuration group
listener (Callable): Callback function to remove
"""

Usage example:
# Define async listener
async def config_change_listener(config_content):
print(f"Configuration changed: {config_content}")
# Process configuration change asynchronously
await process_config_change(config_content)
async def process_config_change(content):
# Your async processing logic here
print(f"Processing: {content}")
# Add listener
await config_service.add_listener(
data_id="database.config",
group="DEFAULT_GROUP",
listener=config_change_listener
)
# Add multiple listeners for the same config
async def backup_listener(config_content):
print(f"Backup handler: {config_content}")
await backup_config(config_content)
await config_service.add_listener(
data_id="database.config",
group="DEFAULT_GROUP",
listener=backup_listener
)
# Remove specific listener
await config_service.remove_listener(
data_id="database.config",
group="DEFAULT_GROUP",
listener=config_change_listener
)

Check the health status of Nacos server.
async def server_health(self) -> bool:
"""
Check if Nacos server is healthy.
Returns:
bool: True if server is healthy
"""

Usage example:
# Check server health
is_healthy = await config_service.server_health()
if is_healthy:
print("Nacos server is healthy")
else:
print("Nacos server is unhealthy")
# Handle unhealthy server scenario
# Health check in a monitoring loop
import asyncio
async def health_monitor():
while True:
try:
healthy = await config_service.server_health()
print(f"Server health: {'OK' if healthy else 'FAIL'}")
await asyncio.sleep(30) # Check every 30 seconds
except Exception as e:
print(f"Health check failed: {e}")
await asyncio.sleep(10) # Retry after 10 seconds
# Start health monitoring
asyncio.create_task(health_monitor())

Properly shut down the configuration service and clean up resources.
async def shutdown(self):
"""
Shutdown the configuration service and cleanup resources.
This should always be called when the service is no longer needed.
"""

Usage example:
async def main():
config_service = None
try:
# Create service
client_config = ClientConfig(server_addresses="127.0.0.1:8848")
config_service = await NacosConfigService.create_config_service(client_config)
# Use service
param = ConfigParam(dataId="test.config", group="DEFAULT_GROUP")
config = await config_service.get_config(param)
except Exception as e:
print(f"Error: {e}")
finally:
# Always shutdown
if config_service:
await config_service.shutdown()
# Using context manager pattern
class ConfigServiceManager:
def __init__(self, client_config):
self.client_config = client_config
self.service = None
async def __aenter__(self):
self.service = await NacosConfigService.create_config_service(self.client_config)
return self.service
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.service:
await self.service.shutdown()
# Usage with context manager
async def main():
client_config = ClientConfig(server_addresses="127.0.0.1:8848")
async with ConfigServiceManager(client_config) as config_service:
param = ConfigParam(dataId="test.config", group="DEFAULT_GROUP")
config = await config_service.get_config(param)
print(config)

class ConfigParam(BaseModel):
dataId: str
group: str = 'DEFAULT_GROUP'
content: str = ''
type: str = ''
appName: str = ''
tag: str = ''
md5: str = ''

The ConfigParam model supports:
Usage examples:
# Minimal configuration
param = ConfigParam(dataId="app.config")
# Full configuration for publishing
param = ConfigParam(
dataId="database.json",
group="DATABASE_GROUP",
content='{"host": "localhost", "port": 5432}',
type="json",
appName="MyApp",
tag="database,production"
)
# Configuration with CAS (Compare-And-Swap)
current_config = await config_service.get_config(
ConfigParam(dataId="critical.config", group="DEFAULT_GROUP")
)
import hashlib
current_md5 = hashlib.md5(current_config.encode()).hexdigest()
param = ConfigParam(
dataId="critical.config",
group="DEFAULT_GROUP",
content="updated content",
md5=current_md5 # Only update if current content matches this MD5
)
success = await config_service.publish_config(param)

When KMSConfig is enabled in ClientConfig, configurations are automatically encrypted/decrypted:
from v2.nacos import ClientConfig, KMSConfig
# Configure KMS encryption
kms_config = KMSConfig(
enabled=True,
endpoint="https://kms.example.com",
access_key="your-access-key",
secret_key="your-secret-key"
)
client_config = ClientConfig(
server_addresses="127.0.0.1:8848",
kms_config=kms_config
)
config_service = await NacosConfigService.create_config_service(client_config)
# Configurations are automatically encrypted when published
# and decrypted when retrieved
param = ConfigParam(
data_id="secret.config",
content="sensitive data here"
)
await config_service.publish_config(param) # Automatically encrypted
decrypted_config = await config_service.get_config(param) # Automatically decrypted

Custom filters can be applied to configuration content:
# Configuration filters are automatically applied based on ClientConfig
# They handle encryption, validation, and content transformation
# This is managed internally by the ConfigFilterChainManager

The V2 API provides automatic caching with failover capabilities:
These features are enabled by default and managed automatically by the service.
Install with Tessl CLI
npx tessl i tessl/pypi-nacos-sdk-python