# File Operations

File-specific operations for uploading, downloading, and appending data, and for managing file properties and metadata. The DataLakeFileClient provides comprehensive file management capabilities, including streaming operations and query functionality.

## Capabilities

### DataLakeFileClient

Client for interacting with a specific file, providing data upload/download, append, and file management operations. Inherits path-based operations from the underlying PathClient.

```python { .api }
class DataLakeFileClient:
    """
    A client to interact with a specific file in Azure Data Lake Storage Gen2.

    Attributes:
        url (str): The full endpoint URL to the file, including SAS token if used
        primary_endpoint (str): The full primary endpoint URL
        primary_hostname (str): The hostname of the primary endpoint
        file_system_name (str): Name of the file system
        path_name (str): Path to the file
    """

    def __init__(
        self,
        account_url: str,
        file_system_name: str,
        file_path: str,
        credential=None,
        **kwargs
    ):
        """
        Initialize the DataLakeFileClient.

        Args:
            account_url (str): The URL to the DataLake storage account
            file_system_name (str): Name of the file system
            file_path (str): Path to the file
            credential: Authentication credential
            **kwargs: Additional client configuration options
        """

    @classmethod
    def from_connection_string(
        cls,
        conn_str: str,
        file_system_name: str,
        file_path: str,
        credential=None,
        **kwargs
    ) -> 'DataLakeFileClient':
        """
        Create a DataLakeFileClient from a connection string.

        Args:
            conn_str (str): Connection string for the storage account
            file_system_name (str): Name of the file system
            file_path (str): Path to the file
            credential: Optional credential to override connection string auth
            **kwargs: Additional client configuration options

        Returns:
            DataLakeFileClient: The file client instance
        """
```

**Usage Examples:**

```python
from azure.storage.filedatalake import DataLakeFileClient

# Create client directly
file_client = DataLakeFileClient(
    account_url="https://mystorageaccount.dfs.core.windows.net",
    file_system_name="myfilesystem",
    file_path="data/analytics/results.json",
    credential="<account_key>"
)

# From connection string
file_client = DataLakeFileClient.from_connection_string(
    "DefaultEndpointsProtocol=https;AccountName=mystorageaccount;AccountKey=<key>",
    file_system_name="myfilesystem",
    file_path="data/analytics/results.json"
)
```

### File Management

Core operations for creating, deleting, and managing the file itself.

```python { .api }
def create_file(self, **kwargs) -> Dict[str, Any]:
    """
    Create the file.

    Args:
        content_settings (ContentSettings, optional): Content settings for the file
        metadata (dict, optional): Metadata key-value pairs
        permissions (str, optional): POSIX permissions in octal format
        umask (str, optional): POSIX umask for permission calculation
        **kwargs: Additional options including conditions and CPK

    Returns:
        dict: File creation response headers including etag and last_modified
    """

def delete_file(self, **kwargs) -> None:
    """
    Delete the file.

    Args:
        **kwargs: Additional options including conditions
    """

def exists(self, **kwargs) -> bool:
    """
    Check if the file exists.

    Args:
        **kwargs: Additional options

    Returns:
        bool: True if file exists, False otherwise
    """

def get_file_properties(self, **kwargs) -> FileProperties:
    """
    Get file properties and metadata.

    Args:
        **kwargs: Additional options including conditions and user principal names

    Returns:
        FileProperties: Properties of the file including size, metadata, etag, permissions
    """

def rename_file(
    self,
    new_name: str,
    **kwargs
) -> DataLakeFileClient:
    """
    Rename the file.

    Args:
        new_name (str): New name/path for the file
        content_settings (ContentSettings, optional): Content settings for renamed file
        metadata (dict, optional): Metadata for renamed file
        **kwargs: Additional options including conditions

    Returns:
        DataLakeFileClient: Client for the renamed file
    """
```
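
The sketch below walks the lifecycle calls above in order, reusing the `file_client` from the earlier usage example. The rename target is passed as `"{filesystem}/{path}"`, which is an assumption about the expected format worth verifying against the SDK reference; all paths are placeholders.

```python
# Assumes `file_client` from the usage example above; paths are placeholders.

# Create the file only if it is not already there
if not file_client.exists():
    file_client.create_file(metadata={"source": "example"})

# Inspect size, metadata, and etag
props = file_client.get_file_properties()
print(props.size, props.metadata, props.etag)

# Rename the file; the returned client points at the new path.
# new_name is given as "<filesystem>/<new path>" -- an assumption, see above.
archived_client = file_client.rename_file(
    f"{file_client.file_system_name}/data/analytics/results-archived.json"
)

# Delete it through the client for the renamed path
archived_client.delete_file()
```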

### Data Upload Operations

Operations for uploading and writing data to files.

```python { .api }
def upload_data(
    self,
    data,
    length: int = None,
    overwrite: bool = False,
    **kwargs
) -> Dict[str, Any]:
    """
    Upload data to the file, creating it if it doesn't exist.

    Args:
        data: Data to upload (bytes, str, or file-like object)
        length (int, optional): Length of the data in bytes
        overwrite (bool): Whether to overwrite an existing file
        **kwargs: Additional options including content settings, metadata, conditions

    Returns:
        dict: Upload response headers including etag and last_modified
    """

def append_data(
    self,
    data,
    offset: int,
    length: int = None,
    **kwargs
) -> Dict[str, Any]:
    """
    Append data to the file at the specified offset.

    Args:
        data: Data to append (bytes, str, or file-like object)
        offset (int): Byte offset where the data should be appended
        length (int, optional): Length of the data in bytes
        **kwargs: Additional options including validate_content and lease conditions

    Returns:
        dict: Append response headers
    """

def flush_data(
    self,
    offset: int,
    retain_uncommitted_data: bool = False,
    **kwargs
) -> Dict[str, Any]:
    """
    Flush (commit) previously appended data to the file.

    Args:
        offset (int): Offset equal to the length of the file after committing the data
        retain_uncommitted_data (bool): Whether to retain uncommitted data after the flush
        **kwargs: Additional options including content settings and conditions

    Returns:
        dict: Flush response headers including etag and last_modified
    """
```
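
For multi-part writes, the one-shot `upload_data` path splits into the two-phase append/flush pattern described above. Here is a minimal sketch, assuming `file_client` targets a path you are free to overwrite; chunk contents are illustrative.

```python
# Two-phase write: stage chunks with append_data, then commit with flush_data.
chunks = [b"header,value\n", b"row1,1\n", b"row2,2\n"]

file_client.create_file()  # start from an empty file

offset = 0
for chunk in chunks:
    # Each append stages data at the current end of the staged region.
    file_client.append_data(chunk, offset=offset, length=len(chunk))
    offset += len(chunk)

# Nothing is readable until the staged data is flushed; offset must equal
# the final length of the file after the commit.
file_client.flush_data(offset=offset)
```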

### Data Download Operations

Operations for downloading and reading data from files.

```python { .api }
def download_file(self, **kwargs) -> StorageStreamDownloader:
    """
    Download the file content as a stream.

    Args:
        offset (int, optional): Start position for download
        length (int, optional): Number of bytes to download
        **kwargs: Additional options including conditions and CPK

    Returns:
        StorageStreamDownloader: Stream downloader for reading file content
    """

def read_file(self, **kwargs) -> bytes:
    """
    Download and return the entire file content as bytes.

    Args:
        offset (int, optional): Start position for download
        length (int, optional): Number of bytes to download
        **kwargs: Additional options including conditions and CPK

    Returns:
        bytes: Complete file content
    """
```
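
A short sketch of the two download styles, assuming `file_client` points at an existing file and the local path is writable; the range and file name are illustrative.

```python
# Ranged download: fetch only the first 1 KiB of the file.
head = file_client.download_file(offset=0, length=1024).readall()
print(head[:80])

# Streaming download: copy the whole file into a local file without
# buffering it all in memory.
with open("results-local.json", "wb") as local_file:
    bytes_written = file_client.download_file().readinto(local_file)
print(f"Wrote {bytes_written} bytes")
```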

### Query Operations

Operations for querying structured data within files using SQL-like syntax.

```python { .api }
def query_file(
    self,
    query_expression: str,
    **kwargs
) -> DataLakeFileQueryReader:
    """
    Query file content using SQL-like expressions.

    Args:
        query_expression (str): SQL-like query expression (e.g., "SELECT * FROM BlobStorage")
        file_format (QuickQueryDialect, optional): File format (CSV, JSON, Arrow, Parquet)
        on_error (Callable, optional): Function to handle query errors
        **kwargs: Additional options including input/output serialization settings

    Returns:
        DataLakeFileQueryReader: Query reader for streaming results
    """
```
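
A hedged sketch of a quick query over a CSV file with a header row. It assumes `csv_client` targets a CSV file, that `DelimitedTextDialect` is importable from `azure.storage.filedatalake` (as in the library's query samples), and that `output_format` is accepted for the result serialization; the column name `amount` is a placeholder.

```python
from azure.storage.filedatalake import DelimitedTextDialect

# Input is CSV with a header row, so columns can be referenced by name.
input_format = DelimitedTextDialect(delimiter=",", has_header=True)
# Emit results as plain CSV.
output_format = DelimitedTextDialect(delimiter=",", has_header=False)

reader = csv_client.query_file(
    "SELECT amount FROM BlobStorage WHERE amount > 1000",  # `amount` is a placeholder column
    file_format=input_format,
    output_format=output_format,
    on_error=lambda error: print(f"query error: {error}"),
)
print(reader.readall().decode())
```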

### Access Control Management

Operations for managing file-level access control and permissions.

```python { .api }
def get_access_control(self, **kwargs) -> Dict[str, Any]:
    """
    Get access control properties for the file.

    Args:
        upn (bool, optional): Return user principal names instead of object IDs
        **kwargs: Additional options including conditions

    Returns:
        dict: Access control information including ACL, group, owner, permissions
    """

def set_access_control(
    self,
    owner: str = None,
    group: str = None,
    permissions: str = None,
    acl: str = None,
    **kwargs
) -> Dict[str, Any]:
    """
    Set access control properties for the file.

    Args:
        owner (str, optional): Owner user ID or principal name
        group (str, optional): Owning group ID or principal name
        permissions (str, optional): POSIX permissions in octal format
        acl (str, optional): Access control list in POSIX format
        **kwargs: Additional options including conditions

    Returns:
        dict: Response headers including etag and last_modified
    """
```
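
A minimal sketch of reading and updating the file's POSIX access control, assuming the caller's identity is allowed to read and change ownership and permissions; the object ID in the ACL entry is a placeholder.

```python
# Read the current owner, group, permissions, and ACL
acl_info = file_client.get_access_control()
print(acl_info["owner"], acl_info["group"], acl_info["permissions"], acl_info["acl"])

# Set octal permissions: owner rw-, group r--, other ---
file_client.set_access_control(permissions="0640")

# Or set an explicit ACL instead (permissions and ACL are set in separate calls here);
# <object-id> is a placeholder for a real AAD object ID
file_client.set_access_control(
    acl="user::rw-,group::r--,other::---,user:<object-id>:r--"
)
```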

## StorageStreamDownloader

Streaming downloader for efficiently handling large file downloads and query results. Returned by download_file() and query_file() operations.

```python { .api }
class StorageStreamDownloader:
    """
    A streaming object to download from Azure Storage.

    Attributes:
        name (str): The name of the file being downloaded
        properties (FileProperties): The properties of the file being downloaded
        size (int): The size of the total data in the stream
    """

    def readall(self) -> bytes:
        """
        Download the contents of this file.

        This operation is blocking until all data is downloaded.

        Returns:
            bytes: The contents of the specified file
        """

    def readinto(self, stream) -> int:
        """
        Download the contents of this file to a stream.

        Args:
            stream: The stream to download to. This can be an open file-handle,
                or any writable stream. The stream must be seekable if the
                download uses more than one parallel connection.

        Returns:
            int: The number of bytes read
        """

    def read(self, size: int = -1) -> bytes:
        """
        Read up to size bytes from the stream and return them.

        Args:
            size (int): The number of bytes to download from the stream.
                Leave unspecified or set to -1 to download all bytes.

        Returns:
            bytes: The requested data as bytes. If the return value is empty,
            there is no more data to read.
        """

    def chunks(self):
        """
        Iterate over chunks in the download stream.

        Note: The iterator returned will iterate over the entire download content,
        regardless of any data that was previously read.

        Returns:
            Iterator[bytes]: An iterator containing the chunks in the download stream
        """

    def __len__(self) -> int:
        """
        Returns the size of the download stream.

        Returns:
            int: The size of the stream
        """

    def __iter__(self):
        """Iterate over the content in chunks."""

    def __enter__(self) -> 'StorageStreamDownloader':
        """Context manager entry."""

    def __exit__(self, *args) -> None:
        """Context manager exit."""
```
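
A sketch of consuming the downloader incrementally, assuming `file_client` targets an existing file; chunk and block sizes are illustrative.

```python
# Iterate the download in service-defined chunks instead of buffering it all.
downloader = file_client.download_file()
print(f"Stream size: {downloader.size} bytes")

total = 0
for chunk in downloader.chunks():
    total += len(chunk)
print(f"Consumed {total} bytes via chunks()")

# Or pull bounded blocks with read(); an empty result signals end of stream.
downloader = file_client.download_file()  # fresh downloader, previous one is exhausted
while True:
    block = downloader.read(64 * 1024)
    if not block:
        break
    # ... process `block` ...
```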

**Usage Examples:**

```python
import json
from azure.storage.filedatalake import DataLakeFileClient, ContentSettings

# Create a file client
file_client = DataLakeFileClient(
    account_url="https://mystorageaccount.dfs.core.windows.net",
    file_system_name="myfilesystem",
    file_path="data/results.json",
    credential="<account_key>"
)

# Upload JSON data
data = {"results": [1, 2, 3], "timestamp": "2023-01-01T00:00:00Z"}
json_data = json.dumps(data)

file_client.upload_data(
    json_data,
    overwrite=True,
    content_settings=ContentSettings(content_type="application/json"),
    metadata={"format": "json", "version": "1.0"}
)

# Download the file
download_stream = file_client.download_file()
content = download_stream.readall()
downloaded_data = json.loads(content.decode())
print(f"Downloaded: {downloaded_data}")

# Append data to an existing file
log_client = DataLakeFileClient(
    account_url="https://mystorageaccount.dfs.core.windows.net",
    file_system_name="myfilesystem",
    file_path="logs/app.log",
    credential="<account_key>"
)

# Get the current file size to use as the append offset
properties = log_client.get_file_properties()
current_size = properties.size

# Append a new log entry and commit it
new_entry = "\n2023-01-01 12:00:00 INFO: Application started"
log_client.append_data(new_entry.encode(), offset=current_size)
log_client.flush_data(offset=current_size + len(new_entry.encode()))

# Query CSV file data
csv_client = DataLakeFileClient(
    account_url="https://mystorageaccount.dfs.core.windows.net",
    file_system_name="myfilesystem",
    file_path="data/sales.csv",
    credential="<account_key>"
)

# Query for specific records
query_result = csv_client.query_file(
    "SELECT * FROM BlobStorage WHERE amount > 1000"
)

# Process query results
query_content = query_result.readall()
print(query_content.decode())
```