Rego is the declarative policy language used by Open Policy Agent (OPA) for writing and enforcing policies across cloud-native stacks, featuring data-driven rules, comprehensions, and 200+ built-in functions for infrastructure, security, and compliance automation.
Overall
score
97%
HTTP API authorization is a critical component of modern application security. OPA and Rego provide powerful, context-aware policy enforcement for controlling access to REST APIs, microservices, and web applications. This document covers common patterns for implementing fine-grained authorization policies that control who can access which API endpoints and under what conditions.
HTTP API authorization policies typically evaluate requests based on:
Description: Implements context-aware authorization based on organizational hierarchy, allowing users to access their own resources and managers to access their subordinates' resources.
# METADATA
# title: Hierarchical Authorization
# description: Context-aware authorization based on organizational hierarchy
# authors:
# - API Security Team <api-security@example.com>
# custom:
# category: http-authorization
package httpapi.authz
import rego.v1
# Define organizational hierarchy
subordinates := {
"alice": [],
"charlie": [],
"bob": ["alice"],
"betty": ["charlie"]
}
default allow := false
# METADATA
# title: Allow authorized requests
# description: Permits requests based on hierarchical manager-subordinate relationships
# entrypoint: true
# custom:
# severity: HIGH
# Allow users to get their own salaries
allow if {
input.method == "GET"
input.path == ["finance", "salary", input.user]
}
# Allow managers to get their subordinates' salaries
allow if {
some username
input.method == "GET"
input.path = ["finance", "salary", username]
username in subordinates[input.user]
}Example Input:
{
"method": "GET",
"path": ["finance", "salary", "alice"],
"user": "bob"
}Result: allow == true (bob is alice's manager)
Description: Validates JWT tokens and extracts claims to make authorization decisions, ensuring tokens are issued to the correct user.
# METADATA
# title: JWT-Based Access Control
# description: Validates JWT tokens and extracts claims for authorization decisions
# authors:
# - API Security Team <api-security@example.com>
# custom:
# category: http-authorization
package httpapi.authz
import rego.v1
default allow := false
# METADATA
# title: Allow JWT-authenticated requests
# description: Permits requests when JWT token claims match authorization requirements
# entrypoint: true
# custom:
# severity: HIGH
# Allow users to get their own salaries
allow if {
some username
input.method == "GET"
input.path = ["finance", "salary", username]
token.payload.user == username
user_owns_token
}
# Allow managers to get their subordinates' salaries
allow if {
some username
input.method == "GET"
input.path = ["finance", "salary", username]
username in token.payload.subordinates
user_owns_token
}
# Allow HR members to get anyone's salary
allow if {
input.method == "GET"
input.path = ["finance", "salary", _]
token.payload.hr == true
user_owns_token
}
# Ensure that the token was issued to the user supplying it
user_owns_token if {
input.user == token.payload.azp
}
# Helper to get the token payload
token := {"payload": payload} if {
[header, payload, signature] := io.jwt.decode(input.token)
}Example Input:
{
"token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"method": "GET",
"path": ["finance", "salary", "alice"],
"user": "alice"
}Result: allow == true (token validates and user matches)
Description: Restricts HTTP methods based on user permissions, implementing a standard read/write/admin permission model.
# METADATA
# title: Method-Based Permissions
# description: Restricts HTTP methods based on user read/write/admin permissions
# authors:
# - API Security Team <api-security@example.com>
# custom:
# category: http-authorization
package httpapi.authz
import rego.v1
import data.users
default allow := false
# METADATA
# title: Allow method-based access
# description: Permits requests when user has the required permission for the HTTP method
# entrypoint: true
# custom:
# severity: HIGH
# Allow GET requests for users with read permission
allow if {
input.method == "GET"
"read" in users[input.user].permissions
}
# Allow POST, PUT, PATCH for users with write permission
allow if {
input.method in {"POST", "PUT", "PATCH"}
"write" in users[input.user].permissions
}
# Allow DELETE for users with admin permission
allow if {
input.method == "DELETE"
"admin" in users[input.user].permissions
}Example Data (loaded as data.users):
{
"alice": {"permissions": ["read", "write"]},
"bob": {"permissions": ["read"]},
"admin": {"permissions": ["read", "write", "admin"]}
}Example Input:
{
"method": "DELETE",
"user": "alice"
}Result: allow == false (alice doesn't have admin permission)
Description: Controls access based on API path patterns using glob matching, allowing administrators to define flexible access rules.
# METADATA
# title: Path-Based Authorization
# description: Controls access based on API path patterns using glob matching
# authors:
# - API Security Team <api-security@example.com>
# custom:
# category: http-authorization
package httpapi.authz
import rego.v1
import data.api_permissions
default allow := false
# METADATA
# title: Allow path-matched requests
# description: Permits requests when the user has a matching path pattern permission
# entrypoint: true
# custom:
# severity: HIGH
# Check if user has permission for the requested path
allow if {
some pattern in api_permissions[input.user]
glob.match(pattern, ["/"], concat("/", input.path))
}Example Data (loaded as data.api_permissions):
{
"alice": ["/api/users/*", "/api/reports/personal/*"],
"bob": ["/api/*/read", "/api/public/*"],
"admin": ["/api/**"]
}Example Input:
{
"user": "alice",
"path": ["api", "users", "123"]
}Result: allow == true (matches pattern /api/users/*)
Description: Validates OAuth2 access tokens by checking scopes, expiration, and issuer claims.
# METADATA
# title: OAuth2 Token Validation
# description: Validates OAuth2 access tokens by checking scopes, expiration, and issuer
# authors:
# - API Security Team <api-security@example.com>
# custom:
# category: http-authorization
package httpapi.authz
import rego.v1
default allow := false
# METADATA
# title: Allow OAuth2-authenticated requests
# description: Permits requests when the OAuth2 token is valid and has the required scope
# entrypoint: true
# custom:
# severity: HIGH
# Allow access if token has required scope and is valid
allow if {
token_valid
has_required_scope
}
# Validate token expiration and issuer
token_valid if {
token.payload.exp > time.now_ns() / 1000000000
token.payload.iss == "https://auth.example.com"
}
# Check if token has required scope for the endpoint
has_required_scope if {
required_scope := endpoint_scopes[concat("/", input.path)]
required_scope in token.payload.scope
}
# Map endpoints to required OAuth2 scopes
endpoint_scopes := {
"/api/users": "users:read",
"/api/users/create": "users:write",
"/api/admin": "admin:access"
}
# Decode JWT token
token := {"payload": payload} if {
[header, payload, signature] := io.jwt.decode(input.token)
}Example Input:
{
"token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJodHRwczovL2F1dGguZXhhbXBsZS5jb20iLCJleHAiOjk5OTk5OTk5OTksInNjb3BlIjpbInVzZXJzOnJlYWQiXX0...",
"path": ["api", "users"]
}Result: allow == true (token is valid and has required scope)
Description: Integrates with OpenID Connect providers by validating ID tokens and extracting user profile information.
# METADATA
# title: OpenID Connect Integration
# description: Integrates with OIDC providers by validating ID tokens and user profiles
# authors:
# - API Security Team <api-security@example.com>
# custom:
# category: http-authorization
package httpapi.authz
import rego.v1
default allow := false
# METADATA
# title: Allow OIDC-authenticated requests
# description: Permits requests when the OIDC ID token is valid and user domain is allowed
# entrypoint: true
# custom:
# severity: HIGH
# Allow access if OIDC token is valid
allow if {
oidc_token_valid
user_has_access
}
# Validate OIDC ID token
oidc_token_valid if {
# Token not expired
token.payload.exp > time.now_ns() / 1000000000
# Token issued by trusted provider
token.payload.iss == "https://accounts.google.com"
# Token audience matches our application
token.payload.aud == "our-app-client-id"
# Email verified
token.payload.email_verified == true
}
# Check if user's email domain has access
user_has_access if {
email := token.payload.email
domain := split(email, "@")[1]
domain in allowed_domains
}
allowed_domains := {"example.com", "partner.com"}
# Decode ID token
token := {"payload": payload} if {
[header, payload, signature] := io.jwt.decode(input.id_token)
}Example Input:
{
"id_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJodHRwczovL2FjY291bnRzLmdvb2dsZS5jb20iLCJhdWQiOiJvdXItYXBwLWNsaWVudC1pZCIsImV4cCI6OTk5OTk5OTk5OSwiZW1haWwiOiJ1c2VyQGV4YW1wbGUuY29tIiwiZW1haWxfdmVyaWZpZWQiOnRydWV9..."
}Result: allow == true (valid OIDC token from allowed domain)
Description: Validates API keys and enforces key-specific permissions and rate limits.
# METADATA
# title: API Key Authentication
# description: Validates API keys and enforces key-specific permissions and rate limits
# authors:
# - API Security Team <api-security@example.com>
# custom:
# category: http-authorization
package httpapi.authz
import rego.v1
import data.api_keys
default allow := false
# METADATA
# title: Allow API key-authenticated requests
# description: Permits requests when the API key is valid and has matching path permission
# entrypoint: true
# custom:
# severity: HIGH
# Allow access if API key is valid and has permission
allow if {
api_key_valid
api_key_has_permission
}
# Validate API key exists and is active
api_key_valid if {
key := api_keys[input.api_key]
key.active == true
key.expires > time.now_ns()
}
# Check if API key has permission for the endpoint
api_key_has_permission if {
key := api_keys[input.api_key]
endpoint := concat("/", input.path)
some path in key.allowed_paths
glob.match(path, ["/"], endpoint)
}Example Data (loaded as data.api_keys):
{
"sk_live_abc123": {
"active": true,
"expires": 9999999999000000000,
"allowed_paths": ["/api/v1/*", "/webhooks/*"],
"rate_limit": 1000
},
"sk_test_xyz789": {
"active": true,
"expires": 9999999999000000000,
"allowed_paths": ["/api/v1/test/*"],
"rate_limit": 100
}
}Example Input:
{
"api_key": "sk_live_abc123",
"path": ["api", "v1", "users"]
}Result: allow == true (valid key with matching path permission)
Description: Enforces rate limits based on user identity or IP address to prevent abuse.
# METADATA
# title: Rate Limiting Policies
# description: Enforces rate limits based on user identity or IP address to prevent abuse
# authors:
# - API Security Team <api-security@example.com>
# custom:
# category: http-authorization
package httpapi.authz
import rego.v1
import data.rate_limits
default allow := false
# METADATA
# title: Allow rate-limited requests
# description: Permits requests when the user and IP are within their rate limits
# entrypoint: true
# custom:
# severity: MEDIUM
# Allow access if within rate limit
allow if {
not rate_limit_exceeded
}
# Check if user has exceeded their rate limit
rate_limit_exceeded if {
limit := user_rate_limit
count := rate_limits.user_requests[input.user]
count >= limit
}
# Check if IP has exceeded rate limit
rate_limit_exceeded if {
limit := ip_rate_limit
count := rate_limits.ip_requests[input.source_ip]
count >= limit
}
# Get user-specific rate limit
user_rate_limit := limit if {
import data.users
limit := users[input.user].rate_limit
} else := 100 # Default limit
# IP-based rate limit (stricter)
ip_rate_limit := 50Example Data (loaded as data.rate_limits):
{
"user_requests": {
"alice": 95,
"bob": 150
},
"ip_requests": {
"192.168.1.100": 45,
"192.168.1.101": 60
}
}Example Input:
{
"user": "bob",
"source_ip": "192.168.1.100"
}Result: allow == false (bob has exceeded rate limit: 150 >= 100)
Description: Enforces different authorization rules based on API version, allowing gradual migration and deprecation.
# METADATA
# title: API Versioning Policies
# description: Enforces different authorization rules based on API version
# authors:
# - API Security Team <api-security@example.com>
# custom:
# category: http-authorization
package httpapi.authz
import rego.v1
default allow := false
# METADATA
# title: Allow version-appropriate requests
# description: Permits requests based on API version-specific authentication requirements
# entrypoint: true
# custom:
# severity: LOW
# API v1 - Legacy, requires basic auth
allow if {
input.path[0] == "api"
input.path[1] == "v1"
input.auth_type == "basic"
valid_basic_auth
}
# API v2 - Modern, requires JWT
allow if {
input.path[0] == "api"
input.path[1] == "v2"
input.auth_type == "bearer"
valid_jwt_token
}
# API v3 - Future, requires OAuth2 with specific scopes
allow if {
input.path[0] == "api"
input.path[1] == "v3"
input.auth_type == "bearer"
valid_oauth2_token
has_required_scope_v3
}
valid_basic_auth if {
# Basic validation logic
input.credentials != ""
}
valid_jwt_token if {
token := {"payload": payload} | [_, payload, _] := io.jwt.decode(input.token)
token.payload.exp > time.now_ns() / 1000000000
}
valid_oauth2_token if {
valid_jwt_token
}
has_required_scope_v3 if {
token := {"payload": payload} | [_, payload, _] := io.jwt.decode(input.token)
"api:v3:access" in token.payload.scopes
}Example Input:
{
"path": ["api", "v2", "users"],
"auth_type": "bearer",
"token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjk5OTk5OTk5OTl9..."
}Result: allow == true (v2 API with valid JWT)
Description: Enforces Cross-Origin Resource Sharing (CORS) policies by validating origin headers.
# METADATA
# title: CORS Policy Enforcement
# description: Enforces Cross-Origin Resource Sharing policies by validating origin headers
# authors:
# - API Security Team <api-security@example.com>
# custom:
# category: http-authorization
package httpapi.authz
import rego.v1
default allow := false
# METADATA
# title: Allow CORS-compliant requests
# description: Permits requests from same-origin or allowed cross-origin sources
# entrypoint: true
# custom:
# severity: MEDIUM
# Allow same-origin requests
allow if {
input.method in {"GET", "POST", "PUT", "DELETE"}
not input.headers["origin"]
}
# Allow CORS requests from allowed origins
allow if {
input.method in {"GET", "POST", "PUT", "DELETE"}
origin := input.headers["origin"]
origin in allowed_origins
}
# Allow preflight requests from allowed origins
allow if {
input.method == "OPTIONS"
origin := input.headers["origin"]
origin in allowed_origins
}
# Define allowed origins
allowed_origins := {
"https://app.example.com",
"https://staging.example.com",
"https://admin.example.com"
}
# Additional check for credentials
deny contains "CORS credentials not allowed" if {
input.headers["origin"]
input.headers["origin"] not in allowed_origins
input.headers["access-control-allow-credentials"] == "true"
}Example Input:
{
"method": "POST",
"headers": {
"origin": "https://app.example.com",
"content-type": "application/json"
}
}Result: allow == true (request from allowed origin)
Description: Validates request body structure and content before allowing API access.
# METADATA
# title: Request Body Validation
# description: Validates request body structure and content before allowing API access
# authors:
# - API Security Team <api-security@example.com>
# custom:
# category: http-authorization
package httpapi.authz
import rego.v1
default allow := false
# METADATA
# title: Allow requests with valid body
# description: Permits requests when the request body passes structural and content validation
# entrypoint: true
# custom:
# severity: MEDIUM
# Allow POST requests with valid body
allow if {
input.method == "POST"
input.path == ["api", "users"]
valid_user_creation_body
}
# Validate user creation request body
valid_user_creation_body if {
body := input.body
# Required fields present
body.email
body.username
body.password
# Field validation
count(body.password) >= 8
contains(body.email, "@")
count(body.username) >= 3
# No prohibited fields
not body.is_admin
not body.role
}
# Allow PUT requests with valid update body
allow if {
input.method == "PUT"
input.path = ["api", "users", user_id]
valid_user_update_body
input.user == user_id # Can only update own profile
}
valid_user_update_body if {
body := input.body
# Only allowed fields
allowed_fields := {"email", "display_name", "bio"}
body_fields := {field | body[field]}
body_fields - allowed_fields == set()
}Example Input:
{
"method": "POST",
"path": ["api", "users"],
"body": {
"email": "user@example.com",
"username": "newuser",
"password": "securepass123"
}
}Result: allow == true (valid user creation body)
Description: Filters response data based on user permissions, removing sensitive fields.
# METADATA
# title: Response Filtering
# description: Filters response data based on user permissions, removing sensitive fields
# authors:
# - API Security Team <api-security@example.com>
# custom:
# category: http-authorization
package httpapi.authz
import rego.v1
import data.users
# METADATA
# title: Allowed response fields
# description: Determines which fields can be included in the response based on user role
# entrypoint: true
# custom:
# severity: MEDIUM
# Determine which fields can be included in response
allowed_response_fields contains field if {
input.method == "GET"
input.path = ["api", "users", user_id]
# Public fields always allowed
field in public_fields
}
allowed_response_fields contains field if {
input.method == "GET"
input.path = ["api", "users", user_id]
# Private fields only for the user themselves
user_id == input.user
field in private_fields
}
allowed_response_fields contains field if {
input.method == "GET"
input.path = ["api", "users", user_id]
# Admin fields only for admins
users[input.user].role == "admin"
field in admin_fields
}
public_fields := {"username", "display_name", "avatar"}
private_fields := {"email", "phone", "bio"}
admin_fields := {"created_at", "last_login", "ip_address", "status"}
# Generate filtered response
filtered_response := {field: value |
some field, value in input.response
field in allowed_response_fields
}Example Input:
{
"method": "GET",
"path": ["api", "users", "alice"],
"user": "bob",
"response": {
"username": "alice",
"email": "alice@example.com",
"display_name": "Alice Smith",
"created_at": "2023-01-01"
}
}Result: filtered_response contains only public fields (username, display_name)
Description: Enforces tenant isolation to prevent cross-tenant data access in SaaS applications.
# METADATA
# title: Tenant Isolation
# description: Enforces tenant isolation to prevent cross-tenant data access in SaaS applications
# authors:
# - API Security Team <api-security@example.com>
# custom:
# category: http-authorization
package httpapi.authz
import rego.v1
import data.tenants
default allow := false
# METADATA
# title: Allow tenant-scoped requests
# description: Permits requests when the user belongs to the requested tenant
# entrypoint: true
# custom:
# severity: HIGH
# Allow access if user belongs to the tenant
allow if {
user_tenant_id := user_tenant[input.user]
requested_tenant_id := extract_tenant_from_path
user_tenant_id == requested_tenant_id
}
# Extract tenant ID from request path
extract_tenant_from_path := tenant_id if {
input.path[0] == "tenants"
tenant_id := input.path[1]
}
# Get user's tenant
user_tenant[user] := tenant_id if {
some tenant_id, tenant in tenants
user in tenant.users
}
# Platform admins can access any tenant
allow if {
input.user in platform_admins
}
platform_admins := {"platform_admin", "support_admin"}Example Data (loaded as data.tenants):
{
"tenant_a": {
"users": ["alice", "bob"],
"name": "Company A"
},
"tenant_b": {
"users": ["charlie", "david"],
"name": "Company B"
}
}Example Input:
{
"user": "alice",
"path": ["tenants", "tenant_a", "resources", "123"]
}Result: allow == true (alice belongs to tenant_a)
Description: Restricts API access to specific time windows, such as business hours or maintenance windows.
# METADATA
# title: Time-Window Based Access
# description: Restricts API access to specific time windows such as business hours
# authors:
# - API Security Team <api-security@example.com>
# custom:
# category: http-authorization
package httpapi.authz
import rego.v1
default allow := false
# METADATA
# title: Allow time-restricted requests
# description: Permits requests during business hours and outside maintenance windows
# entrypoint: true
# custom:
# severity: MEDIUM
# Allow access during business hours
allow if {
is_business_hours
not is_maintenance_window
}
# Allow admin access anytime
allow if {
input.user in admins
}
# Check if current time is within business hours
is_business_hours if {
[hour, minute, second] := time.clock(time.now_ns())
hour >= 9
hour < 17
}
# Check if current time is within maintenance window
is_maintenance_window if {
[year, month, day] := time.date(time.now_ns())
[hour, minute, second] := time.clock(time.now_ns())
# Maintenance every Sunday 2-4 AM
day_of_week := time.weekday(time.now_ns())
day_of_week == "Sunday"
hour >= 2
hour < 4
}
admins := {"admin", "ops_team"}Example Input (assuming current time is Monday 10:30 AM):
{
"user": "alice"
}Result: allow == true (within business hours, not maintenance window)
Description: Controls API access based on source IP addresses using allowlists and denylists.
# METADATA
# title: IP Allowlist/Denylist
# description: Controls API access based on source IP addresses using allowlists and denylists
# authors:
# - API Security Team <api-security@example.com>
# custom:
# category: http-authorization
package httpapi.authz
import rego.v1
default allow := false
# METADATA
# title: Allow IP-validated requests
# description: Permits requests from allowlisted IPs that are not on the denylist
# entrypoint: true
# custom:
# severity: HIGH
# Allow if IP is in allowlist and not in denylist
allow if {
ip_allowed
not ip_denied
}
# Check if IP is in allowlist (using CIDR ranges)
ip_allowed if {
some cidr in ip_allowlist
net.cidr_contains(cidr, input.source_ip)
}
# Check if IP is explicitly denied
ip_denied if {
input.source_ip in ip_denylist
}
# Internal network ranges
ip_allowlist := [
"10.0.0.0/8",
"172.16.0.0/12",
"192.168.0.0/16"
]
# Known malicious IPs
ip_denylist := {
"192.168.1.100",
"10.0.0.50"
}
# Allow specific public IPs for partners
allow if {
input.source_ip in partner_ips
input.path[0] == "api"
input.path[1] == "partner"
}
partner_ips := {
"203.0.113.10",
"198.51.100.20"
}Example Input:
{
"source_ip": "10.0.0.25",
"path": ["api", "internal", "resources"]
}Result: allow == true (IP in allowlist and not denied)
Description: Restricts API access based on User-Agent headers to prevent scraping or enforce client requirements.
# METADATA
# title: User Agent Restrictions
# description: Restricts API access based on User-Agent headers to prevent scraping
# authors:
# - API Security Team <api-security@example.com>
# custom:
# category: http-authorization
package httpapi.authz
import rego.v1
default allow := false
# METADATA
# title: Allow approved user agent requests
# description: Permits requests from approved user agents and client applications
# entrypoint: true
# custom:
# severity: MEDIUM
# Allow requests from approved user agents
allow if {
user_agent := input.headers["user-agent"]
is_approved_user_agent(user_agent)
}
# Check if user agent is approved
is_approved_user_agent(ua) if {
# Official mobile app
contains(ua, "MyApp-iOS")
}
is_approved_user_agent(ua) if {
# Official Android app
contains(ua, "MyApp-Android")
}
is_approved_user_agent(ua) if {
# Web browser access
some allowed in approved_browsers
contains(ua, allowed)
}
# Block known bots and scrapers
deny contains "Bot/scraper not allowed" if {
user_agent := input.headers["user-agent"]
some blocked in blocked_patterns
contains(lower(user_agent), blocked)
}
approved_browsers := {
"Chrome",
"Firefox",
"Safari",
"Edge"
}
blocked_patterns := {
"bot",
"crawler",
"scraper",
"spider"
}Example Input:
{
"headers": {
"user-agent": "MyApp-iOS/1.2.3 (iPhone; iOS 15.0)"
}
}Result: allow == true (approved mobile app)
Description: Validates that requests have appropriate Content-Type headers for the endpoint.
# METADATA
# title: Content-Type Validation
# description: Validates that requests have appropriate Content-Type headers for the endpoint
# authors:
# - API Security Team <api-security@example.com>
# custom:
# category: http-authorization
package httpapi.authz
import rego.v1
default allow := false
# METADATA
# title: Allow content-type validated requests
# description: Permits requests with correct Content-Type headers for the target endpoint
# entrypoint: true
# custom:
# severity: LOW
# Allow GET requests (no content-type required)
allow if {
input.method == "GET"
}
# Allow POST/PUT with correct content type
allow if {
input.method in {"POST", "PUT", "PATCH"}
valid_content_type
}
# Validate content type for the endpoint
valid_content_type if {
content_type := input.headers["content-type"]
endpoint := concat("/", input.path)
required := required_content_types[endpoint]
startswith(content_type, required)
}
# Map endpoints to required content types
required_content_types := {
"/api/users": "application/json",
"/api/upload": "multipart/form-data",
"/api/webhook": "application/json",
"/api/import": "text/csv"
}
# Deny specific dangerous content types
deny contains "Dangerous content type" if {
content_type := input.headers["content-type"]
some dangerous in dangerous_content_types
contains(content_type, dangerous)
}
dangerous_content_types := {
"application/x-www-form-urlencoded", # If not expected
"text/html" # Prevent XSS injection
}Example Input:
{
"method": "POST",
"path": ["api", "users"],
"headers": {
"content-type": "application/json; charset=utf-8"
}
}Result: allow == true (correct content-type for endpoint)
Description: Validates query parameters to prevent injection attacks and enforce business rules.
# METADATA
# title: Query Parameter Validation
# description: Validates query parameters to prevent injection attacks and enforce business rules
# authors:
# - API Security Team <api-security@example.com>
# custom:
# category: http-authorization
package httpapi.authz
import rego.v1
default allow := false
# METADATA
# title: Allow parameter-validated requests
# description: Permits requests with valid query parameters and no injection risk
# entrypoint: true
# custom:
# severity: MEDIUM
# Allow requests with valid query parameters
allow if {
valid_query_params
not has_injection_risk
}
# Validate query parameters for the endpoint
valid_query_params if {
endpoint := concat("/", input.path)
endpoint == "/api/users"
validate_users_query_params
}
# Validate parameters for /api/users endpoint
validate_users_query_params if {
params := input.query_params
# Limit parameter must be a reasonable number
limit := to_number(params.limit)
limit > 0
limit <= 100
# Offset must be non-negative
offset := to_number(params.offset)
offset >= 0
# Sort field must be allowed
params.sort in allowed_sort_fields
}
# Check for SQL injection patterns
has_injection_risk if {
some param, value in input.query_params
some pattern in injection_patterns
contains(lower(value), pattern)
}
allowed_sort_fields := {
"created_at",
"username",
"email"
}
injection_patterns := {
"union select",
"drop table",
"--",
"/*",
"xp_",
"exec"
}Example Input:
{
"path": ["api", "users"],
"query_params": {
"limit": "50",
"offset": "0",
"sort": "username"
}
}Result: allow == true (valid query parameters)
Description: Enforces required HTTP headers for security and tracking purposes.
# METADATA
# title: HTTP Header Requirements
# description: Enforces required HTTP headers for security and tracking purposes
# authors:
# - API Security Team <api-security@example.com>
# custom:
# category: http-authorization
package httpapi.authz
import rego.v1
default allow := false
# METADATA
# title: Allow header-validated requests
# description: Permits requests that include all required headers with valid values
# entrypoint: true
# custom:
# severity: MEDIUM
# Allow requests with all required headers
allow if {
has_required_headers
valid_header_values
}
# Check if all required headers are present
has_required_headers if {
every header in required_headers {
input.headers[header]
}
}
# Validate header values
valid_header_values if {
# API version header
api_version := input.headers["x-api-version"]
api_version in supported_api_versions
# Request ID for tracing
request_id := input.headers["x-request-id"]
count(request_id) > 0
# Content Security Policy compliance
not input.headers["x-frame-options"]
not input.headers["x-xss-protection"]
}
# Security headers check
deny contains "Missing security headers" if {
input.path[0] == "admin"
some header in security_headers
not input.headers[header]
}
required_headers := {
"x-api-version",
"x-request-id"
}
supported_api_versions := {"v1", "v2", "v3"}
security_headers := {
"x-csrf-token",
"authorization"
}Example Input:
{
"path": ["api", "resources"],
"headers": {
"x-api-version": "v2",
"x-request-id": "req-abc-123-xyz",
"authorization": "Bearer token123"
}
}Result: allow == true (all required headers present and valid)
Description: Combines multiple authorization factors including user identity, JWT validation, role permissions, IP restrictions, and time constraints.
# METADATA
# title: Comprehensive Multi-Factor Authorization
# description: Combines multiple authorization factors including identity, JWT, roles, IP, and time
# authors:
# - API Security Team <api-security@example.com>
# custom:
# category: http-authorization
package httpapi.authz
import rego.v1
default allow := false
# METADATA
# title: Allow multi-factor authorized requests
# description: Permits requests that pass authentication, authorization, and blocking checks
# entrypoint: true
# custom:
# severity: HIGH
# Main authorization decision combining multiple factors
allow if {
authenticated
authorized
not blocked
}
# Authentication: Valid JWT token
authenticated if {
token_valid
token_not_expired
token_signature_valid
}
token_valid if {
[header, payload, signature] := io.jwt.decode(input.token)
payload.iss == "https://auth.example.com"
payload.aud == "api.example.com"
}
token_not_expired if {
[_, payload, _] := io.jwt.decode(input.token)
payload.exp > time.now_ns() / 1000000000
}
token_signature_valid if {
# In production, verify signature with public key
io.jwt.verify_rs256(input.token, input.public_key)
}
# Authorization: Role-based and attribute-based checks
authorized if {
has_required_role
has_path_permission
within_rate_limit
}
has_required_role if {
[_, payload, _] := io.jwt.decode(input.token)
required_role := endpoint_roles[concat("/", input.path)]
required_role in payload.roles
}
has_path_permission if {
[_, payload, _] := io.jwt.decode(input.token)
user_level := payload.level
endpoint_level := endpoint_levels[concat("/", input.path)]
user_level >= endpoint_level
}
within_rate_limit if {
import data.rate_limits
[_, payload, _] := io.jwt.decode(input.token)
user_id := payload.sub
current_requests := rate_limits[user_id]
current_requests < 1000
}
# Blocking conditions
blocked if {
ip_blocked
}
blocked if {
outside_business_hours
not emergency_access
}
ip_blocked if {
some blocked_ip in blocked_ips
input.source_ip == blocked_ip
}
outside_business_hours if {
[hour, _, _] := time.clock(time.now_ns())
hour < 9
hour >= 18
}
emergency_access if {
[_, payload, _] := io.jwt.decode(input.token)
"emergency" in payload.roles
}
# Configuration data
endpoint_roles := {
"/api/users": "user",
"/api/admin": "admin",
"/api/reports": "analyst"
}
endpoint_levels := {
"/api/users": 1,
"/api/admin": 5,
"/api/reports": 3
}
blocked_ips := {
"192.168.1.100",
"10.0.0.50"
}Example Input:
{
"token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJodHRwczovL2F1dGguZXhhbXBsZS5jb20iLCJhdWQiOiJhcGkuZXhhbXBsZS5jb20iLCJleHAiOjk5OTk5OTk5OTksInN1YiI6InVzZXIxMjMiLCJyb2xlcyI6WyJ1c2VyIiwiYW5hbHlzdCJdLCJsZXZlbCI6M30...",
"path": ["api", "reports"],
"source_ip": "10.0.1.50",
"public_key": "-----BEGIN PUBLIC KEY-----\n..."
}Result: allow == true (all authorization factors satisfied)
Always use default allow := false to ensure that access is denied unless explicitly permitted.
Combine multiple authorization factors (authentication, roles, attributes, time, location) for defense in depth.
Always validate token signatures, expiration times, issuers, and audiences when using JWT or OAuth2 tokens.
Implement rate limiting at multiple levels (user, IP, API key) to prevent abuse.
While not shown in these examples, production systems should log all authorization decisions for security auditing.
Always validate and sanitize inputs to prevent injection attacks and ensure data integrity.
Keep authentication (verifying identity) separate from authorization (verifying permissions).
Ensure that errors or undefined values result in denied access, not granted access.
Write comprehensive tests for all authorization rules, including edge cases and attack scenarios.
Cache frequently accessed data and use efficient algorithms for pattern matching and collection operations.
Most HTTP API authorization policies expect input in the following structure:
{
"user": "alice",
"method": "GET",
"path": ["api", "v1", "resources", "123"],
"headers": {
"authorization": "Bearer token...",
"content-type": "application/json",
"user-agent": "MyApp/1.0"
},
"query_params": {
"limit": "10",
"offset": "0"
},
"body": {},
"source_ip": "192.168.1.50",
"token": "eyJhbGci...",
"timestamp": 1234567890
}# Python example
import requests
input_dict = {
"input": {
"user": request.user,
"method": request.method,
"path": request.path.strip("/").split("/"),
"headers": dict(request.headers),
"source_ip": request.remote_addr
}
}
response = requests.post(
"http://opa:8181/v1/data/httpapi/authz",
json=input_dict
)
if response.json().get("result", {}).get("allow"):
# Request authorized
pass
else:
# Request denied
return 403Most modern web frameworks support middleware that can integrate with OPA for authorization decisions before request handlers execute.
HTTP API authorization with Rego provides:
These patterns can be combined and extended to meet the specific security requirements of any HTTP API or microservices architecture.