CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-fuse-python

Python bindings for FUSE (Filesystem in USErspace) enabling custom userspace filesystems

Pending
Overview
Eval results
Files

docs/extended-features.md

Extended Features

Advanced FUSE features including extended attributes, file locking, access control, filesystem statistics, and specialized operations for enhanced filesystem functionality.

Capabilities

Extended Attributes

Support for extended file attributes (xattrs) that provide additional metadata beyond standard POSIX attributes.

def getxattr(self, path, name, size):
    """
    Read the value of one extended attribute.

    Args:
        path (str): File or directory whose attribute is requested
        name (str): Name of the attribute, for example 'user.comment'
        size (int): Upper bound on the returned size; 0 asks only for
            the size that would be needed

    Returns:
        bytes: The attribute's value
        int: The size needed (when size=0), or a negative errno on error
    """

def setxattr(self, path, name, val, flags):
    """
    Store a value under an extended attribute name.

    Args:
        path (str): File or directory to modify
        name (str): Name of the attribute
        val (bytes): Value to store
        flags (int): One of XATTR_CREATE, XATTR_REPLACE, or 0

    Returns:
        int: 0 when the attribute was set, a negative errno on error
    """

def listxattr(self, path, size):
    """
    Enumerate the extended attribute names set on a path.

    Args:
        path (str): File or directory to inspect
        size (int): Upper bound on the returned size; 0 asks only for
            the size that would be needed

    Returns:
        bytes: The attribute names, separated by null bytes
        int: The size needed (when size=0), or a negative errno on error
    """

def removexattr(self, path, name):
    """
    Delete one extended attribute.

    Args:
        path (str): File or directory to modify
        name (str): Name of the attribute to delete

    Returns:
        int: 0 when the attribute was removed, a negative errno on error
    """

Usage Example:

class MyFS(Fuse):
    """Example filesystem that keeps extended attributes in memory.

    Attributes live in ``self.xattrs`` as ``{path: {name: value}}``.
    ``setxattr`` also consults ``self.files`` and ``self.dirs``, which this
    example does not create itself.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.xattrs = {}  # {path: {name: value}}

    def getxattr(self, path, name, size):
        """Return the value of attribute *name* on *path*.

        Returns:
            The value's length when ``size`` is 0, the value itself when it
            fits in ``size`` bytes, ``-errno.ERANGE`` when it does not, and
            ``-errno.ENODATA`` when the attribute is not set.
        """
        try:
            value = self.xattrs[path][name]
        except KeyError:
            return -errno.ENODATA

        if size == 0:
            return len(value)  # Caller is only probing for the size needed
        if len(value) > size:
            # A truncated value would look like a complete, valid one.
            return -errno.ERANGE
        return value

    def setxattr(self, path, name, val, flags):
        """Set attribute *name* on *path* to *val*, honouring *flags*.

        Returns:
            0 on success, ``-errno.ENOENT`` for an unknown path,
            ``-errno.EEXIST`` when XATTR_CREATE hits an existing attribute,
            ``-errno.ENODATA`` when XATTR_REPLACE hits a missing one.
        """
        if path not in self.files and path not in self.dirs:
            return -errno.ENOENT

        exists = name in self.xattrs.get(path, {})

        # XATTR_CREATE / XATTR_REPLACE are expected to be in scope already.
        if flags == XATTR_CREATE and exists:
            return -errno.EEXIST
        if flags == XATTR_REPLACE and not exists:
            return -errno.ENODATA

        # Create the per-path table only once the call is known to succeed.
        self.xattrs.setdefault(path, {})[name] = val
        return 0

    def listxattr(self, path, size):
        """Return the attribute names on *path*, each followed by a null byte.

        Returns:
            The encoded list's length when ``size`` is 0 (0 for a path with
            no attributes), the list itself when it fits in ``size`` bytes,
            and ``-errno.ERANGE`` when it does not.
        """
        names = b''.join(
            name.encode() + b'\0' for name in self.xattrs.get(path, {})
        )

        if size == 0:
            return len(names)
        if len(names) > size:
            # Cutting the list short could split a name in the middle.
            return -errno.ERANGE
        return names

    def removexattr(self, path, name):
        """Remove attribute *name* from *path*.

        Returns:
            0 on success, ``-errno.ENODATA`` when the attribute is not set.
        """
        try:
            del self.xattrs[path][name]
        except KeyError:
            return -errno.ENODATA
        return 0

File Locking

Advisory file locking operations for coordinating access between processes.

def lock(self, path, fip, cmd, lock):
    """
    Perform a file locking operation.

    Args:
        path (str): File the lock applies to
        fip: File info pointer (usually None)
        cmd (int): Which lock command to run (F_GETLK, F_SETLK, F_SETLKW)
        lock (Flock): Description of the lock

    Returns:
        int: 0 on success, a negative errno on error
        Flock: Lock information (for F_GETLK)
    """

Usage Example:

import fcntl

class MyFS(Fuse):
    """Example filesystem that tracks advisory locks per path.

    Relies on ``self.files`` and on the helpers ``has_conflicting_lock``,
    ``set_lock`` and ``ranges_overlap``, none of which are shown in this
    example.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.locks = {}  # {path: [lock_info, ...]}

    def lock(self, path, fip, cmd, lock):
        """Dispatch a lock request on *path* according to *cmd*.

        Returns:
            For F_GETLK: the first conflicting lock, or a fresh ``fuse.Flock``
            whose ``l_type`` is F_UNLCK when nothing conflicts.
            For F_SETLK: ``-errno.EAGAIN`` on conflict, otherwise the result
            of ``set_lock``.
            For F_SETLKW: the result of ``set_lock``.
            ``-errno.ENOENT`` for an unknown path and ``-errno.EINVAL`` for
            an unrecognised command.
        """
        if path not in self.files:
            return -errno.ENOENT

        if cmd == fcntl.F_GETLK:
            # Report the first lock that would block the requested one
            for existing_lock in self.locks.get(path, []):
                if self.locks_conflict(lock, existing_lock):
                    return existing_lock

            # No conflict - set type to F_UNLCK
            result = fuse.Flock()
            result.l_type = fcntl.F_UNLCK
            return result

        if cmd == fcntl.F_SETLK:
            # Non-blocking lock
            if self.has_conflicting_lock(path, lock):
                return -errno.EAGAIN
            return self.set_lock(path, lock)

        if cmd == fcntl.F_SETLKW:
            # Blocking lock (would block in real implementation); this
            # example takes the lock without checking for conflicts.
            return self.set_lock(path, lock)

        # Reject unknown commands explicitly instead of falling off the end
        # and returning None.
        return -errno.EINVAL

    def locks_conflict(self, lock1, lock2):
        """Return True when the two lock descriptions conflict.

        An unlock never conflicts, and neither do two read locks; any other
        pair conflicts exactly when ``ranges_overlap`` says so.
        """
        kinds = (lock1.l_type, lock2.l_type)
        if fcntl.F_UNLCK in kinds:
            return False
        if kinds == (fcntl.F_RDLCK, fcntl.F_RDLCK):
            return False  # Read locks don't conflict

        return self.ranges_overlap(lock1, lock2)

Access Control

Enhanced access permission checking beyond basic POSIX permissions.

def access(self, path, mode):
    """
    Test whether a path may be accessed in the requested way.

    Args:
        path (str): File or directory to test
        mode (int): Kind of access being asked about (R_OK, W_OK, X_OK, F_OK)

    Returns:
        int: 0 when access is granted, a negative errno on error
    """

Usage Example:

import os

class MyFS(Fuse):
    """Example filesystem that checks access against the caller's ids.

    Relies on ``self.files``, ``self.dirs`` and ``self.getattr``, which are
    not shown in this example.
    """

    def access(self, path, mode):
        """Check whether the calling process may access *path* with *mode*.

        Returns:
            0 when access is granted, ``-errno.ENOENT`` for an unknown path,
            and ``-errno.EACCES`` when a requested permission bit is missing
            or the caller's ids are not available.
        """
        if path not in self.files and path not in self.dirs:
            return -errno.ENOENT

        # Get current context
        ctx = self.GetContext()
        uid = ctx.get('uid')
        gid = ctx.get('gid')
        if uid is None or gid is None:
            # Deny rather than guess: falling back to 0 would treat an
            # unidentified caller as root.
            return -errno.EACCES

        if uid == 0:  # Root can access anything
            return 0

        # Get file attributes (only needed for non-root callers)
        st = self.getattr(path)

        # Pick the owner, group or other permission triplet
        if uid == st.st_uid:
            shift = 6
        elif gid == st.st_gid:
            shift = 3
        else:
            shift = 0
        user_perms = (st.st_mode >> shift) & 7

        # Every requested kind of access needs its permission bit
        for requested, bit in ((os.R_OK, 4), (os.W_OK, 2), (os.X_OK, 1)):
            if mode & requested and not (user_perms & bit):
                return -errno.EACCES

        return 0

Context Information

Access information about the current FUSE request context.

def GetContext(self):
    """
    Describe the process behind the current request.

    Returns:
        dict: Context information with keys:
            - uid: User ID of the calling process
            - gid: Group ID of the calling process
            - pid: Process ID of the calling process
    """

Usage Example:

class MyFS(Fuse):
    """Example filesystem whose new files are owned by the calling user.

    Stores content in ``self.files`` and attributes in ``self.file_attrs``;
    neither container is created in this example.
    """

    def create(self, path, mode, fi):
        """Create an empty file at *path* owned by the caller; returns 0."""
        # Ask who is making the request
        caller = self.GetContext()

        # Describe the new file: requested mode, caller's ids, no content
        attrs = fuse.Stat()
        for field, value in (
            ('st_mode', mode),
            ('st_uid', caller['uid']),
            ('st_gid', caller['gid']),
            ('st_size', 0),
        ):
            setattr(attrs, field, value)

        # Register the empty file together with its attributes
        self.files[path] = b''
        self.file_attrs[path] = attrs

        return 0

Cache Management

Control kernel caching behavior for performance optimization.

def Invalidate(self, path):
    """
    Drop a cached entry.

    Args:
        path (str): Path whose entry in the kernel cache is invalidated

    Returns:
        int: 0 on success, a negative errno on error
    """

Usage Example:

class MyFS(Fuse):
    """Example filesystem that invalidates the cache after changes.

    ``do_write`` and ``do_setxattr`` perform the actual work and are not
    shown in this example.
    """

    def write(self, path, buf, offset):
        """Write through ``do_write``; invalidate *path* if the result is > 0."""
        written = self.do_write(path, buf, offset)

        if not written > 0:
            return written

        # The file changed, so the cached entry is stale
        self.Invalidate(path)
        return written

    def setxattr(self, path, name, val, flags):
        """Set via ``do_setxattr``; invalidate *path* if the result is 0."""
        status = self.do_setxattr(path, name, val, flags)

        if status != 0:
            return status

        # The attributes changed, so the cached entry is stale
        self.Invalidate(path)
        return status

I/O Control Operations

Handle filesystem-specific I/O control operations.

def ioctl(self, path, cmd, arg, fip, flags, data):
    """
    Carry out an I/O control operation.

    Args:
        path (str): File the operation targets
        cmd (int): I/O control command
        arg: Argument of the command
        fip: File info pointer
        flags (int): FUSE I/O control flags
        data: Input/output data

    Returns:
        int: 0 on success, a negative errno on error
        bytes: Output data, for some operations
    """

Usage Example:

# Command numbers for this filesystem's custom ioctls, counted up from a base
_MYFS_IOCTL_BASE = 0x1000
MYFS_IOCTL_GET_INFO = _MYFS_IOCTL_BASE
MYFS_IOCTL_SET_FLAG = _MYFS_IOCTL_BASE + 1

class MyFS(Fuse):
    """Example filesystem that answers two custom ioctl commands.

    Relies on ``self.files``, ``self.file_created`` and ``self.file_flags``,
    which are not created in this example.
    """

    def ioctl(self, path, cmd, arg, fip, flags, data):
        """Handle MYFS_IOCTL_GET_INFO and MYFS_IOCTL_SET_FLAG for *path*.

        Returns:
            For GET_INFO: UTF-8 encoded JSON with the file's ``size`` and
            ``created`` values.
            For SET_FLAG: 0 after storing the flag, or ``-errno.EINVAL``
            when *data* is not exactly one native unsigned int.
            ``-errno.ENOENT`` for an unknown path and ``-errno.ENOTTY`` for
            any other command.
        """
        # Imported locally so the example does not depend on imports that
        # are not shown alongside it.
        import json
        import struct

        if path not in self.files:
            return -errno.ENOENT

        if cmd == MYFS_IOCTL_GET_INFO:
            # Return file information
            info = {
                'size': len(self.files[path]),
                'created': self.file_created[path],
            }
            return json.dumps(info).encode()

        if cmd == MYFS_IOCTL_SET_FLAG:
            # Set filesystem-specific flag; the buffer comes from the caller,
            # so a wrong-sized or missing buffer is an error, not a crash.
            try:
                (flag_value,) = struct.unpack('I', data)
            except (struct.error, TypeError):
                return -errno.EINVAL
            self.file_flags[path] = flag_value
            return 0

        return -errno.ENOTTY  # Inappropriate ioctl

Poll Support

Support for poll/select operations on files.

def poll(self, path, fip, ph, reventsp):
    """
    Check a file for events.

    Args:
        path (str): File to poll
        fip: File info pointer
        ph: Poll handle used for notifications
        reventsp: Pointer to the returned events

    Returns:
        int: 0 on success, a negative errno on error
    """

def NotifyPoll(self, ph):
    """
    Signal that a poll has completed.

    Args:
        ph: Poll handle received in a poll() call
    """

Filesystem Lifecycle

Handle filesystem initialization and cleanup.

def fsinit(self):
    """
    Set up the filesystem.

    Invoked when the filesystem is mounted.

    Returns:
        int: 0 on success, a negative errno on error
    """

def fsdestroy(self):
    """
    Tear down the filesystem.

    Invoked when the filesystem is unmounted.
    """

Usage Example:

class MyFS(Fuse):
    """Example filesystem with explicit setup and teardown hooks.

    ``load_filesystem_state``, ``save_filesystem_state`` and
    ``cleanup_resources`` are helpers that are not shown in this example.
    """

    def fsinit(self):
        """Initialize filesystem resources and return 0."""
        print("Filesystem mounting...")

        # Initialize data structures
        self.files = {}
        self.dirs = {'/': {}}
        self.xattrs = {}

        # Load persistent data if needed
        self.load_filesystem_state()

        return 0

    def fsdestroy(self):
        """Save state, then clean up filesystem resources."""
        print("Filesystem unmounting...")

        try:
            # Save persistent data
            self.save_filesystem_state()
        finally:
            # Clean up resources even when saving fails, so a save error
            # cannot leak them.
            self.cleanup_resources()

Error Handling

Extended features may raise specialized exceptions:

class FuseError(Exception):
    """
    Error specific to FUSE.

    Feature checking functions and other FUSE operations raise it when
    the underlying FUSE library lacks support for a feature, or when an
    error specific to FUSE occurs.
    """

Usage Example:

# Ask the FUSE library whether extended attributes are available; only the
# feature check itself sits inside the try.
try:
    fuse.feature_assert('has_getxattr', 'has_setxattr')
except fuse.FuseError as e:
    print(f"Extended attributes not supported: {e}")
    self.supports_xattrs = False
else:
    # The check passed, so extended attribute support can be switched on
    self.supports_xattrs = True

def getxattr(self, path, name, size):
    """Refuse with -errno.ENOTSUP when extended attributes are switched off."""
    xattrs_enabled = self.supports_xattrs
    if not xattrs_enabled:
        return -errno.ENOTSUP

    # Implementation here...

Best Practices

  • Implement extended attributes for applications that rely on them
  • Use file locking for multi-process coordination when needed
  • Implement proper access control based on process context
  • Use cache invalidation judiciously to balance performance and consistency
  • Handle ioctl operations for filesystem-specific functionality
  • Provide meaningful poll support for applications that need it
  • Initialize and clean up resources properly in fsinit/fsdestroy
  • Test extended features with real applications that use them
  • Use feature checking functions to verify FUSE library capabilities
  • Handle FuseError exceptions gracefully for unsupported features

Install with Tessl CLI

npx tessl i tessl/pypi-fuse-python

docs

configuration.md

core-operations.md

data-structures.md

extended-features.md

file-management.md

index.md

tile.json