# Google Cloud Storage (GCS)

Comprehensive Google Cloud Storage integration for Dagster, providing I/O managers for pickled objects, file managers for direct GCS operations, compute log management, and sensor utilities for GCS-based data processing workflows.

## Capabilities

### GCS Resource

Configurable resource for GCS client management.

```python { .api }
class GCSResource(ConfigurableResource):
    """Resource for GCS client management."""

    project: Optional[str]  # GCP project name

    def get_client(self) -> storage.Client:
        """Create authenticated GCS client."""

@resource(
    config_schema=GCSResource.to_config_schema(),
    description="This resource provides a GCS client",
)
def gcs_resource(init_context) -> storage.Client:
    """Legacy GCS resource factory that returns a GCS client."""
```

### File Manager Resource

Resource providing GCS file manager functionality for direct file operations.

```python { .api }
class GCSFileManagerResource(ConfigurableResource):
    """Resource providing GCS file manager functionality."""

    project: Optional[str]  # GCP project name
    gcs_bucket: str  # GCS bucket name
    gcs_prefix: str = "dagster"  # Path prefix

    def get_client(self) -> GCSFileManager:
        """Create GCS file manager."""

def gcs_file_manager(config_schema=None) -> ResourceDefinition:
    """Legacy GCS file manager factory."""
```

### I/O Managers

I/O managers for storing and retrieving objects in GCS.

```python { .api }
class GCSPickleIOManager(ConfigurableIOManager):
    """I/O manager for storing pickled objects in GCS."""

    gcs: ResourceDependency[GCSResource]  # GCS resource dependency
    gcs_bucket: str  # GCS bucket name
    gcs_prefix: str = "dagster"  # Path prefix

    def load_input(self, context) -> Any:
        """Load input from GCS."""

    def handle_output(self, context, obj) -> None:
        """Store output to GCS."""

class PickledObjectGCSIOManager(UPathIOManager):
    """Lower-level I/O manager implementation."""

    def load_from_path(self, context, path) -> Any:
        """Load object from GCS path."""

    def dump_to_path(self, context, obj, path) -> None:
        """Store object to GCS path."""

    def path_exists(self, path) -> bool:
        """Check if path exists."""

    def unlink(self, path) -> None:
        """Delete object."""

# Legacy aliases
ConfigurablePickledObjectGCSIOManager = GCSPickleIOManager

def gcs_pickle_io_manager(config_schema=None) -> IOManagerDefinition:
    """Legacy GCS pickle I/O manager factory."""
```

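
The pythonic `GCSPickleIOManager` is shown in the usage examples below. For older config-schema code, here is a minimal sketch of wiring the legacy factory, which expects a companion `gcs` resource key; the bucket and prefix values are placeholders.

```python
from dagster import job, op
from dagster_gcp import gcs_pickle_io_manager, gcs_resource

@op
def emit_number():
    return 42  # pickled to GCS by the configured io_manager

@job(
    resource_defs={
        # the legacy io manager pulls its client from the `gcs` resource key
        "gcs": gcs_resource,
        "io_manager": gcs_pickle_io_manager.configured(
            {"gcs_bucket": "my-data-bucket", "gcs_prefix": "dagster/storage"}
        ),
    }
)
def legacy_job():
    emit_number()
```
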
### File Management

Direct file operations and handles for GCS objects.

```python { .api }
class GCSFileHandle(FileHandle):
    """Reference to a file stored in GCS."""

    @property
    def gcs_bucket(self) -> str: ...  # GCS bucket name

    @property
    def gcs_key(self) -> str: ...  # Object key in bucket

    @property
    def gcs_path(self) -> str: ...  # Full gs:// path

    @property
    def path_desc(self) -> str: ...  # Path description

class GCSFileManager(FileManager):
    """File manager implementation for GCS operations."""

    def read(self, file_handle, mode) -> Any:
        """Read file content."""

    def write(self, file_obj, mode, ext, key) -> GCSFileHandle:
        """Write file to GCS."""

    def write_data(self, data, ext, key) -> GCSFileHandle:
        """Write bytes data to GCS."""

    def copy_handle_to_local_temp(self, file_handle) -> str:
        """Copy GCS file to local temp."""

    def delete_local_temp(self) -> None:
        """Clean up local temp files."""
```

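
A minimal write/read round trip against the signatures sketched above; the project, bucket, and key are placeholders, and `read_data` comes from Dagster's `FileManager` base class.

```python
from google.cloud import storage
from dagster_gcp.gcs import GCSFileManager  # import path may vary by version

# Placeholder project/bucket; assumes ambient GCP credentials.
client = storage.Client(project="my-gcp-project")
file_manager = GCSFileManager(
    client=client,
    gcs_bucket="my-data-bucket",
    gcs_prefix="scratch",
)

# Write raw bytes, then read them back through the returned handle.
handle = file_manager.write_data(b"hello, gcs", ext="txt", key="demo")
print(handle.gcs_path)  # gs://my-data-bucket/scratch/demo.txt
assert file_manager.read_data(handle) == b"hello, gcs"
```
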
### Compute Log Management

Storage and management of compute logs in GCS.

```python { .api }
class GCSComputeLogManager(ConfigurableClass, TruncatingCloudStorageComputeLogManager):
    """Manages compute log storage in GCS."""

    bucket: str  # GCS bucket name
    local_dir: Optional[str]  # Local staging directory
    prefix: Optional[str] = "dagster"  # Key prefix
    json_credentials_envvar: Optional[str]  # Environment variable containing JSON credentials
    upload_interval: Optional[int]  # Upload interval in seconds
    show_url_only: Optional[bool] = False  # Only show URLs instead of log content

    def capture_logs(self, log_key):
        """Context manager for log capture."""

    def delete_logs(self, log_key, prefix):
        """Delete logs from GCS."""

    def download_url_for_type(self, log_key, io_type):
        """Get signed download URL."""

    def display_path_for_type(self, log_key, io_type):
        """Get display path."""
```

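
`GCSComputeLogManager` is normally configured under the `compute_logs:` key of `dagster.yaml` rather than in code. As a minimal sketch, the same fields can also be passed directly in Python (e.g. in a test); the bucket and directory values below are placeholders.

```python
from dagster_gcp import GCSComputeLogManager

# Direct construction with placeholder values; in a deployment these
# same fields would live under `compute_logs:` in dagster.yaml.
log_manager = GCSComputeLogManager(
    bucket="my-compute-log-bucket",  # placeholder bucket
    local_dir="/tmp/dagster-logs",   # local staging directory
    prefix="dagster-logs",
    upload_interval=30,              # upload partial logs every 30 seconds
)
```
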
### Sensor Utilities

Utilities for GCS-based sensors and data monitoring.

```python { .api }
def get_gcs_keys(
    bucket: str,
    prefix: Optional[str] = None,
    since_key: Optional[str] = None,
    gcs_session: Optional[Client] = None,
) -> List[str]:
    """
    Utility function for GCS-based sensors.

    Parameters:
    - bucket: GCS bucket name
    - prefix: key prefix filter
    - since_key: starting key for incremental processing; keys up to and including it are skipped
    - gcs_session: GCS client session

    Returns:
    List of new or updated keys
    """
```

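
A minimal sketch of incremental listing with `since_key`; the bucket and key are placeholders. The sensor example at the end of this page drives the same parameter from a sensor cursor instead.

```python
from dagster_gcp import get_gcs_keys

# Only keys observed after `since_key` in the listing are returned,
# enabling incremental processing without rescanning the whole prefix.
new_keys = get_gcs_keys(
    bucket="my-input-bucket",             # placeholder bucket
    prefix="incoming/",
    since_key="incoming/batch-0042.csv",  # last key already processed
)
for key in new_keys:
    print(f"gs://my-input-bucket/{key}")
```
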
### Testing Utilities

Mock GCS classes for testing without actual GCS connectivity.

```python { .api }
class FakeGCSBlob:
    """Mock GCS blob for testing."""

class FakeGCSBucket:
    """Mock GCS bucket for testing."""

class FakeGCSClient:
    """Mock GCS client for testing."""

class FakeConfigurableGCSClient:
    """Mock configurable client for testing."""
```

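
A sketch of a unit test against the fake client, assuming the fakes mirror the small slice of the `google-cloud-storage` surface used elsewhere on this page (`bucket()`, `blob()`, `upload_from_string`, `download_as_bytes`); verify the import path and method names against your dagster-gcp version.

```python
from dagster_gcp.gcs import FakeGCSClient  # import path is an assumption

def test_fake_gcs_roundtrip():
    client = FakeGCSClient()
    blob = client.bucket("test-bucket").blob("data/input.csv")

    # Assumes the fake blob mirrors the real client's upload/download methods.
    blob.upload_from_string("a,b\n1,2")
    assert blob.download_as_bytes() == b"a,b\n1,2"
```
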
## Usage Examples

### Basic Resource Usage

```python
from dagster import asset, Definitions
from dagster_gcp import GCSResource

@asset
def process_gcs_file(gcs: GCSResource):
    client = gcs.get_client()
    bucket = client.bucket("my-data-bucket")
    blob = bucket.blob("data/input.csv")

    # Download and process file
    content = blob.download_as_text()
    processed_data = content.upper()  # Simple processing

    # Upload processed result
    output_blob = bucket.blob("data/output.csv")
    output_blob.upload_from_string(processed_data)

    return f"Processed {len(content)} characters"

defs = Definitions(
    assets=[process_gcs_file],
    resources={
        "gcs": GCSResource(project="my-gcp-project")
    },
)
```

### I/O Manager Usage

```python
from dagster import asset, Definitions
from dagster_gcp import GCSPickleIOManager, GCSResource
import pandas as pd

@asset
def sales_data():
    # This DataFrame will be pickled and stored in GCS
    return pd.DataFrame({
        'product': ['A', 'B', 'C'],
        'sales': [100, 200, 150],
        'date': ['2024-01-01', '2024-01-02', '2024-01-03']
    })

@asset
def sales_summary(sales_data):
    # sales_data will be loaded from GCS automatically
    return {
        'total_sales': sales_data['sales'].sum(),
        'avg_sales': sales_data['sales'].mean(),
        'product_count': len(sales_data)
    }

defs = Definitions(
    assets=[sales_data, sales_summary],
    resources={
        "io_manager": GCSPickleIOManager(
            gcs_bucket="my-data-bucket",
            gcs_prefix="dagster/storage",
            gcs=GCSResource(project="my-gcp-project")
        )
    },
)
```

### File Manager Usage

```python
from dagster import op, job, Definitions
from dagster_gcp import GCSFileManagerResource

@op
def create_report(gcs_file_manager: GCSFileManagerResource):
    file_manager = gcs_file_manager.get_client()

    # Create a report
    report_content = "Sales Report\n============\nTotal: $10,000"

    # Write to GCS; `ext` is the extension without a leading dot
    file_handle = file_manager.write_data(
        data=report_content.encode(),
        ext="txt",
        key="reports/daily_sales"
    )

    return file_handle.gcs_path

@job
def generate_report():
    create_report()

defs = Definitions(
    jobs=[generate_report],
    resources={
        "gcs_file_manager": GCSFileManagerResource(
            project="my-gcp-project",
            gcs_bucket="my-reports-bucket",
            gcs_prefix="reports"
        )
    },
)
```

### Sensor with GCS Keys

```python
from dagster import job, op, sensor, RunRequest, Definitions
from dagster_gcp import get_gcs_keys, GCSResource

@op(config_schema={"file_path": str})
def process_file(context):
    context.log.info(context.op_config["file_path"])

@job
def process_new_files():
    process_file()

@sensor(job=process_new_files)
def gcs_sensor(context, gcs: GCSResource):
    client = gcs.get_client()

    # Get new files since the last cursor position
    new_keys = get_gcs_keys(
        bucket="my-input-bucket",
        prefix="incoming/",
        since_key=context.cursor or None,
        gcs_session=client
    )

    for key in new_keys:
        yield RunRequest(
            run_key=key,
            run_config={
                "ops": {
                    "process_file": {
                        "config": {"file_path": f"gs://my-input-bucket/{key}"}
                    }
                }
            }
        )

    # Advance the cursor so already-seen keys are skipped on the next tick
    if new_keys:
        context.update_cursor(new_keys[-1])

defs = Definitions(
    jobs=[process_new_files],
    sensors=[gcs_sensor],
    resources={
        "gcs": GCSResource(project="my-gcp-project")
    }
)
```