
# Google Cloud Storage (GCS)

Comprehensive Google Cloud Storage integration for Dagster, providing file storage and management capabilities, I/O managers for pickled objects, file managers for direct GCS operations, compute log management, and sensor utilities for GCS-based data processing workflows.

## Capabilities

### GCS Resource

Configurable resource for GCS client management.

```python { .api }
class GCSResource(ConfigurableResource):
    """Resource for GCS client management."""
    project: Optional[str]  # GCP project name

    def get_client(self) -> storage.Client:
        """Create authenticated GCS client."""

@resource(
    config_schema=GCSResource.to_config_schema(),
    description="This resource provides a GCS client"
)
def gcs_resource(init_context) -> storage.Client:
    """Legacy GCS resource factory that returns a GCS client."""
```

### File Manager Resource

Resource providing GCS file manager functionality for direct file operations.

```python { .api }
class GCSFileManagerResource(ConfigurableResource):
    """Resource providing GCS file manager functionality."""
    project: Optional[str]  # GCP project name
    gcs_bucket: str  # GCS bucket name
    gcs_prefix: str = "dagster"  # Path prefix

    def get_client(self) -> GCSFileManager:
        """Create GCS file manager."""

def gcs_file_manager(config_schema=None) -> ResourceDefinition:
    """Legacy GCS file manager factory."""
```

### I/O Managers

I/O managers for storing and retrieving objects in GCS.

```python { .api }
class GCSPickleIOManager(ConfigurableIOManager):
    """I/O manager for storing pickled objects in GCS."""
    gcs: ResourceDependency[GCSResource]  # GCS resource dependency
    gcs_bucket: str  # GCS bucket name
    gcs_prefix: str = "dagster"  # Path prefix

    def load_input(self, context) -> Any:
        """Load input from GCS."""

    def handle_output(self, context, obj) -> None:
        """Store output to GCS."""

class PickledObjectGCSIOManager(UPathIOManager):
    """Lower-level I/O manager implementation."""

    def load_from_path(self, context, path) -> Any:
        """Load object from GCS path."""

    def dump_to_path(self, context, obj, path) -> None:
        """Store object to GCS path."""

    def path_exists(self, path) -> bool:
        """Check if path exists."""

    def unlink(self, path) -> None:
        """Delete object."""

# Legacy aliases
ConfigurablePickledObjectGCSIOManager = GCSPickleIOManager

def gcs_pickle_io_manager(config_schema=None) -> IOManagerDefinition:
    """Legacy GCS pickle I/O manager factory."""
```
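
For codebases still on the config-schema APIs, the legacy factory pairs with the legacy `gcs_resource`. A sketch, assuming the factory resolves its client through a companion `gcs` resource key as in older Dagster releases; prefer `GCSPickleIOManager` for new code:

```python
from dagster import job, op
from dagster_gcp import gcs_pickle_io_manager, gcs_resource

@op
def emit_value():
    return 42

# Assumed legacy wiring: the io_manager is configured via config schema
# and obtains its GCS client from the "gcs" resource key.
@job(
    resource_defs={
        "io_manager": gcs_pickle_io_manager.configured(
            {"gcs_bucket": "my-data-bucket", "gcs_prefix": "dagster/storage"}
        ),
        "gcs": gcs_resource,
    }
)
def legacy_pickle_job():
    emit_value()
```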

### File Management

Direct file operations and handles for GCS objects.

```python { .api }
class GCSFileHandle(FileHandle):
    """Reference to a file stored in GCS."""

    @property
    def gcs_bucket(self) -> str: ...  # GCS bucket name

    @property
    def gcs_key(self) -> str: ...  # Object key in bucket

    @property
    def gcs_path(self) -> str: ...  # Full gs:// path

    @property
    def path_desc(self) -> str: ...  # Path description

class GCSFileManager(FileManager):
    """File manager implementation for GCS operations."""

    def read(self, file_handle, mode) -> Any:
        """Read file content."""

    def write(self, file_obj, mode, ext, key) -> GCSFileHandle:
        """Write file to GCS."""

    def write_data(self, data, ext, key) -> GCSFileHandle:
        """Write bytes data to GCS."""

    def copy_handle_to_local_temp(self, file_handle) -> str:
        """Copy GCS file to local temp."""

    def delete_local_temp(self) -> None:
        """Clean up local temp files."""
```

### Compute Log Management

Storage and management of compute logs in GCS.

```python { .api }
class GCSComputeLogManager(ConfigurableClass, TruncatingCloudStorageComputeLogManager):
    """Manages compute logs storage in GCS."""
    bucket: str  # GCS bucket name
    local_dir: Optional[str]  # Local staging directory
    prefix: Optional[str] = "dagster"  # Key prefix
    json_credentials_envvar: Optional[str]  # Environment variable with credentials
    upload_interval: Optional[int]  # Upload interval in seconds
    show_url_only: Optional[bool] = False  # Only show URLs instead of content

    def capture_logs(self, log_key):
        """Context manager for log capture."""

    def delete_logs(self, log_key, prefix):
        """Delete logs from GCS."""

    def download_url_for_type(self, log_key, io_type):
        """Get signed download URL."""

    def display_path_for_type(self, log_key, io_type):
        """Get display path."""
```
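
The compute log manager is configured on the Dagster instance rather than in code. A minimal `dagster.yaml` sketch, assuming the standard module path for this class; the bucket, directory, and prefix values are placeholders:

```yaml
compute_logs:
  module: dagster_gcp.gcs.compute_log_manager
  class: GCSComputeLogManager
  config:
    bucket: "my-logs-bucket"        # placeholder bucket name
    local_dir: "/tmp/dagster-logs"  # optional local staging directory
    prefix: "dagster-logs"          # key prefix within the bucket
```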

### Sensor Utilities

Utilities for GCS-based sensors and data monitoring.

```python { .api }
def get_gcs_keys(
    bucket: str,
    prefix: Optional[str] = None,
    since_key: Optional[str] = None,
    gcs_session: Optional[Client] = None
) -> List[str]:
    """
    Utility function for GCS-based sensors.

    Parameters:
    - bucket: GCS bucket name
    - prefix: Key prefix filter
    - since_key: Starting key for incremental processing
    - gcs_session: GCS client session

    Returns:
        List of updated keys
    """
```

### Testing Utilities

Mock GCS classes for testing without actual GCS connectivity.

```python { .api }
class FakeGCSBlob:
    """Mock GCS blob for testing."""

class FakeGCSBucket:
    """Mock GCS bucket for testing."""

class FakeGCSClient:
    """Mock GCS client for testing."""

class FakeConfigurableGCSClient:
    """Mock configurable client for testing."""
```
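
A minimal test sketch. It assumes the fakes mirror the subset of the `google.cloud.storage` client interface used below (`bucket()`, `blob()`, and the upload/download helpers); verify the fake API in your installed `dagster-gcp` version before relying on specific methods:

```python
from dagster_gcp.gcs import FakeGCSClient

def read_greeting(client) -> str:
    """Code under test: accepts any google.cloud.storage-style client."""
    return client.bucket("test-bucket").blob("greeting.txt").download_as_text()

def test_read_greeting():
    fake = FakeGCSClient()
    # Assumed: the fake blob supports the same upload/download helpers
    # as a real Blob; adjust to the actual fake API if it differs.
    fake.bucket("test-bucket").blob("greeting.txt").upload_from_string("hello")
    assert read_greeting(fake) == "hello"
```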


## Usage Examples

### Basic Resource Usage

```python
from dagster import asset, Definitions
from dagster_gcp import GCSResource

@asset
def process_gcs_file(gcs: GCSResource):
    client = gcs.get_client()
    bucket = client.bucket("my-data-bucket")
    blob = bucket.blob("data/input.csv")

    # Download and process file
    content = blob.download_as_text()
    processed_data = content.upper()  # Simple processing

    # Upload processed result
    output_blob = bucket.blob("data/output.csv")
    output_blob.upload_from_string(processed_data)

    return f"Processed {len(content)} characters"

defs = Definitions(
    assets=[process_gcs_file],
    resources={
        "gcs": GCSResource(project="my-gcp-project")
    }
)
```

### I/O Manager Usage

```python
from dagster import asset, Definitions
from dagster_gcp import GCSPickleIOManager, GCSResource
import pandas as pd

@asset
def sales_data():
    # This DataFrame will be pickled and stored in GCS
    return pd.DataFrame({
        'product': ['A', 'B', 'C'],
        'sales': [100, 200, 150],
        'date': ['2024-01-01', '2024-01-02', '2024-01-03']
    })

@asset
def sales_summary(sales_data):
    # sales_data will be loaded from GCS automatically
    return {
        'total_sales': sales_data['sales'].sum(),
        'avg_sales': sales_data['sales'].mean(),
        'product_count': len(sales_data)
    }

defs = Definitions(
    assets=[sales_data, sales_summary],
    resources={
        "io_manager": GCSPickleIOManager(
            gcs_bucket="my-data-bucket",
            gcs_prefix="dagster/storage",
            gcs=GCSResource(project="my-gcp-project")
        )
    }
)
```

### File Manager Usage

```python
from dagster import op, job, Definitions
from dagster_gcp import GCSFileManagerResource

@op
def create_report(gcs_file_manager: GCSFileManagerResource):
    file_manager = gcs_file_manager.get_client()

    # Create a report
    report_content = "Sales Report\n============\nTotal: $10,000"

    # Write to GCS
    file_handle = file_manager.write_data(
        data=report_content.encode(),
        ext=".txt",
        key="reports/daily_sales"
    )

    return file_handle.gcs_path

@job
def generate_report():
    create_report()

defs = Definitions(
    jobs=[generate_report],
    resources={
        "gcs_file_manager": GCSFileManagerResource(
            project="my-gcp-project",
            gcs_bucket="my-reports-bucket",
            gcs_prefix="reports"
        )
    }
)
```
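
Reading the report back can reuse the handle with `copy_handle_to_local_temp`. A hypothetical downstream op, assuming the upstream op is changed to return the `GCSFileHandle` itself rather than its `gcs_path` string:

```python
@op
def summarize_report(gcs_file_manager: GCSFileManagerResource, report_handle):
    file_manager = gcs_file_manager.get_client()
    # Stage the GCS object to a local temp file, then read it
    local_path = file_manager.copy_handle_to_local_temp(report_handle)
    with open(local_path) as f:
        return len(f.read())
```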

### Sensor with GCS Keys

```python
from dagster import sensor, RunRequest, Definitions
from dagster_gcp import get_gcs_keys, GCSResource

# process_new_files is a job assumed to be defined elsewhere
@sensor(jobs=[process_new_files])
def gcs_sensor(gcs: GCSResource):
    client = gcs.get_client()

    # Get new files since last run
    new_keys = get_gcs_keys(
        bucket="my-input-bucket",
        prefix="incoming/",
        gcs_session=client
    )

    for key in new_keys:
        yield RunRequest(
            run_key=key,
            run_config={
                "ops": {
                    "process_file": {
                        "config": {"file_path": f"gs://my-input-bucket/{key}"}
                    }
                }
            }
        )

defs = Definitions(
    sensors=[gcs_sensor],
    resources={
        "gcs": GCSResource(project="my-gcp-project")
    }
)
```
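
For strictly incremental processing, `since_key` pairs naturally with the sensor cursor. A sketch assuming standard Dagster cursor semantics, targeting the same assumed `process_new_files` job as above:

```python
from dagster import sensor, RunRequest, SensorEvaluationContext
from dagster_gcp import get_gcs_keys, GCSResource

@sensor(jobs=[process_new_files])
def gcs_cursor_sensor(context: SensorEvaluationContext, gcs: GCSResource):
    # Only list keys added after the last one we processed
    new_keys = get_gcs_keys(
        bucket="my-input-bucket",
        prefix="incoming/",
        since_key=context.cursor,  # None on the first tick
        gcs_session=gcs.get_client()
    )

    for key in new_keys:
        yield RunRequest(run_key=key)

    if new_keys:
        # Persist the newest key so the next tick resumes after it
        context.update_cursor(new_keys[-1])
```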