0
# Transport Protocols
1
2
Comprehensive transport implementations for HTTP, WebSocket, and local schema operations. Includes synchronous and asynchronous variants with protocol-specific optimizations, authentication support, and file upload capabilities.
3
4
## Capabilities
5
6
### HTTP Transports
7
8
HTTP-based transports for GraphQL operations using popular Python HTTP clients. They support synchronous and asynchronous execution, file uploads, request batching, and comprehensive configuration options.
9
10
#### RequestsHTTPTransport
11
12
Synchronous HTTP transport using the requests library with extensive configuration options and file upload support.
13
14
```python { .api }
15
class RequestsHTTPTransport(Transport):
16
def __init__(
17
self,
18
url: str,
19
headers: Optional[Dict[str, Any]] = None,
20
cookies: Optional[Union[Dict[str, Any], RequestsCookieJar]] = None,
21
auth: Optional[AuthBase] = None,
22
use_json: bool = True,
23
timeout: Optional[int] = None,
24
verify: Union[bool, str] = True,
25
retries: int = 0,
26
method: str = "POST",
27
retry_backoff_factor: float = 0.1,
28
retry_status_forcelist: Collection[int] = (429, 500, 502, 503, 504),
29
json_serialize: Callable = json.dumps,
30
json_deserialize: Callable = json.loads
31
):
32
"""
33
Initialize requests-based HTTP transport.
34
35
Args:
36
url: GraphQL server endpoint URL
37
headers: HTTP headers to send with requests
38
cookies: HTTP cookies for requests
39
auth: requests authentication object
40
use_json: Send requests as JSON vs form-encoded
41
timeout: Request timeout in seconds
42
verify: SSL certificate verification (bool or CA bundle path)
43
retries: Number of retry attempts on failure
44
method: HTTP method to use (POST, GET)
45
retry_backoff_factor: Backoff multiplier for retries
46
retry_status_forcelist: HTTP status codes to retry
47
json_serialize: JSON serialization function
48
json_deserialize: JSON deserialization function
49
"""
50
51
def execute(
52
self,
53
request: GraphQLRequest,
54
timeout: Optional[int] = None,
55
extra_args: Optional[Dict] = None,
56
upload_files: bool = False
57
) -> ExecutionResult:
58
"""
59
Execute GraphQL request via HTTP.
60
61
Args:
62
request: GraphQL request to execute
63
timeout: Override default timeout
64
extra_args: Additional arguments for requests
65
upload_files: Enable multipart file upload support
66
67
Returns:
68
ExecutionResult with response data
69
"""
70
71
def execute_batch(
72
self,
73
reqs: List[GraphQLRequest],
74
timeout: Optional[int] = None,
75
extra_args: Optional[Dict] = None
76
) -> List[ExecutionResult]:
77
"""Execute multiple requests in a single HTTP call."""
78
79
# Properties
80
response_headers: Optional[CaseInsensitiveDict[str]] # Last response headers
81
```
82
83
#### HTTPXTransport and HTTPXAsyncTransport
84
85
HTTP transports using the httpx library with both synchronous and asynchronous variants.
86
87
```python { .api }
88
class HTTPXTransport(Transport):
89
def __init__(
90
self,
91
url: Union[str, httpx.URL],
92
json_serialize: Callable = json.dumps,
93
json_deserialize: Callable = json.loads,
94
**kwargs
95
):
96
"""
97
Initialize httpx-based synchronous HTTP transport.
98
99
Args:
100
url: GraphQL server endpoint URL
101
json_serialize: JSON serialization function
102
json_deserialize: JSON deserialization function
103
**kwargs: Additional httpx client parameters
104
"""
105
106
def execute(
107
self,
108
request: GraphQLRequest,
109
extra_args: Optional[Dict] = None,
110
upload_files: bool = False
111
) -> ExecutionResult: ...
112
113
# Properties
114
response_headers: Optional[httpx.Headers] # Last response headers
115
116
class HTTPXAsyncTransport(AsyncTransport):
117
def __init__(
118
self,
119
url: Union[str, httpx.URL],
120
json_serialize: Callable = json.dumps,
121
json_deserialize: Callable = json.loads,
122
**kwargs
123
):
124
"""Initialize httpx-based asynchronous HTTP transport."""
125
126
async def execute(
127
self,
128
request: GraphQLRequest,
129
extra_args: Optional[Dict] = None,
130
upload_files: bool = False
131
) -> ExecutionResult: ...
132
133
async def execute_batch(
134
self,
135
reqs: List[GraphQLRequest],
136
extra_args: Optional[Dict] = None
137
) -> List[ExecutionResult]: ...
138
139
def subscribe(self, request: GraphQLRequest) -> AsyncGenerator[ExecutionResult, None]:
140
"""Raises NotImplementedError - HTTP doesn't support subscriptions."""
141
```
142
143
#### AIOHTTPTransport
144
145
Asynchronous HTTP transport using aiohttp with comprehensive configuration and streaming file upload support.
146
147
```python { .api }
148
class AIOHTTPTransport(AsyncTransport):
149
def __init__(
150
self,
151
url: str,
152
headers: Optional[LooseHeaders] = None,
153
cookies: Optional[LooseCookies] = None,
154
auth: Optional[Union[BasicAuth, AppSyncAuthentication]] = None,
155
ssl: Union[SSLContext, bool, Fingerprint] = True,
156
timeout: Optional[int] = None,
157
ssl_close_timeout: Optional[Union[int, float]] = 10,
158
json_serialize: Callable = json.dumps,
159
json_deserialize: Callable = json.loads,
160
client_session_args: Optional[Dict[str, Any]] = None
161
):
162
"""
163
Initialize aiohttp-based HTTP transport.
164
165
Args:
166
url: GraphQL server endpoint URL
167
headers: HTTP headers for requests
168
cookies: HTTP cookies for requests
169
auth: Authentication (BasicAuth or AppSync)
170
ssl: SSL context or verification settings
171
timeout: Request timeout in seconds
172
ssl_close_timeout: SSL connection close timeout
173
json_serialize: JSON serialization function
174
json_deserialize: JSON deserialization function
175
client_session_args: Extra aiohttp ClientSession arguments
176
"""
177
178
async def execute(
179
self,
180
request: GraphQLRequest,
181
extra_args: Optional[Dict] = None,
182
upload_files: bool = False
183
) -> ExecutionResult:
184
"""
185
Execute GraphQL request asynchronously.
186
187
Supports streaming file uploads using aiohttp.StreamReader
188
and AsyncGenerator file types.
189
"""
190
191
# Properties
192
response_headers: Optional[CIMultiDictProxy[str]] # Last response headers
193
```
194
195
### WebSocket Transports
196
197
WebSocket-based transports for GraphQL subscriptions and real-time operations. They support multiple protocols and authentication methods.
198
199
#### WebsocketsTransport
200
201
Primary WebSocket transport using the websockets library with support for multiple GraphQL WebSocket protocols.
202
203
```python { .api }
204
class WebsocketsTransport(AsyncTransport):
205
def __init__(
206
self,
207
url: str,
208
headers: Optional[HeadersLike] = None,
209
ssl: Union[SSLContext, bool] = False,
210
init_payload: Optional[Dict[str, Any]] = None,
211
connect_timeout: Optional[Union[int, float]] = 10,
212
close_timeout: Optional[Union[int, float]] = 10,
213
ack_timeout: Optional[Union[int, float]] = 10,
214
keep_alive_timeout: Optional[Union[int, float]] = None,
215
ping_interval: Optional[Union[int, float]] = None,
216
pong_timeout: Optional[Union[int, float]] = None,
217
answer_pings: bool = True,
218
connect_args: Optional[Dict[str, Any]] = None,
219
subprotocols: Optional[List[str]] = None
220
):
221
"""
222
Initialize WebSocket transport for GraphQL subscriptions.
223
224
Args:
225
url: WebSocket server URL (wss://example.com/graphql)
226
headers: HTTP headers for WebSocket handshake
227
ssl: SSL context or verification settings
228
init_payload: Connection initialization payload
229
connect_timeout: Connection establishment timeout
230
close_timeout: Connection close timeout
231
ack_timeout: Acknowledgment timeout for operations
232
keep_alive_timeout: Keep-alive message timeout
233
ping_interval: Ping interval for graphql-ws protocol
234
pong_timeout: Pong response timeout
235
answer_pings: Whether to respond to server pings
236
connect_args: Additional websockets.connect arguments
237
subprotocols: WebSocket subprotocols to negotiate
238
"""
239
240
async def execute(self, request: GraphQLRequest) -> ExecutionResult:
241
"""Execute single GraphQL operation over WebSocket."""
242
243
def subscribe(
244
self,
245
request: GraphQLRequest,
246
send_stop: bool = True
247
) -> AsyncGenerator[ExecutionResult, None]:
248
"""
249
Subscribe to GraphQL subscription.
250
251
Args:
252
request: GraphQL subscription request
253
send_stop: Send stop message when subscription ends
254
255
Yields:
256
ExecutionResult objects as they arrive from server
257
"""
258
259
async def send_ping(self, payload: Optional[Any] = None) -> None:
260
"""Send ping message (graphql-ws protocol only)."""
261
262
async def send_pong(self, payload: Optional[Any] = None) -> None:
263
"""Send pong message (graphql-ws protocol only)."""
264
265
# Properties
266
response_headers: Dict[str, str] # WebSocket handshake response headers
267
url: str # Connection URL
268
headers: Optional[HeadersLike] # Connection headers
269
ssl: Union[SSLContext, bool] # SSL configuration
270
```
271
272
**Supported Subprotocols:**
273
- `graphql-ws` - Apollo GraphQL WebSocket protocol (the legacy `subscriptions-transport-ws` protocol; no ping/pong messages)
274
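- `graphql-transport-ws` - GraphQL over WebSocket protocol defined by the `graphql-ws` library (this is the "graphql-ws protocol" that supports ping/pong, despite the subprotocol name)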
275
276
#### AIOHTTPWebsocketsTransport
277
278
WebSocket transport using aiohttp's WebSocket client with additional configuration options.
279
280
```python { .api }
281
class AIOHTTPWebsocketsTransport(AsyncTransport):
282
def __init__(
283
self,
284
url: StrOrURL,
285
subprotocols: Optional[List[str]] = None,
286
heartbeat: Optional[float] = None,
287
auth: Optional[BasicAuth] = None,
288
origin: Optional[str] = None,
289
params: Optional[Mapping[str, str]] = None,
290
headers: Optional[LooseHeaders] = None,
291
proxy: Optional[StrOrURL] = None,
292
proxy_auth: Optional[BasicAuth] = None,
293
proxy_headers: Optional[LooseHeaders] = None,
294
ssl: Optional[Union[SSLContext, Literal[False], Fingerprint]] = None,
295
websocket_close_timeout: float = 10.0,
296
receive_timeout: Optional[float] = None,
297
ssl_close_timeout: Optional[Union[int, float]] = 10,
298
session: Optional[ClientSession] = None,
299
client_session_args: Optional[Dict[str, Any]] = None,
300
connect_args: Optional[Dict[str, Any]] = None,
301
# Plus all WebSocket protocol parameters
302
**kwargs
303
):
304
"""
305
Initialize aiohttp-based WebSocket transport.
306
307
Args:
308
url: WebSocket server URL
309
subprotocols: WebSocket subprotocols to negotiate
310
heartbeat: Low-level ping heartbeat interval
311
auth: Basic authentication for connection
312
origin: Origin header for WebSocket handshake
313
params: Query parameters for connection URL
314
headers: HTTP headers for handshake
315
proxy: Proxy server URL
316
proxy_auth: Proxy authentication
317
proxy_headers: Proxy-specific headers
318
ssl: SSL configuration
319
websocket_close_timeout: WebSocket close timeout
320
receive_timeout: Message receive timeout
321
ssl_close_timeout: SSL close timeout
322
session: Existing aiohttp ClientSession to use
323
client_session_args: ClientSession creation arguments
324
connect_args: WebSocket connection arguments
325
"""
326
```
327
328
#### PhoenixChannelWebsocketsTransport
329
330
Specialized transport for Phoenix Framework Absinthe GraphQL servers using Phoenix Channel protocol.
331
332
```python { .api }
333
class PhoenixChannelWebsocketsTransport(AsyncTransport):
334
def __init__(
335
self,
336
url: str,
337
channel_name: str = "__absinthe__:control",
338
heartbeat_interval: float = 30,
339
ack_timeout: Optional[Union[int, float]] = 10,
340
# Plus WebSocket connection parameters
341
**kwargs
342
):
343
"""
344
Initialize Phoenix Channel WebSocket transport.
345
346
Args:
347
url: Phoenix server URL
348
channel_name: Phoenix channel name for GraphQL operations
349
heartbeat_interval: Heartbeat interval in seconds
350
ack_timeout: Acknowledgment timeout for messages
351
"""
352
353
async def execute(self, request: GraphQLRequest) -> ExecutionResult: ...
354
355
def subscribe(
356
self,
357
request: GraphQLRequest,
358
send_stop: bool = True
359
) -> AsyncGenerator[ExecutionResult, None]: ...
360
```
361
362
**Phoenix-specific Features:**
363
- Automatic Phoenix Channel protocol handling (phx_join, phx_leave)
364
- Heartbeat messages to maintain connection
365
- Subscription management with subscriptionId tracking
366
367
#### AppSyncWebsocketsTransport
368
369
Specialized transport for AWS AppSync realtime subscriptions with AWS authentication support.
370
371
```python { .api }
372
class AppSyncWebsocketsTransport(AsyncTransport):
373
def __init__(
374
self,
375
url: str,
376
auth: Optional[AppSyncAuthentication] = None,
377
session: Optional[botocore.session.Session] = None,
378
ssl: Union[SSLContext, bool] = False,
379
connect_timeout: int = 10,
380
close_timeout: int = 10,
381
ack_timeout: int = 10,
382
keep_alive_timeout: Optional[Union[int, float]] = None,
383
connect_args: Dict[str, Any] = {}
384
):
385
"""
386
Initialize AWS AppSync WebSocket transport.
387
388
Args:
389
url: AppSync GraphQL endpoint URL (automatically converted to realtime endpoint)
390
auth: AWS authentication method (defaults to IAM)
391
session: botocore session used for IAM authentication
392
ssl: SSL configuration
393
connect_timeout: Connection timeout
394
close_timeout: Close timeout
395
ack_timeout: Acknowledgment timeout
396
keep_alive_timeout: Keep-alive timeout
397
connect_args: WebSocket connection arguments
398
"""
399
400
async def execute(self, request: GraphQLRequest) -> ExecutionResult:
401
"""Raises AssertionError - only subscriptions supported on realtime endpoint."""
402
403
def subscribe(
404
self,
405
request: GraphQLRequest,
406
send_stop: bool = True
407
) -> AsyncGenerator[ExecutionResult, None]:
408
"""Subscribe to AppSync realtime subscription."""
409
```
410
411
**AppSync-specific Features:**
412
- Only supports subscriptions (queries/mutations not allowed on realtime endpoint)
413
- Automatic URL conversion from GraphQL to realtime endpoint
414
- AWS signature-based authentication
415
- Supports multiple AWS auth methods
416
417
### Authentication Classes
418
419
Authentication implementations for AWS AppSync WebSocket connections.
420
421
```python { .api }
422
class AppSyncAuthentication:
423
"""Abstract base class for AppSync authentication methods."""
424
425
def get_auth_url(self, url: str) -> str:
426
"""Convert HTTP GraphQL URL to authenticated WebSocket URL."""
427
428
def get_headers(self, data=None, headers=None) -> Dict[str, Any]:
429
"""Get authentication headers for WebSocket connection."""
430
431
class AppSyncApiKeyAuthentication(AppSyncAuthentication):
432
def __init__(self, host: str, api_key: str):
433
"""
434
API key authentication for AppSync.
435
436
Args:
437
host: AppSync API host
438
api_key: AppSync API key
439
"""
440
441
class AppSyncJWTAuthentication(AppSyncAuthentication):
442
def __init__(self, host: str, jwt: str):
443
"""
444
JWT authentication for AppSync (Cognito User Pools, OIDC).
445
446
Args:
447
host: AppSync API host
448
jwt: JWT access token
449
"""
450
451
class AppSyncIAMAuthentication(AppSyncAuthentication):
452
def __init__(
453
self,
454
host: str,
455
region_name: Optional[str] = None,
456
signer: Optional[botocore.auth.BaseSigner] = None,
457
request_creator: Optional[Callable] = None,
458
credentials: Optional[botocore.credentials.Credentials] = None,
459
session: Optional[botocore.session.Session] = None
460
):
461
"""
462
IAM authentication for AppSync with SigV4 signing.
463
464
Args:
465
host: AppSync API host
466
region_name: AWS region (auto-detected if not provided)
467
signer: Custom botocore signer
468
request_creator: Custom request creator
469
credentials: AWS credentials
470
session: botocore session used for credential resolution
471
"""
472
```
473
474
### Local Schema Transport
475
476
Execute GraphQL operations directly against local schemas without network communication.
477
478
```python { .api }
479
class LocalSchemaTransport(AsyncTransport):
480
def __init__(self, schema: GraphQLSchema):
481
"""
482
Initialize local schema transport.
483
484
Args:
485
schema: Local GraphQL schema object to execute against
486
"""
487
488
async def connect(self) -> None:
489
"""No-op connection (no network required)."""
490
491
async def execute(
492
self,
493
request: GraphQLRequest,
494
*args,
495
**kwargs
496
) -> ExecutionResult:
497
"""
498
Execute GraphQL request against local schema.
499
500
Args:
501
request: GraphQL request to execute
502
*args: Additional positional arguments
503
**kwargs: Additional keyword arguments
504
505
Returns:
506
ExecutionResult from local schema execution
507
"""
508
509
def subscribe(
510
self,
511
request: GraphQLRequest,
512
*args,
513
**kwargs
514
) -> AsyncGenerator[ExecutionResult, None]:
515
"""
516
Execute GraphQL subscription against local schema.
517
518
Args:
519
request: GraphQL subscription request
520
*args: Additional positional arguments
521
**kwargs: Additional keyword arguments
522
523
Yields:
524
ExecutionResult objects from subscription
525
"""
526
527
async def close(self) -> None:
528
"""No-op close (no connection to close)."""
529
```
530
531
### File Upload Support
532
533
Support for GraphQL multipart file uploads across HTTP transports.
534
535
```python { .api }
536
class FileVar:
537
def __init__(
538
self,
539
f: Any, # str | io.IOBase | aiohttp.StreamReader | AsyncGenerator
540
*,
541
filename: Optional[str] = None,
542
content_type: Optional[str] = None,
543
streaming: bool = False,
544
streaming_block_size: int = 64 * 1024
545
):
546
"""
547
File variable for GraphQL multipart uploads.
548
549
Args:
550
f: File object (path string, file handle, stream, or async generator)
551
filename: Override filename for upload
552
content_type: MIME content type
553
streaming: Enable streaming upload (requires transport support)
554
streaming_block_size: Chunk size for streaming uploads
555
"""
556
557
def open_file(self, transport_supports_streaming: bool = False) -> None:
558
"""
559
Open file for upload.
560
561
Args:
562
transport_supports_streaming: Whether transport supports streaming
563
"""
564
565
def close_file(self) -> None:
566
"""Close opened file handle."""
567
568
# File utility functions
569
def open_files(
570
filevars: List[FileVar],
571
transport_supports_streaming: bool = False
572
) -> None:
573
"""Open multiple FileVar objects for upload."""
574
575
def close_files(filevars: List[FileVar]) -> None:
576
"""Close multiple FileVar objects."""
577
578
def extract_files(
579
variables: Dict,
580
file_classes: Tuple[Type[Any], ...]
581
) -> Tuple[Dict, List[FileVar]]:
582
"""Extract FileVar objects from GraphQL variables."""
583
```
584
585
## Usage Examples
586
587
### HTTP Transport Configuration
588
589
```python
590
from gql import Client
591
from gql.transport.requests import RequestsHTTPTransport
592
from requests.auth import HTTPBasicAuth
593
594
# Basic HTTP transport
595
transport = RequestsHTTPTransport(
596
url="https://api.example.com/graphql",
597
headers={"User-Agent": "MyApp/1.0"},
598
timeout=30
599
)
600
601
# With authentication and retries
602
transport = RequestsHTTPTransport(
603
url="https://api.example.com/graphql",
604
auth=HTTPBasicAuth("username", "password"),
605
retries=3,
606
retry_backoff_factor=0.5,
607
retry_status_forcelist=[500, 502, 503, 504]
608
)
609
610
client = Client(transport=transport)
611
```
612
613
### File Upload Example
614
615
```python
616
from gql import gql, Client, FileVar
617
from gql.transport.aiohttp import AIOHTTPTransport
618
619
async def upload_file():
620
transport = AIOHTTPTransport(url="https://api.example.com/graphql")
621
client = Client(transport=transport)
622
623
# Create file variable
624
file_var = FileVar(
625
"document.pdf",
626
content_type="application/pdf"
627
)
628
629
# GraphQL mutation with file upload
630
mutation = gql('''
631
mutation UploadDocument($file: Upload!) {
632
uploadDocument(file: $file) {
633
id
634
filename
635
url
636
}
637
}
638
''')
639
640
async with client.connect_async() as session:
641
result = await session.execute(
642
mutation,
643
variable_values={"file": file_var},
644
upload_files=True
645
)
646
647
return result["uploadDocument"]
648
```
649
650
### WebSocket Subscription Setup
651
652
```python
653
import asyncio
654
from gql import gql, Client
655
from gql.transport.websockets import WebsocketsTransport
656
657
async def handle_messages():
658
# Configure WebSocket transport
659
transport = WebsocketsTransport(
660
url="wss://api.example.com/graphql",
661
headers={"Authorization": "Bearer token123"},
662
init_payload={"authToken": "token123"},
663
subprotocols=["graphql-ws"]
664
)
665
666
client = Client(transport=transport)
667
668
async with client.connect_async() as session:
669
subscription = gql('''
670
subscription MessageSubscription($channelId: ID!) {
671
messageAdded(channelId: $channelId) {
672
id
673
content
674
user {
675
name
676
}
677
timestamp
678
}
679
}
680
''')
681
682
async for result in session.subscribe(
683
subscription,
684
variable_values={"channelId": "general"},
get_execution_result=True  # yield ExecutionResult objects (the default yields plain dicts)
685
):
686
if result.data:
687
message = result.data["messageAdded"]
688
print(f"[{message['timestamp']}] {message['user']['name']}: {message['content']}")
689
690
if result.errors:
691
print(f"Subscription error: {result.errors}")
692
693
asyncio.run(handle_messages())
694
```
695
696
### AWS AppSync Integration
697
698
```python
699
import asyncio
700
from gql import gql, Client
701
from gql.transport.appsync_websockets import AppSyncWebsocketsTransport
702
from gql.transport.appsync_auth import AppSyncIAMAuthentication
703
704
async def appsync_subscription():
705
# Configure AppSync authentication
706
auth = AppSyncIAMAuthentication(
707
host="example.appsync-api.us-east-1.amazonaws.com",
708
region_name="us-east-1"
709
)
710
711
# Create AppSync transport
712
transport = AppSyncWebsocketsTransport(
713
url="https://example.appsync-api.us-east-1.amazonaws.com/graphql",
714
auth=auth
715
)
716
717
client = Client(transport=transport)
718
719
async with client.connect_async() as session:
720
subscription = gql('''
721
subscription OnCommentAdded($postId: ID!) {
722
onCommentAdded(postId: $postId) {
723
id
724
content
725
author
726
createdAt
727
}
728
}
729
''')
730
731
async for result in session.subscribe(
732
subscription,
733
variable_values={"postId": "post-123"}
734
):
735
comment = result["onCommentAdded"]
736
print(f"New comment by {comment['author']}: {comment['content']}")
737
738
asyncio.run(appsync_subscription())
739
```
740
741
### Local Schema Testing
742
743
```python
744
import asyncio

from gql import gql, Client
from gql.transport.local_schema import LocalSchemaTransport
from graphql import build_schema

# Define schema
type_defs = '''
    type Query {
        hello(name: String): String
    }

    type Subscription {
        counter: Int
    }
'''

# Create executable schema with resolvers
schema = build_schema(type_defs)

# Add resolvers
def resolve_hello(root, info, name="World"):
    return f"Hello, {name}!"

async def resolve_counter(root, info):
    for i in range(10):
        await asyncio.sleep(1)
        yield {"counter": i}

schema.query_type.fields["hello"].resolve = resolve_hello
schema.subscription_type.fields["counter"].subscribe = resolve_counter

async def main():
    # Use local transport
    transport = LocalSchemaTransport(schema)
    client = Client(transport=transport)

    # `async with` is only valid inside a coroutine, hence the main() wrapper
    async with client.connect_async() as session:
        # Execute query
        query = gql('{ hello(name: "Alice") }')
        result = await session.execute(query)
        print(result)  # {'hello': 'Hello, Alice!'}

        # Execute subscription
        subscription = gql('subscription { counter }')
        async for result in session.subscribe(subscription):
            print(f"Counter: {result['counter']}")

asyncio.run(main())
788
```