tessl install tessl/pypi-pyfuse3@3.4.0

Python 3 bindings for libfuse 3 with async I/O support
Complete guide to creating your first pyfuse3 filesystem.
Linux:
# Debian/Ubuntu
sudo apt install libfuse3-3 libfuse3-dev
# Fedora/RHEL
sudo dnf install fuse3 fuse3-devel
# Load kernel module
sudo modprobe fuse

macOS:
pip install pyfuse3

Minimum Python version: 3.10+
import pyfuse3
import trio
import errno
import stat
import os
import time
class MyFirstFS(pyfuse3.Operations):
    """Minimal in-memory filesystem used by this quick start.

    Attributes:
        next_inode: Next inode number to hand out; starts right after
            ``pyfuse3.ROOT_INODE``.
        inodes: Maps inode number -> dict of attributes (``mode``, ``nlink``,
            ``uid``, ``gid``, ``size``, ``*_ns`` timestamps) plus ``entries``
            for directories.
        lookup_counts: Maps inode number -> current lookup count.
    """

    def __init__(self):
        super().__init__()
        self.next_inode = pyfuse3.ROOT_INODE + 1
        self.inodes = {}
        self.lookup_counts = {}

    def init(self):
        """Initialize filesystem state."""
        # Create root directory. Take the clock once so that atime, mtime and
        # ctime start out identical.
        now_ns = time.time_ns()
        self.inodes[pyfuse3.ROOT_INODE] = {
            'mode': stat.S_IFDIR | 0o755,
            'nlink': 2,
            'uid': os.getuid(),
            'gid': os.getgid(),
            'size': 0,
            'atime_ns': now_ns,
            'mtime_ns': now_ns,
            'ctime_ns': now_ns,
            'entries': {},  # name -> inode mapping
        }
        self.lookup_counts[pyfuse3.ROOT_INODE] = 1


# Method of MyFirstFS
async def lookup(self, parent_inode, name, ctx):
    """Look up a directory entry.

    Args:
        parent_inode: Inode number of the directory to search.
        name: Entry name to resolve (a key of the directory's ``entries``).
        ctx: Request context, passed through to ``getattr``.

    Returns:
        Whatever ``self.getattr`` returns for the resolved inode.

    Raises:
        pyfuse3.FUSEError: ``ENOENT`` if the parent or the entry does not
            exist, ``ENOTDIR`` if the parent is not a directory.
    """
    parent = self.inodes.get(parent_inode)
    if parent is None:
        raise pyfuse3.FUSEError(errno.ENOENT)
    # Regular files have no 'entries' key; answer with ENOTDIR instead of
    # letting a KeyError escape from the handler.
    if not stat.S_ISDIR(parent['mode']):
        raise pyfuse3.FUSEError(errno.ENOTDIR)

    if name in (b'.', b'..'):
        # The root is the only directory this quick start ever creates, so
        # both names resolve to the directory itself. A filesystem with
        # subdirectories must return the real parent for b'..'.
        inode = parent_inode
    else:
        try:
            inode = parent['entries'][name]
        except KeyError:
            raise pyfuse3.FUSEError(errno.ENOENT) from None

    # IMPORTANT: Increase lookup count
    self.lookup_counts[inode] = self.lookup_counts.get(inode, 0) + 1
    return await self.getattr(inode, ctx)
async def getattr(self, inode, ctx):
    """Get file attributes.

    Args:
        inode: Inode number whose attributes are requested.
        ctx: Request context (not used here).

    Returns:
        A ``pyfuse3.EntryAttributes`` filled from the stored inode record.

    Raises:
        pyfuse3.FUSEError: ``ENOENT`` if the inode is unknown.
    """
    try:
        record = self.inodes[inode]
    except KeyError:
        raise pyfuse3.FUSEError(errno.ENOENT) from None

    attrs = pyfuse3.EntryAttributes()
    attrs.st_ino = inode
    # Each stored key maps onto the attribute of the same name with an
    # 'st_' prefix.
    for field in ('mode', 'nlink', 'uid', 'gid', 'size',
                  'atime_ns', 'mtime_ns', 'ctime_ns'):
        setattr(attrs, 'st_' + field, record[field])
    # Both cache timeouts use the same value.
    attrs.entry_timeout = 1.0
    attrs.attr_timeout = 1.0
    return attrs
async def forget(self, inode_list):
    """Decrease lookup counts (MUST NOT raise exceptions).

    Args:
        inode_list: Iterable of ``(inode, nlookup)`` pairs; ``nlookup`` is
            subtracted from the inode's lookup count.
    """
    for ino, released in inode_list:
        remaining = self.lookup_counts.get(ino)
        if remaining is None:
            continue  # inode is not tracked, nothing to release
        remaining -= released
        if remaining > 0:
            self.lookup_counts[ino] = remaining
            continue
        del self.lookup_counts[ino]
        # Delete inode if also unlinked
        node = self.inodes.get(ino)
        if node is not None and node['nlink'] == 0:
            del self.inodes[ino]


async def opendir(self, inode, ctx):
    """Open directory.

    Returns:
        The inode number itself, which doubles as the directory handle.

    Raises:
        pyfuse3.FUSEError: ``ENOENT`` if the inode is unknown, ``ENOTDIR``
            if it is not a directory.
    """
    node = self.inodes.get(inode)
    if node is None:
        raise pyfuse3.FUSEError(errno.ENOENT)
    if stat.S_ISDIR(node['mode']):
        return inode  # Use inode as directory handle
    raise pyfuse3.FUSEError(errno.ENOTDIR)
async def readdir(self, fh, start_id, token):
    """List directory entries.

    Args:
        fh: Directory handle as returned by ``opendir`` (the inode number).
        start_id: Index of the first entry to report; entries before it were
            already delivered by an earlier call.
        token: Opaque token that must be passed to ``pyfuse3.readdir_reply``.

    Raises:
        pyfuse3.FUSEError: ``EBADF`` if the handle is unknown.
    """
    if fh not in self.inodes:
        raise pyfuse3.FUSEError(errno.EBADF)

    # Snapshot the entries so the listing stays stable across the awaits below.
    entries = list(self.inodes[fh]['entries'].items())
    for next_id, (name, inode) in enumerate(entries[start_id:], start=start_id + 1):
        attr = await self.getattr(inode, None)
        if not pyfuse3.readdir_reply(token, name, attr, next_id):
            break  # Buffer full: entry was not accepted, do not count it
        # An accepted entry counts as a lookup, so the kernel will later
        # forget() it; keep the bookkeeping in step.
        self.lookup_counts[inode] = self.lookup_counts.get(inode, 0) + 1
async def releasedir(self, fh):
    """Release directory.

    Args:
        fh: Directory handle as returned by ``opendir``.
    """
    pass  # Nothing to clean up


# These imports were previously fused into the comment above, so `signal`
# was never actually imported.
import signal
import sys
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def run_filesystem():
    """Run the filesystem.

    Mounts ``MyFirstFS`` at ``/mnt/myfs``, serves requests until SIGINT or
    SIGTERM arrives (or the filesystem is unmounted), then closes the
    pyfuse3 session.
    """
    fs = MyFirstFS()

    # Create mountpoint
    mountpoint = '/mnt/myfs'
    os.makedirs(mountpoint, exist_ok=True)

    # Initialize outside the try block: if init() fails there is no session
    # for close() to tear down.
    options = set(pyfuse3.default_options)
    pyfuse3.init(fs, mountpoint, options)
    logger.info(f"Filesystem mounted at {mountpoint}")

    # Signal handlers run in the main thread, which is also the Trio thread,
    # so trio.from_thread.run_sync() must not be used here. Scheduling the
    # call through the TrioToken is safe from a signal handler.
    token = trio.lowlevel.current_trio_token()

    def signal_handler(signum, frame):
        logger.info("Shutting down...")
        token.run_sync_soon(pyfuse3.terminate)

    try:
        # Set up signal handlers for graceful shutdown
        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)
        await pyfuse3.main()
    finally:
        pyfuse3.close()
        logger.info("Filesystem unmounted")
# Script entry point: start Trio and run the filesystem until it exits.
if __name__ == '__main__':
    trio.run(run_filesystem)

# In another terminal:
# List directory (should be empty)
ls -la /mnt/myfs
# Check filesystem stats
df -h /mnt/myfs
# Unmount
fusermount -u /mnt/myfs


async def create(self, parent_inode, name, mode, flags, ctx):
    """Create and open a file.

    Args:
        parent_inode: Inode number of the directory that receives the file.
        name: Name of the new directory entry.
        mode: Requested mode bits; ``ctx.umask`` is masked out.
        flags: Open flags (not used here).
        ctx: Request context supplying ``umask``, ``uid`` and ``gid``.

    Returns:
        Tuple ``(pyfuse3.FileInfo, attributes)`` where the file handle is the
        new inode number and the attributes come from ``self.getattr``.

    Raises:
        pyfuse3.FUSEError: ``ENOENT`` if the parent does not exist,
            ``ENOTDIR`` if it is not a directory, ``EEXIST`` if the name is
            already taken.
    """
    parent = self.inodes.get(parent_inode)
    if parent is None:
        raise pyfuse3.FUSEError(errno.ENOENT)
    # Regular files have no 'entries' key; report ENOTDIR rather than
    # letting a KeyError escape from the handler.
    if not stat.S_ISDIR(parent['mode']):
        raise pyfuse3.FUSEError(errno.ENOTDIR)
    if name in parent['entries']:
        raise pyfuse3.FUSEError(errno.EEXIST)

    # Apply umask
    final_mode = mode & ~ctx.umask

    # Create inode
    inode = self.next_inode
    self.next_inode += 1
    now_ns = time.time_ns()
    self.inodes[inode] = {
        'mode': stat.S_IFREG | final_mode,
        'nlink': 1,
        'uid': ctx.uid,
        'gid': ctx.gid,
        'size': 0,
        'atime_ns': now_ns,
        'mtime_ns': now_ns,
        'ctime_ns': now_ns,
        'data': b'',
    }

    # Add to directory
    parent['entries'][name] = inode
    parent['mtime_ns'] = now_ns
    parent['ctime_ns'] = now_ns

    # Track lookup count
    self.lookup_counts[inode] = 1

    # Return file info
    fi = pyfuse3.FileInfo(fh=inode)
    entry = await self.getattr(inode, ctx)
    return (fi, entry)


async def open(self, inode, flags, ctx):
    """Open a file.

    Returns:
        A ``pyfuse3.FileInfo`` whose file handle is the inode number.

    Raises:
        pyfuse3.FUSEError: ``ENOENT`` if the inode is unknown.
    """
    if inode not in self.inodes:
        raise pyfuse3.FUSEError(errno.ENOENT)
    return pyfuse3.FileInfo(fh=inode)
async def read(self, fh, offset, size):
    """Read from file.

    Args:
        fh: File handle (the inode number).
        offset: Byte position to start reading at.
        size: Maximum number of bytes to return.

    Returns:
        The stored bytes in ``[offset, offset + size)``; shorter (possibly
        empty) when the range runs past the end of the data.

    Raises:
        pyfuse3.FUSEError: ``EBADF`` if the handle is unknown.
    """
    node = self.inodes.get(fh)
    if node is None:
        raise pyfuse3.FUSEError(errno.EBADF)
    end = offset + size
    return node['data'][offset:end]
async def write(self, fh, offset, buf):
    """Write to file.

    Args:
        fh: File handle (the inode number).
        offset: Byte position at which ``buf`` is placed.
        buf: Bytes to store.

    Returns:
        Number of bytes written, i.e. ``len(buf)``.

    Raises:
        pyfuse3.FUSEError: ``EBADF`` if the handle is unknown.
    """
    node = self.inodes.get(fh)
    if node is None:
        raise pyfuse3.FUSEError(errno.EBADF)

    written = len(buf)
    end = offset + written
    content = bytearray(node['data'])

    # Zero-fill up to the end of the write when it reaches past the current
    # end of file.
    shortfall = end - len(content)
    if shortfall > 0:
        content.extend(bytes(shortfall))

    # Overwrite the target range in place.
    content[offset:end] = buf
    node['data'] = bytes(content)
    node['size'] = len(content)

    # Update timestamps
    stamp = time.time_ns()
    node['mtime_ns'] = stamp
    node['ctime_ns'] = stamp
    return written
async def release(self, fh):
    """Release file.

    Args:
        fh: File handle as returned by ``open`` or ``create``.
    """
    pass  # Nothing to clean up


async def unlink(self, parent_inode, name, ctx):
    """Remove a file.

    Args:
        parent_inode: Inode number of the directory holding the entry.
        name: Name of the entry to remove.
        ctx: Request context (not used here).

    Raises:
        pyfuse3.FUSEError: ``ENOENT`` if the parent or the entry does not
            exist, ``ENOTDIR`` if the parent is not a directory.
    """
    parent = self.inodes.get(parent_inode)
    if parent is None:
        raise pyfuse3.FUSEError(errno.ENOENT)
    # Regular files have no 'entries' key; report ENOTDIR rather than
    # letting a KeyError escape from the handler.
    if not stat.S_ISDIR(parent['mode']):
        raise pyfuse3.FUSEError(errno.ENOTDIR)
    if name not in parent['entries']:
        raise pyfuse3.FUSEError(errno.ENOENT)

    # Remove from directory; one timestamp keeps mtime and ctime identical.
    inode = parent['entries'].pop(name)
    now_ns = time.time_ns()
    parent['mtime_ns'] = now_ns
    parent['ctime_ns'] = now_ns

    # Decrement link count (a link-count change also updates the file's ctime)
    node = self.inodes[inode]
    node['nlink'] -= 1
    node['ctime_ns'] = now_ns

    # Don't delete inode while the kernel may still reference it - that
    # happens in forget() when the lookup count reaches 0. If nothing
    # references it, forget() will never be called for it, so free it now.
    if node['nlink'] == 0 and inode not in self.lookup_counts:
        del self.inodes[inode]


# Create a file
echo "Hello, World!" > /mnt/myfs/test.txt
# Read the file
cat /mnt/myfs/test.txt
# List files
ls -l /mnt/myfs
# Delete the file
rm /mnt/myfs/test.txt

Error: fusermount: mount failed: Permission denied
Solution:
# Check if mountpoint exists
mkdir -p /mnt/myfs
# Check if FUSE is available
ls /dev/fuse
# Load FUSE module
sudo modprobe fuse

Cause: Blocking operation in async handler
Solution: Use async I/O operations:
# Wrong
def blocking_operation():
    # Counter-example: time.sleep() is the blocking call this section warns about.
    time.sleep(1)
# Correct
async def async_operation():
await trio.sleep(1)Cause: Not tracking lookup counts
Solution: Always implement forget() and track counts in lookup(), create(), etc.
The complete code for this quick start is available in Real-World Scenarios.