This skill uses a split structure for HIGH-RISK requirements:
| Gate | Status | Notes |
|---|---|---|
| 0.1 Domain Expertise | PASSED | Type safety, async, security, testing |
| 0.2 Vulnerability Research | PASSED | 5+ CVEs documented (2025-11-20) |
| 0.5 Hallucination Check | PASSED | Examples tested on Python 3.11+ |
| 0.11 File Organization | Split | HIGH-RISK, ~450 lines + references |
Risk Level: HIGH
Justification: Python backend services handle authentication, database access, file operations, and external API communication. Vulnerabilities in input validation, deserialization, command execution, and cryptography can lead to data breaches and system compromise.
You are an expert Python backend developer specializing in secure, maintainable, and performant services.
| Situation | Approach |
|---|---|
| User input | Validate with Pydantic, sanitize output |
| Database queries | Use ORM or parameterized queries, never format strings |
| File operations | Validate paths, use pathlib, check containment |
| Subprocess | Use list args, never shell=True with user input |
| Secrets | Load from environment or secret manager |
| Cryptography | Use cryptography library, never roll your own |
import pytest
from my_service import UserService, UserNotFoundError
class TestUserService:
@pytest.mark.asyncio
async def test_get_user_returns_user_when_exists(self, db_session):
service = UserService(db_session)
user_id = await service.create_user("alice", "alice@example.com")
user = await service.get_user(user_id)
assert user.username == "alice"
@pytest.mark.asyncio
async def test_get_user_raises_when_not_found(self, db_session):
service = UserService(db_session)
with pytest.raises(UserNotFoundError):
await service.get_user(99999)
@pytest.mark.asyncio
async def test_create_user_validates_email(self, db_session):
service = UserService(db_session)
with pytest.raises(ValueError, match="Invalid email"):
await service.create_user("bob", "not-an-email")class UserNotFoundError(Exception): pass
class UserService:
def __init__(self, db: AsyncSession):
self.db = db
async def get_user(self, user_id: int) -> User:
user = await self.db.get(User, user_id)
if not user:
raise UserNotFoundError(f"User {user_id} not found")
return user
async def create_user(self, username: str, email: str) -> int:
if "@" not in email:
raise ValueError("Invalid email format")
# ... minimal implementation to pass testspytest --cov=src # All tests pass
mypy src/ --strict # Type check passes
bandit -r src/ -ll # Security scan passes
pip-audit && safety check # Dependencies clean# BAD: Sequential requests (slow)
for url in urls:
response = await client.get(url) # Waits for each one
# GOOD: Concurrent requests with gather
tasks = [client.get(url) for url in urls]
responses = await asyncio.gather(*tasks) # All at once# BAD: Load all into memory
return [process(line) for line in f.readlines()] # OOM risk
# GOOD: Generator yields one at a time
def process_large_file(filepath: str) -> Iterator[dict]:
with open(filepath) as f:
for line in f:
yield process(line) # Memory efficient# BAD: List for membership testing - O(n)
required in user_perms_list # Slow for large lists
# GOOD: Set for membership testing - O(1)
required in user_perms_set # Fast lookup
# BAD: Repeated string concatenation
result = ""; for f in fields: result += f + ", " # Creates new string each time
# GOOD: Join for string building
", ".join(fields) # Single allocation# BAD: New connection per request
engine = create_async_engine(DATABASE_URL) # Connection overhead each time
# GOOD: Reuse pooled connections
engine = create_async_engine(DATABASE_URL, pool_size=20, max_overflow=10)
async_session = sessionmaker(engine, class_=AsyncSession)
async def get_user(user_id: int):
async with async_session() as session: # Reuses pooled connection
return await session.get(User, user_id)# BAD: Individual inserts (N round trips)
for user in users:
db.add(User(**user)); await db.commit() # N commits = slow
# GOOD: Batch insert (1 round trip)
stmt = insert(User).values(users)
await db.execute(stmt); await db.commit() # Single commit
# GOOD: Chunked for very large datasets
for i in range(0, len(users), 1000):
await db.execute(insert(User).values(users[i:i+1000]))
await db.commit()

| Category | Version | Notes |
|---|---|---|
| LTS/Recommended | Python 3.11+ | Performance improvements, better errors |
| Minimum | Python 3.9 | Security support until Oct 2025 |
| Avoid | Python 3.8- | EOL, no security patches |
# pyproject.toml
[project]
dependencies = [
"pydantic>=2.0", "email-validator>=2.0", # Validation
"cryptography>=41.0", "argon2-cffi>=21.0", # Cryptography
"PyJWT>=2.8", "sqlalchemy>=2.0", "asyncpg>=0.28",
"httpx>=0.25", "bandit>=1.7",
]
[project.optional-dependencies]
dev = ["pytest>=7.0", "pytest-asyncio>=0.21", "hypothesis>=6.0", "safety>=2.0", "pip-audit>=2.0"]from pydantic import BaseModel, Field, field_validator, EmailStr
from typing import Annotated
import re
class UserCreate(BaseModel):
    """Validated user creation request.

    Length limits are declared on the fields; the validators below add a
    character-set rule for ``username`` and a strength rule for ``password``.
    """

    username: Annotated[str, Field(min_length=3, max_length=50)]
    email: EmailStr
    password: Annotated[str, Field(min_length=12)]

    @field_validator('username')
    @classmethod
    def validate_username(cls, v: str) -> str:
        """Allow only ASCII letters, digits, underscore and hyphen.

        Raises:
            ValueError: If any other character is present.
        """
        # fullmatch instead of match(r'^...$'): '$' also matches just before
        # a trailing newline, so a value like "alice\n" would be accepted.
        if not re.fullmatch(r'[a-zA-Z0-9_-]+', v):
            raise ValueError('Username must be alphanumeric')
        return v

    @field_validator('password')
    @classmethod
    def validate_password_strength(cls, v: str) -> str:
        """Require at least one uppercase letter, one lowercase letter and one digit.

        Raises:
            ValueError: If any of the three character classes is missing.
        """
        if not all([re.search(r'[A-Z]', v), re.search(r'[a-z]', v), re.search(r'\d', v)]):
            raise ValueError('Password needs uppercase, lowercase, and digit')
        return v


from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
ph = PasswordHasher(time_cost=3, memory_cost=65536, parallelism=4)
def hash_password(password: str) -> str:
return ph.hash(password)
def verify_password(password: str, hash: str) -> bool:
try:
ph.verify(hash, password)
return True
except VerifyMismatchError:
return Falsefrom sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession
# NEVER: f"SELECT * FROM users WHERE username = '{username}'"
async def get_user_safe(db: AsyncSession, username: str) -> User | None:
stmt = select(User).where(User.username == username)
result = await db.execute(stmt)
return result.scalar_one_or_none()
async def search_users(db: AsyncSession, pattern: str) -> list:
stmt = text("SELECT * FROM users WHERE username LIKE :pattern")
result = await db.execute(stmt, {"pattern": f"%{pattern}%"})
return result.fetchall()from pathlib import Path
def safe_read_file(base_dir: Path, user_filename: str) -> str:
    """Return the text of ``user_filename``, which must live inside ``base_dir``.

    Args:
        base_dir: Directory the caller is allowed to read from.
        user_filename: Untrusted, relative file name.

    Returns:
        The file's contents as text.

    Raises:
        ValueError: If the name contains ``..`` or starts with ``/``, or if
            the resolved path ends up outside ``base_dir``.
    """
    # Cheap syntactic rejection before touching the filesystem.
    if '..' in user_filename or user_filename.startswith('/'):
        raise ValueError("Invalid filename")

    # Compare fully resolved paths so the containment check sees the real
    # location of the target.
    target = (base_dir / user_filename).resolve()
    allowed_root = base_dir.resolve()
    if target.is_relative_to(allowed_root):
        return target.read_text()
    raise ValueError("Path traversal detected")


import subprocess
ALLOWED_PROGRAMS = {'git', 'python', 'pip'}
def run_command_safe(program: str, args: list[str]) -> str:
if program not in ALLOWED_PROGRAMS:
raise ValueError(f"Program not allowed: {program}")
result = subprocess.run(
[program, *args],
capture_output=True, text=True, timeout=30, check=True,
)
return result.stdout

| CVE ID | Severity | Description | Mitigation |
|---|---|---|---|
| CVE-2024-12718 | CRITICAL | tarfile filter bypass | Python 3.12.3+, filter='data' |
| CVE-2024-12254 | HIGH | asyncio memory exhaustion | Upgrade, monitor memory |
| CVE-2024-5535 | MEDIUM | SSLContext buffer over-read | Upgrade OpenSSL |
| CVE-2023-50782 | HIGH | RSA information disclosure | Upgrade cryptography |
| CVE-2023-27043 | MEDIUM | Email parsing vulnerability | Strict email validation |
See references/security-examples.md for complete CVE details and mitigation code.
| Category | Risk | Key Mitigations |
|---|---|---|
| A01 Broken Access Control | HIGH | Validate permissions, decorators |
| A02 Cryptographic Failures | HIGH | cryptography lib, Argon2 |
| A03 Injection | CRITICAL | Parameterized queries, no shell=True |
| A04 Insecure Design | MEDIUM | Type safety, validation layers |
| A05 Misconfiguration | HIGH | Safe defaults, audit deps |
| A06 Vulnerable Components | HIGH | pip-audit, safety in CI |
from pydantic import BaseModel, field_validator
import os, logging
# Secure base model - reject unknown fields, strip whitespace
class SecureInput(BaseModel):
model_config = {'extra': 'forbid', 'str_strip_whitespace': True}
@field_validator('*', mode='before')
@classmethod
def reject_null_bytes(cls, v):
if isinstance(v, str) and '\x00' in v:
raise ValueError('Null bytes not allowed')
return v
# Secrets from environment (NEVER hardcode)
API_KEY = os.environ["API_KEY"]
DB_URL = os.environ["DATABASE_URL"]
# Safe error handling - log details, return safe message
class AppError(Exception):
def __init__(self, message: str, internal: str = None):
self.message = message
if internal:
logging.error(f"{message}: {internal}")
def to_response(self) -> dict:
return {"error": self.message}See
references/advanced-patterns.mdfor secrets manager integration
bandit -r src/ -ll # Static analysis
pip-audit && safety check # Dependency vulnerabilities
mypy src/ --strict # Type checkingimport pytest
from pathlib import Path
@pytest.mark.asyncio
async def test_sql_injection_prevented(db):
    """Injection payloads must be treated as plain usernames that match no row."""
    # get_user_safe is a coroutine function: called without ``await`` it
    # returns a coroutine object, which is never None, so the assertion
    # could not pass and the query would never execute.
    for payload in ["'; DROP TABLE users; --", "' OR '1'='1", "admin'--"]:
        assert await get_user_safe(db, payload) is None
def test_path_traversal_blocked():
base = Path("/app/data")
for attack in ["../etc/passwd", "..\\windows\\system32", "foo/../../etc/passwd"]:
with pytest.raises(ValueError, match="traversal|Invalid"):
safe_read_file(base, attack)
def test_command_injection_blocked():
with pytest.raises(ValueError, match="not allowed"):
run_command_safe("rm", ["-rf", "/"])

See references/security-examples.md for comprehensive test patterns.
| Anti-Pattern | Bad | Good |
|---|---|---|
| SQL formatting | f"SELECT * WHERE id={id}" | select(User).where(User.id == id) |
| Pickle untrusted | pickle.loads(data) | json.loads(data) |
| Shell injection | subprocess.run(f"echo {x}", shell=True) | subprocess.run(["echo", x]) |
| Weak hashing | hashlib.md5(pw).hexdigest() | PasswordHasher().hash(pw) |
| Hardcoded secrets | API_KEY = "sk-123..." | API_KEY = os.environ["API_KEY"] |
pytest --cov=src
mypy src/ --strict
bandit -r src/ -ll
pip-audit && safety check

Create Python code that is type safe, secure, testable, and maintainable.
Security Essentials:
For attack scenarios and threat modeling, see
references/threat-model.md
If you maintain this skill, you can claim it as your own. Once claimed, you can manage eval scenarios, bundle related skills, attach documentation or rules, and ensure cross-agent compatibility.