0
# Client and Sessions
1
2
Core client functionality for executing GraphQL operations, managing connections, and handling schemas. It provides both synchronous and asynchronous execution patterns, with session management for connection reuse and batched operations.
3
4
## Capabilities
5
6
### Client Class
7
8
The main entry point for executing GraphQL requests on a transport. It provides both synchronous and asynchronous execution methods, with optional schema fetching (`fetch_schema_from_transport`) and optional result parsing (`parse_results`); both are disabled by default.
9
10
```python { .api }
11
class Client:
    """Main entry point for executing GraphQL requests on a transport."""

    def __init__(
        self,
        transport: Optional[Union[Transport, AsyncTransport]] = None,
        fetch_schema_from_transport: bool = False,
        schema: Optional[GraphQLSchema] = None,
        parse_results: bool = False,
        introspection: Optional[Dict[str, Any]] = None,
        type_def: Optional[str] = None,
        execute_timeout: Optional[Union[int, float]] = 10
    ):
        """
        Initialize GraphQL client.

        Args:
            transport: Transport to use for GraphQL operations
            fetch_schema_from_transport: Automatically fetch schema via introspection
            schema: Pre-built GraphQL schema
            parse_results: Parse results using schema for custom scalars/enums
            introspection: Pre-fetched introspection result
            type_def: Schema definition string
            execute_timeout: Default timeout for operations in seconds (default: 10)
        """

    # NOTE(review): the "Basic Client Usage" example in this document indexes
    # the value returned by execute() like a dict (result["users"]); confirm
    # whether the default return is the response data or an ExecutionResult.
    def execute(
        self,
        request: GraphQLRequest,
        **kwargs
    ) -> ExecutionResult:
        """
        Execute GraphQL request synchronously.

        Args:
            request: GraphQL request to execute
            **kwargs: Additional arguments passed to transport

        Returns:
            ExecutionResult with data, errors, and extensions

        Raises:
            TransportError: If transport operation fails
            GraphQLError: If GraphQL validation fails
        """

    async def execute_async(
        self,
        request: GraphQLRequest,
        **kwargs
    ) -> ExecutionResult:
        """
        Execute GraphQL request asynchronously.

        Args:
            request: GraphQL request to execute
            **kwargs: Additional arguments passed to transport

        Returns:
            ExecutionResult with data, errors, and extensions
        """

    # NOTE(review): confirm session(), get_schema() and close() against the
    # targeted gql release; the upstream documentation describes
    # `with client as session:`, connect_sync() and close_sync() instead.
    def session(self, **kwargs) -> "SyncClientSession":
        """
        Create synchronous session for batched operations.

        Args:
            **kwargs: Additional arguments for session configuration

        Returns:
            SyncClientSession context manager
        """

    async def connect_async(self, **kwargs) -> "AsyncClientSession":
        """
        Create asynchronous session for batched operations.

        This is a coroutine: it must be awaited to obtain the session. The
        un-awaited coroutine object cannot be used in an ``async with``.

        Args:
            **kwargs: Additional arguments for session configuration

        Returns:
            AsyncClientSession, once awaited
        """

    def get_schema(self) -> GraphQLSchema:
        """
        Get the GraphQL schema.

        Returns:
            GraphQLSchema object

        Raises:
            Exception: If no schema is available
        """

    def close(self) -> None:
        """Close synchronous client and transport."""

    async def close_async(self) -> None:
        """Close asynchronous client and transport."""
109
```
110
111
### GraphQL Request Creation
112
113
Create GraphQL requests from query strings with variable support and operation naming.
114
115
```python { .api }
116
def gql(request_string: str) -> "GraphQLRequest":
    """
    Parse GraphQL query string into GraphQLRequest object.

    Args:
        request_string: GraphQL query, mutation, or subscription as string

    Returns:
        GraphQLRequest object ready for execution

    Raises:
        GraphQLError: If query string has syntax errors
    """
129
130
class GraphQLRequest:
    """A GraphQL document together with its variables and operation name."""

    def __init__(
        self,
        request: Union[DocumentNode, "GraphQLRequest", str],
        *,
        variable_values: Optional[Dict[str, Any]] = None,
        operation_name: Optional[str] = None
    ):
        """
        Initialize GraphQL request.

        Args:
            request: GraphQL query as string, DocumentNode, or existing GraphQLRequest
            variable_values: Variables for the GraphQL operation
            operation_name: Operation name for multi-operation documents

        Raises:
            GraphQLError: If request has syntax errors
            TypeError: If request type is invalid
        """

    # The return annotation is quoted because the class name is not bound
    # yet while its own body is being evaluated.
    def serialize_variable_values(self, schema: GraphQLSchema) -> "GraphQLRequest":
        """
        Serialize variable values using schema type information.

        Args:
            schema: GraphQL schema for type information

        Returns:
            New GraphQLRequest with serialized variables
        """

    @property
    def payload(self) -> Dict[str, Any]:
        """
        Get JSON payload representation of the request.

        Returns:
            Dictionary with 'query', 'variables', and 'operationName' keys
        """

    # Properties
    document: DocumentNode  # Parsed GraphQL document
    variable_values: Optional[Dict[str, Any]]  # Query variables
    operation_name: Optional[str]  # Operation name
175
```
176
177
### Synchronous Sessions
178
179
Synchronous session management for batched operations and connection reuse.
180
181
```python { .api }
182
class SyncClientSession:
    """Synchronous session for connection reuse and batched operations."""

    def execute(
        self,
        request: GraphQLRequest,
        **kwargs
    ) -> ExecutionResult:
        """
        Execute GraphQL request in session context.

        Args:
            request: GraphQL request to execute
            **kwargs: Additional arguments for execution

        Returns:
            ExecutionResult with data, errors, and extensions
        """

    def execute_batch(
        self,
        requests: List[GraphQLRequest],
        **kwargs
    ) -> List[ExecutionResult]:
        """
        Execute multiple GraphQL requests in a batch.

        Args:
            requests: List of GraphQL requests to execute
            **kwargs: Additional arguments for batch execution

        Returns:
            List of ExecutionResult objects
        """

    def close(self) -> None:
        """Close the session and transport connection."""

    # Context manager support
    def __enter__(self) -> "SyncClientSession": ...
    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
221
```
222
223
### Asynchronous Sessions
224
225
Asynchronous session management with subscription support and automatic reconnection capabilities.
226
227
```python { .api }
228
class AsyncClientSession:
    """Asynchronous session with request, batch, and subscription support."""

    async def execute(
        self,
        request: GraphQLRequest,
        **kwargs
    ) -> ExecutionResult:
        """
        Execute GraphQL request asynchronously in session context.

        Args:
            request: GraphQL request to execute
            **kwargs: Additional arguments for execution

        Returns:
            ExecutionResult with data, errors, and extensions
        """

    async def execute_batch(
        self,
        requests: List[GraphQLRequest],
        **kwargs
    ) -> List[ExecutionResult]:
        """
        Execute multiple GraphQL requests in a batch asynchronously.

        Args:
            requests: List of GraphQL requests to execute
            **kwargs: Additional arguments for batch execution

        Returns:
            List of ExecutionResult objects
        """

    def subscribe(
        self,
        request: GraphQLRequest,
        **kwargs
    ) -> AsyncGenerator[ExecutionResult, None]:
        """
        Subscribe to GraphQL subscription.

        The returned async generator is consumed with ``async for``.

        Args:
            request: GraphQL subscription request
            **kwargs: Additional arguments for subscription

        Yields:
            ExecutionResult objects as they arrive

        Raises:
            TransportError: If subscription setup fails
        """

    async def close(self) -> None:
        """Close the async session and transport connection."""

    # Async context manager support
    async def __aenter__(self) -> "AsyncClientSession": ...
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...
286
287
class ReconnectingAsyncClientSession(AsyncClientSession):
    """
    Async session with automatic reconnection on connection failures.

    Inherits all methods from AsyncClientSession with added reconnection logic.
    """
293
```
294
295
## Usage Examples
296
297
### Basic Client Usage
298
299
```python
300
from gql import gql, Client
from gql.transport.requests import RequestsHTTPTransport

# Create client with automatic schema fetching
transport = RequestsHTTPTransport(url="https://api.example.com/graphql")
client = Client(transport=transport, fetch_schema_from_transport=True)

# Build the request
query = gql('''
    query GetUsers($limit: Int!) {
        users(limit: $limit) {
            id
            name
        }
    }
''')

# Variables travel with the request object (GraphQLRequest.variable_values)
query.variable_values = {"limit": 10}

# Execute query
result = client.execute(query)
users = result["users"]

# Close client when done
# NOTE(review): confirm Client.close() exists in the installed gql release;
# the upstream documentation describes close_sync() for sessions opened with
# connect_sync().
client.close()
323
```
324
325
### Session-based Batch Operations
326
327
```python
328
from gql import gql, Client
from gql.transport.requests import RequestsHTTPTransport

transport = RequestsHTTPTransport(url="https://api.example.com/graphql")
client = Client(transport=transport)

# Using synchronous session: entering the client opens the transport and
# yields the session; leaving the block closes the transport again.
with client as session:
    query1 = gql('{ user(id: "1") { name } }')
    query2 = gql('{ user(id: "2") { name } }')

    # Execute individually
    result1 = session.execute(query1)
    result2 = session.execute(query2)

    # Or execute as batch
    results = session.execute_batch([query1, query2])
345
```
346
347
### Asynchronous Operations
348
349
```python
350
import asyncio

from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport


async def main():
    """Run a single query and a batch over one asynchronous session."""
    transport = AIOHTTPTransport(url="https://api.example.com/graphql")
    client = Client(transport=transport)

    # connect_async() is a coroutine: it must be awaited to get the session.
    # (An un-awaited coroutine cannot be used in an `async with` statement.)
    session = await client.connect_async()
    try:
        query = gql('{ hello }')
        result = await session.execute(query)
        print(result)

        # Batch execution
        queries = [
            gql('{ user1: user(id: "1") { name } }'),
            gql('{ user2: user(id: "2") { name } }'),
        ]
        results = await session.execute_batch(queries)
    finally:
        # Release the transport even if one of the requests fails
        await client.close_async()


asyncio.run(main())
372
```
373
374
### Subscription Handling
375
376
```python
377
import asyncio

from gql import gql, Client
from gql.transport.websockets import WebsocketsTransport


async def handle_subscriptions():
    """Print every `messageAdded` event until the stream ends or reports errors."""
    transport = WebsocketsTransport(url="wss://api.example.com/graphql")
    client = Client(transport=transport)

    # connect_async() is a coroutine: it must be awaited to get the session.
    session = await client.connect_async()
    try:
        subscription = gql('''
            subscription {
                messageAdded {
                    id
                    content
                    user {
                        name
                    }
                }
            }
        ''')

        # Handle subscription messages.
        # get_execution_result=True requests full ExecutionResult objects, so
        # the .data / .errors attributes used below are available.
        async for result in session.subscribe(
            subscription, get_execution_result=True
        ):
            if result.data:
                message = result.data["messageAdded"]
                print(f"New message from {message['user']['name']}: {message['content']}")

            if result.errors:
                print(f"Subscription error: {result.errors}")
                break
    finally:
        # Release the websocket even if the loop exits early or raises
        await client.close_async()


asyncio.run(handle_subscriptions())
409
```
410
411
### Custom Schema Handling
412
413
```python
414
from datetime import datetime

from gql import gql, Client
from gql.transport.requests import RequestsHTTPTransport
from gql.utilities import update_schema_scalar
from graphql import GraphQLScalarType

# Custom scalar for datetime handling
datetime_scalar = GraphQLScalarType(
    name="DateTime",
    serialize=lambda dt: dt.isoformat(),
    parse_value=lambda value: datetime.fromisoformat(value),
    # Accept the optional `variables` argument that may accompany the AST node
    parse_literal=lambda ast, _variables=None: datetime.fromisoformat(ast.value)
)

transport = RequestsHTTPTransport(url="https://api.example.com/graphql")
client = Client(
    transport=transport,
    fetch_schema_from_transport=True,
    parse_results=True  # Enable result parsing with custom scalars
)

# The schema is only fetched once a session is opened, so the scalar has to
# be registered inside the session, after client.schema has been populated.
with client as session:
    # Update schema with custom scalar
    update_schema_scalar(client.schema, "DateTime", datetime_scalar)

    # Queries will now automatically parse DateTime fields
    query = gql('{ posts { title createdAt } }')
    result = session.execute(query)
    # result["posts"][0]["createdAt"] is now a datetime object
441
```