# Data Loading

Flexible data loading for database initialization and test-data setup. Loaders can be SQL files, Python callables, or import strings, and a retry helper handles temporary connection failures.

## Capabilities

### Loader Builder

Converts the supported input types into callable loaders for database initialization.

```python { .api }
def build_loader(load: Union[Callable, str, Path]) -> Callable:
    """
    Build a loader callable from various input types.

    Supports three input types:
    - Path objects: Loads SQL files using the sql() function
    - String: Import path like 'module.function' or 'module:function'
    - Callable: Returns the callable directly

    A plain ``str`` is always treated as an import path, never as a file
    name, so SQL files must be passed as ``pathlib.Path`` objects.

    Parameters:
    - load: SQL file path, import string, or callable function

    Returns:
    Callable that accepts connection parameters as kwargs

    Raises:
    - ImportError / AttributeError: If an import string cannot be resolved.
      The import happens when the loader is built, not when it is called.
    """
```

### SQL File Loader

Loads a SQL file and executes its contents against the database.

```python { .api }
def sql(sql_filename: Path, **kwargs: Any) -> None:
    """
    Load SQL file into database.

    Connects to database using provided connection parameters,
    reads the SQL file, and executes its contents.

    Because the connection is opened before the file is read, a missing
    file is only reported as FileNotFoundError when the server is
    reachable. Otherwise the connection error is raised first.

    Parameters:
    - sql_filename: Path to SQL file to execute
    - **kwargs: Database connection parameters (host, port, user, etc.)

    Returns:
    None

    Raises:
    - FileNotFoundError: If SQL file doesn't exist
    - psycopg.Error: If connecting or SQL execution fails
    """
```

### Retry Utility

Retry helper for temporary connection failures, such as a server that is still starting up.

```python { .api }
def retry(
    func: Callable[[], T],
    timeout: int = 60,
    possible_exception: Type[Exception] = Exception,
) -> T:
    """
    Retry function execution with timeout for handling temporary failures.

    Particularly useful for database connections that may fail initially
    due to server startup delays. ``T`` is the return type of ``func``.

    Parameters:
    - func: Zero-argument function to retry
    - timeout: Maximum retry time in seconds (default: 60)
    - possible_exception: Exception type to catch and retry (default: Exception)

    Returns:
    Result of the first successful function execution

    Raises:
    - TimeoutError: If func keeps raising possible_exception for longer
      than timeout seconds
    - Exception: Any exception that is not an instance of
      possible_exception propagates immediately, without a retry
    """

def get_current_datetime() -> datetime.datetime:
    """
    Get current datetime with Python version compatibility.

    Handles datetime retrieval across Python 3.8+ versions
    with proper UTC handling.

    NOTE: depending on the Python version the result may be naive
    (implicitly UTC) or timezone-aware. Compare it only with values
    obtained from this same function.

    Returns:
    Current datetime in UTC
    """
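```

## Usage Examples

A `load` list passed to `factories.postgresql_proc` is applied when that process fixture sets up its database. The process fixture itself yields the server executor, not a database connection. Tests that run queries should therefore request a client fixture created with `factories.postgresql("<process fixture name>")`.

### SQL File Loading

```python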
from pytest_postgresql import factories
from pathlib import Path

# Process fixtures: ``load`` is applied when the server fixture sets up its
# database. These fixtures yield the server executor, not a connection.

# Load single SQL file
postgresql_with_schema_proc = factories.postgresql_proc(
    load=[Path('/path/to/schema.sql')]
)

# Load multiple SQL files in order
postgresql_with_data_proc = factories.postgresql_proc(
    load=[
        Path('/path/to/schema.sql'),
        Path('/path/to/seed_data.sql'),
        Path('/path/to/test_fixtures.sql'),
    ]
)

# Client fixtures: tests request these to get a psycopg connection to the
# database prepared by the matching process fixture.
postgresql_with_schema = factories.postgresql("postgresql_with_schema_proc")
postgresql_with_data = factories.postgresql("postgresql_with_data_proc")

def test_sql_loading(postgresql_with_data):
    """Test database loaded with SQL files."""
    # Schema and data are already loaded. The ``with`` block closes the
    # cursor even if the query raises.
    with postgresql_with_data.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM users;")
        count = cur.fetchone()[0]
    assert count > 0  # Data was loaded
```

### Python Callable Loading

```python
from pytest_postgresql import factories
import psycopg


def create_test_schema(**kwargs):
    """Create test schema and tables.

    Like every loader, it receives the database connection parameters
    (host, port, user, ...) as keyword arguments.
    """
    with psycopg.connect(**kwargs) as conn:
        with conn.cursor() as cur:
            cur.execute("""
                CREATE SCHEMA IF NOT EXISTS test_schema;

                CREATE TABLE test_schema.users (
                    id SERIAL PRIMARY KEY,
                    username VARCHAR(50) UNIQUE NOT NULL,
                    email VARCHAR(100) UNIQUE NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );

                CREATE TABLE test_schema.posts (
                    id SERIAL PRIMARY KEY,
                    user_id INTEGER REFERENCES test_schema.users(id),
                    title VARCHAR(200) NOT NULL,
                    content TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );

                CREATE INDEX idx_posts_user_id ON test_schema.posts(user_id);
            """)
        conn.commit()

def insert_test_data(**kwargs):
    """Insert test data.

    Must run after ``create_test_schema``, because it fills the
    ``test_schema.users`` and ``test_schema.posts`` tables created there.
    """
    with psycopg.connect(**kwargs) as conn:
        with conn.cursor() as cur:
            # Insert test users
            cur.execute("""
                INSERT INTO test_schema.users (username, email) VALUES
                ('alice', 'alice@example.com'),
                ('bob', 'bob@example.com'),
                ('charlie', 'charlie@example.com');
            """)

            # Insert test posts. Authors are looked up by username instead of
            # hard-coding ids 1-3, which would depend on the SERIAL sequence
            # values handed out to the users above.
            cur.execute("""
                INSERT INTO test_schema.posts (user_id, title, content)
                SELECT u.id, p.title, p.content
                FROM (VALUES
                    ('alice', 'First Post', 'This is Alice''s first post'),
                    ('alice', 'Second Post', 'Alice''s follow-up post'),
                    ('bob', 'Bob''s Thoughts', 'Bob shares his ideas'),
                    ('charlie', 'Charlie''s Update', 'Latest from Charlie')
                ) AS p (username, title, content)
                JOIN test_schema.users AS u USING (username);
            """)
        conn.commit()

# Process fixture: runs both callables, in list order, against its database.
postgresql_with_python_data_proc = factories.postgresql_proc(
    load=[create_test_schema, insert_test_data]
)
# Client fixture requested by the test: a connection to the loaded database.
postgresql_with_python_data = factories.postgresql(
    "postgresql_with_python_data_proc"
)

def test_python_callable_loading(postgresql_with_python_data):
    """Test database loaded with Python callables."""
    # The ``with`` block closes the cursor even when an assertion fails.
    with postgresql_with_python_data.cursor() as cur:
        # Verify schema creation
        cur.execute("""
            SELECT table_name FROM information_schema.tables
            WHERE table_schema = 'test_schema';
        """)
        tables = [row[0] for row in cur.fetchall()]
        assert 'users' in tables
        assert 'posts' in tables

        # Verify data insertion
        cur.execute("SELECT COUNT(*) FROM test_schema.users;")
        user_count = cur.fetchone()[0]
        assert user_count == 3

        cur.execute("SELECT COUNT(*) FROM test_schema.posts;")
        post_count = cur.fetchone()[0]
        assert post_count == 4
```

### Import String Loading

```python
# myapp/fixtures.py
import psycopg


def load_application_schema(**kwargs):
    """Load application schema from fixtures module.

    Receives the database connection parameters as keyword arguments.
    """
    with psycopg.connect(**kwargs) as conn:
        with conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE applications (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(100) NOT NULL,
                    version VARCHAR(20) NOT NULL,
                    status VARCHAR(20) DEFAULT 'active'
                );
            """)
        conn.commit()

def load_sample_applications(**kwargs):
    """Load sample application data.

    Must run after ``load_application_schema``, which creates the
    ``applications`` table.
    """
    with psycopg.connect(**kwargs) as conn:
        with conn.cursor() as cur:
            cur.execute("""
                INSERT INTO applications (name, version, status) VALUES
                ('MyApp', '1.0.0', 'active'),
                ('TestApp', '0.9.0', 'beta'),
                ('LegacyApp', '2.1.0', 'deprecated');
            """)
        conn.commit()

# Use import strings in test configuration
from pytest_postgresql import factories

# Each string names a function in myapp/fixtures.py. Both the
# 'module.function' and 'module:function' forms are accepted.
postgresql_with_imports_proc = factories.postgresql_proc(
    load=[
        'myapp.fixtures.load_application_schema',
        'myapp.fixtures.load_sample_applications',
    ]
)
# Client fixture requested by the test: a connection to the loaded database.
postgresql_with_imports = factories.postgresql("postgresql_with_imports_proc")

def test_import_string_loading(postgresql_with_imports):
    """Test database loaded with import strings."""
    # The ``with`` block closes the cursor even if the query raises.
    with postgresql_with_imports.cursor() as cur:
        cur.execute("SELECT name, version FROM applications ORDER BY id;")
        apps = cur.fetchall()

    assert len(apps) == 3
    assert apps[0] == ('MyApp', '1.0.0')
    assert apps[1] == ('TestApp', '0.9.0')
    assert apps[2] == ('LegacyApp', '2.1.0')
```

### Mixed Loading Types

```python
from pytest_postgresql import factories
from pathlib import Path

import psycopg


def custom_indexes(**kwargs):
    """Add custom indexes after schema creation.

    Expects the ``users`` and ``posts`` tables to exist already, so it must
    come after the schema loader in the ``load`` list.
    """
    with psycopg.connect(**kwargs) as conn:
        with conn.cursor() as cur:
            # Plain CREATE INDEX on purpose: CREATE INDEX CONCURRENTLY cannot
            # run inside a transaction block, and psycopg opens one implicitly
            # (a multi-statement string is also run as one implicit
            # transaction). Nothing else uses the database while loaders run,
            # so the table lock is harmless.
            cur.execute("""
                CREATE INDEX idx_users_email_lower
                    ON users (LOWER(email));

                CREATE INDEX idx_posts_created_at
                    ON posts (created_at DESC);
            """)
        conn.commit()

# Loaders run in list order, so custom_indexes sees the base schema.
postgresql_mixed_loading_proc = factories.postgresql_proc(
    load=[
        Path('/path/to/base_schema.sql'),       # SQL file
        'myapp.fixtures.load_reference_data',  # Import string
        custom_indexes,                        # Python callable
        Path('/path/to/test_data.sql'),         # Another SQL file
    ]
)
# Client fixture requested by the test: a connection to the loaded database.
postgresql_mixed_loading = factories.postgresql("postgresql_mixed_loading_proc")

def test_mixed_loading(postgresql_mixed_loading):
    """Test mixed loading types."""
    # All loaders executed in order. The indexes from custom_indexes can only
    # exist if the base schema was loaded before it ran.
    with postgresql_mixed_loading.cursor() as cur:
        cur.execute("""
            SELECT indexname FROM pg_indexes
            WHERE indexname IN ('idx_users_email_lower', 'idx_posts_created_at');
        """)
        found = {row[0] for row in cur.fetchall()}
    assert found == {'idx_users_email_lower', 'idx_posts_created_at'}
```

### Error Handling and Retry

```python
from pytest_postgresql.loader import build_loader, sql
from pytest_postgresql.retry import retry
from pathlib import Path

import psycopg
import pytest


def test_loader_error_handling(postgresql):
    """Test error handling in data loading.

    Uses the default ``postgresql`` client fixture only to get connection
    parameters for a server that is actually running.
    """
    info = postgresql.info

    # sql() opens the connection before reading the file, so it needs a
    # reachable server to get as far as raising FileNotFoundError.
    # pytest.raises also fails the test if nothing is raised, which a bare
    # try/except/pass would not.
    with pytest.raises(FileNotFoundError):
        sql(
            Path('/nonexistent/file.sql'),
            host=info.host,
            port=info.port,
            user=info.user,
            password=info.password,
            dbname=info.dbname,
        )

    # Import strings are resolved when the loader is built, so an unknown
    # module fails here, not when the loader would later be called.
    with pytest.raises((ImportError, AttributeError)):
        build_loader('nonexistent.module.function')

def test_retry_mechanism():
    """Test retry functionality for unreliable operations."""
    attempt_count = 0

    def flaky_function():
        # Fails twice with a retryable error, then succeeds.
        nonlocal attempt_count
        attempt_count += 1
        if attempt_count < 3:
            raise psycopg.OperationalError("Connection failed")
        return "success"

    # Retry will succeed on third attempt
    result = retry(
        flaky_function, timeout=10, possible_exception=psycopg.OperationalError
    )
    assert result == "success"
    assert attempt_count == 3

def robust_data_loader(**kwargs):
    """Robust data loader with retry logic.

    Receives the database connection parameters as keyword arguments.
    """
    def connect_and_load():
        # retry() may call this several times, so its SQL must be safe to
        # repeat; IF NOT EXISTS makes a second run a no-op.
        with psycopg.connect(**kwargs) as conn:
            with conn.cursor() as cur:
                cur.execute("CREATE TABLE IF NOT EXISTS robust_test (id INT);")
            conn.commit()

    # Use retry for initial connection: only OperationalError is retried, for
    # up to 30 seconds. Any other exception propagates immediately.
    retry(connect_and_load, timeout=30, possible_exception=psycopg.OperationalError)

# ``factories`` is not among this snippet's other imports.
from pytest_postgresql import factories

postgresql_robust_proc = factories.postgresql_proc(
    load=[robust_data_loader]
)
# Client fixture for tests: a connection to the database prepared above.
postgresql_robust = factories.postgresql("postgresql_robust_proc")
```

### Advanced Loading Patterns

#### Conditional Loading

```python
def conditional_loader(**kwargs):
    """Load data conditionally based on environment.

    Always creates the ``environments`` table, then inserts one row chosen
    by the ``TEST_ENV`` environment variable: 'development', 'production',
    or anything else (including unset) for the default 'test' row.
    """
    import os

    import psycopg

    # TEST_ENV value -> (name, JSON config) row to insert.
    seed_rows = {
        'development': ('dev', '{"debug": true, "logging": "verbose"}'),
        'production': ('prod', '{"debug": false, "logging": "error"}'),
    }
    default_row = ('test', '{"debug": true, "logging": "info"}')
    name, config = seed_rows.get(os.getenv('TEST_ENV'), default_row)

    with psycopg.connect(**kwargs) as conn:
        with conn.cursor() as cur:
            # Always create basic schema
            cur.execute("""
                CREATE TABLE environments (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(50),
                    config JSONB
                );
            """)
            # One parameterized INSERT replaces three copies of the same SQL.
            # The explicit cast makes the JSON text land in the JSONB column.
            cur.execute(
                "INSERT INTO environments (name, config) VALUES (%s, %s::jsonb);",
                (name, config),
            )
        conn.commit()
```

#### Parameterized Loading

```python
def create_parameterized_loader(table_count=5, rows_per_table=100):
    """Create a parameterized data loader.

    Returns a loader that creates ``table_count`` tables named
    ``test_table_0`` .. ``test_table_<table_count - 1>``. Each table is
    filled with ``rows_per_table`` rows of ('data_<j>', <j>).
    """
    def parameterized_loader(**kwargs):
        import psycopg

        # Row values are the same for every table, so build them once.
        rows = [(f"data_{j}", j) for j in range(rows_per_table)]
        with psycopg.connect(**kwargs) as conn:
            with conn.cursor() as cur:
                for i in range(table_count):
                    # Table names are generated here, never taken from user
                    # input, so formatting them into the SQL text is safe.
                    table_name = f"test_table_{i}"
                    cur.execute(f"""
                        CREATE TABLE {table_name} (
                            id SERIAL PRIMARY KEY,
                            data VARCHAR(100),
                            value INTEGER
                        );
                    """)

                    # Insert test data. Values are passed as query parameters,
                    # and executemany hands psycopg the whole batch instead of
                    # one execute() call per row.
                    cur.executemany(
                        f"INSERT INTO {table_name} (data, value) VALUES (%s, %s);",
                        rows,
                    )
            conn.commit()

    return parameterized_loader

# ``factories`` is not imported elsewhere in this snippet.
from pytest_postgresql import factories

postgresql_parameterized_proc = factories.postgresql_proc(
    load=[create_parameterized_loader(table_count=3, rows_per_table=50)]
)
# Client fixture for tests: a connection to the database prepared above.
postgresql_parameterized = factories.postgresql("postgresql_parameterized_proc")
```

#### Transaction-Safe Loading

```python
def transaction_safe_loader(**kwargs):
    """Load data with proper transaction handling.

    All statements run in one explicit transaction. Either every table and
    row is created, or, on any error, none is and the error propagates to
    the caller.
    """
    import psycopg

    with psycopg.connect(**kwargs) as conn:
        # conn.transaction() starts the transaction on entry, commits on a
        # clean exit, and rolls back (re-raising the error) if the block
        # fails. Sending "BEGIN;"/"COMMIT;" by hand would conflict with the
        # transaction psycopg already opens implicitly.
        with conn.transaction():
            with conn.cursor() as cur:
                # Create tables
                cur.execute("""
                    CREATE TABLE accounts (
                        id SERIAL PRIMARY KEY,
                        name VARCHAR(100),
                        balance DECIMAL(10,2)
                    );
                """)

                cur.execute("""
                    CREATE TABLE transactions (
                        id SERIAL PRIMARY KEY,
                        account_id INTEGER REFERENCES accounts(id),
                        amount DECIMAL(10,2),
                        transaction_type VARCHAR(20)
                    );
                """)

                # Insert data
                cur.execute("""
                    INSERT INTO accounts (name, balance) VALUES
                    ('Account A', 1000.00),
                    ('Account B', 500.00);
                """)

                # Look accounts up by name rather than assuming ids 1 and 2.
                cur.execute("""
                    INSERT INTO transactions (account_id, amount, transaction_type)
                    SELECT a.id, t.amount, t.transaction_type
                    FROM (VALUES
                        ('Account A', -100.00, 'withdrawal'),
                        ('Account B', 100.00, 'deposit')
                    ) AS t (account_name, amount, transaction_type)
                    JOIN accounts AS a ON a.name = t.account_name;
                """)
```