or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/pyfuse3@3.4.x

docs

index.md
tile.json

tessl/pypi-pyfuse3

tessl install tessl/pypi-pyfuse3@3.4.0

Python 3 bindings for libfuse 3 with async I/O support

utilities.mddocs/reference/

Utility Functions

Helper functions for directory operations, filesystem synchronization, process information, directory reply handling, and async function wrapping.

Capabilities

List Directory

List contents of a directory.

def listdir(path: str) -> List[str]:
    """
    List directory contents.

    Args:
        path: Directory path to list

    Returns:
        List of entry names (strings, not bytes)

    Notes:
        - Similar to os.listdir()
        - Returns entry names without path
        - Does not include . or ..
        - Raises OSError if path doesn't exist or isn't a directory
        - Entry names are strings (decoded from filesystem encoding)
        - Order is not specified (may vary)

    Thread Safety:
        - Thread-safe

    Raises:
        OSError(errno.ENOENT): Path doesn't exist
        OSError(errno.ENOTDIR): Path is not a directory
        OSError(errno.EACCES): Permission denied
    """

Usage example:

import pyfuse3
import errno

# List directory contents
try:
    entries = pyfuse3.listdir('/mnt/myfs')
    for entry in entries:
        print(entry)
except OSError as e:
    if e.errno == errno.ENOENT:
        print("Directory not found")
    elif e.errno == errno.ENOTDIR:
        print("Not a directory")
    elif e.errno == errno.EACCES:
        print("Permission denied")

Sync Filesystem

Synchronize filesystem to disk.

def syncfs(path: str) -> str:
    """
    Sync filesystem to disk.

    Args:
        path: Path on filesystem to sync

    Returns:
        Path that was synced (same as input)

    Notes:
        - Syncs entire filesystem containing path
        - Ensures all buffered modifications are written to disk
        - Similar to sync(2) syscall but for specific filesystem
        - May block until sync completes
        - Expensive operation; use sparingly
        - Linux-specific (may not work on other platforms)

    Thread Safety:
        - Thread-safe
        - May block for significant time

    Raises:
        OSError: If sync fails or not supported
    """

Usage example:

import pyfuse3

# Sync filesystem
try:
    pyfuse3.syncfs('/mnt/myfs')
    print("Filesystem synced")
except OSError as e:
    print(f"Sync failed: {e}")

# Sync after critical operation
def write_critical_data(path, data):
    with open(path, 'wb') as f:
        f.write(data)
    # Ensure data persisted
    pyfuse3.syncfs(path)

Get Supplementary Groups

Get supplementary group IDs for a process.

def get_sup_groups(pid: int) -> set[int]:
    """
    Return supplementary group ids of pid.

    Args:
        pid: Process ID

    Returns:
        Set of supplementary group IDs (integers)

    Notes:
        - Reads from /proc/[pid]/status
        - Relatively expensive operation (file I/O)
        - Requires /proc filesystem (Linux)
        - Useful for implementing access() handler
        - Returns empty set if no supplementary groups
        - Raises RuntimeError if unable to parse /proc file
        - Raises FileNotFoundError if process doesn't exist
        - Does not include primary gid (use ctx.gid for that)

    Thread Safety:
        - Thread-safe
        - May block briefly for file I/O

    Raises:
        FileNotFoundError: Process doesn't exist
        RuntimeError: Unable to parse /proc file
        OSError: Other errors reading /proc
    """

Usage example:

import pyfuse3
import os

class PermissionCheckFS(pyfuse3.Operations):
    async def access(self, inode, mode, ctx):
        """Check if user has access based on supplementary groups."""
        # Get supplementary groups
        try:
            sup_groups = pyfuse3.get_sup_groups(ctx.pid)
        except (FileNotFoundError, RuntimeError):
            # Process gone or /proc unavailable
            sup_groups = set()

        entry = await self.getattr(inode, ctx)

        # Check owner permissions
        if ctx.uid == entry.st_uid:
            owner_perms = (entry.st_mode >> 6) & 0o7
            if (mode & owner_perms) == mode:
                return True

        # Check group permissions (including supplementary groups)
        if ctx.gid == entry.st_gid or entry.st_gid in sup_groups:
            group_perms = (entry.st_mode >> 3) & 0o7
            if (mode & group_perms) == mode:
                return True

        # Check other permissions
        other_perms = entry.st_mode & 0o7
        return (mode & other_perms) == mode

Async Function Wrapper

Wrap Cython-defined async functions to ensure they are pure-Python coroutines.

def async_wrapper(fn: Callable[..., Any]) -> Callable[..., Any]:
    """
    Wrap an async function to make it a pure-Python coroutine.

    Args:
        fn: Async function to wrap

    Returns:
        Wrapped async function

    Notes:
        - Required for top-level trio coroutines passed to trio.run()
        - Ensures Cython-defined async functions work correctly with Trio
        - Automatically preserves function name and metadata
        - Only needed when passing Cython async functions to trio.run()
        - Rarely needed in practice - pyfuse3's main() already works with trio.run()
        - Most users will never need to use this function
        - Internal utility for pyfuse3 development

    Thread Safety:
        - Thread-safe
        - Wrapping is idempotent
    """

Usage example:

import pyfuse3
import trio

# Rarely needed - only if you have custom Cython async functions
# pyfuse3's built-in async functions (like main()) already work correctly
@pyfuse3.async_wrapper
async def my_cython_main():
    """Custom Cython-compiled async function."""
    fs = MyFS()
    pyfuse3.init(fs, '/mnt/myfs')
    await pyfuse3.main()

# Can now be used with trio.run()
trio.run(my_cython_main)

Note: Most users will not need this function. It's primarily for edge cases where custom Cython-compiled async functions need to be passed to trio.run(). The standard pyfuse3 workflow with pyfuse3.main() works without any wrapping.

Reply to Readdir Request

Reply with a directory entry during readdir operation.

def readdir_reply(
    token: ReaddirToken,
    name: FileNameT,
    attr: EntryAttributes,
    next_id: int
) -> bool:
    """
    Report a directory entry in response to readdir request.

    Args:
        token: Token received by Operations.readdir()
        name: Entry name (bytes)
        attr: EntryAttributes for the entry
        next_id: ID for next position in directory listing

    Returns:
        True if entry added (continue listing), False if buffer full (stop)

    Notes:
        - Called by Operations.readdir() for each entry
        - Must be called once per directory entry
        - If returns True, increase lookup count and continue
        - If returns False, stop without increasing lookup count for this entry
        - next_id should be robust across file additions/removals
        - next_id passed back to future readdir calls as start_id
        - Do not increase lookup count for . and .. entries (kernel handles it)
        - name must be bytes (not str)
        - attr must be valid EntryAttributes
        - next_id must be > start_id (monotonically increasing)

    Thread Safety:
        - Must be called from same async context as readdir handler
        - Not thread-safe

    Raises:
        No exceptions raised; returns False on error
    """

Usage example:

import pyfuse3

class MyFS(pyfuse3.Operations):
    async def readdir(self, fh, start_id, token):
        """List directory entries."""
        entries = self.get_entries(fh)

        for i, (name, inode) in enumerate(entries):
            # Skip entries before start_id
            if i < start_id:
                continue

            # Get entry attributes
            attr = await self.getattr(inode, None)

            # Reply with entry
            more = pyfuse3.readdir_reply(
                token=token,
                name=name,  # bytes
                attr=attr,
                next_id=i + 1  # Next position
            )

            if not more:
                # Buffer full, stop listing
                # Kernel will call readdir again with start_id = i + 1
                break

Detailed Usage Examples

Directory Listing with Pagination

import pyfuse3

class PaginatedFS(pyfuse3.Operations):
    def __init__(self):
        super().__init__()
        self.dirs = {
            pyfuse3.ROOT_INODE: [
                (b'file1.txt', 2),
                (b'file2.txt', 3),
                (b'file3.txt', 4),
                (b'dir1', 5),
            ]
        }

    async def readdir(self, fh, start_id, token):
        """List directory with pagination support."""
        entries = self.dirs.get(fh, [])

        for i, (name, inode) in enumerate(entries):
            # Resume from start_id
            if i < start_id:
                continue

            # Get attributes
            attr = await self.getattr(inode, None)

            # Reply with entry
            if not pyfuse3.readdir_reply(token, name, attr, i + 1):
                # Buffer full, kernel will call again with start_id = i + 1
                return

Using Supplementary Groups for Access Control

import pyfuse3
import stat
import os

class AccessFS(pyfuse3.Operations):
    async def access(self, inode, mode, ctx):
        """Check access with supplementary group support."""
        entry = await self.getattr(inode, ctx)

        # Root always has access
        if ctx.uid == 0:
            return True

        # Owner has access based on owner permissions
        if ctx.uid == entry.st_uid:
            owner_perms = (entry.st_mode >> 6) & 0o7
            return (mode & owner_perms) == mode

        # Check supplementary groups
        try:
            sup_groups = pyfuse3.get_sup_groups(ctx.pid)
        except (FileNotFoundError, RuntimeError):
            sup_groups = set()

        # Group access if gid matches or in supplementary groups
        if ctx.gid == entry.st_gid or entry.st_gid in sup_groups:
            group_perms = (entry.st_mode >> 3) & 0o7
            return (mode & group_perms) == mode

        # Others access
        other_perms = entry.st_mode & 0o7
        return (mode & other_perms) == mode

Ensuring Data Durability

import pyfuse3

class DurableFS(pyfuse3.Operations):
    def __init__(self):
        super().__init__()
        self.dirty_inodes = set()
        self.backing_path = '/var/lib/myfs'

    async def write(self, fh, offset, buf):
        """Write data and track dirty inodes."""
        # Write data
        inode = self.fh_to_inode[fh]
        self.write_data(inode, offset, buf)

        # Mark as dirty
        self.dirty_inodes.add(inode)

        return len(buf)

    async def fsync(self, fh, datasync):
        """Sync file to disk."""
        # Sync file to backing store
        inode = self.fh_to_inode[fh]
        self.flush_inode(inode)

        # Sync filesystem
        if not datasync:
            pyfuse3.syncfs(self.backing_path)

        self.dirty_inodes.discard(inode)

    async def release(self, fh):
        """Ensure data written on close."""
        inode = self.fh_to_inode[fh]
        if inode in self.dirty_inodes:
            # Flush dirty data
            self.flush_inode(inode)
            pyfuse3.syncfs(self.backing_path)
            self.dirty_inodes.discard(inode)

        del self.fh_to_inode[fh]

Handling . and .. Entries

import pyfuse3
import stat

class DotFS(pyfuse3.Operations):
    async def readdir(self, fh, start_id, token):
        """List directory including . and .. entries."""
        inode = fh  # Using inode as fh for simplicity
        parent_inode = self.get_parent(inode)

        # Entries: [(name, inode)]
        entries = []

        # Add . and .. if starting from beginning
        if start_id == 0:
            entries.append((b'.', inode))
            entries.append((b'..', parent_inode))

        # Add regular entries
        entries.extend(self.get_children(inode))

        for i, (name, entry_inode) in enumerate(entries):
            if i < start_id:
                continue

            attr = await self.getattr(entry_inode, None)

            if not pyfuse3.readdir_reply(token, name, attr, i + 1):
                return

            # IMPORTANT: Don't manually increase lookup count for . and ..
            # pyfuse3 handles this automatically

Robust Directory Listing IDs

Use robust IDs that handle file creation/deletion:

import pyfuse3

class RobustDirFS(pyfuse3.Operations):
    def __init__(self):
        super().__init__()
        # Store entries with unique IDs
        # ID -> (name, inode)
        self.dirs = {
            pyfuse3.ROOT_INODE: {
                100: (b'file1.txt', 2),
                101: (b'file2.txt', 3),
                102: (b'file3.txt', 4),
            }
        }
        self.next_entry_id = 103

    async def readdir(self, fh, start_id, token):
        """List directory with robust entry IDs."""
        entries = self.dirs.get(fh, {})

        # Sort by ID for consistent ordering
        sorted_ids = sorted(entries.keys())

        for entry_id in sorted_ids:
            # Skip until we reach start_id
            if entry_id < start_id:
                continue

            name, inode = entries[entry_id]
            attr = await self.getattr(inode, None)

            # Use entry_id + 1 as next_id
            # This ensures start_id is always valid even if entries deleted
            if not pyfuse3.readdir_reply(token, name, attr, entry_id + 1):
                return

    async def create(self, parent_inode, name, mode, flags, ctx):
        """Create file with unique entry ID."""
        # ... create inode ...

        # Add to directory with unique ID
        entry_id = self.next_entry_id
        self.next_entry_id += 1
        self.dirs[parent_inode][entry_id] = (name, new_inode)

        # ... return FileInfo and EntryAttributes ...

    async def unlink(self, parent_inode, name, ctx):
        """Remove entry but ID remains used."""
        entries = self.dirs[parent_inode]
        
        # Find and remove entry
        entry_id = self.find_entry_id(entries, name)
        if entry_id:
            del entries[entry_id]
            # Gap in IDs is OK; next readdir with old start_id will skip

Error Handling

listdir

import pyfuse3
import errno

def safe_listdir(path):
    """Safely list directory with error handling."""
    try:
        return pyfuse3.listdir(path)
    except OSError as e:
        if e.errno == errno.ENOENT:
            print(f"Directory {path} not found")
        elif e.errno == errno.ENOTDIR:
            print(f"{path} is not a directory")
        elif e.errno == errno.EACCES:
            print(f"Permission denied for {path}")
        else:
            print(f"Error listing {path}: {e}")
        return []

syncfs

import pyfuse3

def safe_syncfs(path):
    """Safely sync filesystem with error handling."""
    try:
        pyfuse3.syncfs(path)
        return True
    except OSError as e:
        print(f"Sync failed for {path}: {e}")
        return False

get_sup_groups

import pyfuse3

def safe_get_sup_groups(pid):
    """Safely get supplementary groups with error handling."""
    try:
        return pyfuse3.get_sup_groups(pid)
    except FileNotFoundError:
        # Process doesn't exist (may have exited)
        return set()
    except RuntimeError as e:
        # Unable to parse /proc
        print(f"Unable to get groups for pid {pid}: {e}")
        return set()
    except OSError as e:
        # Other error
        print(f"Error reading /proc for pid {pid}: {e}")
        return set()

readdir_reply

readdir_reply returns a boolean, not an exception:

async def readdir(self, fh, start_id, token):
    """List directory with proper error handling."""
    try:
        entries = self.get_entries(fh)
    except KeyError:
        # Invalid fh
        raise pyfuse3.FUSEError(errno.EBADF)

    for i, (name, inode) in enumerate(entries):
        if i < start_id:
            continue

        try:
            attr = await self.getattr(inode, None)
        except pyfuse3.FUSEError:
            # Skip entries that can't be read
            continue

        # Check return value
        if not pyfuse3.readdir_reply(token, name, attr, i + 1):
            # Buffer full - this is normal, not an error
            # Kernel will call readdir again with new start_id
            return

Performance Considerations

get_sup_groups

  • Expensive operation: Reads from /proc filesystem
  • Cache results: Cache supplementary groups per pid
  • Use sparingly: Only call when needed for permission checks
  • Consider staleness: Cached groups may become stale if user changes groups
import pyfuse3
from functools import lru_cache
import time

class CachedSupGroups:
    """Cache supplementary groups with timeout."""
    def __init__(self, timeout=60):
        self.cache = {}
        self.timeout = timeout

    def get(self, pid):
        """Get supplementary groups with caching."""
        now = time.time()
        
        # Check cache
        if pid in self.cache:
            groups, timestamp = self.cache[pid]
            if now - timestamp < self.timeout:
                return groups
        
        # Fetch and cache
        try:
            groups = pyfuse3.get_sup_groups(pid)
            self.cache[pid] = (groups, now)
            return groups
        except (FileNotFoundError, RuntimeError):
            return set()

    def invalidate(self, pid):
        """Invalidate cache for pid."""
        if pid in self.cache:
            del self.cache[pid]

# Usage
sup_groups_cache = CachedSupGroups(timeout=60)

async def access(self, inode, mode, ctx):
    """Access check with cached supplementary groups."""
    sup_groups = sup_groups_cache.get(ctx.pid)
    # ... rest of access check ...

readdir_reply

  • Efficient buffering: Kernel provides buffer size
  • Stop when full: Return when readdir_reply returns False
  • Resumable: Use next_id to resume listing
  • Don't prefetch everything: Let kernel control pacing
  • Optimize getattr: Cache attributes if possible
class OptimizedFS(pyfuse3.Operations):
    def __init__(self):
        super().__init__()
        self.attr_cache = {}

    async def readdir(self, fh, start_id, token):
        """Optimized directory listing."""
        entries = self.get_entries_sorted(fh)

        for i, (name, inode) in enumerate(entries):
            if i < start_id:
                continue

            # Use cached attributes if available
            if inode in self.attr_cache:
                attr = self.attr_cache[inode]
            else:
                attr = await self.getattr(inode, None)
                self.attr_cache[inode] = attr

            if not pyfuse3.readdir_reply(token, name, attr, i + 1):
                return

syncfs

  • Blocking operation: May take significant time
  • Use selectively: Don't sync after every write
  • Consider async alternatives: Use fsync for single files
  • Batch operations: Group writes and sync once
class BatchingFS(pyfuse3.Operations):
    def __init__(self):
        super().__init__()
        self.dirty_count = 0
        self.sync_threshold = 100
        self.backing_path = '/var/lib/myfs'

    async def write(self, fh, offset, buf):
        """Write with batched sync."""
        # Write data
        inode = self.fh_to_inode[fh]
        self.write_data(inode, offset, buf)
        
        # Track dirty writes
        self.dirty_count += 1

        # Sync if threshold reached
        if self.dirty_count >= self.sync_threshold:
            pyfuse3.syncfs(self.backing_path)
            self.dirty_count = 0

        return len(buf)

listdir

  • Simple operation: Generally fast
  • May block: For network/slow filesystems
  • Cache if needed: Cache directory listings if appropriate

Platform Differences

Linux

  • All functions fully supported
  • get_sup_groups requires /proc
  • syncfs Linux-specific

macOS

  • listdir supported
  • syncfs may not work (no syncfs syscall)
  • get_sup_groups may not work (no /proc)

BSD

  • Varies by BSD variant
  • Check feature availability
  • May require platform-specific implementations

Best Practices

  1. Cache supplementary groups: Use caching to avoid repeated /proc reads
  2. Handle readdir_reply False: Always check return value and handle buffer full
  3. Use robust entry IDs: Use IDs that remain valid across modifications
  4. Sync selectively: Use syncfs sparingly, prefer fsync for single files
  5. Handle . and ..: Include . and .. in readdir if appropriate
  6. Error handling: Always wrap utility calls in try/except
  7. Test edge cases: Test with empty directories, large directories, etc.
  8. Monitor performance: Profile utility function usage
  9. Document behavior: Document readdir ID scheme and assumptions
  10. Platform awareness: Test on all target platforms

Debugging

Log Utility Calls

import pyfuse3
import logging

logger = logging.getLogger(__name__)

def logged_get_sup_groups(pid):
    """Get supplementary groups with logging."""
    logger.debug(f"Getting supplementary groups for pid {pid}")
    try:
        groups = pyfuse3.get_sup_groups(pid)
        logger.debug(f"Supplementary groups for pid {pid}: {groups}")
        return groups
    except Exception as e:
        logger.error(f"Failed to get supplementary groups for pid {pid}: {e}")
        return set()

async def logged_readdir(self, fh, start_id, token):
    """Readdir with logging."""
    logger.debug(f"readdir: fh={fh}, start_id={start_id}")
    
    entries = self.get_entries(fh)
    count = 0

    for i, (name, inode) in enumerate(entries):
        if i < start_id:
            continue

        attr = await self.getattr(inode, None)

        if not pyfuse3.readdir_reply(token, name, attr, i + 1):
            logger.debug(f"readdir: buffer full after {count} entries")
            return

        count += 1

    logger.debug(f"readdir: returned {count} entries")

Monitor Statistics

class MonitoredFS(pyfuse3.Operations):
    def __init__(self):
        super().__init__()
        self.readdir_calls = 0
        self.readdir_entries = 0
        self.sup_groups_calls = 0

    async def readdir(self, fh, start_id, token):
        """Monitored readdir."""
        self.readdir_calls += 1
        
        entries = self.get_entries(fh)

        for i, (name, inode) in enumerate(entries):
            if i < start_id:
                continue

            attr = await self.getattr(inode, None)

            if not pyfuse3.readdir_reply(token, name, attr, i + 1):
                return

            self.readdir_entries += 1

    async def access(self, inode, mode, ctx):
        """Monitored access."""
        self.sup_groups_calls += 1
        sup_groups = pyfuse3.get_sup_groups(ctx.pid)
        # ... rest of access check ...

    def get_stats(self):
        """Get statistics."""
        return {
            'readdir_calls': self.readdir_calls,
            'readdir_entries': self.readdir_entries,
            'sup_groups_calls': self.sup_groups_calls,
            'avg_entries_per_call': (
                self.readdir_entries / self.readdir_calls
                if self.readdir_calls > 0 else 0
            )
        }