CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-pyarrow

Python library for Apache Arrow columnar memory format and computing libraries

Pending
Overview
Eval results
Files

docs/arrow-flight.md

Arrow Flight RPC

High-performance RPC framework for distributed data services. Provides client-server architecture for streaming large datasets with authentication, metadata handling, custom middleware support, and efficient data transfer over networks.

Capabilities

Client Operations

Connect to Flight services and perform data operations with high-performance streaming.

def connect(
    location,
    tls_certificates=None,
    cert_chain=None,
    private_key=None,
    auth_handler=None,
    call_options=None,
    tls_root_certs=None,
    tls_override_hostname=None,
    middleware=None,
    write_size_limit_bytes=None,
    disable_server_verification=False,
):
    """
    Open a connection to a Flight service.

    Parameters:
    - location: str or Location, where the service lives
    - tls_certificates: list, TLS certificates
    - cert_chain: bytes, certificate chain used for mTLS
    - private_key: bytes, private key used for mTLS
    - auth_handler: ClientAuthHandler, handler used for authentication
    - call_options: FlightCallOptions, options applied to calls by default
    - tls_root_certs: bytes, root certificates for TLS
    - tls_override_hostname: str, hostname to use instead of the real one for TLS
    - middleware: list, client middleware
    - write_size_limit_bytes: int, upper bound on write size
    - disable_server_verification: bool, turn off server verification

    Returns:
    FlightClient: client connected to the service

    NOTE(review): pyarrow's published FlightClient constructor appears to
    accept only tls_root_certs, cert_chain, private_key, override_hostname,
    middleware, write_size_limit_bytes, disable_server_verification and
    generic_options as keywords - confirm the remaining names against the
    installed version before relying on them.
    """

class FlightClient:
    """
    Flight client for connecting to Flight services.

    Each method maps to one Flight RPC. All of them accept an optional
    FlightCallOptions to customise that single call.
    """

    def authenticate(self, auth_handler, options=None):
        """
        Authenticate with server.

        Parameters:
        - auth_handler: ClientAuthHandler, authentication handler
        - options: FlightCallOptions, call options
        """

    def list_flights(self, criteria=None, options=None):
        """
        List available flights.

        Parameters:
        - criteria: bytes, listing criteria
        - options: FlightCallOptions, call options

        Returns:
        iterator: Iterator of FlightInfo objects
        """

    def get_flight_info(self, descriptor, options=None):
        """
        Get flight information.

        Parameters:
        - descriptor: FlightDescriptor, flight descriptor
        - options: FlightCallOptions, call options

        Returns:
        FlightInfo: Flight information
        """

    def get_schema(self, descriptor, options=None):
        """
        Get flight schema.

        Parameters:
        - descriptor: FlightDescriptor, flight descriptor
        - options: FlightCallOptions, call options

        Returns:
        SchemaResult: Schema result (the Arrow schema is its ``schema``
        attribute), matching what the server-side get_schema returns
        """

    def do_get(self, ticket, options=None):
        """
        Retrieve data stream.

        Parameters:
        - ticket: Ticket, data ticket
        - options: FlightCallOptions, call options

        Returns:
        FlightStreamReader: Data stream reader
        """

    def do_put(self, descriptor, schema, options=None):
        """
        Send data stream.

        Parameters:
        - descriptor: FlightDescriptor, flight descriptor
        - schema: Schema, data schema
        - options: FlightCallOptions, call options

        Returns:
        tuple: (FlightStreamWriter, FlightMetadataReader) - the writer used
        to upload batches and a reader for metadata sent back by the server.
        Unpack it as ``writer, metadata_reader = client.do_put(...)``.
        """

    def do_exchange(self, descriptor, schema, options=None):
        """
        Bidirectional data exchange.

        Parameters:
        - descriptor: FlightDescriptor, flight descriptor
        - schema: Schema, data schema
        - options: FlightCallOptions, call options

        Returns:
        FlightStreamWriter: Exchange stream writer

        NOTE(review): pyarrow documents this call as
        do_exchange(descriptor, options=None) returning a (writer, reader)
        pair - confirm the signature and return value against the installed
        version.
        """

    def list_actions(self, options=None):
        """
        List available actions.

        Parameters:
        - options: FlightCallOptions, call options

        Returns:
        iterator: Iterator of ActionType objects
        """

    def do_action(self, action, options=None):
        """
        Execute action.

        Parameters:
        - action: Action, action to execute
        - options: FlightCallOptions, call options

        Returns:
        iterator: Iterator of Result objects
        """

    def close(self):
        """Close client connection."""

class Location:
    """
    Address of a Flight service.

    Attributes:
    - uri: the location expressed as a URI
    """

    @classmethod
    def for_grpc_tcp(cls, host, port):
        """Build a location for a plain TCP endpoint at host/port."""

    @classmethod
    def for_grpc_tls(cls, host, port):
        """Build a location for a TLS endpoint at host/port."""

    @classmethod
    def for_grpc_unix(cls, path):
        """Build a location for a Unix domain socket at path."""

    def __str__(self):
        """String form of the location (stub)."""

    def __eq__(self, other):
        """Equality comparison with another location (stub)."""

Server Implementation

Base classes and interfaces for implementing Flight servers.

class FlightServerBase:
    """
    Skeleton that concrete Flight servers derive from.

    Every RPC hook below raises NotImplementedError until a subclass
    overrides it, except list_actions, which reports no actions.
    """

    def list_flights(self, context, criteria):
        """
        Enumerate the flights this server offers.

        Parameters:
        - context: ServerCallContext, call context
        - criteria: bytes, listing criteria

        Returns:
        iterator: Iterator of FlightInfo objects
        """
        raise NotImplementedError()

    def get_flight_info(self, context, descriptor):
        """
        Describe a single flight.

        Parameters:
        - context: ServerCallContext, call context
        - descriptor: FlightDescriptor, flight descriptor

        Returns:
        FlightInfo: Flight information
        """
        raise NotImplementedError()

    def get_schema(self, context, descriptor):
        """
        Report the schema of a flight.

        Parameters:
        - context: ServerCallContext, call context
        - descriptor: FlightDescriptor, flight descriptor

        Returns:
        SchemaResult: Schema result
        """
        raise NotImplementedError()

    def do_get(self, context, ticket):
        """
        Serve the data behind a ticket.

        Parameters:
        - context: ServerCallContext, call context
        - ticket: Ticket, data ticket

        Returns:
        FlightDataStream: Data stream
        """
        raise NotImplementedError()

    def do_put(self, context, descriptor, reader, writer):
        """
        Accept an upload from a client.

        Parameters:
        - context: ServerCallContext, call context
        - descriptor: FlightDescriptor, flight descriptor
        - reader: FlightStreamReader, data stream reader
        - writer: FlightMetadataWriter, metadata writer
        """
        raise NotImplementedError()

    def do_exchange(self, context, descriptor, reader, writer):
        """
        Serve a bidirectional data exchange.

        Parameters:
        - context: ServerCallContext, call context
        - descriptor: FlightDescriptor, flight descriptor
        - reader: FlightStreamReader, data stream reader
        - writer: FlightStreamWriter, data stream writer
        """
        raise NotImplementedError()

    def list_actions(self, context):
        """
        Enumerate the custom actions this server supports.

        Parameters:
        - context: ServerCallContext, call context

        Returns:
        iterator: Iterator of ActionType objects (empty by default)
        """
        return list()

    def do_action(self, context, action):
        """
        Run a custom action.

        Parameters:
        - context: ServerCallContext, call context
        - action: Action, action to execute

        Returns:
        iterator: Iterator of Result objects
        """
        raise NotImplementedError()

class ServerCallContext:
    """
    Per-call context handed to server RPC handlers.

    Attributes:
    - peer_identity: identity of the client
    - peer: information about the client peer
    - method: the method being called
    """

    def is_cancelled(self):
        """Report whether the call has been cancelled."""

    def add_header(self, key, value):
        """Attach a header to the response."""

    def add_trailer(self, key, value):
        """Attach a trailer to the response."""

Data Streaming

Classes for handling data streams in Flight operations with efficient serialization.

class FlightDataStream:
    """Abstract parent of the data streams a Flight server can return."""

    def schema(self):
        """Return the schema of the stream; subclasses must override."""
        raise NotImplementedError()

    def __iter__(self):
        """Yield the stream's chunks; subclasses must override."""
        raise NotImplementedError()

class FlightStreamReader:
    """
    Reader over an incoming Flight stream.

    Attributes:
    - schema: schema of the stream
    """

    def __iter__(self):
        """Iterate over the stream (stub)."""

    def read_next(self):
        """Fetch the next chunk."""

    def read_chunk(self):
        """Fetch a chunk together with its metadata."""

    def read_all(self):
        """Consume the whole stream into a table."""

    def read_pandas(self):
        """Consume the whole stream into a pandas DataFrame."""

class FlightStreamWriter:
    """
    Writer for an outgoing Flight stream.

    Attributes:
    - schema: schema of the stream
    """

    def write_batch(self, batch):
        """Send one record batch."""

    def write_table(self, table, max_chunksize=None):
        """Send a table."""

    def write_with_metadata(self, batch, app_metadata=None):
        """Send a batch along with application metadata."""

    def done_writing(self):
        """Tell the peer that no more data will be written."""

    def close(self):
        """Shut the writer down."""

class FlightStreamChunk:
    """
    One unit read from a Flight stream.

    Attributes:
    - data: the record batch carried by the chunk
    - app_metadata: application metadata carried by the chunk
    """

class RecordBatchStream(FlightDataStream):
    """Flight stream backed by record batches from a data source."""

    def __init__(self, data_source):
        """Wrap data_source as a stream (stub)."""

class GeneratorStream(FlightDataStream):
    """Flight stream whose data is produced by a generator."""

    def __init__(self, schema, generator):
        """Pair a schema with the generator that produces the data (stub)."""

Descriptors and Information

Flight descriptors and metadata for identifying and describing data streams.

class FlightDescriptor:
    """
    Identifier for a data stream, given either as a command or as a path.

    Attributes:
    - descriptor_type: which kind of descriptor this is
    - command: command bytes (for COMMAND type)
    - path: path components (for PATH type)
    """

    @classmethod
    def for_command(cls, command):
        """
        Build a descriptor from a command.

        Parameters:
        - command: bytes, command data

        Returns:
        FlightDescriptor: Command descriptor
        """

    @classmethod
    def for_path(cls, *path):
        """
        Build a descriptor from path components.

        Parameters:
        - path: str components, path components

        Returns:
        FlightDescriptor: Path descriptor
        """

    def __eq__(self, other):
        """Equality comparison with another descriptor (stub)."""

    def __hash__(self):
        """Hash of the descriptor (stub)."""

class DescriptorType:
    """Enumeration of the kinds of flight descriptor."""
    # Values are placeholders in this stub.
    UNKNOWN = Ellipsis
    PATH = Ellipsis
    CMD = Ellipsis

class FlightInfo:
    """
    Description of a flight.

    Attributes:
    - descriptor: descriptor identifying the flight
    - endpoints: list of endpoints serving the flight
    - total_records: total number of records
    - total_bytes: total size in bytes
    - schema: schema of the flight
    - ordered: whether the data is ordered
    """

    @classmethod
    def for_table(cls, table, descriptor, endpoints=None):
        """
        Build a FlightInfo describing a table.

        NOTE(review): pyarrow's published API builds FlightInfo through its
        constructor (schema, descriptor, endpoints, total_records,
        total_bytes); confirm that a for_table helper exists in the
        installed version before calling it.
        """

class FlightEndpoint:
    """
    A place from which a flight's data can be fetched.

    Attributes:
    - ticket: ticket to present when fetching the data
    - locations: list of locations
    """

    def __eq__(self, other):
        """Equality comparison with another endpoint (stub)."""

class Ticket:
    """
    Token presented to a server to retrieve data.

    Attributes:
    - ticket: the ticket as bytes
    """

    def __eq__(self, other):
        """Equality comparison with another ticket (stub)."""

class SchemaResult:
    """
    Wrapper around the schema returned for a flight.

    Attributes:
    - schema: the Arrow schema
    """

Authentication

Authentication handlers for client and server authentication.

class BasicAuth:
    """
    Username/password credentials for basic authentication.
    """

    def __init__(self, username, password):
        """Create credentials from a username and password (stub)."""

    @property
    def username(self):
        """The username (stub)."""

    @property
    def password(self):
        """The password (stub)."""

class ClientAuthHandler:
    """Base for handlers that authenticate a client to a server."""

    def authenticate(self, outgoing, incoming):
        """
        Perform the client side of authentication; subclasses must override.

        Parameters:
        - outgoing: outgoing metadata
        - incoming: incoming metadata
        """
        raise NotImplementedError()

    def get_token(self):
        """Return the authentication token; None unless overridden."""
        return None

class ServerAuthHandler:
    """Base for handlers that authenticate clients on the server."""

    def authenticate(self, outgoing, incoming):
        """
        Authenticate an incoming request; subclasses must override.

        Parameters:
        - outgoing: outgoing metadata
        - incoming: incoming metadata

        Returns:
        str: Client identity
        """
        raise NotImplementedError()

    def is_valid(self, token):
        """
        Check an authentication token; subclasses must override.

        Parameters:
        - token: str, authentication token

        Returns:
        str: Client identity if valid
        """
        raise NotImplementedError()

Middleware

Middleware system for intercepting and modifying Flight calls.

class ClientMiddleware:
    """Interface for middleware running on the client; every hook is a no-op."""

    def sending_headers(self):
        """Hook invoked when headers are being sent."""

    def received_headers(self, headers):
        """Hook invoked when headers arrive."""

    def received_trailers(self, trailers):
        """Hook invoked when trailers arrive."""

class ClientMiddlewareFactory:
    """Produces client middleware instances."""

    def start_call(self, info):
        """
        Create the middleware for a call; subclasses must override.

        Parameters:
        - info: CallInfo, call information

        Returns:
        ClientMiddleware: Middleware instance
        """
        raise NotImplementedError()

class ServerMiddleware:
    """Interface for middleware running on the server; every hook is a no-op."""

    def sending_headers(self):
        """Hook invoked when headers are being sent."""

    def call_completed(self, exception):
        """Hook invoked once the call has finished."""

class ServerMiddlewareFactory:
    """Produces server middleware instances."""

    def start_call(self, info, headers):
        """
        Create the middleware for a call; subclasses must override.

        Parameters:
        - info: CallInfo, call information
        - headers: dict, request headers

        Returns:
        ServerMiddleware: Middleware instance
        """
        raise NotImplementedError()

class TracingServerMiddlewareFactory(ServerMiddlewareFactory):
    """Middleware factory for tracing that ships with the library."""

class CallInfo:
    """
    Details about a call, as passed to middleware factories.

    Attributes:
    - method: the Flight method being invoked
    """

class FlightMethod:
    """Enumeration of the Flight RPC methods."""
    # Values are placeholders in this stub.
    LIST_FLIGHTS = Ellipsis
    GET_FLIGHT_INFO = Ellipsis
    GET_SCHEMA = Ellipsis
    DO_GET = Ellipsis
    DO_PUT = Ellipsis
    DO_EXCHANGE = Ellipsis
    LIST_ACTIONS = Ellipsis
    DO_ACTION = Ellipsis

Actions and Results

Custom actions and results for extending Flight functionality.

class Action:
    """
    Request to run a custom action on a Flight server.

    Attributes:
    - type: the action type
    - body: the action body as bytes
    """

    def __eq__(self, other):
        """Equality comparison with another action (stub)."""

class ActionType:
    """
    Description of an action type a server supports.

    Attributes:
    - type: the action type string
    - description: what the action does
    """

    def __eq__(self, other):
        """Equality comparison with another action type (stub)."""

class Result:
    """
    One result produced by a Flight action.

    Attributes:
    - body: the result body as bytes
    """

    def __eq__(self, other):
        """Equality comparison with another result (stub)."""

Metadata and Options

Configuration options and metadata handling for Flight operations.

class FlightCallOptions:
    """
    Per-call settings for Flight RPCs.

    Attributes:
    - headers: headers to send with the request
    - timeout: timeout for the call
    """

    def __init__(self, headers=None, timeout=None):
        """Create call options (stub)."""

class FlightMetadataReader:
    """Reader for Flight metadata."""

    def read(self):
        """Fetch metadata."""

class FlightMetadataWriter:
    """Writer for Flight metadata."""

    def write(self, metadata):
        """Send metadata."""

class MetadataRecordBatchReader:
    """Reader of record batches that also carries metadata."""

class MetadataRecordBatchWriter:
    """Writer of record batches that also carries metadata."""

Security

Security configuration including TLS certificates and encryption.

class CertKeyPair:
    """
    A TLS certificate paired with its private key.

    Attributes:
    - cert: the certificate as bytes
    - key: the private key as bytes
    """

    def __init__(self, cert, key):
        """Create the pair from certificate and key (stub)."""

Exceptions

Flight-specific exceptions for error handling.

class FlightError(Exception):
    """Root of the Flight exception hierarchy."""


class FlightInternalError(FlightError):
    """An internal error occurred in Flight."""


class FlightTimedOutError(FlightError):
    """A Flight call timed out."""


class FlightCancelledError(FlightError):
    """A Flight call was cancelled."""


class FlightUnauthenticatedError(FlightError):
    """The call requires authentication."""


class FlightUnauthorizedError(FlightError):
    """Authorization for the call failed."""


class FlightUnavailableError(FlightError):
    """The service is unavailable."""


class FlightServerError(FlightError):
    """An error occurred on the server side."""


class FlightWriteSizeExceededError(FlightError):
    """A write exceeded the configured size limit."""

Usage Examples

Basic Client Usage

import pyarrow as pa
import pyarrow.flight as flight

# Connect to Flight server
client = flight.connect("grpc://localhost:8080")

# Everything below runs inside try/finally so the connection is released
# even when one of the calls raises.
try:
    # List available flights
    for flight_info in client.list_flights():
        print(f"Flight: {flight_info.descriptor}")
        print(f"  Records: {flight_info.total_records}")
        print(f"  Bytes: {flight_info.total_bytes}")
        print(f"  Schema: {flight_info.schema}")

    # Get specific flight info
    descriptor = flight.FlightDescriptor.for_path("dataset", "table1")
    info = client.get_flight_info(descriptor)
    print(f"Flight info: {info}")

    # Get data
    for endpoint in info.endpoints:
        stream_reader = client.do_get(endpoint.ticket)
        table = stream_reader.read_all()
        print(f"Retrieved table: {table.num_rows} rows, {table.num_columns} columns")

    # Upload data
    upload_descriptor = flight.FlightDescriptor.for_path("uploads", "new_data")
    table_to_upload = pa.table({
        'id': [1, 2, 3, 4, 5],
        'value': [10.5, 20.3, 30.1, 40.7, 50.2],
    })

    # do_put hands back the writer plus a reader for server-sent metadata
    writer, metadata_reader = client.do_put(upload_descriptor, table_to_upload.schema)
    try:
        writer.write_table(table_to_upload)
    finally:
        # Always close the upload stream, even if the write fails part-way
        writer.close()

    # Execute action
    action = flight.Action("list_tables", b"")
    for result in client.do_action(action):
        print(f"Action result: {result.body}")
finally:
    client.close()

Server Implementation

import pyarrow as pa
import pyarrow.flight as flight
import threading

class DataFlightServer(flight.FlightServerBase):
    """Example Flight server that keeps tables in an in-memory dict.

    Tables are stored under a slash-joined key such as "dataset/sales".
    The same key, UTF-8 encoded, is used as the ticket for do_get.
    Access to the store is serialised with a lock.
    """

    def __init__(self, location=None, **kwargs):
        """
        Create the server and seed it with two sample tables.

        Parameters:
        - location: str or Location, address to listen on; passed through to
          FlightServerBase (None keeps the base-class default)
        - kwargs: any further FlightServerBase keyword arguments
        """
        super().__init__(location, **kwargs)
        self.data_store = {}
        self.lock = threading.Lock()

        # Advertise the address the server really listens on instead of a
        # hard-coded port, so the endpoints handed to clients are reachable.
        if location is None:
            self._advertised_location = f"grpc://localhost:{self.port}"
        else:
            self._advertised_location = location

        # Initialize with sample data
        self.data_store["dataset/sales"] = pa.table({
            'date': ['2023-01-01', '2023-01-02', '2023-01-03'],
            'amount': [100.0, 150.0, 200.0],
            'region': ['North', 'South', 'East'],
        })

        self.data_store["dataset/products"] = pa.table({
            'id': [1, 2, 3],
            'name': ['Widget A', 'Widget B', 'Widget C'],
            'price': [10.99, 15.99, 20.99],
        })

    @staticmethod
    def _path_key(descriptor):
        """Turn a path descriptor into the slash-joined store key."""
        # Path components may arrive as bytes; normalise them to str so the
        # join cannot fail on mixed types. A descriptor without a path
        # yields the empty key.
        parts = descriptor.path or ()
        return '/'.join(
            part.decode() if isinstance(part, bytes) else part
            for part in parts
        )

    def _lookup(self, key, kind):
        """Return the table stored under key or raise FlightUnavailableError."""
        with self.lock:
            try:
                return self.data_store[key]
            except KeyError:
                raise flight.FlightUnavailableError(f"Unknown {kind}: {key}") from None

    def _make_info(self, key, table, descriptor):
        """Build the FlightInfo that points clients at this server for key."""
        endpoint = flight.FlightEndpoint(
            flight.Ticket(key.encode()),
            [self._advertised_location],
        )
        return flight.FlightInfo(
            table.schema,
            descriptor,
            [endpoint],
            table.num_rows,
            table.nbytes,
        )

    def list_flights(self, context, criteria):
        """List available flights (criteria is ignored)."""
        # Snapshot under the lock, then yield without holding it: a generator
        # that yields inside `with self.lock` would keep the store locked for
        # as long as the consumer takes to iterate.
        with self.lock:
            snapshot = list(self.data_store.items())

        for key, table in snapshot:
            descriptor = flight.FlightDescriptor.for_path(*key.split('/'))
            yield self._make_info(key, table, descriptor)

    def get_flight_info(self, context, descriptor):
        """Get flight information; raises FlightUnavailableError if unknown."""
        key = self._path_key(descriptor)
        table = self._lookup(key, "path")
        return self._make_info(key, table, descriptor)

    def get_schema(self, context, descriptor):
        """Get flight schema; raises FlightUnavailableError if unknown."""
        table = self._lookup(self._path_key(descriptor), "path")
        return flight.SchemaResult(table.schema)

    def do_get(self, context, ticket):
        """Retrieve data stream; raises FlightUnavailableError if unknown."""
        table = self._lookup(ticket.ticket.decode(), "ticket")
        return flight.RecordBatchStream(table)

    def do_put(self, context, descriptor, reader, writer):
        """Handle data upload by storing the full table under the path key."""
        key = self._path_key(descriptor)

        # Read all data before taking the lock so a slow upload does not
        # block other calls.
        table = reader.read_all()

        with self.lock:
            self.data_store[key] = table

        print(f"Stored table at {key}: {table.num_rows} rows")

    def list_actions(self, context):
        """List available actions."""
        return [
            flight.ActionType("list_tables", "List all stored tables"),
            flight.ActionType("get_stats", "Get server statistics"),
        ]

    def do_action(self, context, action):
        """Execute action.

        This is a generator, so an unknown action raises
        FlightUnavailableError on first iteration rather than at call time.
        """
        if action.type == "list_tables":
            with self.lock:
                tables = list(self.data_store)
            yield flight.Result('\n'.join(tables).encode())

        elif action.type == "get_stats":
            with self.lock:
                stats = {
                    'table_count': len(self.data_store),
                    'total_rows': sum(
                        table.num_rows for table in self.data_store.values()
                    ),
                }
            yield flight.Result(str(stats).encode())

        else:
            raise flight.FlightUnavailableError(f"Unknown action: {action.type}")

# Entry point: build the server and announce its address
if __name__ == "__main__":
    location = flight.Location.for_grpc_tcp("localhost", 8080)
    server = DataFlightServer()

    # Conceptual only - this example stops short of serving requests;
    # real startup needs additional setup.
    print(f"Starting server at {location}")
    # server.serve(location)  # Actual implementation would differ

Authentication Example

import pyarrow.flight as flight

def _as_text(value):
    """Return value as str, decoding bytes and mapping None to ''."""
    if value is None:
        return ""
    if isinstance(value, bytes):
        return value.decode()
    return value


class SimpleAuthHandler(flight.ServerAuthHandler):
    """Server-side handler that checks a username/password pair.

    On success it issues a 'token-<username>' token, which is_valid
    accepts on subsequent calls.
    """

    def __init__(self):
        super().__init__()
        # username -> password (demo data; do not keep real secrets in code)
        self.valid_tokens = {"user123": "secret456"}

    def authenticate(self, outgoing, incoming):
        """
        Run the handshake for one client.

        Parameters:
        - outgoing: sender used to write the issued token to the client
        - incoming: reader yielding the serialized BasicAuth credentials

        Returns:
        str: the authenticated username

        Raises:
        FlightUnauthenticatedError: if the credentials do not match
        """
        import hmac

        credentials = flight.BasicAuth.deserialize(incoming.read())
        username = _as_text(credentials.username)
        password = _as_text(credentials.password)

        expected = self.valid_tokens.get(username)
        # compare_digest keeps the comparison time independent of how many
        # leading characters match; bytes are used so non-ASCII input works.
        if expected is None or not hmac.compare_digest(
            expected.encode(), password.encode()
        ):
            raise flight.FlightUnauthenticatedError("Invalid credentials")

        # Send the token the client must present on later calls
        outgoing.write(f'token-{username}'.encode())
        return username

    def is_valid(self, token):
        """
        Validate authentication token.

        Parameters:
        - token: bytes or str, token presented by the client

        Returns:
        str: the username the token belongs to

        Raises:
        FlightUnauthenticatedError: if the token is missing or unknown
        """
        text = _as_text(token)
        if text.startswith('token-'):
            username = text[len('token-'):]
            if username in self.valid_tokens:
                return username
        # Raise rather than return None so an invalid token is rejected
        # instead of being treated as an anonymous identity.
        raise flight.FlightUnauthenticatedError("Invalid token")


class SimpleClientAuthHandler(flight.ClientAuthHandler):
    """Client-side counterpart of SimpleAuthHandler."""

    def __init__(self, username, password):
        super().__init__()
        self.username = username
        self.password = password
        # Filled in by authenticate(); None until the handshake has run
        self.token = None

    def authenticate(self, outgoing, incoming):
        """
        Send the credentials and keep the token the server answers with.

        Parameters:
        - outgoing: sender used to write the serialized credentials
        - incoming: reader yielding the token issued by the server
        """
        credentials = flight.BasicAuth(self.username, self.password)
        outgoing.write(credentials.serialize())
        self.token = incoming.read()

    def get_token(self):
        """Return the token received during authenticate (None before it)."""
        return self.token


# Client usage with authentication
auth_handler = SimpleClientAuthHandler("user123", "secret456")
client = flight.connect("grpc://localhost:8080")

try:
    # Authenticate: the handler is passed here, not to connect()
    client.authenticate(auth_handler)

    # Now use authenticated client
    flights = list(client.list_flights())
    print(f"Found {len(flights)} flights")
finally:
    # Release the connection even if authentication or listing fails
    client.close()

Advanced Streaming

import pyarrow as pa
import pyarrow.flight as flight
import time

class StreamingFlightServer(flight.FlightServerBase):
    """Flight server whose do_get produces data on the fly from generators."""

    def do_get(self, context, ticket):
        """Return the generated stream named by the ticket.

        Raises FlightUnavailableError for any other ticket value.
        """
        requested = ticket.ticket.decode()

        # Map each supported ticket to the method that builds its stream
        builders = {
            "streaming/numbers": self.generate_number_stream,
            "streaming/time_series": self.generate_time_series,
        }
        build_stream = builders.get(requested)
        if build_stream is None:
            raise flight.FlightUnavailableError(f"Unknown streaming path: {requested}")
        return build_stream()

    def generate_number_stream(self):
        """Build a stream of 10 batches of 1000 ids with random values."""
        schema = pa.schema([
            pa.field('id', pa.int64()),
            pa.field('random_value', pa.float64()),
        ])

        def number_generator():
            import random
            rows_per_batch = 1000

            for index in range(10):  # 10 batches
                first_id = index * rows_per_batch
                ids = list(range(first_id, first_id + rows_per_batch))
                values = [random.random() for _ in range(rows_per_batch)]

                yield pa.record_batch([ids, values], schema=schema)

                # Pause after each batch to simulate processing delay
                time.sleep(0.1)

        return flight.GeneratorStream(schema, number_generator())

    def generate_time_series(self):
        """Build a stream of 60 per-minute batches, one row per sensor."""
        schema = pa.schema([
            pa.field('timestamp', pa.timestamp('s')),
            pa.field('sensor_id', pa.string()),
            pa.field('value', pa.float64()),
        ])

        def time_series_generator():
            import random
            from datetime import datetime, timedelta

            start_time = datetime.now()
            sensors = ['sensor_001', 'sensor_002', 'sensor_003']

            for offset in range(60):  # 1 hour of data
                stamp = start_time + timedelta(minutes=offset)

                # Every sensor reports at the same instant
                timestamps = [stamp] * len(sensors)
                readings = [random.uniform(20.0, 30.0) for _ in sensors]

                yield pa.record_batch([timestamps, sensors, readings], schema=schema)

                # Pause after each batch for real-time simulation
                time.sleep(0.05)

        return flight.GeneratorStream(schema, time_series_generator())

# Client streaming consumption
# The compute submodule is imported explicitly: a plain `import pyarrow`
# is not guaranteed to make `pa.compute` available.
import pyarrow.compute as pc

client = flight.connect("grpc://localhost:8080")

# try/finally so the connection is released even if streaming fails
try:
    # Stream processing
    # NOTE(review): StreamingFlightServer in this example only overrides
    # do_get; the server being contacted must also answer get_flight_info
    # for this lookup to succeed - confirm against the target server.
    descriptor = flight.FlightDescriptor.for_path("streaming", "numbers")
    info = client.get_flight_info(descriptor)

    for endpoint in info.endpoints:
        reader = client.do_get(endpoint.ticket)

        batch_count = 0
        total_rows = 0

        for chunk in reader:
            batch = chunk.data
            batch_count += 1
            total_rows += batch.num_rows

            print(f"Received batch {batch_count}: {batch.num_rows} rows")

            # Process batch (skip empty batches)
            if batch.num_rows > 0:
                avg_value = pc.mean(batch['random_value']).as_py()
                print(f"  Average value: {avg_value:.4f}")

        print(f"Total: {batch_count} batches, {total_rows} rows")
finally:
    client.close()

Middleware and Monitoring

import pyarrow.flight as flight
import time

class TimingClientMiddleware(flight.ClientMiddleware):
    """Client middleware that prints how long a request took.

    The timer starts in sending_headers and is read in received_trailers.
    """

    def __init__(self):
        super().__init__()
        # None until sending_headers has run
        self.start_time = None

    def sending_headers(self):
        """Record start time."""
        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by wall-clock adjustments the way time.time() can.
        self.start_time = time.perf_counter()

    def received_headers(self, headers):
        """Log headers received."""
        print(f"Received headers: {dict(headers)}")

    def received_trailers(self, trailers):
        """Calculate and log timing; silent if the timer was never started."""
        # Explicit None check: a start value of 0.0 is a valid reading
        if self.start_time is not None:
            duration = time.perf_counter() - self.start_time
            print(f"Request completed in {duration:.3f} seconds")

class TimingClientMiddlewareFactory(flight.ClientMiddlewareFactory):
    """Hands out a fresh TimingClientMiddleware for every call."""

    def start_call(self, info):
        """Announce the call's method and return new timing middleware."""
        print(f"Starting call: {info.method}")
        timing_middleware = TimingClientMiddleware()
        return timing_middleware

class LoggingServerMiddleware(flight.ServerMiddleware):
    """Server middleware that prints when a request starts and finishes."""

    def __init__(self, call_info, headers):
        """
        Log the start of a request and start its timer.

        Parameters:
        - call_info: CallInfo, call information (its method is printed)
        - headers: request headers; must be convertible with dict()
        """
        super().__init__()
        self.call_info = call_info
        self.headers = headers
        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by wall-clock adjustments the way time.time() can.
        self.start_time = time.perf_counter()
        print(f"Request started: {call_info.method}")
        # NOTE(review): this prints every request header, which may include
        # credentials - consider filtering before using outside a demo.
        print(f"Headers: {dict(headers)}")

    def call_completed(self, exception):
        """Log call completion, including the exception if the call failed."""
        duration = time.perf_counter() - self.start_time
        if exception:
            print(f"Request failed after {duration:.3f}s: {exception}")
        else:
            print(f"Request completed in {duration:.3f}s")

class LoggingServerMiddlewareFactory(flight.ServerMiddlewareFactory):
    """Hands out a fresh LoggingServerMiddleware for every call."""

    def start_call(self, info, headers):
        """Return logging middleware bound to this call's info and headers."""
        logging_middleware = LoggingServerMiddleware(info, headers)
        return logging_middleware

# Client with middleware
middleware = [TimingClientMiddlewareFactory()]
client = flight.connect("grpc://localhost:8080", middleware=middleware)

# try/finally so the connection is released even if the listing fails
try:
    # All requests will be timed
    flights = list(client.list_flights())
    print(f"Listed {len(flights)} flights")
finally:
    client.close()

Install with Tessl CLI

npx tessl i tessl/pypi-pyarrow

docs

advanced-features.md

arrow-flight.md

compute-functions.md

core-data-structures.md

data-types.md

dataset-operations.md

file-formats.md

index.md

memory-io.md

tile.json