tessl install tessl/pypi-pyfuse3@3.4.0

Python 3 bindings for libfuse 3 with async I/O support.
Production-ready filesystem implementations and patterns.
Complete implementation from the Quick Start Guide.
import errno
import logging

import httpx
import pyfuse3
import trio
logger = logging.getLogger(__name__)


class NetworkFS(pyfuse3.Operations):
    """Filesystem backed by HTTP API.

    ``read`` downloads a file's complete body from
    ``{base_url}/files/{inode}`` the first time that inode is read and
    answers every later read of the same inode from an in-memory cache.
    """

    def __init__(self, base_url):
        """Set up the HTTP client and the per-instance lookup tables.

        Args:
            base_url: URL prefix to which ``/files/{inode}`` is appended.
        """
        super().__init__()
        self.base_url = base_url
        # NOTE(review): nothing in this class closes the client; the owner
        # should ``await self.client.aclose()`` when the filesystem shuts down.
        self.client = httpx.AsyncClient()
        # inode -> full response body. Entries are never evicted.
        self.cache = {}
        # file handle -> inode. read() resolves handles through this table,
        # so it has to exist before the first request arrives; the handler
        # that opens files (not part of this example) must register handles.
        self.open_files = {}

    async def read(self, fh, offset, size):
        """Read with caching and error recovery.

        Args:
            fh: File handle previously registered in ``self.open_files``.
            offset: Byte position to start reading from.
            size: Maximum number of bytes to return.

        Returns:
            The slice ``data[offset:offset + size]`` of the file body; this
            is shorter than ``size`` (possibly empty) near or past the end.

        Raises:
            pyfuse3.FUSEError: ``EBADF`` if ``fh`` is not a known handle,
                ``EIO`` if the request fails or the server answers with an
                error status.
        """
        try:
            inode = self.open_files[fh]
        except KeyError:
            # Report a stale/unknown handle to the kernel instead of letting
            # a KeyError escape the request handler.
            raise pyfuse3.FUSEError(errno.EBADF) from None

        # Check cache
        data = self.cache.get(inode)
        if data is None:
            # Fetch from network
            try:
                response = await self.client.get(
                    f"{self.base_url}/files/{inode}"
                )
                # Without this, a 4xx/5xx error page would be cached and
                # returned as if it were the file's content.
                response.raise_for_status()
            except httpx.HTTPError as e:
                # HTTPError covers both transport failures (RequestError)
                # and error statuses (HTTPStatusError).
                logger.error(f"Network error: {e}")
                raise pyfuse3.FUSEError(errno.EIO) from e
            data = response.content
            self.cache[inode] = data

        return data[offset:offset + size]


from cryptography.fernet import Fernet
class EncryptedFS(pyfuse3.Operations):
    """Filesystem with transparent encryption.

    Data is encrypted and decrypted with a Fernet cipher built from the key
    given to the constructor.
    """

    def __init__(self, key):
        """Build the cipher.

        Args:
            key: Fernet key, passed unchanged to ``Fernet(key)``.
        """
        super().__init__()
        self.cipher = Fernet(key)

    async def write(self, fh, offset, buf):
        """Encrypt before writing.

        Args:
            fh: File handle being written to.
            offset: Byte position of the write (not used by this example).
            buf: Plaintext bytes to store.

        Returns:
            ``len(buf)``, i.e. the whole buffer is reported as written.
        """
        encrypted = self.cipher.encrypt(buf)
        # Write encrypted data
        # TODO: persist ``encrypted`` in the backing store. Until that is
        # implemented this handler reports success without storing anything.
        return len(buf)

    async def read(self, fh, offset, size):
        """Decrypt after reading.

        Args:
            fh: File handle being read from.
            offset: Byte position forwarded to ``_read_encrypted``.
            size: Byte count forwarded to ``_read_encrypted``.

        Returns:
            The decrypted bytes of whatever ``_read_encrypted`` returned.

        Raises:
            pyfuse3.FUSEError: ``EIO`` if the stored data fails Fernet
                verification (corrupt, truncated, or written with another
                key).
        """
        # Imported here so this handler is self-contained; only ``Fernet``
        # is imported at module level.
        from cryptography.fernet import InvalidToken

        # NOTE(review): ``_read_encrypted`` is not defined in this class and
        # must be supplied elsewhere. It is called synchronously here; if it
        # is a coroutine it needs ``await`` -- confirm against its definition.
        # NOTE(review): a Fernet token is verified as a whole, so the helper
        # presumably has to return a complete token rather than an arbitrary
        # (offset, size) byte range of ciphertext -- confirm.
        encrypted_data = self._read_encrypted(fh, offset, size)
        try:
            return self.cipher.decrypt(encrypted_data)
        except InvalidToken as e:
            # Surface undecryptable data as an I/O error rather than letting
            # a non-FUSE exception escape the request handler.
            raise pyfuse3.FUSEError(errno.EIO) from e