Kiln AI is a comprehensive platform for building, evaluating, and deploying AI systems with dataset management, model fine-tuning, RAG, and evaluation capabilities.
Configuration management, formatting utilities, and async lock management for the Kiln AI system. Provides centralized configuration storage, helper functions, and concurrency utilities.
Centralized configuration for Kiln AI with singleton access pattern.
from kiln_ai.utils.config import Config, ConfigProperty, MCP_SECRETS_KEY
class Config:
    """Configuration management for Kiln AI.

    Persists settings to ~/.kiln_settings/config.yaml.

    Properties:
        custom_models (list[str]): Custom model identifiers in the
            "provider::model" format.
        openai_compatible_providers (list[dict]): OpenAI-compatible
            provider configurations.
    """

    @classmethod
    def shared(cls) -> "Config":
        """Return the singleton configuration instance.

        Returns:
            Config: Shared configuration instance.
        """

    def save(self) -> None:
        """Persist the configuration to ~/.kiln_settings/config.yaml."""

    def load(self) -> None:
        """Reload the configuration from ~/.kiln_settings/config.yaml."""

    def get(self, key: str, default=None):
        """Return the value stored under ``key``.

        Parameters:
            key (str): Configuration key.
            default: Value returned when the key is not present.

        Returns:
            Any: Stored configuration value, or ``default``.
        """

    def set(self, key: str, value) -> None:
        """Store ``value`` under ``key``.

        Parameters:
            key (str): Configuration key.
            value: Value to store.
        """
class ConfigProperty:
    """Definition of a single configuration property.

    Properties:
        key (str): Property key.
        default: Default value for the property.
        description (str): Human-readable property description.
    """
# MCP secrets configuration key
MCP_SECRETS_KEY = "mcp_secrets"

Manage async locks for concurrent operations.
from kiln_ai.utils.lock import AsyncLockManager, shared_async_lock_manager
class AsyncLockManager:
    """Manage named asyncio locks for concurrency control.

    Methods:
        acquire: Acquire the lock for an identifier.
        release: Release the lock for an identifier.
        with_lock: Async context manager wrapping acquire/release.
    """

    async def acquire(self, lock_id: str) -> None:
        """Acquire the lock identified by ``lock_id``.

        Waits (asynchronously) until the lock becomes available.

        Parameters:
            lock_id (str): Lock identifier.
        """

    async def release(self, lock_id: str) -> None:
        """Release the lock identified by ``lock_id``.

        Parameters:
            lock_id (str): Lock identifier.
        """

    async def with_lock(self, lock_id: str):
        """Return an async context manager that holds the named lock.

        Parameters:
            lock_id (str): Lock identifier.

        Usage:
            async with lock_manager.with_lock("my_lock"):
                ...  # critical section
        """
# Shared lock manager singleton
shared_async_lock_manager = AsyncLockManager()

String formatting and conversion utilities.
from kiln_ai.utils.formatting import snake_case
def snake_case(text: str) -> str:
"""
Convert string to snake_case.
Parameters:
- text (str): Input text (can be camelCase, PascalCase, or mixed)
Returns:
str: snake_case formatted string
Examples:
- "HelloWorld" -> "hello_world"
- "camelCase" -> "camel_case"
- "already_snake" -> "already_snake"
"""from kiln_ai.utils.config import Config
# Obtain the process-wide configuration singleton.
config = Config.shared()

# Read configured values, defaulting to empty lists when unset.
custom_models = config.custom_models or []
providers = config.openai_compatible_providers or []
print(f"Custom models: {custom_models}")
print(f"Custom providers: {len(providers)}")

# Persist the configuration to disk.
config.save()

from kiln_ai.utils.config import Config
config = Config.shared()

# Register a new custom model only if it is not already present.
new_model = "openai::gpt-3.5-turbo-custom"
custom_models = config.custom_models or []
if new_model not in custom_models:
    custom_models.append(new_model)
    config.custom_models = custom_models
    config.save()
    print(f"Added custom model: {new_model}")

# Show every configured custom model.
print("\nCustom models:")
for model in config.custom_models:
    print(f" - {model}")

from kiln_ai.utils.config import Config
config = Config.shared()

# Provider definition to register.
provider_config = {
    "name": "CustomOllama",
    "base_url": "http://localhost:11434/v1",
    "api_key": "ollama"
}

providers = config.openai_compatible_providers or []

# Only add the provider when no existing entry shares its name.
already_present = any(p["name"] == provider_config["name"] for p in providers)
if already_present:
    print(f"Provider {provider_config['name']} already exists")
else:
    providers.append(provider_config)
    config.openai_compatible_providers = providers
    config.save()
    print(f"Added provider: {provider_config['name']}")

# Show every configured provider.
print("\nConfigured providers:")
for provider in config.openai_compatible_providers:
    print(f" - {provider['name']}: {provider['base_url']}")

from kiln_ai.utils.config import Config
import os

config = Config.shared()

# Pull API keys from the environment.
openai_key = os.getenv("OPENAI_API_KEY")
anthropic_key = os.getenv("ANTHROPIC_API_KEY")

# Store each key only when it is set and not already present in the config.
for key_name, key_value in [
    ("openai_api_key", openai_key),
    ("anthropic_api_key", anthropic_key),
]:
    if key_value and not config.get(key_name):
        config.set(key_name, key_value)
config.save()

from kiln_ai.utils.config import Config, MCP_SECRETS_KEY
config = Config.shared()

# Credentials to store for our MCP server.
server_entry = {
    "api_key": "secret_key_123",
    "endpoint": "https://mcp.example.com"
}

# Merge into the shared secrets mapping and persist.
secrets = config.get(MCP_SECRETS_KEY, {})
secrets["my_mcp_server"] = server_entry
config.set(MCP_SECRETS_KEY, secrets)
config.save()

# Read the credentials back out.
retrieved_secrets = config.get(MCP_SECRETS_KEY, {})
server_secret = retrieved_secrets.get("my_mcp_server")
print(f"MCP Server endpoint: {server_secret['endpoint']}")

from kiln_ai.utils.lock import shared_async_lock_manager
# Use shared lock manager
import asyncio

async def update_shared_resource(resource_id: str):
    """Update a shared resource while holding its named lock.

    Parameters:
        resource_id (str): Identifier of the resource to update.
    """
    lock_id = f"resource_{resource_id}"
    async with shared_async_lock_manager.with_lock(lock_id):
        # Critical section - only one task can execute this at a time
        print(f"Updating resource {resource_id}")
        # Perform update...
        await asyncio.sleep(1)
        print(f"Completed update {resource_id}")

async def main():
    # Multiple concurrent calls will be serialized
    await asyncio.gather(
        update_shared_resource("abc"),
        update_shared_resource("abc"),
        update_shared_resource("abc")
    )

# `await` is only valid inside a coroutine, so drive the demo with asyncio.run.
# (The original used top-level await and never imported asyncio.)
asyncio.run(main())

from kiln_ai.utils.lock import shared_async_lock_manager
import asyncio

async def safe_file_write(file_path: str, content: str):
    """Write ``content`` to ``file_path`` while holding a per-file lock.

    Parameters:
        file_path (str): Destination path.
        content (str): Text to write.
    """
    lock_id = f"file_{file_path}"
    async with shared_async_lock_manager.with_lock(lock_id):
        # Only one task can write to this file at a time
        with open(file_path, "w") as f:
            f.write(content)
        await asyncio.sleep(0.1)  # Simulate processing

async def main():
    # Safe concurrent writes: same path is serialized by its lock.
    await asyncio.gather(
        safe_file_write("/tmp/data.txt", "content1"),
        safe_file_write("/tmp/data.txt", "content2"),
        safe_file_write("/tmp/other.txt", "content3")  # Different file, can run in parallel
    )

# Top-level `await` is invalid in a plain script; use asyncio.run instead.
asyncio.run(main())

from kiln_ai.utils.formatting import snake_case
# Demonstrate snake_case conversion across several input styles.
test_strings = [
    "HelloWorld",
    "camelCase",
    "PascalCase",
    "already_snake_case",
    "SCREAMING_SNAKE_CASE",
    "Mixed-Format_String"
]

print("String conversions:")
for s in test_strings:
    print(f" {s} -> {snake_case(s)}")

# snake_case is also handy for deriving identifiers from class names.
class_name = "MyCustomModel"
model_id = snake_case(class_name)
print(f"\nModel ID: {model_id}")  # my_custom_model

from kiln_ai.utils.config import Config
def validate_config():
    """Validate that the configuration has required values.

    Returns:
        bool: True when the configuration is valid, False otherwise.
    """
    config = Config.shared()
    errors = []

    # At least one OpenAI-compatible provider must be configured.
    if not (config.openai_compatible_providers or []):
        errors.append("No OpenAI compatible providers configured")

    # Every custom model must follow the "provider::model" format.
    for model in (config.custom_models or []):
        if "::" not in model:
            errors.append(f"Invalid model format: {model} (expected 'provider::model')")

    if not errors:
        print("Configuration valid")
        return True

    print("Configuration errors:")
    for error in errors:
        print(f" - {error}")
    return False

# Validate
validate_config()

from kiln_ai.utils.config import Config
import json
import shutil
from pathlib import Path

def backup_config():
    """Backup configuration to a JSON file under ~/.kiln_settings."""
    config = Config.shared()
    backup_data = {
        "custom_models": config.custom_models,
        "openai_compatible_providers": config.openai_compatible_providers
    }
    backup_path = Path.home() / ".kiln_settings" / "config_backup.json"
    # Ensure the settings directory exists before writing; otherwise open()
    # raises FileNotFoundError on a fresh machine.
    backup_path.parent.mkdir(parents=True, exist_ok=True)
    with open(backup_path, "w") as f:
        json.dump(backup_data, f, indent=2)
    print(f"Config backed up to {backup_path}")

def restore_config():
    """Restore configuration from the JSON backup, if one exists."""
    backup_path = Path.home() / ".kiln_settings" / "config_backup.json"
    if not backup_path.exists():
        print("No backup found")
        return
    with open(backup_path, "r") as f:
        backup_data = json.load(f)
    config = Config.shared()
    config.custom_models = backup_data.get("custom_models", [])
    config.openai_compatible_providers = backup_data.get("openai_compatible_providers", [])
    config.save()
    print("Config restored from backup")

# Backup before changes
backup_config()

from kiln_ai.utils.config import Config
from kiln_ai.datamodel import Task

class TaskConfig:
    """Task-specific configuration wrapper around the global Config."""

    def __init__(self, task: Task):
        self.task = task
        self.global_config = Config.shared()

    def get_model_config(self, model_name: str) -> dict:
        """Return generation settings for ``model_name``.

        Task-specific overrides could be stored here in the future.
        """
        return {
            "temperature": 0.7,
            "max_tokens": 1000
        }

    def get_custom_models(self) -> list:
        """Return custom models, combining global and task-specific sets."""
        # Task-specific models could be appended here.
        return self.global_config.custom_models or []

# Use with task
task = Task.load_from_file("path/to/task.kiln")
task_config = TaskConfig(task)
models = task_config.get_custom_models()

from kiln_ai.utils.config import Config
class ConfigWatcher:
    """Detect changes to the custom model list between checks."""

    def __init__(self):
        self.config = Config.shared()
        self.last_models = list(self.config.custom_models or [])

    def check_changes(self) -> dict:
        """Reload the config and report added/removed custom models.

        Returns:
            dict: May contain "added_models" and/or "removed_models"
            lists; empty when nothing changed.
        """
        self.config.load()  # Reload from disk
        current_models = self.config.custom_models or []

        previous = set(self.last_models)
        current = set(current_models)

        changes = {}
        added = current - previous
        if added:
            changes["added_models"] = list(added)
        removed = previous - current
        if removed:
            changes["removed_models"] = list(removed)

        self.last_models = list(current_models)
        return changes

# Use watcher
watcher = ConfigWatcher()
# Later...
changes = watcher.check_changes()
if changes:
    print("Configuration changes detected:")
    print(changes)

from kiln_ai.utils.config import Config
from kiln_ai.utils.lock import shared_async_lock_manager
import asyncio

async def safe_config_update(key: str, value):
    """Update a configuration key while holding the shared config lock.

    Parameters:
        key (str): Configuration key to set.
        value: Value to store.
    """
    async with shared_async_lock_manager.with_lock("config_lock"):
        config = Config.shared()
        config.set(key, value)
        config.save()
        print(f"Updated {key} = {value}")

async def main():
    # Safe concurrent updates: a single lock serializes them.
    await asyncio.gather(
        safe_config_update("setting1", "value1"),
        safe_config_update("setting2", "value2"),
        safe_config_update("setting3", "value3")
    )

# `await` must run inside a coroutine; asyncio.run drives the demo.
asyncio.run(main())

from kiln_ai.utils.config import Config
def migrate_config_v1_to_v2():
"""Migrate configuration from v1 to v2 format."""
config = Config.shared()
# Old format: list of model strings
old_models = config.get("models", [])
# New format: list of dicts with metadata
if old_models and isinstance(old_models[0], str):
new_models = []
for model_str in old_models:
provider, model = model_str.split("::", 1)
new_models.append({
"provider": provider,
"model": model,
"enabled": True
})
config.set("models_v2", new_models)
config.save()
print("Migrated configuration to v2")
# Run migration
migrate_config_v1_to_v2()Install with Tessl CLI
npx tessl i tessl/pypi-kiln-ai