Python library for the Apache Arrow columnar memory format and computing libraries
—
Arrow Flight is a high-performance RPC framework for distributed data services. It provides a client-server architecture for streaming large datasets, with support for authentication, metadata handling, custom middleware, and efficient data transfer over the network.
Connect to Flight services and perform data operations with high-performance streaming.
def connect(
    location,
    tls_certificates=None,
    cert_chain=None,
    private_key=None,
    auth_handler=None,
    call_options=None,
    tls_root_certs=None,
    tls_override_hostname=None,
    middleware=None,
    write_size_limit_bytes=None,
    disable_server_verification=False,
):
    """
    Open a client connection to a Flight service.
    Parameters:
    - location: str or Location, address the service listens on
    - tls_certificates: list, TLS certificates to use
    - cert_chain: bytes, client certificate chain (mutual TLS)
    - private_key: bytes, client private key (mutual TLS)
    - auth_handler: ClientAuthHandler, handler used to authenticate
    - call_options: FlightCallOptions, options applied to calls by default
    - tls_root_certs: bytes, root certificates trusted for TLS
    - tls_override_hostname: str, hostname to check instead of the real one
    - middleware: list, client middleware factories
    - write_size_limit_bytes: int, upper bound on the size of writes
    - disable_server_verification: bool, skip verifying the server
    Returns:
    FlightClient: a client connected to the given location
    """
class FlightClient:
    """
    Flight client for connecting to Flight services.
    """

    def authenticate(self, auth_handler, options=None):
        """
        Authenticate with server.
        Parameters:
        - auth_handler: ClientAuthHandler, authentication handler
        - options: FlightCallOptions, call options
        """

    def list_flights(self, criteria=None, options=None):
        """
        List available flights.
        Parameters:
        - criteria: bytes, listing criteria
        - options: FlightCallOptions, call options
        Returns:
        iterator: Iterator of FlightInfo objects
        """

    def get_flight_info(self, descriptor, options=None):
        """
        Get flight information.
        Parameters:
        - descriptor: FlightDescriptor, flight descriptor
        - options: FlightCallOptions, call options
        Returns:
        FlightInfo: Flight information
        """

    def get_schema(self, descriptor, options=None):
        """
        Get flight schema.
        Parameters:
        - descriptor: FlightDescriptor, flight descriptor
        - options: FlightCallOptions, call options
        Returns:
        SchemaResult: Schema result; the Arrow schema itself is its
        ``schema`` attribute (the same type a server's get_schema returns)
        """

    def do_get(self, ticket, options=None):
        """
        Retrieve data stream.
        Parameters:
        - ticket: Ticket, data ticket
        - options: FlightCallOptions, call options
        Returns:
        FlightStreamReader: Data stream reader
        """

    def do_put(self, descriptor, schema, options=None):
        """
        Send data stream.
        Parameters:
        - descriptor: FlightDescriptor, flight descriptor
        - schema: Schema, data schema
        - options: FlightCallOptions, call options
        Returns:
        tuple: (FlightStreamWriter, FlightMetadataReader) -- the writer used
        to upload record batches, and a reader for metadata the server
        writes back during the upload; unpack it as
        ``writer, metadata_reader = client.do_put(...)``
        """

    def do_exchange(self, descriptor, schema, options=None):
        """
        Bidirectional data exchange.
        Parameters:
        - descriptor: FlightDescriptor, flight descriptor
        - schema: Schema, data schema
        - options: FlightCallOptions, call options
        Returns:
        tuple: (FlightStreamWriter, FlightStreamReader) -- the writer for
        data sent to the server and the reader for data the server sends
        back
        """

    def list_actions(self, options=None):
        """
        List available actions.
        Parameters:
        - options: FlightCallOptions, call options
        Returns:
        iterator: Iterator of ActionType objects
        """

    def do_action(self, action, options=None):
        """
        Execute action.
        Parameters:
        - action: Action, action to execute
        - options: FlightCallOptions, call options
        Returns:
        iterator: Iterator of Result objects
        """

    def close(self):
        """Close client connection."""
class Location:
    """
    Address of a Flight service.
    Attributes:
    - uri: URI of this location
    """

    @classmethod
    def for_grpc_tcp(cls, host, port):
        """Build a location for plain gRPC over TCP."""

    @classmethod
    def for_grpc_tls(cls, host, port):
        """Build a location for gRPC over TLS."""

    @classmethod
    def for_grpc_unix(cls, path):
        """Build a location for gRPC over a Unix domain socket."""

    def __str__(self): ...

    def __eq__(self, other): ...


# Base classes and interfaces for implementing Flight servers.
class FlightServerBase:
    """
    Base class to subclass when implementing a Flight server.
    Each RPC handler raises NotImplementedError until overridden, except
    list_actions, which reports no actions.
    """

    def list_flights(self, context, criteria):
        """
        Enumerate the flights this server offers.
        Parameters:
        - context: ServerCallContext, context of the current call
        - criteria: bytes, filter supplied by the client
        Returns:
        iterator: FlightInfo objects
        """
        raise NotImplementedError

    def get_flight_info(self, context, descriptor):
        """
        Describe a single flight.
        Parameters:
        - context: ServerCallContext, context of the current call
        - descriptor: FlightDescriptor, flight being asked about
        Returns:
        FlightInfo: description of the flight
        """
        raise NotImplementedError

    def get_schema(self, context, descriptor):
        """
        Report the schema of a flight.
        Parameters:
        - context: ServerCallContext, context of the current call
        - descriptor: FlightDescriptor, flight being asked about
        Returns:
        SchemaResult: wrapper around the flight's schema
        """
        raise NotImplementedError

    def do_get(self, context, ticket):
        """
        Serve the data identified by a ticket.
        Parameters:
        - context: ServerCallContext, context of the current call
        - ticket: Ticket, ticket presented by the client
        Returns:
        FlightDataStream: stream of the requested data
        """
        raise NotImplementedError

    def do_put(self, context, descriptor, reader, writer):
        """
        Accept data uploaded by a client.
        Parameters:
        - context: ServerCallContext, context of the current call
        - descriptor: FlightDescriptor, target of the upload
        - reader: FlightStreamReader, incoming data
        - writer: FlightMetadataWriter, channel for metadata replies
        """
        raise NotImplementedError

    def do_exchange(self, context, descriptor, reader, writer):
        """
        Exchange data with a client in both directions.
        Parameters:
        - context: ServerCallContext, context of the current call
        - descriptor: FlightDescriptor, subject of the exchange
        - reader: FlightStreamReader, data coming from the client
        - writer: FlightStreamWriter, data going to the client
        """
        raise NotImplementedError

    def list_actions(self, context):
        """
        Enumerate the custom actions this server supports.
        Parameters:
        - context: ServerCallContext, context of the current call
        Returns:
        iterator: ActionType objects (an empty list unless overridden)
        """
        return []

    def do_action(self, context, action):
        """
        Run a custom action.
        Parameters:
        - context: ServerCallContext, context of the current call
        - action: Action, action requested by the client
        Returns:
        iterator: Result objects
        """
        raise NotImplementedError
class ServerCallContext:
    """
    Per-call context handed to server handlers.
    Attributes:
    - peer_identity: identity of the calling client
    - peer: description of the calling peer
    - method: RPC method being invoked
    """

    def is_cancelled(self):
        """Report whether the call has been cancelled."""

    def add_header(self, key, value):
        """Attach a header to the response."""

    def add_trailer(self, key, value):
        """Attach a trailer to the response."""


# Classes for handling data streams in Flight operations with efficient serialization.
class FlightDataStream:
    """Abstract base for the streams a server's do_get hands back."""

    def schema(self):
        """Return the schema of the stream."""
        raise NotImplementedError

    def __iter__(self):
        """Yield the chunks that make up the stream."""
        raise NotImplementedError
class FlightStreamReader:
    """
    Reader for an incoming Flight data stream.
    Iterating the reader yields chunks whose ``data`` attribute holds a
    record batch (see FlightStreamChunk).
    Attributes:
    - schema: schema of the stream
    """

    def __iter__(self): ...

    def read_next(self):
        """Return the next chunk of the stream."""

    def read_chunk(self):
        """Return the next chunk together with its metadata."""

    def read_all(self):
        """Read all of the data into a table."""

    def read_pandas(self):
        """Read all of the data into a pandas DataFrame."""
class FlightStreamWriter:
    """
    Writer for an outgoing Flight data stream.
    Attributes:
    - schema: schema of the stream
    """

    def write_batch(self, batch):
        """Send one record batch."""

    def write_table(self, table, max_chunksize=None):
        """Send a whole table, optionally bounding chunk size by max_chunksize."""

    def write_with_metadata(self, batch, app_metadata=None):
        """Send a record batch along with optional application metadata."""

    def done_writing(self):
        """Tell the peer that no more data will be written."""

    def close(self):
        """Close the writer."""
class FlightStreamChunk:
    """
    One chunk read from a Flight stream.
    Attributes:
    - data: the chunk's record batch
    - app_metadata: application-defined metadata sent with the chunk
    """
class RecordBatchStream(FlightDataStream):
    """Flight data stream backed by record batches, e.g. from an Arrow table."""

    def __init__(self, data_source): ...
class GeneratorStream(FlightDataStream):
    """Flight data stream whose batches are produced by a Python generator."""

    def __init__(self, schema, generator): ...


# Flight descriptors and metadata for identifying and describing data streams.
class FlightDescriptor:
    """
    Flight descriptor for identifying data streams.
    Attributes:
    - descriptor_type: Descriptor type (see DescriptorType)
    - command: Command bytes (for CMD type)
    - path: Path components (for PATH type)
    """

    @classmethod
    def for_command(cls, command):
        """
        Create a descriptor of type CMD.
        Parameters:
        - command: bytes, command data
        Returns:
        FlightDescriptor: Command descriptor
        """

    @classmethod
    def for_path(cls, *path):
        """
        Create a descriptor of type PATH.
        Parameters:
        - path: str components, passed as separate positional arguments,
          e.g. ``for_path("dataset", "table1")``
        Returns:
        FlightDescriptor: Path descriptor
        """

    def __eq__(self, other): ...

    def __hash__(self): ...
class DescriptorType:
    """Kinds of FlightDescriptor."""

    UNKNOWN = ...
    PATH = ...  # identified by path components (FlightDescriptor.for_path)
    CMD = ...  # identified by command bytes (FlightDescriptor.for_command)
class FlightInfo:
    """
    Metadata describing a flight and where its data can be fetched.
    Attributes:
    - descriptor: descriptor identifying the flight
    - endpoints: FlightEndpoint objects serving the data
    - total_records: number of records in the flight
    - total_bytes: size of the flight in bytes
    - schema: schema of the flight's data
    - ordered: whether the data is ordered
    """

    @classmethod
    def for_table(cls, table, descriptor, endpoints=None):
        """Build a FlightInfo describing the given table."""
class FlightEndpoint:
    """
    A place a flight's data can be fetched from.
    Attributes:
    - ticket: ticket to redeem with do_get
    - locations: locations serving that ticket
    """

    def __eq__(self, other): ...
class Ticket:
    """
    Token presented to do_get to retrieve a flight's data.
    Attributes:
    - ticket: the ticket as bytes
    """

    def __eq__(self, other): ...
class SchemaResult:
    """
    Wrapper returned by get_schema.
    Attributes:
    - schema: the Arrow schema
    """


# Authentication handlers for client and server authentication.
class BasicAuth:
    """
    Credentials for basic username/password authentication.
    """

    def __init__(self, username, password): ...

    @property
    def username(self): ...

    @property
    def password(self): ...
class ClientAuthHandler:
    """Authentication handler used on the client side."""

    def authenticate(self, outgoing, incoming):
        """
        Run the client half of the authentication handshake.
        Parameters:
        - outgoing: metadata sent to the server
        - incoming: metadata received from the server
        """
        raise NotImplementedError

    def get_token(self):
        """Return the authentication token (None unless overridden)."""
        return None
class ServerAuthHandler:
    """Authentication handler used on the server side."""

    def authenticate(self, outgoing, incoming):
        """
        Run the server half of the authentication handshake.
        Parameters:
        - outgoing: metadata sent to the client
        - incoming: metadata received from the client
        Returns:
        str: identity of the authenticated client
        """
        raise NotImplementedError

    def is_valid(self, token):
        """
        Check a token presented by a client.
        Parameters:
        - token: str, the token to check
        Returns:
        str: identity of the client when the token is valid
        """
        raise NotImplementedError


# Middleware system for intercepting and modifying Flight calls.
class ClientMiddleware:
    """Interface for middleware observing the calls a client makes."""

    def sending_headers(self):
        """Hook invoked as the call's headers are sent."""

    def received_headers(self, headers):
        """Hook invoked with the headers received from the server."""

    def received_trailers(self, trailers):
        """Hook invoked with the trailers received from the server."""
class ClientMiddlewareFactory:
    """Creates a ClientMiddleware for each outgoing call."""

    def start_call(self, info):
        """
        Produce the middleware for a call that is starting.
        Parameters:
        - info: CallInfo, details of the call
        Returns:
        ClientMiddleware: middleware for this call
        """
        raise NotImplementedError
class ServerMiddleware:
    """Interface for middleware observing the calls a server handles."""

    def sending_headers(self):
        """Hook invoked as the response headers are sent."""

    def call_completed(self, exception):
        """Hook invoked once the call finishes, with its exception if any."""
class ServerMiddlewareFactory:
    """Creates a ServerMiddleware for each incoming call."""

    def start_call(self, info, headers):
        """
        Produce the middleware for a call that is starting.
        Parameters:
        - info: CallInfo, details of the call
        - headers: dict, headers sent by the client
        Returns:
        ServerMiddleware: middleware for this call
        """
        raise NotImplementedError
class TracingServerMiddlewareFactory(ServerMiddlewareFactory):
    """Tracing middleware factory shipped with Flight."""
class CallInfo:
    """
    Details about a call, passed to middleware factories.
    Attributes:
    - method: the FlightMethod being called
    """
class FlightMethod:
    """The Flight RPC methods, one member per RPC."""

    LIST_FLIGHTS = ...
    GET_FLIGHT_INFO = ...
    GET_SCHEMA = ...
    DO_GET = ...
    DO_PUT = ...
    DO_EXCHANGE = ...
    LIST_ACTIONS = ...
    DO_ACTION = ...


# Custom actions and results for extending Flight functionality.
class Action:
    """
    Request to run a custom action on the server.
    Attributes:
    - type: name of the action
    - body: payload of the action as bytes
    """

    def __eq__(self, other): ...
class ActionType:
    """
    Description of an action a server supports.
    Attributes:
    - type: name of the action
    - description: human-readable description
    """

    def __eq__(self, other): ...
class Result:
    """
    One result produced by an action.
    Attributes:
    - body: payload of the result as bytes
    """

    def __eq__(self, other): ...


# Configuration options and metadata handling for Flight operations.
class FlightCallOptions:
    """
    Per-call options for Flight RPCs.
    Attributes:
    - headers: headers to send with the request
    - timeout: time limit for the call
    """

    def __init__(self, headers=None, timeout=None): ...
class FlightMetadataReader:
    """Reads application metadata from a Flight stream."""

    def read(self):
        """Return the next piece of metadata."""
class FlightMetadataWriter:
    """Writes application metadata to a Flight stream."""

    def write(self, metadata):
        """Send one piece of metadata."""
class MetadataRecordBatchReader:
    """Reader yielding record batches together with their metadata."""
class MetadataRecordBatchWriter:
    """Writer sending record batches together with their metadata."""


# Security configuration including TLS certificates and encryption.
class CertKeyPair:
    """
    A TLS certificate paired with its private key.
    Attributes:
    - cert: certificate as bytes
    - key: private key as bytes
    """

    def __init__(self, cert, key): ...


# Flight-specific exceptions for error handling.
class FlightError(Exception):
    """Root of the Flight exception hierarchy."""


class FlightInternalError(FlightError):
    """Raised on an internal Flight failure."""


class FlightTimedOutError(FlightError):
    """Raised when a Flight call times out."""


class FlightCancelledError(FlightError):
    """Raised when a Flight call is cancelled."""


class FlightUnauthenticatedError(FlightError):
    """Raised when the call requires authentication."""


class FlightUnauthorizedError(FlightError):
    """Raised when the caller is not authorized."""


class FlightUnavailableError(FlightError):
    """Raised when the service is unavailable."""


class FlightServerError(FlightError):
    """Raised for an error on the server side."""
class FlightWriteSizeExceededError(FlightError):
    """Raised when a write goes over the configured size limit."""


import pyarrow as pa
import pyarrow.flight as flight

# Connect to Flight server
client = flight.connect("grpc://localhost:8080")
# try/finally guarantees the connection is closed even if one of the calls
# below raises (unknown descriptor, dropped connection, ...).
try:
    # List available flights
    for flight_info in client.list_flights():
        print(f"Flight: {flight_info.descriptor}")
        print(f" Records: {flight_info.total_records}")
        print(f" Bytes: {flight_info.total_bytes}")
        print(f" Schema: {flight_info.schema}")

    # Get specific flight info
    descriptor = flight.FlightDescriptor.for_path("dataset", "table1")
    info = client.get_flight_info(descriptor)
    print(f"Flight info: {info}")

    # Get data: each endpoint's ticket is redeemed with do_get
    for endpoint in info.endpoints:
        stream_reader = client.do_get(endpoint.ticket)
        table = stream_reader.read_all()
        print(f"Retrieved table: {len(table)} rows, {len(table.columns)} columns")

    # Upload data
    upload_descriptor = flight.FlightDescriptor.for_path("uploads", "new_data")
    table_to_upload = pa.table({
        'id': [1, 2, 3, 4, 5],
        'value': [10.5, 20.3, 30.1, 40.7, 50.2]
    })
    # do_put returns a (writer, metadata reader) pair
    writer, metadata_reader = client.do_put(upload_descriptor, table_to_upload.schema)
    try:
        writer.write_table(table_to_upload)
    finally:
        # Close the upload stream even if writing fails part-way.
        writer.close()

    # Execute action
    action = flight.Action("list_tables", b"")
    results = client.do_action(action)
    for result in results:
        print(f"Action result: {result.body}")
finally:
    client.close()


import pyarrow as pa
import pyarrow.flight as flight
import threading


class DataFlightServer(flight.FlightServerBase):
    """Example Flight server implementation.

    Tables are held in memory in ``data_store`` under keys such as
    ``"dataset/sales"``: a PATH descriptor's components joined with ``"/"``.
    The same key, UTF-8 encoded, is handed out as the ticket for ``do_get``.
    """

    # URI advertised in every FlightEndpoint this server returns.
    ENDPOINT_URI = "grpc://localhost:8080"

    def __init__(self):
        super().__init__()
        self.data_store = {}
        # Guards data_store: do_put mutates it while other handlers read it.
        self.lock = threading.Lock()
        # Initialize with sample data
        self.data_store["dataset/sales"] = pa.table({
            'date': ['2023-01-01', '2023-01-02', '2023-01-03'],
            'amount': [100.0, 150.0, 200.0],
            'region': ['North', 'South', 'East']
        })
        self.data_store["dataset/products"] = pa.table({
            'id': [1, 2, 3],
            'name': ['Widget A', 'Widget B', 'Widget C'],
            'price': [10.99, 15.99, 20.99]
        })

    @staticmethod
    def _path_key(descriptor):
        """Return the data_store key for a PATH descriptor.

        Path components may arrive as bytes (pyarrow's FlightDescriptor.path
        returns bytes), so they are decoded before joining; str components
        are used as-is.

        Raises:
            FlightServerError: if the descriptor has no path (e.g. CMD type).
        """
        if descriptor.path is None:
            raise flight.FlightServerError("Only PATH descriptors are supported")
        return '/'.join(
            part.decode() if isinstance(part, bytes) else part
            for part in descriptor.path
        )

    def _lookup(self, key, kind):
        """Return the table stored under *key*.

        *kind* ("path" or "ticket") only selects the wording of the error.

        Raises:
            FlightUnavailableError: if nothing is stored under *key*.
        """
        with self.lock:
            table = self.data_store.get(key)
        if table is None:
            raise flight.FlightUnavailableError(f"Unknown {kind}: {key}")
        return table

    def _flight_info(self, path, table, descriptor):
        """Describe *table* with a single endpoint whose ticket is *path*."""
        endpoints = [flight.FlightEndpoint(
            flight.Ticket(path.encode()),
            [self.ENDPOINT_URI]
        )]
        return flight.FlightInfo.for_table(table, descriptor, endpoints)

    def list_flights(self, context, criteria):
        """List available flights (criteria is ignored)."""
        # Snapshot under the lock, then yield outside it. Yielding while the
        # lock is held would block do_put (and every other locked handler)
        # until the client finished -- or abandoned -- the listing.
        with self.lock:
            snapshot = list(self.data_store.items())
        for path, table in snapshot:
            descriptor = flight.FlightDescriptor.for_path(*path.split('/'))
            yield self._flight_info(path, table, descriptor)

    def get_flight_info(self, context, descriptor):
        """Get flight information."""
        path = self._path_key(descriptor)
        table = self._lookup(path, "path")
        return self._flight_info(path, table, descriptor)

    def get_schema(self, context, descriptor):
        """Get flight schema."""
        path = self._path_key(descriptor)
        table = self._lookup(path, "path")
        return flight.SchemaResult(table.schema)

    def do_get(self, context, ticket):
        """Retrieve data stream; the ticket bytes are the UTF-8 store key."""
        path = ticket.ticket.decode()
        table = self._lookup(path, "ticket")
        return flight.RecordBatchStream(table)

    def do_put(self, context, descriptor, reader, writer):
        """Handle data upload, replacing any table stored at the same path."""
        path = self._path_key(descriptor)
        # Read all data before taking the lock so slow uploads don't block
        # other handlers.
        table = reader.read_all()
        with self.lock:
            self.data_store[path] = table
        print(f"Stored table at {path}: {len(table)} rows")

    def list_actions(self, context):
        """List available actions."""
        return [
            flight.ActionType("list_tables", "List all stored tables"),
            flight.ActionType("get_stats", "Get server statistics")
        ]

    def do_action(self, context, action):
        """Execute action; state is copied under the lock, results yielded outside it."""
        if action.type == "list_tables":
            with self.lock:
                tables = list(self.data_store.keys())
            yield flight.Result('\n'.join(tables).encode())
        elif action.type == "get_stats":
            with self.lock:
                stats = {
                    'table_count': len(self.data_store),
                    'total_rows': sum(len(table) for table in self.data_store.values())
                }
            yield flight.Result(str(stats).encode())
        else:
            raise flight.FlightUnavailableError(f"Unknown action: {action.type}")


# Run server
if __name__ == "__main__":
    server = DataFlightServer()
    location = flight.Location.for_grpc_tcp("localhost", 8080)
    # Note: This is conceptual - actual server startup requires more setup
    print(f"Starting server at {location}")
    # server.serve(location) # Actual implementation would differ


import pyarrow.flight as flight
import hmac
import secrets


class SimpleAuthHandler(flight.ServerAuthHandler):
    """Simple authentication handler.

    Checks ``username``/``password`` headers against a fixed credential table
    and answers with a randomly generated ``auth-token`` that is_valid()
    later accepts.
    """

    def __init__(self):
        # Maps username -> password (credentials, despite the attribute name).
        self.valid_tokens = {"user123": "secret456"}
        # Maps issued token -> username. Only tokens handed out by
        # authenticate() are accepted, so a client cannot forge one.
        self._issued_tokens = {}

    def authenticate(self, outgoing, incoming):
        """Authenticate request.

        Returns:
            str: the authenticated username.

        Raises:
            FlightUnauthenticatedError: if the credentials are missing or wrong.
        """
        # Extract credentials from incoming headers
        username = None
        password = None
        for header in incoming:
            if header[0] == b'username':
                username = header[1].decode()
            elif header[0] == b'password':
                password = header[1].decode()
        expected = self.valid_tokens.get(username)
        # compare_digest takes time independent of where the inputs differ,
        # so response timing does not leak how much of the password matched.
        if (expected is None or password is None
                or not hmac.compare_digest(expected.encode(), password.encode())):
            raise flight.FlightUnauthenticatedError("Invalid credentials")
        # Set authentication token: unguessable and unrelated to the username.
        token = secrets.token_urlsafe(32)
        self._issued_tokens[token] = username
        outgoing.append((b'auth-token', token.encode()))
        return username

    def is_valid(self, token):
        """Validate authentication token.

        Returns:
            str or None: the username the token was issued to, or None when
            the token is missing or was never issued by this handler.
        """
        if isinstance(token, bytes):
            token = token.decode(errors="replace")
        return self._issued_tokens.get(token)


class SimpleClientAuthHandler(flight.ClientAuthHandler):
    """Simple client authentication handler."""

    def __init__(self, username, password):
        self.username = username
        self.password = password
        self.token = None

    def authenticate(self, outgoing, incoming):
        """Send credentials and remember the token the server returns."""
        # Send credentials
        outgoing.append((b'username', self.username.encode()))
        outgoing.append((b'password', self.password.encode()))
        # Get token from response
        for header in incoming:
            if header[0] == b'auth-token':
                self.token = header[1].decode()
                break

    def get_token(self):
        """Get authentication token (None before authenticate succeeds)."""
        return self.token


# Client usage with authentication
auth_handler = SimpleClientAuthHandler("user123", "secret456")
# The handler is given only to the explicit authenticate() call below, so the
# handshake runs exactly once.
client = flight.connect("grpc://localhost:8080")
try:
    # Authenticate
    client.authenticate(auth_handler)
    # Now use authenticated client
    flights = list(client.list_flights())
    print(f"Found {len(flights)} flights")
finally:
    client.close()


import pyarrow as pa
import random
import time
from datetime import datetime, timedelta

# pyarrow.compute must be imported explicitly; `import pyarrow` alone does
# not guarantee that `pa.compute` is loaded.
import pyarrow.compute as pc
import pyarrow.flight as flight


class StreamingFlightServer(flight.FlightServerBase):
    """Flight server with streaming data generation.

    Only do_get is implemented: the ticket bytes are decoded as a path naming
    one of the generated streams.
    """

    def do_get(self, context, ticket):
        """Generate streaming data for the stream named by the ticket."""
        path = ticket.ticket.decode()
        if path == "streaming/numbers":
            return self.generate_number_stream()
        elif path == "streaming/time_series":
            return self.generate_time_series()
        else:
            raise flight.FlightUnavailableError(f"Unknown streaming path: {path}")

    def generate_number_stream(self):
        """Generate stream of random numbers (10 batches of 1000 rows)."""
        schema = pa.schema([
            pa.field('id', pa.int64()),
            pa.field('random_value', pa.float64())
        ])

        def number_generator():
            batch_size = 1000
            for batch_num in range(10):  # 10 batches
                ids = list(range(batch_num * batch_size, (batch_num + 1) * batch_size))
                values = [random.random() for _ in range(batch_size)]
                batch = pa.record_batch([ids, values], schema=schema)
                yield batch
                # Simulate processing delay
                time.sleep(0.1)

        return flight.GeneratorStream(schema, number_generator())

    def generate_time_series(self):
        """Generate time series data: one batch per minute for three sensors."""
        schema = pa.schema([
            pa.field('timestamp', pa.timestamp('s')),
            pa.field('sensor_id', pa.string()),
            pa.field('value', pa.float64())
        ])

        def time_series_generator():
            start_time = datetime.now()
            sensors = ['sensor_001', 'sensor_002', 'sensor_003']
            for minute in range(60):  # 1 hour of data
                current_time = start_time + timedelta(minutes=minute)
                timestamps = [current_time] * len(sensors)
                sensor_ids = sensors
                values = [random.uniform(20.0, 30.0) for _ in sensors]
                batch = pa.record_batch([timestamps, sensor_ids, values], schema=schema)
                yield batch
                # Real-time simulation
                time.sleep(0.05)

        return flight.GeneratorStream(schema, time_series_generator())


# Client streaming consumption
client = flight.connect("grpc://localhost:8080")
try:
    # StreamingFlightServer does not override get_flight_info (the base class
    # raises NotImplementedError), so request the stream directly with a
    # ticket; its do_get decodes the ticket bytes as the stream path.
    reader = client.do_get(flight.Ticket(b"streaming/numbers"))
    batch_count = 0
    total_rows = 0
    for chunk in reader:
        batch = chunk.data
        batch_count += 1
        total_rows += len(batch)
        print(f"Received batch {batch_count}: {len(batch)} rows")
        # Process batch
        if len(batch) > 0:
            avg_value = pc.mean(batch['random_value']).as_py()
            print(f" Average value: {avg_value:.4f}")
    print(f"Total: {batch_count} batches, {total_rows} rows")
finally:
    client.close()


import pyarrow.flight as flight
import time


class TimingClientMiddleware(flight.ClientMiddleware):
    """Client middleware for timing requests."""

    def __init__(self):
        self.start_time = None

    def sending_headers(self):
        """Record start time."""
        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by wall-clock adjustments the way time.time() can.
        self.start_time = time.perf_counter()

    def received_headers(self, headers):
        """Log headers received."""
        print(f"Received headers: {dict(headers)}")

    def received_trailers(self, trailers):
        """Calculate and log timing (only if sending_headers ran first)."""
        if self.start_time is not None:
            duration = time.perf_counter() - self.start_time
            print(f"Request completed in {duration:.3f} seconds")


class TimingClientMiddlewareFactory(flight.ClientMiddlewareFactory):
    """Factory for timing middleware."""

    def start_call(self, info):
        """Create timing middleware for each call."""
        print(f"Starting call: {info.method}")
        return TimingClientMiddleware()


class LoggingServerMiddleware(flight.ServerMiddleware):
    """Server middleware for logging requests."""

    def __init__(self, call_info, headers):
        self.call_info = call_info
        self.headers = headers
        # Monotonic clock: durations are immune to wall-clock changes.
        self.start_time = time.perf_counter()
        print(f"Request started: {call_info.method}")
        print(f"Headers: {dict(headers)}")

    def call_completed(self, exception):
        """Log call completion, including the exception if the call failed."""
        duration = time.perf_counter() - self.start_time
        if exception:
            print(f"Request failed after {duration:.3f}s: {exception}")
        else:
            print(f"Request completed in {duration:.3f}s")


class LoggingServerMiddlewareFactory(flight.ServerMiddlewareFactory):
    """Factory for logging middleware."""

    def start_call(self, info, headers):
        """Create logging middleware for each call."""
        return LoggingServerMiddleware(info, headers)


# Client with middleware
middleware = [TimingClientMiddlewareFactory()]
client = flight.connect("grpc://localhost:8080", middleware=middleware)
try:
    # All requests will be timed
    flights = list(client.list_flights())
    print(f"Listed {len(flights)} flights")
finally:
    client.close()


# Install with Tessl CLI
npx tessl i tessl/pypi-pyarrow