A Python wrapper for rclone that makes rclone's functionality usable in Python applications.
—
Data integrity operations including hash generation, validation, and file comparison with support for multiple hash algorithms and comprehensive checksum verification workflows.
Generate hashes for files or validate existing checksums using various hash algorithms supported by different storage backends.
def hash(hash: Union[str, HashTypes], path: str, download: bool = False,
         checkfile: Optional[str] = None, output_file: Optional[str] = None,
         args: Optional[List[str]] = None) -> Union[None, str, bool, Dict[str, str], Dict[str, bool]]:
    """
    Generates or validates file hashes using specified algorithm.

    Parameters:
    - hash (Union[str, HashTypes]): Hash algorithm (e.g., 'sha1', HashTypes.md5)
    - path (str): File or directory path to hash
    - download (bool): Download files to hash locally (useful for unsupported algorithms)
    - checkfile (str, optional): Validate against existing checksum file
    - output_file (str, optional): Write hashes to file instead of returning
    - args (List[str], optional): Additional rclone hashsum flags

    Returns:
    - None: When output_file is specified
    - str: Hash value for single file (when no checkfile/output_file)
    - bool: Validation result for single file (when checkfile specified)
    - Dict[str, str]: Filename -> hash mapping for multiple files
    - Dict[str, bool]: Filename -> validation result for multiple files

    Raises:
        RcloneException: If hash operation fails
    """

# Compare files between source and destination to identify differences,
# missing files, and data integrity issues.
def check(source: str, dest: str, combined: Optional[str] = None, size_only: bool = False,
          download: bool = False, one_way: bool = False, args: Optional[List[str]] = None
          ) -> Tuple[bool, List[Tuple[str, str]]]:
    """
    Compares files between source and destination locations.

    Parameters:
    - source (str): Source path to compare from
    - dest (str): Destination path to compare to
    - combined (str, optional): Output file path for detailed results
    - size_only (bool): Compare only file sizes, not hashes (faster)
    - download (bool): Download files for local comparison
    - one_way (bool): Only check source -> dest, ignore extra dest files
    - args (List[str], optional): Additional rclone check flags

    Returns:
        Tuple[bool, List[Tuple[str, str]]]:
        - bool: True if all files match
        - List of (symbol, filepath) tuples where symbols mean:
            "=" - File identical in source and destination
            "-" - File missing from source (only in destination)
            "+" - File missing from destination (only in source)
            "*" - File exists in both but differs
            "!" - Error reading or hashing file

    Raises:
        RcloneException: If comparison operation fails
    """

from rclone_python import rclone
from rclone_python.hash_types import HashTypes

# Generate SHA1 hash for single file
sha1_hash = rclone.hash(HashTypes.sha1, 'onedrive:document.pdf')
print(f"SHA1: {sha1_hash}")

# Generate MD5 hashes for directory (returns a filename -> hash mapping)
md5_hashes = rclone.hash('md5', 'dropbox:photos')
for filename, hash_value in md5_hashes.items():
    print(f"{filename}: {hash_value}")

# Use string hash type
sha256_hash = rclone.hash('sha256', 'box:archive.zip')

from rclone_python import rclone
from rclone_python.hash_types import HashTypes

# Create checksum file first
rclone.hash(HashTypes.sha1, 'backup:files', output_file='checksums.sha1')

# Later, validate against checksum file
validation_results = rclone.hash(
    HashTypes.sha1,
    'backup:files',
    checkfile='checksums.sha1'
)

# Check results: a dict means multiple files were validated
if isinstance(validation_results, dict):
    for filename, is_valid in validation_results.items():
        status = "✓ Valid" if is_valid else "✗ Invalid"
        print(f"{filename}: {status}")
else:
    # Single file result
    print(f"File validation: {'✓ Valid' if validation_results else '✗ Invalid'}")

from rclone_python import rclone
# Compare directories
matches, file_list = rclone.check('local:source', 'onedrive:backup')
print(f"Directories match: {matches}")
print("\nFile comparison results:")

# Map rclone's result symbols to human-readable labels
# (hoisted out of the loop — it is loop-invariant).
status_map = {
    '=': '✓ Identical',
    '+': '→ Missing from destination',
    '-': '← Extra in destination',
    '*': '≠ Different',
    '!': '⚠ Error'
}
for symbol, filepath in file_list:
    status = status_map.get(symbol, symbol)
    print(f"{filepath}: {status}")

from rclone_python import rclone
from rclone_python.hash_types import HashTypes

def verify_backup_integrity(source_path, backup_path):
    """Comprehensive backup integrity verification"""
    print("Verifying backup integrity...")
    print(f"Source: {source_path}")
    print(f"Backup: {backup_path}")

    # First, do a quick size-only check (much faster than hashing)
    print("\n1. Quick size comparison...")
    size_match, size_results = rclone.check(
        source_path, backup_path,
        size_only=True
    )
    if not size_match:
        print("⚠ Size differences detected:")
        for symbol, filepath in size_results:
            if symbol != '=':
                print(f" {symbol} {filepath}")
        # Ask if user wants to continue with hash check
        response = input("\nContinue with hash verification? (y/n): ")
        if response.lower() != 'y':
            return

    # Full hash-based comparison
    print("\n2. Hash-based verification...")
    hash_match, hash_results = rclone.check(source_path, backup_path)

    # Categorize results by rclone's status symbols
    identical = [f for s, f in hash_results if s == '=']
    different = [f for s, f in hash_results if s == '*']
    missing_dest = [f for s, f in hash_results if s == '+']
    extra_dest = [f for s, f in hash_results if s == '-']
    errors = [f for s, f in hash_results if s == '!']

    print(f"\nResults:")
    print(f" ✓ Identical files: {len(identical)}")
    print(f" ≠ Different files: {len(different)}")
    print(f" → Missing from backup: {len(missing_dest)}")
    print(f" ← Extra in backup: {len(extra_dest)}")
    print(f" ⚠ Errors: {len(errors)}")

    # Show problems
    if different:
        print(f"\nFiles with differences:")
        for filepath in different[:10]:  # Show first 10
            print(f" {filepath}")
    if missing_dest:
        print(f"\nFiles missing from backup:")
        for filepath in missing_dest[:10]:
            print(f" {filepath}")
    return hash_match

# Run integrity check
verify_backup_integrity('important_data', 'onedrive:backup/important_data')

from rclone_python import rclone
from rclone_python.hash_types import HashTypes
from collections import defaultdict

def find_duplicate_files(path):
    """Find duplicate files based on hash values"""
    print(f"Scanning for duplicates in: {path}")

    # Generate hashes for all files
    hashes = rclone.hash(HashTypes.md5, path)
    if isinstance(hashes, str):
        print("Only one file found, no duplicates possible")
        return

    # Group files by hash
    hash_groups = defaultdict(list)
    for filename, hash_value in hashes.items():
        hash_groups[hash_value].append(filename)

    # Find duplicates (hashes shared by more than one file)
    duplicates = {h: files for h, files in hash_groups.items() if len(files) > 1}
    if duplicates:
        print(f"\nFound {len(duplicates)} sets of duplicate files:")
        total_wasted_space = 0
        # List the remote once, not once per duplicate file (avoids O(n^2) listings)
        file_list = rclone.ls(path, files_only=True)
        size_by_name = {f['Name']: f['Size'] for f in file_list}
        for hash_value, files in duplicates.items():
            print(f"\nDuplicate set (hash: {hash_value[:8]}...):")
            # Get file sizes
            file_sizes = []
            for filename in files:
                if filename in size_by_name:
                    file_sizes.append(size_by_name[filename])
                    print(f" {filename} ({size_by_name[filename]} bytes)")
            if file_sizes:
                wasted = sum(file_sizes[1:])  # All but first file are duplicates
                total_wasted_space += wasted
        print(f"\nTotal wasted space: {total_wasted_space / (1024**2):.2f} MB")
    else:
        print("No duplicate files found")

# Find duplicates in directory
find_duplicate_files('downloads:')

from rclone_python import rclone
from rclone_python.hash_types import HashTypes
import json
import os
from datetime import datetime
def create_integrity_baseline(path, baseline_file):
    """Create integrity baseline for monitoring"""
    print(f"Creating integrity baseline for: {path}")

    # Generate hashes for all files
    hashes = rclone.hash(HashTypes.sha256, path)

    # Get file metadata keyed by name for quick lookup
    files = rclone.ls(path, files_only=True)
    file_metadata = {f['Name']: f for f in files}

    # Create baseline data
    baseline = {
        'created': datetime.now().isoformat(),
        'path': path,
        'files': {}
    }
    # A dict result means multiple files; a single-file str result is not tracked here
    if isinstance(hashes, dict):
        for filename, hash_value in hashes.items():
            metadata = file_metadata.get(filename, {})
            baseline['files'][filename] = {
                'hash': hash_value,
                'size': metadata.get('Size', 0),
                'modified': metadata.get('ModTime', '')
            }

    # Save baseline
    with open(baseline_file, 'w') as f:
        json.dump(baseline, f, indent=2)
    print(f"Baseline saved to: {baseline_file}")
    print(f"Tracked files: {len(baseline['files'])}")
def check_against_baseline(path, baseline_file):
    """Check current state against integrity baseline"""
    if not os.path.exists(baseline_file):
        print(f"Baseline file not found: {baseline_file}")
        return

    # Load baseline
    with open(baseline_file, 'r') as f:
        baseline = json.load(f)
    print(f"Checking integrity against baseline from {baseline['created']}")

    # Get current hashes; normalize single-file result to a dict keyed by name
    current_hashes = rclone.hash(HashTypes.sha256, path)
    if isinstance(current_hashes, str):
        current_hashes = {path.split('/')[-1]: current_hashes}

    # Compare with baseline
    issues = []

    # Check for modified files
    for filename, current_hash in current_hashes.items():
        if filename in baseline['files']:
            baseline_hash = baseline['files'][filename]['hash']
            if current_hash != baseline_hash:
                issues.append(f"MODIFIED: {filename}")

    # Check for missing files
    for filename in baseline['files']:
        if filename not in current_hashes:
            issues.append(f"MISSING: {filename}")

    # Check for new files
    for filename in current_hashes:
        if filename not in baseline['files']:
            issues.append(f"NEW: {filename}")

    if issues:
        print(f"\n⚠ Integrity issues detected:")
        for issue in issues:
            print(f" {issue}")
    else:
        print("✓ All files match baseline - integrity verified")
    return len(issues) == 0
# Create and use integrity monitoring
create_integrity_baseline('critical_data:', 'critical_data_baseline.json')

# Later, check integrity
check_against_baseline('critical_data:', 'critical_data_baseline.json')

# The HashTypes enum provides access to all hash algorithms supported by rclone.
Choose hash algorithms based on your needs and backend support:
from rclone_python import rclone
from rclone_python.hash_types import HashTypes
# Check what hashes a backend supports
def check_backend_hashes(remote_path):
    """Test which hash algorithms work with a backend"""
    test_algorithms = [HashTypes.md5, HashTypes.sha1, HashTypes.sha256, HashTypes.crc32]
    supported = []
    for algorithm in test_algorithms:
        try:
            # A successful call means the backend supports this algorithm;
            # the hash value itself is not needed here.
            rclone.hash(algorithm, remote_path)
            supported.append(algorithm.value)
        except Exception:
            pass  # Algorithm not supported
    print(f"Supported hashes for {remote_path}: {supported}")
    return supported
# Test backend support
check_backend_hashes('onedrive:test.txt')

from rclone_python import rclone
from rclone_python.hash_types import HashTypes
def multi_hash_verification(path):
    """Verify files using multiple hash algorithms"""
    algorithms = [HashTypes.md5, HashTypes.sha1, HashTypes.sha256]
    results = {}
    for algorithm in algorithms:
        try:
            hashes = rclone.hash(algorithm, path)
            results[algorithm.value] = hashes
            print(f"✓ {algorithm.value}: Generated hashes for {len(hashes) if isinstance(hashes, dict) else 1} files")
        except Exception as e:
            print(f"✗ {algorithm.value}: Failed - {e}")
    return results
# Generate multiple hash types
multi_hashes = multi_hash_verification('important:files/')

# Install with Tessl CLI:
npx tessl i tessl/pypi-rclone-python