Google Cloud reCAPTCHA Enterprise API client library for protecting websites and applications from fraud
—
Advanced account protection features including related account group analysis, membership tracking, and search capabilities for detecting coordinated fraud attempts and abuse patterns. These capabilities help identify and mitigate sophisticated attacks that involve multiple related accounts.
Retrieves related account groups that represent clusters of accounts with suspicious relationships, helping identify coordinated fraud attempts.
def list_related_account_groups(
request: ListRelatedAccountGroupsRequest = None,
*,
parent: str = None,
retry: Union[retries.Retry, gapic_v1.method._MethodDefault] = _MethodDefault._DEFAULT_VALUE,
timeout: Union[float, object] = _MethodDefault._DEFAULT_VALUE,
metadata: Sequence[Tuple[str, str]] = ()
) -> ListRelatedAccountGroupsResponse:
"""
List groups of related accounts in a project.
Args:
request: The request object for listing related account groups
parent: Required. The name of the project in format 'projects/{project}'
retry: Retry configuration for the request
timeout: Timeout for the request in seconds
metadata: Additional metadata for the request
Returns:
ListRelatedAccountGroupsResponse: List of related account groups
Raises:
google.api_core.exceptions.PermissionDenied: If insufficient permissions
google.api_core.exceptions.InvalidArgument: If parent format is invalid
"""from google.cloud import recaptchaenterprise
client = recaptchaenterprise.RecaptchaEnterpriseServiceClient()
# List related account groups
request = recaptchaenterprise.ListRelatedAccountGroupsRequest(
parent="projects/your-project-id"
)
response = client.list_related_account_groups(request=request)
print("Related account groups:")
for group in response.related_account_groups:
print(f"Group: {group.name}")
print(f" Accounts in group: {len(group.memberships) if group.memberships else 'Unknown'}")Retrieves the memberships within a specific related account group, showing which accounts are connected.
def list_related_account_group_memberships(
request: ListRelatedAccountGroupMembershipsRequest = None,
*,
parent: str = None,
retry: Union[retries.Retry, gapic_v1.method._MethodDefault] = _MethodDefault._DEFAULT_VALUE,
timeout: Union[float, object] = _MethodDefault._DEFAULT_VALUE,
metadata: Sequence[Tuple[str, str]] = ()
) -> ListRelatedAccountGroupMembershipsResponse:
"""
Get memberships in a related account group.
Args:
request: The request object for listing memberships
parent: Required. The group name in format 'projects/{project}/relatedaccountgroups/{group}'
retry: Retry configuration for the request
timeout: Timeout for the request in seconds
metadata: Additional metadata for the request
Returns:
ListRelatedAccountGroupMembershipsResponse: List of account memberships
Raises:
google.api_core.exceptions.NotFound: If the group doesn't exist
google.api_core.exceptions.PermissionDenied: If insufficient permissions
"""Searches for account group memberships based on hashed account identifiers, enabling investigation of specific accounts.
def search_related_account_group_memberships(
request: SearchRelatedAccountGroupMembershipsRequest = None,
*,
parent: str = None,
hashed_account_id: bytes = None,
retry: Union[retries.Retry, gapic_v1.method._MethodDefault] = _MethodDefault._DEFAULT_VALUE,
timeout: Union[float, object] = _MethodDefault._DEFAULT_VALUE,
metadata: Sequence[Tuple[str, str]] = ()
) -> SearchRelatedAccountGroupMembershipsResponse:
"""
Search for memberships in a related account group.
Args:
request: The request object for searching memberships
parent: Required. The project name in format 'projects/{project}'
hashed_account_id: Required. Hashed account identifier to search for
retry: Retry configuration for the request
timeout: Timeout for the request in seconds
metadata: Additional metadata for the request
Returns:
SearchRelatedAccountGroupMembershipsResponse: Search results
Raises:
google.api_core.exceptions.InvalidArgument: If search parameters are invalid
google.api_core.exceptions.PermissionDenied: If insufficient permissions
"""import hashlib
# Hash account identifier for search
account_id = "user123@example.com"
hashed_account_id = hashlib.sha256(account_id.encode()).digest()
# Search for related accounts
search_request = recaptchaenterprise.SearchRelatedAccountGroupMembershipsRequest(
parent="projects/your-project-id",
hashed_account_id=hashed_account_id
)
search_response = client.search_related_account_group_memberships(request=search_request)
print(f"Found {len(search_response.related_account_group_memberships)} related accounts")
for membership in search_response.related_account_group_memberships:
print(f"Group: {membership.name}")
print(f"Account: {membership.hashed_account_id.hex()}")class RelatedAccountGroup:
"""Group of related user accounts."""
name: str # Output only. Resource name
memberships: List[RelatedAccountGroupMembership] # Account memberships in groupclass RelatedAccountGroupMembership:
"""Membership in a related account group."""
name: str # Output only. Resource name
hashed_account_id: bytes # Hashed account identifier
account_id: str # Output only. Account identifier (if available)class ListRelatedAccountGroupsRequest:
"""Request message for listing related account groups."""
parent: str # Required. Project name in format 'projects/{project}'
page_size: int # Optional. Maximum results per page
page_token: str # Optional. Pagination token
class ListRelatedAccountGroupsResponse:
"""Response message for listing related account groups."""
related_account_groups: List[RelatedAccountGroup] # List of account groups
next_page_token: str # Token for next page of results
class ListRelatedAccountGroupMembershipsRequest:
"""Request message for listing account group memberships."""
parent: str # Required. Group name
page_size: int # Optional. Maximum results per page
page_token: str # Optional. Pagination token
class ListRelatedAccountGroupMembershipsResponse:
"""Response message for listing account group memberships."""
related_account_group_memberships: List[RelatedAccountGroupMembership] # Memberships
next_page_token: str # Token for next page
class SearchRelatedAccountGroupMembershipsRequest:
"""Request message for searching account group memberships."""
parent: str # Required. Project name
hashed_account_id: bytes # Required. Hashed account ID to search for
page_size: int # Optional. Maximum results per page
page_token: str # Optional. Pagination token
class SearchRelatedAccountGroupMembershipsResponse:
"""Response message for searching account group memberships."""
related_account_group_memberships: List[RelatedAccountGroupMembership] # Search results
next_page_token: str # Token for next pageimport hashlib
def investigate_account_relationships(client, account_identifier):
"""Investigate relationships for a specific account."""
# Hash the account identifier
hashed_id = hashlib.sha256(account_identifier.encode()).digest()
# Search for related account groups
search_request = recaptchaenterprise.SearchRelatedAccountGroupMembershipsRequest(
parent="projects/your-project-id",
hashed_account_id=hashed_id
)
search_response = client.search_related_account_group_memberships(request=search_request)
print(f"Investigating account: {account_identifier[:10]}...")
print(f"Found in {len(search_response.related_account_group_memberships)} related groups")
# Analyze each group the account belongs to
for membership in search_response.related_account_group_memberships:
group_name = membership.name.split('/')[3] # Extract group ID
print(f"\n--- Group {group_name} ---")
# Get all members of this group
list_request = recaptchaenterprise.ListRelatedAccountGroupMembershipsRequest(
parent=f"projects/your-project-id/relatedaccountgroups/{group_name}"
)
list_response = client.list_related_account_group_memberships(request=list_request)
print(f"Total members in group: {len(list_response.related_account_group_memberships)}")
# Show other members (first 5)
other_members = [m for m in list_response.related_account_group_memberships
if m.hashed_account_id != hashed_id]
print("Other members in group:")
for i, member in enumerate(other_members[:5]):
print(f" {i+1}. {member.hashed_account_id.hex()[:16]}...")
if len(other_members) > 5:
print(f" ... and {len(other_members) - 5} more")
# Investigate suspicious account
investigate_account_relationships(client, "suspicious.user@example.com")def check_account_risk_factors(client, assessment_response):
"""Check additional risk factors using related account analysis."""
# Extract account ID from assessment if available
if hasattr(assessment_response.event, 'hashed_account_id'):
hashed_account_id = assessment_response.event.hashed_account_id
# Search for related accounts
search_request = recaptchaenterprise.SearchRelatedAccountGroupMembershipsRequest(
parent="projects/your-project-id",
hashed_account_id=hashed_account_id
)
try:
search_response = client.search_related_account_group_memberships(request=search_request)
# Risk factors based on related accounts
group_count = len(search_response.related_account_group_memberships)
if group_count > 0:
print(f"Account belongs to {group_count} related account groups")
# Additional checks for high-risk patterns
for membership in search_response.related_account_group_memberships:
group_path = membership.name.rsplit('/', 1)[0] # Get group path
# Get group size
list_request = recaptchaenterprise.ListRelatedAccountGroupMembershipsRequest(
parent=group_path
)
list_response = client.list_related_account_group_memberships(request=list_request)
group_size = len(list_response.related_account_group_memberships)
if group_size > 10: # Large groups may indicate coordinated attacks
print(f"WARNING: Account in large group ({group_size} members)")
return "HIGH_RISK"
elif group_size > 5:
print(f"CAUTION: Account in medium group ({group_size} members)")
return "MEDIUM_RISK"
return "LOW_RISK"
else:
print("Account not found in any related groups")
return "UNKNOWN"
except Exception as e:
print(f"Error checking related accounts: {e}")
return "ERROR"
return "NO_ACCOUNT_ID"
# Use in assessment workflow
assessment = client.create_assessment(request=assessment_request)
account_risk = check_account_risk_factors(client, assessment)
print(f"Assessment score: {assessment.risk_analysis.score}")
print(f"Account risk level: {account_risk}")def analyze_account_groups(client, project_id):
"""Analyze all related account groups in a project."""
# Get all related account groups
list_request = recaptchaenterprise.ListRelatedAccountGroupsRequest(
parent=f"projects/{project_id}"
)
response = client.list_related_account_groups(request=list_request)
print(f"Found {len(response.related_account_groups)} related account groups")
group_stats = []
for group in response.related_account_groups:
# Get memberships for each group
memberships_request = recaptchaenterprise.ListRelatedAccountGroupMembershipsRequest(
parent=group.name
)
memberships_response = client.list_related_account_group_memberships(
request=memberships_request
)
group_size = len(memberships_response.related_account_group_memberships)
group_stats.append({
'name': group.name,
'size': group_size
})
# Sort by group size (largest first)
group_stats.sort(key=lambda x: x['size'], reverse=True)
print("\nLargest related account groups:")
for i, stats in enumerate(group_stats[:10]):
print(f"{i+1}. {stats['name']}: {stats['size']} members")
# Flag large groups for investigation
large_groups = [g for g in group_stats if g['size'] > 20]
if large_groups:
print(f"\nWARNING: {len(large_groups)} groups have >20 members - investigate for coordinated fraud")
return group_stats
# Analyze all groups
stats = analyze_account_groups(client, "your-project-id")import hashlib
import hmac
def hash_account_id(account_id, salt=None):
"""Hash account identifier for reCAPTCHA Enterprise."""
if salt:
# Use HMAC with salt for consistent hashing
return hmac.new(
salt.encode('utf-8'),
account_id.encode('utf-8'),
hashlib.sha256
).digest()
else:
# Simple SHA-256 hash
return hashlib.sha256(account_id.encode('utf-8')).digest()
def normalize_account_id(account_id):
"""Normalize account identifier before hashing."""
# Convert to lowercase and strip whitespace
normalized = account_id.lower().strip()
# Remove common variations (adjust for your use case)
# For email addresses
if '@' in normalized:
local, domain = normalized.split('@', 1)
# Remove dots from Gmail addresses
if domain in ['gmail.com', 'googlemail.com']:
local = local.replace('.', '')
normalized = f"{local}@{domain}"
return normalized
# Example usage
account_id = "User.Name@Gmail.com"
normalized_id = normalize_account_id(account_id) # "username@gmail.com"
hashed_id = hash_account_id(normalized_id)
print(f"Original: {account_id}")
print(f"Normalized: {normalized_id}")
print(f"Hashed: {hashed_id.hex()}")def create_assessment_with_account_analysis(client, event_data, account_identifier=None):
"""Create assessment with related account analysis."""
# Hash account identifier if provided
hashed_account_id = None
if account_identifier:
hashed_account_id = hash_account_id(normalize_account_id(account_identifier))
# Create event with account information
event = recaptchaenterprise.Event(
token=event_data.get('token'),
site_key=event_data.get('site_key'),
user_agent=event_data.get('user_agent'),
user_ip_address=event_data.get('user_ip'),
expected_action=event_data.get('action'),
hashed_account_id=hashed_account_id
)
# Create assessment
assessment = recaptchaenterprise.Assessment(event=event)
assessment_request = recaptchaenterprise.CreateAssessmentRequest(
parent="projects/your-project-id",
assessment=assessment
)
assessment_response = client.create_assessment(request=assessment_request)
# Analyze related accounts if account ID was provided
account_analysis = None
if hashed_account_id:
account_analysis = check_account_risk_factors(client, assessment_response)
return {
'assessment': assessment_response,
'account_risk': account_analysis,
'risk_score': assessment_response.risk_analysis.score
}
# Example usage
event_data = {
'token': '03AIIukzh7Z...',
'site_key': '6LdGwQ0...',
'user_agent': 'Mozilla/5.0...',
'user_ip': '203.0.113.42',
'action': 'login'
}
result = create_assessment_with_account_analysis(
client,
event_data,
account_identifier="user@example.com"
)
print(f"Risk score: {result['risk_score']}")
print(f"Account risk: {result['account_risk']}")from google.api_core import exceptions
try:
response = client.list_related_account_groups(request=request)
except exceptions.PermissionDenied as e:
print(f"Insufficient permissions for account analysis: {e}")
except exceptions.InvalidArgument as e:
print(f"Invalid request parameters: {e}")
try:
search_response = client.search_related_account_group_memberships(request=search_request)
except exceptions.InvalidArgument as e:
print(f"Invalid hashed account ID or search parameters: {e}")
except exceptions.NotFound as e:
print(f"No related account groups found: {e}")Install with Tessl CLI
npx tessl i tessl/pypi-google-cloud-recaptcha-enterprise