Airbyte source connector for Google Directory (Google Workspace Admin Directory API) that extracts users, groups, and group memberships.
Direct access to Google Directory API resources with authentication, pagination, error handling, and retry logic. These classes provide low-level access to users, groups, and group membership data from Google Workspace.
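As a rough illustration of the pagination these classes handle, the sketch below follows nextPageToken across pages until the API stops returning one. It is not the connector's code: read_all_pages, fake_getter, and the PAGES data are hypothetical names made up for this example, and the fake getter stands in for a real Directory API call so the snippet runs offline. The pageToken request parameter and nextPageToken response field are the ones the Directory API list endpoints use.

```python
from typing import Any, Callable, Dict, Iterator, Optional


def read_all_pages(
    getter: Callable[[Dict[str, Any]], Dict[str, Any]],
    key: str,
    params: Optional[Dict[str, Any]] = None,
) -> Iterator[dict]:
    """Yield records from every page (hypothetical helper, not the connector's API)."""
    params = dict(params or {})  # copy so the caller's dict is not mutated
    while True:
        response = getter(params)
        # Records live under a resource-specific key such as 'users' or 'groups'.
        yield from response.get(key, [])
        token = response.get("nextPageToken")
        if not token:
            break  # no token means this was the last page
        params["pageToken"] = token


# Hypothetical canned responses, keyed by the pageToken that requests them.
PAGES = {
    None: {"users": [{"id": "1"}, {"id": "2"}], "nextPageToken": "p2"},
    "p2": {"users": [{"id": "3"}]},
}


def fake_getter(params: Dict[str, Any]) -> Dict[str, Any]:
    """Stand-in for an API call: returns the page matching the given token."""
    return PAGES[params.get("pageToken")]
```

Calling read_all_pages(fake_getter, "users", {"customer": "my_customer"}) walks both canned pages and yields the three user records in order.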
The main API class that handles Google Directory API authentication and request execution.
class API:
    """
    Core Google Directory API client with authentication handling.
    Manages OAuth 2.0 and Service Account authentication, constructs
    Google API service resources, and executes API requests with
    automatic retry and rate limiting.
    """

    def __init__(self, credentials: Mapping[str, Any]):
        """
        Initialize the Google Directory API client.
        Parameters:
        - credentials: Dict containing authentication credentials
          For OAuth: client_id, client_secret, refresh_token
          For Service Account: credentials_json, email
        """

    def get(self, name: str, params: Dict = None) -> Dict:
        """
        Execute Google Directory API GET request with retry logic.
        Parameters:
        - name: API resource name ('users', 'groups', 'members')
        - params: Dict of query parameters for the API request
        Returns:
        - Dict: API response containing requested data
        Features automatic retry with exponential backoff for rate limits
        and quota exceeded errors using the backoff decorator.
        """

    @staticmethod
    def _load_account_info(credentials_json: str) -> Dict:
        """
        Parse service account credentials from JSON string.
        Parameters:
        - credentials_json: JSON string containing service account info
        Returns:
        - Dict: Parsed account information
        """

    def _obtain_service_account_creds(self):
        """
        Obtain Google service account credentials for domain-wide delegation.
        Uses the credentials_json and email from the configuration to create
        service account credentials with the required Directory API scopes
        and subject delegation for the admin email.
        """

    def _obtain_web_app_creds(self):
        """
        Obtain Google OAuth 2.0 web application credentials.
        Uses client_id, client_secret, and refresh_token from configuration
        to create OAuth credentials. Automatically refreshes tokens if expired.
        """

    def _obtain_creds(self):
        """
        Determine credential type and obtain appropriate authentication.
        Automatically detects whether to use service account or OAuth
        credentials based on the presence of credentials_json or client_id
        in the configuration.
        """

    def _construct_resource(self):
        """
        Construct the Google Directory API service resource.
        Obtains credentials if not already available and builds the
        'admin' service with 'directory_v1' API version.
        """

    def _get_resource(self, name: str):
        """
        Get Google Directory API resource by name.
        Parameters:
        - name: Resource name ('users', 'groups', 'members')
        Returns:
        - Google API resource object for the specified resource type
        """

Abstract base class providing common functionality for all Google Directory stream APIs.
class StreamAPI(ABC):
    """
    Abstract base class for Google Directory stream APIs.
    Provides common pagination, response processing, and data reading
    functionality for all stream implementations.
    """

    results_per_page = 100  # Default page size for API requests

    def __init__(self, api: API, *args, **kwargs):
        """
        Initialize stream API with core API instance.
        Parameters:
        - api: Core API instance for making requests
        """

    @abstractmethod
    def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
        """
        Iterate over entities for this stream.
        Parameters:
        - fields: Optional list of fields to include in response
        Returns:
        - Iterator[dict]: Iterator yielding individual records
        """

    @abstractmethod
    def process_response(self, response: Dict) -> Iterator[dict]:
        """
        Process Google Directory API response and extract records.
        Parameters:
        - response: Raw API response dict
        Returns:
        - Iterator[dict]: Iterator yielding processed records
        """

    def _api_get(self, resource: str, params: Dict = None):
        """
        Internal method to call the core API get method.
        Parameters:
        - resource: API resource name
        - params: Query parameters for the request
        Returns:
        - Dict: API response data
        """

    def read(self, getter: Callable, params: Dict = None) -> Iterator:
        """
        Read data using getter function with automatic pagination.
        Parameters:
        - getter: Function to call for each page of data
        - params: Initial parameters for API request
        Returns:
        - Iterator: Iterator yielding records across all pages
        Handles nextPageToken pagination automatically.
        """

API for accessing Google Directory users data.
class UsersAPI(StreamAPI):
    """
    API for accessing Google Directory users.
    Provides access to user accounts in the Google Workspace domain
    including profile information, organizational data, and metadata.
    """

    def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
        """
        List all users in the Google Workspace domain.
        Parameters:
        - fields: Optional list of user fields to include
        Returns:
        - Iterator[dict]: Iterator yielding user records
        Each user record contains standard Google Directory user fields
        including id, primaryEmail, name, orgUnitPath, etc.
        """

    def process_response(self, response: Dict) -> Iterator[dict]:
        """
        Extract users from API response.
        Parameters:
        - response: Raw API response containing users data
        Returns:
        - Iterator[dict]: Iterator yielding individual user records
        """

API for accessing Google Directory groups data.
class GroupsAPI(StreamAPI):
    """
    API for accessing Google Directory groups.
    Provides access to groups in the Google Workspace domain including
    group metadata, settings, and organizational information.
    """

    def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
        """
        List all groups in the Google Workspace domain.
        Parameters:
        - fields: Optional list of group fields to include
        Returns:
        - Iterator[dict]: Iterator yielding group records
        Each group record contains standard Google Directory group fields
        including id, email, name, description, etc.
        """

    def process_response(self, response: Dict) -> Iterator[dict]:
        """
        Extract groups from API response.
        Parameters:
        - response: Raw API response containing groups data
        Returns:
        - Iterator[dict]: Iterator yielding individual group records
        """

API for accessing Google Directory group membership data.
class GroupMembersAPI(StreamAPI):
    """
    API for accessing Google Directory group memberships.
    Provides access to group membership relationships, iterating through
    all groups and fetching members for each group.
    """

    def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
        """
        List members for all groups in the domain.
        Parameters:
        - fields: Optional list of member fields to include
        Returns:
        - Iterator[dict]: Iterator yielding group member records
        Iterates through all groups and fetches membership data for each,
        yielding individual member records with group context.
        """

    def process_response(self, response: Dict) -> Iterator[dict]:
        """
        Extract group members from API response.
        Parameters:
        - response: Raw API response containing members data
        Returns:
        - Iterator[dict]: Iterator yielding individual member records
        Returns empty list if no members found in response.
        """

Helper functions for API error handling and retry logic.
def rate_limit_handling(error):
    """
    Error handler for backoff retry logic.
    Determines whether API errors should trigger retries based on
    HTTP status codes and error reasons.
    Parameters:
    - error: GoogleApiHttpError instance
    Returns:
    - bool: True if error should not be retried, False to trigger retry
    Retries are attempted only for specific rate limit cases:
    - HTTP 403 with reason 'quotaExceeded'
    - HTTP 429 with reason 'rateLimitExceeded'
    All other errors are raised immediately without retry.
    """

from source_google_directory.api import API, UsersAPI, GroupsAPI, GroupMembersAPI
# OAuth credentials
credentials = {
    "client_id": "your-client-id",
    "client_secret": "your-client-secret",
    "refresh_token": "your-refresh-token"
}

# Initialize core API
api = API(credentials)

# Test direct API call
response = api.get("users", params={"customer": "my_customer"})
print(f"Found {len(response.get('users', []))} users")

# Initialize stream APIs
users_api = UsersAPI(api)
groups_api = GroupsAPI(api)
members_api = GroupMembersAPI(api)

# Iterate through users
for user in users_api.list():
    print(f"User: {user['primaryEmail']} ({user['name']['fullName']})")

# Iterate through groups
for group in groups_api.list():
    print(f"Group: {group['email']} - {group['name']}")

# Iterate through group memberships
for member in members_api.list():
    print(f"Member: {member['email']} in group {member.get('groupId', 'N/A')}")

# Service account credentials
service_credentials = {
    "credentials_json": '''{
        "type": "service_account",
        "project_id": "your-project",
        "private_key_id": "...",
        "private_key": "...",
        "client_email": "...",
        "client_id": "...",
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "https://oauth2.googleapis.com/token"
    }''',
    "email": "admin@yourdomain.com"
}

api = API(service_credentials)

SCOPES = [
    "https://www.googleapis.com/auth/admin.directory.user.readonly",
    "https://www.googleapis.com/auth/admin.directory.group.readonly"
]

The APIs implement comprehensive error handling with the following features:
- Retries through the @backoff.on_exception decorator with exponential backoff
- Catches GoogleApiHttpError exceptions
- Uses the rate_limit_handling() function to determine retry eligibility

@backoff.on_exception(
    backoff.expo,               # Exponential backoff strategy
    GoogleApiHttpError,         # Exception type to catch
    max_tries=7,                # Maximum retry attempts
    giveup=rate_limit_handling  # Custom retry decision function
)

Because giveup stops retrying whenever rate_limit_handling() returns True, only rate-limit and quota errors (HTTP 403 quotaExceeded and HTTP 429 rateLimitExceeded) are retried, with exponentially increasing delays and at most 7 attempts. Any other API error is treated as a permanent failure and raised without retry.
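To make the role of a giveup callback concrete, here is a minimal stdlib-only sketch of a retry loop driven by such a predicate. It does not use the backoff library or the connector's code: FakeHttpError, RETRYABLE, should_give_up, and call_with_retry are hypothetical names for illustration, and the choice of which (status, reason) pairs count as retryable is an assumption based on the rate-limit and quota cases named in this section.

```python
import time
from typing import Callable, Set, Tuple, TypeVar

T = TypeVar("T")


class FakeHttpError(Exception):
    """Hypothetical stand-in for GoogleApiHttpError, carrying status and reason."""

    def __init__(self, status: int, reason: str):
        super().__init__(f"{status} {reason}")
        self.status = status
        self.reason = reason


# Assumption for this sketch: only these (status, reason) pairs are retried.
RETRYABLE: Set[Tuple[int, str]] = {
    (403, "quotaExceeded"),
    (429, "rateLimitExceeded"),
}


def should_give_up(error: FakeHttpError) -> bool:
    """Return True to stop retrying, the same contract as a giveup callback."""
    return (error.status, error.reason) not in RETRYABLE


def call_with_retry(
    func: Callable[[], T],
    max_tries: int = 7,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying retryable errors with exponentially growing delays."""
    for attempt in range(1, max_tries + 1):
        try:
            return func()
        except FakeHttpError as err:
            # Re-raise when the predicate says stop or the attempts are used up.
            if should_give_up(err) or attempt == max_tries:
                raise
            sleep(base_delay * 2 ** (attempt - 1))  # 1s, 2s, 4s, ...
    raise RuntimeError("max_tries must be at least 1")
```

Passing sleep as a parameter keeps the delay injectable, so the loop can be exercised in tests without actually waiting.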
Install with Tessl CLI
npx tessl i tessl/pypi-airbyte-source-google-directory