Rego is the declarative policy language used by Open Policy Agent (OPA) for writing and enforcing policies across cloud-native stacks, featuring data-driven rules, comprehensions, and 200+ built-in functions for infrastructure, security, and compliance automation.
Overall
score
97%
This document provides comprehensive examples of using Rego for network security policies and regulatory compliance validation. These patterns are essential for enforcing security controls, managing network access, and meeting regulatory requirements across cloud-native infrastructure.
Validates that IP addresses fall within approved CIDR ranges, commonly used for restricting access to internal networks or approved cloud regions.
package network.policies

import rego.v1

# Private network blocks that a source address is allowed to come from.
allowed_cidrs := [
	"10.0.0.0/8",
	"172.16.0.0/12",
	"192.168.0.0/16",
]

# deny reports input.source_ip when no entry of allowed_cidrs contains it.
deny contains msg if {
	source := input.source_ip
	not is_allowed_ip(source)
	msg := sprintf("IP address %v is not in allowed CIDR ranges", [source])
}

# is_allowed_ip holds when at least one allowed block contains ip.
is_allowed_ip(ip) if {
	net.cidr_contains(allowed_cidrs[_], ip)
}

# validate_ip_list yields one record per (address, block) pair in which the
# block contains the address. Addresses outside every block yield no record.
validate_ip_list contains result if {
	candidate := input.ip_addresses[_]
	block := allowed_cidrs[_]
	net.cidr_contains(block, candidate)
	result := {
		"allowed": true,
		"cidr": block,
		"ip": candidate,
	}
}
Controls outbound network traffic to ensure services only communicate with approved external endpoints.
package network.egress

import rego.v1

# Approved external destinations, written as glob patterns. They are matched
# with "." as the delimiter, so "*" stands for exactly one DNS label:
# "s3.amazonaws.com" matches "*.amazonaws.com", while
# "s3.us-east-1.amazonaws.com" does not.
allowed_domains := {
	"api.github.com",
	"registry.npmjs.org",
	"*.googleapis.com",
	"*.amazonaws.com",
	"*.azure.com",
}

# Deny egress to any host that matches none of the approved patterns.
deny contains msg if {
	host := input.request.host
	not is_allowed_host(host)
	msg := sprintf("Egress to %v is not allowed", [host])
}

# is_allowed_host holds when host matches at least one entry of allowed_domains.
is_allowed_host(host) if {
	some pattern in allowed_domains
	glob.match(pattern, ["."], host)
}

# Deny plaintext HTTP regardless of port. The port is deliberately not
# inspected: HTTP on 8080 is just as unencrypted as HTTP on 80.
deny contains msg if {
	input.request.protocol == "http"
	msg := "Unencrypted HTTP egress is not allowed"
}

# Deny requests whose port is not permitted for the destination host.
deny contains msg if {
	host := input.request.host
	port := input.request.port
	not valid_port_for_host(host, port)
	msg := sprintf("Port %v is not allowed for host %v", [port, host])
}

# AWS endpoints may be reached on 443 or 8443.
valid_port_for_host(host, port) if {
	is_aws_host(host)
	port in {443, 8443}
}

# Every other host is limited to 443. Without this fallback the function is
# undefined for non-AWS hosts, and the port rule would deny them on every port.
valid_port_for_host(host, port) if {
	not is_aws_host(host)
	port == 443
}

# is_aws_host holds for hosts exactly one label below amazonaws.com.
is_aws_host(host) if {
	glob.match("*.amazonaws.com", ["."], host)
}
Enforces network segmentation to isolate workloads based on sensitivity levels.
package network.segmentation

import rego.v1

# Network segments, each with its CIDR blocks and a sensitivity label.
segments := {
	"dmz": {
		"cidrs": ["10.0.1.0/24", "10.0.2.0/24"],
		"sensitivity": "public",
	},
	"application": {
		"cidrs": ["10.0.10.0/24", "10.0.11.0/24"],
		"sensitivity": "internal",
	},
	"database": {
		"cidrs": ["10.0.20.0/24"],
		"sensitivity": "confidential",
	},
}

# Deny traffic between two different segments unless the pair is listed in
# allowed_flow. When either address falls outside every segment,
# segment_for_ip is undefined and this rule produces no message.
deny contains msg if {
	source_segment := segment_for_ip(input.source_ip)
	dest_segment := segment_for_ip(input.destination_ip)
	source_segment != dest_segment
	not allowed_flow(source_segment, dest_segment)
	msg := sprintf("Traffic from %v segment to %v segment is not allowed", [source_segment, dest_segment])
}

# segment_for_ip returns the name of the segment whose CIDR list contains ip.
segment_for_ip(ip) := segment_name if {
	some segment_name, config in segments
	some cidr in config.cidrs
	net.cidr_contains(cidr, ip)
}

# Permitted cross-segment flows as (source, destination) pairs. Each
# definition carries an explicit value because `import rego.v1` rejects a
# function head that has neither a value assignment nor a body.
allowed_flow("dmz", "application") := true

allowed_flow("application", "database") := true
Validates firewall rules to ensure they follow security best practices.
package network.firewall

import rego.v1

# Deny allow-rules that expose a sensitive port to 0.0.0.0/0.
deny contains msg if {
	some rule in input.firewall_rules
	rule.source_range == "0.0.0.0/0"
	rule.action == "allow"
	sensitive_port(rule.port)
	msg := sprintf("Firewall rule %v allows access from anywhere to sensitive port %v", [rule.name, rule.port])
}

# sensitive_port holds for the port numbers listed here.
# NOTE(review): these look like the default SSH, RDP and database ports;
# confirm the list against the environment.
sensitive_port(port) if {
	port in {22, 3389, 1433, 3306, 5432, 6379, 27017}
}

# Any rule open to 0.0.0.0/0 must carry a justification field.
deny contains msg if {
	some rule in input.firewall_rules
	rule.source_range == "0.0.0.0/0"
	not rule.justification
	msg := sprintf("Firewall rule %v allows broad access but lacks justification", [rule.name])
}

# Report two rules that share a priority. Comparing with i < j visits each
# unordered pair once; i != j would emit the same conflict twice, once per
# ordering of the two names.
deny contains msg if {
	some i, j
	rule1 := input.firewall_rules[i]
	rule2 := input.firewall_rules[j]
	i < j
	rule1.priority == rule2.priority
	msg := sprintf("Firewall rules %v and %v have conflicting priority %v", [rule1.name, rule2.name, rule1.priority])
}
Controls VPN access based on user attributes and network requirements.
package network.vpn

import data.users
import rego.v1

# Outside business hours only privileged users may connect. The check goes
# through has_privileged_access so that a user ID missing from data.users
# (or a request with no user_id) is denied instead of slipping through on an
# undefined lookup.
deny contains msg if {
	not is_business_hours
	not has_privileged_access(input.user_id)
	msg := "VPN access outside business hours requires privileged access"
}

# has_privileged_access holds when the user exists in data.users and has a
# truthy privileged_access field.
has_privileged_access(user_id) if {
	users[user_id].privileged_access
}

# is_business_hours holds from 08:00 up to, but not including, 18:00.
# time.clock is called without a time zone, so the hour is in UTC.
is_business_hours if {
	[hour, _, _] := time.clock(time.now_ns())
	hour >= 8
	hour < 18
}

# Every VPN connection must have passed MFA.
deny contains msg if {
	not input.mfa_verified
	msg := "Multi-factor authentication is required for VPN access"
}

# Connections are only accepted from the locations in allowed_locations.
deny contains msg if {
	location := input.connection_location
	not location in allowed_locations
	msg := sprintf("VPN access from location %v is not permitted", [location])
}

# Location codes from which VPN access is permitted.
allowed_locations := {"US", "CA", "GB", "DE", "FR"}
Ensures load balancers are configured securely.
package network.loadbalancer

import rego.v1

# Flag plain-HTTP listeners on any port other than 80.
# An HTTP listener on port 80 is not reported by this rule.
deny contains msg if {
	balancer := input.load_balancer
	plain := balancer.listeners[_]
	plain.port != 80
	plain.protocol == "HTTP"
	msg := sprintf("Load balancer %v has HTTP listener on non-standard port %v", [balancer.name, plain.port])
}

# HTTPS listeners must declare a minimum TLS version accepted by valid_tls_version.
deny contains msg if {
	balancer := input.load_balancer
	secure := balancer.listeners[_]
	secure.protocol == "HTTPS"
	not valid_tls_version(secure.ssl_policy)
	msg := sprintf("Load balancer %v listener uses insecure TLS version", [balancer.name])
}

# valid_tls_version holds when the policy's minimum version is TLSv1.2 or TLSv1.3.
valid_tls_version(policy) if {
	accepted := {"TLSv1.2", "TLSv1.3"}
	policy.min_tls_version in accepted
}

# A load balancer without a health_check entry is reported.
deny contains msg if {
	balancer := input.load_balancer
	not balancer.health_check
	msg := sprintf("Load balancer %v must have health checks configured", [balancer.name])
}
Validates DNS configurations to prevent common security issues.
package network.dns

import rego.v1

# Zone transfers are only permitted to servers accepted by is_authorized_server.
deny contains msg if {
	zone := input.dns_zone
	some transfer in zone.zone_transfers
	not is_authorized_server(transfer.server)
	msg := sprintf("DNS zone %v allows transfers to unauthorized server %v", [zone.name, transfer.server])
}

# is_authorized_server holds for the two server addresses listed here.
is_authorized_server(server) if {
	authorized_servers := {"10.0.1.10", "10.0.1.11"}
	server in authorized_servers
}

# Public zones must have DNSSEC enabled.
deny contains msg if {
	zone := input.dns_zone
	zone.visibility == "public"
	not zone.dnssec_enabled
	msg := sprintf("Public DNS zone %v must have DNSSEC enabled", [zone.name])
}

# A records must hold a well-formed dotted-quad IPv4 address.
deny contains msg if {
	zone := input.dns_zone
	some record in zone.records
	record.type == "A"
	not valid_ip_format(record.value)
	msg := sprintf("DNS A record for %v has invalid IP format", [record.name])
}

# valid_ip_format holds when ip consists of exactly four dot-separated parts,
# each one to three decimal digits with a value of at most 255. Counting the
# parts alone would accept values such as "a.b.c.d" or "999.1.1.1".
valid_ip_format(ip) if {
	octets := split(ip, ".")
	count(octets) == 4
	every octet in octets {
		regex.match(`^[0-9]{1,3}$`, octet)
		to_number(octet) <= 255
	}
}
Implements zero-trust network access principles.
package network.zerotrust

import rego.v1

# Nothing is allowed unless every check below succeeds.
default allow := false

# allow holds only when identity, device and authorization are all valid.
allow if {
	valid_identity
	valid_device
	valid_authorization
}

# valid_identity requires a verified, MFA-verified identity whose token has a
# numeric expiry that lies in the future. The is_number guard matters: with
# a missing expiry is_expired is undefined, and with a string expiry the
# comparison is false, so `not is_expired(...)` alone would accept both.
valid_identity if {
	input.identity.verified
	input.identity.mfa_verified
	is_number(input.identity.token_expiry)
	not is_expired(input.identity.token_expiry)
}

# valid_device requires a registered, compliant device scanned less than 7 days ago.
valid_device if {
	input.device.registered
	input.device.compliant
	input.device.last_scan_days < 7
}

# valid_authorization holds when some permission of the identity matches both
# the requested resource and the requested action.
valid_authorization if {
	some permission in input.identity.permissions
	permission.resource == input.requested_resource
	permission.action == input.requested_action
}

# is_expired holds when the current time is later than expiry_time.
# expiry_time is compared against time.now_ns(), i.e. nanoseconds since epoch.
is_expired(expiry_time) if {
	time.now_ns() > expiry_time
}
Enforces security policies in service mesh environments.
package network.servicemesh

import rego.v1

# Connections need mTLS unless they originate at "ingress-gateway" or
# terminate at "egress-gateway".
deny contains msg if {
	conn := input.connection
	not conn.mtls_enabled
	conn.destination_service != "egress-gateway"
	conn.source_service != "ingress-gateway"
	msg := sprintf("Connection from %v to %v must use mTLS", [conn.source_service, conn.destination_service])
}

# The caller's identity must pass valid_service_identity.
deny contains msg if {
	presented := input.connection.source_identity
	not valid_service_identity(presented)
	msg := sprintf("Invalid service identity: %v", [presented])
}

# valid_service_identity holds for strings that start with "spiffe://" and
# contain both a "/ns/" and a "/sa/" path component.
valid_service_identity(identity) if {
	contains(identity, "/sa/")
	contains(identity, "/ns/")
	startswith(identity, "spiffe://")
}

# The source service must be listed as an allowed source of the destination.
deny contains msg if {
	caller := input.connection.source_service
	callee := input.connection.destination_service
	not is_authorized_connection(caller, callee)
	msg := sprintf("Service %v is not authorized to access %v", [caller, callee])
}

# is_authorized_connection looks up the destination's policy under
# data.service_mesh.policies and checks that source is among its allowed_sources.
is_authorized_connection(source, destination) if {
	source in data.service_mesh.policies[destination].allowed_sources
}
Validates that systems handling payment card data meet PCI-DSS encryption requirements.
package compliance.pci.encryption

import rego.v1

# Req 3.4: databases holding cardholder data must be encrypted at rest.
deny contains msg if {
	db := input.resources[_]
	db.stores_cardholder_data
	db.type == "database"
	not db.encrypted_at_rest
	msg := sprintf("Database %v stores cardholder data but is not encrypted (PCI-DSS Req 3.4)", [db.id])
}

# Req 3.5: resources that are encrypted at rest must use an approved algorithm.
deny contains msg if {
	res := input.resources[_]
	res.encrypted_at_rest
	algo := res.encryption_algorithm
	not valid_encryption_algorithm(algo)
	msg := sprintf("Resource %v uses weak encryption algorithm %v (PCI-DSS Req 3.5)", [res.id, algo])
}

# valid_encryption_algorithm holds for the algorithm names listed here.
valid_encryption_algorithm(algorithm) if {
	approved := {"AES-256", "AES-256-GCM", "RSA-2048", "RSA-4096"}
	algorithm in approved
}

# Req 4.1: connections carrying cardholder data must be encrypted in transit.
deny contains msg if {
	link := input.connections[_]
	link.transmits_cardholder_data
	not link.encrypted_in_transit
	msg := sprintf("Connection %v transmits cardholder data without encryption (PCI-DSS Req 4.1)", [link.id])
}
Ensures audit logging is enabled for systems in PCI-DSS scope.
package compliance.pci.logging

import rego.v1

# Req 10.1: every resource in PCI scope needs audit logging switched on.
deny contains msg if {
	res := input.resources[_]
	res.pci_scope
	not res.audit_logging_enabled
	msg := sprintf("Resource %v is in PCI scope but lacks audit logging (PCI-DSS Req 10.1)", [res.id])
}

# Req 10.7: in-scope resources must keep logs for at least 365 days.
deny contains msg if {
	res := input.resources[_]
	res.pci_scope
	days := res.log_retention_days
	days < 365
	msg := sprintf("Resource %v has insufficient log retention: %v days (PCI-DSS Req 10.7)", [res.id, days])
}

# Req 10.2.4: systems that handle authentication must log authentication events.
deny contains msg if {
	sys := input.systems[_]
	sys.handles_authentication
	not logs_authentication_events(sys)
	msg := sprintf("System %v must log authentication attempts (PCI-DSS Req 10.2.4)", [sys.id])
}

# logs_authentication_events holds when "authentication" is among the
# system's logged_event_types.
logs_authentication_events(system) if {
	recorded := system.logged_event_types
	"authentication" in recorded
}
Validates access control mechanisms meet PCI-DSS requirements.
package compliance.pci.access

import rego.v1

# Req 8.1: a user ID may appear only once in input.users.
deny contains msg if {
	some user in input.users
	count([u | some u in input.users; u.user_id == user.user_id]) > 1
	msg := sprintf("Multiple users share user ID %v (PCI-DSS Req 8.1)", [user.user_id])
}

# Req 8.2.3: users with passwords must have a sufficiently strict password policy.
deny contains msg if {
	some user in input.users
	user.has_password
	not meets_password_requirements(user.password_policy)
	msg := sprintf("User %v password policy does not meet requirements (PCI-DSS Req 8.2.3)", [user.user_id])
}

# meets_password_requirements holds for a policy with a minimum length of at
# least 7 that requires uppercase, lowercase and numeric characters.
meets_password_requirements(policy) if {
	policy.min_length >= 7
	policy.requires_uppercase
	policy.requires_lowercase
	policy.requires_numeric
}

# Req 8.3: remote access into the cardholder data environment requires MFA.
# The format string names the user with %v; passing access.user to a string
# with no verb makes sprintf append a "%!(EXTRA ...)" marker to the message.
deny contains msg if {
	some access in input.access_requests
	access.access_type == "remote"
	access.target_scope == "cardholder_data_environment"
	not access.mfa_enabled
	msg := sprintf("Remote access to CDE by %v requires MFA (PCI-DSS Req 8.3)", [access.user])
}
Ensures compliance with GDPR data protection requirements.
package compliance.gdpr.protection

import rego.v1

# Art. 32: resources that store personal data must be encrypted.
deny contains msg if {
	store := input.resources[_]
	store.stores_personal_data
	not store.encrypted
	msg := sprintf("Resource %v stores personal data without encryption (GDPR Art. 32)", [store.id])
}

# Art. 5(1)(c): personal fields not marked as necessary for the purpose are reported.
deny contains msg if {
	dataset := input.data_collections[_]
	attribute := dataset.fields[_]
	attribute.data_category == "personal"
	not attribute.necessary_for_purpose
	msg := sprintf("Collection %v includes unnecessary personal data field %v (GDPR Art. 5(1)(c))", [dataset.name, attribute.name])
}

# Art. 5(1)(e): resources that store personal data need a retention policy.
deny contains msg if {
	store := input.resources[_]
	store.stores_personal_data
	not store.retention_policy
	msg := sprintf("Resource %v lacks data retention policy (GDPR Art. 5(1)(e))", [store.id])
}

# Art. 7: processing based on consent needs a consent mechanism that passes valid_consent.
deny contains msg if {
	activity := input.data_processing[_]
	activity.legal_basis == "consent"
	not valid_consent(activity.consent_mechanism)
	msg := sprintf("Processing %v has invalid consent mechanism (GDPR Art. 7)", [activity.id])
}

# valid_consent holds when all four consent flags on the mechanism are set.
valid_consent(mechanism) if {
	mechanism.unambiguous
	mechanism.informed
	mechanism.specific
	mechanism.freely_given
}
Implements controls to support GDPR data subject rights.
package compliance.gdpr.rights

import rego.v1

# Art. 15: systems processing personal data must support access requests (DSAR).
deny contains msg if {
	sys := input.systems[_]
	sys.processes_personal_data
	not sys.supports_dsar
	msg := sprintf("System %v must support data subject access requests (GDPR Art. 15)", [sys.id])
}

# Art. 17: systems processing personal data must support erasure, unless they
# carry one of the retention justifications accepted below.
deny contains msg if {
	sys := input.systems[_]
	sys.processes_personal_data
	not has_valid_retention_justification(sys)
	not sys.supports_erasure
	msg := sprintf("System %v must support right to erasure (GDPR Art. 17)", [sys.id])
}

# has_valid_retention_justification holds when the system's
# retention_justification is one of the three accepted values.
has_valid_retention_justification(system) if {
	accepted := {"legal_obligation", "public_interest", "legal_claims"}
	system.retention_justification in accepted
}

# Art. 20: automated, consent-based processing must support data portability.
deny contains msg if {
	activity := input.data_processing[_]
	activity.legal_basis == "consent"
	activity.automated
	not activity.supports_portability
	msg := sprintf("Processing %v must support data portability (GDPR Art. 20)", [activity.id])
}
Validates protection of Protected Health Information under HIPAA.
package compliance.hipaa.phi

import rego.v1

# §164.312(a)(2)(iv): resources containing PHI must be encrypted at rest.
deny contains msg if {
	store := input.resources[_]
	store.contains_phi
	not store.encrypted_at_rest
	msg := sprintf("Resource %v contains PHI but is not encrypted (HIPAA §164.312(a)(2)(iv))", [store.id])
}

# §164.312(e)(1): transmissions containing PHI must be encrypted.
deny contains msg if {
	transfer := input.transmissions[_]
	transfer.contains_phi
	not transfer.encrypted
	msg := sprintf("Transmission %v contains PHI without encryption (HIPAA §164.312(e)(1))", [transfer.id])
}

# §164.308(a)(4): resources containing PHI must use role-based access controls.
deny contains msg if {
	store := input.resources[_]
	store.contains_phi
	not store.access_controls.role_based
	msg := sprintf("Resource %v with PHI lacks role-based access controls (HIPAA §164.308(a)(4))", [store.id])
}

# §164.312(b): resources containing PHI must have audit logging.
deny contains msg if {
	store := input.resources[_]
	store.contains_phi
	not store.audit_logging
	msg := sprintf("Resource %v with PHI must have audit logging (HIPAA §164.312(b))", [store.id])
}

# §164.502(b): access grants to PHI must satisfy is_minimum_necessary.
deny contains msg if {
	grant := input.access_grants[_]
	grant.resource_contains_phi
	not is_minimum_necessary(grant)
	msg := sprintf("Access grant %v violates minimum necessary principle (HIPAA §164.502(b))", [grant.id])
}

# is_minimum_necessary holds when the grant is role-justified, limited to
# specific fields, and time-limited.
is_minimum_necessary(access) if {
	access.time_limited
	access.limited_fields
	access.justified_by_role
}
Ensures data stays within required geographic boundaries for compliance.
package compliance.data_residency

import rego.v1

# Regions permitted per data classification. "*" in place of a set marks a
# classification with no regional restriction.
region_requirements := {
	"eu_customer_data": {"allowed_regions": {"eu-west-1", "eu-central-1", "eu-north-1"}},
	"us_customer_data": {"allowed_regions": {"us-east-1", "us-west-2"}},
	"global_public_data": {"allowed_regions": "*"},
}

# A resource must live in one of the regions allowed for its classification.
# Classifications absent from region_requirements produce no message.
deny contains msg if {
	res := input.resources[_]
	class := res.data_classification
	rule := region_requirements[class]
	rule.allowed_regions != "*"
	not res.region in rule.allowed_regions
	msg := sprintf(
		"Resource %v with %v must be in regions %v, found in %v",
		[res.id, class, rule.allowed_regions, res.region],
	)
}

# Every replica region of a replicated resource must also be allowed.
deny contains msg if {
	res := input.resources[_]
	res.replication_enabled
	replica := res.replica_regions[_]
	class := res.data_classification
	rule := region_requirements[class]
	rule.allowed_regions != "*"
	not replica in rule.allowed_regions
	msg := sprintf(
		"Resource %v replicates %v to unauthorized region %v",
		[res.id, class, replica],
	)
}

# Data transfers must target a region allowed for the data's classification.
deny contains msg if {
	move := input.data_transfers[_]
	class := move.data_classification
	rule := region_requirements[class]
	rule.allowed_regions != "*"
	not move.destination_region in rule.allowed_regions
	msg := sprintf(
		"Data transfer %v moves %v to unauthorized region %v",
		[move.id, class, move.destination_region],
	)
}
Implements SOC 2 Trust Service Criteria controls.
package compliance.soc2

import rego.v1

# CC6.1 (logical access): high-criticality systems must require authentication.
deny contains msg if {
	sys := input.systems[_]
	sys.criticality == "high"
	not sys.requires_authentication
	msg := sprintf("System %v lacks authentication requirements (SOC 2 CC6.1)", [sys.id])
}

# CC6.6 (encryption): confidential or restricted data stores must be encrypted at rest.
deny contains msg if {
	store := input.data_stores[_]
	sensitive_levels := {"confidential", "restricted"}
	store.sensitivity in sensitive_levels
	not store.encrypted_at_rest
	msg := sprintf("Sensitive data store %v is not encrypted (SOC 2 CC6.6)", [store.id])
}

# CC6.7 (transmission security): connections carrying non-public data must be encrypted.
deny contains msg if {
	link := input.connections[_]
	link.data_classification != "public"
	not link.encrypted
	msg := sprintf("Non-public data connection %v is not encrypted (SOC 2 CC6.7)", [link.id])
}

# CC7.2 (system monitoring): every system must have monitoring enabled.
deny contains msg if {
	sys := input.systems[_]
	not sys.monitoring_enabled
	msg := sprintf("System %v lacks monitoring (SOC 2 CC7.2)", [sys.id])
}

# CC7.3 (change management): high-impact changes must be approved.
deny contains msg if {
	pending := input.changes[_]
	pending.impact == "high"
	not pending.approved
	msg := sprintf("High-impact change %v is not approved (SOC 2 CC7.3)", [pending.id])
}

# A1.2 (availability monitoring): services with an SLA need availability monitoring.
deny contains msg if {
	svc := input.services[_]
	svc.sla_required
	not svc.availability_monitoring
	msg := sprintf("Service %v with SLA lacks availability monitoring (SOC 2 A1.2)", [svc.id])
}
Enforces use of secure TLS/SSL versions and cipher suites.
package security.tls

import rego.v1

# Deny configurations whose minimum protocol version is older than TLSv1.2.
deny contains msg if {
	config := input.tls_config
	config.min_version in {"SSLv2", "SSLv3", "TLSv1.0", "TLSv1.1"}
	msg := sprintf("TLS configuration uses deprecated version %v, minimum should be TLSv1.2", [config.min_version])
}

# Deny any configured cipher suite that is on the weak list.
deny contains msg if {
	config := input.tls_config
	some cipher in config.cipher_suites
	is_weak_cipher(cipher)
	msg := sprintf("Weak cipher suite detected: %v", [cipher])
}

# is_weak_cipher holds for the suite names listed here.
is_weak_cipher(cipher) if {
	weak_ciphers := {
		"TLS_RSA_WITH_RC4_128_SHA",
		"TLS_RSA_WITH_3DES_EDE_CBC_SHA",
		"TLS_RSA_WITH_AES_128_CBC_SHA",
		"TLS_RSA_WITH_AES_256_CBC_SHA",
	}
	cipher in weak_ciphers
}

# Deny cipher suites that do not provide Perfect Forward Secrecy.
deny contains msg if {
	config := input.tls_config
	some cipher in config.cipher_suites
	not supports_pfs(cipher)
	msg := sprintf("Cipher suite %v does not support Perfect Forward Secrecy", [cipher])
}

# Suites whose name contains "DHE" use an ephemeral Diffie-Hellman exchange.
# The substring also matches "ECDHE", so one rule covers both families.
supports_pfs(cipher) if {
	contains(cipher, "DHE")
}

# TLS 1.3 suite names carry no key-exchange component, yet the TLS 1.3 key
# exchange is always ephemeral. Without this rule every TLS 1.3 suite would
# be reported as lacking PFS.
supports_pfs(cipher) if {
	tls13_suites := {
		"TLS_AES_128_GCM_SHA256",
		"TLS_AES_256_GCM_SHA384",
		"TLS_CHACHA20_POLY1305_SHA256",
		"TLS_AES_128_CCM_SHA256",
		"TLS_AES_128_CCM_8_SHA256",
	}
	cipher in tls13_suites
}
Validates SSL/TLS certificates meet security requirements.
package security.certificates

import rego.v1

# Deny certificates whose not_after timestamp is already in the past.
deny contains msg if {
	some cert in input.certificates
	is_expired(cert.not_after)
	msg := sprintf("Certificate %v has expired", [cert.common_name])
}

# Warn about certificates that are still valid but expire within 30 days.
# Expired certificates are excluded here; they are reported once by the rule
# above instead of also being described as "expiring soon".
deny contains msg if {
	some cert in input.certificates
	not is_expired(cert.not_after)
	expires_soon(cert.not_after, 30)
	msg := sprintf("Certificate %v expires within 30 days", [cert.common_name])
}

# is_expired holds when the RFC 3339 timestamp not_after lies before the current time.
is_expired(not_after) if {
	expiry := time.parse_rfc3339_ns(not_after)
	expiry < time.now_ns()
}

# expires_soon holds when not_after lies before now plus the given number of
# days. The day count is converted to nanoseconds to match time.now_ns().
expires_soon(not_after, days) if {
	expiry := time.parse_rfc3339_ns(not_after)
	threshold := time.now_ns() + (days * 24 * 60 * 60 * 1000000000)
	expiry < threshold
}

# RSA keys must be at least 2048 bits.
deny contains msg if {
	some cert in input.certificates
	cert.key_algorithm == "RSA"
	cert.key_size < 2048
	msg := sprintf("Certificate %v uses RSA key size %v, minimum is 2048", [cert.common_name, cert.key_size])
}

# EC keys must be at least 256 bits.
deny contains msg if {
	some cert in input.certificates
	cert.key_algorithm == "EC"
	cert.key_size < 256
	msg := sprintf("Certificate %v uses EC key size %v, minimum is 256", [cert.common_name, cert.key_size])
}

# The certificate chain must have been marked valid.
deny contains msg if {
	some cert in input.certificates
	not cert.chain_valid
	msg := sprintf("Certificate %v has invalid chain", [cert.common_name])
}

# The issuer must be recognised by is_trusted_ca.
deny contains msg if {
	some cert in input.certificates
	not is_trusted_ca(cert.issuer)
	msg := sprintf("Certificate %v issued by untrusted CA: %v", [cert.common_name, cert.issuer])
}

# is_trusted_ca holds when the issuer string contains one of the listed names.
# NOTE(review): this is a substring match, so any issuer that merely contains
# one of these names anywhere in the string is accepted — confirm that is intended.
is_trusted_ca(issuer) if {
	trusted_cas := {
		"DigiCert",
		"Let's Encrypt",
		"Amazon",
		"Microsoft",
	}
	some ca in trusted_cas
	contains(issuer, ca)
}
Enforces mutual TLS authentication for service-to-service communication.
package security.mtls

import rego.v1

# Service-to-service connections must have mTLS enabled.
deny contains msg if {
	link := input.connections[_]
	link.destination_type == "service"
	link.source_type == "service"
	not link.mtls_enabled
	msg := sprintf(
		"Service-to-service connection from %v to %v must use mTLS",
		[link.source, link.destination],
	)
}

# An mTLS connection must actually present a client certificate.
deny contains msg if {
	link := input.connections[_]
	link.mtls_enabled
	not link.client_cert_presented
	msg := sprintf("mTLS connection from %v missing client certificate", [link.source])
}

# An mTLS connection must verify the client certificate.
deny contains msg if {
	link := input.connections[_]
	link.mtls_enabled
	not link.verify_client_cert
	msg := sprintf("mTLS connection from %v does not verify client certificate", [link.source])
}

# The client certificate subject must pass has_valid_spiffe_id.
deny contains msg if {
	link := input.connections[_]
	link.mtls_enabled
	not has_valid_spiffe_id(link.client_cert_subject)
	msg := sprintf("Client certificate for %v lacks valid SPIFFE ID", [link.source])
}

# has_valid_spiffe_id only checks that the subject starts with "spiffe://".
has_valid_spiffe_id(subject) if {
	startswith(subject, "spiffe://")
}

# Services using mTLS must define a certificate rotation policy.
deny contains msg if {
	svc := input.services[_]
	svc.uses_mtls
	not svc.cert_rotation_policy
	msg := sprintf("Service %v uses mTLS but lacks certificate rotation policy", [svc.name])
}

# Where a rotation policy exists, the rotation period may not exceed 90 days.
deny contains msg if {
	svc := input.services[_]
	svc.cert_rotation_policy
	period := svc.cert_rotation_days
	period > 90
	msg := sprintf(
		"Service %v certificate rotation period %v days exceeds maximum of 90",
		[svc.name, period],
	)
}
Ensures comprehensive audit logging for security and compliance.
package security.audit

import rego.v1

# Sensitive operations must be audit logged.
deny contains msg if {
	some operation in input.operations
	is_sensitive_operation(operation.type)
	not operation.audit_logged
	msg := sprintf("Sensitive operation %v must be audit logged", [operation.type])
}

# is_sensitive_operation holds for the operation types listed here.
is_sensitive_operation(op_type) if {
	op_type in {
		"user_login",
		"user_logout",
		"permission_change",
		"data_access",
		"data_modification",
		"configuration_change",
		"security_event",
	}
}

# A system's log retention must meet the minimum for its compliance scope.
deny contains msg if {
	some system in input.systems
	system.audit_logging_enabled
	system.log_retention_days < minimum_retention_days(system.compliance_scope)
	msg := sprintf("System %v log retention %v days is below minimum for %v compliance", [
		system.id,
		system.log_retention_days,
		system.compliance_scope,
	])
}

# Minimum retention in days per compliance scope. Any scope without a
# specific rule, including "default", falls back to 90 days. Matching only
# the literal "default" would leave every other scope with no minimum, so
# the retention check would silently pass. A system with no
# compliance_scope at all is still not evaluated by the rule above.
default minimum_retention_days(_) := 90

minimum_retention_days(scope) := 365 if {
	scope in {"pci-dss", "sox"}
}

minimum_retention_days(scope) := 180 if {
	scope == "hipaa"
}

# Audit logs must have integrity protection enabled.
deny contains msg if {
	log_config := input.log_configuration
	not log_config.integrity_protection
	msg := "Audit logs must have integrity protection enabled"
}

# Each log entry must carry all required fields.
deny contains msg if {
	some log_entry in input.log_entries
	not has_required_fields(log_entry)
	msg := sprintf("Log entry missing required fields: %v", [log_entry.id])
}

# has_required_fields holds when timestamp, user, action, resource and result
# are all present on the entry (a field whose value is false counts as absent).
has_required_fields(entry) if {
	entry.timestamp
	entry.user
	entry.action
	entry.resource
	entry.result
}

# Systems that generate audit logs must ship them to central logging.
deny contains msg if {
	some system in input.systems
	system.generates_audit_logs
	not system.centralized_logging
	msg := sprintf("System %v must send audit logs to central logging", [system.id])
}
This document provides 21 comprehensive examples covering:
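Network Security: CIDR range validation, egress control, network segmentation, firewall rule validation, VPN access control, load balancer security, DNS security, zero-trust access, and service mesh security.
Regulatory Compliance: PCI-DSS encryption, PCI-DSS audit logging, PCI-DSS access control, GDPR data protection, GDPR data subject rights, HIPAA PHI protection, data residency, and SOC 2 controls.
Encryption & Certificates: TLS version and cipher suite enforcement, certificate validation, and mutual TLS.
Audit & Logging: audit logging requirements.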
These examples demonstrate production-ready patterns for implementing security policies and regulatory compliance validation using Rego. Each policy includes detailed validation logic, clear error messages, and follows Rego best practices for maintainability and performance.