# Data Import and Export

Deep Lake provides comprehensive data import and export capabilities for a variety of formats, including Parquet, CSV, and COCO datasets, along with support for custom data ingestion pipelines. These capabilities enable seamless data migration and integration with existing data workflows.

## Capabilities

### Data Import Functions

Import data from various formats with automatic schema detection and type conversion.

```python { .api }
def from_parquet(url_or_bytes: Union[str, bytes]) -> ReadOnlyDataset:
    """
    Create dataset from Parquet file or bytes.

    Parameters:
    - url_or_bytes: Parquet file path/URL or raw bytes

    Returns:
        ReadOnlyDataset: Read-only dataset with Parquet data
    """

def from_csv(url_or_bytes: Union[str, bytes]) -> ReadOnlyDataset:
    """
    Create dataset from CSV file or bytes.

    Parameters:
    - url_or_bytes: CSV file path/URL or raw bytes

    Returns:
        ReadOnlyDataset: Read-only dataset with CSV data
    """

def from_coco(
    images_directory: Union[str, pathlib.Path],
    annotation_files: Dict[str, Union[str, pathlib.Path]],
    dest: Union[str, pathlib.Path],
    dest_creds: Optional[Dict[str, str]] = None,
    key_to_column_mapping: Optional[Dict] = None,
    file_to_group_mapping: Optional[Dict] = None,
) -> Dataset:
    """
    Import COCO format dataset.

    Parameters:
    - images_directory: Directory containing COCO images
    - annotation_files: Dictionary mapping annotation type to JSON file path
      (keys: 'instances', 'keypoints', 'stuff')
    - dest: Destination path for Deep Lake dataset
    - dest_creds: Storage credentials for destination
    - key_to_column_mapping: Optional mapping of COCO keys to column names
    - file_to_group_mapping: Optional mapping of file types to group names

    Returns:
        Dataset: Deep Lake dataset with COCO data
    """
```
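Because `from_parquet` and `from_csv` accept raw bytes as well as paths or URLs, data that is already in memory can be ingested without writing it to disk first. A minimal sketch, assuming a Parquet payload has already been read into a `bytes` object (the file path shown is illustrative):

```python
import deeplake

# Read a Parquet file into memory; any bytes source works, e.g. an HTTP response body
with open("./data/my_data.parquet", "rb") as f:  # illustrative path
    parquet_bytes = f.read()

# Build a read-only dataset directly from the in-memory bytes
in_memory_dataset = deeplake.from_parquet(parquet_bytes)
print(f"Imported {len(in_memory_dataset)} rows from in-memory Parquet bytes")
```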

### Data Export Functions

Export datasets to various formats for integration with other tools and workflows.

```python { .api }
class DatasetView:
    """Export capabilities for dataset views."""

    def to_csv(self, path: str) -> None:
        """
        Export dataset view to CSV format.

        Parameters:
        - path: Output CSV file path
        """
```
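Any `DatasetView`, such as the result of a query or a row-range slice of an open dataset, can be written out with `to_csv`. A minimal sketch, assuming a dataset already exists at the illustrative path `./my_dataset` and that slicing yields a `DatasetView`:

```python
import deeplake

dataset = deeplake.open("./my_dataset")  # illustrative path

# Write only the first 100 rows to CSV, assuming the slice behaves as a DatasetView
dataset[0:100].to_csv("./first_100_rows.csv")
```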

### Legacy Data Conversion

Convert datasets between Deep Lake versions with data preservation and format migration.

```python { .api }
def convert(src: str, dst: str, dst_creds: Optional[Dict[str, str]] = None, token: Optional[str] = None) -> None:
    """
    Convert v3 dataset to v4 format.

    Parameters:
    - src: Source v3 dataset path
    - dst: Destination v4 dataset path
    - dst_creds: Destination storage credentials
    - token: Activeloop authentication token
    """
```
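The `token` parameter carries an Activeloop authentication token for datasets hosted by Activeloop. A minimal sketch, assuming the source is a v3 dataset under an illustrative `hub://` path and that a valid token is available:

```python
import deeplake

# Convert an Activeloop-hosted v3 dataset into a local v4 dataset,
# authenticating with an Activeloop token (illustrative org/dataset names and token)
deeplake.convert(
    src="hub://my_org/legacy_v3_dataset",
    dst="./converted_v4_dataset",
    token="<ACTIVELOOP_TOKEN>",
)
```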

## Usage Examples

### Parquet Import

```python
import deeplake

# Import from local Parquet file
dataset = deeplake.from_parquet("./data/my_data.parquet")
print(f"Imported {len(dataset)} rows from Parquet")
print(f"Columns: {[col.name for col in dataset.schema.columns]}")

# Access imported data
for i in range(min(5, len(dataset))):
    row = dataset[i]
    print(f"Row {i}: {row.to_dict()}")

# Import from remote Parquet file
s3_dataset = deeplake.from_parquet("s3://my-bucket/data.parquet")
print(f"Imported {len(s3_dataset)} rows from S3 Parquet")

# Convert to mutable dataset if needed
mutable_dataset = deeplake.like(dataset, "./mutable_from_parquet")
print("Created mutable copy of Parquet data")
```

### CSV Import

```python
# Import from local CSV file
csv_dataset = deeplake.from_csv("./data/dataset.csv")
print(f"Imported {len(csv_dataset)} rows from CSV")

# Examine schema (automatically inferred)
schema = csv_dataset.schema
for col in schema.columns:
    print(f"Column '{col.name}': {type(col.dtype)}")

# Import from URL
url_dataset = deeplake.from_csv("https://example.com/data.csv")
print(f"Imported {len(url_dataset)} rows from URL")

# Import from bytes (useful for processing in-memory CSV)
csv_content = """name,age,score
Alice,25,0.95
Bob,30,0.88
Charlie,35,0.92"""

csv_bytes = csv_content.encode('utf-8')
bytes_dataset = deeplake.from_csv(csv_bytes)
print(f"Imported {len(bytes_dataset)} rows from bytes")

# Access CSV data
for row in bytes_dataset:
    print(f"Name: {row['name']}, Age: {row['age']}, Score: {row['score']}")
```

### COCO Dataset Import

```python
import pathlib

# Import COCO dataset with instances annotations
coco_dataset = deeplake.from_coco(
    images_directory="./coco_data/images",
    annotation_files={"instances": "./coco_data/annotations/instances_train2017.json"},
    dest="./coco_deep_lake"
)

print(f"Imported COCO dataset with {len(coco_dataset)} samples")

# Examine COCO schema
for col in coco_dataset.schema.columns:
    print(f"COCO column: {col.name} ({type(col.dtype)})")

# Access COCO data
sample = coco_dataset[0]
print(f"Image: {sample['images']}")

# Import with multiple annotation types
full_coco_dataset = deeplake.from_coco(
    images_directory=pathlib.Path("./coco_data/images"),
    annotation_files={
        "instances": "./coco_data/annotations/instances_train2017.json",
        "keypoints": "./coco_data/annotations/person_keypoints_train2017.json"
    },
    dest="s3://my-bucket/full_coco_dataset",
    dest_creds={"aws_access_key_id": "...", "aws_secret_access_key": "..."}
)

print(f"Full COCO dataset: {len(full_coco_dataset)} samples")
```
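The optional `key_to_column_mapping` and `file_to_group_mapping` parameters control how COCO annotation keys and annotation files are named in the resulting dataset. A minimal sketch with illustrative mapping values; the keys actually available depend on the annotation files being imported:

```python
# Rename COCO annotation keys and group the instances file under a custom group name.
# The mapping targets ("labels", "boxes", "detections") are illustrative, not required names.
mapped_coco_dataset = deeplake.from_coco(
    images_directory="./coco_data/images",
    annotation_files={"instances": "./coco_data/annotations/instances_train2017.json"},
    dest="./coco_mapped_deep_lake",
    key_to_column_mapping={"category_id": "labels", "bbox": "boxes"},
    file_to_group_mapping={"instances": "detections"},
)

print(f"Imported mapped COCO dataset with {len(mapped_coco_dataset)} samples")
```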

### CSV Export

```python
# Create sample dataset
dataset = deeplake.create("./export_dataset")
dataset.add_column("id", deeplake.types.Int64())
dataset.add_column("name", deeplake.types.Text())
dataset.add_column("score", deeplake.types.Float32())

# Add sample data
for i in range(100):
    dataset.append({
        "id": i,
        "name": f"item_{i}",
        "score": i * 0.01
    })

dataset.commit("Added sample data for export")

# Export to CSV
dataset.to_csv("./exported_data.csv")
print("Exported dataset to CSV")

# Export filtered data
high_scores = deeplake.query("SELECT * FROM dataset WHERE score > 0.5")
high_scores.to_csv("./high_scores.csv")
print("Exported filtered data to CSV")

# Export specific columns
columns_subset = deeplake.query("SELECT name, score FROM dataset")
columns_subset.to_csv("./subset_data.csv")
print("Exported subset of columns to CSV")
```

### Legacy Dataset Conversion

```python
# Convert Deep Lake v3 dataset to v4 format
deeplake.convert(
    src="./old_v3_dataset",
    dst="./new_v4_dataset"
)
print("Converted v3 dataset to v4 format")

# Convert with cloud storage
deeplake.convert(
    src="s3://old-bucket/v3_dataset",
    dst="s3://new-bucket/v4_dataset",
    dst_creds={"aws_access_key_id": "...", "aws_secret_access_key": "..."}
)
print("Converted cloud v3 dataset to v4 format")

# Open converted dataset
converted_dataset = deeplake.open("./new_v4_dataset")
print(f"Converted dataset has {len(converted_dataset)} rows")
print(f"Schema: {[col.name for col in converted_dataset.schema.columns]}")
```

### Custom Data Ingestion Pipeline

```python
import json
from pathlib import Path

import numpy as np

def ingest_custom_format(data_dir: str, dest_path: str):
    """Custom ingestion pipeline for proprietary format."""

    # Create target dataset
    dataset = deeplake.create(dest_path)

    # Define schema based on source format
    dataset.add_column("file_id", deeplake.types.Text())
    dataset.add_column("image", deeplake.types.Image())
    dataset.add_column("metadata", deeplake.types.Dict())
    dataset.add_column("features", deeplake.types.Array(deeplake.types.Float32(), shape=[512]))

    # Process source files
    data_path = Path(data_dir)
    batch_data = []

    for file_path in data_path.glob("*.json"):
        # Read custom metadata format
        with open(file_path, 'r') as f:
            metadata = json.load(f)

        # Find corresponding image
        image_path = data_path / f"{file_path.stem}.jpg"
        if not image_path.exists():
            continue

        # Extract features (example: using pre-computed features)
        features_path = data_path / f"{file_path.stem}_features.npy"
        if features_path.exists():
            features = np.load(features_path).astype(np.float32)
        else:
            features = np.zeros(512, dtype=np.float32)

        # Prepare batch entry
        batch_data.append({
            "file_id": file_path.stem,
            "image": str(image_path),
            "metadata": metadata,
            "features": features
        })

        # Batch commit for performance
        if len(batch_data) >= 100:
            dataset.extend(batch_data)
            dataset.commit(f"Ingested batch of {len(batch_data)} items")
            batch_data = []

    # Final commit
    if batch_data:
        dataset.extend(batch_data)
        dataset.commit(f"Final batch of {len(batch_data)} items")

    print(f"Ingestion complete. Dataset has {len(dataset)} items")
    return dataset

# Use custom ingestion pipeline
custom_dataset = ingest_custom_format("./custom_data", "./ingested_dataset")
```
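After a custom ingestion run, it is worth re-opening the result and spot-checking a few rows. A small sketch using the dataset path created above:

```python
# Re-open the ingested dataset and spot-check the first few rows
ingested = deeplake.open("./ingested_dataset")
print(f"Re-opened dataset with {len(ingested)} rows")

for i in range(min(3, len(ingested))):
    row = ingested[i]
    print(f"{row['file_id']}: feature vector of length {len(row['features'])}")
```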

### Batch Data Processing Pipeline

```python
from typing import Dict, List

def process_multiple_sources(sources: List[Dict], output_path: str):
    """Process multiple data sources into unified dataset."""

    # Create unified dataset
    unified_dataset = deeplake.create(output_path)

    # Define common schema
    unified_dataset.add_column("source", deeplake.types.Text())
    unified_dataset.add_column("id", deeplake.types.Text())
    unified_dataset.add_column("content", deeplake.types.Text())
    unified_dataset.add_column("timestamp", deeplake.types.Int64())
    unified_dataset.add_column("metadata", deeplake.types.Dict())

    for source_config in sources:
        source_type = source_config["type"]
        source_path = source_config["path"]
        source_name = source_config["name"]

        print(f"Processing {source_name} ({source_type})...")

        if source_type == "csv":
            # Import CSV and transform
            csv_data = deeplake.from_csv(source_path)

            for row in csv_data:
                unified_dataset.append({
                    "source": source_name,
                    "id": f"{source_name}_{row['id']}",
                    "content": row.get("text", ""),
                    "timestamp": int(row.get("timestamp", 0)),
                    "metadata": {"original_source": source_type}
                })

        elif source_type == "parquet":
            # Import Parquet and transform
            parquet_data = deeplake.from_parquet(source_path)

            for row in parquet_data:
                unified_dataset.append({
                    "source": source_name,
                    "id": f"{source_name}_{row['identifier']}",
                    "content": row.get("content", ""),
                    "timestamp": int(row.get("created_at", 0)),
                    "metadata": {"original_source": source_type, "extra": row.get("extra", {})}
                })

        # Commit after each source
        unified_dataset.commit(f"Added data from {source_name}")

    print(f"Unified dataset created with {len(unified_dataset)} total records")
    return unified_dataset

# Example usage
sources = [
    {"type": "csv", "path": "./data/source1.csv", "name": "dataset_a"},
    {"type": "parquet", "path": "./data/source2.parquet", "name": "dataset_b"},
    {"type": "csv", "path": "s3://bucket/source3.csv", "name": "dataset_c"}
]

unified = process_multiple_sources(sources, "./unified_dataset")
```

### Advanced Export Options

```python
import numpy as np

# Create complex dataset for export examples
dataset = deeplake.create("./complex_export_dataset")

dataset.add_column("id", deeplake.types.Int64())
dataset.add_column("category", deeplake.types.Text())
dataset.add_column("embeddings", deeplake.types.Embedding(size=128))
dataset.add_column("image_path", deeplake.types.Text())
dataset.add_column("metadata", deeplake.types.Dict())
dataset.add_column("active", deeplake.types.Bool())

# Add sample data
for i in range(1000):
    dataset.append({
        "id": i,
        "category": f"category_{i % 10}",
        "embeddings": np.random.random(128).astype(np.float32),
        "image_path": f"images/img_{i}.jpg",
        "metadata": {"score": np.random.random(), "tags": [f"tag_{j}" for j in range(3)]},
        "active": i % 2 == 0
    })

dataset.commit("Added complex sample data")

# Export with filtering
active_records = deeplake.query("SELECT * FROM dataset WHERE active == true")
active_records.to_csv("./active_records.csv")

# Export specific categories
category_5 = deeplake.query("SELECT * FROM dataset WHERE category == 'category_5'")
category_5.to_csv("./category_5_data.csv")

# Export aggregated data
category_stats = deeplake.query("""
    SELECT category, COUNT(*) as count, AVG(metadata['score']) as avg_score
    FROM dataset
    GROUP BY category
""")
category_stats.to_csv("./category_statistics.csv")

print("Exported multiple views of complex dataset")
```

### Integration with Pandas

```python
from pathlib import Path

import pandas as pd

# Export Deep Lake data for Pandas processing
def export_for_pandas(dataset_view, include_embeddings=False):
    """Export dataset to format suitable for Pandas."""

    if include_embeddings:
        # For datasets with embeddings, we need special handling
        data_rows = []
        for row in dataset_view:
            row_dict = row.to_dict()
            # Convert embeddings to string representation
            if "embeddings" in row_dict:
                row_dict["embeddings"] = str(row_dict["embeddings"].tolist())
            data_rows.append(row_dict)

        return pd.DataFrame(data_rows)
    else:
        # Export to a temporary CSV (excluding complex types) and read it back with Pandas
        temp_csv = "./temp_export.csv"
        dataset_view.to_csv(temp_csv)
        df = pd.read_csv(temp_csv)
        Path(temp_csv).unlink()  # Clean up
        return df

# Use with Pandas
df = export_for_pandas(dataset[0:100])  # First 100 rows
print(f"Pandas DataFrame shape: {df.shape}")
print(df.head())

# Process with Pandas and re-import
processed_df = df.groupby('category').agg({
    'id': 'count',
    'active': 'sum'
}).rename(columns={'id': 'total_count', 'active': 'active_count'})

# Convert processed results back to Deep Lake
processed_dataset = deeplake.create("./processed_results")
processed_dataset.add_column("category", deeplake.types.Text())
processed_dataset.add_column("total_count", deeplake.types.Int64())
processed_dataset.add_column("active_count", deeplake.types.Int64())

for category, row in processed_df.iterrows():
    processed_dataset.append({
        "category": category,
        "total_count": int(row['total_count']),
        "active_count": int(row['active_count'])
    })

processed_dataset.commit("Imported processed Pandas results")
print(f"Processed dataset has {len(processed_dataset)} category summaries")
```