or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md, data-loading.md, database-connections.md, external-server.md, index.md, process-management.md

docs/data-loading.md

0

# Data Loading

1

2

Flexible data loading system for database initialization and test data setup, supporting SQL files, Python callables, and import strings with comprehensive error handling and retry logic.

3

4

## Capabilities

5

6

### Loader Builder

7

8

Converts various input types into callable loaders for database initialization.

9

10

```python { .api }

11

def build_loader(load: Union[Callable, str, Path]) -> Callable:
    """
    Build a loader callable from various input types.

    Supports three input types:
    - Path objects: Loads SQL files using the sql() function
    - String: Import path like 'module.function' or 'module:function'
    - Callable: Returns the callable directly

    Parameters:
    - load: SQL file path, import string, or callable function

    Returns:
    Callable that accepts connection parameters as kwargs
    """

26

```

27

28

### SQL File Loader

29

30

Loads and executes SQL files into the database.

31

32

```python { .api }

33

def sql(sql_filename: Path, **kwargs: Any) -> None:
    """
    Load SQL file into database.

    Connects to database using provided connection parameters,
    reads the SQL file, and executes its contents.

    Parameters:
    - sql_filename: Path to SQL file to execute
    - **kwargs: Database connection parameters (host, port, user, etc.)

    Returns:
    None

    Raises:
    - FileNotFoundError: If SQL file doesn't exist
    - psycopg.Error: If SQL execution fails
    """

51

```

52

53

### Retry Utility

54

55

Robust retry mechanism for handling temporary connection failures.

56

57

```python { .api }

58

def retry(
    func: Callable[[], T],
    timeout: int = 60,
    possible_exception: Type[Exception] = Exception,
) -> T:
    """
    Retry function execution with timeout for handling temporary failures.

    Particularly useful for database connections that may fail initially
    due to server startup delays.

    Parameters:
    - func: Function to retry
    - timeout: Maximum retry time in seconds (default: 60)
    - possible_exception: Exception type to catch and retry (default: Exception)

    Returns:
    Result of successful function execution

    Raises:
    - TimeoutError: If function doesn't succeed within timeout
    - Exception: If function fails with non-retryable exception
    """

81

82

def get_current_datetime() -> datetime.datetime:
    """
    Get current datetime with Python version compatibility.

    Handles datetime retrieval across Python 3.8+ versions
    with proper UTC handling.

    Returns:
    Current datetime in UTC
    """

92

```

93

94

## Usage Examples

95

96

### SQL File Loading

97

98

```python

99

from pytest_postgresql import factories

100

from pathlib import Path

101

102

# Load single SQL file (process fixture: starts PostgreSQL and runs the loader)
postgresql_with_schema = factories.postgresql_proc(
    load=[Path('/path/to/schema.sql')],
)

# Load multiple SQL files in order
postgresql_with_data_proc = factories.postgresql_proc(
    load=[
        Path('/path/to/schema.sql'),
        Path('/path/to/seed_data.sql'),
        Path('/path/to/test_fixtures.sql'),
    ],
)

# Client fixture bound to the process fixture above. Tests must request this
# one: it yields a psycopg connection, whereas the process fixture yields the
# server executor, which has no cursor().
postgresql_with_data = factories.postgresql('postgresql_with_data_proc')

115

116

def test_sql_loading(postgresql_with_data):
    """Test database loaded with SQL files.

    NOTE: ``postgresql_with_data`` must resolve to a client (connection)
    fixture created with ``factories.postgresql(...)``; a ``postgresql_proc``
    process fixture does not provide ``cursor()``.
    """
    # Schema and data are already loaded; the context manager closes the
    # cursor even if the query raises.
    with postgresql_with_data.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM users;")
        count = cur.fetchone()[0]
    assert count > 0  # Data was loaded

124

```

125

126

### Python Callable Loading

127

128

```python

129

from pytest_postgresql import factories

130

import psycopg

131

132

def create_test_schema(**kwargs):
    """Create test schema and tables.

    Parameters:
    - **kwargs: connection parameters forwarded unchanged to psycopg.connect()
    """
    with psycopg.connect(**kwargs) as conn:
        with conn.cursor() as cur:
            # Parameterless multi-statement script: schema, two tables, one index.
            cur.execute("""
                CREATE SCHEMA IF NOT EXISTS test_schema;

                CREATE TABLE test_schema.users (
                    id SERIAL PRIMARY KEY,
                    username VARCHAR(50) UNIQUE NOT NULL,
                    email VARCHAR(100) UNIQUE NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );

                CREATE TABLE test_schema.posts (
                    id SERIAL PRIMARY KEY,
                    user_id INTEGER REFERENCES test_schema.users(id),
                    title VARCHAR(200) NOT NULL,
                    content TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );

                CREATE INDEX idx_posts_user_id ON test_schema.posts(user_id);
            """)
        conn.commit()

157

158

def insert_test_data(**kwargs):
    """Insert test data.

    Parameters:
    - **kwargs: connection parameters forwarded unchanged to psycopg.connect()
    """
    with psycopg.connect(**kwargs) as conn:
        with conn.cursor() as cur:
            # Insert test users
            cur.execute("""
                INSERT INTO test_schema.users (username, email) VALUES
                ('alice', 'alice@example.com'),
                ('bob', 'bob@example.com'),
                ('charlie', 'charlie@example.com');
            """)

            # Insert test posts. The user_id values assume the three users
            # above received ids 1-3 from a freshly created SERIAL column.
            cur.execute("""
                INSERT INTO test_schema.posts (user_id, title, content) VALUES
                (1, 'First Post', 'This is Alice''s first post'),
                (1, 'Second Post', 'Alice''s follow-up post'),
                (2, 'Bob''s Thoughts', 'Bob shares his ideas'),
                (3, 'Charlie''s Update', 'Latest from Charlie');
            """)
        conn.commit()

179

180

# Process fixture: starts PostgreSQL and runs the callables in list order.
postgresql_with_python_data_proc = factories.postgresql_proc(
    load=[create_test_schema, insert_test_data],
)

# Client fixture for tests: yields a psycopg connection. The process fixture
# itself yields the server executor, which has no cursor().
postgresql_with_python_data = factories.postgresql(
    'postgresql_with_python_data_proc'
)

183

184

def test_python_callable_loading(postgresql_with_python_data):
    """Test database loaded with Python callables.

    NOTE: ``postgresql_with_python_data`` must resolve to a client
    (connection) fixture created with ``factories.postgresql(...)``.
    """
    # The context manager closes the cursor even when an assertion fails.
    with postgresql_with_python_data.cursor() as cur:
        # Verify schema creation
        cur.execute("""
            SELECT table_name FROM information_schema.tables
            WHERE table_schema = 'test_schema';
        """)
        tables = [row[0] for row in cur.fetchall()]
        assert 'users' in tables
        assert 'posts' in tables

        # Verify data insertion
        cur.execute("SELECT COUNT(*) FROM test_schema.users;")
        user_count = cur.fetchone()[0]
        assert user_count == 3

        cur.execute("SELECT COUNT(*) FROM test_schema.posts;")
        post_count = cur.fetchone()[0]
        assert post_count == 4

207

```

208

209

### Import String Loading

210

211

```python

212

# myapp/fixtures.py

213

import psycopg

214

215

def load_application_schema(**kwargs):
    """Load application schema from fixtures module.

    Parameters:
    - **kwargs: connection parameters forwarded unchanged to psycopg.connect()
    """
    with psycopg.connect(**kwargs) as conn:
        with conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE applications (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(100) NOT NULL,
                    version VARCHAR(20) NOT NULL,
                    status VARCHAR(20) DEFAULT 'active'
                );
            """)
        conn.commit()

228

229

def load_sample_applications(**kwargs):
    """Load sample application data.

    Parameters:
    - **kwargs: connection parameters forwarded unchanged to psycopg.connect()
    """
    with psycopg.connect(**kwargs) as conn:
        with conn.cursor() as cur:
            cur.execute("""
                INSERT INTO applications (name, version, status) VALUES
                ('MyApp', '1.0.0', 'active'),
                ('TestApp', '0.9.0', 'beta'),
                ('LegacyApp', '2.1.0', 'deprecated');
            """)
        conn.commit()

240

241

# Use import strings in test configuration

242

from pytest_postgresql import factories

243

244

# Process fixture: each string is an import path resolved to a loader callable.
postgresql_with_imports_proc = factories.postgresql_proc(
    load=[
        'myapp.fixtures.load_application_schema',
        'myapp.fixtures.load_sample_applications',
    ],
)

# Client fixture for tests: yields a psycopg connection. The process fixture
# itself yields the server executor, which has no cursor().
postgresql_with_imports = factories.postgresql('postgresql_with_imports_proc')

250

251

def test_import_string_loading(postgresql_with_imports):
    """Test database loaded with import strings.

    NOTE: ``postgresql_with_imports`` must resolve to a client (connection)
    fixture created with ``factories.postgresql(...)``.
    """
    with postgresql_with_imports.cursor() as cur:
        cur.execute("SELECT name, version FROM applications ORDER BY id;")
        apps = cur.fetchall()

    # Rows come back in insertion order because of ORDER BY id.
    assert len(apps) == 3
    assert apps[0] == ('MyApp', '1.0.0')
    assert apps[1] == ('TestApp', '0.9.0')
    assert apps[2] == ('LegacyApp', '2.1.0')

263

```

264

265

### Mixed Loading Types

266

267

```python

268

from pytest_postgresql import factories

269

from pathlib import Path

270

271

def custom_indexes(**kwargs):
    """Add custom indexes after schema creation.

    Parameters:
    - **kwargs: connection parameters forwarded to psycopg.connect()

    PostgreSQL rejects CREATE INDEX CONCURRENTLY inside a transaction block,
    so the connection is opened in autocommit mode and each statement is
    sent on its own (a multi-statement string runs as one implicit
    transaction).
    """
    # Local import: this example's import list does not include psycopg.
    import psycopg

    statements = (
        "CREATE INDEX CONCURRENTLY idx_users_email_lower ON users (LOWER(email));",
        "CREATE INDEX CONCURRENTLY idx_posts_created_at ON posts (created_at DESC);",
    )
    with psycopg.connect(autocommit=True, **kwargs) as conn:
        with conn.cursor() as cur:
            for statement in statements:
                cur.execute(statement)

283

284

# Loaders of different kinds may be mixed freely; they run in list order.
postgresql_mixed_loading = factories.postgresql_proc(
    load=[
        Path('/path/to/base_schema.sql'),       # SQL file
        'myapp.fixtures.load_reference_data',   # Import string
        custom_indexes,                         # Python callable
        Path('/path/to/test_data.sql'),         # Another SQL file
    ],
)

292

293

def test_mixed_loading(postgresql_mixed_loading):
    """Test mixed loading types.

    Requesting the fixture is the whole test: the body makes no assertions.
    """
    # All loaders executed in order
    pass

297

```

298

299

### Error Handling and Retry

300

301

```python

302

from pytest_postgresql.loader import build_loader, sql

303

from pytest_postgresql.retry import retry

304

from pathlib import Path

305

import psycopg

306

307

def test_loader_error_handling():
    """Test error handling in data loading.

    Each check fails explicitly when the expected exception is NOT raised;
    a bare ``try/except: pass`` would pass silently in that case.
    """
    # Test SQL file loading with missing file
    # NOTE(review): if sql() opens the database connection before the file,
    # an unreachable server would surface as psycopg.OperationalError
    # instead. Confirm against the implementation or run against a live server.
    try:
        sql(
            Path('/nonexistent/file.sql'),
            host='localhost', port=5432, user='postgres', dbname='test',
        )
    except FileNotFoundError:
        pass  # Expected
    else:
        raise AssertionError("sql() did not raise FileNotFoundError")

    # Test build_loader with invalid import
    # NOTE(review): this expects the import string to be resolved while the
    # loader is built, so a bad path fails here. Confirm against build_loader.
    try:
        build_loader('nonexistent.module.function')
    except (ImportError, AttributeError):
        pass  # Expected
    else:
        raise AssertionError("build_loader() accepted an invalid import path")

323

324

def test_retry_mechanism():
    """Test retry functionality for unreliable operations."""
    attempt_count = 0

    def flaky_function():
        """Fail on the first two calls, succeed on the third."""
        nonlocal attempt_count
        attempt_count += 1
        if attempt_count < 3:
            raise psycopg.OperationalError("Connection failed")
        return "success"

    # Retry will succeed on third attempt
    result = retry(
        flaky_function,
        timeout=10,
        possible_exception=psycopg.OperationalError,
    )
    assert result == "success"
    assert attempt_count == 3

339

340

def robust_data_loader(**kwargs):
    """Robust data loader with retry logic.

    Parameters:
    - **kwargs: connection parameters forwarded unchanged to psycopg.connect()
    """
    def connect_and_load():
        """Open a connection and create the table.

        The statement uses IF NOT EXISTS, so repeating it on a retry is safe.
        """
        with psycopg.connect(**kwargs) as conn:
            with conn.cursor() as cur:
                cur.execute("CREATE TABLE IF NOT EXISTS robust_test (id INT);")
            conn.commit()

    # Use retry for initial connection
    retry(connect_and_load, timeout=30, possible_exception=psycopg.OperationalError)

350

351

# `factories` is not imported by this example's import list, so bring it in here.
from pytest_postgresql import factories

postgresql_robust = factories.postgresql_proc(
    load=[robust_data_loader],
)

354

```

355

356

### Advanced Loading Patterns

357

358

#### Conditional Loading

359

360

```python

361

def conditional_loader(**kwargs):
    """Load data conditionally based on environment.

    Creates the ``environments`` table, then inserts one row chosen by the
    ``TEST_ENV`` environment variable: ``development`` -> 'dev',
    ``production`` -> 'prod', anything else (including unset) -> 'test'.

    Parameters:
    - **kwargs: connection parameters forwarded unchanged to psycopg.connect()
    """
    import os

    # Local import: this example has no import list of its own.
    import psycopg

    # One (name, JSON config) pair per recognised environment; the JSON text
    # matches what each branch previously inserted as a literal.
    rows_by_env = {
        'development': ('dev', '{"debug": true, "logging": "verbose"}'),
        'production': ('prod', '{"debug": false, "logging": "error"}'),
    }
    fallback_row = ('test', '{"debug": true, "logging": "info"}')
    row = rows_by_env.get(os.getenv('TEST_ENV'), fallback_row)

    with psycopg.connect(**kwargs) as conn:
        with conn.cursor() as cur:
            # Always create basic schema
            cur.execute("""
                CREATE TABLE environments (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(50),
                    config JSONB
                );
            """)

            # Load different data based on environment
            cur.execute(
                "INSERT INTO environments (name, config) VALUES (%s, %s::jsonb);",
                row,
            )
        conn.commit()

393

```

394

395

#### Parameterized Loading

396

397

```python

398

def create_parameterized_loader(table_count=5, rows_per_table=100):
    """Create a parameterized data loader.

    Parameters:
    - table_count: number of ``test_table_<i>`` tables to create (default: 5)
    - rows_per_table: rows inserted into each table (default: 100)

    Returns:
    Loader callable that accepts connection parameters as kwargs
    """
    def parameterized_loader(**kwargs):
        """Create the tables and fill each with ``rows_per_table`` rows."""
        # Local imports: this example has no import list of its own.
        import psycopg
        from psycopg import sql

        with psycopg.connect(**kwargs) as conn:
            with conn.cursor() as cur:
                for i in range(table_count):
                    # Quote the table name as an identifier instead of
                    # splicing it into the SQL text with an f-string.
                    table = sql.Identifier(f"test_table_{i}")
                    cur.execute(
                        sql.SQL("""
                            CREATE TABLE {} (
                                id SERIAL PRIMARY KEY,
                                data VARCHAR(100),
                                value INTEGER
                            );
                        """).format(table)
                    )

                    # Insert test data: one batched call with bound
                    # parameters rather than one statement per row.
                    cur.executemany(
                        sql.SQL(
                            "INSERT INTO {} (data, value) VALUES (%s, %s);"
                        ).format(table),
                        [(f"data_{j}", j) for j in range(rows_per_table)],
                    )
            conn.commit()

    return parameterized_loader

422

423

# `factories` is not imported anywhere in this example, so bring it in here.
from pytest_postgresql import factories

postgresql_parameterized = factories.postgresql_proc(
    load=[create_parameterized_loader(table_count=3, rows_per_table=50)],
)

426

```

427

428

#### Transaction-Safe Loading

429

430

```python

431

def transaction_safe_loader(**kwargs):
    """Load data with proper transaction handling.

    All statements run inside one ``conn.transaction()`` block, which commits
    when the block exits normally and rolls back (then re-raises) on any
    exception. Sending ``BEGIN``/``COMMIT`` by hand through the cursor is
    avoided because psycopg already manages the transaction itself.

    Parameters:
    - **kwargs: connection parameters forwarded unchanged to psycopg.connect()
    """
    # Local import: this example has no import list of its own.
    import psycopg

    with psycopg.connect(**kwargs) as conn:
        with conn.transaction():
            with conn.cursor() as cur:
                # Create tables
                cur.execute("""
                    CREATE TABLE accounts (
                        id SERIAL PRIMARY KEY,
                        name VARCHAR(100),
                        balance DECIMAL(10,2)
                    );
                """)

                cur.execute("""
                    CREATE TABLE transactions (
                        id SERIAL PRIMARY KEY,
                        account_id INTEGER REFERENCES accounts(id),
                        amount DECIMAL(10,2),
                        transaction_type VARCHAR(20)
                    );
                """)

                # Insert data
                cur.execute("""
                    INSERT INTO accounts (name, balance) VALUES
                    ('Account A', 1000.00),
                    ('Account B', 500.00);
                """)

                cur.execute("""
                    INSERT INTO transactions (account_id, amount, transaction_type) VALUES
                    (1, -100.00, 'withdrawal'),
                    (2, 100.00, 'deposit');
                """)

477

```