or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli.mdclient-sessions.mddsl.mdindex.mdtransports.mdutilities.md

client-sessions.mddocs/

0

# Client and Sessions

1

2

Core client functionality for executing GraphQL operations, managing connections, and handling schemas. Provides both synchronous and asynchronous execution patterns with session management for connection pooling and batched operations.

3

4

## Capabilities

5

6

### Client Class

7

8

The main entrypoint for executing GraphQL requests on a transport. Provides both synchronous and asynchronous execution methods with automatic schema management and result parsing.

9

10

```python { .api }

11

class Client:

12

def __init__(

13

self,

14

transport: Optional[Union[Transport, AsyncTransport]] = None,

15

fetch_schema_from_transport: bool = False,

16

schema: Optional[GraphQLSchema] = None,

17

parse_results: bool = False,

18

introspection: Optional[Dict[str, Any]] = None,

19

type_def: Optional[str] = None,

20

execute_timeout: Optional[Union[int, float]] = 10

21

):

22

"""

23

Initialize GraphQL client.

24

25

Args:

26

transport: Transport to use for GraphQL operations

27

fetch_schema_from_transport: Automatically fetch schema via introspection

28

schema: Pre-built GraphQL schema

29

parse_results: Parse results using schema for custom scalars/enums

30

introspection: Pre-fetched introspection result

31

type_def: Schema definition string

32

execute_timeout: Default timeout for operations in seconds

33

"""

34

35

def execute(

36

self,

37

request: GraphQLRequest,

38

**kwargs

39

) -> ExecutionResult:

40

"""

41

Execute GraphQL request synchronously.

42

43

Args:

44

request: GraphQL request to execute

45

**kwargs: Additional arguments passed to transport

46

47

Returns:

48

ExecutionResult with data, errors, and extensions

49

50

Raises:

51

TransportError: If transport operation fails

52

GraphQLError: If GraphQL validation fails

53

"""

54

55

async def execute_async(

56

self,

57

request: GraphQLRequest,

58

**kwargs

59

) -> ExecutionResult:

60

"""

61

Execute GraphQL request asynchronously.

62

63

Args:

64

request: GraphQL request to execute

65

**kwargs: Additional arguments passed to transport

66

67

Returns:

68

ExecutionResult with data, errors, and extensions

69

"""

70

71

def session(self, **kwargs) -> SyncClientSession:

72

"""

73

Create synchronous session for batched operations.

74

75

Args:

76

**kwargs: Additional arguments for session configuration

77

78

Returns:

79

SyncClientSession context manager

80

"""

81

82

async def connect_async(self, **kwargs) -> AsyncClientSession:

83

"""

84

Create asynchronous session for batched operations.

85

86

Args:

87

**kwargs: Additional arguments for session configuration

88

89

Returns:

90

AsyncClientSession context manager

91

"""

92

93

def get_schema(self) -> GraphQLSchema:

94

"""

95

Get the GraphQL schema.

96

97

Returns:

98

GraphQLSchema object

99

100

Raises:

101

Exception: If no schema is available

102

"""

103

104

def close(self) -> None:

105

"""Close synchronous client and transport."""

106

107

async def close_async(self) -> None:

108

"""Close asynchronous client and transport."""

109

```

110

111

### GraphQL Request Creation

112

113

Create GraphQL requests from query strings with variable support and operation naming.

114

115

```python { .api }

116

def gql(request_string: str) -> GraphQLRequest:

117

"""

118

Parse GraphQL query string into GraphQLRequest object.

119

120

Args:

121

request_string: GraphQL query, mutation, or subscription as string

122

123

Returns:

124

GraphQLRequest object ready for execution

125

126

Raises:

127

GraphQLError: If query string has syntax errors

128

"""

129

130

class GraphQLRequest:

131

def __init__(

132

self,

133

request: Union[DocumentNode, "GraphQLRequest", str],

134

*,

135

variable_values: Optional[Dict[str, Any]] = None,

136

operation_name: Optional[str] = None

137

):

138

"""

139

Initialize GraphQL request.

140

141

Args:

142

request: GraphQL query as string, DocumentNode, or existing GraphQLRequest

143

variable_values: Variables for the GraphQL operation

144

operation_name: Operation name for multi-operation documents

145

146

Raises:

147

GraphQLError: If request has syntax errors

148

TypeError: If request type is invalid

149

"""

150

151

def serialize_variable_values(self, schema: GraphQLSchema) -> GraphQLRequest:

152

"""

153

Serialize variable values using schema type information.

154

155

Args:

156

schema: GraphQL schema for type information

157

158

Returns:

159

New GraphQLRequest with serialized variables

160

"""

161

162

@property

163

def payload(self) -> Dict[str, Any]:

164

"""

165

Get JSON payload representation of the request.

166

167

Returns:

168

Dictionary with 'query', 'variables', and 'operationName' keys

169

"""

170

171

# Properties

172

document: DocumentNode # Parsed GraphQL document

173

variable_values: Optional[Dict[str, Any]] # Query variables

174

operation_name: Optional[str] # Operation name

175

```

176

177

### Synchronous Sessions

178

179

Synchronous session management for batched operations and connection reuse.

180

181

```python { .api }

182

class SyncClientSession:

183

def execute(

184

self,

185

request: GraphQLRequest,

186

**kwargs

187

) -> ExecutionResult:

188

"""

189

Execute GraphQL request in session context.

190

191

Args:

192

request: GraphQL request to execute

193

**kwargs: Additional arguments for execution

194

195

Returns:

196

ExecutionResult with data, errors, and extensions

197

"""

198

199

def execute_batch(

200

self,

201

requests: List[GraphQLRequest],

202

**kwargs

203

) -> List[ExecutionResult]:

204

"""

205

Execute multiple GraphQL requests in a batch.

206

207

Args:

208

requests: List of GraphQL requests to execute

209

**kwargs: Additional arguments for batch execution

210

211

Returns:

212

List of ExecutionResult objects

213

"""

214

215

def close(self) -> None:

216

"""Close the session and transport connection."""

217

218

# Context manager support

219

def __enter__(self) -> SyncClientSession: ...

220

def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...

221

```

222

223

### Asynchronous Sessions

224

225

Asynchronous session management with subscription support and automatic reconnection capabilities.

226

227

```python { .api }

228

class AsyncClientSession:

229

async def execute(

230

self,

231

request: GraphQLRequest,

232

**kwargs

233

) -> ExecutionResult:

234

"""

235

Execute GraphQL request asynchronously in session context.

236

237

Args:

238

request: GraphQL request to execute

239

**kwargs: Additional arguments for execution

240

241

Returns:

242

ExecutionResult with data, errors, and extensions

243

"""

244

245

async def execute_batch(

246

self,

247

requests: List[GraphQLRequest],

248

**kwargs

249

) -> List[ExecutionResult]:

250

"""

251

Execute multiple GraphQL requests in a batch asynchronously.

252

253

Args:

254

requests: List of GraphQL requests to execute

255

**kwargs: Additional arguments for batch execution

256

257

Returns:

258

List of ExecutionResult objects

259

"""

260

261

def subscribe(

262

self,

263

request: GraphQLRequest,

264

**kwargs

265

) -> AsyncGenerator[ExecutionResult, None]:

266

"""

267

Subscribe to GraphQL subscription.

268

269

Args:

270

request: GraphQL subscription request

271

**kwargs: Additional arguments for subscription

272

273

Yields:

274

ExecutionResult objects as they arrive

275

276

Raises:

277

TransportError: If subscription setup fails

278

"""

279

280

async def close(self) -> None:

281

"""Close the async session and transport connection."""

282

283

# Async context manager support

284

async def __aenter__(self) -> AsyncClientSession: ...

285

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...

286

287

class ReconnectingAsyncClientSession(AsyncClientSession):

288

"""

289

Async session with automatic reconnection on connection failures.

290

291

Inherits all methods from AsyncClientSession with added reconnection logic.

292

"""

293

```

294

295

## Usage Examples

296

297

### Basic Client Usage

298

299

```python

300

from gql import gql, Client

301

from gql.transport.requests import RequestsHTTPTransport

302

303

# Create client with automatic schema fetching

304

transport = RequestsHTTPTransport(url="https://api.example.com/graphql")

305

client = Client(transport=transport, fetch_schema_from_transport=True)

306

307

# Execute query

308

query = gql('''

309

query GetUsers($limit: Int!) {

310

users(limit: $limit) {

311

id

312

name

313

email

314

}

315

}

316

''')

317

318

result = client.execute(query, variable_values={"limit": 10})

319

users = result["users"]

320

321

# Close client when done

322

client.close()

323

```

324

325

### Session-based Batch Operations

326

327

```python

328

from gql import gql, Client

329

from gql.transport.requests import RequestsHTTPTransport

330

331

transport = RequestsHTTPTransport(url="https://api.example.com/graphql")

332

client = Client(transport=transport)

333

334

# Using synchronous session

335

with client.session() as session:

336

query1 = gql('{ user(id: "1") { name } }')

337

query2 = gql('{ user(id: "2") { name } }')

338

339

# Execute individually

340

result1 = session.execute(query1)

341

result2 = session.execute(query2)

342

343

# Or execute as batch

344

results = session.execute_batch([query1, query2])

345

```

346

347

### Asynchronous Operations

348

349

```python

350

import asyncio

351

from gql import gql, Client

352

from gql.transport.aiohttp import AIOHTTPTransport

353

354

async def main():

355

transport = AIOHTTPTransport(url="https://api.example.com/graphql")

356

client = Client(transport=transport)

357

358

# Using async context manager

359

async with client.connect_async() as session:

360

query = gql('{ hello }')

361

result = await session.execute(query)

362

print(result)

363

364

# Batch execution

365

queries = [gql('{ user1: user(id: "1") { name } }'),

366

gql('{ user2: user(id: "2") { name } }')]

367

results = await session.execute_batch(queries)

368

369

await client.close_async()

370

371

asyncio.run(main())

372

```

373

374

### Subscription Handling

375

376

```python

377

import asyncio

378

from gql import gql, Client

379

from gql.transport.websockets import WebsocketsTransport

380

381

async def handle_subscriptions():

382

transport = WebsocketsTransport(url="wss://api.example.com/graphql")

383

client = Client(transport=transport)

384

385

async with client.connect_async() as session:

386

subscription = gql('''

387

subscription {

388

messageAdded {

389

id

390

content

391

user {

392

name

393

}

394

}

395

}

396

''')

397

398

# Handle subscription messages

399

async for result in session.subscribe(subscription):

400

if result.data:

401

message = result.data["messageAdded"]

402

print(f"New message from {message['user']['name']}: {message['content']}")

403

404

if result.errors:

405

print(f"Subscription error: {result.errors}")

406

break

407

408

asyncio.run(handle_subscriptions())

409

```

410

411

### Custom Schema Handling

412

413

```python

414

from gql import gql, Client

415

from gql.transport.requests import RequestsHTTPTransport

416

from gql.utilities import update_schema_scalar

417

from graphql import GraphQLScalarType

418

419

# Custom scalar for datetime handling

420

datetime_scalar = GraphQLScalarType(

421

name="DateTime",

422

serialize=lambda dt: dt.isoformat(),

423

parse_value=lambda value: datetime.fromisoformat(value),

424

parse_literal=lambda ast: datetime.fromisoformat(ast.value)

425

)

426

427

transport = RequestsHTTPTransport(url="https://api.example.com/graphql")

428

client = Client(

429

transport=transport,

430

fetch_schema_from_transport=True,

431

parse_results=True # Enable result parsing with custom scalars

432

)

433

434

# Update schema with custom scalar

435

update_schema_scalar(client.schema, "DateTime", datetime_scalar)

436

437

# Queries will now automatically parse DateTime fields

438

query = gql('{ posts { title createdAt } }')

439

result = client.execute(query)

440

# result["posts"][0]["createdAt"] is now a datetime object

441

```