tessl install tessl/pypi-pyfuse3@3.4.0Python 3 bindings for libfuse 3 with async I/O support
Comprehensive guide to exception handling in pyfuse3 filesystems.
Operations handlers must ONLY raise FUSEError exceptions. Other exceptions may crash the filesystem or cause undefined behavior.
import pyfuse3
import errno
async def lookup(self, parent_inode, name, ctx):
try:
result = self._find_entry(parent_inode, name)
return result
except KeyError:
# Convert to FUSEError
raise pyfuse3.FUSEError(errno.ENOENT)
except PermissionError:
raise pyfuse3.FUSEError(errno.EACCES)
except Exception as e:
# Log unexpected errors
logger.error(f"Unexpected error: {e}", exc_info=True)
raise pyfuse3.FUSEError(errno.EIO)async def lookup(self, parent_inode, name, ctx):
if name not in self.entries:
raise KeyError("Not found") # WRONG - crashes filesystem!
return self.entries[name]| Errno | Constant | When to Use |
|---|---|---|
| 2 | errno.ENOENT | File/directory not found |
| 13 | errno.EACCES | Permission denied |
| 17 | errno.EEXIST | File already exists |
| 20 | errno.ENOTDIR | Expected directory, got file |
| 21 | errno.EISDIR | Expected file, got directory |
| 22 | errno.EINVAL | Invalid argument |
| 28 | errno.ENOSPC | No space left |
| 38 | errno.ENOSYS | Operation not implemented |
| 39 | errno.ENOTEMPTY | Directory not empty |
| 93 | pyfuse3.ENOATTR | Extended attribute not found |
async def lookup(self, parent_inode, name, ctx):
"""Look up directory entry."""
# Validate parent exists
if parent_inode not in self.inodes:
raise pyfuse3.FUSEError(errno.ENOENT)
# Validate parent is directory
if not stat.S_ISDIR(self.inodes[parent_inode]['mode']):
raise pyfuse3.FUSEError(errno.ENOTDIR)
# Find entry
if name not in self.inodes[parent_inode]['entries']:
raise pyfuse3.FUSEError(errno.ENOENT)
# Success path
inode = self.inodes[parent_inode]['entries'][name]
self.lookup_counts[inode] += 1
return await self.getattr(inode, ctx)async def open(self, inode, flags, ctx):
"""Open file with permission checking."""
if inode not in self.inodes:
raise pyfuse3.FUSEError(errno.ENOENT)
data = self.inodes[inode]
# Check read permission
if flags & (os.O_RDONLY | os.O_RDWR):
if not self._check_permission(data, os.R_OK, ctx):
raise pyfuse3.FUSEError(errno.EACCES)
# Check write permission
if flags & (os.O_WRONLY | os.O_RDWR):
if not self._check_permission(data, os.W_OK, ctx):
raise pyfuse3.FUSEError(errno.EACCES)
return pyfuse3.FileInfo(fh=self._allocate_fh(inode))async def create(self, parent_inode, name, mode, flags, ctx):
"""Create file, checking for duplicates."""
if parent_inode not in self.inodes:
raise pyfuse3.FUSEError(errno.ENOENT)
parent = self.inodes[parent_inode]
# Check if already exists
if name in parent['entries']:
raise pyfuse3.FUSEError(errno.EEXIST)
# Create new file
# ...async def rmdir(self, parent_inode, name, ctx):
"""Remove directory, checking type."""
if parent_inode not in self.inodes:
raise pyfuse3.FUSEError(errno.ENOENT)
parent = self.inodes[parent_inode]
if name not in parent['entries']:
raise pyfuse3.FUSEError(errno.ENOENT)
inode = parent['entries'][name]
# Must be a directory
if not stat.S_ISDIR(self.inodes[inode]['mode']):
raise pyfuse3.FUSEError(errno.ENOTDIR)
# Must be empty
if self.inodes[inode]['entries']:
raise pyfuse3.FUSEError(errno.ENOTEMPTY)
# Remove directory
# ...async def write(self, fh, offset, buf):
"""Write with space checking."""
if fh not in self.open_files:
raise pyfuse3.FUSEError(errno.EBADF)
inode = self.open_files[fh]
# Check available space
new_size = offset + len(buf)
if new_size > self.max_file_size:
raise pyfuse3.FUSEError(errno.EFBIG)
if self._get_free_space() < len(buf):
raise pyfuse3.FUSEError(errno.ENOSPC)
# Write data
# ...
return len(buf)The forget() method MUST NEVER raise exceptions, even FUSEError.
async def forget(self, inode_list):
"""Decrease lookup counts - MUST NOT raise exceptions."""
for inode, nlookup in inode_list:
try:
if inode in self.lookup_counts:
self.lookup_counts[inode] -= nlookup
if self.lookup_counts[inode] <= 0:
del self.lookup_counts[inode]
# Delete if also unlinked
if inode in self.inodes and self.inodes[inode]['nlink'] == 0:
del self.inodes[inode]
except Exception as e:
# Log but don't raise
logger.error(f"Error in forget for inode {inode}: {e}", exc_info=True)async def read(self, fh, offset, size):
"""Read with graceful error handling."""
try:
if fh not in self.open_files:
raise pyfuse3.FUSEError(errno.EBADF)
inode = self.open_files[fh]
data = await self._read_from_backend(inode, offset, size)
return data
except ConnectionError:
# Backend unavailable - return cached data if available
if inode in self.cache:
logger.warning(f"Backend unavailable, using cache for inode {inode}")
return self.cache[inode][offset:offset + size]
raise pyfuse3.FUSEError(errno.EIO)
except Exception as e:
logger.error(f"Unexpected error in read: {e}", exc_info=True)
raise pyfuse3.FUSEError(errno.EIO)async def write(self, fh, offset, buf):
"""Write with retry logic."""
max_retries = 3
for attempt in range(max_retries):
try:
result = await self._write_to_backend(fh, offset, buf)
return result
except TemporaryError as e:
if attempt < max_retries - 1:
logger.warning(f"Write failed, retrying ({attempt + 1}/{max_retries})")
await trio.sleep(0.1 * (attempt + 1))
else:
logger.error(f"Write failed after {max_retries} attempts")
raise pyfuse3.FUSEError(errno.EIO)async def getattr(self, inode, ctx):
"""Get attributes with fallback."""
try:
# Try primary source
return await self._getattr_from_primary(inode)
except PrimaryUnavailableError:
# Fall back to cache
if inode in self.attr_cache:
logger.warning(f"Using cached attributes for inode {inode}")
return self.attr_cache[inode]
raise pyfuse3.FUSEError(errno.EIO)import logging
logger = logging.getLogger(__name__)
async def operation(self, ...):
# DEBUG: Detailed information for debugging
logger.debug(f"Operation called with inode={inode}")
# INFO: General informational messages
logger.info(f"File created: {name}")
# WARNING: Something unexpected but handled
logger.warning(f"Cache miss for inode {inode}, fetching from backend")
# ERROR: Error condition that was handled
logger.error(f"Failed to write to backend: {e}", exc_info=True)
# CRITICAL: Severe error that may cause failure
logger.critical(f"Filesystem corruption detected!")async def lookup(self, parent_inode, name, ctx):
"""Lookup with structured logging."""
logger.debug(
"lookup called",
extra={
'parent_inode': parent_inode,
'name': name.decode('utf-8', errors='replace'),
'uid': ctx.uid,
'gid': ctx.gid,
'pid': ctx.pid
}
)
try:
# ... operation ...
pass
except pyfuse3.FUSEError as e:
logger.info(
"lookup failed",
extra={
'parent_inode': parent_inode,
'name': name.decode('utf-8', errors='replace'),
'errno': e.errno
}
)
raiseimport pytest
import pyfuse3
import errno
@pytest.mark.trio
async def test_lookup_not_found():
"""Test lookup returns ENOENT for missing entries."""
fs = MyFS()
ctx = MockRequestContext()
with pytest.raises(pyfuse3.FUSEError) as exc_info:
await fs.lookup(pyfuse3.ROOT_INODE, b'nonexistent', ctx)
assert exc_info.value.errno == errno.ENOENT
@pytest.mark.trio
async def test_create_already_exists():
"""Test create returns EEXIST for duplicate."""
fs = MyFS()
ctx = MockRequestContext()
# Create first time
await fs.create(pyfuse3.ROOT_INODE, b'test', 0o644, 0, ctx)
# Create again should fail
with pytest.raises(pyfuse3.FUSEError) as exc_info:
await fs.create(pyfuse3.ROOT_INODE, b'test', 0o644, 0, ctx)
assert exc_info.value.errno == errno.EEXIST# WRONG
async def lookup(self, parent_inode, name, ctx):
if name not in self.entries:
raise KeyError("Not found") # Crashes filesystem!
# CORRECT
async def lookup(self, parent_inode, name, ctx):
if name not in self.entries:
raise pyfuse3.FUSEError(errno.ENOENT)# WRONG
async def forget(self, inode_list):
for inode, nlookup in inode_list:
if inode not in self.lookup_counts:
raise pyfuse3.FUSEError(errno.EINVAL) # NEVER raise in forget()!
# CORRECT
async def forget(self, inode_list):
for inode, nlookup in inode_list:
try:
if inode in self.lookup_counts:
self.lookup_counts[inode] -= nlookup
except Exception as e:
logger.error(f"Error in forget: {e}", exc_info=True)
# Don't raise - just log# WRONG
async def rmdir(self, parent_inode, name, ctx):
if self.inodes[inode]['entries']:
raise pyfuse3.FUSEError(errno.EINVAL) # Wrong error code!
# CORRECT
async def rmdir(self, parent_inode, name, ctx):
if self.inodes[inode]['entries']:
raise pyfuse3.FUSEError(errno.ENOTEMPTY) # Correct error code