Python bindings for FUSE (Filesystem in USErspace) enabling custom userspace filesystems
—
Advanced FUSE features including extended attributes, file locking, access control, filesystem statistics, and specialized operations for enhanced filesystem functionality.
Support for extended file attributes (xattrs) that provide additional metadata beyond standard POSIX attributes.
def getxattr(self, path, name, size):
    """
    Get extended attribute value.

    Args:
        path (str): Path to file or directory
        name (str): Attribute name (e.g., 'user.comment')
        size (int): Maximum size to return (0 = return size needed)

    Returns:
        bytes: Attribute value
        int: Size needed (if size=0) or negative errno on error
    """
def setxattr(self, path, name, val, flags):
    """
    Set extended attribute value.

    Args:
        path (str): Path to file or directory
        name (str): Attribute name
        val (bytes): Attribute value
        flags (int): XATTR_CREATE, XATTR_REPLACE, or 0

    Returns:
        int: 0 on success, negative errno on error
    """
def listxattr(self, path, size):
    """
    List extended attribute names.

    Args:
        path (str): Path to file or directory
        size (int): Maximum size to return (0 = return size needed)

    Returns:
        bytes: Null-separated attribute names
        int: Size needed (if size=0) or negative errno on error
    """
def removexattr(self, path, name):
"""
Remove extended attribute.
Args:
path (str): Path to file or directory
name (str): Attribute name to remove
Returns:
int: 0 on success, negative errno on error
"""

Usage Example:
class MyFS(Fuse):
def __init__(self, *args, **kwargs):
    """Forward all arguments to the base class and start with no stored xattrs."""
    super().__init__(*args, **kwargs)
    # In-memory attribute store, keyed by path and then by attribute name.
    self.xattrs = {} # {path: {name: value}}
def getxattr(self, path, name, size):
    """Return the value of extended attribute ``name`` stored for ``path``.

    Args:
        path (str): Path to file or directory.
        name (str): Attribute name (e.g., 'user.comment').
        size (int): Size of the caller's buffer; 0 asks only for the size
            needed to hold the value.

    Returns:
        int: ``len(value)`` when ``size`` is 0.
        bytes: The complete value when it fits in ``size`` bytes.
        int: ``-errno.ENODATA`` when the path or attribute is not stored,
        ``-errno.ERANGE`` when ``size`` is non-zero but too small.
    """
    try:
        value = self.xattrs[path][name]
    except KeyError:
        # Either the path has no attributes at all or this one is missing.
        return -errno.ENODATA

    if size == 0:
        return len(value)  # Size query only
    if size < len(value):
        # Never hand back a truncated value as if it were complete; the
        # caller is expected to retry with a larger buffer.
        return -errno.ERANGE
    return value
def setxattr(self, path, name, val, flags):
    """Store extended attribute ``name`` = ``val`` for ``path``.

    Args:
        path (str): Path to file or directory; must be a key of
            ``self.files`` or ``self.dirs``.
        name (str): Attribute name.
        val (bytes): Attribute value.
        flags (int): XATTR_CREATE (fail if the attribute exists),
            XATTR_REPLACE (fail if it does not), or 0 (create or replace).

    Returns:
        int: 0 on success, ``-errno.ENOENT`` for an unknown path,
        ``-errno.EEXIST`` / ``-errno.ENODATA`` when ``flags`` forbids the
        operation.
    """
    if path not in self.files and path not in self.dirs:
        return -errno.ENOENT

    # Look up without inserting, so a rejected request leaves no empty
    # per-path entry behind in self.xattrs.
    exists = name in self.xattrs.get(path, {})
    if flags == XATTR_CREATE and exists:
        return -errno.EEXIST
    if flags == XATTR_REPLACE and not exists:
        return -errno.ENODATA

    self.xattrs.setdefault(path, {})[name] = val
    return 0
def listxattr(self, path, size):
if path not in self.xattrs:
return b''
names = b'\0'.join(name.encode() for name in self.xattrs[path].keys())
if names:
names += b'\0' # Null-terminate the list
if size == 0:
return len(names)
return names[:size]

Advisory file locking operations for coordinating access between processes.
def lock(self, path, fip, cmd, lock):
"""
File locking operations.
Args:
path (str): Path to file
fip: File info pointer (usually None)
cmd (int): Lock command (F_GETLK, F_SETLK, F_SETLKW)
lock (Flock): Lock specification
Returns:
int: 0 on success, negative errno on error
Flock: Lock information (for F_GETLK)
"""

Usage Example:
import fcntl
class MyFS(Fuse):
def __init__(self, *args, **kwargs):
    """Forward all arguments to the base class and start with no recorded locks."""
    super().__init__(*args, **kwargs)
    # Per-path list of the locks currently recorded for that file.
    self.locks = {} # {path: [lock_info, ...]}
def lock(self, path, fip, cmd, lock):
    """Handle an advisory-lock request for ``path``.

    Args:
        path (str): Path of the file being locked; must be in ``self.files``.
        fip: File info pointer (not used by this example).
        cmd (int): ``fcntl.F_GETLK``, ``fcntl.F_SETLK`` or ``fcntl.F_SETLKW``.
        lock: Lock specification supplied by the caller.

    Returns:
        For ``F_GETLK``: the first recorded lock that conflicts with
        ``lock``, or a new ``fuse.Flock`` with ``l_type = F_UNLCK`` when
        nothing conflicts.
        For ``F_SETLK`` / ``F_SETLKW``: the result of ``self.set_lock``, or
        ``-errno.EAGAIN`` when ``F_SETLK`` meets a conflicting lock.
        ``-errno.ENOENT`` for an unknown path; ``-errno.EINVAL`` for any
        other command.
    """
    if path not in self.files:
        return -errno.ENOENT

    if cmd == fcntl.F_GETLK:
        # Report the first conflicting lock, if any.
        for existing_lock in self.locks.get(path, []):
            if self.locks_conflict(lock, existing_lock):
                return existing_lock
        # No conflict - set type to F_UNLCK
        result = fuse.Flock()
        result.l_type = fcntl.F_UNLCK
        return result

    if cmd == fcntl.F_SETLK:
        # Non-blocking lock
        if self.has_conflicting_lock(path, lock):
            return -errno.EAGAIN
        return self.set_lock(path, lock)

    if cmd == fcntl.F_SETLKW:
        # Blocking lock (would block in real implementation)
        return self.set_lock(path, lock)

    # Unknown command: answer with an errno instead of falling through
    # and implicitly returning None.
    return -errno.EINVAL
def locks_conflict(self, lock1, lock2):
# Check if two locks conflict
if lock1.l_type == fcntl.F_UNLCK or lock2.l_type == fcntl.F_UNLCK:
return False
if lock1.l_type == fcntl.F_RDLCK and lock2.l_type == fcntl.F_RDLCK:
return False # Read locks don't conflict
# Check range overlap
return self.ranges_overlap(lock1, lock2)

Enhanced access permission checking beyond basic POSIX permissions.
def access(self, path, mode):
"""
Check file access permissions.
Args:
path (str): Path to file or directory
mode (int): Access mode to check (R_OK, W_OK, X_OK, F_OK)
Returns:
int: 0 if access granted, negative errno on error
"""

Usage Example:
import os
class MyFS(Fuse):
def access(self, path, mode):
if path not in self.files and path not in self.dirs:
return -errno.ENOENT
# Get current context
ctx = self.GetContext()
uid = ctx.get('uid', 0)
gid = ctx.get('gid', 0)
# Get file attributes
st = self.getattr(path)
# Check ownership and permissions
if uid == 0: # Root can access anything
return 0
# Check user permissions
if uid == st.st_uid:
user_perms = (st.st_mode >> 6) & 7
elif gid == st.st_gid:
user_perms = (st.st_mode >> 3) & 7
else:
user_perms = st.st_mode & 7
# Check requested access
if mode & os.R_OK and not (user_perms & 4):
return -errno.EACCES
if mode & os.W_OK and not (user_perms & 2):
return -errno.EACCES
if mode & os.X_OK and not (user_perms & 1):
return -errno.EACCES
return 0

Access information about the current FUSE request context.
def GetContext(self):
"""
Get current request context.
Returns:
dict: Context information with keys:
- uid: User ID of calling process
- gid: Group ID of calling process
- pid: Process ID of calling process
"""

Usage Example:
class MyFS(Fuse):
def create(self, path, mode, fi):
# Get context of calling process
ctx = self.GetContext()
# Set file ownership to calling user
st = fuse.Stat()
st.st_mode = mode
st.st_uid = ctx['uid']
st.st_gid = ctx['gid']
st.st_size = 0
# Create file with proper ownership
self.files[path] = b''
self.file_attrs[path] = st
return 0

Control kernel caching behavior for performance optimization.
def Invalidate(self, path):
"""
Invalidate cached entry.
Args:
path (str): Path to invalidate in kernel cache
Returns:
int: 0 on success, negative errno on error
"""

Usage Example:
class MyFS(Fuse):
def write(self, path, buf, offset):
    """Write ``buf`` at ``offset`` and drop the kernel's cached entry on success.

    Args:
        path (str): Path of the file being written.
        buf: Data to write.
        offset (int): Position in the file at which to write.

    Returns:
        Whatever ``self.do_write`` returned, unchanged.
    """
    # Update file content
    written = self.do_write(path, buf, offset)

    # Invalidate cache since file changed; only a positive result counts
    # as a change.
    if written > 0:
        self.Invalidate(path)
    return written
def setxattr(self, path, name, val, flags):
result = self.do_setxattr(path, name, val, flags)
# Invalidate cache for attribute changes
if result == 0:
self.Invalidate(path)
return result

Handle filesystem-specific I/O control operations.
def ioctl(self, path, cmd, arg, fip, flags, data):
"""
I/O control operations.
Args:
path (str): Path to file
cmd (int): I/O control command
arg: Command argument
fip: File info pointer
flags (int): FUSE I/O control flags
data: Input/output data
Returns:
int: 0 on success, negative errno on error
bytes: Output data for some operations
"""

Usage Example:
# Define custom ioctl commands
MYFS_IOCTL_GET_INFO = 0x1000
MYFS_IOCTL_SET_FLAG = 0x1001
class MyFS(Fuse):
def ioctl(self, path, cmd, arg, fip, flags, data):
if path not in self.files:
return -errno.ENOENT
if cmd == MYFS_IOCTL_GET_INFO:
# Return file information
info = {
'size': len(self.files[path]),
'created': self.file_created[path]
}
return json.dumps(info).encode()
elif cmd == MYFS_IOCTL_SET_FLAG:
# Set filesystem-specific flag
flag_value = struct.unpack('I', data)[0]
self.file_flags[path] = flag_value
return 0
else:
return -errno.ENOTTY # Inappropriate ioctl

Support for poll/select operations on files.
def poll(self, path, fip, ph, reventsp):
    """
    Poll file for events.

    Args:
        path (str): Path to file
        fip: File info pointer
        ph: Poll handle for notifications
        reventsp: Pointer to returned events

    Returns:
        int: 0 on success, negative errno on error
    """
def NotifyPoll(self, ph):
"""
Notify poll completion.
Args:
ph: Poll handle from poll() call
"""

Handle filesystem initialization and cleanup.
def fsinit(self):
    """
    Initialize filesystem.

    Called when filesystem is mounted.

    Returns:
        int: 0 on success, negative errno on error
    """
def fsdestroy(self):
"""
Clean up filesystem.
Called when filesystem is unmounted.
"""

Usage Example:
class MyFS(Fuse):
def fsinit(self):
    """Initialize filesystem resources.

    Resets the in-memory tables (``files``, ``dirs`` with only the root
    directory, ``xattrs``) and then calls ``self.load_filesystem_state()``.

    Returns:
        int: Always 0.
    """
    print("Filesystem mounting...")

    # Initialize data structures before loading, so the loader starts
    # from empty tables.
    self.files = {}
    self.dirs = {'/': {}}
    self.xattrs = {}

    # Load persistent data if needed
    self.load_filesystem_state()
    return 0
def fsdestroy(self):
"""Clean up filesystem resources."""
print("Filesystem unmounting...")
# Save persistent data
self.save_filesystem_state()
# Clean up resources
self.cleanup_resources()

Extended features may raise specialized exceptions:
class FuseError(Exception):
"""
FUSE-specific error.
Raised by feature checking functions and other FUSE operations
when the underlying FUSE library doesn't support a feature or
when a FUSE-specific error occurs.
"""

Usage Example:
# Probe once whether the FUSE library supports extended attributes and
# record the answer on the filesystem object.
try:
    # Check if FUSE library supports extended attributes
    fuse.feature_assert('has_getxattr', 'has_setxattr')
except fuse.FuseError as e:
    print(f"Extended attributes not supported: {e}")
    self.supports_xattrs = False
else:
    # Enable extended attribute support
    self.supports_xattrs = True
def getxattr(self, path, name, size):
if not self.supports_xattrs:
return -errno.ENOTSUP
# Implementation here...

Install with Tessl CLI
npx tessl i tessl/pypi-fuse-python