tessl install tessl/pypi-pyfuse3@3.4.0

Python 3 bindings for libfuse 3 with async I/O support
Functions for controlling kernel caching behavior, invalidating cached data, and notifying the kernel of changes. Essential for filesystems where data can change outside of FUSE operations (e.g., network filesystems, synchronized filesystems).
Invalidate cached attributes and optionally data for an inode.
def invalidate_inode(inode: InodeT, attr_only: bool = False) -> None:
"""
Invalidate cache for inode.
Args:
inode: Inode number
attr_only: If True, only invalidate attributes (not data); if False, invalidate both
Notes:
- Instructs kernel to forget cached attributes and data
- May block if writeback caching is active with dirty data
- Can cause deadlock if write() requests cannot be processed
- Call from separate thread if writeback caching enabled
- Raises OSError(errno.ENOSYS) if not supported by kernel
- attr_only=True: Only invalidate cached attributes (size, times, etc.)
- attr_only=False: Invalidate both attributes and data pages
Thread Safety:
- May block indefinitely with writeback cache
- Safe to call from async context if writeback cache disabled
- With writeback cache, call from separate thread
Raises:
OSError(errno.ENOSYS): Operation not supported by kernel
OSError(errno.ENOENT): Inode not known to kernel
OSError(errno.EBADF): Invalid inode
"""Usage example:
import pyfuse3
# Invalidate all cached data for an inode
pyfuse3.invalidate_inode(inode=42, attr_only=False)
# Invalidate only cached attributes
pyfuse3.invalidate_inode(inode=42, attr_only=True)

Critical: With writeback caching enabled, invalidate_inode() may block:
import threading
import pyfuse3
# WRONG: May deadlock with writeback cache
class BadFS(pyfuse3.Operations):
enable_writeback_cache = True
async def some_operation(self, inode):
# This can deadlock!
pyfuse3.invalidate_inode(inode)
# CORRECT: Call from separate thread
class GoodFS(pyfuse3.Operations):
enable_writeback_cache = True
async def some_operation(self, inode):
# Call from separate thread
def invalidate():
pyfuse3.invalidate_inode(inode)
thread = threading.Thread(target=invalidate)
thread.start()
# Don't wait for thread (it may block)

Synchronously invalidate a cached directory entry.
def invalidate_entry(inode_p: InodeT, name: FileNameT, deleted: InodeT = 0) -> None:
"""
Invalidate directory entry (synchronous).
Args:
inode_p: Parent directory inode
name: Entry name to invalidate (bytes)
deleted: If non-zero, also invalidate this inode
Notes:
- Synchronous operation (may block)
- Use when entry has been deleted or renamed
- If deleted is non-zero, also invalidates that inode's cache
- May raise OSError if kernel doesn't support operation
- Kernel will call lookup() on next access to name
- More efficient than invalidating entire directory
Thread Safety:
- May block
- Safe to call from async context (typically doesn't block long)
Raises:
OSError(errno.ENOSYS): Operation not supported by kernel
OSError(errno.ENOENT): Parent inode not known to kernel
OSError(errno.ENOTDIR): Parent is not a directory
OSError(errno.EINVAL): Invalid arguments
"""Usage example:
import pyfuse3
# Invalidate entry "file.txt" in directory inode 10
pyfuse3.invalidate_entry(inode_p=10, name=b'file.txt')
# Invalidate entry and the inode it referred to
pyfuse3.invalidate_entry(inode_p=10, name=b'file.txt', deleted=42)

Asynchronously invalidate a cached directory entry without blocking.
def invalidate_entry_async(
inode_p: InodeT,
name: FileNameT,
deleted: InodeT = 0,
ignore_enoent: bool = False
) -> None:
"""
Invalidate directory entry (asynchronous).
Args:
inode_p: Parent directory inode
name: Entry name to invalidate (bytes)
deleted: If non-zero, also invalidate this inode
ignore_enoent: If True, ignore ENOENT errors
Notes:
- Non-blocking asynchronous operation
- Preferred over invalidate_entry for most use cases
- If deleted is non-zero, also invalidates that inode's cache
- If ignore_enoent=True, silently ignores if entry not in cache
- Returns immediately; invalidation processed asynchronously
- Safe to call from any context (async or thread)
Thread Safety:
- Thread-safe and non-blocking
- Safe to call from async context
Raises:
OSError(errno.ENOSYS): Operation not supported by kernel (always raised if not supported)
OSError(errno.ENOENT): Parent inode not known (only if ignore_enoent=False)
"""Usage example:
import pyfuse3
# Asynchronously invalidate entry
pyfuse3.invalidate_entry_async(inode_p=10, name=b'file.txt')
# Invalidate entry and inode, ignore if not cached
pyfuse3.invalidate_entry_async(
inode_p=10,
name=b'file.txt',
deleted=42,
ignore_enoent=True
)

Store data directly in the kernel cache for an inode.
def notify_store(inode: InodeT, offset: int, data: bytes) -> None:
"""
Store data in kernel cache.
Args:
inode: Inode number
offset: Offset within file (0-based byte offset)
data: Data to store in cache
Notes:
- Pushes data into kernel page cache without read request
- Useful for implementing write-through caching
- Can be used for prefetching data
- Raises OSError if operation fails
- Data stored at offset; existing cache at other offsets unaffected
- Can extend cached region beyond current size
- Kernel may evict data at any time (cache management)
Thread Safety:
- Thread-safe
- May block briefly
Raises:
OSError(errno.ENOSYS): Operation not supported by kernel
OSError(errno.ENOENT): Inode not known to kernel
OSError(errno.EINVAL): Invalid offset or data
OSError: Other errors from kernel
"""Usage example:
import pyfuse3
# Store data in cache for inode 42 at offset 0
data = b"Hello, World!"
pyfuse3.notify_store(inode=42, offset=0, data=data)
# Store data at different offset
pyfuse3.notify_store(inode=42, offset=1024, data=b"More data")
# Prefetch multiple blocks
for block_num in range(10):
offset = block_num * 4096
data = read_block_from_backend(block_num)
pyfuse3.notify_store(inode=42, offset=offset, data=data)

When file contents are modified outside of FUSE operations (e.g., by a background sync process), invalidate the cache so that the kernel reads fresh data:
import pyfuse3
class SyncedFS(pyfuse3.Operations):
def on_external_file_update(self, inode, new_data):
"""Called when file updated by external process."""
# Write new data to backing store
self.write_to_backing_store(inode, new_data)
# Invalidate kernel cache so next read fetches new data
pyfuse3.invalidate_inode(inode, attr_only=False)

When only attributes change (size, timestamps, permissions), invalidate just the attributes:
import pyfuse3
import time
class AttributeChangingFS(pyfuse3.Operations):
def on_external_attr_update(self, inode, new_attrs):
"""Called when attributes updated externally."""
# Update attributes in backing store
self.update_attrs_in_store(inode, new_attrs)
# Invalidate only cached attributes
pyfuse3.invalidate_inode(inode, attr_only=True)

When an entry is deleted outside of FUSE operations:
import pyfuse3
class NetworkFS(pyfuse3.Operations):
def on_remote_delete(self, parent_inode, name, inode):
"""Called when file deleted by another client."""
# Delete from local state
self.delete_from_local_state(parent_inode, name)
# Invalidate the directory entry and the inode
pyfuse3.invalidate_entry_async(
inode_p=parent_inode,
name=name,
deleted=inode,
ignore_enoent=True # OK if not cached
)

When an entry is renamed outside of FUSE operations:
import pyfuse3
class CollaborativeFS(pyfuse3.Operations):
def on_remote_rename(self, old_parent, old_name, new_parent, new_name):
"""Called when file renamed by another client."""
# Rename in local state
self.rename_in_local_state(old_parent, old_name, new_parent, new_name)
# Invalidate old entry
pyfuse3.invalidate_entry_async(
inode_p=old_parent,
name=old_name,
ignore_enoent=True
)
# Invalidate new entry (in case it existed)
pyfuse3.invalidate_entry_async(
inode_p=new_parent,
name=new_name,
ignore_enoent=True
)

When an entry is created outside of FUSE operations:
import pyfuse3
class MultiClientFS(pyfuse3.Operations):
def on_remote_create(self, parent_inode, name, inode):
"""Called when file created by another client."""
# Add to local state
self.add_to_local_state(parent_inode, name, inode)
# Invalidate parent directory so readdir shows new entry
# Don't use deleted parameter (inode is being added, not deleted)
pyfuse3.invalidate_entry_async(
inode_p=parent_inode,
name=name,
ignore_enoent=True
)

Use notify_store to prefetch data into the kernel cache:
import pyfuse3
class PrefetchingFS(pyfuse3.Operations):
async def prefetch_file_data(self, inode, offset, size):
    """Prefetch a byte range of a file into the kernel cache.

    Args:
        inode: Inode number of the file to prefetch.
        offset: Byte offset at which the requested range starts.
        size: Number of bytes to request from the backing store.

    Raises:
        OSError: Propagated from pyfuse3.notify_store() when the kernel
            rejects the store request.
    """
    # Read data from backing store (e.g., network, slow disk)
    data = await self.read_from_backing_store(inode, offset, size)
    if not data:
        # The backing store returned nothing (presumably the range starts at
        # or past end of file), so there is nothing to hand to the kernel.
        return
    # Store in kernel cache
    pyfuse3.notify_store(inode, offset, data)
async def open(self, inode, flags, ctx):
"""Prefetch file on open."""
fh = self.allocate_fh(inode)
# Prefetch first block asynchronously
asyncio.create_task(self.prefetch_file_data(inode, 0, 4096))
return pyfuse3.FileInfo(fh=fh)

When implementing write-through caching, store data in the kernel cache after writing:
import pyfuse3
class WriteThroughFS(pyfuse3.Operations):
async def write(self, fh, offset, buf):
"""Write with write-through caching."""
inode = self.fh_to_inode[fh]
# Write to backing store
await self.write_to_backing_store(inode, offset, buf)
# Update kernel cache
pyfuse3.notify_store(inode, offset, buf)
return len(buf)

Detect sequential reads and prefetch ahead:
import pyfuse3
class SequentialPrefetchFS(pyfuse3.Operations):
def __init__(self):
super().__init__()
self.read_patterns = {} # fh -> last_offset
async def read(self, fh, offset, size):
"""Read with sequential prefetching."""
inode = self.fh_to_inode[fh]
# Read requested data
data = await self.read_from_backend(inode, offset, size)
# Detect sequential access
last_offset = self.read_patterns.get(fh, -1)
if offset == last_offset + size:
# Sequential access detected, prefetch next block
prefetch_offset = offset + size
prefetch_size = size
prefetch_data = await self.read_from_backend(
inode, prefetch_offset, prefetch_size
)
pyfuse3.notify_store(inode, prefetch_offset, prefetch_data)
self.read_patterns[fh] = offset
return data

- invalidate_inode(): May block with writeback caching; call from a separate thread if needed
- invalidate_entry(): Synchronous, may block briefly
- invalidate_entry_async(): Non-blocking, safe to call from any context
- notify_store(): May block briefly; generally thread-safe

When enable_writeback_cache=True in the Operations class:

- invalidate_inode() may block waiting for dirty data to be flushed
- Call invalidate_inode() from a separate thread

Example with threading:
import threading
import pyfuse3
class SafeWritebackFS(pyfuse3.Operations):
enable_writeback_cache = True
def invalidate_from_thread(self, inode, attr_only=False):
    """Invalidate the kernel cache for an inode from a background thread.

    The call to pyfuse3.invalidate_inode() is made on a daemon thread that
    is started and then left to run; this method returns without waiting
    for it.

    Args:
        inode: Inode number whose cache should be invalidated.
        attr_only: Passed through to pyfuse3.invalidate_inode().

    An OSError raised by the invalidation is logged, not propagated.
    """
    # Imported locally: this example imports only threading and pyfuse3, so
    # a module-level ``logger`` is not guaranteed to exist here.
    import logging

    log = logging.getLogger(__name__)

    def do_invalidate():
        try:
            pyfuse3.invalidate_inode(inode, attr_only=attr_only)
        except OSError as e:
            # Log error but don't crash
            log.error(f"Failed to invalidate inode {inode}: {e}")

    thread = threading.Thread(target=do_invalidate, daemon=True)
    thread.start()
    # Don't wait for thread to complete
async def on_external_change(self, inode):
"""Handle external change."""
# Update local state
self.update_local_state(inode)
# Invalidate from thread (safe with writeback cache)
self.invalidate_from_thread(inode, attr_only=False)

All invalidation functions may raise OSError:
import pyfuse3
import errno
import logging
logger = logging.getLogger(__name__)
def safe_invalidate_inode(inode, attr_only=False):
    """Invalidate an inode, tolerating the two expected kernel errors.

    ENOSYS (not supported by the kernel) is logged as a warning and ENOENT
    (inode not in the kernel cache) as a debug message; both are swallowed.
    Any other OSError is logged as an error and re-raised.
    """
    try:
        pyfuse3.invalidate_inode(inode, attr_only=attr_only)
    except OSError as exc:
        code = exc.errno
        if code not in (errno.ENOSYS, errno.ENOENT):
            # Unexpected failure: report it and let the caller decide.
            logger.error(f"Failed to invalidate inode {inode}: {exc}")
            raise
        if code == errno.ENOSYS:
            # Operation not supported by kernel
            logger.warning("Inode invalidation not supported by kernel")
        else:
            # Inode not known to kernel (OK, nothing to invalidate)
            logger.debug(f"Inode {inode} not in kernel cache")
def safe_invalidate_entry(parent_inode, name, deleted=0):
"""Safely invalidate entry with error handling."""
try:
pyfuse3.invalidate_entry_async(
inode_p=parent_inode,
name=name,
deleted=deleted,
ignore_enoent=True # Don't error if not cached
)
except OSError as e:
if e.errno == errno.ENOSYS:
# Operation not supported by kernel
logger.warning("Entry invalidation not supported by kernel")
else:
logger.error(f"Failed to invalidate entry {name}: {e}")
# Don't raise; invalidation is best-effort

When enable_writeback_cache=True:
class ProblematicFS(pyfuse3.Operations):
enable_writeback_cache = True
async def some_operation(self, inode):
# Update data externally
self.update_external_data(inode)
# Try to invalidate cache
pyfuse3.invalidate_inode(inode) # MAY DEADLOCK!
# Kernel needs to flush dirty pages before invalidating
# But we're in the main event loop, so write() can't be called
# DEADLOCK!

Use a separate thread:
import threading
import pyfuse3
class SafeFS(pyfuse3.Operations):
enable_writeback_cache = True
async def some_operation(self, inode):
# Update data externally
self.update_external_data(inode)
# Invalidate from separate thread
def invalidate():
pyfuse3.invalidate_inode(inode)
thread = threading.Thread(target=invalidate, daemon=True)
thread.start()
# Don't join thread (it needs main loop to handle write())

If invalidation is frequent, consider disabling writeback cache:
class SimpleFS(pyfuse3.Operations):
enable_writeback_cache = False # Simpler but slower writes
async def some_operation(self, inode):
# Update data externally
self.update_external_data(inode)
# Safe to invalidate directly
pyfuse3.invalidate_inode(inode)

Batch multiple invalidations to reduce overhead:
import asyncio

import pyfuse3
class BatchingFS(pyfuse3.Operations):
def __init__(self):
super().__init__()
self.pending_invalidations = []
self.invalidation_lock = asyncio.Lock()
async def queue_invalidation(self, inode):
"""Queue inode for invalidation."""
async with self.invalidation_lock:
self.pending_invalidations.append(inode)
async def flush_invalidations(self):
    """Flush all pending invalidations.

    Every queued inode is invalidated once (attributes and data) and the
    queue is emptied. Invalidation is best-effort: an OSError for one inode
    is logged as a warning and the remaining inodes are still processed.
    """
    # Imported locally: this example does not define a module-level logger.
    import logging

    log = logging.getLogger(__name__)

    async with self.invalidation_lock:
        # dict.fromkeys() drops duplicate inodes while keeping queue order,
        # so an inode queued several times costs a single kernel call.
        batch = list(dict.fromkeys(self.pending_invalidations))
        # Empty the queue up front so a failure part-way through the batch
        # cannot leave already-processed inodes queued for the next flush.
        self.pending_invalidations.clear()
        for inode in batch:
            try:
                pyfuse3.invalidate_inode(inode, attr_only=False)
            except OSError as exc:
                # Best-effort: record the failure instead of hiding it.
                log.warning("Failed to invalidate inode %s: %s", inode, exc)
async def periodic_flush(self):
"""Periodically flush invalidations."""
while True:
await asyncio.sleep(1.0) # Every second
await self.flush_invalidations()

import pyfuse3
import logging
logger = logging.getLogger(__name__)
def logged_invalidate_inode(inode, attr_only=False):
    """Invalidate an inode, logging the attempt and its outcome.

    A debug message is written before the call and after a successful call.
    An OSError is logged as an error and then re-raised.
    """
    logger.debug(f"Invalidating inode {inode} (attr_only={attr_only})")
    try:
        pyfuse3.invalidate_inode(inode, attr_only=attr_only)
    except OSError as err:
        logger.error(f"Failed to invalidate inode {inode}: {err}")
        raise
    else:
        logger.debug(f"Successfully invalidated inode {inode}")
def logged_invalidate_entry(parent, name, deleted=0):
"""Invalidate entry with logging."""
logger.debug(f"Invalidating entry {name} in {parent} (deleted={deleted})")
try:
pyfuse3.invalidate_entry_async(parent, name, deleted, ignore_enoent=True)
logger.debug(f"Successfully invalidated entry {name}")
except OSError as e:
logger.error(f"Failed to invalidate entry {name}: {e}")

class MonitoredFS(pyfuse3.Operations):
def __init__(self):
super().__init__()
self.invalidation_count = 0
self.invalidation_errors = 0
def invalidate_with_stats(self, inode, attr_only=False):
    """Invalidate an inode and record the outcome in the instance counters.

    Increments ``invalidation_count`` on success. On OSError, increments
    ``invalidation_errors`` and re-raises the exception.
    """
    try:
        pyfuse3.invalidate_inode(inode, attr_only=attr_only)
    except OSError:
        self.invalidation_errors += 1
        raise
    else:
        self.invalidation_count += 1
def get_stats(self):
"""Get invalidation statistics."""
return {
'invalidations': self.invalidation_count,
'errors': self.invalidation_errors
}

- Prefer invalidate_entry_async() over invalidate_entry()
- Use ignore_enoent=True when invalidating entries
- Invalidate only what has changed (attr_only=True when appropriate)