tessl install tessl/pypi-pyfuse3@3.4.0Python 3 bindings for libfuse 3 with async I/O support
Functions for initializing, running, and terminating FUSE filesystems.
Initialize a FUSE filesystem by mounting it at a specified mountpoint with optional mount options.
def init(ops: Operations, mountpoint: str, options: set[str] = ...) -> None:
"""
Initialize FUSE filesystem.
Args:
ops: Operations instance implementing filesystem handlers
mountpoint: Path where filesystem will be mounted
options: Set of mount options (defaults to default_options)
Notes:
- Must be called before main()
- The mountpoint directory must exist
- Common options include: 'default_permissions', 'allow_other', 'ro'
- Use default_options as a starting point: my_opts = set(pyfuse3.default_options)
- This is a synchronous function
- Does not return an error; raises exception on failure
Raises:
RuntimeError: If mounting fails
OSError: If mountpoint doesn't exist or isn't accessible
"""Usage example:
import pyfuse3
fs = MyFileSystem()
options = set(pyfuse3.default_options)
options.add('allow_other')
try:
pyfuse3.init(fs, '/mnt/myfs', options)
except RuntimeError as e:
print(f"Failed to mount: {e}")
sys.exit(1)Run the FUSE main event loop to handle filesystem requests. This async function processes FUSE requests using a pool of worker tasks.
async def main(min_tasks: int = 1, max_tasks: int = 99) -> None:
"""
Run FUSE main loop.
Args:
min_tasks: Minimum number of worker tasks (default: 1)
max_tasks: Maximum number of worker tasks (default: 99)
Notes:
- Must call init() before main()
- Runs until terminate() is called or filesystem is unmounted
- Returns when filesystem is unmounted or terminate() is called
- Must be run within a Trio or asyncio event loop
- Worker tasks handle filesystem operation requests concurrently
- min_tasks: always this many tasks running (idle if no work)
- max_tasks: spawns more tasks if all workers busy (up to max)
- Choose min_tasks based on typical concurrent operations
- Choose max_tasks based on maximum concurrent operations expected
- Each task consumes memory; balance parallelism vs memory usage
Raises:
RuntimeError: If init() not called first
OSError: If FUSE communication fails
"""Usage example with Trio:
import trio
import pyfuse3
async def run_fs():
fs = MyFileSystem()
pyfuse3.init(fs, '/mnt/myfs')
# For filesystems with high concurrency needs
await pyfuse3.main(min_tasks=5, max_tasks=50)
trio.run(run_fs)Usage example with asyncio:
import asyncio
import pyfuse3
import pyfuse3.asyncio
pyfuse3.asyncio.enable()
async def run_fs():
fs = MyFileSystem()
pyfuse3.init(fs, '/mnt/myfs')
# Use default task counts for simple filesystems
await pyfuse3.main()
asyncio.run(run_fs())Gracefully terminate the FUSE main loop, causing main() to return.
def terminate() -> None:
"""
Terminate FUSE main loop.
Notes:
- Causes main() to return gracefully
- When called from different thread, must use trio.from_thread.run_sync
- The trio_token module attribute provides the necessary token
- Filesystem will remain mounted; use close() to unmount
- Safe to call multiple times (subsequent calls ignored)
- Not async; call directly from async context or via trio.from_thread from other threads
- Does not wait for pending operations to complete (they may error)
Thread Safety:
- Thread-safe when called correctly
- From async context: call directly
- From other thread: use trio.from_thread.run_sync with trio_token
"""Usage example from same async context:
import pyfuse3
async def shutdown_handler():
# Within async context
print("Shutting down filesystem...")
pyfuse3.terminate()Usage example from different thread (with Trio):
import trio.from_thread
import pyfuse3
def signal_handler(signum, frame):
# From a different thread (e.g., signal handler)
print("Received signal, terminating...")
trio.from_thread.run_sync(pyfuse3.terminate, trio_token=pyfuse3.trio_token)Usage example from different thread (with asyncio):
import asyncio
import pyfuse3
# With asyncio, use call_soon_threadsafe
def signal_handler(signum, frame):
loop = asyncio.get_event_loop()
loop.call_soon_threadsafe(pyfuse3.terminate)Clean up resources and optionally unmount the filesystem.
def close(unmount: bool = True) -> None:
"""
Clean up and ensure filesystem is unmounted.
Args:
unmount: Whether to unmount the filesystem (default: True)
Notes:
- Typically called after main() returns
- If unmount=False, only cleanup is performed (mountpoint stays mounted)
- Normally filesystem is unmounted via umount(8) or fusermount(1)
- If main() terminates unexpectedly, filesystem may remain mounted
- close(unmount=True) ensures proper unmounting in exception cases
- Does not unmount if connection terminated via /sys/fs/fuse/connections/
- Safe to call even if filesystem not mounted or already unmounted
- Should be called in finally block to ensure cleanup
- Not async; call from sync context after main() returns
Thread Safety:
- Not thread-safe; call from same thread/context as init()
Raises:
No exceptions raised; errors logged but not propagated
"""Usage example:
import trio
import pyfuse3
async def run_fs():
fs = MyFileSystem()
try:
pyfuse3.init(fs, '/mnt/myfs')
await pyfuse3.main()
finally:
# Ensures unmount even if exception raised
pyfuse3.close()
trio.run(run_fs)import trio
import pyfuse3
import signal
import sys
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MyFileSystem(pyfuse3.Operations):
# Implement filesystem operations
pass
async def run_filesystem():
fs = MyFileSystem()
# Setup signal handler for graceful shutdown
def signal_handler(signum, frame):
logger.info(f"Received signal {signum}, initiating shutdown...")
trio.from_thread.run_sync(pyfuse3.terminate, trio_token=pyfuse3.trio_token)
# Register signal handlers
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Initialize filesystem
options = set(pyfuse3.default_options)
options.add('allow_other')
options.add('fsname=myfs')
logger.info("Initializing filesystem...")
try:
pyfuse3.init(fs, '/mnt/myfs', options)
except RuntimeError as e:
logger.error(f"Failed to mount: {e}")
return 1
try:
# Run main loop
logger.info("Starting main loop...")
await pyfuse3.main(min_tasks=5, max_tasks=50)
logger.info("Main loop terminated")
except Exception as e:
logger.error(f"Error in main loop: {e}", exc_info=True)
return 1
finally:
# Clean up and unmount
logger.info("Cleaning up...")
pyfuse3.close()
logger.info("Filesystem unmounted")
return 0
if __name__ == '__main__':
sys.exit(trio.run(run_filesystem))The default_options constant provides recommended mount options:
default_options: frozenset[str]
# Contains: {'default_permissions'}Usage:
import pyfuse3
# Start with default options
my_options = set(pyfuse3.default_options)
# Add additional options
my_options.add('allow_other')
my_options.add('ro')Common mount options:
default_permissions - Enable kernel permission checking (recommended; included in default_options)allow_other - Allow access by users other than the mounter (requires user_allow_other in /etc/fuse.conf)allow_root - Allow access by root user onlyro - Mount read-onlyrw - Mount read-write (default)nonempty - Allow mounting over non-empty directoryfsname=NAME - Set filesystem name shown in df/mountsubtype=TYPE - Set filesystem subtype (shown as fsname.subtype)debug - Enable debug output (verbose kernel messages)use_ino - Honor st_ino field from Operations (instead of generating)readdir_ino - Honor st_ino from readdir (requires use_ino)direct_io - Disable page cache for all files (unless overridden per-file)kernel_cache - Cache file data in kernel (for read-only filesystems)auto_cache - Automatically invalidate cache on file size changenoauto_cache - Disable auto_cacheumask=MASK - Set umask for file permissions (octal)uid=N - Override owner user IDgid=N - Override owner group IDentry_timeout=N - Override entry_timeout (seconds)attr_timeout=N - Override attr_timeout (seconds)ac_attr_timeout=N - Override attr_timeout for auto_cachenegative_timeout=N - Timeout for negative lookups (seconds)noforget - Never forget inodes (debugging only)intr - Allow operations to be interruptedintr_signal=N - Signal to use for interruptionThe min_tasks and max_tasks parameters control parallelism:
Low-concurrency filesystems (single-user, sequential access):
await pyfuse3.main(min_tasks=1, max_tasks=10)Medium-concurrency filesystems (typical multi-user):
await pyfuse3.main(min_tasks=5, max_tasks=50)High-concurrency filesystems (servers, many clients):
await pyfuse3.main(min_tasks=10, max_tasks=200)CPU-bound operations (compression, encryption):
# Fewer tasks to avoid CPU contention
await pyfuse3.main(min_tasks=2, max_tasks=20)I/O-bound operations (network filesystems):
# More tasks to overlap I/O waits
await pyfuse3.main(min_tasks=10, max_tasks=100)import pyfuse3
import os
try:
pyfuse3.init(fs, '/mnt/myfs', options)
except RuntimeError as e:
# Mounting failed
print(f"Mount failed: {e}")
# Common causes:
# - Mountpoint doesn't exist
# - Mountpoint already mounted
# - Permission denied
# - FUSE kernel module not loaded
# - Invalid mount options
except OSError as e:
# System error
print(f"System error: {e}")try:
await pyfuse3.main()
except RuntimeError as e:
# init() not called
print(f"Initialization error: {e}")
except OSError as e:
# FUSE communication error
print(f"Communication error: {e}")
except KeyboardInterrupt:
# User interrupted (Ctrl+C)
print("Interrupted by user")Always use try/finally to ensure cleanup:
try:
pyfuse3.init(fs, mountpoint, options)
await pyfuse3.main()
finally:
# Always cleanup, even on exception
pyfuse3.close()From another terminal:
# Linux
fusermount -u /mnt/myfs
# Or with umount
sudo umount /mnt/myfsThis causes main() to return normally.
If filesystem is busy:
# Linux
fusermount -uz /mnt/myfs # -z for lazy unmount
# Or
sudo umount -l /mnt/myfs # lazy unmount
sudo umount -f /mnt/myfs # force unmount# Call terminate() to stop main loop
pyfuse3.terminate()
# Then close() to unmount
pyfuse3.close(unmount=True)import os
if not os.path.exists('/dev/fuse'):
print("FUSE not available - install fuse3 package")import os
mountpoint = '/mnt/myfs'
if not os.path.exists(mountpoint):
print(f"Mountpoint {mountpoint} doesn't exist")
os.makedirs(mountpoint)
elif os.listdir(mountpoint):
print(f"Mountpoint {mountpoint} not empty - use nonempty option")import os
mountpoint = '/mnt/myfs'
if not os.access(mountpoint, os.W_OK):
print(f"No write permission for {mountpoint}")import subprocess
result = subprocess.run(['mount'], capture_output=True, text=True)
if '/mnt/myfs' in result.stdout:
print("Filesystem already mounted")import pyfuse3
import logging
# Enable pyfuse3 debug messages
logging.basicConfig(level=logging.DEBUG)
# Enable FUSE kernel debug messages
options = set(pyfuse3.default_options)
options.add('debug')
pyfuse3.init(fs, mountpoint, options)apt install libfuse3-3 libfuse3-dev (Debian/Ubuntu) or dnf install fuse3 fuse3-devel (Fedora)modprobe fuseControl how long kernel caches entries and attributes:
# Set cache timeouts in mount options
options = set(pyfuse3.default_options)
options.add('entry_timeout=60') # Cache entries for 60 seconds
options.add('attr_timeout=60') # Cache attributes for 60 seconds
pyfuse3.init(fs, mountpoint, options)
# Or set per-entry in EntryAttributes
entry = pyfuse3.EntryAttributes()
entry.entry_timeout = 60.0 # seconds
entry.attr_timeout = 60.0 # secondsEnable writeback caching for better write performance:
class FastFS(pyfuse3.Operations):
enable_writeback_cache = True # Enable writeback caching
# Must implement flush() and fsync() when writeback cache enabled
async def flush(self, fh):
# Flush writes to storage
pass
async def fsync(self, fh, datasync):
# Sync writes to storage
passWarning: Writeback cache requires careful cache invalidation to avoid inconsistencies.
Bypass kernel cache for specific files:
async def open(self, inode, flags, ctx):
fh = self.allocate_fh(inode)
return pyfuse3.FileInfo(
fh=fh,
direct_io=True # Bypass kernel cache for this file
)Use for files that:
Tune worker tasks based on profiling:
# Profile filesystem under load
# Monitor task creation/destruction
# Adjust min_tasks and max_tasks accordingly
await pyfuse3.main(min_tasks=optimal_min, max_tasks=optimal_max)# By default, cannot mount over non-empty directory
try:
pyfuse3.init(fs, '/mnt/myfs', options)
except RuntimeError:
# Failed - mountpoint not empty
pass
# Use 'nonempty' option to allow
options = set(pyfuse3.default_options)
options.add('nonempty')
pyfuse3.init(fs, '/mnt/myfs', options)# Can mount same filesystem multiple times at different mountpoints
fs1 = MyFS()
fs2 = MyFS() # Separate instance
pyfuse3.init(fs1, '/mnt/myfs1', options)
# In another process:
# pyfuse3.init(fs2, '/mnt/myfs2', options)async def test_rapid_mount_unmount():
"""Test rapid mount/unmount cycles."""
for i in range(10):
fs = MyFS()
try:
pyfuse3.init(fs, '/mnt/myfs', options)
await pyfuse3.main()
finally:
pyfuse3.close()
# Small delay between cycles
await trio.sleep(0.1)class RecoverableFS(pyfuse3.Operations):
def init(self):
"""Initialize with crash recovery."""
# Check for previous unclean shutdown
if os.path.exists(self.recovery_file):
logger.warning("Unclean shutdown detected, recovering...")
self._recover_from_crash()
# Mark as running
self._mark_running()
async def main_wrapper(self):
"""Wrapper to handle cleanup."""
try:
await pyfuse3.main()
finally:
# Mark as cleanly shutdown
self._mark_clean_shutdown()