A wrapper for the GNU Privacy Guard (GPG or GnuPG)
—
Keyserver interactions for publishing, retrieving, and searching public keys across distributed keyserver networks for key distribution and discovery.
Retrieve public keys from keyservers using key IDs, fingerprints, or email addresses with support for multiple keyserver protocols.
def recv_keys(self, keyserver, *keyids, **kwargs):
"""
Retrieve one or more keys from a keyserver.
Parameters:
- keyserver (str): Keyserver URL or hostname
- *keyids: Variable number of key IDs, fingerprints, or email addresses
- extra_args (list): Additional GPG arguments
- passphrase (str): Passphrase if needed for key import
Returns:
ImportResult: Result with imported key information and statistics
"""
def auto_locate_key(self, email, mechanisms=None, **kwargs):
"""
Automatically locate a public key by email address using various mechanisms.
Parameters:
- email (str): Email address to locate key for
- mechanisms (list): Location mechanisms ('keyserver', 'pka', 'cert', 'dane', 'wkd', 'ldap')
- extra_args (list): Additional GPG arguments
Returns:
AutoLocateKey: Result with located key information
"""Publish public keys to keyservers to make them available for others to discover and retrieve.
def send_keys(self, keyserver, *keyids, **kwargs):
"""
Send one or more keys to a keyserver for public distribution.
Parameters:
- keyserver (str): Keyserver URL or hostname to send keys to
- *keyids: Variable number of key IDs, fingerprints, or email addresses to send
- extra_args (list): Additional GPG arguments
Returns:
SendResult: Result with send operation status
"""Search keyservers for keys matching specific criteria such as names, email addresses, or key IDs.
def search_keys(self, query, keyserver='pgp.mit.edu', extra_args=None):
"""
Search a keyserver for keys matching the query.
Parameters:
- query (str): Search query (name, email, or key ID)
- keyserver (str): Keyserver to search (default: 'pgp.mit.edu')
- extra_args (list): Additional GPG arguments
Returns:
SearchKeys: List of matching keys with metadata
"""class ImportResult(StatusHandler):
count: int # Total keys processed
imported: int # Keys successfully imported
not_imported: int # Keys that failed to import
unchanged: int # Keys already in keyring unchanged
new_user_ids: int # New user IDs added to existing keys
new_signatures: int # New signatures added
new_subkeys: int # New subkeys added
secret_imported: int # Secret keys imported
secret_unchanged: int # Secret keys unchanged
fingerprints: list # List of processed key fingerprints
def summary(self):
"""Return human-readable import summary."""
class SendResult(StatusHandler):
# Status information for key sending operations
pass
class AutoLocateKey(StatusHandler):
email: str # Email address that was searched
display_name: str # Display name from located key
created_at: datetime # Key creation timestamp
key_length: int # Key length in bits
fingerprint: str # Key fingerprint
class SearchKeys(list):
# List of key dictionaries containing:
# - keyid: Key ID
# - algo: Algorithm number
# - length: Key length in bits
# - date: Creation date
# - expires: Expiration date (if any)
# - uids: List of user IDs
# - revoked: Whether key is revoked
# - disabled: Whether key is disabled
# - invalid: Whether key is invalid
pass

import gnupg
gpg = gnupg.GPG()
# Retrieve key by email address
result = gpg.recv_keys('keyserver.ubuntu.com', 'alice@example.com')
if result.imported > 0:
print(f"Successfully imported {result.imported} keys")
print(f"Key fingerprints: {result.fingerprints}")
else:
print(f"No keys imported: {result.status}")
# Retrieve multiple keys by ID
key_ids = ['DEADBEEF', 'CAFEBABE', '12345678']
result = gpg.recv_keys('pgp.mit.edu', *key_ids)
print(f"Processed {result.count} keys")
print(f"Imported: {result.imported}")
print(f"Already had: {result.unchanged}")# Various keyserver formats
keyservers = [
'keyserver.ubuntu.com',
'pgp.mit.edu',
'hkp://pool.sks-keyservers.net',
'hkps://hkps.pool.sks-keyservers.net',
'ldap://keyserver.pgp.com'
]
# Try multiple keyservers for reliability
def retrieve_key_with_fallback(gpg_instance, key_identifier, keyservers):
    """Fetch a key by trying each keyserver in turn until one imports it.

    Parameters:
    - gpg_instance: Object exposing ``recv_keys(keyserver, key_identifier)``
    - key_identifier: Identifier handed unchanged to ``recv_keys``
    - keyservers: Iterable of keyservers, tried in the order given

    Returns:
    The first ``recv_keys`` result whose ``imported`` count is above zero,
    or ``None`` once every keyserver has been tried without an import.
    A progress or failure message is printed for each outcome.
    """
    for server in keyservers:
        try:
            outcome = gpg_instance.recv_keys(server, key_identifier)
            # Nothing imported from this server: move on to the next one.
            if not outcome.imported > 0:
                continue
            print(f"Retrieved key from {server}")
            return outcome
        except Exception as exc:
            # Any failure on one server must not stop the fallback chain.
            print(f"Failed to retrieve from {server}: {exc}")
    print("Failed to retrieve key from all keyservers")
    return None
# Usage
result = retrieve_key_with_fallback(gpg, 'user@example.com', keyservers)

# Auto-locate key using multiple mechanisms
result = gpg.auto_locate_key(
'colleague@company.com',
mechanisms=['wkd', 'keyserver', 'pka'] # Web Key Directory, keyserver, Public Key Association
)
if result.fingerprint:
print(f"Found key for {result.email}")
print(f"Display name: {result.display_name}")
print(f"Fingerprint: {result.fingerprint}")
print(f"Key length: {result.key_length} bits")
print(f"Created: {result.created_at}")
else:
print(f"Could not locate key for {result.email}")
# Auto-locate with specific mechanisms
wkd_result = gpg.auto_locate_key('user@example.org', mechanisms=['wkd'])
keyserver_result = gpg.auto_locate_key('user@example.org', mechanisms=['keyserver'])

# Send public key to keyserver
result = gpg.send_keys('keyserver.ubuntu.com', 'alice@example.com')
if result.status == 'key sent':
print("Key successfully published to keyserver")
else:
print(f"Failed to send key: {result.status}")
# Send multiple keys
my_keys = ['alice@example.com', 'work@alice.org']
result = gpg.send_keys('pgp.mit.edu', *my_keys)
# Send by fingerprint
result = gpg.send_keys('hkp://pool.sks-keyservers.net', 'DEADBEEFCAFEBABE12345678')

# Search by name
results = gpg.search_keys('John Smith', keyserver='keyserver.ubuntu.com')
print(f"Found {len(results)} matching keys:")
for key in results:
print(f"Key ID: {key['keyid']}")
print(f"Algorithm: {key['algo']}")
print(f"Length: {key['length']} bits")
print(f"Created: {key['date']}")
print(f"User IDs: {', '.join(key['uids'])}")
if key['expires']:
print(f"Expires: {key['expires']}")
if key['revoked']:
print("WARNING: Key is revoked")
print("---")
# Search by email
email_results = gpg.search_keys('alice@example.com')
# Search by partial key ID
keyid_results = gpg.search_keys('DEADBEEF')


def _parse_key_date(value):
    """Best-effort conversion of a search-result date field to a naive datetime.

    Accepts ``YYYY-MM-DD`` text and, as a fallback, a Unix timestamp in
    seconds (int or numeric string). Returns ``None`` for empty or
    unparseable values so callers can simply skip the date-based logic.
    """
    from datetime import datetime

    if not value:
        return None
    try:
        return datetime.strptime(str(value), '%Y-%m-%d')
    except ValueError:
        pass
    try:
        # NOTE(review): python-gnupg appears to report 'date'/'expires' as
        # Unix timestamps -- confirm against the installed version.
        return datetime.fromtimestamp(int(value))
    except (TypeError, ValueError, OverflowError, OSError):
        return None


def _key_length_bits(key):
    """Return the key's 'length' entry as an int, or 0 when missing/invalid."""
    try:
        return int(key.get('length') or 0)
    except (TypeError, ValueError):
        return 0


def find_best_key(gpg_instance, search_query, keyserver='keyserver.ubuntu.com'):
    """Find the best key from search results based on various criteria.

    Parameters:
    - gpg_instance: Object exposing ``search_keys(query, keyserver=...)``
    - search_query (str): Query passed to ``search_keys``
    - keyserver (str): Keyserver to search (default: 'keyserver.ubuntu.com')

    Returns:
    The highest-scoring key dictionary, or ``None`` when the search returns
    nothing or every candidate is filtered out. Keys flagged revoked,
    disabled or invalid, and keys whose expiry date is in the past, are
    skipped. The score is the key length in bits plus a recency bonus of up
    to 3650 points (one per day the key is younger than ten years). On a
    tie the key that appears first in the search results wins.
    """
    from datetime import datetime

    results = gpg_instance.search_keys(search_query, keyserver=keyserver)
    if not results:
        return None

    now = datetime.now()
    scored_keys = []
    for key in results:
        # Skip revoked, disabled, or invalid keys
        if key.get('revoked') or key.get('disabled') or key.get('invalid'):
            continue

        # Skip expired keys; an unparseable expiry is treated as "no expiry"
        expires = _parse_key_date(key.get('expires'))
        if expires is not None and expires < now:
            continue

        # Prefer longer keys
        key_score = _key_length_bits(key)

        # Prefer more recent keys (bonus for newer keys, up to 10 years)
        created = _parse_key_date(key.get('date'))
        if created is not None:
            key_score += max(0, 3650 - (now - created).days)

        scored_keys.append((key_score, key))

    if not scored_keys:
        return None

    # max() returns the first maximal element, so earlier results win ties.
    return max(scored_keys, key=lambda pair: pair[0])[1]
# Usage
best_key = find_best_key(gpg, 'alice@example.com')
if best_key:
print(f"Best key: {best_key['keyid']}")
print(f"Length: {best_key['length']} bits")
print(f"User IDs: {', '.join(best_key['uids'])}")
# Import the selected key
import_result = gpg.recv_keys('keyserver.ubuntu.com', best_key['keyid'])
if import_result.imported > 0:
print("Key imported successfully")# Configure default keyserver options
def setup_keyserver_options(gpg_instance):
    """Add the recommended keyserver flags to ``gpg_instance.options``.

    When ``gpg_instance.options`` already holds entries, the flags are
    appended to that same list in place; otherwise the attribute is set to
    a new list containing only the flags. Returns ``None``.
    """
    settings = (
        'timeout=30',         # 30 second timeout
        'import-clean',       # Clean imported keys
        'export-clean',       # Clean exported keys
        'auto-key-retrieve',  # Auto-retrieve missing keys
    )
    # Each setting is passed as its own '--keyserver-options <value>' pair.
    flags = []
    for setting in settings:
        flags += ['--keyserver-options', setting]

    current = gpg_instance.options
    if not current:
        gpg_instance.options = flags
        return
    current.extend(flags)
# Create GPG instance with keyserver configuration
gpg = gnupg.GPG(options=[
'--keyserver', 'hkps://keys.openpgp.org',
'--keyserver-options', 'timeout=30'
])
# Test keyserver connectivity
def test_keyserver_connectivity(gpg_instance, keyserver):
"""Test if a keyserver is accessible."""
try:
# Try to search for a common key (GPG itself)
results = gpg_instance.search_keys('gnupg', keyserver=keyserver)
return len(results) > 0
except Exception as e:
print(f"Keyserver {keyserver} not accessible: {e}")
return False
# Test multiple keyservers
keyservers_to_test = [
'keys.openpgp.org',
'keyserver.ubuntu.com',
'pgp.mit.edu'
]
working_keyservers = []
for ks in keyservers_to_test:
if test_keyserver_connectivity(gpg, ks):
working_keyservers.append(ks)
print(f"✓ {ks} is accessible")
else:
print(f"✗ {ks} is not accessible")
print(f"Working keyservers: {working_keyservers}")def reliable_key_operation(operation_func, *args, max_retries=3, **kwargs):
"""Wrapper for reliable keyserver operations with retry logic."""
for attempt in range(max_retries):
try:
result = operation_func(*args, **kwargs)
# Check if operation was successful
if hasattr(result, 'imported') and result.imported > 0:
return result
elif hasattr(result, 'status') and 'success' in result.status.lower():
return result
elif not hasattr(result, 'imported'): # Search operations
return result
print(f"Attempt {attempt + 1} had no results, retrying...")
except Exception as e:
print(f"Attempt {attempt + 1} failed: {e}")
if attempt == max_retries - 1:
raise
# Wait before retry (exponential backoff)
import time
time.sleep(2 ** attempt)
return None
# Usage with retry logic
result = reliable_key_operation(
gpg.recv_keys,
'keyserver.ubuntu.com',
'user@example.com',
max_retries=3
)
if result and result.imported > 0:
print("Key retrieved successfully with retry logic")Install with Tessl CLI
npx tessl i tessl/pypi-python-gnupg