Google Cloud reCAPTCHA Enterprise API client library for protecting websites and applications from fraud
—
Advanced security rules that automatically respond to detected threats with configurable actions. Firewall policies provide automated threat response capabilities including allow, block, substitute, and redirect actions based on risk analysis and custom conditions.
Creates a new firewall policy with conditions and actions that automatically respond to assessment results.
def create_firewall_policy(
request: CreateFirewallPolicyRequest = None,
*,
parent: str = None,
firewall_policy: FirewallPolicy = None,
retry: Union[retries.Retry, gapic_v1.method._MethodDefault] = _MethodDefault._DEFAULT_VALUE,
timeout: Union[float, object] = _MethodDefault._DEFAULT_VALUE,
metadata: Sequence[Tuple[str, str]] = ()
) -> FirewallPolicy:
"""
Creates a firewall policy.
Args:
request: The request object for creating a firewall policy
parent: Required. The name of the project in format 'projects/{project}'
firewall_policy: Required. Firewall policy configuration
retry: Retry configuration for the request
timeout: Timeout for the request in seconds
metadata: Additional metadata for the request
Returns:
FirewallPolicy: The created firewall policy
Raises:
google.api_core.exceptions.InvalidArgument: If policy configuration is invalid
google.api_core.exceptions.AlreadyExists: If policy path already exists
google.api_core.exceptions.PermissionDenied: If insufficient permissions
"""from google.cloud import recaptchaenterprise
from google.cloud.recaptchaenterprise_v1.types import FirewallPolicy, FirewallAction
client = recaptchaenterprise.RecaptchaEnterpriseServiceClient()
# Create block action for low scores
block_action = FirewallAction(
block=FirewallAction.BlockAction()
)
# Create firewall policy
policy = FirewallPolicy(
description="Block suspicious login attempts",
path="/login/*",
condition="assessment.risk_analysis.score < 0.3",
actions=[block_action]
)
request = recaptchaenterprise.CreateFirewallPolicyRequest(
parent="projects/your-project-id",
firewall_policy=policy
)
created_policy = client.create_firewall_policy(request=request)
print(f"Created policy: {created_policy.name}")Retrieves all firewall policies for a project with support for pagination.
def list_firewall_policies(
request: ListFirewallPoliciesRequest = None,
*,
parent: str = None,
retry: Union[retries.Retry, gapic_v1.method._MethodDefault] = _MethodDefault._DEFAULT_VALUE,
timeout: Union[float, object] = _MethodDefault._DEFAULT_VALUE,
metadata: Sequence[Tuple[str, str]] = ()
) -> ListFirewallPoliciesResponse:
"""
Returns the list of firewall policies in the project.
Args:
request: The request object for listing firewall policies
parent: Required. The name of the project in format 'projects/{project}'
retry: Retry configuration for the request
timeout: Timeout for the request in seconds
metadata: Additional metadata for the request
Returns:
ListFirewallPoliciesResponse: List of firewall policies
Raises:
google.api_core.exceptions.PermissionDenied: If insufficient permissions
google.api_core.exceptions.InvalidArgument: If parent format is invalid
"""Retrieves detailed information about a specific firewall policy.
def get_firewall_policy(
request: GetFirewallPolicyRequest = None,
*,
name: str = None,
retry: Union[retries.Retry, gapic_v1.method._MethodDefault] = _MethodDefault._DEFAULT_VALUE,
timeout: Union[float, object] = _MethodDefault._DEFAULT_VALUE,
metadata: Sequence[Tuple[str, str]] = ()
) -> FirewallPolicy:
"""
Returns the specified firewall policy.
Args:
request: The request object for getting a firewall policy
name: Required. Policy name in format 'projects/{project}/firewallpolicies/{policy}'
retry: Retry configuration for the request
timeout: Timeout for the request in seconds
metadata: Additional metadata for the request
Returns:
FirewallPolicy: The firewall policy configuration
Raises:
google.api_core.exceptions.NotFound: If the policy doesn't exist
google.api_core.exceptions.PermissionDenied: If insufficient permissions
"""Updates the configuration of an existing firewall policy.
def update_firewall_policy(
request: UpdateFirewallPolicyRequest = None,
*,
firewall_policy: FirewallPolicy = None,
update_mask: FieldMask = None,
retry: Union[retries.Retry, gapic_v1.method._MethodDefault] = _MethodDefault._DEFAULT_VALUE,
timeout: Union[float, object] = _MethodDefault._DEFAULT_VALUE,
metadata: Sequence[Tuple[str, str]] = ()
) -> FirewallPolicy:
"""
Updates the specified firewall policy.
Args:
request: The request object for updating a firewall policy
firewall_policy: Required. The policy to update
update_mask: Optional. Field mask specifying which fields to update
retry: Retry configuration for the request
timeout: Timeout for the request in seconds
metadata: Additional metadata for the request
Returns:
FirewallPolicy: The updated firewall policy
Raises:
google.api_core.exceptions.NotFound: If the policy doesn't exist
google.api_core.exceptions.InvalidArgument: If update parameters are invalid
google.api_core.exceptions.PermissionDenied: If insufficient permissions
"""Permanently deletes a firewall policy.
def delete_firewall_policy(
request: DeleteFirewallPolicyRequest = None,
*,
name: str = None,
retry: Union[retries.Retry, gapic_v1.method._MethodDefault] = _MethodDefault._DEFAULT_VALUE,
timeout: Union[float, object] = _MethodDefault._DEFAULT_VALUE,
metadata: Sequence[Tuple[str, str]] = ()
) -> None:
"""
Deletes the specified firewall policy.
Args:
request: The request object for deleting a firewall policy
name: Required. Policy name in format 'projects/{project}/firewallpolicies/{policy}'
retry: Retry configuration for the request
timeout: Timeout for the request in seconds
metadata: Additional metadata for the request
Raises:
google.api_core.exceptions.NotFound: If the policy doesn't exist
google.api_core.exceptions.PermissionDenied: If insufficient permissions
"""Changes the order of firewall policies, which affects their evaluation priority.
def reorder_firewall_policies(
request: ReorderFirewallPoliciesRequest = None,
*,
parent: str = None,
names: List[str] = None,
retry: Union[retries.Retry, gapic_v1.method._MethodDefault] = _MethodDefault._DEFAULT_VALUE,
timeout: Union[float, object] = _MethodDefault._DEFAULT_VALUE,
metadata: Sequence[Tuple[str, str]] = ()
) -> ReorderFirewallPoliciesResponse:
"""
Reorders all firewall policies.
Args:
request: The request object for reordering policies
parent: Required. The name of the project in format 'projects/{project}'
names: Required. Ordered list of policy names
retry: Retry configuration for the request
timeout: Timeout for the request in seconds
metadata: Additional metadata for the request
Returns:
ReorderFirewallPoliciesResponse: Confirmation of reordering
Raises:
google.api_core.exceptions.InvalidArgument: If policy names are invalid
google.api_core.exceptions.PermissionDenied: If insufficient permissions
"""class FirewallPolicy:
"""Firewall policy configuration."""
name: str # Output only. Resource name
description: str # Human-readable description
path: str # Path pattern to match requests
condition: str # CEL expression for triggering policy
actions: List[FirewallAction] # Actions to take when policy matchesclass FirewallAction:
"""Action to take when firewall conditions match."""
allow: AllowAction # Allow the request
block: BlockAction # Block the request
substitute: SubstituteAction # Substitute token or response
redirect: RedirectAction # Redirect to different URL
set_header: SetHeaderAction # Set HTTP headers
class AllowAction:
"""Allow action - lets the request proceed."""
pass # Empty class, just allows request
class BlockAction:
"""Block action - stops the request."""
pass # Empty class, blocks request
class SubstituteAction:
"""Substitute action - replaces token or response."""
path: str # Path to substitute
class RedirectAction:
"""Redirect action - redirects to different URL."""
pass # Empty class, uses default redirect
class SetHeaderAction:
"""Set header action - adds HTTP headers."""
key: str # Header name
value: str # Header valueclass CreateFirewallPolicyRequest:
"""Request message for creating a firewall policy."""
parent: str # Required. Project name
firewall_policy: FirewallPolicy # Required. Policy configuration
class ListFirewallPoliciesRequest:
"""Request message for listing firewall policies."""
parent: str # Required. Project name
page_size: int # Optional. Maximum results per page
page_token: str # Optional. Pagination token
class ListFirewallPoliciesResponse:
"""Response message for listing firewall policies."""
firewall_policies: List[FirewallPolicy] # The list of policies
next_page_token: str # Token for next page
class GetFirewallPolicyRequest:
"""Request message for getting a firewall policy."""
name: str # Required. Policy name
class UpdateFirewallPolicyRequest:
"""Request message for updating a firewall policy."""
firewall_policy: FirewallPolicy # Required. Updated policy
update_mask: FieldMask # Optional. Fields to update
class DeleteFirewallPolicyRequest:
"""Request message for deleting a firewall policy."""
name: str # Required. Policy name
class ReorderFirewallPoliciesRequest:
"""Request message for reordering firewall policies."""
parent: str # Required. Project name
names: List[str] # Required. Ordered policy names
class ReorderFirewallPoliciesResponse:
"""Response message for reordering firewall policies."""
pass # Empty response confirming reorder# Block requests with risk scores below 0.3
block_policy = FirewallPolicy(
description="Block suspicious login attempts",
path="/api/login",
condition="assessment.risk_analysis.score < 0.3",
actions=[FirewallAction(block=FirewallAction.BlockAction())]
)# Redirect suspicious registration attempts to verification page
redirect_policy = FirewallPolicy(
description="Redirect suspicious registrations",
path="/register",
condition="assessment.risk_analysis.score < 0.5 && assessment.token_properties.action == 'register'",
actions=[FirewallAction(redirect=FirewallAction.RedirectAction())]
)# Add monitoring headers for medium-risk requests
header_policy = FirewallPolicy(
description="Tag medium-risk requests",
path="/api/*",
condition="assessment.risk_analysis.score >= 0.3 && assessment.risk_analysis.score < 0.7",
actions=[
FirewallAction(
set_header=FirewallAction.SetHeaderAction(
key="X-Risk-Level",
value="medium"
)
),
FirewallAction(allow=FirewallAction.AllowAction())
]
)# Complex policy with multiple actions
multi_action_policy = FirewallPolicy(
description="Multi-tier response for payment endpoints",
path="/payment/*",
condition="assessment.risk_analysis.score < 0.6",
actions=[
# Set warning header
FirewallAction(
set_header=FirewallAction.SetHeaderAction(
key="X-Fraud-Warning",
value="high-risk-detected"
)
),
# Allow with monitoring
FirewallAction(allow=FirewallAction.AllowAction())
]
)# Block based on Account Defender signals
account_defender_policy = FirewallPolicy(
description="Block suspicious account behavior",
path="/account/*",
condition="""
assessment.account_defender_assessment.labels.contains('SUSPICIOUS_ACCOUNT_CREATION') ||
assessment.account_defender_assessment.labels.contains('SUSPICIOUS_LOGIN_ACTIVITY')
""",
actions=[FirewallAction(block=FirewallAction.BlockAction())]
)# Block high-risk transactions
fraud_prevention_policy = FirewallPolicy(
description="Block high-risk transactions",
path="/checkout",
condition="""
assessment.fraud_prevention_assessment.transaction_risk > 0.8 ||
assessment.fraud_prevention_assessment.stolen_instrument_verdict.risk > 0.7
""",
actions=[FirewallAction(block=FirewallAction.BlockAction())]
)# Different rules based on request context
geo_time_policy = FirewallPolicy(
description="Geographic and time-based filtering",
path="/sensitive/*",
condition="""
assessment.risk_analysis.score < 0.4 ||
(assessment.event.user_ip_address.startsWith('192.168.') &&
assessment.risk_analysis.score < 0.6)
""",
actions=[FirewallAction(block=FirewallAction.BlockAction())]
)# Create policy
policy = FirewallPolicy(
description="Production login protection",
path="/login",
condition="assessment.risk_analysis.score < 0.4",
actions=[FirewallAction(block=FirewallAction.BlockAction())]
)
created_policy = client.create_firewall_policy(
parent="projects/your-project",
firewall_policy=policy
)
# Update policy condition
from google.protobuf import field_mask_pb2
updated_policy = FirewallPolicy(
name=created_policy.name,
description="Updated login protection",
condition="assessment.risk_analysis.score < 0.3" # Stricter threshold
)
update_mask = field_mask_pb2.FieldMask(paths=["description", "condition"])
client.update_firewall_policy(
firewall_policy=updated_policy,
update_mask=update_mask
)
# Reorder policies (most specific first)
policy_names = [
"projects/your-project/firewallpolicies/login-policy",
"projects/your-project/firewallpolicies/api-policy",
"projects/your-project/firewallpolicies/general-policy"
]
client.reorder_firewall_policies(
parent="projects/your-project",
names=policy_names
)Firewall policies use Common Expression Language (CEL) for conditions. Available fields:
# Risk analysis
assessment.risk_analysis.score # Risk score (0.0-1.0)
assessment.risk_analysis.reasons # List of risk reasons
# Token properties
assessment.token_properties.valid # Token validity
assessment.token_properties.action # Expected action
assessment.token_properties.hostname # Token hostname
# Event details
assessment.event.user_ip_address # Client IP address
assessment.event.user_agent # User agent string
assessment.event.expected_action # Expected action name
# Account Defender
assessment.account_defender_assessment.labels # Account defender labels
# Fraud Prevention
assessment.fraud_prevention_assessment.transaction_risk # Transaction risk score# Comparison operators
assessment.risk_analysis.score < 0.5
assessment.risk_analysis.score >= 0.3
# Logical operators
condition1 && condition2
condition1 || condition2
!condition
# String operations
assessment.event.user_agent.contains('bot')
assessment.event.user_ip_address.startsWith('192.168.')
assessment.token_properties.hostname.endsWith('.example.com')
# List operations
assessment.risk_analysis.reasons.contains('SUSPICIOUS_ACTIVITY')
assessment.account_defender_assessment.labels.size() > 0from google.api_core import exceptions
try:
policy = client.create_firewall_policy(request=request)
except exceptions.InvalidArgument as e:
print(f"Invalid policy configuration: {e}")
# Check CEL condition syntax, path patterns, action configuration
except exceptions.AlreadyExists as e:
print(f"Policy with this path already exists: {e}")
except exceptions.PermissionDenied as e:
print(f"Insufficient permissions: {e}")
try:
client.reorder_firewall_policies(parent=parent, names=policy_names)
except exceptions.InvalidArgument as e:
print(f"Invalid policy names or order: {e}")
# Ensure all policies exist and names are correctInstall with Tessl CLI
npx tessl i tessl/pypi-google-cloud-recaptcha-enterprise