or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli.mdclient-sessions.mddsl.mdindex.mdtransports.mdutilities.md

transports.mddocs/

0

# Transport Protocols

1

2

Comprehensive transport implementations for HTTP, WebSocket, and local schema operations. Includes synchronous and asynchronous variants with protocol-specific optimizations, authentication support, and file upload capabilities.

3

4

## Capabilities

5

6

### HTTP Transports

7

8

HTTP-based transports for GraphQL operations using popular Python HTTP clients. Support synchronous and asynchronous execution, file uploads, request batching, and comprehensive configuration options.

9

10

#### RequestsHTTPTransport

11

12

Synchronous HTTP transport using the requests library with extensive configuration options and file upload support.

13

14

```python { .api }

15

class RequestsHTTPTransport(Transport):

16

def __init__(

17

self,

18

url: str,

19

headers: Optional[Dict[str, Any]] = None,

20

cookies: Optional[Union[Dict[str, Any], RequestsCookieJar]] = None,

21

auth: Optional[AuthBase] = None,

22

use_json: bool = True,

23

timeout: Optional[int] = None,

24

verify: Union[bool, str] = True,

25

retries: int = 0,

26

method: str = "POST",

27

retry_backoff_factor: float = 0.1,

28

retry_status_forcelist: Collection[int] = None,

29

json_serialize: Callable = json.dumps,

30

json_deserialize: Callable = json.loads

31

):

32

"""

33

Initialize requests-based HTTP transport.

34

35

Args:

36

url: GraphQL server endpoint URL

37

headers: HTTP headers to send with requests

38

cookies: HTTP cookies for requests

39

auth: requests authentication object

40

use_json: Send requests as JSON vs form-encoded

41

timeout: Request timeout in seconds

42

verify: SSL certificate verification (bool or CA bundle path)

43

retries: Number of retry attempts on failure

44

method: HTTP method to use (POST, GET)

45

retry_backoff_factor: Backoff multiplier for retries

46

retry_status_forcelist: HTTP status codes to retry

47

json_serialize: JSON serialization function

48

json_deserialize: JSON deserialization function

49

"""

50

51

def execute(

52

self,

53

request: GraphQLRequest,

54

timeout: Optional[int] = None,

55

extra_args: Optional[Dict] = None,

56

upload_files: bool = False

57

) -> ExecutionResult:

58

"""

59

Execute GraphQL request via HTTP.

60

61

Args:

62

request: GraphQL request to execute

63

timeout: Override default timeout

64

extra_args: Additional arguments for requests

65

upload_files: Enable multipart file upload support

66

67

Returns:

68

ExecutionResult with response data

69

"""

70

71

def execute_batch(

72

self,

73

reqs: List[GraphQLRequest],

74

timeout: Optional[int] = None,

75

extra_args: Optional[Dict] = None

76

) -> List[ExecutionResult]:

77

"""Execute multiple requests in a single HTTP call."""

78

79

# Properties

80

response_headers: Optional[CaseInsensitiveDict[str]] # Last response headers

81

```

82

83

#### HTTPXTransport and HTTPXAsyncTransport

84

85

HTTP transports using the httpx library with both synchronous and asynchronous variants.

86

87

```python { .api }

88

class HTTPXTransport(Transport):

89

def __init__(

90

self,

91

url: Union[str, httpx.URL],

92

json_serialize: Callable = json.dumps,

93

json_deserialize: Callable = json.loads,

94

**kwargs

95

):

96

"""

97

Initialize httpx-based synchronous HTTP transport.

98

99

Args:

100

url: GraphQL server endpoint URL

101

json_serialize: JSON serialization function

102

json_deserialize: JSON deserialization function

103

**kwargs: Additional httpx client parameters

104

"""

105

106

def execute(

107

self,

108

request: GraphQLRequest,

109

extra_args: Optional[Dict] = None,

110

upload_files: bool = False

111

) -> ExecutionResult: ...

112

113

# Properties

114

response_headers: Optional[httpx.Headers] # Last response headers

115

116

class HTTPXAsyncTransport(AsyncTransport):

117

def __init__(

118

self,

119

url: Union[str, httpx.URL],

120

json_serialize: Callable = json.dumps,

121

json_deserialize: Callable = json.loads,

122

**kwargs

123

):

124

"""Initialize httpx-based asynchronous HTTP transport."""

125

126

async def execute(

127

self,

128

request: GraphQLRequest,

129

extra_args: Optional[Dict] = None,

130

upload_files: bool = False

131

) -> ExecutionResult: ...

132

133

async def execute_batch(

134

self,

135

reqs: List[GraphQLRequest],

136

extra_args: Optional[Dict] = None

137

) -> List[ExecutionResult]: ...

138

139

def subscribe(self, request: GraphQLRequest) -> AsyncGenerator[ExecutionResult, None]:

140

"""Raises NotImplementedError - HTTP doesn't support subscriptions."""

141

```

142

143

#### AIOHTTPTransport

144

145

Asynchronous HTTP transport using aiohttp with comprehensive configuration and streaming file upload support.

146

147

```python { .api }

148

class AIOHTTPTransport(AsyncTransport):

149

def __init__(

150

self,

151

url: str,

152

headers: Optional[LooseHeaders] = None,

153

cookies: Optional[LooseCookies] = None,

154

auth: Optional[Union[BasicAuth, AppSyncAuthentication]] = None,

155

ssl: Union[SSLContext, bool, Fingerprint] = True,

156

timeout: Optional[int] = None,

157

ssl_close_timeout: Optional[Union[int, float]] = 10,

158

json_serialize: Callable = json.dumps,

159

json_deserialize: Callable = json.loads,

160

client_session_args: Optional[Dict[str, Any]] = None

161

):

162

"""

163

Initialize aiohttp-based HTTP transport.

164

165

Args:

166

url: GraphQL server endpoint URL

167

headers: HTTP headers for requests

168

cookies: HTTP cookies for requests

169

auth: Authentication (BasicAuth or AppSync)

170

ssl: SSL context or verification settings

171

timeout: Request timeout in seconds

172

ssl_close_timeout: SSL connection close timeout

173

json_serialize: JSON serialization function

174

json_deserialize: JSON deserialization function

175

client_session_args: Extra aiohttp ClientSession arguments

176

"""

177

178

async def execute(

179

self,

180

request: GraphQLRequest,

181

extra_args: Optional[Dict] = None,

182

upload_files: bool = False

183

) -> ExecutionResult:

184

"""

185

Execute GraphQL request asynchronously.

186

187

Supports streaming file uploads using aiohttp.StreamReader

188

and AsyncGenerator file types.

189

"""

190

191

# Properties

192

response_headers: Optional[CIMultiDictProxy[str]] # Last response headers

193

```

194

195

### WebSocket Transports

196

197

WebSocket-based transports for GraphQL subscriptions and real-time operations. Support multiple protocols and authentication methods.

198

199

#### WebsocketsTransport

200

201

Primary WebSocket transport using the websockets library with support for multiple GraphQL WebSocket protocols.

202

203

```python { .api }

204

class WebsocketsTransport(AsyncTransport):

205

def __init__(

206

self,

207

url: str,

208

headers: Optional[HeadersLike] = None,

209

ssl: Union[SSLContext, bool] = False,

210

init_payload: Optional[Dict[str, Any]] = None,

211

connect_timeout: Optional[Union[int, float]] = 10,

212

close_timeout: Optional[Union[int, float]] = 10,

213

ack_timeout: Optional[Union[int, float]] = 10,

214

keep_alive_timeout: Optional[Union[int, float]] = None,

215

ping_interval: Optional[Union[int, float]] = None,

216

pong_timeout: Optional[Union[int, float]] = None,

217

answer_pings: bool = True,

218

connect_args: Optional[Dict[str, Any]] = None,

219

subprotocols: Optional[List[str]] = None

220

):

221

"""

222

Initialize WebSocket transport for GraphQL subscriptions.

223

224

Args:

225

url: WebSocket server URL (wss://example.com/graphql)

226

headers: HTTP headers for WebSocket handshake

227

ssl: SSL context or verification settings

228

init_payload: Connection initialization payload

229

connect_timeout: Connection establishment timeout

230

close_timeout: Connection close timeout

231

ack_timeout: Acknowledgment timeout for operations

232

keep_alive_timeout: Keep-alive message timeout

233

ping_interval: Ping interval for graphql-ws protocol

234

pong_timeout: Pong response timeout

235

answer_pings: Whether to respond to server pings

236

connect_args: Additional websockets.connect arguments

237

subprotocols: WebSocket subprotocols to negotiate

238

"""

239

240

async def execute(self, request: GraphQLRequest) -> ExecutionResult:

241

"""Execute single GraphQL operation over WebSocket."""

242

243

def subscribe(

244

self,

245

request: GraphQLRequest,

246

send_stop: bool = True

247

) -> AsyncGenerator[ExecutionResult, None]:

248

"""

249

Subscribe to GraphQL subscription.

250

251

Args:

252

request: GraphQL subscription request

253

send_stop: Send stop message when subscription ends

254

255

Yields:

256

ExecutionResult objects as they arrive from server

257

"""

258

259

async def send_ping(self, payload: Optional[Any] = None) -> None:

260

"""Send ping message (graphql-ws protocol only)."""

261

262

async def send_pong(self, payload: Optional[Any] = None) -> None:

263

"""Send pong message (graphql-ws protocol only)."""

264

265

# Properties

266

response_headers: Dict[str, str] # WebSocket handshake response headers

267

url: str # Connection URL

268

headers: Optional[HeadersLike] # Connection headers

269

ssl: Union[SSLContext, bool] # SSL configuration

270

```

271

272

**Supported Subprotocols:**

273

- `graphql-ws` - Apollo GraphQL WebSocket protocol

274

- `graphql-transport-ws` - GraphQL WS protocol

275

276

#### AIOHTTPWebsocketsTransport

277

278

WebSocket transport using aiohttp's WebSocket client with additional configuration options.

279

280

```python { .api }

281

class AIOHTTPWebsocketsTransport(AsyncTransport):

282

def __init__(

283

self,

284

url: StrOrURL,

285

subprotocols: Optional[List[str]] = None,

286

heartbeat: Optional[float] = None,

287

auth: Optional[BasicAuth] = None,

288

origin: Optional[str] = None,

289

params: Optional[Mapping[str, str]] = None,

290

headers: Optional[LooseHeaders] = None,

291

proxy: Optional[StrOrURL] = None,

292

proxy_auth: Optional[BasicAuth] = None,

293

proxy_headers: Optional[LooseHeaders] = None,

294

ssl: Optional[Union[SSLContext, Literal[False], Fingerprint]] = None,

295

websocket_close_timeout: float = 10.0,

296

receive_timeout: Optional[float] = None,

297

ssl_close_timeout: Optional[Union[int, float]] = 10,

298

session: Optional[ClientSession] = None,

299

client_session_args: Optional[Dict[str, Any]] = None,

300

connect_args: Optional[Dict[str, Any]] = None,

301

# Plus all WebSocket protocol parameters

302

**kwargs

303

):

304

"""

305

Initialize aiohttp-based WebSocket transport.

306

307

Args:

308

url: WebSocket server URL

309

subprotocols: WebSocket subprotocols to negotiate

310

heartbeat: Low-level ping heartbeat interval

311

auth: Basic authentication for connection

312

origin: Origin header for WebSocket handshake

313

params: Query parameters for connection URL

314

headers: HTTP headers for handshake

315

proxy: Proxy server URL

316

proxy_auth: Proxy authentication

317

proxy_headers: Proxy-specific headers

318

ssl: SSL configuration

319

websocket_close_timeout: WebSocket close timeout

320

receive_timeout: Message receive timeout

321

ssl_close_timeout: SSL close timeout

322

session: Existing aiohttp ClientSession to use

323

client_session_args: ClientSession creation arguments

324

connect_args: WebSocket connection arguments

325

"""

326

```

327

328

#### PhoenixChannelWebsocketsTransport

329

330

Specialized transport for Phoenix Framework Absinthe GraphQL servers using Phoenix Channel protocol.

331

332

```python { .api }

333

class PhoenixChannelWebsocketsTransport(AsyncTransport):

334

def __init__(

335

self,

336

url: str,

337

channel_name: str = "__absinthe__:control",

338

heartbeat_interval: float = 30,

339

ack_timeout: Optional[Union[int, float]] = 10,

340

# Plus WebSocket connection parameters

341

**kwargs

342

):

343

"""

344

Initialize Phoenix Channel WebSocket transport.

345

346

Args:

347

url: Phoenix server URL

348

channel_name: Phoenix channel name for GraphQL operations

349

heartbeat_interval: Heartbeat interval in seconds

350

ack_timeout: Acknowledgment timeout for messages

351

"""

352

353

async def execute(self, request: GraphQLRequest) -> ExecutionResult: ...

354

355

def subscribe(

356

self,

357

request: GraphQLRequest,

358

send_stop: bool = True

359

) -> AsyncGenerator[ExecutionResult, None]: ...

360

```

361

362

**Phoenix-specific Features:**

363

- Automatic Phoenix Channel protocol handling (phx_join, phx_leave)

364

- Heartbeat messages to maintain connection

365

- Subscription management with subscriptionId tracking

366

367

#### AppSyncWebsocketsTransport

368

369

Specialized transport for AWS AppSync realtime subscriptions with AWS authentication support.

370

371

```python { .api }

372

class AppSyncWebsocketsTransport(AsyncTransport):

373

def __init__(

374

self,

375

url: str,

376

auth: Optional[AppSyncAuthentication] = None,

377

session: Optional[botocore.session.Session] = None,

378

ssl: Union[SSLContext, bool] = False,

379

connect_timeout: int = 10,

380

close_timeout: int = 10,

381

ack_timeout: int = 10,

382

keep_alive_timeout: Optional[Union[int, float]] = None,

383

connect_args: Dict[str, Any] = {}

384

):

385

"""

386

Initialize AWS AppSync WebSocket transport.

387

388

Args:

389

url: AppSync GraphQL endpoint URL (automatically converted to realtime endpoint)

390

auth: AWS authentication method (defaults to IAM)

391

session: Boto3 session for IAM authentication

392

ssl: SSL configuration

393

connect_timeout: Connection timeout

394

close_timeout: Close timeout

395

ack_timeout: Acknowledgment timeout

396

keep_alive_timeout: Keep-alive timeout

397

connect_args: WebSocket connection arguments

398

"""

399

400

def execute(self, request: GraphQLRequest) -> ExecutionResult:

401

"""Raises AssertionError - only subscriptions supported on realtime endpoint."""

402

403

def subscribe(

404

self,

405

request: GraphQLRequest,

406

send_stop: bool = True

407

) -> AsyncGenerator[ExecutionResult, None]:

408

"""Subscribe to AppSync realtime subscription."""

409

```

410

411

**AppSync-specific Features:**

412

- Only supports subscriptions (queries/mutations not allowed on realtime endpoint)

413

- Automatic URL conversion from GraphQL to realtime endpoint

414

- AWS signature-based authentication

415

- Supports multiple AWS auth methods

416

417

### Authentication Classes

418

419

Authentication implementations for AWS AppSync WebSocket connections.

420

421

```python { .api }

422

class AppSyncAuthentication:

423

"""Abstract base class for AppSync authentication methods."""

424

425

def get_auth_url(self, url: str) -> str:

426

"""Convert HTTP GraphQL URL to authenticated WebSocket URL."""

427

428

def get_headers(self, data=None, headers=None) -> Dict[str, Any]:

429

"""Get authentication headers for WebSocket connection."""

430

431

class AppSyncApiKeyAuthentication(AppSyncAuthentication):

432

def __init__(self, host: str, api_key: str):

433

"""

434

API key authentication for AppSync.

435

436

Args:

437

host: AppSync API host

438

api_key: AppSync API key

439

"""

440

441

class AppSyncJWTAuthentication(AppSyncAuthentication):

442

def __init__(self, host: str, jwt: str):

443

"""

444

JWT authentication for AppSync (Cognito User Pools, OIDC).

445

446

Args:

447

host: AppSync API host

448

jwt: JWT access token

449

"""

450

451

class AppSyncIAMAuthentication(AppSyncAuthentication):

452

def __init__(

453

self,

454

host: str,

455

region_name: Optional[str] = None,

456

signer: Optional[botocore.auth.BaseSigner] = None,

457

request_creator: Optional[Callable] = None,

458

credentials: Optional[botocore.credentials.Credentials] = None,

459

session: Optional[botocore.session.Session] = None

460

):

461

"""

462

IAM authentication for AppSync with SigV4 signing.

463

464

Args:

465

host: AppSync API host

466

region_name: AWS region (auto-detected if not provided)

467

signer: Custom botocore signer

468

request_creator: Custom request creator

469

credentials: AWS credentials

470

session: Boto3 session for credential resolution

471

"""

472

```

473

474

### Local Schema Transport

475

476

Execute GraphQL operations directly against local schemas without network communication.

477

478

```python { .api }

479

class LocalSchemaTransport(AsyncTransport):

480

def __init__(self, schema: GraphQLSchema):

481

"""

482

Initialize local schema transport.

483

484

Args:

485

schema: Local GraphQL schema object to execute against

486

"""

487

488

async def connect(self) -> None:

489

"""No-op connection (no network required)."""

490

491

async def execute(

492

self,

493

request: GraphQLRequest,

494

*args,

495

**kwargs

496

) -> ExecutionResult:

497

"""

498

Execute GraphQL request against local schema.

499

500

Args:

501

request: GraphQL request to execute

502

*args: Additional positional arguments

503

**kwargs: Additional keyword arguments

504

505

Returns:

506

ExecutionResult from local schema execution

507

"""

508

509

def subscribe(

510

self,

511

request: GraphQLRequest,

512

*args,

513

**kwargs

514

) -> AsyncGenerator[ExecutionResult, None]:

515

"""

516

Execute GraphQL subscription against local schema.

517

518

Args:

519

request: GraphQL subscription request

520

*args: Additional positional arguments

521

**kwargs: Additional keyword arguments

522

523

Yields:

524

ExecutionResult objects from subscription

525

"""

526

527

async def close(self) -> None:

528

"""No-op close (no connection to close)."""

529

```

530

531

### File Upload Support

532

533

Support for GraphQL multipart file uploads across HTTP transports.

534

535

```python { .api }

536

class FileVar:

537

def __init__(

538

self,

539

f: Any, # str | io.IOBase | aiohttp.StreamReader | AsyncGenerator

540

*,

541

filename: Optional[str] = None,

542

content_type: Optional[str] = None,

543

streaming: bool = False,

544

streaming_block_size: int = 64 * 1024

545

):

546

"""

547

File variable for GraphQL multipart uploads.

548

549

Args:

550

f: File object (path string, file handle, stream, or async generator)

551

filename: Override filename for upload

552

content_type: MIME content type

553

streaming: Enable streaming upload (requires transport support)

554

streaming_block_size: Chunk size for streaming uploads

555

"""

556

557

def open_file(self, transport_supports_streaming: bool = False) -> None:

558

"""

559

Open file for upload.

560

561

Args:

562

transport_supports_streaming: Whether transport supports streaming

563

"""

564

565

def close_file(self) -> None:

566

"""Close opened file handle."""

567

568

# File utility functions

569

def open_files(

570

filevars: List[FileVar],

571

transport_supports_streaming: bool = False

572

) -> None:

573

"""Open multiple FileVar objects for upload."""

574

575

def close_files(filevars: List[FileVar]) -> None:

576

"""Close multiple FileVar objects."""

577

578

def extract_files(

579

variables: Dict,

580

file_classes: Tuple[Type[Any], ...]

581

) -> Tuple[Dict, List[FileVar]]:

582

"""Extract FileVar objects from GraphQL variables."""

583

```

584

585

## Usage Examples

586

587

### HTTP Transport Configuration

588

589

```python

590

from gql import Client

591

from gql.transport.requests import RequestsHTTPTransport

592

from requests.auth import HTTPBasicAuth

593

594

# Basic HTTP transport

595

transport = RequestsHTTPTransport(

596

url="https://api.example.com/graphql",

597

headers={"User-Agent": "MyApp/1.0"},

598

timeout=30

599

)

600

601

# With authentication and retries

602

transport = RequestsHTTPTransport(

603

url="https://api.example.com/graphql",

604

auth=HTTPBasicAuth("username", "password"),

605

retries=3,

606

retry_backoff_factor=0.5,

607

retry_status_forcelist=[500, 502, 503, 504]

608

)

609

610

client = Client(transport=transport)

611

```

612

613

### File Upload Example

614

615

```python

616

from gql import gql, Client, FileVar

617

from gql.transport.aiohttp import AIOHTTPTransport

618

619

async def upload_file():

620

transport = AIOHTTPTransport(url="https://api.example.com/graphql")

621

client = Client(transport=transport)

622

623

# Create file variable

624

file_var = FileVar(

625

"document.pdf",

626

content_type="application/pdf"

627

)

628

629

# GraphQL mutation with file upload

630

mutation = gql('''

631

mutation UploadDocument($file: Upload!) {

632

uploadDocument(file: $file) {

633

id

634

filename

635

url

636

}

637

}

638

''')

639

640

async with client.connect_async() as session:

641

result = await session.execute(

642

mutation,

643

variable_values={"file": file_var},

644

upload_files=True

645

)

646

647

return result["uploadDocument"]

648

```

649

650

### WebSocket Subscription Setup

651

652

```python

653

import asyncio

654

from gql import gql, Client

655

from gql.transport.websockets import WebsocketsTransport

656

657

async def handle_messages():

658

# Configure WebSocket transport

659

transport = WebsocketsTransport(

660

url="wss://api.example.com/graphql",

661

headers={"Authorization": "Bearer token123"},

662

init_payload={"authToken": "token123"},

663

subprotocols=["graphql-ws"]

664

)

665

666

client = Client(transport=transport)

667

668

async with client.connect_async() as session:

669

subscription = gql('''

670

subscription MessageSubscription($channelId: ID!) {

671

messageAdded(channelId: $channelId) {

672

id

673

content

674

user {

675

name

676

}

677

timestamp

678

}

679

}

680

''')

681

682

async for result in session.subscribe(

683

subscription,

684

variable_values={"channelId": "general"}

685

):

686

if result.data:

687

message = result.data["messageAdded"]

688

print(f"[{message['timestamp']}] {message['user']['name']}: {message['content']}")

689

690

if result.errors:

691

print(f"Subscription error: {result.errors}")

692

693

asyncio.run(handle_messages())

694

```

695

696

### AWS AppSync Integration

697

698

```python

699

import asyncio

700

from gql import gql, Client

701

from gql.transport.appsync_websockets import AppSyncWebsocketsTransport

702

from gql.transport.appsync_auth import AppSyncIAMAuthentication

703

704

async def appsync_subscription():

705

# Configure AppSync authentication

706

auth = AppSyncIAMAuthentication(

707

host="example.appsync-api.us-east-1.amazonaws.com",

708

region_name="us-east-1"

709

)

710

711

# Create AppSync transport

712

transport = AppSyncWebsocketsTransport(

713

url="https://example.appsync-api.us-east-1.amazonaws.com/graphql",

714

auth=auth

715

)

716

717

client = Client(transport=transport)

718

719

async with client.connect_async() as session:

720

subscription = gql('''

721

subscription OnCommentAdded($postId: ID!) {

722

onCommentAdded(postId: $postId) {

723

id

724

content

725

author

726

createdAt

727

}

728

}

729

''')

730

731

async for result in session.subscribe(

732

subscription,

733

variable_values={"postId": "post-123"}

734

):

735

comment = result.data["onCommentAdded"]

736

print(f"New comment by {comment['author']}: {comment['content']}")

737

738

asyncio.run(appsync_subscription())

739

```

740

741

### Local Schema Testing

742

743

```python

744

from gql import gql, Client

745

from gql.transport.local_schema import LocalSchemaTransport

746

from graphql import build_schema

747

748

# Define schema

749

type_defs = '''

750

type Query {

751

hello(name: String): String

752

}

753

754

type Subscription {

755

counter: Int

756

}

757

'''

758

759

# Create executable schema with resolvers

760

schema = build_schema(type_defs)

761

762

# Add resolvers

763

def resolve_hello(root, info, name="World"):

764

return f"Hello, {name}!"

765

766

async def resolve_counter(root, info):

767

for i in range(10):

768

await asyncio.sleep(1)

769

yield {"counter": i}

770

771

schema.query_type.fields["hello"].resolve = resolve_hello

772

schema.subscription_type.fields["counter"].subscribe = resolve_counter

773

774

# Use local transport

775

transport = LocalSchemaTransport(schema)

776

client = Client(transport=transport)

777

778

async with client.connect_async() as session:

779

# Execute query

780

query = gql('{ hello(name: "Alice") }')

781

result = await session.execute(query)

782

print(result) # {'hello': 'Hello, Alice!'}

783

784

# Execute subscription

785

subscription = gql('subscription { counter }')

786

async for result in session.subscribe(subscription):

787

print(f"Counter: {result['counter']}")

788

```