Python client library for Facebook's Watchman file watching service
npx @tessl/cli install tessl/pypi-pywatchman@3.0.0Python client library for Facebook's Watchman file watching service. Provides a simple interface to connect to and query Watchman to discover file system changes efficiently.
pip install pywatchmanimport pywatchmanFor binary serialization:
from pywatchman import bserimport pywatchman
# Create a client connection
client = pywatchman.client()
# Query Watchman for socket name
result = client.query('get-sockname')
print(result['sockname'])
# Watch a directory for changes
watch_result = client.query('watch', '/path/to/directory')
# Query for file changes
files = client.query('query', '/path/to/directory', {
'expression': ['allof', ['type', 'f'], ['suffix', 'py']],
'fields': ['name', 'size', 'mtime_ms']
})
# Handle errors
try:
result = client.query('invalid-command')
except pywatchman.Unavailable as e:
print(f"Watchman unavailable: {e}")Connect to and communicate with the Watchman service via Unix domain socket.
class client:
def __init__(self, sockpath=None, timeout=1.0):
"""
Create a new Watchman client.
Args:
sockpath (str, optional): Path to Watchman socket. If None, auto-discovered.
timeout (float): Connection timeout in seconds. Default: 1.0
"""Send commands to Watchman and receive structured responses.
def query(self, *args):
"""
Send query to Watchman service and return result.
Args:
*args: Variable arguments representing the Watchman command and parameters
Returns:
dict: Structured response from Watchman
Raises:
Unavailable: When Watchman service is unavailable or command fails
"""Common query patterns:
# Get socket name
client.query('get-sockname')
# Watch a directory
client.query('watch', '/path/to/directory')
# Query files with expression
client.query('query', '/path/to/directory', {
'expression': ['allof', ['type', 'f'], ['suffix', 'py']],
'fields': ['name', 'size', 'mtime_ms', 'exists']
})
# Get version info
client.query('version')
# Shutdown watchman
client.query('shutdown-server')High-performance binary serialization for Watchman protocol communication.
def dumps(obj):
"""
Serialize Python object to BSER (Binary Serialization) format.
Args:
obj: Python object to serialize (dict, list, str, int, float, bool, None)
Returns:
bytes: Binary data in BSER format
Supported types:
- dict, list, tuple (converted to list)
- str, unicode (encoded as UTF-8)
- int, float, bool, None
"""
def loads(data):
"""
Deserialize BSER binary data to Python object.
Args:
data (bytes): Binary data in BSER format
Returns:
Python object (dict, list, str, int, float, bool, None)
Raises:
ValueError: On invalid BSER data
"""
def pdu_len(data):
"""
Extract PDU (Protocol Data Unit) length from BSER header.
Args:
data (bytes): Binary data containing BSER PDU header (minimum 13 bytes)
Returns:
int: Length of complete PDU for reading from socket
"""Example binary serialization usage:
from pywatchman import bser
# Serialize data
query_data = {'expression': ['type', 'f'], 'fields': ['name']}
serialized = bser.dumps(query_data)
# Deserialize response
response_data = bser.loads(serialized)
# Get PDU length for socket reading
pdu_length = bser.pdu_len(serialized[:13])Handle connection failures and service unavailability.
class Unavailable(Exception):
"""
Exception raised when Watchman service is unavailable.
Raised in cases of:
- Connection failures to Watchman socket
- Watchman executable not found in PATH
- Socket communication timeouts
- Watchman command errors
- Invalid response data
"""Error handling patterns:
try:
client = pywatchman.client()
result = client.query('watch', '/nonexistent/path')
except pywatchman.Unavailable as e:
if 'executable not in PATH' in str(e):
print("Watchman is not installed")
elif 'unable to connect' in str(e):
print("Watchman service is not running")
elif 'timed out' in str(e):
print("Connection timed out")
else:
print(f"Watchman error: {e}")# Response structure for typical Watchman queries
QueryResponse = dict[str, Any] # Contains 'files', 'clock', 'is_fresh_instance', etc.
# File information structure
FileInfo = dict[str, Any] # Contains 'name', 'size', 'mtime_ms', 'exists', etc.
# Watch response structure
WatchResponse = dict[str, Any] # Contains 'watch', 'watcher', etc.
# Version response structure
VersionResponse = dict[str, Any] # Contains 'version', 'buildinfo', etc.sniff_len = 13 # PDU header length for protocol detection