tessl install github:mckinsey/agents-at-scale-ark --skill ark-pentest-issue-resolver
github.com/mckinsey/agents-at-scale-ark
Resolve common penetration testing issues in Ark. Use when fixing security vulnerabilities from pentest reports, security audits, or OWASP Top 10 issues.
Review Score
84%
Validation Score
13/16
Implementation Score
77%
Activation Score
90%
Provides detection patterns, mitigation strategies, and fixes for common penetration testing issues found in the Ark platform.
Use this skill when:
Note: This skill is used by the ark-security-patcher agent when no specific CVE is mentioned. It helps identify and resolve standard penetration testing findings.
Description: Attacker can inject malicious SQL queries through user input.
Detection Patterns:
# VULNERABLE: Direct string concatenation
query = f"SELECT * FROM users WHERE username = '{username}'"
# VULNERABLE: String formatting
query = "SELECT * FROM users WHERE id = %s" % user_id
Mitigation:
# SECURE: Use parameterized queries
cursor.execute("SELECT * FROM users WHERE username = ?", (username,))
# SECURE: Use ORM with parameter binding
User.objects.filter(username=username)
# SECURE: Use sqlalchemy with bound parameters
session.query(User).filter(User.username == username)Ark Context:
services/ark-api/, executor services
cursor.execute, db.query, SQL string concatenation
Description: An attacker can inject malicious JavaScript into web pages viewed by other users.
Types:
Detection Patterns:
// VULNERABLE: Direct innerHTML manipulation
element.innerHTML = userInput;
// VULNERABLE: Unescaped template rendering
return `<div>${userInput}</div>`;
// VULNERABLE: dangerouslySetInnerHTML in React
<div dangerouslySetInnerHTML={{__html: userInput}} />Mitigation:
// SECURE: Use textContent
element.textContent = userInput;
// SECURE: Use React/Vue built-in escaping
return <div>{userInput}</div>
// SECURE: Sanitize HTML if HTML input is required
import DOMPurify from 'dompurify';
const clean = DOMPurify.sanitize(userInput);
// SECURE: Set Content-Security-Policy header
Content-Security-Policy: default-src 'self'; script-src 'self'Ark Context:
ark-dashboard/, docs/ (Next.js applications)
dangerouslySetInnerHTML, innerHTML, unescaped templates
Description: An attacker tricks a user into executing unwanted actions on an application in which the user is authenticated.
Detection Patterns:
// VULNERABLE: No CSRF token validation
http.HandleFunc("/api/delete", func(w http.ResponseWriter, r *http.Request) {
// Directly processes DELETE without token
deleteUser(r.FormValue("id"))
})Mitigation:
// SECURE: Use CSRF middleware
import "github.com/gorilla/csrf"
// The value returned by csrf.Protect is applied to router below, so every
// request served on :8000 passes through it.
// NOTE(review): the placeholder literal below is 21 bytes long, not 32 — replace
// it with a real key loaded from a secret store instead of a hard-coded string.
csrfMiddleware := csrf.Protect(
[]byte("32-byte-long-auth-key"),
csrf.Secure(true),
)
// NOTE(review): the error returned by ListenAndServe is discarded here.
http.ListenAndServe(":8000", csrfMiddleware(router))
// SECURE: Verify CSRF token
// Reads the token from the X-CSRF-Token request header and answers
// 403 Forbidden when the check fails.
// NOTE(review): confirm that csrf.Validate exists in the gorilla/csrf version in
// use — presumably the Protect middleware above already performs this check.
token := r.Header.Get("X-CSRF-Token")
if !csrf.Validate(token, session) {
http.Error(w, "Invalid CSRF token", http.StatusForbidden)
return
}
// SECURE: Use SameSite cookie attribute
http.SetCookie(w, &http.Cookie{
Name: "session",
Value: sessionID,
SameSite: http.SameSiteStrictMode,
Secure: true,
HttpOnly: true,
})Ark Context:
SameSite=Strict or SameSite=Lax
Description: The application exposes references to internal implementation objects (IDs) without proper authorization checks.
Detection Patterns:
# VULNERABLE: No authorization check
@app.route('/api/user/<user_id>')
def get_user(user_id):
return User.query.get(user_id) # Any authenticated user can access any user
# VULNERABLE: Predictable IDs
@app.route('/api/document/<int:doc_id>')
def get_document(doc_id):
return Document.query.get(doc_id) # Sequential IDs are guessableMitigation:
# SECURE: Check authorization
@app.route('/api/user/<user_id>')
def get_user(user_id):
    """Return the requested user when the caller is allowed to see it.

    Args:
        user_id: Identifier taken from the URL path.

    Returns:
        The user record for ``user_id``.

    Aborts with 403 when the caller is neither that user nor an admin, and
    with 404 when no such user exists.
    """
    # Authorize before the lookup: unauthorized callers always get 403, so the
    # response cannot be used to probe which user IDs exist. The IDs are
    # compared as strings because the route parameter arrives as text.
    if str(user_id) != str(current_user.id) and not current_user.is_admin:
        abort(403, "Unauthorized")
    user = User.query.get(user_id)
    # A missing row previously caused an AttributeError (HTTP 500) on user.id.
    if user is None:
        abort(404)
    return user
# SECURE: Use UUIDs instead of sequential IDs
import uuid
user_id = uuid.uuid4()
# SECURE: Implement resource ownership check
def check_ownership(resource_id, user_id):
resource = Resource.query.get(resource_id)
if resource.owner_id != user_id:
raise PermissionDenied()Ark Context:
Description: Insecure default configurations, incomplete setups, open cloud storage, misconfigured HTTP headers.
Common Issues:
Detection:
curl -I https://ark-dashboard.example.com | grep -E "X-Frame-Options|X-Content-Type-Options|Strict-Transport-Security"Mitigation:
// Add security headers middleware
// securityHeaders returns a handler that sets a fixed set of security-related
// response headers and then delegates the request to next.
//
// The headers are written before next runs, so next may still override any of
// them for a specific route.
func securityHeaders(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		h := w.Header()
		h.Set("X-Frame-Options", "DENY")
		h.Set("X-Content-Type-Options", "nosniff")
		// The legacy browser XSS auditor is deprecated and "1; mode=block" can
		// itself introduce vulnerabilities in old browsers; "0" turns it off
		// and leaves XSS defence to the Content-Security-Policy below.
		h.Set("X-XSS-Protection", "0")
		h.Set("Strict-Transport-Security", "max-age=31536000; includeSubDomains")
		h.Set("Content-Security-Policy", "default-src 'self'")
		h.Set("Referrer-Policy", "strict-origin-when-cross-origin")
		next.ServeHTTP(w, r)
	})
}

// Next.js configuration (next.config.js)
module.exports = {
async headers() {
return [
{
source: '/:path*',
headers: [
{ key: 'X-Frame-Options', value: 'DENY' },
{ key: 'X-Content-Type-Options', value: 'nosniff' },
{ key: 'X-XSS-Protection', value: '1; mode=block' },
{ key: 'Strict-Transport-Security', value: 'max-age=31536000; includeSubDomains' },
{ key: 'Referrer-Policy', value: 'strict-origin-when-cross-origin' },
],
},
]
},
}Detection:
grep -r "DEBUG.*=.*true" .
grep -r "NODE_ENV.*development" .Mitigation:
DEBUG=false in production
NODE_ENV=production
Description: The application exposes sensitive data through logs, error messages, or insecure transmission.
Detection Patterns:
# VULNERABLE: Logging sensitive data
logger.info(f"User {username} logged in with password {password}")
# VULNERABLE: Exposing secrets in error messages
raise Exception(f"Database connection failed: {db_password}")
# VULNERABLE: Sensitive data in URLs
redirect(f"/reset-password?token={reset_token}")Mitigation:
# SECURE: Never log sensitive data
logger.info(f"User {username} logged in successfully")
# SECURE: Generic error messages
raise Exception("Database connection failed")
logger.error("DB error", exc_info=True) # Full details only in logs
# SECURE: Use POST for sensitive data
# POST /reset-password with token in body
# SECURE: Mask sensitive data
def mask_credit_card(card_number):
return f"****-****-****-{card_number[-4:]}"Environment Variables:
# VULNERABLE: Hardcoded secrets
API_KEY = "sk_live_abc123xyz"
# SECURE: Use environment variables
API_KEY = os.getenv("API_KEY")
# SECURE: Use Kubernetes secrets
apiVersion: v1
kind: Secret
metadata:
name: ark-secrets
type: Opaque
data:
api-key: <base64-encoded-value>Ark Context:
Description: Weak authentication mechanisms allowing credential stuffing, brute force, or session hijacking.
Common Issues:
Mitigation:
import re
def validate_password(password):
"""Enforce strong password policy"""
if len(password) < 12:
return False, "Password must be at least 12 characters"
if not re.search(r"[A-Z]", password):
return False, "Password must contain uppercase letter"
if not re.search(r"[a-z]", password):
return False, "Password must contain lowercase letter"
if not re.search(r"[0-9]", password):
return False, "Password must contain number"
if not re.search(r"[!@#$%^&*]", password):
return False, "Password must contain special character"
return True, "Password is valid"Mitigation:
import "golang.org/x/time/rate"
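// Rate limiter: about 5 requests per minute (burst of 5), shared by ALL clients.
// This single limiter is not per-IP; keep one limiter per client IP to enforce a per-IP limit.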
var limiter = rate.NewLimiter(rate.Every(time.Minute/5), 5)
func loginHandler(w http.ResponseWriter, r *http.Request) {
if !limiter.Allow() {
http.Error(w, "Too many requests", http.StatusTooManyRequests)
return
}
// Process login
}Mitigation:
// SECURE: Generate cryptographically secure session IDs
import "crypto/rand"
// generateSessionID returns a URL-safe base64 string encoding 32 bytes read
// from crypto/rand. If the random source reports an error, the empty string
// and that error are returned.
func generateSessionID() (string, error) {
	var raw [32]byte
	_, err := rand.Read(raw[:])
	if err != nil {
		return "", err
	}
	encoded := base64.URLEncoding.EncodeToString(raw[:])
	return encoded, nil
}
// SECURE: Set secure cookie attributes
http.SetCookie(w, &http.Cookie{
Name: "session_id",
Value: sessionID,
Path: "/",
MaxAge: 3600,
HttpOnly: true, // Prevent JavaScript access
Secure: true, // Only send over HTTPS
SameSite: http.SameSiteStrictMode,
})Description: Users can act outside their intended permissions.
Detection Patterns:
# VULNERABLE: No permission check
@app.route('/api/admin/users', methods=['DELETE'])
def delete_user():
user_id = request.json['user_id']
User.query.filter_by(id=user_id).delete()Mitigation:
# SECURE: Role-based access control (RBAC)
from functools import wraps
def require_role(*roles):
    """Build a decorator that restricts a view to callers holding one of ``roles``.

    The wrapped view aborts with 401 when ``current_user`` is not
    authenticated and with 403 when ``current_user.role`` is not among
    ``roles``; otherwise the original view runs with its arguments unchanged.
    """
    def wrap(view):
        @wraps(view)
        def guarded(*view_args, **view_kwargs):
            # Authentication is checked before the role is read.
            authenticated = current_user.is_authenticated
            if not authenticated:
                abort(401)
            has_role = current_user.role in roles
            if not has_role:
                abort(403, "Insufficient permissions")
            return view(*view_args, **view_kwargs)
        return guarded
    return wrap
@app.route('/api/admin/users', methods=['DELETE'])
@require_role('admin', 'superuser')
def delete_user():
user_id = request.json['user_id']
User.query.filter_by(id=user_id).delete()Kubernetes RBAC (for Ark operator):
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: ark-operator
rules:
- apiGroups: ["ark.example.com"]
resources: ["models", "prompts"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]Ark Context:
Description: Lack of logging and monitoring allows attacks to go undetected.
Mitigation:
import logging
import structlog
# Configure structured logging
logger = structlog.get_logger()
# Log security events
def log_security_event(event_type, user_id, details):
    """Emit one structured "security_event" log record.

    Args:
        event_type: Short event name, e.g. "login_failure".
        user_id: Identifier of the acting user (or the attempted username).
        details: Extra context for the event, passed through unchanged.
    """
    # Imported locally because the surrounding snippet never imports datetime.
    from datetime import datetime, timezone

    logger.info(
        "security_event",
        event_type=event_type,
        user_id=user_id,
        details=details,
        # Timezone-aware UTC timestamp; utcnow() is deprecated and returns a
        # naive value whose ISO string carries no offset.
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
# Examples of what to log
log_security_event("login_success", user.id, {"ip": request.remote_addr})
log_security_event("login_failure", username, {"ip": request.remote_addr})
log_security_event("password_change", user.id, {})
log_security_event("permission_denied", user.id, {"resource": resource_id})
log_security_event("admin_action", user.id, {"action": "delete_user", "target": target_id})Events to Log:
Description: Attacker tricks server into making requests to internal resources.
Detection Patterns:
# VULNERABLE: Direct use of user-provided URL
import requests
url = request.args.get('url')
response = requests.get(url) # Can access internal resources!Mitigation:
# SECURE: Validate and whitelist URLs
import ipaddress
from urllib.parse import urlparse
# Hosts the server is allowed to fetch from on behalf of a user.
ALLOWED_DOMAINS = ['api.example.com', 'cdn.example.com']


def is_safe_url(url):
    """Return True only when ``url`` may be fetched server-side.

    A URL is accepted when its scheme is http or https, its hostname is in
    ALLOWED_DOMAINS, and every address the hostname resolves to is globally
    routable. Any parsing or resolution error yields False.

    Args:
        url: The user-supplied URL string.

    Returns:
        bool: True if the URL passed every check, False otherwise.
    """
    # Imported locally because the surrounding snippet never imports socket;
    # without it the lookup raised NameError and every URL was rejected.
    import socket

    try:
        parsed = urlparse(url)
        # Only allow HTTP/HTTPS
        if parsed.scheme not in ['http', 'https']:
            return False
        # Check against whitelist
        if parsed.hostname not in ALLOWED_DOMAINS:
            return False
        # Check every resolved address (IPv4 and IPv6), not just the first
        # A record, so a name with one public and one internal address fails.
        addresses = socket.getaddrinfo(parsed.hostname, None)
        if not addresses:
            return False
        for entry in addresses:
            ip = ipaddress.ip_address(entry[4][0])
            # is_global excludes private, loopback, link-local, and other
            # special-purpose ranges.
            if not ip.is_global:
                return False
        # NOTE(review): the caller resolves the hostname again when it fetches
        # the URL, so a DNS change between this check and the request is not
        # covered here.
        return True
    except Exception:
        return False
# Use the validator
url = request.args.get('url')
if not is_safe_url(url):
abort(400, "Invalid URL")
response = requests.get(url)Ark Context:
requests.get, http.Get, fetch with user-provided URLs
Description: An attacker can include external entities in XML to read files or cause DoS.
Detection Patterns:
# VULNERABLE: Unsafe XML parsing
import xml.etree.ElementTree as ET
tree = ET.parse(xml_file) # Vulnerable to XXEMitigation:
# SECURE: Disable external entity processing
import defusedxml.ElementTree as ET
tree = ET.parse(xml_file) # Safe from XXE
# SECURE: Configure parser to disable DTDs
from lxml import etree
parser = etree.XMLParser(resolve_entities=False, no_network=True, dtd_validation=False)
tree = etree.parse(xml_file, parser)Description: Untrusted data is deserialized, potentially allowing remote code execution.
Detection Patterns:
# VULNERABLE: Pickle deserialization of untrusted data
import pickle
data = pickle.loads(untrusted_data) # Can execute arbitrary code!
# VULNERABLE: YAML unsafe loading
import yaml
data = yaml.load(untrusted_yaml) # Can execute Python codeMitigation:
# SECURE: Use JSON instead of pickle
import json
data = json.loads(untrusted_data)
# SECURE: Use safe YAML loading
import yaml
data = yaml.safe_load(untrusted_yaml)
# SECURE: If pickle is necessary, use HMAC verification
import hmac
import hashlib
def serialize_with_hmac(obj, secret_key):
serialized = pickle.dumps(obj)
signature = hmac.new(secret_key, serialized, hashlib.sha256).digest()
return signature + serialized
def deserialize_with_hmac(data, secret_key):
signature = data[:32]
serialized = data[32:]
expected_signature = hmac.new(secret_key, serialized, hashlib.sha256).digest()
if not hmac.compare_digest(signature, expected_signature):
raise ValueError("Invalid signature")
return pickle.loads(serialized)Description: Attacker accesses files outside intended directory using ../ sequences.
Detection Patterns:
# VULNERABLE: Direct file path from user input
filename = request.args.get('file')
with open(f'/var/www/uploads/{filename}', 'r') as f: # Can access ../../../../etc/passwd
content = f.read()Mitigation:
# SECURE: Validate and sanitize file paths
import os
from pathlib import Path
# Directory that user-requested files must stay inside.
UPLOAD_DIR = '/var/www/uploads'


def safe_file_path(filename):
    """Map a user-supplied file name to a path inside UPLOAD_DIR.

    Args:
        filename: File name from the request; any directory part is discarded.

    Returns:
        str: Absolute, symlink-resolved path of the file within UPLOAD_DIR.

    Raises:
        ValueError: If the name is missing, empty, ".", "..", or resolves to
            a location outside UPLOAD_DIR.
    """
    # Remove any directory components; a missing value is treated as empty.
    filename = os.path.basename(filename or '')
    # An empty or dot name would resolve to the upload directory or its parent.
    if filename in ('', '.', '..'):
        raise ValueError("Invalid file path")
    # realpath resolves symlinks, so a link inside the upload directory
    # cannot point outside it.
    base_dir = os.path.realpath(UPLOAD_DIR)
    full_path = os.path.realpath(os.path.join(base_dir, filename))
    # Compare path components instead of a string prefix, which would also
    # accept a sibling such as "/var/www/uploads_other".
    if os.path.commonpath([base_dir, full_path]) != base_dir:
        raise ValueError("Invalid file path")
    return full_path
# Use the validator
filename = request.args.get('file')
safe_path = safe_file_path(filename)
with open(safe_path, 'r') as f:
content = f.read()Description: Attacker can execute arbitrary system commands.
Detection Patterns:
# VULNERABLE: Direct command execution with user input
import os
filename = request.args.get('file')
os.system(f'ls -l {filename}') # Can inject: file.txt; rm -rf /
# VULNERABLE: Shell=True with user input
import subprocess
subprocess.call(f'ping {host}', shell=True)Mitigation:
# SECURE: Use subprocess with list of arguments (no shell)
import subprocess
filename = request.args.get('file')
subprocess.run(['ls', '-l', filename], check=True, capture_output=True)
# SECURE: Validate and sanitize input
import shlex
def safe_command(user_input):
    """Return ``user_input`` unchanged if it contains only whitelisted characters.

    Allowed characters are ASCII letters, digits, ".", "_" and "-".

    Args:
        user_input: The string to validate.

    Returns:
        str: The same string, once validated.

    Raises:
        ValueError: If the string is empty or contains any other character.
    """
    # Imported locally because the surrounding snippet imports shlex, not re.
    import re

    # fullmatch is required: match() with "$" also accepts a trailing newline,
    # so "name\n" would slip through the whitelist.
    if not re.fullmatch(r'[a-zA-Z0-9._-]+', user_input):
        raise ValueError("Invalid input")
    # NOTE(review): a value starting with "-" is still accepted; pass "--"
    # before it when the value is used as a command argument.
    return user_input
# SECURE: Use libraries instead of shell commands
# Instead of: os.system('tar -czf archive.tar.gz files/')
import tarfile
with tarfile.open('archive.tar.gz', 'w:gz') as tar:
tar.add('files/')Ark Context:
os.system, subprocess.call, shell=True
Description: No protection against brute force, DoS, or resource exhaustion attacks.
Mitigation:
# Python with Flask-Limiter
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
limiter = Limiter(
app,
key_func=get_remote_address,
default_limits=["200 per day", "50 per hour"]
)
@app.route("/api/login", methods=["POST"])
@limiter.limit("5 per minute")
def login():
# Login logic
pass// Go with rate limiting middleware
import "golang.org/x/time/rate"
func rateLimitMiddleware(next http.Handler) http.Handler {
limiter := rate.NewLimiter(rate.Every(time.Second), 10) // 1 req/sec sustained, burst of 10, shared by all clients
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !limiter.Allow() {
http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
return
}
next.ServeHTTP(w, r)
})
}Kubernetes Network Policies:
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: rate-limit-policy
spec:
podSelector:
matchLabels:
app: ark-api
policyTypes:
- Ingress
ingress:
- from:
- podSelector:
matchLabels:
app: ingress-controllerMap the reported issue to one of the categories above:
Use grep/ripgrep to find vulnerable code patterns:
# SQL Injection
grep -r "cursor.execute.*%" .
grep -r "query.*format" .
# XSS
grep -r "innerHTML" .
grep -r "dangerouslySetInnerHTML" .
# Command Injection
grep -r "os.system" .
grep -r "shell=True" .
# Path Traversal
grep -r "open.*request" .
# Insecure Deserialization
grep -r "pickle.loads" .
grep -r "yaml.load" .Consider which Ark components are affected:
ark/ - Controllers, webhooks, API server
services/ - Executor services, API endpoints
ark-dashboard/, docs/, ark-cli/
Present the user with:
Wait for user approval before implementing.
Apply the appropriate mitigation from the categories above:
# Run existing tests
make test
# Security-specific tests
# - Test with malicious input
# - Verify security headers
# - Check authorization enforcement
# - Validate input sanitizationDocument:
CRD Validation:
// Add validation to CRD specs
// +kubebuilder:validation:Pattern=^[a-zA-Z0-9-]+$
// +kubebuilder:validation:MinLength=1
// +kubebuilder:validation:MaxLength=253
Name string `json:"name"`RBAC Principle of Least Privilege:
# Only grant necessary permissions
rules:
- apiGroups: ["ark.example.com"]
resources: ["models"]
verbs: ["get", "list"] # Not "delete" or "*"Input Validation:
from pydantic import BaseModel, validator
class PromptRequest(BaseModel):
prompt: str
max_tokens: int
@validator('prompt')
def validate_prompt(cls, v):
if len(v) > 10000:
raise ValueError('Prompt too long')
return v
@validator('max_tokens')
def validate_max_tokens(cls, v):
if v < 1 or v > 4096:
raise ValueError('Invalid max_tokens')
return vEnvironment Variables:
// Only expose NEXT_PUBLIC_ vars to client
// next.config.js
module.exports = {
env: {
// Server-side only
DATABASE_URL: process.env.DATABASE_URL,
},
publicRuntimeConfig: {
// Client-side accessible
API_URL: process.env.NEXT_PUBLIC_API_URL,
},
}Non-root User:
# Run as non-root user
RUN addgroup -g 1001 -S appuser && \
adduser -u 1001 -S appuser -G appuser
USER appuserMinimal Base Images:
# Use distroless or alpine
FROM gcr.io/distroless/python3
# OR
FROM python:3.11-alpine
Example XSS test payload: <script>alert('XSS')</script>
Example path traversal test payload: ../../../../etc/passwd
# Static analysis
bandit -r services/ # Python
gosec ./... # Go
npm audit # Node.js
# Dependency scanning
safety check # Python
go list -json -m all | nancy sleuth # Go
npm audit fix # Node.js
# Container scanning
trivy image ark-operator:latest
When you see these terms in a pentest report, map them to the appropriate category: