or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connection-management.md
connection-pooling.md
copy-operations.md
cursor-operations.md
exception-handling.md
index.md
listeners-notifications.md
prepared-statements.md
query-execution.md
transaction-management.md
type-system.md

docs/query-execution.md

0

# Query Execution

1

2

Comprehensive query execution capabilities supporting various result formats, parameterized queries, prepared statements, bulk operations, and cursors for large result sets.

3

4

## Capabilities

5

6

### Basic Query Execution

7

8

Execute SQL commands and queries with automatic parameter binding and result processing.

9

10

```python { .api }

11

async def execute(self, query: str, *args, timeout: float = None) -> str:

12

"""

13

Execute an SQL command (or commands).

14

15

Parameters:

16

query: SQL command to execute

17

*args: Query parameters for placeholders ($1, $2, etc.)

18

timeout: Query timeout in seconds

19

20

Returns:

21

Command status string (e.g., 'SELECT 5', 'INSERT 0 1', 'UPDATE 3')

22

"""

23

24

async def executemany(self, command: str, args, *, timeout: float = None) -> None:

25

"""

26

Execute an SQL command for each sequence of arguments.

27

28

Parameters:

29

command: SQL command template with placeholders

30

args: Sequence of argument tuples/lists

31

timeout: Query timeout in seconds

32

"""

33

```

34

35

#### Example Usage

36

37

```python

38

# Execute DDL commands

39

await conn.execute("CREATE TABLE users(id serial, name text, email text)")

40

await conn.execute("CREATE INDEX idx_users_email ON users(email)")

41

42

# Execute DML with parameters

43

result = await conn.execute(

44

"INSERT INTO users(name, email) VALUES($1, $2)",

45

"Alice", "alice@example.com"

46

)

47

print(result) # "INSERT 0 1"

48

49

# Batch insert

50

rows = [

51

("Bob", "bob@example.com"),

52

("Charlie", "charlie@example.com"),

53

("David", "david@example.com")

54

]

55

await conn.executemany(

56

"INSERT INTO users(name, email) VALUES($1, $2)",

57

rows

58

)

59

```

60

61

### Result Fetching

62

63

Retrieve query results in various formats optimized for different use cases.

64

65

```python { .api }

66

async def fetch(self, query: str, *args, timeout: float = None, record_class: type = None) -> typing.List[Record]:

67

"""

68

Run a query and return all results as a list of Records.

69

70

Parameters:

71

query: SQL query to execute

72

*args: Query parameters for placeholders

73

timeout: Query timeout in seconds

74

record_class: Custom record class for results

75

76

Returns:

77

List of Record objects

78

"""

79

80

async def fetchval(self, query: str, *args, column: typing.Union[int, str] = 0, timeout: float = None) -> typing.Any:

81

"""

82

Run a query and return a single value from the first row.

83

84

Parameters:

85

query: SQL query to execute

86

*args: Query parameters for placeholders

87

column: Column index or name to return (default: 0)

88

timeout: Query timeout in seconds

89

90

Returns:

91

Single value or None if no results

92

"""

93

94

async def fetchrow(self, query: str, *args, timeout: float = None, record_class: type = None) -> typing.Optional[Record]:

95

"""

96

Run a query and return the first row.

97

98

Parameters:

99

query: SQL query to execute

100

*args: Query parameters for placeholders

101

timeout: Query timeout in seconds

102

record_class: Custom record class for result

103

104

Returns:

105

Record object or None if no results

106

"""

107

108

async def fetchmany(self, query: str, args, *, timeout: float = None, record_class: type = None) -> typing.List[Record]:

109

"""

110

Execute a query for each sequence of arguments and return all results.

111

112

Parameters:

113

query: SQL query template

114

args: Sequence of argument tuples/lists

115

timeout: Query timeout in seconds

116

record_class: Custom record class for results

117

118

Returns:

119

Single flat list of Record objects combining the rows produced by every argument sequence

120

"""

121

```

122

123

#### Example Usage

124

125

```python

126

# Fetch all rows

127

users = await conn.fetch("SELECT * FROM users WHERE active = $1", True)

128

for user in users:

129

print(f"{user['name']} - {user['email']}")

130

131

# Fetch single value

132

user_count = await conn.fetchval("SELECT COUNT(*) FROM users")

133

print(f"Total users: {user_count}")

134

135

# Fetch first row

136

admin = await conn.fetchrow("SELECT * FROM users WHERE role = 'admin' LIMIT 1")

137

if admin:

138

print(f"Admin: {admin['name']}")

139

140

# Fetch by column name

141

latest_login = await conn.fetchval(

142

"SELECT last_login FROM users WHERE id = $1",

143

user_id,

144

column='last_login'

145

)

146

147

# Batch queries

148

user_ids = [1, 2, 3, 4, 5]

149

results = await conn.fetchmany(

150

"SELECT * FROM users WHERE id = $1",

151

[(uid,) for uid in user_ids]

152

)

153

```

154

155

### Prepared Statements

156

157

Create reusable prepared statements for improved performance with repeated queries.

158

159

```python { .api }

160

async def prepare(self, query: str, *, name: str = None, timeout: float = None, record_class: type = None) -> PreparedStatement:

161

"""

162

Create a prepared statement for the specified query.

163

164

Parameters:

165

query: SQL query to prepare

166

name: Optional name for the prepared statement

167

timeout: Preparation timeout in seconds

168

record_class: Custom record class for results

169

170

Returns:

171

PreparedStatement instance

172

"""

173

```

174

175

#### PreparedStatement Methods

176

177

```python { .api }

178

class PreparedStatement:

179

"""A prepared SQL statement."""

180

181

def get_name(self) -> str

182

def get_query(self) -> str

183

def get_statusmsg(self) -> str

184

def get_parameters(self) -> typing.Tuple[Type, ...]

185

def get_attributes(self) -> typing.Tuple[Attribute, ...]

186

187

async def fetch(self, *args, timeout: float = None, record_class: type = None) -> typing.List[Record]

188

async def fetchval(self, *args, column: typing.Union[int, str] = 0, timeout: float = None) -> typing.Any

189

async def fetchrow(self, *args, timeout: float = None, record_class: type = None) -> typing.Optional[Record]

190

async def execute(self, *args, timeout: float = None) -> str

191

async def executemany(self, args, *, timeout: float = None) -> None

192

193

def cursor(self, *args, prefetch: int = None, timeout: float = None, record_class: type = None) -> CursorFactory

194

```

195

196

#### Example Usage

197

198

```python

199

# Prepare a statement

200

stmt = await conn.prepare("SELECT * FROM users WHERE department = $1 AND active = $2")

201

202

# Use multiple times with different parameters

203

engineers = await stmt.fetch("Engineering", True)

204

sales = await stmt.fetch("Sales", True)

205

marketing = await stmt.fetch("Marketing", False)

206

207

# Prepared statement for updates

208

update_stmt = await conn.prepare("UPDATE users SET last_login = $1 WHERE id = $2")

209

await update_stmt.executemany([

210

(datetime.now(), 1),

211

(datetime.now(), 2),

212

(datetime.now(), 3)

213

])

214

```

215

216

### Cursors

217

218

Handle large result sets efficiently with server-side cursors and streaming. Cursors are forward-only and must be used inside a transaction.

219

220

```python { .api }

221

def cursor(self, query: str, *args, prefetch: int = None, timeout: float = None, record_class: type = None) -> CursorFactory:

222

"""

223

Return a cursor factory for the specified query.

224

225

Parameters:

226

query: SQL query to execute

227

*args: Query parameters for placeholders

228

prefetch: Number of rows to prefetch (default: 50)

229

timeout: Query timeout in seconds

230

record_class: Custom record class for results

231

232

Returns:

233

Cursor factory (async context manager)

234

"""

235

```

236

237

#### Cursor Methods

238

239

```python { .api }

240

class CursorFactory:

241

"""Factory for creating cursors with async iteration support."""

242

243

def __aiter__(self) -> CursorIterator

244

def __await__(self) -> Cursor

245

async def __aenter__(self) -> Cursor

246

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None

247

248

class Cursor:

249

"""Database cursor for iterating over large result sets."""

250

251

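async def fetch(self, n: int, *, timeout: float = None) -> typing.List[Record]

252

async def fetchrow(self, *, timeout: float = None) -> typing.Optional[Record]
async def forward(self, n: int, *, timeout: float = None) -> int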

253

def get_prefetch_size(self) -> int

254

def get_query(self) -> str

255

def get_args(self) -> typing.Tuple

256

257

async def __aenter__(self) -> Cursor

258

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None

259

260

class CursorIterator:

261

"""Async iterator for cursor results."""

262

263

def __aiter__(self) -> CursorIterator

264

async def __anext__(self) -> Record

265

```

266

267

#### Example Usage

268

269

```python

270

# Process large result set with cursor

271

async with conn.transaction():

272

async for record in conn.cursor("SELECT * FROM large_table WHERE condition = $1", value):

273

process_record(record)

274

275

# Manual cursor control

276

async with conn.transaction():
cursor = await conn.cursor("SELECT * FROM users ORDER BY id")

277

# Fetch first 50 rows

278

batch = await cursor.fetch(50)

279

280

while batch:

281

for row in batch:

282

print(f"User: {row['name']}")

283

284

# Fetch next batch

285

batch = await cursor.fetch(50)

286

```

287

288

### Query Parameterization

289

290

The asyncpg library uses PostgreSQL's native parameter binding with numbered placeholders ($1, $2, ...) for security and performance.

291

292

```python

293

# Correct parameter usage

294

users = await conn.fetch(

295

"SELECT * FROM users WHERE name = $1 AND age > $2",

296

"Alice", 25

297

)

298

299

# Multiple parameters

300

result = await conn.execute(

301

"UPDATE users SET email = $1, updated_at = $2 WHERE id = $3",

302

"newemail@example.com", datetime.now(), user_id

303

)

304

305

# Array parameters

306

ids = [1, 2, 3, 4, 5]

307

users = await conn.fetch("SELECT * FROM users WHERE id = ANY($1)", ids)

308

309

# JSON parameters

310

data = {"preferences": {"theme": "dark", "language": "en"}}

311

await conn.execute(

312

"UPDATE users SET metadata = $1 WHERE id = $2",

313

json.dumps(data), user_id

314

)

315

```

316

317

### Error Handling

318

319

Handle query execution errors with appropriate exception types.

320

321

```python

322

try:

323

result = await conn.fetch("SELECT * FROM nonexistent_table")

324

except asyncpg.UndefinedTableError:

325

print("Table does not exist")

326

except asyncpg.PostgresSyntaxError:

327

print("SQL syntax error")

328

except asyncpg.DataError:

329

print("Data-related error")

330

except asyncio.TimeoutError:

331

print("Query timed out")

332

```

333

334

## Types

335

336

```python { .api }

337

class Record:

338

"""Query result record with dict-like and tuple-like access."""

339

340

def get(self, key: str, default: typing.Any = None) -> typing.Any

341

def keys(self) -> typing.Iterator[str]

342

def values(self) -> typing.Iterator[typing.Any]

343

def items(self) -> typing.Iterator[typing.Tuple[str, typing.Any]]

344

def __getitem__(self, key: typing.Union[str, int, slice]) -> typing.Any

345

def __len__(self) -> int

346

def __iter__(self) -> typing.Iterator[typing.Any]

347

def __contains__(self, key: object) -> bool

348

349

class PreparedStatement:

350

"""A prepared SQL statement for reuse."""

351

352

def get_name(self) -> str

353

def get_query(self) -> str

354

def get_statusmsg(self) -> str

355

def get_parameters(self) -> typing.Tuple[Type, ...]

356

def get_attributes(self) -> typing.Tuple[Attribute, ...]

357

358

class CursorFactory:

359

"""Factory for creating cursors."""

360

361

class Cursor:

362

"""Database cursor for streaming large result sets."""

363

364

class CursorIterator:

365

"""Async iterator for cursor results."""

366

```