# Data Reading and Conversion

Convert Delta tables to pandas DataFrames, PyArrow tables and datasets, or streaming record-batch readers for efficient data processing and integration with the Python data ecosystem.

## Imports

```python
from deltalake import DeltaTable
from typing import Callable, Any, Iterator
import pandas as pd
import pyarrow
import pyarrow.fs as pa_fs
from pyarrow.dataset import Expression
from arro3.core import RecordBatch, RecordBatchReader

# Type aliases for filtering
FilterLiteralType = tuple[str, str, Any]
FilterConjunctionType = list[FilterLiteralType]
FilterDNFType = list[FilterConjunctionType]
FilterType = FilterConjunctionType | FilterDNFType
```
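
These aliases are plain Python structures: a `FilterConjunctionType` value is a list of `(column, op, value)` tuples combined with AND, and a `FilterDNFType` value is a list of such conjunctions combined with OR (see the filtering examples below). For illustration, with hypothetical column names:

```python
# A single conjunction: predicates combined with AND (FilterConjunctionType)
and_filter: FilterConjunctionType = [("year", "=", "2023"), ("age", ">", 25)]

# Disjunctive normal form: conjunctions combined with OR (FilterDNFType)
or_filter: FilterDNFType = [[("year", "=", "2023")], [("age", ">", 25)]]
```
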
## Capabilities
### Pandas Integration

```python { .api }
def to_pandas(
    self,
    partitions: list[tuple[str, str, Any]] | None = None,
    columns: list[str] | None = None,
    filesystem: str | pa_fs.FileSystem | None = None,
    filters: FilterType | Expression | None = None,
    types_mapper: Callable[[pyarrow.DataType], Any] | None = None
) -> pd.DataFrame: ...
```

Convert the Delta table to a pandas DataFrame, with optional column selection and row-level filtering.

**Parameters:**

- `partitions`: Partition-level filters for efficient data access
- `columns`: Specific columns to include in the result
- `filesystem`: Custom filesystem for reading files (string path or PyArrow FileSystem)
- `filters`: Row-level filters in DNF (Disjunctive Normal Form) or a PyArrow Expression
- `types_mapper`: Optional function to map PyArrow data types to pandas types
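
The `types_mapper` callable is applied during the Arrow-to-pandas conversion. As a minimal sketch, assuming pandas 2.0+ (for `pd.ArrowDtype`) and hypothetical column names, it can be used to keep Arrow-backed dtypes instead of the NumPy defaults:

```python
import pandas as pd
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")

# Map every Arrow type to a pandas ArrowDtype so the resulting DataFrame
# keeps Arrow-backed, nullable columns (hypothetical column names).
df = dt.to_pandas(
    columns=["id", "name"],
    types_mapper=pd.ArrowDtype,  # callable: pyarrow.DataType -> pandas dtype
)
print(df.dtypes)
```
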
### PyArrow Integration

```python { .api }
def to_pyarrow_table(
    self,
    partitions: list[tuple[str, str, Any]] | None = None,
    columns: list[str] | None = None,
    filesystem: str | pa_fs.FileSystem | None = None,
    filters: FilterType | Expression | None = None
) -> pyarrow.Table: ...

def to_pyarrow_dataset(
    self,
    partitions: list[tuple[str, str, Any]] | None = None,
    filesystem: Any | None = None
) -> pyarrow.dataset.Dataset: ...

def to_pyarrow_scan(
    self,
    columns: list[str] | None = None,
    filters: list[tuple[str, str, Any]] | list[list[tuple[str, str, Any]]] | None = None,
    partitions: list[tuple[str, str, Any]] | None = None,
    limit: int | None = None,
    batch_size: int | None = None
) -> Iterator[RecordBatch]: ...
```
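
These methods return the table as an eagerly materialized `pyarrow.Table`, a lazily evaluated `pyarrow.dataset.Dataset`, or an iterator of record batches. A minimal sketch of `to_pyarrow_table` with column selection and filter pushdown (column names are hypothetical):

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")

# Read only the required columns and rows into an in-memory Arrow table
arrow_table = dt.to_pyarrow_table(
    columns=["id", "name", "age"],
    filters=[("age", ">", 25)],  # same DNF format as to_pandas
)
print(arrow_table.num_rows, arrow_table.schema)
```
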
### Streaming Data Access

```python { .api }
def to_arro3_reader(
    self,
    columns: list[str] | None = None,
    filters: list[tuple[str, str, Any]] | list[list[tuple[str, str, Any]]] | None = None,
    partitions: list[tuple[str, str, Any]] | None = None,
    batch_size: int | None = None
) -> RecordBatchReader: ...
```

Create a streaming reader for processing large datasets without loading everything into memory.
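
The arro3 `RecordBatchReader` exposes the Arrow C stream (PyCapsule) interface, so it can typically be handed to other Arrow-native libraries without copying. A sketch, assuming pyarrow 14+ (where `RecordBatchReader.from_stream` accepts such objects) and hypothetical column names:

```python
import pyarrow as pa
from deltalake import DeltaTable

dt = DeltaTable("path/to/large-table")
reader = dt.to_arro3_reader(columns=["id", "category"], batch_size=10_000)

# Wrap the stream in a pyarrow reader and consume it batch by batch
pa_reader = pa.RecordBatchReader.from_stream(reader)
for pa_batch in pa_reader:
    print(pa_batch.num_rows)  # pa_batch is a pyarrow.RecordBatch
```
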
### File and Action Information

```python { .api }
def files(
    self,
    partition_filters: list[tuple[str, str, str | list[str]]] | None = None
) -> list[str]: ...

def file_uris(
    self,
    partition_filters: list[tuple[str, str, str | list[str]]] | None = None
) -> list[str]: ...

def get_add_actions(self, flatten: bool = False) -> RecordBatch: ...
```

Access underlying file information and transaction log add actions.
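
Both `files` and `file_uris` accept optional partition filters, and `get_add_actions(flatten=True)` flattens nested fields such as partition values and statistics into top-level columns. A brief sketch (the `year` partition column is hypothetical):

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")

# Only files belonging to the matching partitions
files_2023 = dt.files([("year", "=", "2023")])
uris_2023 = dt.file_uris([("year", "=", "2023")])

# Flattened add actions: nested partition values and statistics become columns
add_actions = dt.get_add_actions(flatten=True)
```
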
### Change Data Feed (CDF)

```python { .api }
def load_cdf(
    self,
    starting_version: int = 0,
    ending_version: int | None = None,
    starting_timestamp: str | None = None,
    ending_timestamp: str | None = None,
    columns: list[str] | None = None,
    predicate: str | None = None,
    allow_out_of_range: bool = False,
) -> RecordBatchReader: ...
```

Load the Change Data Feed (CDF) to track row-level changes between table versions.
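
This requires the change data feed to be enabled on the table. The returned stream contains the table columns plus the CDF metadata columns `_change_type`, `_commit_version`, and `_commit_timestamp`. A minimal sketch that materializes the stream with pyarrow (again assuming pyarrow 14+ for `RecordBatchReader.from_stream`):

```python
import pyarrow as pa
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")

# Row-level changes recorded between two table versions
cdf_reader = dt.load_cdf(starting_version=1, ending_version=5)

changes = pa.RecordBatchReader.from_stream(cdf_reader).read_all()
print(changes.column("_change_type").unique())  # e.g. insert, update_postimage, delete
```
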
### DataFusion Integration

```python { .api }
def __datafusion_table_provider__(self) -> Any: ...
```

Internal method for DataFusion SQL engine integration (used by QueryBuilder).
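
You normally do not call this method yourself; it allows SQL engines that understand the DataFusion table-provider protocol to scan the table in place. A rough sketch of how it is consumed through `QueryBuilder`; the exact import path and return type vary between deltalake versions, so treat this as an assumption:

```python
from deltalake import DeltaTable

# NOTE: the QueryBuilder import path and API shape below are assumptions;
# adjust them to the deltalake version you are using.
from deltalake import QueryBuilder

dt = DeltaTable("path/to/table")

qb = QueryBuilder().register("my_table", dt)
result = qb.execute("SELECT id, name FROM my_table WHERE id > 100")
```
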
## Usage Examples
### Basic Data Reading

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")

# Convert to pandas
df = dt.to_pandas()
print(f"DataFrame shape: {df.shape}")

# Select specific columns
df_subset = dt.to_pandas(columns=["id", "name"])

# Convert to PyArrow Table
arrow_table = dt.to_pyarrow_table()
print(f"Arrow table schema: {arrow_table.schema}")
```

### Filtering Data

```python
# Row-level filters (DNF format)
# Single filter: age > 25
filters = [("age", ">", 25)]
df_filtered = dt.to_pandas(filters=filters)

# Multiple filters (AND): age > 25 AND name != "Alice"
filters = [("age", ">", 25), ("name", "!=", "Alice")]
df_filtered = dt.to_pandas(filters=filters)

# OR filters: (age > 25) OR (name = "Bob")
filters = [[("age", ">", 25)], [("name", "=", "Bob")]]
df_filtered = dt.to_pandas(filters=filters)

# Partition filters for performance
partition_filters = [("year", "=", "2023")]
df_2023 = dt.to_pandas(partitions=partition_filters)
```
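
Since `filters` also accepts a PyArrow `Expression` (see the `to_pandas` signature above), the same conditions can be written with `pyarrow.dataset.field`, for example:

```python
import pyarrow.dataset as ds

# Equivalent to [("age", ">", 25), ("name", "!=", "Alice")]
expr = (ds.field("age") > 25) & (ds.field("name") != "Alice")
df_filtered = dt.to_pandas(filters=expr)
```
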
### Streaming Large Datasets

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/large-table")

# Process data in batches
total_rows = 0
for batch in dt.to_pyarrow_scan(batch_size=10000):
    total_rows += batch.num_rows
    # Process batch
    df_batch = batch.to_pandas()
    # Your processing logic here

print(f"Processed {total_rows} total rows")

# Using arro3 reader
reader = dt.to_arro3_reader(batch_size=5000)
for batch in reader:
    # Process each RecordBatch
    print(f"Batch has {batch.num_rows} rows")
```

### Working with PyArrow Datasets

```python
import pyarrow.compute as pc

# Get as PyArrow dataset for advanced operations
dataset = dt.to_pyarrow_dataset()

# Use PyArrow compute functions
result = dataset.to_table(
    filter=pc.and_(
        pc.greater(pc.field("age"), 25),
        pc.not_equal(pc.field("name"), "Alice")
    ),
    columns=["id", "name", "age"]
)

# Convert to pandas if needed
df = result.to_pandas()
```

### File Information

```python
# Get list of data files
files = dt.files()
print(f"Table has {len(files)} data files")

# Get full URIs
file_uris = dt.file_uris()
for uri in file_uris[:3]:  # Show first 3
    print(f"File: {uri}")

# Get detailed add actions from transaction log
add_actions = dt.get_add_actions()
add_df = add_actions.to_pandas()
print("Add actions:")
print(add_df[["path", "size_bytes", "modification_time"]].head())
```

### Memory-Efficient Processing

```python
# For very large tables, use streaming approach
def process_large_table(table_path):
    dt = DeltaTable(table_path)

    # Process in chunks to manage memory
    chunk_size = 50000
    processed_count = 0

    for batch in dt.to_pyarrow_scan(batch_size=chunk_size):
        # Convert to pandas for processing
        chunk_df = batch.to_pandas()

        # Your data processing logic
        processed_data = chunk_df.groupby("category").sum()

        processed_count += len(chunk_df)
        print(f"Processed {processed_count} rows so far...")

        # Save intermediate results or accumulate
        # processed_data.to_csv(f"output_chunk_{processed_count}.csv")

    return processed_count

# Use the function
total_processed = process_large_table("path/to/huge-table")
```
```