0
# Query Execution
1
2
Comprehensive query execution capabilities supporting various result formats, parameterized queries, prepared statements, bulk operations, and cursors for large result sets.
3
4
## Capabilities
5
6
### Basic Query Execution
7
8
Execute SQL commands and queries with automatic parameter binding and result processing.
9
10
```python { .api }
11
async def execute(self, query: str, *args, timeout: float = None) -> str:
12
"""
13
Execute an SQL command (or commands).
14
15
Parameters:
16
query: SQL command to execute
17
*args: Query parameters for placeholders ($1, $2, etc.)
18
timeout: Query timeout in seconds
19
20
Returns:
21
Command status string (e.g., 'SELECT 5', 'INSERT 0 1', 'UPDATE 3')
22
"""
23
24
async def executemany(self, command: str, args, *, timeout: float = None) -> None:
25
"""
26
Execute an SQL command for each sequence of arguments.
27
28
Parameters:
29
command: SQL command template with placeholders
30
args: Sequence of argument tuples/lists
31
timeout: Query timeout in seconds
32
"""
33
```
34
35
#### Example Usage
36
37
```python
38
# Execute DDL commands
39
await conn.execute("CREATE TABLE users(id serial, name text, email text)")
40
await conn.execute("CREATE INDEX idx_users_email ON users(email)")
41
42
# Execute DML with parameters
43
result = await conn.execute(
44
"INSERT INTO users(name, email) VALUES($1, $2)",
45
"Alice", "alice@example.com"
46
)
47
print(result) # "INSERT 0 1"
48
49
# Batch insert
50
rows = [
51
("Bob", "bob@example.com"),
52
("Charlie", "charlie@example.com"),
53
("David", "david@example.com")
54
]
55
await conn.executemany(
56
"INSERT INTO users(name, email) VALUES($1, $2)",
57
rows
58
)
59
```
60
61
### Result Fetching
62
63
Retrieve query results in various formats optimized for different use cases.
64
65
```python { .api }
66
async def fetch(self, query: str, *args, timeout: float = None, record_class: type = None) -> typing.List[Record]:
67
"""
68
Run a query and return all results as a list of Records.
69
70
Parameters:
71
query: SQL query to execute
72
*args: Query parameters for placeholders
73
timeout: Query timeout in seconds
74
record_class: Custom record class for results
75
76
Returns:
77
List of Record objects
78
"""
79
80
async def fetchval(self, query: str, *args, column: typing.Union[int, str] = 0, timeout: float = None) -> typing.Any:
81
"""
82
Run a query and return a single value from the first row.
83
84
Parameters:
85
query: SQL query to execute
86
*args: Query parameters for placeholders
87
column: Column index or name to return (default: 0)
88
timeout: Query timeout in seconds
89
90
Returns:
91
Single value or None if no results
92
"""
93
94
async def fetchrow(self, query: str, *args, timeout: float = None, record_class: type = None) -> typing.Optional[Record]:
95
"""
96
Run a query and return the first row.
97
98
Parameters:
99
query: SQL query to execute
100
*args: Query parameters for placeholders
101
timeout: Query timeout in seconds
102
record_class: Custom record class for result
103
104
Returns:
105
Record object or None if no results
106
"""
107
108
async def fetchmany(self, query: str, args, *, timeout: float = None, record_class: type = None) -> typing.List[Record]:
109
"""
110
Execute a query for each sequence of arguments and return the combined results as a single list of Records.
111
112
Parameters:
113
query: SQL query template
114
args: Sequence of argument tuples/lists
115
timeout: Query timeout in seconds
116
record_class: Custom record class for results
117
118
Returns:
119
Flat list of Record objects containing the rows produced for every argument sequence
120
"""
121
```
122
123
#### Example Usage
124
125
```python
126
# Fetch all rows
127
users = await conn.fetch("SELECT * FROM users WHERE active = $1", True)
128
for user in users:
129
print(f"{user['name']} - {user['email']}")
130
131
# Fetch single value
132
user_count = await conn.fetchval("SELECT COUNT(*) FROM users")
133
print(f"Total users: {user_count}")
134
135
# Fetch first row
136
admin = await conn.fetchrow("SELECT * FROM users WHERE role = 'admin' LIMIT 1")
137
if admin:
138
print(f"Admin: {admin['name']}")
139
140
# Fetch by column name
141
latest_login = await conn.fetchval(
142
"SELECT last_login FROM users WHERE id = $1",
143
user_id,
144
column='last_login'
145
)
146
147
# Batch queries
148
user_ids = [1, 2, 3, 4, 5]
149
results = await conn.fetchmany(
150
"SELECT * FROM users WHERE id = $1",
151
[(uid,) for uid in user_ids]
152
)
153
```
154
155
### Prepared Statements
156
157
Create reusable prepared statements for improved performance with repeated queries.
158
159
```python { .api }
160
async def prepare(self, query: str, *, name: str = None, timeout: float = None, record_class: type = None) -> PreparedStatement:
161
"""
162
Create a prepared statement for the specified query.
163
164
Parameters:
165
query: SQL query to prepare
166
name: Optional name for the prepared statement
167
timeout: Preparation timeout in seconds
168
record_class: Custom record class for results
169
170
Returns:
171
PreparedStatement instance
172
"""
173
```
174
175
#### PreparedStatement Methods
176
177
```python { .api }
178
class PreparedStatement:
179
"""A prepared SQL statement."""
180
181
def get_name(self) -> str
182
def get_query(self) -> str
183
def get_statusmsg(self) -> str
184
def get_parameters(self) -> typing.Tuple[Type, ...]
185
def get_attributes(self) -> typing.Tuple[Attribute, ...]
186
187
async def fetch(self, *args, timeout: float = None, record_class: type = None) -> typing.List[Record]
188
async def fetchval(self, *args, column: typing.Union[int, str] = 0, timeout: float = None) -> typing.Any
189
async def fetchrow(self, *args, timeout: float = None, record_class: type = None) -> typing.Optional[Record]
190
async def execute(self, *args, timeout: float = None) -> str
191
async def executemany(self, args, *, timeout: float = None) -> None
192
193
def cursor(self, *args, prefetch: int = None, timeout: float = None, record_class: type = None) -> CursorFactory
194
```
195
196
#### Example Usage
197
198
```python
199
# Prepare a statement
200
stmt = await conn.prepare("SELECT * FROM users WHERE department = $1 AND active = $2")
201
202
# Use multiple times with different parameters
203
engineers = await stmt.fetch("Engineering", True)
204
sales = await stmt.fetch("Sales", True)
205
marketing = await stmt.fetch("Marketing", False)
206
207
# Prepared statement for updates
208
update_stmt = await conn.prepare("UPDATE users SET last_login = $1 WHERE id = $2")
209
await update_stmt.executemany([
210
(datetime.now(), 1),
211
(datetime.now(), 2),
212
(datetime.now(), 3)
213
])
214
```
215
216
### Cursors
217
218
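Handle large result sets efficiently with server-side cursors and streaming. Cursors are non-scrollable (they can only be read forwards) and must be created and used inside a transaction.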
219
220
```python { .api }
221
def cursor(self, query: str, *args, prefetch: int = None, timeout: float = None, record_class: type = None) -> CursorFactory:
222
"""
223
Return a cursor factory for the specified query.
224
225
Parameters:
226
query: SQL query to execute
227
*args: Query parameters for placeholders
228
prefetch: Number of rows to prefetch (default: 50)
229
timeout: Query timeout in seconds
230
record_class: Custom record class for results
231
232
Returns:
233
Cursor factory (use `async for` to iterate over rows, or `await` it to obtain a Cursor; must be used inside a transaction)
234
"""
235
```
236
237
#### Cursor Methods
238
239
```python { .api }
240
class CursorFactory:
241
"""Factory for creating cursors with async iteration support."""
242
243
def __aiter__(self) -> CursorIterator
244
def __await__(self) -> Cursor
245
async def __aenter__(self) -> Cursor
246
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None
247
248
class Cursor:
249
"""Database cursor for iterating over large result sets."""
250
251
async def fetch(self, n: int, *, timeout: float = None) -> typing.List[Record]
251
async def forward(self, n: int, *, timeout: float = None) -> int
253
def get_prefetch_size(self) -> int
254
def get_query(self) -> str
255
def get_args(self) -> typing.Tuple
256
257
async def __aenter__(self) -> Cursor
258
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None
259
260
class CursorIterator:
261
"""Async iterator for cursor results."""
262
263
def __aiter__(self) -> CursorIterator
264
async def __anext__(self) -> Record
265
```
266
267
#### Example Usage
268
269
```python
270
# Process large result set with cursor (cursors must be used inside a transaction)
271
async with conn.transaction():
272
async for record in conn.cursor("SELECT * FROM large_table WHERE condition = $1", value):
273
process_record(record)
274
275
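# Manual cursor control (cursors must be used inside a transaction)
276
async with conn.transaction():
277
cursor = await conn.cursor("SELECT * FROM users ORDER BY id")  # prefetch is only accepted when iterating with `async for`
278
batch = await cursor.fetch(50)  # Fetch first 50 rows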
279
280
while batch:
281
for row in batch:
282
print(f"User: {row['name']}")
283
284
# Fetch next batch
285
batch = await cursor.fetch(50)
286
```
287
288
### Query Parameterization
289
290
asyncpg uses PostgreSQL's native parameter binding with numbered placeholders (`$1`, `$2`, ...) for security and performance.
291
292
```python
293
# Correct parameter usage
294
users = await conn.fetch(
295
"SELECT * FROM users WHERE name = $1 AND age > $2",
296
"Alice", 25
297
)
298
299
# Multiple parameters
300
result = await conn.execute(
301
"UPDATE users SET email = $1, updated_at = $2 WHERE id = $3",
302
"newemail@example.com", datetime.now(), user_id
303
)
304
305
# Array parameters
306
ids = [1, 2, 3, 4, 5]
307
users = await conn.fetch("SELECT * FROM users WHERE id = ANY($1)", ids)
308
309
# JSON parameters
310
data = {"preferences": {"theme": "dark", "language": "en"}}
311
await conn.execute(
312
"UPDATE users SET metadata = $1 WHERE id = $2",
313
json.dumps(data), user_id
314
)
315
```
316
317
### Error Handling
318
319
Handle query execution errors with appropriate exception types.
320
321
```python
322
try:
323
result = await conn.fetch("SELECT * FROM nonexistent_table")
324
except asyncpg.UndefinedTableError:
325
print("Table does not exist")
326
except asyncpg.PostgresSyntaxError:
327
print("SQL syntax error")
328
except asyncpg.DataError:
329
print("Data-related error")
330
except asyncio.TimeoutError:
331
print("Query timed out")
332
```
333
334
## Types
335
336
```python { .api }
337
class Record:
338
"""Query result record with dict-like and tuple-like access."""
339
340
def get(self, key: str, default: typing.Any = None) -> typing.Any
341
def keys(self) -> typing.Iterator[str]
342
def values(self) -> typing.Iterator[typing.Any]
343
def items(self) -> typing.Iterator[typing.Tuple[str, typing.Any]]
344
def __getitem__(self, key: typing.Union[str, int, slice]) -> typing.Any
345
def __len__(self) -> int
346
def __iter__(self) -> typing.Iterator[typing.Any]
347
def __contains__(self, key: object) -> bool
348
349
class PreparedStatement:
350
"""A prepared SQL statement for reuse."""
351
352
def get_name(self) -> str
353
def get_query(self) -> str
354
def get_statusmsg(self) -> str
355
def get_parameters(self) -> typing.Tuple[Type, ...]
356
def get_attributes(self) -> typing.Tuple[Attribute, ...]
357
358
class CursorFactory:
359
"""Factory for creating cursors."""
360
361
class Cursor:
362
"""Database cursor for streaming large result sets."""
363
364
class CursorIterator:
365
"""Async iterator for cursor results."""
366
```