or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.mddb-api.mdindex.mdlow-level-client.mdsqlalchemy.md

low-level-client.mddocs/

0

# Low-Level Client

1

2

Direct HTTP protocol implementation providing fine-grained control over Trino communication. This interface is useful for advanced use cases requiring custom session management, request handling, or integration with existing HTTP infrastructure.

3

4

## Capabilities

5

6

### Client Session Management

7

8

Thread-safe session state management for Trino connections, handling user authentication, catalog/schema context, properties, headers, and transaction state.

9

10

```python { .api }

11

class ClientSession:

12

def __init__(

13

self,

14

user: str,

15

authorization_user: Optional[str] = None,

16

catalog: Optional[str] = None,

17

schema: Optional[str] = None,

18

source: Optional[str] = None,

19

properties: Optional[Dict[str, str]] = None,

20

headers: Optional[Dict[str, str]] = None,

21

transaction_id: Optional[str] = None,

22

extra_credential: Optional[List[Tuple[str, str]]] = None,

23

client_tags: Optional[List[str]] = None,

24

roles: Optional[Union[Dict[str, str], str]] = None,

25

timezone: Optional[str] = None,

26

encoding: Optional[Union[str, List[str]]] = None

27

)

28

29

@property

30

def user(self) -> str

31

"""Primary user for query execution."""

32

33

@property

34

def authorization_user(self) -> Optional[str]

35

"""User for authorization (different from query user for impersonation)."""

36

37

@authorization_user.setter

38

def authorization_user(self, authorization_user: Optional[str]) -> None

39

40

@property

41

def catalog(self) -> Optional[str]

42

"""Default catalog for queries."""

43

44

@catalog.setter

45

def catalog(self, catalog: Optional[str]) -> None

46

47

@property

48

def schema(self) -> Optional[str]

49

"""Default schema for queries."""

50

51

@schema.setter

52

def schema(self, schema: Optional[str]) -> None

53

54

@property

55

def source(self) -> Optional[str]

56

"""Query source identifier."""

57

58

@property

59

def properties(self) -> Dict[str, str]

60

"""Session properties dictionary."""

61

62

@properties.setter

63

def properties(self, properties: Dict[str, str]) -> None

64

65

@property

66

def headers(self) -> Dict[str, str]

67

"""Additional HTTP headers."""

68

69

@property

70

def transaction_id(self) -> Optional[str]

71

"""Current transaction ID."""

72

73

@transaction_id.setter

74

def transaction_id(self, transaction_id: Optional[str]) -> None

75

76

@property

77

def extra_credential(self) -> Optional[List[Tuple[str, str]]]

78

"""Extra credential key-value pairs."""

79

80

@property

81

def client_tags(self) -> List[str]

82

"""Client tags for query identification."""

83

84

@property

85

def roles(self) -> Dict[str, str]

86

"""Authorization roles per catalog."""

87

88

@roles.setter

89

def roles(self, roles: Dict[str, str]) -> None

90

91

@property

92

def prepared_statements(self) -> Dict[str, str]

93

"""Prepared statement name to SQL mapping."""

94

95

@prepared_statements.setter

96

def prepared_statements(self, prepared_statements: Dict[str, str]) -> None

97

98

@property

99

def timezone(self) -> str

100

"""Session timezone."""

101

102

@property

103

def encoding(self) -> Union[str, List[str]]

104

"""Spooled protocol encoding preferences."""

105

```

106

107

### HTTP Request Management

108

109

Low-level HTTP request handling with automatic retry logic, authentication integration, and comprehensive error handling.

110

111

```python { .api }

112

class TrinoRequest:

113

def __init__(

114

self,

115

host: str,

116

port: int,

117

client_session: ClientSession,

118

http_session: Optional[Session] = None,

119

http_scheme: Optional[str] = None,

120

auth: Optional[Authentication] = None,

121

max_attempts: int = 3,

122

request_timeout: Union[float, Tuple[float, float]] = 30.0,

123

handle_retry: _RetryWithExponentialBackoff = None,

124

verify: bool = True

125

)

126

127

def post(self, sql: str, additional_http_headers: Optional[Dict[str, Any]] = None) -> Response

128

"""

129

Submit SQL query to Trino coordinator.

130

131

Parameters:

132

- sql: SQL statement to execute

133

- additional_http_headers: Extra headers for this request

134

135

Returns:

136

HTTP response from coordinator

137

"""

138

139

def get(self, url: str) -> Response

140

"""

141

GET request to specified URL with session headers.

142

143

Parameters:

144

- url: Full URL to request

145

146

Returns:

147

HTTP response

148

"""

149

150

def delete(self, url: str) -> Response

151

"""

152

DELETE request to specified URL.

153

154

Parameters:

155

- url: Full URL to request

156

157

Returns:

158

HTTP response

159

"""

160

161

def process(self, http_response: Response) -> TrinoStatus

162

"""

163

Process HTTP response into TrinoStatus object.

164

165

Parameters:

166

- http_response: Raw HTTP response

167

168

Returns:

169

Parsed status information

170

"""

171

172

def unauthenticated(self) -> TrinoRequest

173

"""Create unauthenticated request instance for spooled segments."""

174

175

@property

176

def transaction_id(self) -> Optional[str]

177

"""Current transaction ID."""

178

179

@transaction_id.setter

180

def transaction_id(self, value: Optional[str]) -> None

181

182

@property

183

def http_headers(self) -> CaseInsensitiveDict[str]

184

"""Generated HTTP headers for requests."""

185

186

@property

187

def max_attempts(self) -> int

188

"""Maximum retry attempts."""

189

190

@max_attempts.setter

191

def max_attempts(self, value: int) -> None

192

193

@property

194

def statement_url(self) -> str

195

"""URL for statement submission."""

196

197

@property

198

def next_uri(self) -> Optional[str]

199

"""Next URI for query continuation."""

200

201

def get_url(self, path: str) -> str

202

"""Construct full URL for given path."""

203

204

@staticmethod

205

def raise_response_error(http_response: Response) -> None

206

"""Raise appropriate exception for HTTP error response."""

207

```

208

209

### Query Execution

210

211

High-level query execution with result streaming, status tracking, and cancellation support.

212

213

```python { .api }

214

class TrinoQuery:

215

def __init__(

216

self,

217

request: TrinoRequest,

218

query: str,

219

legacy_primitive_types: bool = False,

220

fetch_mode: Literal["mapped", "segments"] = "mapped"

221

)

222

223

def execute(self, additional_http_headers: Dict[str, Any] = None) -> TrinoResult

224

"""

225

Execute the query and return result iterator.

226

227

Parameters:

228

- additional_http_headers: Extra headers for initial request

229

230

Returns:

231

TrinoResult iterator for consuming rows

232

"""

233

234

def fetch(self) -> List[Union[List[Any]], Any]

235

"""Fetch next batch of results from the query."""

236

237

def cancel(self) -> None

238

"""Cancel query execution."""

239

240

@property

241

def query_id(self) -> Optional[str]

242

"""Unique query identifier assigned by Trino."""

243

244

@property

245

def query(self) -> Optional[str]

246

"""SQL query text."""

247

248

@property

249

def columns(self) -> Optional[List[Dict[str, Any]]]

250

"""Column metadata for query results."""

251

252

@property

253

def stats(self) -> Dict[str, Any]

254

"""Query execution statistics."""

255

256

@property

257

def update_type(self) -> Optional[str]

258

"""Type of update operation if applicable."""

259

260

@property

261

def update_count(self) -> Optional[int]

262

"""Number of rows affected by update operations."""

263

264

@property

265

def warnings(self) -> List[Dict[str, Any]]

266

"""Query execution warnings."""

267

268

@property

269

def result(self) -> Optional[TrinoResult]

270

"""Result iterator object."""

271

272

@property

273

def info_uri(self) -> Optional[str]

274

"""URI for detailed query information."""

275

276

@property

277

def finished(self) -> bool

278

"""Whether query execution is complete."""

279

280

@property

281

def cancelled(self) -> bool

282

"""Whether query was cancelled."""

283

```

284

285

### Result Iteration

286

287

Iterator over query results with row-by-row streaming and automatic result fetching.

288

289

```python { .api }

290

class TrinoResult:

291

def __init__(self, query: TrinoQuery, rows: List[Any])

292

293

@property

294

def rows(self) -> List[Any]

295

"""Current batch of rows."""

296

297

@rows.setter

298

def rows(self, rows: List[Any]) -> None

299

300

@property

301

def rownumber(self) -> int

302

"""Current row number (1-based)."""

303

304

def __iter__(self) -> Iterator[List[Any]]

305

"""Iterator interface for consuming all rows."""

306

```

307

308

### Query Status Information

309

310

Structured representation of query execution status with comprehensive metadata.

311

312

```python { .api }

313

@dataclass

314

class TrinoStatus:

315

id: str

316

stats: Dict[str, str]

317

warnings: List[Any]

318

info_uri: str

319

next_uri: Optional[str]

320

update_type: Optional[str]

321

update_count: Optional[int]

322

rows: Union[List[Any], Dict[str, Any]]

323

columns: List[Any]

324

```

325

326

### Spooled Protocol Support

327

328

Advanced segment-based result handling for high-throughput scenarios with compression and remote storage.

329

330

```python { .api }

331

class Segment:

332

"""Abstract base class for data segments."""

333

def __init__(self, segment: Dict[str, Any])

334

335

@property

336

def data(self) -> bytes

337

"""Raw segment data."""

338

339

@property

340

def metadata(self) -> Dict[str, Any]

341

"""Segment metadata."""

342

343

class InlineSegment(Segment):

344

"""Segment with base64-encoded inline data."""

345

def __init__(self, segment: Dict[str, Any])

346

347

@property

348

def data(self) -> bytes

349

"""Decoded inline data."""

350

351

class SpooledSegment(Segment):

352

"""Segment with remote data retrieval."""

353

def __init__(self, segment: Dict[str, Any], request: TrinoRequest)

354

355

@property

356

def data(self) -> bytes

357

"""Data retrieved from remote URI."""

358

359

@property

360

def uri(self) -> str

361

"""URI for data retrieval."""

362

363

@property

364

def ack_uri(self) -> str

365

"""URI for acknowledgment."""

366

367

@property

368

def headers(self) -> Dict[str, List[str]]

369

"""Headers for data retrieval."""

370

371

def acknowledge(self) -> None

372

"""Acknowledge segment processing."""

373

374

class DecodableSegment:

375

"""Segment with encoding information."""

376

def __init__(self, encoding: str, metadata: Dict[str, Any], segment: Segment)

377

378

@property

379

def encoding(self) -> str

380

"""Data encoding format."""

381

382

@property

383

def segment(self) -> Segment

384

"""Underlying segment."""

385

386

@property

387

def metadata(self) -> Dict[str, Any]

388

"""Segment metadata."""

389

```

390

391

### Segment Iteration

392

393

Iterator for processing segments with automatic decompression and row mapping.

394

395

```python { .api }

396

class SegmentIterator:

397

def __init__(self, segments: Union[DecodableSegment, List[DecodableSegment]], mapper: RowMapper)

398

399

def __iter__(self) -> Iterator[List[Any]]

400

"""Iterator over mapped rows from segments."""

401

402

def __next__(self) -> List[Any]

403

"""Next mapped row."""

404

```

405

406

## Usage Examples

407

408

### Basic Low-Level Query

409

410

```python

411

from trino.client import TrinoRequest, TrinoQuery, ClientSession

412

413

# Create session

414

session = ClientSession(

415

user="testuser",

416

catalog="memory",

417

schema="default"

418

)

419

420

# Create request handler

421

request = TrinoRequest(

422

host="localhost",

423

port=8080,

424

client_session=session

425

)

426

427

# Execute query

428

query = TrinoQuery(request, "SELECT * FROM users")

429

result = query.execute()

430

431

# Consume results

432

for row in result:

433

print(row)

434

```

435

436

### Session Property Management

437

438

```python

439

from trino.client import ClientSession

440

441

session = ClientSession(

442

user="testuser",

443

catalog="hive",

444

schema="default"

445

)

446

447

# Set session properties

448

session.properties = {

449

"query_max_run_time": "1h",

450

"join_distribution_type": "BROADCAST"

451

}

452

453

# Add client tags

454

session.client_tags = ["analytics", "daily-report"]

455

456

# Set roles

457

session.roles = {"hive": "admin", "system": "reader"}

458

```

459

460

### Custom HTTP Session

461

462

```python

463

import requests

464

from trino.client import TrinoRequest, ClientSession

465

466

# Create custom HTTP session

467

http_session = requests.Session()

468

http_session.cert = ('/path/to/client.cert', '/path/to/client.key')

469

470

session = ClientSession(user="testuser")

471

request = TrinoRequest(

472

host="secure.trino.example.com",

473

port=443,

474

client_session=session,

475

http_session=http_session,

476

http_scheme="https"

477

)

478

```

479

480

### Spooled Protocol Usage

481

482

```python

483

from trino.client import TrinoQuery, TrinoRequest, ClientSession

484

485

# Create session with spooled encoding

486

session = ClientSession(

487

user="testuser",

488

encoding=["json+zstd", "json+lz4", "json"]

489

)

490

491

request = TrinoRequest(host="localhost", port=8080, client_session=session)

492

493

# Query with segment fetch mode

494

query = TrinoQuery(request, "SELECT * FROM large_table", fetch_mode="segments")

495

result = query.execute()

496

497

# Process segments directly

498

for segment in result:

499

# segment is a DecodableSegment

500

print(f"Segment encoding: {segment.encoding}")

501

print(f"Segment metadata: {segment.metadata}")

502

```

503

504

### Query Cancellation

505

506

```python

507

import threading

508

from trino.client import TrinoQuery, TrinoRequest, ClientSession

509

510

session = ClientSession(user="testuser")

511

request = TrinoRequest(host="localhost", port=8080, client_session=session)

512

query = TrinoQuery(request, "SELECT * FROM very_large_table")

513

514

# Start query in background

515

def run_query():

516

result = query.execute()

517

for row in result:

518

if query.cancelled:

519

break

520

print(row)

521

522

query_thread = threading.Thread(target=run_query)

523

query_thread.start()

524

525

# Cancel after 10 seconds

526

threading.Timer(10.0, query.cancel).start()

527

```