
tessl/pypi-google-cloud-recaptcha-enterprise

Google Cloud reCAPTCHA Enterprise API client library for protecting websites and applications from fraud


Account Security Analysis

This section covers the account protection features of the client: listing related account groups, listing the memberships of a group, and searching memberships by hashed account identifier. Together they help detect coordinated fraud and abuse that spans multiple related accounts.

Capabilities

List Related Account Groups

Retrieves related account groups that represent clusters of accounts with suspicious relationships, helping identify coordinated fraud attempts.

def list_related_account_groups(
    request: ListRelatedAccountGroupsRequest = None,
    *,
    parent: str = None,
    retry: Union[retries.Retry, gapic_v1.method._MethodDefault] = _MethodDefault._DEFAULT_VALUE,
    timeout: Union[float, object] = _MethodDefault._DEFAULT_VALUE,
    metadata: Sequence[Tuple[str, str]] = ()
) -> pagers.ListRelatedAccountGroupsPager:
    """
    List groups of related accounts in a project.

    Args:
        request: The request object for listing related account groups
        parent: Required. The name of the project in format 'projects/{project}'
        retry: Retry configuration for the request
        timeout: Timeout for the request in seconds
        metadata: Additional metadata for the request

    Returns:
        pagers.ListRelatedAccountGroupsPager: Iterable of related account groups;
            iterating it fetches further pages as needed

    Raises:
        google.api_core.exceptions.PermissionDenied: If insufficient permissions
        google.api_core.exceptions.InvalidArgument: If parent format is invalid
    """

Usage Example

from google.cloud import recaptchaenterprise

client = recaptchaenterprise.RecaptchaEnterpriseServiceClient()

# List related account groups
request = recaptchaenterprise.ListRelatedAccountGroupsRequest(
    parent="projects/your-project-id"
)

response = client.list_related_account_groups(request=request)

print("Related account groups:")
for group in response.related_account_groups:
    print(f"Group: {group.name}")
    # A RelatedAccountGroup carries only its resource name; fetch its members
    # with list_related_account_group_memberships(parent=group.name)

List Related Account Group Memberships

Retrieves the memberships within a specific related account group, showing which accounts are connected.

def list_related_account_group_memberships(
    request: ListRelatedAccountGroupMembershipsRequest = None,
    *,
    parent: str = None,
    retry: Union[retries.Retry, gapic_v1.method._MethodDefault] = _MethodDefault._DEFAULT_VALUE,
    timeout: Union[float, object] = _MethodDefault._DEFAULT_VALUE,
    metadata: Sequence[Tuple[str, str]] = ()
) -> pagers.ListRelatedAccountGroupMembershipsPager:
    """
    Get memberships in a related account group.

    Args:
        request: The request object for listing memberships
        parent: Required. The group name in format 'projects/{project}/relatedaccountgroups/{group}'
        retry: Retry configuration for the request
        timeout: Timeout for the request in seconds
        metadata: Additional metadata for the request

    Returns:
        pagers.ListRelatedAccountGroupMembershipsPager: Iterable of account memberships;
            iterating it fetches further pages as needed

    Raises:
        google.api_core.exceptions.NotFound: If the group doesn't exist
        google.api_core.exceptions.PermissionDenied: If insufficient permissions
    """

Search Related Account Group Memberships

Searches for account group memberships based on hashed account identifiers, enabling investigation of specific accounts.

def search_related_account_group_memberships(
    request: SearchRelatedAccountGroupMembershipsRequest = None,
    *,
    project: str = None,
    hashed_account_id: bytes = None,
    retry: Union[retries.Retry, gapic_v1.method._MethodDefault] = _MethodDefault._DEFAULT_VALUE,
    timeout: Union[float, object] = _MethodDefault._DEFAULT_VALUE,
    metadata: Sequence[Tuple[str, str]] = ()
) -> pagers.SearchRelatedAccountGroupMembershipsPager:
    """
    Search for the related account group memberships of a given account.

    Args:
        request: The request object for searching memberships
        project: Required. The project name in format 'projects/{project}'
        hashed_account_id: Required. Hashed account identifier to search for
        retry: Retry configuration for the request
        timeout: Timeout for the request in seconds
        metadata: Additional metadata for the request

    Returns:
        pagers.SearchRelatedAccountGroupMembershipsPager: Iterable of matching memberships;
            iterating it fetches further pages as needed

    Raises:
        google.api_core.exceptions.InvalidArgument: If search parameters are invalid
        google.api_core.exceptions.PermissionDenied: If insufficient permissions
    """

Usage Example

import hashlib

# Hash account identifier for search
account_id = "user123@example.com"
hashed_account_id = hashlib.sha256(account_id.encode()).digest()

# Search for related accounts
search_request = recaptchaenterprise.SearchRelatedAccountGroupMembershipsRequest(
    project="projects/your-project-id",  # the search request names its scope `project`, not `parent`
    hashed_account_id=hashed_account_id
)

search_response = client.search_related_account_group_memberships(request=search_request)

print(f"Found {len(search_response.related_account_group_memberships)} group memberships")
for membership in search_response.related_account_group_memberships:
    print(f"Membership: {membership.name}")
    print(f"Account: {membership.hashed_account_id.hex()}")

Request and Response Types

RelatedAccountGroup

class RelatedAccountGroup:
    """Group of related user accounts."""
    name: str    # Output only. Resource name
    # Members are not embedded in the group; fetch them with
    # list_related_account_group_memberships(parent=name)

RelatedAccountGroupMembership

class RelatedAccountGroupMembership:
    """Membership in a related account group."""
    name: str                           # Output only. Resource name
    hashed_account_id: bytes           # Hashed account identifier
    account_id: str                    # Output only. Account identifier (if available)

Request Types

class ListRelatedAccountGroupsRequest:
    """Request message for listing related account groups."""
    parent: str                        # Required. Project name in format 'projects/{project}'
    page_size: int                    # Optional. Maximum results per page
    page_token: str                   # Optional. Pagination token

class ListRelatedAccountGroupsResponse:
    """Response message for listing related account groups."""
    related_account_groups: List[RelatedAccountGroup]  # List of account groups
    next_page_token: str              # Token for next page of results

class ListRelatedAccountGroupMembershipsRequest:
    """Request message for listing account group memberships."""
    parent: str                       # Required. Group name
    page_size: int                   # Optional. Maximum results per page
    page_token: str                  # Optional. Pagination token

class ListRelatedAccountGroupMembershipsResponse:
    """Response message for listing account group memberships."""
    related_account_group_memberships: List[RelatedAccountGroupMembership]  # Memberships
    next_page_token: str             # Token for next page

class SearchRelatedAccountGroupMembershipsRequest:
    """Request message for searching account group memberships."""
    project: str                     # Required. Project name in format 'projects/{project}'
    hashed_account_id: bytes         # Required. Hashed account ID to search for
    page_size: int                  # Optional. Maximum results per page
    page_token: str                 # Optional. Pagination token

class SearchRelatedAccountGroupMembershipsResponse:
    """Response message for searching account group memberships."""
    related_account_group_memberships: List[RelatedAccountGroupMembership]  # Search results
    next_page_token: str            # Token for next page

Usage Examples

Account Investigation Workflow

import hashlib

def investigate_account_relationships(client, account_identifier):
    """Investigate relationships for a specific account."""
    
    # Hash the account identifier
    hashed_id = hashlib.sha256(account_identifier.encode()).digest()
    
    # Search for related account groups
    search_request = recaptchaenterprise.SearchRelatedAccountGroupMembershipsRequest(
        project="projects/your-project-id",  # the search request names its scope `project`
        hashed_account_id=hashed_id
    )
    
    search_response = client.search_related_account_group_memberships(request=search_request)
    
    # Log only a prefix of the hash, never the raw identifier
    print(f"Investigating account hash: {hashed_id.hex()[:16]}...")
    print(f"Found in {len(search_response.related_account_group_memberships)} related groups")
    
    # Analyze each group the account belongs to
    for membership in search_response.related_account_group_memberships:
        group_name = membership.name.split('/')[3]  # Extract group ID
        print(f"\n--- Group {group_name} ---")
        
        # Get all members of this group
        list_request = recaptchaenterprise.ListRelatedAccountGroupMembershipsRequest(
            parent=f"projects/your-project-id/relatedaccountgroups/{group_name}"
        )
        
        list_response = client.list_related_account_group_memberships(request=list_request)
        
        print(f"Total members in group: {len(list_response.related_account_group_memberships)}")
        
        # Show other members (first 5)
        other_members = [m for m in list_response.related_account_group_memberships 
                        if m.hashed_account_id != hashed_id]
        
        print("Other members in group:")
        for i, member in enumerate(other_members[:5]):
            print(f"  {i+1}. {member.hashed_account_id.hex()[:16]}...")
        
        if len(other_members) > 5:
            print(f"  ... and {len(other_members) - 5} more")

# Investigate suspicious account
investigate_account_relationships(client, "suspicious.user@example.com")

Fraud Detection Integration

def check_account_risk_factors(client, assessment_response):
    """Check additional risk factors using related account analysis."""
    
    # Extract account ID from assessment if one was supplied.
    # The field always exists on the message, so test its value, not its presence.
    if assessment_response.event.hashed_account_id:
        hashed_account_id = assessment_response.event.hashed_account_id
        
        # Search for related accounts
        search_request = recaptchaenterprise.SearchRelatedAccountGroupMembershipsRequest(
            project="projects/your-project-id",  # the search request names its scope `project`
            hashed_account_id=hashed_account_id
        )
        
        try:
            search_response = client.search_related_account_group_memberships(request=search_request)
            
            # Risk factors based on related accounts
            group_count = len(search_response.related_account_group_memberships)
            
            if group_count > 0:
                print(f"Account belongs to {group_count} related account groups")
                
                # Additional checks for high-risk patterns
                for membership in search_response.related_account_group_memberships:
                    group_path = membership.name.rsplit('/', 2)[0]  # Strip '/memberships/{id}' to get group path
                    
                    # Get group size
                    list_request = recaptchaenterprise.ListRelatedAccountGroupMembershipsRequest(
                        parent=group_path
                    )
                    
                    list_response = client.list_related_account_group_memberships(request=list_request)
                    group_size = len(list_response.related_account_group_memberships)
                    
                    if group_size > 10:  # Large groups may indicate coordinated attacks
                        print(f"WARNING: Account in large group ({group_size} members)")
                        return "HIGH_RISK"
                    elif group_size > 5:
                        print(f"CAUTION: Account in medium group ({group_size} members)")
                        return "MEDIUM_RISK"
                
                return "LOW_RISK"
            else:
                print("Account not found in any related groups")
                return "UNKNOWN"
                
        except Exception as e:
            print(f"Error checking related accounts: {e}")
            return "ERROR"
    
    return "NO_ACCOUNT_ID"

# Use in assessment workflow
assessment = client.create_assessment(request=assessment_request)
account_risk = check_account_risk_factors(client, assessment)

print(f"Assessment score: {assessment.risk_analysis.score}")
print(f"Account risk level: {account_risk}")

Bulk Account Analysis

def analyze_account_groups(client, project_id):
    """Analyze all related account groups in a project."""
    
    # Get all related account groups
    list_request = recaptchaenterprise.ListRelatedAccountGroupsRequest(
        parent=f"projects/{project_id}"
    )
    
    response = client.list_related_account_groups(request=list_request)
    
    print(f"Found {len(response.related_account_groups)} related account groups")
    
    group_stats = []
    
    for group in response.related_account_groups:
        # Get memberships for each group
        memberships_request = recaptchaenterprise.ListRelatedAccountGroupMembershipsRequest(
            parent=group.name
        )
        
        memberships_response = client.list_related_account_group_memberships(
            request=memberships_request
        )
        
        group_size = len(memberships_response.related_account_group_memberships)
        group_stats.append({
            'name': group.name,
            'size': group_size
        })
    
    # Sort by group size (largest first)
    group_stats.sort(key=lambda x: x['size'], reverse=True)
    
    print("\nLargest related account groups:")
    for i, stats in enumerate(group_stats[:10]):
        print(f"{i+1}. {stats['name']}: {stats['size']} members")
    
    # Flag large groups for investigation
    large_groups = [g for g in group_stats if g['size'] > 20]
    if large_groups:
        print(f"\nWARNING: {len(large_groups)} groups have >20 members - investigate for coordinated fraud")
    
    return group_stats

# Analyze all groups
stats = analyze_account_groups(client, "your-project-id")

Account Hashing Utilities

import hashlib
import hmac

def hash_account_id(account_id, salt=None):
    """Hash account identifier for reCAPTCHA Enterprise."""
    if salt:
        # Keyed HMAC-SHA256; `salt` acts as the secret key and must stay the same across calls
        return hmac.new(
            salt.encode('utf-8'),
            account_id.encode('utf-8'),
            hashlib.sha256
        ).digest()
    else:
        # Simple SHA-256 hash
        return hashlib.sha256(account_id.encode('utf-8')).digest()

def normalize_account_id(account_id):
    """Normalize account identifier before hashing."""
    # Convert to lowercase and strip whitespace
    normalized = account_id.lower().strip()
    
    # Remove common variations (adjust for your use case)
    # For email addresses
    if '@' in normalized:
        local, domain = normalized.split('@', 1)
        # Remove dots from Gmail addresses
        if domain in ['gmail.com', 'googlemail.com']:
            local = local.replace('.', '')
        normalized = f"{local}@{domain}"
    
    return normalized

# Example usage
account_id = "User.Name@Gmail.com"
normalized_id = normalize_account_id(account_id)  # "username@gmail.com"
hashed_id = hash_account_id(normalized_id)

print(f"Original: {account_id}")
print(f"Normalized: {normalized_id}")
print(f"Hashed: {hashed_id.hex()}")

Integration with Assessment Creation

def create_assessment_with_account_analysis(client, event_data, account_identifier=None):
    """Create assessment with related account analysis."""
    
    # Hash account identifier if provided
    hashed_account_id = None
    if account_identifier:
        hashed_account_id = hash_account_id(normalize_account_id(account_identifier))
    
    # Create event with account information
    event = recaptchaenterprise.Event(
        token=event_data.get('token'),
        site_key=event_data.get('site_key'),
        user_agent=event_data.get('user_agent'),
        user_ip_address=event_data.get('user_ip'),
        expected_action=event_data.get('action'),
        hashed_account_id=hashed_account_id
    )
    
    # Create assessment
    assessment = recaptchaenterprise.Assessment(event=event)
    assessment_request = recaptchaenterprise.CreateAssessmentRequest(
        parent="projects/your-project-id",
        assessment=assessment
    )
    
    assessment_response = client.create_assessment(request=assessment_request)
    
    # Analyze related accounts if account ID was provided
    account_analysis = None
    if hashed_account_id:
        account_analysis = check_account_risk_factors(client, assessment_response)
    
    return {
        'assessment': assessment_response,
        'account_risk': account_analysis,
        'risk_score': assessment_response.risk_analysis.score
    }

# Example usage
event_data = {
    'token': '03AIIukzh7Z...',
    'site_key': '6LdGwQ0...',
    'user_agent': 'Mozilla/5.0...',
    'user_ip': '203.0.113.42',
    'action': 'login'
}

result = create_assessment_with_account_analysis(
    client, 
    event_data, 
    account_identifier="user@example.com"
)

print(f"Risk score: {result['risk_score']}")
print(f"Account risk: {result['account_risk']}")

Error Handling

from google.api_core import exceptions

try:
    response = client.list_related_account_groups(request=request)
except exceptions.PermissionDenied as e:
    print(f"Insufficient permissions for account analysis: {e}")
except exceptions.InvalidArgument as e:
    print(f"Invalid request parameters: {e}")

try:
    search_response = client.search_related_account_group_memberships(request=search_request)
except exceptions.InvalidArgument as e:
    print(f"Invalid hashed account ID or search parameters: {e}")
except exceptions.NotFound as e:
    print(f"No related account groups found: {e}")

Best Practices

Account Identifier Management

  • Use consistent normalization before hashing account IDs
  • Consider using HMAC with a secret key for additional security
  • Handle different account identifier formats (email, username, etc.)
  • Document your hashing approach for consistency
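The points above can be combined into one documented hashing routine. The following is a minimal sketch, not the library's prescribed method. The `email:` / `username:` prefixes, the lowercasing of the email local part, and the function names `canonicalize` and `keyed_hash` are all assumptions made for illustration.

```python
import hashlib
import hmac


def canonicalize(identifier: str) -> str:
    """Reduce an account identifier to one canonical form before hashing.

    Assumption: email local parts are treated as case-insensitive, which
    holds for most providers but is not guaranteed by the email standards.
    """
    value = identifier.strip()
    if "@" in value:
        local, _, domain = value.rpartition("@")
        return f"email:{local.lower()}@{domain.lower()}"
    return f"username:{value.casefold()}"


def keyed_hash(identifier: str, secret_key: bytes) -> bytes:
    """HMAC-SHA256 of the canonical identifier under a stable secret key."""
    message = canonicalize(identifier).encode("utf-8")
    return hmac.new(secret_key, message, hashlib.sha256).digest()


key = b"replace-with-a-stable-secret"  # hypothetical placeholder key
a = keyed_hash("  User@Example.COM ", key)
b = keyed_hash("user@example.com", key)
print(a == b, len(a))
```

Prefixing the identifier type keeps a username from colliding with an email that has the same text. The key must stay the same over time, because changing it changes every hash and breaks matching against earlier requests.
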

Privacy and Security

  • Never store or log raw account identifiers
  • Use hashed identifiers in all API calls
  • Implement proper access controls for related account data
  • Regularly audit who has access to account relationship data
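One way to keep raw identifiers out of logs is to format every account reference through a single helper that emits only a short hash prefix. This is a sketch under that assumption. `log_safe_id` is a hypothetical name, and the 8-character prefix length is an arbitrary choice.

```python
import logging

logger = logging.getLogger("account_analysis")


def log_safe_id(hashed_account_id: bytes, visible_hex_chars: int = 8) -> str:
    """Return a truncated hex prefix of a hashed account ID for log output."""
    return hashed_account_id.hex()[:visible_hex_chars] + "..."


def log_membership_hit(hashed_account_id: bytes, group_count: int) -> str:
    """Log a search result without exposing the identifier; returns the message."""
    message = (
        f"account {log_safe_id(hashed_account_id)} "
        f"found in {group_count} related account group(s)"
    )
    logger.info(message)
    return message


line = log_membership_hit(bytes.fromhex("deadbeefcafebabe0011"), 2)
print(line)
```
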

Analysis Workflows

  • Integrate related account analysis with existing fraud detection
  • Set appropriate thresholds for group size alerts
  • Monitor trends in related account group formation
  • Use related account data to enhance risk scoring models
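Group size thresholds are easier to tune when they live in one place instead of being hard-coded inside a loop. The sketch below reuses the cut-offs of 5 and 10 members from the fraud detection example in this document. `GroupSizeThresholds` and `classify_group_sizes` are hypothetical names. Unlike that example, it classifies by the largest group, so the result does not depend on the order in which groups are returned.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class GroupSizeThresholds:
    """Sizes above which a group is treated as medium or high risk."""
    medium: int = 5
    high: int = 10


def classify_group_sizes(
    sizes: Iterable[int],
    thresholds: GroupSizeThresholds = GroupSizeThresholds(),
) -> str:
    """Map the sizes of an account's related groups to a coarse risk label."""
    largest = max(sizes, default=0)
    if largest == 0:
        return "UNKNOWN"  # the account is in no related group
    if largest > thresholds.high:
        return "HIGH_RISK"
    if largest > thresholds.medium:
        return "MEDIUM_RISK"
    return "LOW_RISK"


print(classify_group_sizes([3, 12, 6]))
print(classify_group_sizes([7], GroupSizeThresholds(medium=10, high=50)))
```
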

Performance Considerations

  • Cache related account lookups for frequently analyzed accounts
  • Implement pagination for large result sets
  • Consider async processing for bulk account analysis
  • Monitor API quotas and rate limits for account analysis operations
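Caching lookups for frequently analyzed accounts can be sketched with a small time-to-live cache wrapped around whatever function performs the search. `TTLCache` is a hypothetical helper, the 300-second default is an arbitrary assumption, and the injectable `clock` exists only to make expiry testable.

```python
import time
from typing import Any, Callable, Dict, Hashable, Tuple


class TTLCache:
    """Cache the results of `fetch(key)` for a limited time."""

    def __init__(
        self,
        fetch: Callable[[Hashable], Any],
        ttl_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[Hashable, Tuple[float, Any]] = {}

    def get(self, key: Hashable) -> Any:
        """Return a fresh cached value, or call `fetch` and store the result."""
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]
        value = self._fetch(key)
        self._entries[key] = (now, value)
        return value


# Usage with a stand-in lookup; in practice `fetch` would wrap the
# membership search for a hashed account ID.
lookups = []
cache = TTLCache(lambda key: lookups.append(key) or len(lookups))
cache.get(b"hashed-id")
cache.get(b"hashed-id")  # served from the cache, no second lookup
print(len(lookups))
```

A short TTL is a reasonable starting point because group membership can change as new activity is observed.
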

