# Arrow Flight RPC

High-performance RPC framework for distributed data services. It provides a client-server architecture for streaming large datasets, with authentication, metadata handling, custom middleware support, and efficient data transfer over networks.
3
4
## Capabilities
5
6
### Client Operations
7
8
Connect to Flight services and perform data operations with high-performance streaming.
9
10
```python { .api }
11
def connect(
    location,
    tls_certificates=None,
    cert_chain=None,
    private_key=None,
    auth_handler=None,
    call_options=None,
    tls_root_certs=None,
    tls_override_hostname=None,
    middleware=None,
    write_size_limit_bytes=None,
    disable_server_verification=False,
):
    """
    Connect to Flight service.

    NOTE(review): upstream pyarrow forwards keyword arguments to the
    FlightClient constructor; confirm that every keyword listed here
    (notably auth_handler, call_options, tls_certificates and
    tls_override_hostname) is accepted by the installed version.

    Parameters:
    - location: str or Location, service location
    - tls_certificates: list, TLS certificates
    - cert_chain: bytes, certificate chain for mTLS
    - private_key: bytes, private key for mTLS
    - auth_handler: ClientAuthHandler, authentication handler
    - call_options: FlightCallOptions, default call options
    - tls_root_certs: bytes, root certificates for TLS
    - tls_override_hostname: str, override hostname for TLS
    - middleware: list, client middleware
    - write_size_limit_bytes: int, write size limit
    - disable_server_verification: bool, disable server verification

    Returns:
    FlightClient: Connected Flight client
    """
31
32
class FlightClient:
    """
    Flight client for connecting to Flight services.
    """

    def authenticate(self, auth_handler, options=None):
        """
        Authenticate with server.

        Parameters:
        - auth_handler: ClientAuthHandler, authentication handler
        - options: FlightCallOptions, call options
        """

    def list_flights(self, criteria=None, options=None):
        """
        List available flights.

        Parameters:
        - criteria: bytes, listing criteria
        - options: FlightCallOptions, call options

        Returns:
        iterator: Iterator of FlightInfo objects
        """

    def get_flight_info(self, descriptor, options=None):
        """
        Get flight information.

        Parameters:
        - descriptor: FlightDescriptor, flight descriptor
        - options: FlightCallOptions, call options

        Returns:
        FlightInfo: Flight information
        """

    def get_schema(self, descriptor, options=None):
        """
        Get flight schema.

        Parameters:
        - descriptor: FlightDescriptor, flight descriptor
        - options: FlightCallOptions, call options

        Returns:
        Schema: Flight schema
        """

    def do_get(self, ticket, options=None):
        """
        Retrieve data stream.

        Parameters:
        - ticket: Ticket, data ticket
        - options: FlightCallOptions, call options

        Returns:
        FlightStreamReader: Data stream reader
        """

    def do_put(self, descriptor, schema, options=None):
        """
        Send data stream.

        Parameters:
        - descriptor: FlightDescriptor, flight descriptor
        - schema: Schema, data schema
        - options: FlightCallOptions, call options

        Returns:
        tuple: (FlightStreamWriter, FlightMetadataReader) - the data stream
        writer and the reader for metadata sent back by the server
        """

    def do_exchange(self, descriptor, schema, options=None):
        """
        Bidirectional data exchange.

        NOTE(review): upstream pyarrow appears to return a (writer, reader)
        pair here as well - confirm before relying on a bare writer.

        Parameters:
        - descriptor: FlightDescriptor, flight descriptor
        - schema: Schema, data schema
        - options: FlightCallOptions, call options

        Returns:
        FlightStreamWriter: Exchange stream writer
        """

    def list_actions(self, options=None):
        """
        List available actions.

        Parameters:
        - options: FlightCallOptions, call options

        Returns:
        iterator: Iterator of ActionType objects
        """

    def do_action(self, action, options=None):
        """
        Execute action.

        Parameters:
        - action: Action, action to execute
        - options: FlightCallOptions, call options

        Returns:
        iterator: Iterator of Result objects
        """

    def close(self):
        """Close client connection."""
145
146
class Location:
    """
    Flight service location.

    Attributes:
    - uri: Location URI
    """

    @classmethod
    def for_grpc_tcp(cls, host, port):
        """Create TCP location."""

    @classmethod
    def for_grpc_tls(cls, host, port):
        """Create TLS location."""

    @classmethod
    def for_grpc_unix(cls, path):
        """Create Unix socket location."""

    def __str__(self): ...

    def __eq__(self, other): ...
168
```
169
170
### Server Implementation
171
172
Base classes and interfaces for implementing Flight servers.
173
174
```python { .api }
175
class FlightServerBase:
    """
    Base class for implementing Flight servers.

    Subclasses override the RPC handlers they support; every handler except
    list_actions raises NotImplementedError by default.
    """

    def list_flights(self, context, criteria):
        """
        List available flights.

        Parameters:
        - context: ServerCallContext, call context
        - criteria: bytes, listing criteria

        Returns:
        iterator: Iterator of FlightInfo objects
        """
        raise NotImplementedError

    def get_flight_info(self, context, descriptor):
        """
        Get flight information.

        Parameters:
        - context: ServerCallContext, call context
        - descriptor: FlightDescriptor, flight descriptor

        Returns:
        FlightInfo: Flight information
        """
        raise NotImplementedError

    def get_schema(self, context, descriptor):
        """
        Get flight schema.

        Parameters:
        - context: ServerCallContext, call context
        - descriptor: FlightDescriptor, flight descriptor

        Returns:
        SchemaResult: Schema result
        """
        raise NotImplementedError

    def do_get(self, context, ticket):
        """
        Handle data retrieval.

        Parameters:
        - context: ServerCallContext, call context
        - ticket: Ticket, data ticket

        Returns:
        FlightDataStream: Data stream
        """
        raise NotImplementedError

    def do_put(self, context, descriptor, reader, writer):
        """
        Handle data upload.

        Parameters:
        - context: ServerCallContext, call context
        - descriptor: FlightDescriptor, flight descriptor
        - reader: FlightStreamReader, data stream reader
        - writer: FlightMetadataWriter, metadata writer
        """
        raise NotImplementedError

    def do_exchange(self, context, descriptor, reader, writer):
        """
        Handle bidirectional data exchange.

        Parameters:
        - context: ServerCallContext, call context
        - descriptor: FlightDescriptor, flight descriptor
        - reader: FlightStreamReader, data stream reader
        - writer: FlightStreamWriter, data stream writer
        """
        raise NotImplementedError

    def list_actions(self, context):
        """
        List available actions.

        Parameters:
        - context: ServerCallContext, call context

        Returns:
        iterator: Iterator of ActionType objects (empty by default)
        """
        return []

    def do_action(self, context, action):
        """
        Execute action.

        Parameters:
        - context: ServerCallContext, call context
        - action: Action, action to execute

        Returns:
        iterator: Iterator of Result objects
        """
        raise NotImplementedError
280
281
class ServerCallContext:
    """
    Server call context.

    Attributes:
    - peer_identity: Client identity
    - peer: Client peer information
    - method: Called method
    """

    def is_cancelled(self):
        """Check if call is cancelled."""

    def add_header(self, key, value):
        """Add response header."""

    def add_trailer(self, key, value):
        """Add response trailer."""
299
```
300
301
### Data Streaming
302
303
Classes for handling data streams in Flight operations with efficient serialization.
304
305
```python { .api }
306
class FlightDataStream:
    """Base class for Flight data streams."""

    def schema(self):
        """Get stream schema."""
        raise NotImplementedError

    def __iter__(self):
        """Iterate over stream chunks."""
        raise NotImplementedError
316
317
class FlightStreamReader:
    """
    Flight stream reader.

    Attributes:
    - schema: Stream schema
    """

    def __iter__(self): ...

    def read_next(self):
        """Read next chunk."""

    def read_chunk(self):
        """Read chunk with metadata."""

    def read_all(self):
        """Read all data as table."""

    def read_pandas(self):
        """Read all data as pandas DataFrame."""
338
339
class FlightStreamWriter:
    """
    Flight stream writer.

    Attributes:
    - schema: Stream schema
    """

    def write_batch(self, batch):
        """Write record batch."""

    def write_table(self, table, max_chunksize=None):
        """Write table."""

    def write_with_metadata(self, batch, app_metadata=None):
        """Write batch with metadata."""

    def done_writing(self):
        """Signal end of writing."""

    def close(self):
        """Close writer."""
361
362
class FlightStreamChunk:
    """
    Flight stream chunk.

    Attributes:
    - data: Record batch data
    - app_metadata: Application metadata
    """
370
371
class RecordBatchStream(FlightDataStream):
    """Record batch-based Flight stream."""

    def __init__(self, data_source): ...
375
376
class GeneratorStream(FlightDataStream):
    """Generator-based Flight stream."""

    def __init__(self, schema, generator): ...
380
```
381
382
### Descriptors and Information
383
384
Flight descriptors and metadata for identifying and describing data streams.
385
386
```python { .api }
387
class FlightDescriptor:
    """
    Flight descriptor for identifying data streams.

    Attributes:
    - descriptor_type: Descriptor type
    - command: Command bytes (for COMMAND type)
    - path: Path components (for PATH type)
    """

    @classmethod
    def for_command(cls, command):
        """
        Create command descriptor.

        Parameters:
        - command: bytes, command data

        Returns:
        FlightDescriptor: Command descriptor
        """

    @classmethod
    def for_path(cls, *path):
        """
        Create path descriptor.

        Parameters:
        - path: str components, path components

        Returns:
        FlightDescriptor: Path descriptor
        """

    def __eq__(self, other): ...

    def __hash__(self): ...
423
424
class DescriptorType:
    """Descriptor type enumeration."""

    UNKNOWN = ...
    PATH = ...
    CMD = ...
429
430
class FlightInfo:
    """
    Flight information.

    Attributes:
    - descriptor: Flight descriptor
    - endpoints: List of flight endpoints
    - total_records: Total number of records
    - total_bytes: Total bytes
    - schema: Flight schema
    - ordered: Whether data is ordered
    """

    @classmethod
    def for_table(cls, table, descriptor, endpoints=None):
        """
        Create FlightInfo for table.

        NOTE(review): confirm this constructor exists in the installed
        pyarrow version before depending on it.
        """
446
447
class FlightEndpoint:
    """
    Flight endpoint.

    Attributes:
    - ticket: Data ticket
    - locations: List of locations
    """

    def __eq__(self, other): ...
457
458
class Ticket:
    """
    Flight ticket for data retrieval.

    Attributes:
    - ticket: Ticket bytes
    """

    def __eq__(self, other): ...
467
468
class SchemaResult:
    """
    Schema result.

    Attributes:
    - schema: Arrow schema
    """
475
```
476
477
### Authentication
478
479
Authentication handlers for client and server authentication.
480
481
```python { .api }
482
class BasicAuth:
    """
    Basic username/password authentication.
    """

    def __init__(self, username, password): ...

    @property
    def username(self): ...

    @property
    def password(self): ...
494
495
class ClientAuthHandler:
    """Client-side authentication handler."""

    def authenticate(self, outgoing, incoming):
        """
        Authenticate client.

        NOTE(review): upstream pyarrow passes a sender object (write()) and
        a reader object (read()) here rather than header collections -
        confirm against the installed version.

        Parameters:
        - outgoing: outgoing metadata
        - incoming: incoming metadata
        """
        raise NotImplementedError

    def get_token(self):
        """Get authentication token."""
        return None
511
512
class ServerAuthHandler:
    """Server-side authentication handler."""

    def authenticate(self, outgoing, incoming):
        """
        Authenticate request.

        NOTE(review): upstream pyarrow passes a sender object (write()) and
        a reader object (read()) here rather than header collections -
        confirm against the installed version.

        Parameters:
        - outgoing: outgoing metadata
        - incoming: incoming metadata

        Returns:
        str: Client identity
        """
        raise NotImplementedError

    def is_valid(self, token):
        """
        Validate authentication token.

        Parameters:
        - token: str, authentication token

        Returns:
        str: Client identity if valid
        """
        raise NotImplementedError
539
```
540
541
### Middleware
542
543
Middleware system for intercepting and modifying Flight calls.
544
545
```python { .api }
546
class ClientMiddleware:
    """
    Client-side middleware interface.

    NOTE(review): upstream pyarrow names the end-of-call hook
    call_completed(exception); confirm whether received_trailers is invoked
    by the installed version.
    """

    def sending_headers(self):
        """Called when sending headers."""
        pass

    def received_headers(self, headers):
        """Called when receiving headers."""
        pass

    def received_trailers(self, trailers):
        """Called when receiving trailers."""
        pass
560
561
class ClientMiddlewareFactory:
    """Factory for client middleware."""

    def start_call(self, info):
        """
        Start middleware for call.

        Parameters:
        - info: CallInfo, call information

        Returns:
        ClientMiddleware: Middleware instance
        """
        raise NotImplementedError
575
576
class ServerMiddleware:
    """Server-side middleware interface."""

    def sending_headers(self):
        """Called when sending headers."""
        pass

    def call_completed(self, exception):
        """Called when call completes."""
        pass
586
587
class ServerMiddlewareFactory:
    """Factory for server middleware."""

    def start_call(self, info, headers):
        """
        Start middleware for call.

        Parameters:
        - info: CallInfo, call information
        - headers: dict, request headers

        Returns:
        ServerMiddleware: Middleware instance
        """
        raise NotImplementedError
602
603
class TracingServerMiddlewareFactory(ServerMiddlewareFactory):
    """Built-in tracing middleware factory."""
605
606
class CallInfo:
    """
    Call information.

    Attributes:
    - method: Flight method
    """
613
614
class FlightMethod:
    """Flight RPC method enumeration."""

    LIST_FLIGHTS = ...
    GET_FLIGHT_INFO = ...
    GET_SCHEMA = ...
    DO_GET = ...
    DO_PUT = ...
    DO_EXCHANGE = ...
    LIST_ACTIONS = ...
    DO_ACTION = ...
624
```
625
626
### Actions and Results
627
628
Custom actions and results for extending Flight functionality.
629
630
```python { .api }
631
class Action:
    """
    Flight action request.

    Attributes:
    - type: Action type
    - body: Action body bytes
    """

    def __eq__(self, other): ...
641
642
class ActionType:
    """
    Flight action type information.

    Attributes:
    - type: Action type string
    - description: Action description
    """

    def __eq__(self, other): ...
652
653
class Result:
    """
    Flight action result.

    Attributes:
    - body: Result body bytes
    """

    def __eq__(self, other): ...
662
```
663
664
### Metadata and Options
665
666
Configuration options and metadata handling for Flight operations.
667
668
```python { .api }
669
class FlightCallOptions:
    """
    Options for Flight calls.

    Attributes:
    - headers: Request headers
    - timeout: Call timeout
    """

    def __init__(self, headers=None, timeout=None): ...
679
680
class FlightMetadataReader:
    """Flight metadata reader."""

    def read(self):
        """Read metadata."""
685
686
class FlightMetadataWriter:
    """Flight metadata writer."""

    def write(self, metadata):
        """Write metadata."""
691
692
class MetadataRecordBatchReader:
    """Record batch reader with metadata."""
694
695
class MetadataRecordBatchWriter:
    """Record batch writer with metadata."""
697
```
698
699
### Security
700
701
Security configuration including TLS certificates and encryption.
702
703
```python { .api }
704
class CertKeyPair:
    """
    TLS certificate and key pair.

    Attributes:
    - cert: Certificate bytes
    - key: Private key bytes
    """

    def __init__(self, cert, key): ...
714
```
715
716
### Exceptions
717
718
Flight-specific exceptions for error handling.
719
720
```python { .api }
721
class FlightError(Exception):
    """Base Flight exception."""


class FlightInternalError(FlightError):
    """Internal Flight error."""


class FlightTimedOutError(FlightError):
    """Flight timeout error."""


class FlightCancelledError(FlightError):
    """Flight cancellation error."""


class FlightUnauthenticatedError(FlightError):
    """Authentication required error."""


class FlightUnauthorizedError(FlightError):
    """Authorization failed error."""


class FlightUnavailableError(FlightError):
    """Service unavailable error."""


class FlightServerError(FlightError):
    """Server-side error."""


class FlightWriteSizeExceededError(FlightError):
    """Write size limit exceeded error."""
747
```
748
749
## Usage Examples
750
751
### Basic Client Usage
752
753
```python
754
import pyarrow as pa
755
import pyarrow.flight as flight
756
757
# Connect to Flight server
client = flight.connect("grpc://localhost:8080")

# Close the connection even if one of the calls below raises.
try:
    # List available flights
    for flight_info in client.list_flights():
        print(f"Flight: {flight_info.descriptor}")
        print(f"  Records: {flight_info.total_records}")
        print(f"  Bytes: {flight_info.total_bytes}")
        print(f"  Schema: {flight_info.schema}")

    # Get specific flight info
    descriptor = flight.FlightDescriptor.for_path("dataset", "table1")
    info = client.get_flight_info(descriptor)
    print(f"Flight info: {info}")

    # Get data: each endpoint carries the ticket needed to fetch its part
    for endpoint in info.endpoints:
        stream_reader = client.do_get(endpoint.ticket)
        table = stream_reader.read_all()
        print(f"Retrieved table: {len(table)} rows, {len(table.columns)} columns")

    # Upload data
    upload_descriptor = flight.FlightDescriptor.for_path("uploads", "new_data")
    table_to_upload = pa.table({
        'id': [1, 2, 3, 4, 5],
        'value': [10.5, 20.3, 30.1, 40.7, 50.2],
    })

    # do_put hands back the stream writer plus a reader for server metadata
    writer, metadata_reader = client.do_put(upload_descriptor, table_to_upload.schema)
    writer.write_table(table_to_upload)
    writer.close()

    # Execute action
    action = flight.Action("list_tables", b"")
    results = client.do_action(action)
    for result in results:
        print(f"Action result: {result.body}")
finally:
    client.close()
796
```
797
798
### Server Implementation
799
800
```python
801
import pyarrow as pa
802
import pyarrow.flight as flight
803
import threading
804
805
class DataFlightServer(flight.FlightServerBase):
    """Example Flight server implementation.

    Tables live in an in-memory dict keyed by a slash-joined path such as
    "dataset/sales"; every access to the dict is guarded by self.lock.
    """

    def __init__(self, advertised_location="grpc://localhost:8080"):
        """
        Parameters:
        - advertised_location: str, URI placed in the endpoints handed to
          clients (defaults to the address used throughout this example)
        """
        super().__init__()
        self.advertised_location = advertised_location
        self.data_store = {}
        self.lock = threading.Lock()

        # Initialize with sample data
        self.data_store["dataset/sales"] = pa.table({
            'date': ['2023-01-01', '2023-01-02', '2023-01-03'],
            'amount': [100.0, 150.0, 200.0],
            'region': ['North', 'South', 'East'],
        })

        self.data_store["dataset/products"] = pa.table({
            'id': [1, 2, 3],
            'name': ['Widget A', 'Widget B', 'Widget C'],
            'price': [10.99, 15.99, 20.99],
        })

    @staticmethod
    def _descriptor_key(descriptor):
        """Turn a PATH descriptor into the slash-joined data_store key."""
        # Path components may arrive as bytes; decode them so the key
        # matches the str keys used in data_store. A descriptor without a
        # path yields "" and is reported as unknown by the callers.
        parts = descriptor.path or ()
        return '/'.join(
            part.decode() if isinstance(part, bytes) else part
            for part in parts
        )

    def _make_info(self, key, table, descriptor):
        """Build the FlightInfo for one stored table."""
        endpoint = flight.FlightEndpoint(
            flight.Ticket(key.encode()),
            [self.advertised_location],
        )
        return flight.FlightInfo.for_table(table, descriptor, [endpoint])

    def list_flights(self, context, criteria):
        """List available flights."""
        # Snapshot under the lock, then yield outside it: a generator that
        # yields while holding the lock would block every other handler
        # until the client finishes (or abandons) the iteration.
        with self.lock:
            snapshot = list(self.data_store.items())

        for key, table in snapshot:
            descriptor = flight.FlightDescriptor.for_path(*key.split('/'))
            yield self._make_info(key, table, descriptor)

    def get_flight_info(self, context, descriptor):
        """Get flight information.

        Raises flight.FlightUnavailableError when the path is not stored.
        """
        path = self._descriptor_key(descriptor)

        with self.lock:
            if path not in self.data_store:
                raise flight.FlightUnavailableError(f"Unknown path: {path}")
            table = self.data_store[path]

        return self._make_info(path, table, descriptor)

    def get_schema(self, context, descriptor):
        """Get flight schema.

        Raises flight.FlightUnavailableError when the path is not stored.
        """
        path = self._descriptor_key(descriptor)

        with self.lock:
            if path not in self.data_store:
                raise flight.FlightUnavailableError(f"Unknown path: {path}")
            table = self.data_store[path]

        return flight.SchemaResult(table.schema)

    def do_get(self, context, ticket):
        """Retrieve data stream.

        The ticket bytes are the stored table's key, as issued by
        list_flights / get_flight_info.
        """
        path = ticket.ticket.decode()

        with self.lock:
            if path not in self.data_store:
                raise flight.FlightUnavailableError(f"Unknown ticket: {path}")
            table = self.data_store[path]

        return flight.RecordBatchStream(table)

    def do_put(self, context, descriptor, reader, writer):
        """Handle data upload, replacing any table already stored at the path."""
        path = self._descriptor_key(descriptor)

        # Read all data before taking the lock so a slow upload does not
        # block other handlers.
        table = reader.read_all()

        with self.lock:
            self.data_store[path] = table

        print(f"Stored table at {path}: {len(table)} rows")

    def list_actions(self, context):
        """List available actions."""
        return [
            flight.ActionType("list_tables", "List all stored tables"),
            flight.ActionType("get_stats", "Get server statistics"),
        ]

    def do_action(self, context, action):
        """Execute action.

        Yields a single flight.Result; raises flight.FlightUnavailableError
        for an action type that list_actions does not advertise.
        """
        if action.type == "list_tables":
            with self.lock:
                tables = list(self.data_store.keys())
            yield flight.Result('\n'.join(tables).encode())

        elif action.type == "get_stats":
            with self.lock:
                stats = {
                    'table_count': len(self.data_store),
                    'total_rows': sum(len(table) for table in self.data_store.values()),
                }
            yield flight.Result(str(stats).encode())

        else:
            raise flight.FlightUnavailableError(f"Unknown action: {action.type}")
911
# Run server
if __name__ == "__main__":
    server = DataFlightServer()
    location = flight.Location.for_grpc_tcp("localhost", 8080)

    # Note: This is conceptual - actual server startup requires more setup
    print(f"Starting server at {location}")
    # server.serve(location)  # Actual implementation would differ
919
```
920
921
### Authentication Example
922
923
```python
924
import pyarrow.flight as flight
925
926
class SimpleAuthHandler(flight.ServerAuthHandler):
    """Simple authentication handler.

    Demo only: credentials are a hard-coded plaintext dict and the issued
    token is just b'token-' + username, which is trivially forgeable.
    """

    TOKEN_PREFIX = b'token-'

    def __init__(self):
        super().__init__()
        # username -> password
        self.valid_tokens = {"user123": "secret456"}

    def authenticate(self, outgoing, incoming):
        """Authenticate request.

        Reads the username and then the password from the client (the
        order SimpleClientAuthHandler sends them in) and, on success,
        writes back the token the client must present on later calls.

        Parameters:
        - outgoing: sender used to write the token to the client
        - incoming: reader used to read the client's handshake messages

        Returns:
        str: Client identity

        Raises:
        flight.FlightUnauthenticatedError: if the credentials are wrong
        """
        import hmac

        username = incoming.read().decode()
        password = incoming.read().decode()

        expected = self.valid_tokens.get(username)
        # Constant-time comparison so response timing does not leak how
        # much of the password matched.
        if expected is None or not hmac.compare_digest(
            expected.encode(), password.encode()
        ):
            raise flight.FlightUnauthenticatedError("Invalid credentials")

        # Set authentication token
        outgoing.write(f'token-{username}'.encode())
        return username

    def is_valid(self, token):
        """Validate authentication token.

        Parameters:
        - token: bytes (str is also accepted), token presented by the client

        Returns:
        str: Client identity if valid

        Raises:
        flight.FlightUnauthenticatedError: if the token is malformed or
        names an unknown user
        """
        if isinstance(token, str):
            token = token.encode()

        if token.startswith(self.TOKEN_PREFIX):
            # Remove 'token-' prefix
            username = token[len(self.TOKEN_PREFIX):].decode()
            if username in self.valid_tokens:
                return username

        # Reject explicitly instead of returning None, so an invalid token
        # can never be mistaken for an (empty) identity.
        raise flight.FlightUnauthenticatedError("Invalid token")
957
958
class SimpleClientAuthHandler(flight.ClientAuthHandler):
    """Simple client authentication handler."""

    def __init__(self, username, password):
        """
        Parameters:
        - username: str, user name sent to the server
        - password: str, password sent to the server
        """
        super().__init__()
        self.username = username
        self.password = password
        # Kept as bytes because that is what get_token hands back to the
        # transport; empty until authenticate() has run.
        self.token = b''

    def authenticate(self, outgoing, incoming):
        """Authenticate client.

        Parameters:
        - outgoing: sender used to write handshake messages to the server
        - incoming: reader used to read the server's reply
        """
        # Send credentials: username first, then password
        outgoing.write(self.username.encode())
        outgoing.write(self.password.encode())

        # Get token from response
        self.token = incoming.read()

    def get_token(self):
        """Get authentication token."""
        return self.token
981
982
# Client usage with authentication
auth_handler = SimpleClientAuthHandler("user123", "secret456")
client = flight.connect("grpc://localhost:8080")

try:
    # Authenticate: the handler is supplied here, so it is not also
    # passed to connect()
    client.authenticate(auth_handler)

    # Now use authenticated client
    flights = list(client.list_flights())
    print(f"Found {len(flights)} flights")
finally:
    client.close()
994
```
995
996
### Advanced Streaming
997
998
```python
999
import pyarrow as pa
1000
import pyarrow.flight as flight
1001
import time
1002
1003
class StreamingFlightServer(flight.FlightServerBase):
    """Flight server with streaming data generation."""

    def do_get(self, context, ticket):
        """Generate streaming data.

        The ticket bytes name the stream: "streaming/numbers" or
        "streaming/time_series". Anything else raises
        flight.FlightUnavailableError.
        """
        path = ticket.ticket.decode()

        if path == "streaming/numbers":
            return self.generate_number_stream()
        elif path == "streaming/time_series":
            return self.generate_time_series()
        else:
            raise flight.FlightUnavailableError(f"Unknown streaming path: {path}")

    def generate_number_stream(self, num_batches=10, batch_size=1000, delay=0.1):
        """Generate stream of random numbers.

        Parameters:
        - num_batches: int, number of record batches to emit
        - batch_size: int, rows per batch
        - delay: float, seconds to sleep after each batch

        Returns:
        GeneratorStream: stream with columns id (int64) and
        random_value (float64)
        """
        schema = pa.schema([
            pa.field('id', pa.int64()),
            pa.field('random_value', pa.float64()),
        ])

        def number_generator():
            import random

            for batch_num in range(num_batches):
                # ids continue across batches: 0..batch_size-1, then on
                ids = list(range(batch_num * batch_size, (batch_num + 1) * batch_size))
                values = [random.random() for _ in range(batch_size)]

                batch = pa.record_batch([ids, values], schema=schema)
                yield batch

                # Simulate processing delay
                time.sleep(delay)

        return flight.GeneratorStream(schema, number_generator())

    def generate_time_series(self, minutes=60, delay=0.05):
        """Generate time series data.

        Parameters:
        - minutes: int, number of one-minute steps (one batch per step)
        - delay: float, seconds to sleep after each batch

        Returns:
        GeneratorStream: stream with columns timestamp, sensor_id, value
        """
        schema = pa.schema([
            pa.field('timestamp', pa.timestamp('s')),
            pa.field('sensor_id', pa.string()),
            pa.field('value', pa.float64()),
        ])

        def time_series_generator():
            import random
            from datetime import datetime, timedelta

            start_time = datetime.now()
            sensors = ['sensor_001', 'sensor_002', 'sensor_003']

            for minute in range(minutes):
                current_time = start_time + timedelta(minutes=minute)

                # One row per sensor, all sharing this step's timestamp
                timestamps = [current_time] * len(sensors)
                values = [random.uniform(20.0, 30.0) for _ in sensors]

                batch = pa.record_batch([timestamps, sensors, values], schema=schema)
                yield batch

                # Real-time simulation
                time.sleep(delay)

        return flight.GeneratorStream(schema, time_series_generator())
1069
1070
# Client streaming consumption
# pyarrow.compute is a submodule that must be imported explicitly before
# its functions can be used.
import pyarrow.compute as pc

client = flight.connect("grpc://localhost:8080")

try:
    # Stream processing
    descriptor = flight.FlightDescriptor.for_path("streaming", "numbers")
    info = client.get_flight_info(descriptor)

    for endpoint in info.endpoints:
        reader = client.do_get(endpoint.ticket)

        batch_count = 0
        total_rows = 0

        # Iterating the reader yields chunks; the record batch is in .data
        for chunk in reader:
            batch = chunk.data
            batch_count += 1
            total_rows += len(batch)

            print(f"Received batch {batch_count}: {len(batch)} rows")

            # Process batch
            if len(batch) > 0:
                avg_value = pc.mean(batch['random_value']).as_py()
                print(f"  Average value: {avg_value:.4f}")

        print(f"Total: {batch_count} batches, {total_rows} rows")
finally:
    client.close()
1098
```
1099
1100
### Middleware and Monitoring
1101
1102
```python
1103
import pyarrow.flight as flight
1104
import time
1105
1106
class TimingClientMiddleware(flight.ClientMiddleware):
    """Client middleware for timing requests."""

    def __init__(self):
        # perf_counter reading taken when headers are sent; None while no
        # call is being timed.
        self.start_time = None

    def sending_headers(self):
        """Record start time."""
        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by wall-clock adjustments.
        self.start_time = time.perf_counter()

    def received_headers(self, headers):
        """Log headers received."""
        print(f"Received headers: {dict(headers)}")

    def received_trailers(self, trailers):
        """Calculate and log timing."""
        self._report_elapsed()

    def call_completed(self, exception):
        """Calculate and log timing when the call finishes."""
        self._report_elapsed()

    def _report_elapsed(self):
        """Print the elapsed time once per call."""
        if self.start_time is None:
            return
        duration = time.perf_counter() - self.start_time
        # Reset so a second end-of-call hook does not log a duplicate line.
        self.start_time = None
        print(f"Request completed in {duration:.3f} seconds")
1125
1126
class TimingClientMiddlewareFactory(flight.ClientMiddlewareFactory):
    """Factory for timing middleware."""

    def start_call(self, info):
        """Create timing middleware for each call.

        Parameters:
        - info: CallInfo, call information (its method is logged)

        Returns:
        TimingClientMiddleware: a fresh instance for this call
        """
        print(f"Starting call: {info.method}")
        return TimingClientMiddleware()
1133
1134
class LoggingServerMiddleware(flight.ServerMiddleware):
    """Server middleware for logging requests."""

    def __init__(self, call_info, headers):
        """
        Parameters:
        - call_info: CallInfo, call information
        - headers: request headers (printed as a dict)
        """
        self.call_info = call_info
        self.headers = headers
        # Monotonic clock: immune to wall-clock adjustments mid-request.
        self.start_time = time.perf_counter()
        print(f"Request started: {call_info.method}")
        # NOTE(review): request headers can carry credentials; redact them
        # before using this outside a demo.
        print(f"Headers: {dict(headers)}")

    def call_completed(self, exception):
        """Log call completion.

        Parameters:
        - exception: the error that ended the call, or None on success
        """
        duration = time.perf_counter() - self.start_time
        if exception is not None:
            print(f"Request failed after {duration:.3f}s: {exception}")
        else:
            print(f"Request completed in {duration:.3f}s")
1151
1152
class LoggingServerMiddlewareFactory(flight.ServerMiddlewareFactory):
    """Factory for logging middleware."""

    def start_call(self, info, headers):
        """Create logging middleware for each call.

        Parameters:
        - info: CallInfo, call information
        - headers: request headers

        Returns:
        LoggingServerMiddleware: a fresh instance for this call
        """
        return LoggingServerMiddleware(info, headers)
1158
1159
# Client with middleware
middleware = [TimingClientMiddlewareFactory()]
client = flight.connect("grpc://localhost:8080", middleware=middleware)

try:
    # All requests will be timed
    flights = list(client.list_flights())
    print(f"Listed {len(flights)} flights")
finally:
    client.close()
1168
```