
tessl/pypi-airbyte-source-google-directory

Airbyte source connector for Google Directory (Google Workspace Admin Directory API) that extracts users, groups, and group memberships.

Google Directory APIs

Direct access to Google Directory API resources with authentication, pagination, error handling, and retry logic. These classes provide low-level access to users, groups, and group membership data from Google Workspace.

Capabilities

Core API Wrapper

The main API class that handles Google Directory API authentication and request execution.

class API:
    """
    Core Google Directory API client with authentication handling.
    
    Manages OAuth 2.0 and Service Account authentication, constructs
    Google API service resources, and executes API requests with
    automatic retry and rate limiting.
    """
    
    def __init__(self, credentials: Mapping[str, Any]):
        """
        Initialize the Google Directory API client.
        
        Parameters:
        - credentials: Dict containing authentication credentials
            For OAuth: client_id, client_secret, refresh_token
            For Service Account: credentials_json, email
        """

    def get(self, name: str, params: Dict = None) -> Dict:
        """
        Execute Google Directory API GET request with retry logic.
        
        Parameters:
        - name: API resource name ('users', 'groups', 'members')
        - params: Dict of query parameters for the API request
        
        Returns:
        - Dict: API response containing requested data
        
        Features automatic retry with exponential backoff for rate limits
        and quota exceeded errors using the backoff decorator.
        """
    
    @staticmethod
    def _load_account_info(credentials_json: str) -> Dict:
        """
        Parse service account credentials from JSON string.
        
        Parameters:
        - credentials_json: JSON string containing service account info
        
        Returns:
        - Dict: Parsed account information
        """
    
    def _obtain_service_account_creds(self):
        """
        Obtain Google service account credentials for domain-wide delegation.
        
        Uses the credentials_json and email from the configuration to create
        service account credentials with the required Directory API scopes
        and subject delegation for the admin email.
        """
    
    def _obtain_web_app_creds(self):
        """
        Obtain Google OAuth 2.0 web application credentials.
        
        Uses client_id, client_secret, and refresh_token from configuration
        to create OAuth credentials. Automatically refreshes tokens if expired.
        """
    
    def _obtain_creds(self):
        """
        Determine credential type and obtain appropriate authentication.
        
        Automatically detects whether to use service account or OAuth
        credentials based on the presence of credentials_json or client_id
        in the configuration.
        """
    
    def _construct_resource(self):
        """
        Construct the Google Directory API service resource.
        
        Obtains credentials if not already available and builds the
        'admin' service with 'directory_v1' API version.
        """
    
    def _get_resource(self, name: str):
        """
        Get Google Directory API resource by name.
        
        Parameters:
        - name: Resource name ('users', 'groups', 'members')
        
        Returns:
        - Google API resource object for the specified resource type
        """
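
The credential-type selection described for `_obtain_creds` can be sketched as follows. This is a minimal illustration, not the connector's code: `detect_auth_method` is a hypothetical helper, and the required keys per method are taken from the `__init__` docstring above.

```python
import json
from typing import Any, Mapping

# Key sets taken from the API.__init__ docstring; the helper itself is hypothetical.
SERVICE_ACCOUNT_KEYS = ("credentials_json", "email")
OAUTH_KEYS = ("client_id", "client_secret", "refresh_token")


def detect_auth_method(credentials: Mapping[str, Any]) -> str:
    """Return 'service_account' or 'oauth' based on which keys are present."""
    if "credentials_json" in credentials:
        required, method = SERVICE_ACCOUNT_KEYS, "service_account"
    elif "client_id" in credentials:
        required, method = OAUTH_KEYS, "oauth"
    else:
        raise ValueError("credentials contain neither credentials_json nor client_id")

    missing = [key for key in required if key not in credentials]
    if missing:
        raise ValueError(f"{method} credentials are missing: {', '.join(missing)}")

    if method == "service_account":
        # Fail early on malformed JSON (json.JSONDecodeError is a ValueError).
        json.loads(credentials["credentials_json"])
    return method
```

Running a check like this before constructing `API` reports an incomplete configuration as a clear `ValueError` instead of a failure on the first request.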

Stream API Base Class

Abstract base class providing common functionality for all Google Directory stream APIs.

class StreamAPI(ABC):
    """
    Abstract base class for Google Directory stream APIs.
    
    Provides common pagination, response processing, and data reading
    functionality for all stream implementations.
    """
    
    results_per_page = 100  # Default page size for API requests
    
    def __init__(self, api: API, *args, **kwargs):
        """
        Initialize stream API with core API instance.
        
        Parameters:
        - api: Core API instance for making requests
        """
    
    @abstractmethod
    def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
        """
        Iterate over entities for this stream.
        
        Parameters:
        - fields: Optional list of fields to include in response
        
        Returns:
        - Iterator[dict]: Iterator yielding individual records
        """
    
    @abstractmethod  
    def process_response(self, response: Dict) -> Iterator[dict]:
        """
        Process Google Directory API response and extract records.
        
        Parameters:
        - response: Raw API response dict
        
        Returns:
        - Iterator[dict]: Iterator yielding processed records
        """
    
    def _api_get(self, resource: str, params: Dict = None):
        """
        Internal method to call the core API get method.
        
        Parameters:
        - resource: API resource name
        - params: Query parameters for the request
        
        Returns:
        - Dict: API response data
        """
    
    def read(self, getter: Callable, params: Dict = None) -> Iterator:
        """
        Read data using getter function with automatic pagination.
        
        Parameters:
        - getter: Function to call for each page of data
        - params: Initial parameters for API request
        
        Returns:
        - Iterator: Iterator yielding records across all pages
        
        Handles nextPageToken pagination automatically.
        """
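
The `nextPageToken` pagination that `read` handles can be sketched as below. This is a simplified stand-in under stated assumptions: `read_pages`, `fake_getter`, and `FAKE_PAGES` are hypothetical names, and the real method also passes each response through `process_response`. The `pageToken` request parameter and `nextPageToken` response field are the ones the Directory API uses for list calls.

```python
from typing import Callable, Dict, Iterator, Optional


def read_pages(getter: Callable[..., Dict], params: Optional[Dict] = None) -> Iterator[Dict]:
    """Yield raw response pages, following nextPageToken until it is absent."""
    params = dict(params or {})  # copy so the caller's dict is not mutated
    while True:
        response = getter(params=params)
        yield response
        token = response.get("nextPageToken")
        if not token:
            break
        params["pageToken"] = token


# Stand-in for the real API: two pages of fake users keyed by page token.
FAKE_PAGES = {
    None: {"users": [{"id": "1"}, {"id": "2"}], "nextPageToken": "p2"},
    "p2": {"users": [{"id": "3"}]},
}


def fake_getter(params: Dict) -> Dict:
    return FAKE_PAGES[params.get("pageToken")]


user_ids = [user["id"] for page in read_pages(fake_getter) for user in page["users"]]
print(user_ids)  # ['1', '2', '3']
```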

Users API

API for accessing Google Directory users data.

class UsersAPI(StreamAPI):
    """
    API for accessing Google Directory users.
    
    Provides access to user accounts in the Google Workspace domain
    including profile information, organizational data, and metadata.
    """
    
    def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
        """
        List all users in the Google Workspace domain.
        
        Parameters:
        - fields: Optional list of user fields to include
        
        Returns:
        - Iterator[dict]: Iterator yielding user records
        
        Each user record contains standard Google Directory user fields
        including id, primaryEmail, name, orgUnitPath, etc.
        """
    
    def process_response(self, response: Dict) -> Iterator[dict]:
        """
        Extract users from API response.
        
        Parameters:
        - response: Raw API response containing users data
        
        Returns:
        - Iterator[dict]: Iterator yielding individual user records
        """

Groups API

API for accessing Google Directory groups data.

class GroupsAPI(StreamAPI):
    """
    API for accessing Google Directory groups.
    
    Provides access to groups in the Google Workspace domain including
    group metadata, settings, and organizational information.
    """
    
    def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
        """
        List all groups in the Google Workspace domain.
        
        Parameters:
        - fields: Optional list of group fields to include
        
        Returns:
        - Iterator[dict]: Iterator yielding group records
        
        Each group record contains standard Google Directory group fields
        including id, email, name, description, etc.
        """
    
    def process_response(self, response: Dict) -> Iterator[dict]:
        """
        Extract groups from API response.
        
        Parameters:
        - response: Raw API response containing groups data
        
        Returns:
        - Iterator[dict]: Iterator yielding individual group records
        """

Group Members API

API for accessing Google Directory group membership data.

class GroupMembersAPI(StreamAPI):
    """
    API for accessing Google Directory group memberships.
    
    Provides access to group membership relationships, iterating through
    all groups and fetching members for each group.
    """
    
    def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
        """
        List members for all groups in the domain.
        
        Parameters:
        - fields: Optional list of member fields to include
        
        Returns:
        - Iterator[dict]: Iterator yielding group member records
        
        Iterates through all groups and fetches membership data for each,
        yielding individual member records with group context.
        """
    
    def process_response(self, response: Dict) -> Iterator[dict]:
        """
        Extract group members from API response.
        
        Parameters:
        - response: Raw API response containing members data
        
        Returns:
        - Iterator[dict]: Iterator yielding individual member records
        
        Returns empty list if no members found in response.
        """
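
The group-by-group iteration described for `GroupMembersAPI.list` can be sketched with fake data. `list_group_members` and `FAKE_MEMBERS` are hypothetical, and tagging each record with a `groupId` key is an assumption made here to show one way of carrying group context; it is not confirmed connector behavior.

```python
from typing import Callable, Dict, Iterable, Iterator


def list_group_members(
    groups: Iterable[Dict], fetch_members: Callable[[str], Dict]
) -> Iterator[Dict]:
    """For each group, fetch its members and yield them one by one."""
    for group in groups:
        response = fetch_members(group["id"])
        # A response without a "members" key is treated as an empty group.
        for member in response.get("members", []):
            # Assumption: attach the group id so each record carries its group.
            yield {**member, "groupId": group["id"]}


FAKE_MEMBERS = {
    "g1": {"members": [{"email": "a@example.com"}, {"email": "b@example.com"}]},
    "g2": {},  # empty group: no "members" key in the response
}

records = list(list_group_members([{"id": "g1"}, {"id": "g2"}], FAKE_MEMBERS.__getitem__))
print(records)
```

Because one members request is issued per group, the number of API calls grows with the number of groups in the domain.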

Utility Functions

Helper functions for API error handling and retry logic.

def rate_limit_handling(error):
    """
    Error handler for backoff retry logic.
    
    Determines whether API errors should trigger retries based on
    HTTP status codes and error reasons.
    
    Parameters:
    - error: GoogleApiHttpError instance
    
    Returns:
    - bool: True to give up (no further retries), False to allow a retry
    
    The handler is passed to backoff as its giveup predicate. It returns
    False, allowing a retry, only for the rate limit cases:
    - HTTP 403 with reason 'quotaExceeded'
    - HTTP 429 with reason 'rateLimitExceeded'
    
    Any other error returns True and is raised without further retries.
    """
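
A giveup predicate of this kind can be sketched as follows, assuming, as the `get` docstring states, that rate limit and quota errors are the ones retried. `should_give_up` and `fake_error` are hypothetical names; the fake error exposes only the `resp.status` and `resp.reason` attributes that the sketch reads.

```python
from types import SimpleNamespace

# (status, reason) pairs that should be retried rather than given up on.
RETRIED_CASES = {(403, "quotaExceeded"), (429, "rateLimitExceeded")}


def should_give_up(error) -> bool:
    """giveup-style predicate: True stops retrying, False allows a retry."""
    return (error.resp.status, error.resp.reason) not in RETRIED_CASES


def fake_error(status: int, reason: str) -> SimpleNamespace:
    # Stand-in for an HTTP error object; not a real Google API exception.
    return SimpleNamespace(resp=SimpleNamespace(status=status, reason=reason))


print(should_give_up(fake_error(429, "rateLimitExceeded")))  # False
print(should_give_up(fake_error(404, "notFound")))           # True
```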

Usage Examples

Direct API Access

from source_google_directory.api import API, UsersAPI, GroupsAPI, GroupMembersAPI

# OAuth credentials
credentials = {
    "client_id": "your-client-id",
    "client_secret": "your-client-secret", 
    "refresh_token": "your-refresh-token"
}

# Initialize core API
api = API(credentials)

# Test direct API call
response = api.get("users", params={"customer": "my_customer"})
print(f"Found {len(response.get('users', []))} users")

Stream API Usage

# Initialize stream APIs
users_api = UsersAPI(api)
groups_api = GroupsAPI(api)
members_api = GroupMembersAPI(api)

# Iterate through users
for user in users_api.list():
    print(f"User: {user['primaryEmail']} ({user['name']['fullName']})")

# Iterate through groups  
for group in groups_api.list():
    print(f"Group: {group['email']} - {group['name']}")

# Iterate through group memberships
for member in members_api.list():
    print(f"Member: {member['email']} in group {member.get('groupId', 'N/A')}")
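
Since `list()` returns an iterator, a small preview can be taken with `itertools.islice` without consuming the whole stream. The sketch below assumes the stream is a lazy generator that fetches one page at a time; `fake_user_stream` is a hypothetical stand-in for `users_api.list()`.

```python
from itertools import islice
from typing import Dict, Iterator

fetched_pages = []


def fake_user_stream() -> Iterator[Dict]:
    """Stand-in for users_api.list(): yields records page by page, lazily."""
    for page_number in range(1, 4):
        fetched_pages.append(page_number)  # record that this "page" was requested
        for index in range(100):
            yield {"primaryEmail": f"user{page_number}-{index}@example.com"}


preview = list(islice(fake_user_stream(), 5))
print(len(preview), fetched_pages)  # 5 [1]
```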

Service Account Configuration

# Service account credentials
service_credentials = {
    "credentials_json": '''{
        "type": "service_account",
        "project_id": "your-project",
        "private_key_id": "...",
        "private_key": "...",
        "client_email": "...",
        "client_id": "...",
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "https://oauth2.googleapis.com/token"
    }''',
    "email": "admin@yourdomain.com"
}

api = API(service_credentials)
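
When the key material is available as a Python mapping, building `credentials_json` with `json.dumps` avoids a common pitfall: a private key contains `\n` escapes, and a plain (non-raw) Python string literal turns them into raw newlines that `json.loads` rejects. A minimal sketch with placeholder values only:

```python
import json

# Placeholder values; a real key file downloaded from Google Cloud has more fields.
service_account_info = {
    "type": "service_account",
    "project_id": "your-project",
    "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
    "client_email": "connector@your-project.iam.gserviceaccount.com",
}

service_credentials = {
    # json.dumps escapes the newlines inside private_key correctly.
    "credentials_json": json.dumps(service_account_info),
    "email": "admin@yourdomain.com",
}

# Round-trip check: the string parses back to the original mapping.
assert json.loads(service_credentials["credentials_json"]) == service_account_info
```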

Authentication Requirements

Required OAuth Scopes

SCOPES = [
    "https://www.googleapis.com/auth/admin.directory.user.readonly",
    "https://www.googleapis.com/auth/admin.directory.group.readonly"
]

Prerequisites

  • OAuth Method: OAuth 2.0 application configured in Google Cloud Console
  • Service Account Method: Service account with domain-wide delegation enabled
  • Admin Access: Credentials must have Google Workspace admin privileges
  • API Enabled: The Google Admin SDK Directory API must be enabled in the Google Cloud project

Error Handling

Error handling covers retries, rate limits, authentication, and error propagation:

Retry Logic

  • Automatic Retry: API.get is wrapped in the @backoff.on_exception decorator
  • Max Tries: Up to 7 attempts in total (the initial call plus up to 6 retries)
  • Backoff Strategy: Exponential (backoff.expo), with the base delay starting at 1 second and doubling on each retry
  • Exception Type: Catches GoogleApiHttpError exceptions

Rate Limit Handling

  • HTTP 403 quotaExceeded: Retried with exponential backoff
  • HTTP 429 rateLimitExceeded: Retried with exponential backoff
  • All other errors: No retry; the error is raised immediately
  • Custom Handler: rate_limit_handling() is the giveup predicate that makes this decision

Authentication Management

  • OAuth Token Refresh: Automatic refresh when tokens expire
  • Service Account: Credentials are built from credentials_json with the Directory API scopes and delegated to the admin email
  • Error Detection: Graceful handling of authentication failures

Error Propagation

  • Meaningful Messages: Detailed error information for debugging
  • Health Check Integration: Connection errors surface through health_check() method
  • Exception Details: Preserves original Google API error context

Retry Configuration

@backoff.on_exception(
    backoff.expo,                    # Exponential backoff strategy
    GoogleApiHttpError,              # Exception type to catch
    max_tries=7,                     # Maximum total attempts, including the first call
    giveup=rate_limit_handling       # Predicate: returning True stops retrying
)

Requests that fail with a quota or rate limit error are retried with increasing delays until max_tries is reached; any other GoogleApiHttpError is treated as permanent and raised without retry.
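
To make the roles of `max_tries` and `giveup` concrete, here is a standard-library sketch of a retry loop with the same shape. It is not the backoff library's implementation: `retry_with_backoff` is a hypothetical helper, the waits are deterministic (the backoff library adds jitter by default), and `sleep` is injectable so the example runs instantly.

```python
import time
from typing import Callable, Type


def retry_with_backoff(
    func: Callable,
    exception: Type[Exception],
    max_tries: int,
    giveup: Callable[[Exception], bool],
    sleep: Callable[[float], None] = time.sleep,
):
    """Call func until it succeeds, giveup(error) is True, or max_tries calls were made."""
    for attempt in range(1, max_tries + 1):
        try:
            return func()
        except exception as error:
            if giveup(error) or attempt == max_tries:
                raise
            sleep(2 ** (attempt - 1))  # 1, 2, 4, ... seconds, without jitter


calls = []
delays = []


def flaky():
    """Fail twice with a transient error, then succeed."""
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("transient")
    return "ok"


result = retry_with_backoff(
    flaky, RuntimeError, max_tries=7, giveup=lambda e: False, sleep=delays.append
)
print(result, len(calls), delays)  # ok 3 [1, 2]
```

Note that `max_tries` counts calls, not retries: with `max_tries=7` the function is called at most seven times.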

Install with Tessl CLI

npx tessl i tessl/pypi-airbyte-source-google-directory
