tessl install tessl/pypi-pyfuse3@3.4.0Python 3 bindings for libfuse 3 with async I/O support
Extended attributes (xattrs) provide a way to attach additional metadata to files and directories beyond standard attributes like permissions and timestamps. pyfuse3 supports xattrs both through module-level functions for direct file access and through Operations handlers for filesystem implementations.
These functions operate on paths in the mounted filesystem or other filesystems.
Set an extended attribute on a file or directory by path.
def setxattr(path: str, name: str, value: bytes, namespace: NamespaceT = 'user') -> None:
"""
Set extended attribute on path.
Args:
path: File or directory path
name: Attribute name (without namespace prefix)
value: Attribute value (bytes)
namespace: Attribute namespace ('user' or 'system')
Notes:
- Attribute name should not include namespace prefix
- namespace='user' creates user.name attribute
- namespace='system' creates system.name attribute
- Creates attribute if it doesn't exist, overwrites if it does
- Requires appropriate permissions for namespace
- value can be any bytes (including zero-bytes)
- name must not contain zero-bytes
Permissions:
- user namespace: requires write permission on file
- system namespace: typically requires root/CAP_SYS_ADMIN
Raises:
OSError(errno.ENOENT): Path doesn't exist
OSError(errno.EACCES): Permission denied
OSError(errno.ENOSPC): No space available
OSError(errno.ENOTSUP): Extended attributes not supported
"""Usage example:
import pyfuse3
# Set user attribute
pyfuse3.setxattr('/path/to/file', 'comment', b'My comment')
# Creates user.comment attribute
# Set system attribute (requires privileges)
pyfuse3.setxattr('/path/to/file', 'checksum', b'abc123', namespace='system')
# Creates system.checksum attribute
# Set attribute with non-text data
pyfuse3.setxattr('/path/to/file', 'binary_data', b'\x00\x01\x02\x03')Get an extended attribute from a file or directory by path.
def getxattr(path: str, name: str, size_guess: int = 128, namespace: NamespaceT = 'user') -> bytes:
"""
Get extended attribute from path.
Args:
path: File or directory path
name: Attribute name (without namespace prefix)
size_guess: Initial buffer size guess (default: 128)
namespace: Attribute namespace ('user' or 'system')
Returns:
Attribute value (bytes)
Notes:
- Attribute name should not include namespace prefix
- size_guess is a performance hint; function handles any size
- Raises OSError(errno.ENODATA) if attribute doesn't exist
- On Linux, errno.ENODATA is errno.ENOATTR
- Automatically retries with larger buffer if size_guess too small
- Returns bytes; decode if text expected
Raises:
OSError(errno.ENOENT): Path doesn't exist
OSError(errno.ENODATA): Attribute doesn't exist
OSError(errno.EACCES): Permission denied
OSError(errno.ENOTSUP): Extended attributes not supported
"""Usage example:
import pyfuse3
import errno
# Get user attribute
try:
value = pyfuse3.getxattr('/path/to/file', 'comment')
print(f"Comment: {value.decode()}")
except OSError as e:
if e.errno == errno.ENODATA:
print("Attribute not found")
elif e.errno == errno.ENOTSUP:
print("Extended attributes not supported")
# Get system attribute with larger size hint
try:
value = pyfuse3.getxattr('/path/to/file', 'large_attr', size_guess=4096, namespace='system')
print(f"Large attr size: {len(value)}")
except OSError as e:
print(f"Error: {e}")Implement these async methods in your Operations subclass to support xattrs in your filesystem.
Handle requests to set extended attributes on inodes.
async def setxattr(self, inode: InodeT, name: XAttrNameT, value: bytes, ctx: RequestContext) -> None:
"""
Set extended attribute on inode.
Args:
inode: Inode number
name: Attribute name (bytes, with namespace prefix)
value: Attribute value (bytes)
ctx: Request context with caller information
Notes:
- name includes full attribute name (e.g., b'user.comment')
- name is guaranteed not to contain zero-bytes
- value can contain zero-bytes
- Attribute may or may not exist already (overwrite if exists)
- Raise FUSEError(errno.ENOSYS) if not implemented
- Raise FUSEError(errno.EACCES) if permission denied
- Raise FUSEError(errno.ENOSPC) if no space
- Update inode ctime after setting attribute
Permission Checking:
- user.* attributes: check write permission on inode
- system.* attributes: typically require root
- Use ctx.uid to check permissions
Raises:
FUSEError(errno.ENOENT): Inode not found
FUSEError(errno.ENOSYS): Operation not implemented
FUSEError(errno.EACCES): Permission denied
FUSEError(errno.ENOSPC): No space available
FUSEError(errno.EINVAL): Invalid name or value
"""Handle requests to get extended attributes from inodes.
async def getxattr(self, inode: InodeT, name: XAttrNameT, ctx: RequestContext) -> bytes:
"""
Get extended attribute from inode.
Args:
inode: Inode number
name: Attribute name (bytes, with namespace prefix)
ctx: Request context with caller information
Returns:
Attribute value (bytes)
Notes:
- name includes full attribute name (e.g., b'user.comment')
- name is guaranteed not to contain zero-bytes
- Raise FUSEError(errno.ENOATTR) if attribute doesn't exist
- Raise FUSEError(errno.ENOSYS) if not implemented
- Raise FUSEError(errno.EACCES) if permission denied
- Use pyfuse3.ENOATTR constant for missing attributes
Permission Checking:
- user.* attributes: check read permission on inode
- system.* attributes: may require root
Raises:
FUSEError(errno.ENOENT): Inode not found
FUSEError(pyfuse3.ENOATTR): Attribute not found
FUSEError(errno.ENOSYS): Operation not implemented
FUSEError(errno.EACCES): Permission denied
"""Handle requests to list all extended attributes on an inode.
async def listxattr(self, inode: InodeT, ctx: RequestContext) -> Sequence[XAttrNameT]:
"""
Get list of extended attributes for inode.
Args:
inode: Inode number
ctx: Request context with caller information
Returns:
Sequence of attribute names (bytes, with namespace prefixes)
Notes:
- Return full attribute names (e.g., [b'user.comment', b'user.author'])
- Names must not contain zero-bytes
- Return empty sequence if no attributes
- Raise FUSEError(errno.ENOSYS) if not implemented
- Include all attributes user has permission to see
Permission Checking:
- Return only attributes caller can read
- Filter based on ctx.uid permissions
Raises:
FUSEError(errno.ENOENT): Inode not found
FUSEError(errno.ENOSYS): Operation not implemented
FUSEError(errno.EIO): I/O error
"""Handle requests to remove extended attributes from inodes.
async def removexattr(self, inode: InodeT, name: XAttrNameT, ctx: RequestContext) -> None:
"""
Remove extended attribute from inode.
Args:
inode: Inode number
name: Attribute name (bytes, with namespace prefix)
ctx: Request context with caller information
Notes:
- name includes full attribute name (e.g., b'user.comment')
- name is guaranteed not to contain zero-bytes
- Raise FUSEError(errno.ENOATTR) if attribute doesn't exist
- Raise FUSEError(errno.ENOSYS) if not implemented
- Raise FUSEError(errno.EACCES) if permission denied
- Update inode ctime after removing attribute
Permission Checking:
- user.* attributes: check write permission on inode
- system.* attributes: typically require root
Raises:
FUSEError(errno.ENOENT): Inode not found
FUSEError(pyfuse3.ENOATTR): Attribute not found
FUSEError(errno.ENOSYS): Operation not implemented
FUSEError(errno.EACCES): Permission denied
"""import pyfuse3
import errno
import time
class XAttrFS(pyfuse3.Operations):
def __init__(self):
super().__init__()
# Store xattrs: inode -> {name: value}
self.xattrs = {}
async def setxattr(self, inode, name, value, ctx):
"""Set extended attribute."""
# Check if inode exists
if inode not in self.inodes:
raise pyfuse3.FUSEError(errno.ENOENT)
# Check permissions
if not self._check_write_permission(inode, ctx):
raise pyfuse3.FUSEError(errno.EACCES)
# Store attribute
if inode not in self.xattrs:
self.xattrs[inode] = {}
self.xattrs[inode][name] = value
# Update ctime
self.inodes[inode]['ctime'] = time.time_ns()
async def getxattr(self, inode, name, ctx):
"""Get extended attribute."""
# Check if inode exists
if inode not in self.inodes:
raise pyfuse3.FUSEError(errno.ENOENT)
# Check permissions
if not self._check_read_permission(inode, ctx):
raise pyfuse3.FUSEError(errno.EACCES)
# Get attribute
if inode not in self.xattrs or name not in self.xattrs[inode]:
raise pyfuse3.FUSEError(pyfuse3.ENOATTR)
return self.xattrs[inode][name]
async def listxattr(self, inode, ctx):
"""List extended attributes."""
# Check if inode exists
if inode not in self.inodes:
raise pyfuse3.FUSEError(errno.ENOENT)
# Check permissions
if not self._check_read_permission(inode, ctx):
raise pyfuse3.FUSEError(errno.EACCES)
# List attributes
if inode not in self.xattrs:
return []
return list(self.xattrs[inode].keys())
async def removexattr(self, inode, name, ctx):
"""Remove extended attribute."""
# Check if inode exists
if inode not in self.inodes:
raise pyfuse3.FUSEError(errno.ENOENT)
# Check permissions
if not self._check_write_permission(inode, ctx):
raise pyfuse3.FUSEError(errno.EACCES)
# Remove attribute
if inode not in self.xattrs or name not in self.xattrs[inode]:
raise pyfuse3.FUSEError(pyfuse3.ENOATTR)
del self.xattrs[inode][name]
# Update ctime
self.inodes[inode]['ctime'] = time.time_ns()
def _check_read_permission(self, inode, ctx):
"""Check if ctx has read permission on inode."""
# Simplified permission check
entry = self.inodes[inode]
if ctx.uid == 0: # root
return True
if ctx.uid == entry['uid']:
return (entry['mode'] & 0o400) != 0 # Owner read
# ... check group and other permissions ...
return False
def _check_write_permission(self, inode, ctx):
"""Check if ctx has write permission on inode."""
entry = self.inodes[inode]
if ctx.uid == 0: # root
return True
if ctx.uid == entry['uid']:
return (entry['mode'] & 0o200) != 0 # Owner write
# ... check group and other permissions ...
return Falseuser.*)user.comment, user.author, user.rating# Set user attributes
pyfuse3.setxattr('/path/to/file', 'comment', b'Important file')
pyfuse3.setxattr('/path/to/file', 'author', b'Alice')
pyfuse3.setxattr('/path/to/file', 'rating', b'5')system.*)system.posix_acl_access, system.posix_acl_default# Set system attributes (requires root)
pyfuse3.setxattr('/path/to/file', 'posix_acl_access', acl_data, namespace='system')security.*)security.selinux, security.imatrusted.*)trusted.lov, trusted.lmvStore custom metadata with files:
import pyfuse3
import json
def set_file_metadata(path, metadata_dict):
"""Store metadata as extended attribute."""
# Serialize to JSON
json_data = json.dumps(metadata_dict).encode('utf-8')
# Store as extended attribute
pyfuse3.setxattr(path, 'metadata', json_data)
def get_file_metadata(path):
"""Retrieve metadata from extended attribute."""
try:
# Get extended attribute
json_data = pyfuse3.getxattr(path, 'metadata')
# Deserialize from JSON
return json.loads(json_data.decode('utf-8'))
except OSError as e:
if e.errno == errno.ENODATA:
return {} # No metadata
raise
# Usage
set_file_metadata('/path/to/file', {
'description': 'Important document',
'tags': ['work', 'important'],
'version': 2
})
metadata = get_file_metadata('/path/to/file')
print(metadata['description'])Store checksums as extended attributes:
import pyfuse3
import hashlib
import errno
def store_checksum(path, algorithm='sha256'):
"""Calculate and store file checksum."""
# Calculate checksum
hasher = hashlib.new(algorithm)
with open(path, 'rb') as f:
while chunk := f.read(8192):
hasher.update(chunk)
checksum = hasher.hexdigest()
# Store as extended attribute
attr_name = f'checksum.{algorithm}'
pyfuse3.setxattr(path, attr_name, checksum.encode('ascii'))
return checksum
def verify_checksum(path, algorithm='sha256'):
"""Verify file checksum against stored value."""
try:
# Get stored checksum
attr_name = f'checksum.{algorithm}'
stored = pyfuse3.getxattr(path, attr_name).decode('ascii')
except OSError as e:
if e.errno == errno.ENODATA:
return None # No stored checksum
raise
# Calculate current checksum
hasher = hashlib.new(algorithm)
with open(path, 'rb') as f:
while chunk := f.read(8192):
hasher.update(chunk)
current = hasher.hexdigest()
return stored == current
# Usage
store_checksum('/path/to/file')
if verify_checksum('/path/to/file'):
print("Checksum matches")
else:
print("Checksum mismatch - file may be corrupted")Store application-specific metadata:
import pyfuse3
import pickle
class FileMetadata:
"""Custom metadata class."""
def __init__(self):
self.created_by = None
self.modified_by = None
self.workflow_state = None
self.custom_fields = {}
def save_metadata(path, metadata):
"""Save metadata to extended attribute."""
# Serialize with pickle
data = pickle.dumps(metadata)
# Store as extended attribute
pyfuse3.setxattr(path, 'app.metadata', data)
def load_metadata(path):
"""Load metadata from extended attribute."""
try:
# Get extended attribute
data = pyfuse3.getxattr(path, 'app.metadata')
# Deserialize with pickle
return pickle.loads(data)
except OSError as e:
if e.errno == errno.ENODATA:
return FileMetadata() # Default metadata
raise
# Usage
metadata = FileMetadata()
metadata.created_by = 'Alice'
metadata.workflow_state = 'draft'
save_metadata('/path/to/file', metadata)
loaded = load_metadata('/path/to/file')
print(f"Created by: {loaded.created_by}")Implement file tagging system:
import pyfuse3
import json
import errno
def add_tags(path, *tags):
"""Add tags to file."""
# Get existing tags
try:
existing_json = pyfuse3.getxattr(path, 'tags')
existing_tags = json.loads(existing_json.decode('utf-8'))
except OSError as e:
if e.errno == errno.ENODATA:
existing_tags = []
else:
raise
# Add new tags
all_tags = list(set(existing_tags + list(tags)))
# Store updated tags
tags_json = json.dumps(all_tags).encode('utf-8')
pyfuse3.setxattr(path, 'tags', tags_json)
def get_tags(path):
"""Get tags from file."""
try:
tags_json = pyfuse3.getxattr(path, 'tags')
return json.loads(tags_json.decode('utf-8'))
except OSError as e:
if e.errno == errno.ENODATA:
return []
raise
def remove_tag(path, tag):
"""Remove tag from file."""
try:
tags_json = pyfuse3.getxattr(path, 'tags')
tags = json.loads(tags_json.decode('utf-8'))
except OSError as e:
if e.errno == errno.ENODATA:
return # No tags
raise
# Remove tag
if tag in tags:
tags.remove(tag)
tags_json = json.dumps(tags).encode('utf-8')
pyfuse3.setxattr(path, 'tags', tags_json)
# Usage
add_tags('/path/to/file', 'important', 'work', '2024')
print(f"Tags: {get_tags('/path/to/file')}")
remove_tag('/path/to/file', 'work')Common errors when working with xattrs:
import pyfuse3
import errno
def safe_set_xattr(path, name, value):
"""Safely set extended attribute with error handling."""
try:
pyfuse3.setxattr(path, name, value)
return True
except OSError as e:
if e.errno == errno.ENOTSUP:
print(f"Extended attributes not supported on {path}")
elif e.errno == errno.EACCES:
print(f"Permission denied for {path}")
elif e.errno == errno.ENOSPC:
print(f"No space available for extended attribute on {path}")
elif e.errno == errno.ENOENT:
print(f"File {path} does not exist")
else:
print(f"Unexpected error setting xattr on {path}: {e}")
return False
def safe_get_xattr(path, name, default=None):
"""Safely get extended attribute with error handling."""
try:
return pyfuse3.getxattr(path, name)
except OSError as e:
if e.errno == errno.ENODATA:
return default # Attribute not found
elif e.errno == errno.ENOTSUP:
print(f"Extended attributes not supported on {path}")
elif e.errno == errno.EACCES:
print(f"Permission denied for {path}")
elif e.errno == errno.ENOENT:
print(f"File {path} does not exist")
else:
print(f"Unexpected error getting xattr from {path}: {e}")
return defaultIn Operations handlers:
import pyfuse3
import errno
class ErrorHandlingFS(pyfuse3.Operations):
async def getxattr(self, inode, name, ctx):
"""Get extended attribute with error handling."""
try:
# Check inode exists
if inode not in self.xattrs:
raise pyfuse3.FUSEError(errno.ENOENT)
# Check attribute exists
if name not in self.xattrs[inode]:
raise pyfuse3.FUSEError(pyfuse3.ENOATTR)
return self.xattrs[inode][name]
except KeyError:
# Convert KeyError to FUSEError
raise pyfuse3.FUSEError(pyfuse3.ENOATTR)
except Exception as e:
# Log unexpected errors
logger.error(f"Unexpected error in getxattr: {e}", exc_info=True)
raise pyfuse3.FUSEError(errno.EIO)import os
# Check if filesystem supports xattrs
def supports_xattr(path):
"""Check if path's filesystem supports extended attributes."""
try:
test_name = 'test_support'
test_value = b'test'
pyfuse3.setxattr(path, test_name, test_value)
pyfuse3.getxattr(path, test_name)
# Clean up
try:
os.removexattr(path, f'user.{test_name}')
except:
pass
return True
except OSError as e:
if e.errno == errno.ENOTSUP:
return False
raiseENOATTR: int # Extended attribute not found error codeUse pyfuse3.ENOATTR when raising FUSEError for missing attributes:
import pyfuse3
async def getxattr(self, inode, name, ctx):
if name not in self.attrs[inode]:
raise pyfuse3.FUSEError(pyfuse3.ENOATTR)
return self.attrs[inode][name]app.metadata, app.tags)