tessl install tessl/pypi-pyfuse3@3.4.0

Python 3 bindings for libfuse 3 with async I/O support
Helper functions for directory operations, filesystem synchronization, process information, directory reply handling, and async function wrapping.
List contents of a directory.
def listdir(path: str) -> List[str]:
    """
    List directory contents.

    Args:
        path: Directory path to list

    Returns:
        List of entry names (strings, not bytes)

    Notes:
        - Similar to os.listdir()
        - Returns entry names without path
        - Does not include . or ..
        - Raises OSError if path doesn't exist or isn't a directory
        - Entry names are strings (decoded from filesystem encoding)
        - Order is not specified (may vary)

    Thread Safety:
        - Thread-safe

    Raises:
        OSError(errno.ENOENT): Path doesn't exist
        OSError(errno.ENOTDIR): Path is not a directory
        OSError(errno.EACCES): Permission denied
    """

Usage example:
import pyfuse3
import errno
# List directory contents
try:
entries = pyfuse3.listdir('/mnt/myfs')
for entry in entries:
print(entry)
except OSError as e:
if e.errno == errno.ENOENT:
print("Directory not found")
elif e.errno == errno.ENOTDIR:
print("Not a directory")
elif e.errno == errno.EACCES:
print("Permission denied")Synchronize filesystem to disk.
def syncfs(path: str) -> str:
    """
    Sync filesystem to disk.

    Args:
        path: Path on filesystem to sync

    Returns:
        Path that was synced (same as input)

    Notes:
        - Syncs entire filesystem containing path
        - Ensures all buffered modifications are written to disk
        - Similar to sync(2) syscall but for specific filesystem
        - May block until sync completes
        - Expensive operation; use sparingly
        - Linux-specific (may not work on other platforms)

    Thread Safety:
        - Thread-safe
        - May block for significant time

    Raises:
        OSError: If sync fails or not supported
    """

Usage example:
import pyfuse3
# Sync filesystem
try:
    pyfuse3.syncfs('/mnt/myfs')
    print("Filesystem synced")
except OSError as e:
    # A failed sync is reported and execution continues.
    print(f"Sync failed: {e}")
# Sync after critical operation
def write_critical_data(path, data):
    """Write ``data`` (bytes) to ``path``, then sync the filesystem holding it.

    Args:
        path: Destination file; opened in binary write mode, so any
            existing content is replaced.
        data: Bytes to write.
    """
    with open(path, 'wb') as f:
        f.write(data)
    # Ensure data persisted. This runs after the ``with`` block so the file
    # has been closed (and Python's buffer flushed) before the sync.
    pyfuse3.syncfs(path)

Get supplementary group IDs for a process.
def get_sup_groups(pid: int) -> set[int]:
    """
    Return supplementary group ids of pid.

    Args:
        pid: Process ID

    Returns:
        Set of supplementary group IDs (integers)

    Notes:
        - Reads from /proc/[pid]/status
        - Relatively expensive operation (file I/O)
        - Requires /proc filesystem (Linux)
        - Useful for implementing access() handler
        - Returns empty set if no supplementary groups
        - Raises RuntimeError if unable to parse /proc file
        - Raises FileNotFoundError if process doesn't exist
        - Does not include primary gid (use ctx.gid for that)

    Thread Safety:
        - Thread-safe
        - May block briefly for file I/O

    Raises:
        FileNotFoundError: Process doesn't exist
        RuntimeError: Unable to parse /proc file
        OSError: Other errors reading /proc
    """

Usage example:
import pyfuse3
import os
class PermissionCheckFS(pyfuse3.Operations):
async def access(self, inode, mode, ctx):
"""Check if user has access based on supplementary groups."""
# Get supplementary groups
try:
sup_groups = pyfuse3.get_sup_groups(ctx.pid)
except (FileNotFoundError, RuntimeError):
# Process gone or /proc unavailable
sup_groups = set()
entry = await self.getattr(inode, ctx)
# Check owner permissions
if ctx.uid == entry.st_uid:
owner_perms = (entry.st_mode >> 6) & 0o7
if (mode & owner_perms) == mode:
return True
# Check group permissions (including supplementary groups)
if ctx.gid == entry.st_gid or entry.st_gid in sup_groups:
group_perms = (entry.st_mode >> 3) & 0o7
if (mode & group_perms) == mode:
return True
# Check other permissions
other_perms = entry.st_mode & 0o7
return (mode & other_perms) == modeWrap Cython-defined async functions to ensure they are pure-Python coroutines.
def async_wrapper(fn: Callable[..., Any]) -> Callable[..., Any]:
    """
    Wrap an async function to make it a pure-Python coroutine.

    Args:
        fn: Async function to wrap

    Returns:
        Wrapped async function

    Notes:
        - Required for top-level trio coroutines passed to trio.run()
        - Ensures Cython-defined async functions work correctly with Trio
        - Automatically preserves function name and metadata
        - Only needed when passing Cython async functions to trio.run()
        - Rarely needed in practice - pyfuse3's main() already works with trio.run()
        - Most users will never need to use this function
        - Internal utility for pyfuse3 development

    Thread Safety:
        - Thread-safe
        - Wrapping is idempotent
    """

Usage example:
import pyfuse3
import trio
# Rarely needed - only if you have custom Cython async functions
# pyfuse3's built-in async functions (like main()) already work correctly
@pyfuse3.async_wrapper
async def my_cython_main():
    """Custom Cython-compiled async function."""
    # NOTE(review): MyFS is not defined in this snippet; presumably a
    # pyfuse3.Operations subclass supplied by the reader.
    fs = MyFS()
    pyfuse3.init(fs, '/mnt/myfs')
    await pyfuse3.main()
    # NOTE(review): no pyfuse3.close() call is shown after main() returns;
    # confirm whether cleanup was intentionally left out of this example.

# Can now be used with trio.run()
trio.run(my_cython_main)

Note: Most users will not need this function. It's primarily for edge cases where custom Cython-compiled async functions need to be passed to trio.run(). The standard pyfuse3 workflow with pyfuse3.main() works without any wrapping.
Reply with a directory entry during readdir operation.
def readdir_reply(
    token: ReaddirToken,
    name: FileNameT,
    attr: EntryAttributes,
    next_id: int
) -> bool:
    """
    Report a directory entry in response to readdir request.

    Args:
        token: Token received by Operations.readdir()
        name: Entry name (bytes)
        attr: EntryAttributes for the entry
        next_id: ID for next position in directory listing

    Returns:
        True if entry added (continue listing), False if buffer full (stop)

    Notes:
        - Called by Operations.readdir() for each entry
        - Must be called once per directory entry
        - If returns True, increase lookup count and continue
        - If returns False, stop without increasing lookup count for this entry
        - next_id should be robust across file additions/removals
        - next_id passed back to future readdir calls as start_id
        - Do not increase lookup count for . and .. entries (kernel handles it)
        - name must be bytes (not str)
        - attr must be valid EntryAttributes
        - next_id must be > start_id (monotonically increasing)

    Thread Safety:
        - Must be called from same async context as readdir handler
        - Not thread-safe

    Raises:
        No exceptions raised; returns False on error
    """

Usage example:
import pyfuse3
class MyFS(pyfuse3.Operations):
    async def readdir(self, fh, start_id, token):
        """Stream the entries of directory ``fh`` to the kernel.

        Entries whose position is below ``start_id`` were delivered by an
        earlier call and are skipped. Each remaining entry is reported with
        ``next_id`` set to the position that follows it.
        """
        for position, (entry_name, entry_inode) in enumerate(self.get_entries(fh)):
            if position >= start_id:
                # Attributes are fetched only for entries actually reported
                entry_attr = await self.getattr(entry_inode, None)

                accepted = pyfuse3.readdir_reply(
                    token=token,
                    name=entry_name,  # bytes
                    attr=entry_attr,
                    next_id=position + 1  # Next position
                )
                if not accepted:
                    # Buffer full, stop listing
                    # Kernel will call readdir again with start_id = position + 1
                    break

import pyfuse3
class PaginatedFS(pyfuse3.Operations):
    def __init__(self):
        """Set up a static root directory of four (name, inode) entries."""
        super().__init__()
        root_listing = [
            (b'file1.txt', 2),
            (b'file2.txt', 3),
            (b'file3.txt', 4),
            (b'dir1', 5),
        ]
        self.dirs = {pyfuse3.ROOT_INODE: root_listing}

    async def readdir(self, fh, start_id, token):
        """List directory with pagination support.

        An unknown ``fh`` yields an empty listing. Positions below
        ``start_id`` are skipped so a resumed call continues where the
        previous one stopped.
        """
        listing = self.dirs.get(fh, [])
        for position, (entry_name, entry_inode) in enumerate(listing):
            if position >= start_id:
                entry_attr = await self.getattr(entry_inode, None)
                has_room = pyfuse3.readdir_reply(
                    token, entry_name, entry_attr, position + 1
                )
                if not has_room:
                    # Buffer full, kernel will call again with
                    # start_id = position + 1
                    return

import pyfuse3
import stat
import os
class AccessFS(pyfuse3.Operations):
    async def access(self, inode, mode, ctx):
        """Check access with supplementary group support.

        Selects one permission class (owner, group or other) and compares
        its three mode bits with the requested ``mode`` mask. uid 0 is
        always granted access.
        """
        entry = await self.getattr(inode, ctx)

        # Root always has access
        if ctx.uid == 0:
            return True

        if ctx.uid == entry.st_uid:
            # Owner bits live in the third octal digit
            shift = 6
        else:
            try:
                extra_gids = pyfuse3.get_sup_groups(ctx.pid)
            except (FileNotFoundError, RuntimeError):
                extra_gids = set()

            in_group = ctx.gid == entry.st_gid or entry.st_gid in extra_gids
            # Group bits if gid matches or is a supplementary group,
            # otherwise the "others" bits
            shift = 3 if in_group else 0

        granted = (entry.st_mode >> shift) & 0o7
        return (mode & granted) == mode

import pyfuse3
class DurableFS(pyfuse3.Operations):
    def __init__(self):
        """Initialise dirty tracking and the file-handle table."""
        super().__init__()
        self.dirty_inodes = set()
        self.backing_path = '/var/lib/myfs'
        # fh -> inode. Read by write()/fsync()/release(); must be filled in
        # by the open()/create() handlers (not shown in this example).
        self.fh_to_inode = {}

    async def write(self, fh, offset, buf):
        """Write data and track dirty inodes.

        Returns:
            Number of bytes written (always ``len(buf)``).
        """
        # Write data
        inode = self.fh_to_inode[fh]
        self.write_data(inode, offset, buf)

        # Mark as dirty
        self.dirty_inodes.add(inode)

        return len(buf)

    async def fsync(self, fh, datasync):
        """Sync file to disk.

        The whole backing filesystem is synced unless ``datasync`` is set.
        """
        # Sync file to backing store
        inode = self.fh_to_inode[fh]
        self.flush_inode(inode)

        # Sync filesystem
        if not datasync:
            pyfuse3.syncfs(self.backing_path)

        self.dirty_inodes.discard(inode)

    async def release(self, fh):
        """Ensure data written on close, then forget the file handle."""
        inode = self.fh_to_inode[fh]

        if inode in self.dirty_inodes:
            # Flush dirty data
            self.flush_inode(inode)
            pyfuse3.syncfs(self.backing_path)
            self.dirty_inodes.discard(inode)

        del self.fh_to_inode[fh]

import pyfuse3
import stat
class DotFS(pyfuse3.Operations):
async def readdir(self, fh, start_id, token):
"""List directory including . and .. entries."""
inode = fh # Using inode as fh for simplicity
parent_inode = self.get_parent(inode)
# Entries: [(name, inode)]
entries = []
# Add . and .. if starting from beginning
if start_id == 0:
entries.append((b'.', inode))
entries.append((b'..', parent_inode))
# Add regular entries
entries.extend(self.get_children(inode))
for i, (name, entry_inode) in enumerate(entries):
if i < start_id:
continue
attr = await self.getattr(entry_inode, None)
if not pyfuse3.readdir_reply(token, name, attr, i + 1):
return
# IMPORTANT: Don't manually increase lookup count for . and ..
# pyfuse3 handles this automaticallyUse robust IDs that handle file creation/deletion:
import pyfuse3
class RobustDirFS(pyfuse3.Operations):
    def __init__(self):
        """Create a root directory whose entries carry stable unique IDs."""
        super().__init__()
        # Store entries with unique IDs
        # ID -> (name, inode)
        self.dirs = {
            pyfuse3.ROOT_INODE: {
                100: (b'file1.txt', 2),
                101: (b'file2.txt', 3),
                102: (b'file3.txt', 4),
            }
        }
        # IDs are handed out once and never reused
        self.next_entry_id = 103

    async def readdir(self, fh, start_id, token):
        """List directory with robust entry IDs."""
        entries = self.dirs.get(fh, {})

        # Sort by ID for consistent ordering
        for entry_id in sorted(entries):
            # Skip until we reach start_id
            if entry_id < start_id:
                continue

            name, inode = entries[entry_id]
            attr = await self.getattr(inode, None)

            # Use entry_id + 1 as next_id
            # This ensures start_id is always valid even if entries deleted
            if not pyfuse3.readdir_reply(token, name, attr, entry_id + 1):
                return

    async def create(self, parent_inode, name, mode, flags, ctx):
        """Create file with unique entry ID."""
        # ... create inode ...
        # (the elided code above is expected to bind ``new_inode``)

        # Add to directory with unique ID
        entry_id = self.next_entry_id
        self.next_entry_id += 1
        # setdefault: the parent may not have an entry table yet
        self.dirs.setdefault(parent_inode, {})[entry_id] = (name, new_inode)

        # ... return FileInfo and EntryAttributes ...

    async def unlink(self, parent_inode, name, ctx):
        """Remove entry but ID remains used."""
        entries = self.dirs[parent_inode]

        # Find and remove entry
        entry_id = self.find_entry_id(entries, name)
        # Compare against None explicitly: 0 is a legitimate entry ID
        if entry_id is not None:
            del entries[entry_id]

        # Gap in IDs is OK; next readdir with old start_id will skip

import pyfuse3
import errno
def safe_listdir(path):
    """List ``path`` via pyfuse3, never raising OSError.

    Returns:
        The directory listing, or an empty list after printing a
        diagnostic when the listing fails with OSError.
    """
    try:
        return pyfuse3.listdir(path)
    except OSError as e:
        # errno -> message for the well-known failures; anything else
        # gets the generic message.
        known_failures = {
            errno.ENOENT: f"Directory {path} not found",
            errno.ENOTDIR: f"{path} is not a directory",
            errno.EACCES: f"Permission denied for {path}",
        }
        print(known_failures.get(e.errno, f"Error listing {path}: {e}"))
        return []

import pyfuse3
def safe_syncfs(path):
    """Sync the filesystem containing ``path`` without raising OSError.

    Returns:
        True when the sync succeeded, False (after printing the error)
        when it raised OSError.
    """
    try:
        pyfuse3.syncfs(path)
    except OSError as e:
        print(f"Sync failed for {path}: {e}")
        return False
    else:
        return True

import pyfuse3
def safe_get_sup_groups(pid):
"""Safely get supplementary groups with error handling."""
try:
return pyfuse3.get_sup_groups(pid)
except FileNotFoundError:
# Process doesn't exist (may have exited)
return set()
except RuntimeError as e:
# Unable to parse /proc
print(f"Unable to get groups for pid {pid}: {e}")
return set()
except OSError as e:
# Other error
print(f"Error reading /proc for pid {pid}: {e}")
return set()readdir_reply returns a boolean, not an exception:
async def readdir(self, fh, start_id, token):
    """List directory with proper error handling.

    An unknown ``fh`` is reported as EBADF. Entries whose attributes
    cannot be fetched are left out of the listing.
    """
    try:
        listing = self.get_entries(fh)
    except KeyError:
        # Invalid fh
        raise pyfuse3.FUSEError(errno.EBADF)

    for position, (entry_name, entry_inode) in enumerate(listing):
        if position >= start_id:
            try:
                entry_attr = await self.getattr(entry_inode, None)
            except pyfuse3.FUSEError:
                # Skip entries that can't be read
                continue

            # A False result means the buffer is full - this is normal,
            # not an error. Kernel will call readdir again with new start_id
            accepted = pyfuse3.readdir_reply(
                token, entry_name, entry_attr, position + 1
            )
            if not accepted:
                return

import pyfuse3
from functools import lru_cache
import time
class CachedSupGroups:
    """Cache supplementary groups with timeout.

    Entries are keyed by pid and expire ``timeout`` seconds after they were
    fetched. Ages are measured with the monotonic clock, so wall-clock
    adjustments can neither expire entries early nor keep them alive.
    """

    def __init__(self, timeout=60):
        """Create an empty cache.

        Args:
            timeout: Lifetime of a cached entry in seconds.
        """
        # pid -> (groups, time.monotonic() value at fetch time)
        self.cache = {}
        self.timeout = timeout

    def get(self, pid):
        """Get supplementary groups with caching.

        Returns:
            The cached set while it is fresh, otherwise a newly fetched
            set. An empty set is returned, and not cached, when the
            lookup fails with FileNotFoundError or RuntimeError.
        """
        now = time.monotonic()

        # Check cache
        cached = self.cache.get(pid)
        if cached is not None:
            groups, timestamp = cached
            if now - timestamp < self.timeout:
                return groups
            # Expired: drop it so a failed refresh leaves no stale entry
            del self.cache[pid]

        # Fetch and cache
        try:
            groups = pyfuse3.get_sup_groups(pid)
        except (FileNotFoundError, RuntimeError):
            return set()
        self.cache[pid] = (groups, now)
        return groups

    def invalidate(self, pid):
        """Invalidate cache for pid (no-op when pid is not cached)."""
        self.cache.pop(pid, None)
# Usage
sup_groups_cache = CachedSupGroups(timeout=60)


async def access(self, inode, mode, ctx):
    """Access check with cached supplementary groups."""
    sup_groups = sup_groups_cache.get(ctx.pid)
    # ... rest of access check ...


class OptimizedFS(pyfuse3.Operations):
    def __init__(self):
        """Start with an empty inode -> attributes cache."""
        super().__init__()
        # Entries are added by readdir() and never evicted in this example
        self.attr_cache = {}

    async def readdir(self, fh, start_id, token):
        """Optimized directory listing."""
        entries = self.get_entries_sorted(fh)

        for i, (name, inode) in enumerate(entries):
            if i < start_id:
                continue

            # Use cached attributes if available
            if inode in self.attr_cache:
                attr = self.attr_cache[inode]
            else:
                attr = await self.getattr(inode, None)
                self.attr_cache[inode] = attr

            if not pyfuse3.readdir_reply(token, name, attr, i + 1):
                return


class BatchingFS(pyfuse3.Operations):
    def __init__(self):
        """Initialise the write counter, sync threshold and handle table."""
        super().__init__()
        self.dirty_count = 0
        self.sync_threshold = 100
        self.backing_path = '/var/lib/myfs'
        # fh -> inode. Read by write(); must be filled in by the
        # open()/create() handlers (not shown in this example).
        self.fh_to_inode = {}

    async def write(self, fh, offset, buf):
        """Write with batched sync.

        The backing filesystem is synced once every ``sync_threshold``
        writes instead of after each one.

        Returns:
            Number of bytes written (always ``len(buf)``).
        """
        # Write data
        inode = self.fh_to_inode[fh]
        self.write_data(inode, offset, buf)

        # Track dirty writes
        self.dirty_count += 1

        # Sync if threshold reached
        if self.dirty_count >= self.sync_threshold:
            pyfuse3.syncfs(self.backing_path)
            # Reset only after a successful sync, so a failed sync is
            # retried on the next write
            self.dirty_count = 0

        return len(buf)

import pyfuse3
import logging
logger = logging.getLogger(__name__)
def logged_get_sup_groups(pid):
    """Get supplementary groups with logging.

    Messages use lazy ``%s`` arguments so they are only formatted when the
    corresponding log level is enabled; the rendered text is unchanged.

    Returns:
        The supplementary group set, or an empty set if the lookup raised
        any exception (the failure is logged at ERROR level).
    """
    logger.debug("Getting supplementary groups for pid %s", pid)
    try:
        groups = pyfuse3.get_sup_groups(pid)
        logger.debug("Supplementary groups for pid %s: %s", pid, groups)
        return groups
    except Exception as e:
        # Deliberately broad: this helper is best-effort and must never
        # propagate an error to its caller.
        logger.error("Failed to get supplementary groups for pid %s: %s", pid, e)
        return set()
async def logged_readdir(self, fh, start_id, token):
    """Readdir with logging."""
    logger.debug(f"readdir: fh={fh}, start_id={start_id}")

    entries = self.get_entries(fh)
    # Number of entries accepted by the kernel during this call
    count = 0

    for i, (name, inode) in enumerate(entries):
        if i < start_id:
            continue

        attr = await self.getattr(inode, None)
        if not pyfuse3.readdir_reply(token, name, attr, i + 1):
            logger.debug(f"readdir: buffer full after {count} entries")
            return

        count += 1

    logger.debug(f"readdir: returned {count} entries")


class MonitoredFS(pyfuse3.Operations):
    def __init__(self):
        """Zero all counters."""
        super().__init__()
        self.readdir_calls = 0
        self.readdir_entries = 0
        self.sup_groups_calls = 0

    async def readdir(self, fh, start_id, token):
        """Monitored readdir."""
        self.readdir_calls += 1

        entries = self.get_entries(fh)
        for i, (name, inode) in enumerate(entries):
            if i < start_id:
                continue

            attr = await self.getattr(inode, None)
            if not pyfuse3.readdir_reply(token, name, attr, i + 1):
                return

            # Counted only once the entry was accepted
            self.readdir_entries += 1

    async def access(self, inode, mode, ctx):
        """Monitored access."""
        self.sup_groups_calls += 1
        try:
            sup_groups = pyfuse3.get_sup_groups(ctx.pid)
        except (FileNotFoundError, RuntimeError):
            # Process gone or /proc unparseable: continue without
            # supplementary groups instead of failing the handler
            sup_groups = set()
        # ... rest of access check ...

    def get_stats(self):
        """Get statistics.

        Returns:
            Dict with the three raw counters plus ``avg_entries_per_call``,
            which is 0 when readdir has not been called yet.
        """
        return {
            'readdir_calls': self.readdir_calls,
            'readdir_entries': self.readdir_entries,
            'sup_groups_calls': self.sup_groups_calls,
            'avg_entries_per_call': (
                self.readdir_entries / self.readdir_calls
                if self.readdir_calls > 0 else 0
            )
        }