# Data Reading and Conversion

Converting Delta tables to various formats, including pandas DataFrames, PyArrow tables, and streaming readers, for efficient data processing and integration with the Python data ecosystem.

## Imports

```python
from deltalake import DeltaTable
from typing import Callable, Any, Iterator
import pandas as pd
import pyarrow
import pyarrow.fs as pa_fs
from pyarrow.dataset import Expression
from arro3.core import RecordBatch, RecordBatchReader

# Type aliases for filtering
FilterLiteralType = tuple[str, str, Any]
FilterConjunctionType = list[FilterLiteralType]
FilterDNFType = list[FilterConjunctionType]
FilterType = FilterConjunctionType | FilterDNFType
```

## Capabilities

### Pandas Integration

```python { .api }
def to_pandas(
    self,
    partitions: list[tuple[str, str, Any]] | None = None,
    columns: list[str] | None = None,
    filesystem: str | pa_fs.FileSystem | None = None,
    filters: FilterType | Expression | None = None,
    types_mapper: Callable[[pyarrow.DataType], Any] | None = None
) -> pd.DataFrame: ...
```

Convert the Delta table to a pandas DataFrame with optional column selection and filtering.

**Parameters:**
- `partitions`: Partition-level filters for efficient data access
- `columns`: Specific columns to include in the result
- `filesystem`: Custom filesystem for reading files (string path or PyArrow FileSystem)
- `filters`: Row-level filters in DNF (Disjunctive Normal Form) or PyArrow Expression
- `types_mapper`: Optional function to map PyArrow data types to pandas types

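The column selection and filter arguments are exercised in the usage examples below; `types_mapper` is easiest to see on its own. A minimal sketch, assuming pandas 2.x (where `pd.ArrowDtype` is available) and an illustrative table path:

```python
import pandas as pd
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")

# Map every PyArrow type to a pandas ArrowDtype instead of the default
# NumPy-backed dtypes, keeping the data Arrow-backed end to end.
df = dt.to_pandas(
    columns=["id", "name"],
    types_mapper=pd.ArrowDtype,
)
print(df.dtypes)
```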
### PyArrow Integration

```python { .api }
def to_pyarrow_table(
    self,
    partitions: list[tuple[str, str, Any]] | None = None,
    columns: list[str] | None = None,
    filesystem: str | pa_fs.FileSystem | None = None,
    filters: FilterType | Expression | None = None
) -> pyarrow.Table: ...

def to_pyarrow_dataset(
    self,
    partitions: list[tuple[str, str, Any]] | None = None,
    filesystem: Any | None = None
) -> pyarrow.dataset.Dataset: ...

def to_pyarrow_scan(
    self,
    columns: list[str] | None = None,
    filters: list[tuple[str, str, Any]] | list[list[tuple[str, str, Any]]] | None = None,
    partitions: list[tuple[str, str, Any]] | None = None,
    limit: int | None = None,
    batch_size: int | None = None
) -> Iterator[RecordBatch]: ...
```

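Table, dataset, and scan access are all demonstrated in the usage examples below; the one parameter not covered there is `limit` on `to_pyarrow_scan`, which, reading the signature above, caps how many rows the scan yields. A small sketch with illustrative paths and column names:

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")

# Preview at most 100 rows, reading only the columns we care about
# (path, columns, and the limit value are illustrative).
batches = list(dt.to_pyarrow_scan(columns=["id", "name"], limit=100))
print(f"Fetched {sum(b.num_rows for b in batches)} rows in {len(batches)} batches")
```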
### Streaming Data Access

```python { .api }
def to_arro3_reader(
    self,
    columns: list[str] | None = None,
    filters: list[tuple[str, str, Any]] | list[list[tuple[str, str, Any]]] | None = None,
    partitions: list[tuple[str, str, Any]] | None = None,
    batch_size: int | None = None
) -> RecordBatchReader: ...
```

Create a streaming reader for processing large datasets without loading everything into memory.

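The "Streaming Large Datasets" example below drives the reader with `batch_size` alone; the signature above also accepts the same column, filter, and partition arguments as the scan. A brief sketch with an illustrative path, column names, and filter:

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")

# Stream only two columns of the rows matching the filter, 5,000 rows at a time.
reader = dt.to_arro3_reader(
    columns=["id", "name"],
    filters=[("age", ">", 25)],
    batch_size=5000,
)
for batch in reader:
    print(f"Streamed a batch of {batch.num_rows} rows")
```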
### File and Action Information

```python { .api }
def files(
    self,
    partition_filters: list[tuple[str, str, str | list[str]]] | None = None
) -> list[str]: ...

def file_uris(
    self,
    partition_filters: list[tuple[str, str, str | list[str]]] | None = None
) -> list[str]: ...

def get_add_actions(self, flatten: bool = False) -> RecordBatch: ...
```

Access underlying file information and transaction log add actions.

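The "File Information" example below lists every file; `partition_filters` narrows the listing to matching partitions, and `flatten=True` expands nested add-action fields (partition values, per-file statistics) into top-level columns. A minimal sketch, with a partition column and values that are only illustrative:

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")

# Only the files that belong to the year=2023 partition (partition name is illustrative).
files_2023 = dt.files(partition_filters=[("year", "=", "2023")])
print(f"{len(files_2023)} files in the 2023 partition")

# Flattened add actions: the exact columns depend on the table's schema and statistics.
flat_actions = dt.get_add_actions(flatten=True).to_pandas()
print(flat_actions.columns.tolist())
```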
### Change Data Feed (CDF)

```python { .api }
def load_cdf(
    self,
    starting_version: int = 0,
    ending_version: int | None = None,
    starting_timestamp: str | None = None,
    ending_timestamp: str | None = None,
    columns: list[str] | None = None,
    predicate: str | None = None,
    allow_out_of_range: bool = False,
) -> RecordBatchReader: ...
```

Load the Change Data Feed (CDF) to track row-level changes between table versions.

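Change data reading is not covered in the usage examples below, so here is a minimal sketch. It assumes the table was written with the change data feed enabled and that the referenced versions exist; the stream carries change metadata (such as the change type and commit version) alongside the table columns:

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")

# Row-level changes made between versions 0 and 5 (version numbers are illustrative).
cdf = dt.load_cdf(starting_version=0, ending_version=5)

for batch in cdf:
    print(f"CDF batch with {batch.num_rows} rows")
```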
### DataFusion Integration

```python { .api }
def __datafusion_table_provider__(self) -> Any: ...
```

Internal method for DataFusion SQL engine integration (used by QueryBuilder).

## Usage Examples

### Basic Data Reading

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")

# Convert to pandas
df = dt.to_pandas()
print(f"DataFrame shape: {df.shape}")

# Select specific columns
df_subset = dt.to_pandas(columns=["id", "name"])

# Convert to PyArrow Table
arrow_table = dt.to_pyarrow_table()
print(f"Arrow table schema: {arrow_table.schema}")
```

### Filtering Data

```python
# Row-level filters (DNF format)
# Single filter: age > 25
filters = [("age", ">", 25)]
df_filtered = dt.to_pandas(filters=filters)

# Multiple filters (AND): age > 25 AND name != "Alice"
filters = [("age", ">", 25), ("name", "!=", "Alice")]
df_filtered = dt.to_pandas(filters=filters)

# OR filters: (age > 25) OR (name = "Bob")
filters = [[("age", ">", 25)], [("name", "=", "Bob")]]
df_filtered = dt.to_pandas(filters=filters)

# Partition filters for performance
partition_filters = [("year", "=", "2023")]
df_2023 = dt.to_pandas(partitions=partition_filters)
```

### Streaming Large Datasets

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/large-table")

# Process data in batches
total_rows = 0
for batch in dt.to_pyarrow_scan(batch_size=10000):
    total_rows += batch.num_rows
    # Process batch
    df_batch = batch.to_pandas()
    # Your processing logic here

print(f"Processed {total_rows} total rows")

# Using arro3 reader
reader = dt.to_arro3_reader(batch_size=5000)
for batch in reader:
    # Process each RecordBatch
    print(f"Batch has {batch.num_rows} rows")
```

### Working with PyArrow Datasets

```python
import pyarrow.compute as pc

# Get as PyArrow dataset for advanced operations
dataset = dt.to_pyarrow_dataset()

# Use PyArrow compute functions
result = dataset.to_table(
    filter=pc.and_(
        pc.greater(pc.field("age"), 25),
        pc.not_equal(pc.field("name"), "Alice")
    ),
    columns=["id", "name", "age"]
)

# Convert to pandas if needed
df = result.to_pandas()
```

### File Information

```python
# Get list of data files
files = dt.files()
print(f"Table has {len(files)} data files")

# Get full URIs
file_uris = dt.file_uris()
for uri in file_uris[:3]:  # Show first 3
    print(f"File: {uri}")

# Get detailed add actions from transaction log
add_actions = dt.get_add_actions()
add_df = add_actions.to_pandas()
print("Add actions:")
print(add_df[["path", "size", "modification_time"]].head())
```

### Memory-Efficient Processing

```python
# For very large tables, use streaming approach
def process_large_table(table_path):
    dt = DeltaTable(table_path)

    # Process in chunks to manage memory
    chunk_size = 50000
    processed_count = 0

    for batch in dt.to_pyarrow_scan(batch_size=chunk_size):
        # Convert to pandas for processing
        chunk_df = batch.to_pandas()

        # Your data processing logic
        processed_data = chunk_df.groupby("category").sum()

        processed_count += len(chunk_df)
        print(f"Processed {processed_count} rows so far...")

        # Save intermediate results or accumulate
        # processed_data.to_csv(f"output_chunk_{processed_count}.csv")

    return processed_count

# Use the function
total_processed = process_large_table("path/to/huge-table")
```