
# Schema Operations and Introspection

1

2

Database schema introspection and metadata operations for analyzing table structures, primary keys, constraints, and database organization. Provides programmatic access to PostgreSQL system catalogs and information schema.

3

4

## Capabilities

5

6

### Primary Key Introspection

7

8

Retrieve primary key information for tables using PostgreSQL system catalogs.

9

10

```python { .api }

11

def get_table_primary_key(

12

self,

13

table: str,

14

schema: str | None = "public"

15

) -> list[str] | None:

16

"""

17

Get primary key columns for specified table.

18

19

Parameters:

20

- table: str, table name to inspect

21

- schema: str or None, schema name (defaults to "public")

22

23

Returns:

24

list[str] or None: List of primary key column names, None if no primary key

25

26

Example:

27

pk_cols = hook.get_table_primary_key("users", "public") # ["id"]

28

composite_pk = hook.get_table_primary_key("user_roles") # ["user_id", "role_id"]

29

"""

30

```

31

32

## Usage Examples

33

34

### Basic Primary Key Lookup

35

36

```python

37

from airflow.providers.postgres.hooks.postgres import PostgresHook

38

39

hook = PostgresHook(postgres_conn_id="postgres_default")

40

41

# Get primary key for table in default schema

42

pk_columns = hook.get_table_primary_key("users")

43

print(f"Primary key columns: {pk_columns}") # ["id"]

44

45

# Get primary key for table in specific schema

46

pk_columns = hook.get_table_primary_key("customers", "sales")

47

print(f"Primary key columns: {pk_columns}") # ["customer_id"]

48

```

49

50

### Handling Composite Primary Keys

51

52

```python

53

# Tables with composite primary keys

54

composite_pk = hook.get_table_primary_key("order_items")

55

if composite_pk:

56

print(f"Composite key: {composite_pk}") # ["order_id", "product_id"]

57

58

# Use in upsert operations

59

hook.insert_rows(

60

table="order_items",

61

rows=[(1, 101, 2), (1, 102, 1)],

62

target_fields=["order_id", "product_id", "quantity"],

63

replace=True,

64

replace_index=composite_pk # Use composite key for conflict resolution

65

)

66

```

67

68

### Schema Validation Workflow

69

70

```python

71

def validate_table_structure(table_name, expected_pk_columns):

72

"""Validate table has expected primary key structure."""

73

actual_pk = hook.get_table_primary_key(table_name)

74

75

if actual_pk is None:

76

raise ValueError(f"Table {table_name} has no primary key")

77

78

if set(actual_pk) != set(expected_pk_columns):

79

raise ValueError(

80

f"Primary key mismatch for {table_name}. "

81

f"Expected: {expected_pk_columns}, Actual: {actual_pk}"

82

)

83

84

return True

85

86

# Validate table structure before operations

87

validate_table_structure("users", ["id"])

88

validate_table_structure("user_permissions", ["user_id", "permission_id"])

89

```

90

91

### Dynamic Upsert Configuration

92

93

```python

94

def smart_upsert(table_name, rows, target_fields):

95

"""Perform upsert using table's actual primary key."""

96

97

# Discover primary key dynamically

98

pk_columns = hook.get_table_primary_key(table_name)

99

100

if pk_columns is None:

101

# No primary key - use regular insert

102

hook.insert_rows(

103

table=table_name,

104

rows=rows,

105

target_fields=target_fields

106

)

107

else:

108

# Use discovered primary key for upsert

109

hook.insert_rows(

110

table=table_name,

111

rows=rows,

112

target_fields=target_fields,

113

replace=True,

114

replace_index=pk_columns

115

)

116

117

# Usage

118

smart_upsert("products", [(1, "Widget", 19.99)], ["id", "name", "price"])

119

```

120

121

### ETL Pipeline Integration

122

123

```python

124

def etl_with_schema_validation():

125

"""ETL pipeline with schema validation."""

126

127

# Define expected schemas

128

expected_schemas = {

129

"users": ["user_id"],

130

"orders": ["order_id"],

131

"order_items": ["order_id", "item_id"]

132

}

133

134

# Validate all tables before processing

135

for table, expected_pk in expected_schemas.items():

136

try:

137

validate_table_structure(table, expected_pk)

138

print(f"✓ {table} schema validation passed")

139

except ValueError as e:

140

print(f"✗ {table} schema validation failed: {e}")

141

return False

142

143

# Proceed with ETL operations

144

process_etl_data()

145

return True

146

```

147

148

## Schema Information Queries

149

150

While the hook provides primary key introspection, you can also execute custom queries for additional schema information:

151

152

### Table Information

153

154

```python

155

# Get all tables in schema

156

tables = hook.get_records("""

157

SELECT table_name

158

FROM information_schema.tables

159

WHERE table_schema = %s AND table_type = 'BASE TABLE'

160

""", parameters=["public"])

161

162

# Get column information

163

columns = hook.get_records("""

164

SELECT column_name, data_type, is_nullable, column_default

165

FROM information_schema.columns

166

WHERE table_schema = %s AND table_name = %s

167

ORDER BY ordinal_position

168

""", parameters=["public", "users"])

169

```

170

171

### Constraint Information

172

173

```python

174

# Get foreign key constraints

175

foreign_keys = hook.get_records("""

176

SELECT

177

kcu.column_name,

178

ccu.table_name AS foreign_table_name,

179

ccu.column_name AS foreign_column_name

180

FROM information_schema.table_constraints tc

181

JOIN information_schema.key_column_usage kcu

182

ON tc.constraint_name = kcu.constraint_name

183

JOIN information_schema.constraint_column_usage ccu

184

ON ccu.constraint_name = tc.constraint_name

185

WHERE tc.constraint_type = 'FOREIGN KEY'

186

AND tc.table_schema = %s

187

AND tc.table_name = %s

188

""", parameters=["public", "orders"])

189

190

# Get unique constraints

191

unique_constraints = hook.get_records("""

192

SELECT

193

tc.constraint_name,

194

array_agg(kcu.column_name ORDER BY kcu.ordinal_position) as columns

195

FROM information_schema.table_constraints tc

196

JOIN information_schema.key_column_usage kcu

197

ON tc.constraint_name = kcu.constraint_name

198

WHERE tc.constraint_type = 'UNIQUE'

199

AND tc.table_schema = %s

200

AND tc.table_name = %s

201

GROUP BY tc.constraint_name

202

""", parameters=["public", "users"])

203

```

204

205

### Index Information

206

207

```python

208

# Get table indexes

209

indexes = hook.get_records("""

210

SELECT

211

i.relname as index_name,

212

array_agg(a.attname ORDER BY c.ordinality) as columns,

213

ix.indisunique as is_unique,

214

ix.indisprimary as is_primary

215

FROM pg_class t

216

JOIN pg_index ix ON t.oid = ix.indrelid

217

JOIN pg_class i ON i.oid = ix.indexrelid

218

JOIN unnest(ix.indkey) WITH ORDINALITY AS c(attnum, ordinality) ON true

219

JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = c.attnum

220

JOIN pg_namespace n ON t.relnamespace = n.oid

221

WHERE t.relname = %s AND n.nspname = %s

222

GROUP BY i.relname, ix.indisunique, ix.indisprimary

223

ORDER BY i.relname

224

""", parameters=["users", "public"])

225

```

226

227

## Error Handling

228

229

```python

230

def safe_get_primary_key(table_name, schema=None):

231

"""Safely get primary key with error handling."""

232

try:

233

pk_columns = hook.get_table_primary_key(table_name, schema)

234

return pk_columns

235

except Exception as e:

236

print(f"Error getting primary key for {table_name}: {e}")

237

return None

238

239

# Usage with error handling

240

pk = safe_get_primary_key("nonexistent_table")

241

if pk is not None:

242

print(f"Primary key: {pk}")

243

else:

244

print("Could not determine primary key")

245

```