# Common Utilities and Base Classes

Shared utilities, authentication backends, base classes, and helper functions used across all Google service integrations. Together they provide the foundation for authentication, long-running operation management, and service discovery.

## Capabilities

### Base Hook Classes

Foundation classes that all Google service hooks inherit from, providing common authentication and connection management.

```python { .api }
class GoogleBaseHook(BaseHook):
    """
    Base class for all Google Cloud service hooks.

    Provides common functionality for authentication, connection management,
    and credential handling across all Google service integrations.
    """
    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
        **kwargs
    ): ...

    @classmethod
    def get_connection(cls, conn_id: str): ...  # inherited from BaseHook
    def get_credentials_and_project_id(self): ...
    def get_credentials(self): ...

    @property
    def project_id(self) -> Optional[str]: ...  # default project from the connection

    # Decorator factory: retries the wrapped call when a temporary quota is exceeded
    @staticmethod
    def quota_retry(*args, **kwargs): ...

    # Decorator: fills in a missing `project_id` keyword argument from the connection
    @staticmethod
    def fallback_to_default_project_id(func: Callable[..., Any]) -> Callable[..., Any]: ...

class GoogleBaseAsyncHook(BaseHook):
    """
    Base class for async Google Cloud service hooks.

    Provides asynchronous support for long-running Google Cloud
    processes, for use with deferrable operators.
    """
    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
        **kwargs
    ): ...

    async def get_sync_hook(self): ...
    def sync_hook(self): ...

class GoogleDiscoveryApiHook(GoogleBaseHook):
    """
    Hook for Google Discovery API services.

    Provides access to Google APIs through the Discovery service,
    enabling dynamic API client generation.
    """
    def __init__(
        self,
        api_service_name: str,
        api_version: str,
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...

    def get_conn(self): ...
    def build_service(self): ...
```
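
Because `fallback_to_default_project_id` is a decorator rather than an ordinary method, a small sketch makes its behavior concrete. The stdlib-only version below reproduces the behavior as we understand it:

- calls must use keyword arguments;
- a missing or empty `project_id` is filled from `self.project_id`;
- an error is raised when neither value is set.

This is not the provider's source. `DemoHook` and `list_buckets` are hypothetical stand-ins, and the plain `TypeError`/`ValueError` exceptions were chosen for the sketch.

```python
import functools
from typing import Any, Callable, Optional


def fallback_to_default_project_id(func: Callable[..., Any]) -> Callable[..., Any]:
    """Sketch: fill a missing `project_id` keyword argument from `self.project_id`."""

    @functools.wraps(func)
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        # Require keyword arguments so `project_id` is never passed positionally
        if args:
            raise TypeError("Use keyword arguments when calling this method")
        # Use the explicit value if given, otherwise the hook's default project
        kwargs["project_id"] = kwargs.get("project_id") or self.project_id
        if not kwargs["project_id"]:
            raise ValueError("project_id must be passed or set on the connection")
        return func(self, **kwargs)

    return wrapper


class DemoHook:
    """Hypothetical stand-in for a GoogleBaseHook subclass."""

    def __init__(self, project_id: Optional[str]) -> None:
        # A real hook resolves this from the Airflow connection
        self.project_id = project_id

    @fallback_to_default_project_id
    def list_buckets(self, project_id: Optional[str] = None) -> str:
        return f"listing buckets in {project_id}"


hook = DemoHook("default-project")
print(hook.list_buckets())                            # listing buckets in default-project
print(hook.list_buckets(project_id="other-project"))  # listing buckets in other-project
```

Putting the fallback in a decorator keeps each hook method free of repeated "use the connection's project if none was given" boilerplate.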

### Operation Helpers

Utilities for managing long-running Google Cloud operations with polling and status checking.

```python { .api }
class OperationHelper:
    """
    Helper class for managing Google Cloud long-running operations.

    Provides polling, status checking, and result extraction for
    asynchronous operations across Google Cloud services.
    """
    def __init__(
        self,
        operation: Dict[str, Any],
        project_id: Optional[str] = None,
        location: Optional[str] = None,
        **kwargs
    ): ...

    def wait_for_operation(
        self,
        timeout: Optional[float] = None,
        retry: Optional[Retry] = None
    ): ...
    def is_done(self) -> bool: ...
    def get_error_message(self) -> Optional[str]: ...
    def get_result(self) -> Dict[str, Any]: ...
    def cancel_operation(self): ...
```
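
The `OperationHelper` methods above (`is_done`, `get_error_message`, `get_result`) correspond to fields of the standard Google long-running `Operation` resource. In JSON form that resource carries `name` and `done`, plus either `error` or `response` once it has finished.

The sketch below shows the kind of polling loop a method like `wait_for_operation` runs. It assumes the caller passes a `get_operation` callable that re-fetches the operation as a dict (in practice, an `operations.get` request). This is a stdlib illustration, not `OperationHelper`'s implementation, and `poll_operation` and `poll_interval` are hypothetical names.

```python
import time
from typing import Any, Callable, Dict, Optional


def poll_operation(
    get_operation: Callable[[], Dict[str, Any]],
    timeout: Optional[float] = None,
    poll_interval: float = 1.0,
) -> Dict[str, Any]:
    """Re-fetch an operation until `done` is true, then return its response."""
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        operation = get_operation()
        if operation.get("done"):
            if "error" in operation:
                # Finished unsuccessfully: surface the server's status message
                raise RuntimeError(operation["error"].get("message", "operation failed"))
            return operation.get("response", {})
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"{operation.get('name')} not done after {timeout} s")
        time.sleep(poll_interval)


# Simulated server: the first fetch is still running, the second has finished
snapshots = iter([
    {"name": "operations/123", "done": False},
    {"name": "operations/123", "done": True, "response": {"id": "abc"}},
])
result = poll_operation(lambda: next(snapshots), timeout=10, poll_interval=0.01)
print(result)  # {'id': 'abc'}
```

Using `time.monotonic()` for the deadline keeps the timeout correct even if the wall clock changes while the loop is waiting.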

### Authentication Backend

Google OpenID authentication backend for the Airflow web server's REST API.

```python { .api }
class GoogleOpenIdAuthBackend:
    """
    Google OpenID Connect authentication backend for Airflow.

    Authenticates requests to the Airflow REST API using Google-issued
    ID tokens, with support for domain restrictions and role mapping.
    """
    def __init__(self): ...

    def authenticate(self, request): ...
    def login_required(self, function): ...
    def has_access(self, action: str, resource: str, user) -> bool: ...
```
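
The backend's docstring mentions domain restrictions. One common way to enforce such a rule is sketched below: pull the token out of an `Authorization: Bearer <token>` header, then check the verified email's domain against an allowlist.

The sketch assumes two things:

- the token's signature and audience have already been verified (for example, with google-auth's `google.oauth2.id_token.verify_oauth2_token`);
- `claims` is the payload that verification returned.

`extract_bearer_token`, `is_allowed`, and the allowlist are hypothetical, not part of the provider.

```python
from typing import Any, Dict, Iterable, Optional


def extract_bearer_token(authorization_header: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' header, if present."""
    if not authorization_header:
        return None
    scheme, _, token = authorization_header.partition(" ")
    if scheme.lower() != "bearer" or not token.strip():
        return None
    return token.strip()


def is_allowed(claims: Dict[str, Any], allowed_domains: Iterable[str]) -> bool:
    """Accept only verified email addresses whose domain is on the allowlist."""
    email = claims.get("email", "")
    if not claims.get("email_verified") or "@" not in email:
        return False
    domain = email.rsplit("@", 1)[1].lower()
    return domain in {d.lower() for d in allowed_domains}


token = extract_bearer_token("Bearer eyJhbGciOi.example")
claims = {"email": "alice@example.com", "email_verified": True}  # hypothetical verified payload
print(token is not None and is_allowed(claims, ["example.com"]))  # True
```

Requiring `email_verified` matters: an unverified email claim says nothing about which domain the caller actually belongs to.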

### ID Token Credentials

Adapter for Google ID token credentials used in service-to-service authentication.

```python { .api }
class IDTokenCredentialsAdapter:
    """
    Adapter for Google ID token credentials.

    Provides a compatibility layer for ID token-based authentication
    in Google Cloud service-to-service communication.
    """
    def __init__(
        self,
        credentials,
        target_audience: str,
        **kwargs
    ): ...

    def refresh(self, request): ...
    def apply(self, headers, token=None): ...
    def before_request(self, request, method, url, headers): ...

def get_default_id_token_credentials(
    target_audience: str,
    request: Optional[Any] = None
):
    """
    Get default ID token credentials for the target audience.

    Args:
        target_audience (str): Target service URL for the ID token
        request (Optional[Any]): HTTP request object

    Returns:
        ID token credentials for service-to-service auth
    """

def impersonated_id_token_credentials(
    credentials,
    target_audience: str,
    target_principal: str,
    delegates: Optional[List[str]] = None,
    **kwargs
):
    """
    Create impersonated ID token credentials.

    Args:
        credentials: Source credentials for impersonation
        target_audience (str): Target service URL
        target_principal (str): Service account to impersonate
        delegates (Optional[List[str]]): Delegation chain

    Returns:
        Impersonated ID token credentials
    """
```
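
The `refresh`, `apply`, and `before_request` methods follow google-auth's credentials protocol: before each request, stale credentials are refreshed, and the token is then written to the `authorization` header.

A stdlib sketch of that flow for ID tokens follows. `SketchIdTokenCredentials` and the `fetch_id_token` callable (which returns a token and its expiry) are hypothetical, and the 60-second expiry margin is an arbitrary choice.

```python
import time
from typing import Callable, Dict, Optional, Tuple


class SketchIdTokenCredentials:
    """Hypothetical ID-token credentials following the refresh/apply/before_request pattern."""

    def __init__(self, fetch_id_token: Callable[[str], Tuple[str, float]], target_audience: str) -> None:
        self._fetch = fetch_id_token  # returns (token, expiry as a Unix timestamp)
        self.target_audience = target_audience
        self.token: Optional[str] = None
        self.expiry: float = 0.0

    @property
    def valid(self) -> bool:
        # Treat tokens within 60 s of expiry as stale so requests never carry an expired token
        return self.token is not None and time.time() < self.expiry - 60

    def refresh(self, request: object = None) -> None:
        self.token, self.expiry = self._fetch(self.target_audience)

    def apply(self, headers: Dict[str, str], token: Optional[str] = None) -> None:
        headers["authorization"] = f"Bearer {token or self.token}"

    def before_request(self, request: object, method: str, url: str, headers: Dict[str, str]) -> None:
        # Refresh only when needed, then attach the current token
        if not self.valid:
            self.refresh(request)
        self.apply(headers)


calls = []


def fake_fetch(audience: str) -> Tuple[str, float]:
    calls.append(audience)
    return f"id-token-for-{audience}", time.time() + 3600


creds = SketchIdTokenCredentials(fake_fetch, "https://my-service.example.com")
headers: Dict[str, str] = {}
creds.before_request(None, "GET", "https://my-service.example.com/", headers)
creds.before_request(None, "GET", "https://my-service.example.com/", headers)
print(headers["authorization"], len(calls))  # Bearer id-token-for-https://my-service.example.com 1
```

The second request reuses the cached token, so the fetcher runs only once.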

### Console Links

Link generators for Google Cloud Console navigation from the Airflow web interface.

```python { .api }
class StorageLink(BaseOperatorLink):
    """
    Link to Google Cloud Storage console for bucket or object viewing.

    Generates direct links to GCS resources in the Google Cloud Console
    for easy navigation from Airflow task instances.
    """
    name: str = "Google Cloud Storage"

    def get_link(
        self,
        operator: BaseOperator,
        dttm: datetime,
        **kwargs
    ) -> str: ...

class FileDetailsLink(BaseOperatorLink):
    """
    Link to specific file details in Google Cloud Storage console.

    Provides direct navigation to individual file properties and
    metadata in the Google Cloud Console.
    """
    name: str = "File Details"

    def get_link(
        self,
        operator: BaseOperator,
        dttm: datetime,
        **kwargs
    ) -> str: ...
```
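
A console link is just a URL built from resource identifiers. The sketch below assumes these Cloud Console path layouts:

- buckets: `/storage/browser/<bucket>`;
- object details: `/storage/browser/_details/<bucket>/<object>`;
- project: passed as a `project` query parameter.

The provider's own link templates may add further parameters, and the function names are hypothetical.

```python
from urllib.parse import quote, urlencode

# Assumed Cloud Console base URL and path layout
CONSOLE_BASE = "https://console.cloud.google.com"


def _project_query(project_id: str) -> str:
    return f"?{urlencode({'project': project_id})}" if project_id else ""


def storage_browser_link(bucket: str, prefix: str = "", project_id: str = "") -> str:
    """Link to a bucket (optionally a folder-like prefix) in the storage browser."""
    path = f"{bucket}/{prefix}" if prefix else bucket
    # quote() keeps "/" so object prefixes stay readable, but escapes spaces etc.
    return f"{CONSOLE_BASE}/storage/browser/{quote(path)}{_project_query(project_id)}"


def file_details_link(bucket: str, object_name: str, project_id: str = "") -> str:
    """Link to the details page of a single object."""
    path = f"{bucket}/{object_name}"
    return f"{CONSOLE_BASE}/storage/browser/_details/{quote(path)}{_project_query(project_id)}"


print(file_details_link("my-bucket", "data/file 1.csv", "my-project"))
# https://console.cloud.google.com/storage/browser/_details/my-bucket/data/file%201.csv?project=my-project
```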

### Constants and Configuration

Common constants and configuration values used across the provider.

```python { .api }
# Default method name for deferrable operations
GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME: str = "execute_complete"

# Client information attached to Google API requests: a
# google.api_core.gapic_v1.client_info.ClientInfo whose library version
# is "airflow_v" followed by the installed Airflow version
CLIENT_INFO: ClientInfo = ClientInfo(client_library_version="airflow_v" + version.version)

# Default scopes for Google Cloud APIs
DEFAULT_SCOPES: List[str] = [
    "https://www.googleapis.com/auth/cloud-platform"
]

# Default timeout values
DEFAULT_TIMEOUT: float = 60.0
DEFAULT_RETRY_ATTEMPTS: int = 3

# Common HTTP status codes
HTTP_OK: int = 200
HTTP_CREATED: int = 201
HTTP_NO_CONTENT: int = 204
HTTP_NOT_FOUND: int = 404
```
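
The HTTP status constants coincide with members of Python's `http.HTTPStatus`, which also supplies the standard reason phrases. The sketch below uses them in a hypothetical `delete_succeeded` check that treats `404` as success. That is a common way to make repeated deletes idempotent, but whether a given operator should do it is a design decision, not documented provider behavior.

```python
from http import HTTPStatus

# Same values as the constants above
HTTP_OK: int = 200
HTTP_CREATED: int = 201
HTTP_NO_CONTENT: int = 204
HTTP_NOT_FOUND: int = 404

# Each constant matches the corresponding stdlib enum member
assert HTTP_OK == HTTPStatus.OK and HTTP_CREATED == HTTPStatus.CREATED
assert HTTP_NO_CONTENT == HTTPStatus.NO_CONTENT and HTTP_NOT_FOUND == HTTPStatus.NOT_FOUND


def delete_succeeded(status: int) -> bool:
    """Hypothetical check: a delete that finds nothing (404) still counts as done."""
    return status in (HTTP_OK, HTTP_NO_CONTENT, HTTP_NOT_FOUND)


print(HTTPStatus(HTTP_NOT_FOUND).phrase, delete_succeeded(HTTP_NOT_FOUND))  # Not Found True
```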

## Usage Examples

### Custom Hook Implementation

```python { .api }
from typing import Any, Dict

from googleapiclient.discovery import build

from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
from airflow.providers.google.common.hooks.operation_helpers import OperationHelper


class CustomGoogleServiceHook(GoogleBaseHook):
    """Custom hook for a specific Google service."""

    # Discovery name of the target API ("example" is a placeholder)
    api_service_name = "example"

    def __init__(
        self,
        api_version: str = "v1",
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ):
        super().__init__(gcp_conn_id=gcp_conn_id, **kwargs)
        self.api_version = api_version

    def get_service_client(self):
        """Get an authenticated Discovery-based service client."""
        credentials = self.get_credentials()
        return build(
            self.api_service_name,
            self.api_version,
            credentials=credentials,
            cache_discovery=False,
        )

    def create_resource(self, resource_config: Dict[str, Any]) -> Dict[str, Any]:
        """Create a resource using the service API."""
        client = self.get_service_client()
        # Placeholder for the target API's "create" call, which starts a long-running operation
        operation = client.create_resource(body=resource_config)

        # Wait up to 300 seconds for the long-running operation to finish
        helper = OperationHelper(
            operation=operation,
            project_id=self.project_id
        )
        return helper.wait_for_operation(timeout=300)
```

### Authentication Configuration

```python
# Service account key file authentication
gcp_connection = {
    "conn_id": "google_cloud_custom",
    "conn_type": "google_cloud_platform",
    "extra": {
        "key_path": "/path/to/service-account.json",
        # Comma-separated list of OAuth scopes
        "scope": "https://www.googleapis.com/auth/cloud-platform",
        "project": "my-gcp-project"
    }
}

# Service account impersonation: the last account in the chain is impersonated
gcp_impersonation = {
    "conn_id": "google_cloud_impersonated",
    "conn_type": "google_cloud_platform",
    "extra": {
        "impersonation_chain": [
            "service-account@project.iam.gserviceaccount.com"
        ],
        "project": "target-project"
    }
}
```
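
These dictionaries mirror the fields of an Airflow connection. One way to supply such a connection without the metadata database is an `AIRFLOW_CONN_<CONN_ID>` environment variable, which Airflow 2.3 and later accept in JSON form. The helper below (`to_env_var`, a hypothetical name) derives the variable name and JSON value from a dict shaped like `gcp_connection`.

```python
import json
import os
from typing import Any, Dict, Tuple


def to_env_var(connection: Dict[str, Any]) -> Tuple[str, str]:
    """Return (name, value) for an AIRFLOW_CONN_<CONN_ID> variable in JSON form."""
    name = f"AIRFLOW_CONN_{connection['conn_id'].upper()}"
    # The conn_id lives in the variable name, so it is left out of the JSON body
    value = json.dumps({k: v for k, v in connection.items() if k != "conn_id"})
    return name, value


gcp_connection = {
    "conn_id": "google_cloud_custom",
    "conn_type": "google_cloud_platform",
    "extra": {"key_path": "/path/to/service-account.json", "project": "my-gcp-project"},
}

name, value = to_env_var(gcp_connection)
os.environ[name] = value  # Airflow looks up this variable when resolving the connection
print(name)  # AIRFLOW_CONN_GOOGLE_CLOUD_CUSTOM
```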

### Error Handling Patterns

```python { .api }
from google.api_core.exceptions import GoogleAPICallError, RetryError

from airflow.exceptions import AirflowException


def safe_google_api_call(operation_func, **kwargs):
    """Execute a Google API call, re-raising failures as AirflowException."""
    try:
        return operation_func(**kwargs)
    except GoogleAPICallError as e:
        # `e.code` is the HTTP status code of the failed request
        if e.code == 404:
            raise AirflowException(f"Resource not found: {e.message}") from e
        elif e.code == 403:
            raise AirflowException(f"Permission denied: {e.message}") from e
        elif e.code == 429:
            raise AirflowException(f"Rate limit exceeded: {e.message}") from e
        else:
            raise AirflowException(f"Google API error: {e.message}") from e
    except RetryError as e:
        # Raised when a google.api_core retry policy exhausts its deadline
        raise AirflowException(f"Operation timed out after retries: {e}") from e
    except Exception as e:
        raise AirflowException(f"Unexpected error: {e}") from e
```
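
The `429` branch above gives up immediately, yet rate-limit errors are usually worth retrying. For quota errors specifically, `GoogleBaseHook.quota_retry` provides the provider's own retry decorator.

The stdlib sketch below instead shows the general technique: exponential backoff with full jitter. `ApiError` is a hypothetical stand-in for `GoogleAPICallError` that exposes the same `code` attribute. Treating 429, 500, and 503 as retryable is an assumption.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# Assumed set of transient HTTP statuses worth retrying
RETRYABLE_CODES = {429, 500, 503}


class ApiError(Exception):
    """Hypothetical error carrying an HTTP status, like GoogleAPICallError.code."""

    def __init__(self, code: int, message: str = "") -> None:
        super().__init__(message)
        self.code = code


def call_with_backoff(func: Callable[[], T], max_attempts: int = 3,
                      base_delay: float = 1.0, max_delay: float = 32.0) -> T:
    """Call func, retrying transient errors with exponential backoff and full jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    attempt = 0
    while True:
        attempt += 1
        try:
            return func()
        except ApiError as e:
            if e.code not in RETRYABLE_CODES or attempt >= max_attempts:
                raise
            # Sleep a random time up to base_delay * 2^(attempt - 1), capped at max_delay
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            time.sleep(random.uniform(0, delay))


attempts = {"n": 0}


def flaky() -> str:
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ApiError(429, "rate limited")
    return "ok"


result = call_with_backoff(flaky, max_attempts=5, base_delay=0.01)
print(result, attempts["n"])  # ok 3
```

Jitter spreads retries from many concurrent tasks over time, so they do not hit the quota again in lockstep.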

## Types

```python { .api }
from typing import Dict, List, Optional, Any, Union, Sequence
from google.oauth2.credentials import Credentials
from google.auth.credentials import Credentials as BaseCredentials
from airflow.models import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink

# Credential types
GoogleCredentials = Union[Credentials, BaseCredentials]
ServiceAccountInfo = Dict[str, Any]
ImpersonationChain = Union[str, Sequence[str]]

# Operation types
OperationResult = Dict[str, Any]
OperationStatus = str
ErrorInfo = Dict[str, str]

# Connection types
ConnectionConfig = Dict[str, Any]
AuthScopes = List[str]
ProjectId = str
LocationId = str

# Link types
ConsoleUrl = str
ResourceLink = str
```
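
`ImpersonationChain` accepts either a single account or a sequence of accounts. The provider's documented convention for a sequence is:

- the last account is the one impersonated;
- each identity in the list must grant the Service Account Token Creator role to the identity before it;
- the first identity grants that role to the caller's own account.

The hypothetical helper below sketches how such a chain splits into a target principal and its delegates. Its handling of an empty sequence is an assumption.

```python
from typing import List, Optional, Sequence, Tuple, Union

ImpersonationChain = Union[str, Sequence[str]]


def split_impersonation_chain(
    chain: Optional[ImpersonationChain],
) -> Tuple[Optional[str], Optional[List[str]]]:
    """Return (target_principal, delegates) for an impersonation chain."""
    if chain is None:
        return None, None
    if isinstance(chain, str):
        # A single account is impersonated directly, with no delegates
        return chain, None
    accounts = list(chain)
    if not accounts:
        return None, None
    # The last account is the target; the earlier ones form the delegation chain
    return accounts[-1], accounts[:-1]


print(split_impersonation_chain(["a@p.iam.gserviceaccount.com", "b@p.iam.gserviceaccount.com"]))
# ('b@p.iam.gserviceaccount.com', ['a@p.iam.gserviceaccount.com'])
```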