# Dagster GCP

```
npx @tessl/cli install tessl/pypi-dagster-gcp@0.27.00
```

Google Cloud Platform integration components for the Dagster data orchestration framework. This package provides resources, I/O managers, operations, and utilities for building data pipelines that leverage GCP services including BigQuery, Google Cloud Storage, Dataproc, and Pipes for external process communication.

## Package Information

- **Package Name**: dagster-gcp
- **Language**: Python
- **Installation**: `pip install dagster-gcp`

## Core Imports

```python
import dagster_gcp
```

Commonly used imports:

```python
from dagster_gcp import (
    BigQueryResource, BigQueryIOManager,
    GCSResource, GCSPickleIOManager,
    DataprocResource, configurable_dataproc_op,
)
```

Pipes components (imported from the `dagster_gcp.pipes` submodule):

```python
from dagster_gcp.pipes import (
    PipesDataprocJobClient,
    PipesGCSContextInjector,
    PipesGCSMessageReader,
    PipesGCSLogReader,
)
```

## Basic Usage

```python
from dagster import asset, Definitions
from dagster_gcp import BigQueryResource, GCSResource, GCSPickleIOManager

@asset
def my_data_asset(bigquery: BigQueryResource):
    with bigquery.get_client() as client:
        query = "SELECT * FROM `project.dataset.table` LIMIT 100"
        df = client.query(query).to_dataframe()
    return df

defs = Definitions(
    assets=[my_data_asset],
    resources={
        "bigquery": BigQueryResource(project="my-gcp-project"),
        "gcs": GCSResource(project="my-gcp-project"),
        "io_manager": GCSPickleIOManager(
            gcs_bucket="my-data-bucket",
            gcs=GCSResource(project="my-gcp-project"),
        ),
    },
)
```

## Architecture

The dagster-gcp package follows Dagster's resource and I/O manager patterns while providing GCP-specific implementations:

- **Resources**: Configurable connections to GCP services (BigQuery, GCS, Dataproc)
- **I/O Managers**: Data storage and retrieval using GCP storage services
- **Operations**: Pre-built ops for common GCP tasks (data loading, job execution)
- **Pipes Integration**: External process communication through GCP services

Each integration supports both the modern `ConfigurableResource` pattern and legacy resource factory functions for backward compatibility.

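For instance, BigQuery connectivity is available both as the `BigQueryResource` shown above and as the legacy `bigquery_resource` factory. A minimal sketch of the legacy pattern, assuming a `project` config field and that the resource resolves to a `google.cloud.bigquery` client:

```python
from dagster import job, op
from dagster_gcp import bigquery_resource  # legacy resource factory

@op(required_resource_keys={"bigquery"})
def legacy_query_op(context):
    # Legacy pattern: the client is reached through context.resources
    # rather than a typed function parameter (assumed behavior).
    return [dict(row) for row in context.resources.bigquery.query("SELECT 1 AS x").result()]

@job(resource_defs={"bigquery": bigquery_resource.configured({"project": "my-gcp-project"})})
def legacy_query_job():
    legacy_query_op()
```
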
## Capabilities
### BigQuery Integration

Data warehousing support, including I/O managers for BigQuery tables, ops for data loading and querying, and resources for BigQuery client management with authentication support.

```python { .api }
class BigQueryResource(ConfigurableResource):
    project: Optional[str]
    location: Optional[str]
    gcp_credentials: Optional[str]

    def get_client(self) -> Iterator[bigquery.Client]: ...

class BigQueryIOManager(ConfigurableIOManagerFactory):
    project: str
    dataset: Optional[str]
    location: Optional[str]
    gcp_credentials: Optional[str]
    temporary_gcs_bucket: Optional[str]
    timeout: Optional[float]
```

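For example, a parameterized query can be issued through `BigQueryResource`. A sketch with an illustrative table name and parameter (the `google-cloud-bigquery` job-config calls are standard; the identifiers are placeholders):

```python
from dagster import asset
from dagster_gcp import BigQueryResource
from google.cloud import bigquery as bq

@asset
def recent_orders(bigquery: BigQueryResource):
    # Hypothetical table and threshold, for illustration only.
    job_config = bq.QueryJobConfig(
        query_parameters=[bq.ScalarQueryParameter("min_total", "FLOAT64", 100.0)]
    )
    with bigquery.get_client() as client:
        rows = client.query(
            "SELECT order_id, total FROM `my-gcp-project.sales.orders` "
            "WHERE total >= @min_total",
            job_config=job_config,
        ).result()
        return [dict(row) for row in rows]
```
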
[BigQuery Integration](./bigquery.md)
### Google Cloud Storage (GCS)

File storage and management including I/O managers for pickled objects, file managers for direct GCS operations, compute log management, and sensor utilities for GCS-based data processing.

```python { .api }
class GCSResource(ConfigurableResource):
    project: Optional[str]

    def get_client(self) -> storage.Client: ...

class GCSPickleIOManager(ConfigurableIOManager):
    gcs: ResourceDependency[GCSResource]
    gcs_bucket: str
    gcs_prefix: str = "dagster"

    def load_input(self, context) -> Any: ...
    def handle_output(self, context, obj) -> None: ...
```

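Beyond the I/O manager, `GCSResource.get_client()` hands back a standard `google.cloud.storage.Client` for direct blob access. A sketch with placeholder bucket and object names:

```python
from dagster import asset
from dagster_gcp import GCSResource

@asset
def upload_report(gcs: GCSResource):
    # Placeholder bucket/key; get_client() returns a google.cloud.storage.Client.
    client = gcs.get_client()
    blob = client.bucket("my-data-bucket").blob("reports/latest.csv")
    blob.upload_from_string("id,total\n1,100.0\n", content_type="text/csv")
    return blob.public_url
```
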
[Google Cloud Storage](./gcs.md)
### Dataproc Integration

Apache Spark cluster management and job execution including resources for Dataproc cluster lifecycle management, operations for submitting and monitoring Spark jobs, and comprehensive configuration support for cluster and job parameters.

```python { .api }
class DataprocResource(ConfigurableResource):
    project_id: str
    region: str
    cluster_name: str
    labels: Optional[dict[str, str]]
    cluster_config_yaml_path: Optional[str]
    cluster_config_json_path: Optional[str]
    cluster_config_dict: Optional[dict]

    def get_client(self) -> DataprocClient: ...

@op
def configurable_dataproc_op(
    dataproc: DataprocResource,
    config: DataprocOpConfig,
) -> Any: ...
```

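A sketch of wiring the resource and op into a job; the project, region, and inline cluster config are placeholders, and `cluster_config_dict` is assumed to follow the Dataproc `ClusterConfig` shape:

```python
from dagster import Definitions, job
from dagster_gcp import DataprocResource, configurable_dataproc_op

@job
def spark_job():
    # Op config (DataprocOpConfig) is supplied as run config at launch time.
    configurable_dataproc_op()

defs = Definitions(
    jobs=[spark_job],
    resources={
        "dataproc": DataprocResource(
            project_id="my-gcp-project",
            region="us-central1",
            cluster_name="my-cluster",
            # Assumed to mirror the Dataproc ClusterConfig structure.
            cluster_config_dict={
                "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-4"},
                "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-4"},
            },
        ),
    },
)
```
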
[Dataproc Integration](./dataproc.md)
### Pipes Integration

External process communication through GCP services including clients for running workloads on Dataproc, context injectors for passing data via GCS, and message readers for collecting results and logs from external processes.

```python { .api }
class PipesDataprocJobClient(PipesClient):
    client: JobControllerClient
    context_injector: PipesContextInjector
    message_reader: PipesMessageReader
    forward_termination: bool = True
    poll_interval: float = 5.0

    def run(self, context, submit_job_params, extras) -> PipesClientCompletedInvocation: ...

class PipesGCSContextInjector(PipesContextInjector):
    bucket: str
    client: GCSClient
    key_prefix: Optional[str]
```

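A sketch of launching a PySpark job via Pipes; the endpoint, bucket, cluster, and the `submit_job_params` payload (assumed to be forwarded to `JobControllerClient.submit_job`) are illustrative:

```python
from dagster import AssetExecutionContext, asset
from dagster_gcp.pipes import (
    PipesDataprocJobClient,
    PipesGCSContextInjector,
    PipesGCSMessageReader,
)
from google.cloud import dataproc_v1, storage

gcs_client = storage.Client()
pipes_client = PipesDataprocJobClient(
    client=dataproc_v1.JobControllerClient(
        client_options={"api_endpoint": "us-central1-dataproc.googleapis.com:443"}
    ),
    context_injector=PipesGCSContextInjector(bucket="my-pipes-bucket", client=gcs_client),
    message_reader=PipesGCSMessageReader(bucket="my-pipes-bucket", client=gcs_client),
)

@asset
def external_spark_asset(context: AssetExecutionContext):
    return pipes_client.run(
        context=context,
        # Assumed to be passed through to JobControllerClient.submit_job.
        submit_job_params={
            "request": {
                "project_id": "my-gcp-project",
                "region": "us-central1",
                "job": {
                    "placement": {"cluster_name": "my-cluster"},
                    "pyspark_job": {"main_python_file_uri": "gs://my-bucket/main.py"},
                },
            }
        },
    ).get_materialize_result()
```
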
[Pipes Integration](./pipes.md)
## Types

### Core Types

```python { .api }
# File handle for GCS objects
class GCSFileHandle(FileHandle):
    @property
    def gcs_bucket(self) -> str: ...

    @property
    def gcs_key(self) -> str: ...

    @property
    def gcs_path(self) -> str: ...

    @property
    def path_desc(self) -> str: ...

# BigQuery error handling
class BigQueryError(Exception): ...

# Dataproc error handling
class DataprocError(Exception): ...
```
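
A sketch of constructing and inspecting a `GCSFileHandle`; the two-argument constructor and the `gs://` path format are assumptions based on the properties above:

```python
from dagster_gcp import GCSFileHandle

# Constructor arguments assumed from the gcs_bucket/gcs_key properties.
handle = GCSFileHandle("my-data-bucket", "dagster/run-123/output.pkl")
print(handle.gcs_path)   # e.g. "gs://my-data-bucket/dagster/run-123/output.pkl" (assumed)
print(handle.path_desc)  # human-readable description of the handle's location
```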
```