or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced-features.md · arrow-flight.md · compute-functions.md · core-data-structures.md · data-types.md · dataset-operations.md · file-formats.md · index.md · memory-io.md

docs/arrow-flight.md

0

# Arrow Flight RPC

1

2

High-performance RPC framework for distributed data services. Provides client-server architecture for streaming large datasets with authentication, metadata handling, custom middleware support, and efficient data transfer over networks.

3

4

## Capabilities

5

6

### Client Operations

7

8

Connect to Flight services and perform data operations with high-performance streaming.

9

10

```python { .api }

11

def connect(location, tls_certificates=None, cert_chain=None, private_key=None, auth_handler=None, call_options=None, tls_root_certs=None, tls_override_hostname=None, middleware=None, write_size_limit_bytes=None, disable_server_verification=False):

12

"""

13

Connect to Flight service.

14

15

Parameters:

16

- location: str or Location, service location

17

- tls_certificates: list, TLS certificates

18

- cert_chain: bytes, certificate chain for mTLS

19

- private_key: bytes, private key for mTLS

20

- auth_handler: ClientAuthHandler, authentication handler

21

- call_options: FlightCallOptions, default call options

22

- tls_root_certs: bytes, root certificates for TLS

23

- tls_override_hostname: str, override hostname for TLS

24

- middleware: list, client middleware

25

- write_size_limit_bytes: int, write size limit

26

- disable_server_verification: bool, disable server verification

27

28

Returns:

29

FlightClient: Connected Flight client

30

"""

31

32

class FlightClient:

33

"""

34

Flight client for connecting to Flight services.

35

"""

36

37

def authenticate(self, auth_handler, options=None):

38

"""

39

Authenticate with server.

40

41

Parameters:

42

- auth_handler: ClientAuthHandler, authentication handler

43

- options: FlightCallOptions, call options

44

"""

45

46

def list_flights(self, criteria=None, options=None):

47

"""

48

List available flights.

49

50

Parameters:

51

- criteria: bytes, listing criteria

52

- options: FlightCallOptions, call options

53

54

Returns:

55

iterator: Iterator of FlightInfo objects

56

"""

57

58

def get_flight_info(self, descriptor, options=None):

59

"""

60

Get flight information.

61

62

Parameters:

63

- descriptor: FlightDescriptor, flight descriptor

64

- options: FlightCallOptions, call options

65

66

Returns:

67

FlightInfo: Flight information

68

"""

69

70

def get_schema(self, descriptor, options=None):

71

"""

72

Get flight schema.

73

74

Parameters:

75

- descriptor: FlightDescriptor, flight descriptor

76

- options: FlightCallOptions, call options

77

78

Returns:

79

SchemaResult: Flight schema result (the Arrow schema is available as its `schema` attribute)

80

"""

81

82

def do_get(self, ticket, options=None):

83

"""

84

Retrieve data stream.

85

86

Parameters:

87

- ticket: Ticket, data ticket

88

- options: FlightCallOptions, call options

89

90

Returns:

91

FlightStreamReader: Data stream reader

92

"""

93

94

def do_put(self, descriptor, schema, options=None):

95

"""

96

Send data stream.

97

98

Parameters:

99

- descriptor: FlightDescriptor, flight descriptor

100

- schema: Schema, data schema

101

- options: FlightCallOptions, call options

102

103

Returns:

104

tuple: (FlightStreamWriter, FlightMetadataReader) - data stream writer and reader for server-sent metadata

105

"""

106

107

def do_exchange(self, descriptor, schema, options=None):

108

"""

109

Bidirectional data exchange.

110

111

Parameters:

112

- descriptor: FlightDescriptor, flight descriptor

113

- schema: Schema, data schema

114

- options: FlightCallOptions, call options

115

116

Returns:

117

tuple: (FlightStreamWriter, FlightStreamReader) - exchange stream writer and reader

118

"""

119

120

def list_actions(self, options=None):

121

"""

122

List available actions.

123

124

Parameters:

125

- options: FlightCallOptions, call options

126

127

Returns:

128

iterator: Iterator of ActionType objects

129

"""

130

131

def do_action(self, action, options=None):

132

"""

133

Execute action.

134

135

Parameters:

136

- action: Action, action to execute

137

- options: FlightCallOptions, call options

138

139

Returns:

140

iterator: Iterator of Result objects

141

"""

142

143

def close(self):

144

"""Close client connection."""

145

146

class Location:

147

"""

148

Flight service location.

149

150

Attributes:

151

- uri: Location URI

152

"""

153

154

@classmethod

155

def for_grpc_tcp(cls, host, port):

156

"""Create TCP location."""

157

158

@classmethod

159

def for_grpc_tls(cls, host, port):

160

"""Create TLS location."""

161

162

@classmethod

163

def for_grpc_unix(cls, path):

164

"""Create Unix socket location."""

165

166

def __str__(self): ...

167

def __eq__(self, other): ...

168

```

169

170

### Server Implementation

171

172

Base classes and interfaces for implementing Flight servers.

173

174

```python { .api }

175

class FlightServerBase:

176

"""

177

Base class for implementing Flight servers.

178

"""

179

180

def list_flights(self, context, criteria):

181

"""

182

List available flights.

183

184

Parameters:

185

- context: ServerCallContext, call context

186

- criteria: bytes, listing criteria

187

188

Returns:

189

iterator: Iterator of FlightInfo objects

190

"""

191

raise NotImplementedError

192

193

def get_flight_info(self, context, descriptor):

194

"""

195

Get flight information.

196

197

Parameters:

198

- context: ServerCallContext, call context

199

- descriptor: FlightDescriptor, flight descriptor

200

201

Returns:

202

FlightInfo: Flight information

203

"""

204

raise NotImplementedError

205

206

def get_schema(self, context, descriptor):

207

"""

208

Get flight schema.

209

210

Parameters:

211

- context: ServerCallContext, call context

212

- descriptor: FlightDescriptor, flight descriptor

213

214

Returns:

215

SchemaResult: Schema result

216

"""

217

raise NotImplementedError

218

219

def do_get(self, context, ticket):

220

"""

221

Handle data retrieval.

222

223

Parameters:

224

- context: ServerCallContext, call context

225

- ticket: Ticket, data ticket

226

227

Returns:

228

FlightDataStream: Data stream

229

"""

230

raise NotImplementedError

231

232

def do_put(self, context, descriptor, reader, writer):

233

"""

234

Handle data upload.

235

236

Parameters:

237

- context: ServerCallContext, call context

238

- descriptor: FlightDescriptor, flight descriptor

239

- reader: FlightStreamReader, data stream reader

240

- writer: FlightMetadataWriter, metadata writer

241

"""

242

raise NotImplementedError

243

244

def do_exchange(self, context, descriptor, reader, writer):

245

"""

246

Handle bidirectional data exchange.

247

248

Parameters:

249

- context: ServerCallContext, call context

250

- descriptor: FlightDescriptor, flight descriptor

251

- reader: FlightStreamReader, data stream reader

252

- writer: FlightStreamWriter, data stream writer

253

"""

254

raise NotImplementedError

255

256

def list_actions(self, context):

257

"""

258

List available actions.

259

260

Parameters:

261

- context: ServerCallContext, call context

262

263

Returns:

264

iterator: Iterator of ActionType objects

265

"""

266

return []

267

268

def do_action(self, context, action):

269

"""

270

Execute action.

271

272

Parameters:

273

- context: ServerCallContext, call context

274

- action: Action, action to execute

275

276

Returns:

277

iterator: Iterator of Result objects

278

"""

279

raise NotImplementedError

280

281

class ServerCallContext:

282

"""

283

Server call context.

284

285

Attributes:

286

- peer_identity: Client identity

287

- peer: Client peer information

288

- method: Called method

289

"""

290

291

def is_cancelled(self):

292

"""Check if call is cancelled."""

293

294

def add_header(self, key, value):

295

"""Add response header."""

296

297

def add_trailer(self, key, value):

298

"""Add response trailer."""

299

```

300

301

### Data Streaming

302

303

Classes for handling data streams in Flight operations with efficient serialization.

304

305

```python { .api }

306

class FlightDataStream:

307

"""Base class for Flight data streams."""

308

309

def schema(self):

310

"""Get stream schema."""

311

raise NotImplementedError

312

313

def __iter__(self):

314

"""Iterate over stream chunks."""

315

raise NotImplementedError

316

317

class FlightStreamReader:

318

"""

319

Flight stream reader.

320

321

Attributes:

322

- schema: Stream schema

323

"""

324

325

def __iter__(self): ...

326

327

def read_next(self):

328

"""Read next chunk."""

329

330

def read_chunk(self):

331

"""Read chunk with metadata."""

332

333

def read_all(self):

334

"""Read all data as table."""

335

336

def read_pandas(self):

337

"""Read all data as pandas DataFrame."""

338

339

class FlightStreamWriter:

340

"""

341

Flight stream writer.

342

343

Attributes:

344

- schema: Stream schema

345

"""

346

347

def write_batch(self, batch):

348

"""Write record batch."""

349

350

def write_table(self, table, max_chunksize=None):

351

"""Write table."""

352

353

def write_with_metadata(self, batch, app_metadata=None):

354

"""Write batch with metadata."""

355

356

def done_writing(self):

357

"""Signal end of writing."""

358

359

def close(self):

360

"""Close writer."""

361

362

class FlightStreamChunk:

363

"""

364

Flight stream chunk.

365

366

Attributes:

367

- data: Record batch data

368

- app_metadata: Application metadata

369

"""

370

371

class RecordBatchStream(FlightDataStream):

372

"""Record batch-based Flight stream."""

373

374

def __init__(self, data_source): ...

375

376

class GeneratorStream(FlightDataStream):

377

"""Generator-based Flight stream."""

378

379

def __init__(self, schema, generator): ...

380

```

381

382

### Descriptors and Information

383

384

Flight descriptors and metadata for identifying and describing data streams.

385

386

```python { .api }

387

class FlightDescriptor:

388

"""

389

Flight descriptor for identifying data streams.

390

391

Attributes:

392

- descriptor_type: Descriptor type

393

- command: Command bytes (for CMD type)

394

- path: Path components (for PATH type)

395

"""

396

397

@classmethod

398

def for_command(cls, command):

399

"""

400

Create command descriptor.

401

402

Parameters:

403

- command: bytes, command data

404

405

Returns:

406

FlightDescriptor: Command descriptor

407

"""

408

409

@classmethod

410

def for_path(cls, *path):

411

"""

412

Create path descriptor.

413

414

Parameters:

415

- path: str, one or more path components passed as separate positional arguments

416

417

Returns:

418

FlightDescriptor: Path descriptor

419

"""

420

421

def __eq__(self, other): ...

422

def __hash__(self): ...

423

424

class DescriptorType:

425

"""Descriptor type enumeration."""

426

UNKNOWN = ...

427

PATH = ...

428

CMD = ...

429

430

class FlightInfo:

431

"""

432

Flight information.

433

434

Attributes:

435

- descriptor: Flight descriptor

436

- endpoints: List of flight endpoints

437

- total_records: Total number of records

438

- total_bytes: Total bytes

439

- schema: Flight schema

440

- ordered: Whether data is ordered

441

"""

442

443

@classmethod

444

def for_table(cls, table, descriptor, endpoints=None):

445

"""Create FlightInfo for table."""

446

447

class FlightEndpoint:

448

"""

449

Flight endpoint.

450

451

Attributes:

452

- ticket: Data ticket

453

- locations: List of locations

454

"""

455

456

def __eq__(self, other): ...

457

458

class Ticket:

459

"""

460

Flight ticket for data retrieval.

461

462

Attributes:

463

- ticket: Ticket bytes

464

"""

465

466

def __eq__(self, other): ...

467

468

class SchemaResult:

469

"""

470

Schema result.

471

472

Attributes:

473

- schema: Arrow schema

474

"""

475

```

476

477

### Authentication

478

479

Authentication handlers for client and server authentication.

480

481

```python { .api }

482

class BasicAuth:

483

"""

484

Basic username/password authentication.

485

"""

486

487

def __init__(self, username, password): ...

488

489

@property

490

def username(self): ...

491

492

@property

493

def password(self): ...

494

495

class ClientAuthHandler:

496

"""Client-side authentication handler."""

497

498

def authenticate(self, outgoing, incoming):

499

"""

500

Authenticate client.

501

502

Parameters:

503

- outgoing: outgoing metadata

504

- incoming: incoming metadata

505

"""

506

raise NotImplementedError

507

508

def get_token(self):

509

"""Get authentication token."""

510

return None

511

512

class ServerAuthHandler:

513

"""Server-side authentication handler."""

514

515

def authenticate(self, outgoing, incoming):

516

"""

517

Authenticate request.

518

519

Parameters:

520

- outgoing: outgoing metadata

521

- incoming: incoming metadata

522

523

Returns:

524

str: Client identity

525

"""

526

raise NotImplementedError

527

528

def is_valid(self, token):

529

"""

530

Validate authentication token.

531

532

Parameters:

533

- token: str, authentication token

534

535

Returns:

536

str: Client identity if valid

537

"""

538

raise NotImplementedError

539

```

540

541

### Middleware

542

543

Middleware system for intercepting and modifying Flight calls.

544

545

```python { .api }

546

class ClientMiddleware:

547

"""Client-side middleware interface."""

548

549

def sending_headers(self):

550

"""Called when sending headers."""

551

pass

552

553

def received_headers(self, headers):

554

"""Called when receiving headers."""

555

pass

556

557

def received_trailers(self, trailers):

558

"""Called when receiving trailers."""

559

pass

560

561

class ClientMiddlewareFactory:

562

"""Factory for client middleware."""

563

564

def start_call(self, info):

565

"""

566

Start middleware for call.

567

568

Parameters:

569

- info: CallInfo, call information

570

571

Returns:

572

ClientMiddleware: Middleware instance

573

"""

574

raise NotImplementedError

575

576

class ServerMiddleware:

577

"""Server-side middleware interface."""

578

579

def sending_headers(self):

580

"""Called when sending headers."""

581

pass

582

583

def call_completed(self, exception):

584

"""Called when call completes."""

585

pass

586

587

class ServerMiddlewareFactory:

588

"""Factory for server middleware."""

589

590

def start_call(self, info, headers):

591

"""

592

Start middleware for call.

593

594

Parameters:

595

- info: CallInfo, call information

596

- headers: dict, request headers

597

598

Returns:

599

ServerMiddleware: Middleware instance

600

"""

601

raise NotImplementedError

602

603

class TracingServerMiddlewareFactory(ServerMiddlewareFactory):

604

"""Built-in tracing middleware factory."""

605

606

class CallInfo:

607

"""

608

Call information.

609

610

Attributes:

611

- method: Flight method

612

"""

613

614

class FlightMethod:

615

"""Flight RPC method enumeration."""

616

LIST_FLIGHTS = ...

617

GET_FLIGHT_INFO = ...

618

GET_SCHEMA = ...

619

DO_GET = ...

620

DO_PUT = ...

621

DO_EXCHANGE = ...

622

LIST_ACTIONS = ...

623

DO_ACTION = ...

624

```

625

626

### Actions and Results

627

628

Custom actions and results for extending Flight functionality.

629

630

```python { .api }

631

class Action:

632

"""

633

Flight action request.

634

635

Attributes:

636

- type: Action type

637

- body: Action body bytes

638

"""

639

640

def __eq__(self, other): ...

641

642

class ActionType:

643

"""

644

Flight action type information.

645

646

Attributes:

647

- type: Action type string

648

- description: Action description

649

"""

650

651

def __eq__(self, other): ...

652

653

class Result:

654

"""

655

Flight action result.

656

657

Attributes:

658

- body: Result body bytes

659

"""

660

661

def __eq__(self, other): ...

662

```

663

664

### Metadata and Options

665

666

Configuration options and metadata handling for Flight operations.

667

668

```python { .api }

669

class FlightCallOptions:

670

"""

671

Options for Flight calls.

672

673

Attributes:

674

- headers: Request headers

675

- timeout: Call timeout

676

"""

677

678

def __init__(self, headers=None, timeout=None): ...

679

680

class FlightMetadataReader:

681

"""Flight metadata reader."""

682

683

def read(self):

684

"""Read metadata."""

685

686

class FlightMetadataWriter:

687

"""Flight metadata writer."""

688

689

def write(self, metadata):

690

"""Write metadata."""

691

692

class MetadataRecordBatchReader:

693

"""Record batch reader with metadata."""

694

695

class MetadataRecordBatchWriter:

696

"""Record batch writer with metadata."""

697

```

698

699

### Security

700

701

Security configuration including TLS certificates and encryption.

702

703

```python { .api }

704

class CertKeyPair:

705

"""

706

TLS certificate and key pair.

707

708

Attributes:

709

- cert: Certificate bytes

710

- key: Private key bytes

711

"""

712

713

def __init__(self, cert, key): ...

714

```

715

716

### Exceptions

717

718

Flight-specific exceptions for error handling.

719

720

```python { .api }

721

class FlightError(Exception):

722

"""Base Flight exception."""

723

724

class FlightInternalError(FlightError):

725

"""Internal Flight error."""

726

727

class FlightTimedOutError(FlightError):

728

"""Flight timeout error."""

729

730

class FlightCancelledError(FlightError):

731

"""Flight cancellation error."""

732

733

class FlightUnauthenticatedError(FlightError):

734

"""Authentication required error."""

735

736

class FlightUnauthorizedError(FlightError):

737

"""Authorization failed error."""

738

739

class FlightUnavailableError(FlightError):

740

"""Service unavailable error."""

741

742

class FlightServerError(FlightError):

743

"""Server-side error."""

744

745

class FlightWriteSizeExceededError(FlightError):

746

"""Write size limit exceeded error."""

747

```

748

749

## Usage Examples

750

751

### Basic Client Usage

752

753

```python

754

import pyarrow as pa

755

import pyarrow.flight as flight

756

757

# Connect to Flight server

758

client = flight.connect("grpc://localhost:8080")

759

760

# List available flights

761

for flight_info in client.list_flights():

762

print(f"Flight: {flight_info.descriptor}")

763

print(f" Records: {flight_info.total_records}")

764

print(f" Bytes: {flight_info.total_bytes}")

765

print(f" Schema: {flight_info.schema}")

766

767

# Get specific flight info

768

descriptor = flight.FlightDescriptor.for_path("dataset", "table1")

769

info = client.get_flight_info(descriptor)

770

print(f"Flight info: {info}")

771

772

# Get data

773

for endpoint in info.endpoints:

774

stream_reader = client.do_get(endpoint.ticket)

775

table = stream_reader.read_all()

776

print(f"Retrieved table: {len(table)} rows, {len(table.columns)} columns")

777

778

# Upload data

779

upload_descriptor = flight.FlightDescriptor.for_path("uploads", "new_data")

780

table_to_upload = pa.table({

781

'id': [1, 2, 3, 4, 5],

782

'value': [10.5, 20.3, 30.1, 40.7, 50.2]

783

})

784

785

writer, metadata_reader = client.do_put(upload_descriptor, table_to_upload.schema)

786

writer.write_table(table_to_upload)

787

writer.close()

788

789

# Execute action

790

action = flight.Action("list_tables", b"")

791

results = client.do_action(action)

792

for result in results:

793

print(f"Action result: {result.body}")

794

795

client.close()

796

```

797

798

### Server Implementation

799

800

```python

801

import pyarrow as pa

802

import pyarrow.flight as flight

803

import threading

804

805

class DataFlightServer(flight.FlightServerBase):

806

"""Example Flight server implementation."""

807

808

def __init__(self):

809

super().__init__()

810

self.data_store = {}

811

self.lock = threading.Lock()

812

813

# Initialize with sample data

814

self.data_store["dataset/sales"] = pa.table({

815

'date': ['2023-01-01', '2023-01-02', '2023-01-03'],

816

'amount': [100.0, 150.0, 200.0],

817

'region': ['North', 'South', 'East']

818

})

819

820

self.data_store["dataset/products"] = pa.table({

821

'id': [1, 2, 3],

822

'name': ['Widget A', 'Widget B', 'Widget C'],

823

'price': [10.99, 15.99, 20.99]

824

})

825

826

def list_flights(self, context, criteria):

827

"""List available flights."""

828

with self.lock:

829

for path, table in self.data_store.items():

830

descriptor = flight.FlightDescriptor.for_path(*path.split('/'))

831

endpoints = [flight.FlightEndpoint(

832

flight.Ticket(path.encode()),

833

["grpc://localhost:8080"]

834

)]

835

yield flight.FlightInfo.for_table(table, descriptor, endpoints)

836

837

def get_flight_info(self, context, descriptor):

838

"""Get flight information."""

839

path = '/'.join(descriptor.path)

840

841

with self.lock:

842

if path not in self.data_store:

843

raise flight.FlightUnavailableError(f"Unknown path: {path}")

844

845

table = self.data_store[path]

846

endpoints = [flight.FlightEndpoint(

847

flight.Ticket(path.encode()),

848

["grpc://localhost:8080"]

849

)]

850

return flight.FlightInfo.for_table(table, descriptor, endpoints)

851

852

def get_schema(self, context, descriptor):

853

"""Get flight schema."""

854

path = '/'.join(descriptor.path)

855

856

with self.lock:

857

if path not in self.data_store:

858

raise flight.FlightUnavailableError(f"Unknown path: {path}")

859

860

table = self.data_store[path]

861

return flight.SchemaResult(table.schema)

862

863

def do_get(self, context, ticket):

864

"""Retrieve data stream."""

865

path = ticket.ticket.decode()

866

867

with self.lock:

868

if path not in self.data_store:

869

raise flight.FlightUnavailableError(f"Unknown ticket: {path}")

870

871

table = self.data_store[path]

872

return flight.RecordBatchStream(table)

873

874

def do_put(self, context, descriptor, reader, writer):

875

"""Handle data upload."""

876

path = '/'.join(descriptor.path)

877

878

# Read all data

879

table = reader.read_all()

880

881

with self.lock:

882

self.data_store[path] = table

883

884

print(f"Stored table at {path}: {len(table)} rows")

885

886

def list_actions(self, context):

887

"""List available actions."""

888

return [

889

flight.ActionType("list_tables", "List all stored tables"),

890

flight.ActionType("get_stats", "Get server statistics")

891

]

892

893

def do_action(self, context, action):

894

"""Execute action."""

895

if action.type == "list_tables":

896

with self.lock:

897

tables = list(self.data_store.keys())

898

yield flight.Result('\n'.join(tables).encode())

899

900

elif action.type == "get_stats":

901

with self.lock:

902

stats = {

903

'table_count': len(self.data_store),

904

'total_rows': sum(len(table) for table in self.data_store.values())

905

}

906

yield flight.Result(str(stats).encode())

907

908

else:

909

raise flight.FlightUnavailableError(f"Unknown action: {action.type}")

910

911

# Run server

912

if __name__ == "__main__":

913

server = DataFlightServer()

914

location = flight.Location.for_grpc_tcp("localhost", 8080)

915

916

# Note: This is conceptual - actual server startup requires more setup

917

print(f"Starting server at {location}")

918

# server.serve() # In pyarrow, serve() takes no arguments; pass the location to FlightServerBase.__init__ instead

919

```

920

921

### Authentication Example

922

923

```python

924

import pyarrow.flight as flight

925

926

class SimpleAuthHandler(flight.ServerAuthHandler):

927

"""Simple authentication handler."""

928

929

def __init__(self):

930

self.valid_tokens = {"user123": "secret456"}

931

932

def authenticate(self, outgoing, incoming):

933

"""Authenticate request."""

934

# Extract credentials from incoming headers

935

username = None

936

password = None

937

938

for header in incoming:

939

if header[0] == b'username':

940

username = header[1].decode()

941

elif header[0] == b'password':

942

password = header[1].decode()

943

944

if username in self.valid_tokens and self.valid_tokens[username] == password:

945

# Set authentication token

946

outgoing.append((b'auth-token', f'token-{username}'.encode()))

947

return username

948

else:

949

raise flight.FlightUnauthenticatedError("Invalid credentials")

950

951

def is_valid(self, token):

952

"""Validate authentication token."""

953

if token.startswith('token-'):

954

username = token[6:] # Remove 'token-' prefix

955

return username if username in self.valid_tokens else None

956

return None

957

958

class SimpleClientAuthHandler(flight.ClientAuthHandler):

959

"""Simple client authentication handler."""

960

961

def __init__(self, username, password):

962

self.username = username

963

self.password = password

964

self.token = None

965

966

def authenticate(self, outgoing, incoming):

967

"""Authenticate client."""

968

# Send credentials

969

outgoing.append((b'username', self.username.encode()))

970

outgoing.append((b'password', self.password.encode()))

971

972

# Get token from response

973

for header in incoming:

974

if header[0] == b'auth-token':

975

self.token = header[1].decode()

976

break

977

978

def get_token(self):

979

"""Get authentication token."""

980

return self.token

981

982

# Client usage with authentication

983

auth_handler = SimpleClientAuthHandler("user123", "secret456")

984

client = flight.connect("grpc://localhost:8080", auth_handler=auth_handler)

985

986

# Authenticate

987

client.authenticate(auth_handler)

988

989

# Now use authenticated client

990

flights = list(client.list_flights())

991

print(f"Found {len(flights)} flights")

992

993

client.close()

994

```

995

996

### Advanced Streaming

997

998

```python

999

import pyarrow as pa

1000

import pyarrow.flight as flight

1001

import time

1002

1003

class StreamingFlightServer(flight.FlightServerBase):

1004

"""Flight server with streaming data generation."""

1005

1006

def do_get(self, context, ticket):

1007

"""Generate streaming data."""

1008

path = ticket.ticket.decode()

1009

1010

if path == "streaming/numbers":

1011

return self.generate_number_stream()

1012

elif path == "streaming/time_series":

1013

return self.generate_time_series()

1014

else:

1015

raise flight.FlightUnavailableError(f"Unknown streaming path: {path}")

1016

1017

def generate_number_stream(self):

1018

"""Generate stream of random numbers."""

1019

schema = pa.schema([

1020

pa.field('id', pa.int64()),

1021

pa.field('random_value', pa.float64())

1022

])

1023

1024

def number_generator():

1025

import random

1026

batch_size = 1000

1027

1028

for batch_num in range(10): # 10 batches

1029

ids = list(range(batch_num * batch_size, (batch_num + 1) * batch_size))

1030

values = [random.random() for _ in range(batch_size)]

1031

1032

batch = pa.record_batch([ids, values], schema=schema)

1033

yield batch

1034

1035

# Simulate processing delay

1036

time.sleep(0.1)

1037

1038

return flight.GeneratorStream(schema, number_generator())

1039

1040

def generate_time_series(self):

1041

"""Generate time series data."""

1042

schema = pa.schema([

1043

pa.field('timestamp', pa.timestamp('s')),

1044

pa.field('sensor_id', pa.string()),

1045

pa.field('value', pa.float64())

1046

])

1047

1048

def time_series_generator():

1049

import random

1050

from datetime import datetime, timedelta

1051

1052

start_time = datetime.now()

1053

sensors = ['sensor_001', 'sensor_002', 'sensor_003']

1054

1055

for minute in range(60): # 1 hour of data

1056

current_time = start_time + timedelta(minutes=minute)

1057

1058

timestamps = [current_time] * len(sensors)

1059

sensor_ids = sensors

1060

values = [random.uniform(20.0, 30.0) for _ in sensors]

1061

1062

batch = pa.record_batch([timestamps, sensor_ids, values], schema=schema)

1063

yield batch

1064

1065

# Real-time simulation

1066

time.sleep(0.05)

1067

1068

return flight.GeneratorStream(schema, time_series_generator())

1069

1070

# Client streaming consumption

1071

client = flight.connect("grpc://localhost:8080")

1072

1073

# Stream processing

1074

descriptor = flight.FlightDescriptor.for_path("streaming", "numbers")

1075

info = client.get_flight_info(descriptor)

1076

1077

for endpoint in info.endpoints:

1078

reader = client.do_get(endpoint.ticket)

1079

1080

batch_count = 0

1081

total_rows = 0

1082

1083

for chunk in reader:

1084

batch = chunk.data

1085

batch_count += 1

1086

total_rows += len(batch)

1087

1088

print(f"Received batch {batch_count}: {len(batch)} rows")

1089

1090

# Process batch

1091

if len(batch) > 0:

1092

avg_value = pa.compute.mean(batch['random_value']).as_py()

1093

print(f" Average value: {avg_value:.4f}")

1094

1095

print(f"Total: {batch_count} batches, {total_rows} rows")

1096

1097

client.close()

1098

```

1099

1100

### Middleware and Monitoring

1101

1102

```python

1103

import pyarrow.flight as flight

1104

import time

1105

1106

class TimingClientMiddleware(flight.ClientMiddleware):

1107

"""Client middleware for timing requests."""

1108

1109

def __init__(self):

1110

self.start_time = None

1111

1112

def sending_headers(self):

1113

"""Record start time."""

1114

self.start_time = time.time()

1115

1116

def received_headers(self, headers):

1117

"""Log headers received."""

1118

print(f"Received headers: {dict(headers)}")

1119

1120

def received_trailers(self, trailers):

1121

"""Calculate and log timing."""

1122

if self.start_time:

1123

duration = time.time() - self.start_time

1124

print(f"Request completed in {duration:.3f} seconds")

1125

1126

class TimingClientMiddlewareFactory(flight.ClientMiddlewareFactory):

1127

"""Factory for timing middleware."""

1128

1129

def start_call(self, info):

1130

"""Create timing middleware for each call."""

1131

print(f"Starting call: {info.method}")

1132

return TimingClientMiddleware()

1133

1134

class LoggingServerMiddleware(flight.ServerMiddleware):

1135

"""Server middleware for logging requests."""

1136

1137

def __init__(self, call_info, headers):

1138

self.call_info = call_info

1139

self.headers = headers

1140

self.start_time = time.time()

1141

print(f"Request started: {call_info.method}")

1142

print(f"Headers: {dict(headers)}")

1143

1144

def call_completed(self, exception):

1145

"""Log call completion."""

1146

duration = time.time() - self.start_time

1147

if exception:

1148

print(f"Request failed after {duration:.3f}s: {exception}")

1149

else:

1150

print(f"Request completed in {duration:.3f}s")

1151

1152

class LoggingServerMiddlewareFactory(flight.ServerMiddlewareFactory):

1153

"""Factory for logging middleware."""

1154

1155

def start_call(self, info, headers):

1156

"""Create logging middleware for each call."""

1157

return LoggingServerMiddleware(info, headers)

1158

1159

# Client with middleware

1160

middleware = [TimingClientMiddlewareFactory()]

1161

client = flight.connect("grpc://localhost:8080", middleware=middleware)

1162

1163

# All requests will be timed

1164

flights = list(client.list_flights())

1165

print(f"Listed {len(flights)} flights")

1166

1167

client.close()

1168

```