or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

common-utilities.mddata-transfers.mdfirebase.mdgcp-services.mdgoogle-ads.mdgoogle-workspace.mdindex.mdleveldb.mdmarketing-platform.md

common-utilities.mddocs/

0

# Common Utilities and Base Classes

1

2

Shared utilities, authentication backends, base classes, and helper functions used across all Google service integrations. Provides foundation for authentication, operation management, and service discovery.

3

4

## Capabilities

5

6

### Base Hook Classes

7

8

Foundation classes that all Google service hooks inherit from, providing common authentication and connection management.

9

10

```python { .api }

11

class GoogleBaseHook(BaseHook):

12

"""

13

Base class for all Google Cloud service hooks.

14

15

Provides common functionality for authentication, connection management,

16

and credential handling across all Google service integrations.

17

"""

18

def __init__(

19

self,

20

gcp_conn_id: str = "google_cloud_default",

21

impersonation_chain: Optional[Union[str, Sequence[str]]] = None,

22

**kwargs

23

): ...

24

25

def get_connection(self, conn_id: str): ...

26

def get_credentials_and_project_id(self): ...

27

def get_credentials(self): ...

28

def quota_retry(self, *args, **kwargs): ...

29

def fallback_to_default_project_id(self, project_id: Optional[str]): ...

30

31

class GoogleBaseAsyncHook(BaseHook):

32

"""

33

Base class for async Google Cloud service hooks.

34

35

Provides asynchronous operations support for long-running Google Cloud

36

processes with deferrable operator patterns.

37

"""

38

def __init__(

39

self,

40

gcp_conn_id: str = "google_cloud_default",

41

impersonation_chain: Optional[Union[str, Sequence[str]]] = None,

42

**kwargs

43

): ...

44

45

async def get_sync_hook(self): ...

46

def sync_hook(self): ...

47

48

class GoogleDiscoveryApiHook(GoogleBaseHook):

49

"""

50

Hook for Google Discovery API services.

51

52

Provides access to Google APIs through the Discovery service,

53

enabling dynamic API client generation.

54

"""

55

def __init__(

56

self,

57

api_service_name: str,

58

api_version: str,

59

gcp_conn_id: str = "google_cloud_default",

60

**kwargs

61

): ...

62

63

def get_conn(self): ...

64

def build_service(self): ...

65

```

66

67

### Operation Helpers

68

69

Utilities for managing long-running Google Cloud operations with polling and status checking.

70

71

```python { .api }

72

class OperationHelper:

73

"""

74

Helper class for managing Google Cloud long-running operations.

75

76

Provides polling, status checking, and result extraction for

77

asynchronous operations across Google Cloud services.

78

"""

79

def __init__(

80

self,

81

operation: Dict[str, Any],

82

project_id: Optional[str] = None,

83

location: Optional[str] = None,

84

**kwargs

85

): ...

86

87

def wait_for_operation(

88

self,

89

timeout: Optional[float] = None,

90

retry: Optional[Retry] = None

91

): ...

92

def is_done(self) -> bool: ...

93

def get_error_message(self) -> Optional[str]: ...

94

def get_result(self) -> Dict[str, Any]: ...

95

def cancel_operation(self): ...

96

```

97

98

### Authentication Backend

99

100

Google OpenID authentication backend for Airflow web server integration.

101

102

```python { .api }

103

class GoogleOpenIdAuthBackend:

104

"""

105

Google OpenID Connect authentication backend for Airflow.

106

107

Enables Google OAuth authentication for Airflow web interface

108

with support for domain restrictions and role mapping.

109

"""

110

def __init__(self): ...

111

112

def authenticate(self, request): ...

113

def login_required(self, function): ...

114

def has_access(self, action: str, resource: str, user) -> bool: ...

115

```

116

117

### ID Token Credentials

118

119

Adapter for Google ID token credentials used in service-to-service authentication.

120

121

```python { .api }

122

class IDTokenCredentialsAdapter:

123

"""

124

Adapter for Google ID token credentials.

125

126

Provides compatibility layer for ID token-based authentication

127

in Google Cloud service-to-service communication.

128

"""

129

def __init__(

130

self,

131

credentials,

132

target_audience: str,

133

**kwargs

134

): ...

135

136

def refresh(self, request): ...

137

def apply(self, headers, token=None): ...

138

def before_request(self, request, method, url, headers): ...

139

140

def get_default_id_token_credentials(

141

target_audience: str,

142

request: Optional[Any] = None

143

): ...

144

"""

145

Get default ID token credentials for target audience.

146

147

Args:

148

target_audience (str): Target service URL for ID token

149

request (Optional[Any]): HTTP request object

150

151

Returns:

152

ID token credentials for service-to-service auth

153

"""

154

155

def impersonated_id_token_credentials(

156

credentials,

157

target_audience: str,

158

target_principal: str,

159

delegates: Optional[List[str]] = None,

160

**kwargs

161

): ...

162

"""

163

Create impersonated ID token credentials.

164

165

Args:

166

credentials: Source credentials for impersonation

167

target_audience (str): Target service URL

168

target_principal (str): Service account to impersonate

169

delegates (Optional[List[str]]): Delegation chain

170

171

Returns:

172

Impersonated ID token credentials

173

"""

174

```

175

176

### Console Links

177

178

Link generators for Google Cloud Console navigation from Airflow web interface.

179

180

```python { .api }

181

class StorageLink(BaseOperatorLink):

182

"""

183

Link to Google Cloud Storage console for bucket or object viewing.

184

185

Generates direct links to GCS resources in the Google Cloud Console

186

for easy navigation from Airflow task instances.

187

"""

188

name: str = "Google Cloud Storage"

189

190

def get_link(

191

self,

192

operator: BaseOperator,

193

dttm: datetime,

194

**kwargs

195

) -> str: ...

196

197

class FileDetailsLink(BaseOperatorLink):

198

"""

199

Link to specific file details in Google Cloud Storage console.

200

201

Provides direct navigation to individual file properties and

202

metadata in the Google Cloud Console.

203

"""

204

name: str = "File Details"

205

206

def get_link(

207

self,

208

operator: BaseOperator,

209

dttm: datetime,

210

**kwargs

211

) -> str: ...

212

```

213

214

### Constants and Configuration

215

216

Common constants and configuration values used across the provider.

217

218

```python { .api }

219

# Default method name for deferrable operations

220

GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME: str = "execute_complete"

221

222

# Client information for API requests

223

CLIENT_INFO: Dict[str, str] = {

224

"client_library_name": "airflow",

225

"client_library_version": "airflow_version"

226

}

227

228

# Default scopes for Google Cloud APIs

229

DEFAULT_SCOPES: List[str] = [

230

"https://www.googleapis.com/auth/cloud-platform"

231

]

232

233

# Default timeout values

234

DEFAULT_TIMEOUT: float = 60.0

235

DEFAULT_RETRY_ATTEMPTS: int = 3

236

237

# Common HTTP status codes

238

HTTP_OK: int = 200

239

HTTP_CREATED: int = 201

240

HTTP_NO_CONTENT: int = 204

241

HTTP_NOT_FOUND: int = 404

242

```

243

244

## Usage Examples

245

246

### Custom Hook Implementation

247

248

```python { .api }

249

from airflow.providers.google.common.hooks.base_google import GoogleBaseHook

250

from typing import Optional, Dict, Any

251

252

class CustomGoogleServiceHook(GoogleBaseHook):

253

"""Custom hook for a specific Google service."""

254

255

def __init__(

256

self,

257

api_version: str = "v1",

258

gcp_conn_id: str = "google_cloud_default",

259

**kwargs

260

):

261

super().__init__(gcp_conn_id=gcp_conn_id, **kwargs)

262

self.api_version = api_version

263

264

def get_service_client(self):

265

"""Get authenticated service client."""

266

credentials, project_id = self.get_credentials_and_project_id()

267

return build_service_client(

268

credentials=credentials,

269

api_version=self.api_version

270

)

271

272

def create_resource(self, resource_config: Dict[str, Any]) -> Dict[str, Any]:

273

"""Create a resource using the service API."""

274

client = self.get_service_client()

275

operation = client.create_resource(body=resource_config)

276

277

# Use OperationHelper for long-running operations

278

from airflow.providers.google.common.hooks.operation_helpers import OperationHelper

279

280

helper = OperationHelper(

281

operation=operation,

282

project_id=self.project_id

283

)

284

return helper.wait_for_operation(timeout=300)

285

```

286

287

### Authentication Configuration

288

289

```python

290

# Service account key file authentication

291

gcp_connection = {

292

"conn_id": "google_cloud_custom",

293

"conn_type": "gcp",

294

"extra": {

295

"key_path": "/path/to/service-account.json",

296

"scope": "https://www.googleapis.com/auth/cloud-platform",

297

"project": "my-gcp-project"

298

}

299

}

300

301

# Service account impersonation

302

gcp_impersonation = {

303

"conn_id": "google_cloud_impersonated",

304

"conn_type": "gcp",

305

"extra": {

306

"impersonation_chain": [

307

"service-account@project.iam.gserviceaccount.com"

308

],

309

"project": "target-project"

310

}

311

}

312

```

313

314

### Error Handling Patterns

315

316

```python { .api }

317

from airflow.providers.google.common.hooks.base_google import GoogleBaseHook

318

from google.api_core.exceptions import GoogleAPICallError, RetryError

319

from airflow.exceptions import AirflowException

320

321

def safe_google_api_call(hook: GoogleBaseHook, operation_func, **kwargs):

322

"""Safely execute Google API calls with error handling."""

323

try:

324

return operation_func(**kwargs)

325

except GoogleAPICallError as e:

326

if e.code == 404:

327

raise AirflowException(f"Resource not found: {e.message}")

328

elif e.code == 403:

329

raise AirflowException(f"Permission denied: {e.message}")

330

elif e.code == 429:

331

raise AirflowException(f"Rate limit exceeded: {e.message}")

332

else:

333

raise AirflowException(f"Google API error: {e.message}")

334

except RetryError as e:

335

raise AirflowException(f"Operation timed out after retries: {e}")

336

except Exception as e:

337

raise AirflowException(f"Unexpected error: {str(e)}")

338

```

339

340

## Types

341

342

```python { .api }

343

from typing import Dict, List, Optional, Any, Union, Sequence

344

from google.oauth2.credentials import Credentials

345

from google.auth.credentials import Credentials as BaseCredentials

346

from airflow.models import BaseOperator

347

from airflow.models.baseoperatorlink import BaseOperatorLink

348

349

# Credential types

350

GoogleCredentials = Union[Credentials, BaseCredentials]

351

ServiceAccountInfo = Dict[str, Any]

352

ImpersonationChain = Union[str, Sequence[str]]

353

354

# Operation types

355

OperationResult = Dict[str, Any]

356

OperationStatus = str

357

ErrorInfo = Dict[str, str]

358

359

# Connection types

360

ConnectionConfig = Dict[str, Any]

361

AuthScopes = List[str]

362

ProjectId = str

363

LocationId = str

364

365

# Link types

366

ConsoleUrl = str

367

ResourceLink = str

368

```