HashiCorp Vault API client for Python with comprehensive authentication, secrets management, and system administration capabilities
—
Complete Vault administration interface providing initialization, seal management, policy administration, audit logging, cluster operations, and system monitoring. HVAC's system backend offers comprehensive administrative control for enterprise Vault deployments.
Bootstrap new Vault instances and manage seal/unseal operations for startup and emergency procedures.
class Init:
def read_init_status(self) -> dict: ...
def is_initialized(self) -> bool: ...
def initialize(
self,
secret_shares: int = 5,
secret_threshold: int = 3,
pgp_keys: list = None,
root_token_pgp_key: str = None,
stored_shares: int = None, # Enterprise HSM
recovery_shares: int = None, # Enterprise HSM
recovery_threshold: int = None, # Enterprise HSM
recovery_pgp_keys: list = None # Enterprise HSM
) -> dict: ...
class Seal:
def read_seal_status(self) -> dict: ...
def is_sealed(self) -> bool: ...
def seal(self) -> dict: ...
def submit_unseal_key(
self,
key: str,
reset: bool = False,
migrate: bool = None
) -> dict: ...
def submit_unseal_keys(self, keys: list) -> dict: ...Configure and manage authentication backends for user and machine identity verification.
class Auth:
def list_auth_methods(self) -> dict: ...
def enable_auth_method(
self,
method_type: str,
description: str = None,
config: dict = None,
plugin_name: str = None,
local: bool = None, # Enterprise replication
path: str = None
) -> None: ...
def disable_auth_method(self, path: str) -> None: ...
def read_auth_method_tuning(self, path: str) -> dict: ...
def tune_auth_method(
self,
path: str,
default_lease_ttl: str = None,
max_lease_ttl: str = None,
description: str = None,
audit_non_hmac_request_keys: list = None,
audit_non_hmac_response_keys: list = None,
listing_visibility: str = None,
passthrough_request_headers: list = None,
allowed_response_headers: list = None,
token_type: str = None
) -> None: ...Mount, configure, and manage secrets engines for different secret types and credential systems.
class Mount:
def list_mounted_secrets_engines(self) -> dict: ...
def enable_secrets_engine(
self,
backend_type: str,
description: str = None,
config: dict = None,
options: dict = None,
local: bool = None, # Enterprise replication
seal_wrap: bool = None, # Enterprise seal wrapping
path: str = None
) -> None: ...
def disable_secrets_engine(self, path: str) -> None: ...
def read_mount_configuration(self, path: str) -> dict: ...
def tune_mount_configuration(
self,
path: str,
default_lease_ttl: str = None,
max_lease_ttl: str = None,
description: str = None,
audit_non_hmac_request_keys: list = None,
audit_non_hmac_response_keys: list = None,
listing_visibility: str = None,
passthrough_request_headers: list = None,
allowed_response_headers: list = None
) -> None: ...
def move_backend(self, from_path: str, to_path: str) -> None: ...
def retrieve_mount_option(self, mount_point: str, option_name: str) -> dict: ...Create and manage ACL policies and Enterprise governance policies for fine-grained access control.
class Policy:
def list_policies(self) -> dict: ...
def read_policy(self, name: str, pretty_print: bool = False) -> dict: ...
def create_or_update_policy(
self,
name: str,
policy: str | dict,
pretty_print: bool = False
) -> None: ...
def delete_policy(self, name: str) -> None: ...
class Policies:
# ACL Policies
def list_acl_policies(self) -> dict: ...
def read_acl_policy(self, name: str) -> dict: ...
def create_or_update_acl_policy(
self,
name: str,
policy: str | dict
) -> None: ...
def delete_acl_policy(self, name: str) -> None: ...
# Role Governing Policies (Enterprise)
def list_rgp_policies(self) -> dict: ...
def read_rgp_policy(self, name: str) -> dict: ...
def create_or_update_rgp_policy(
self,
name: str,
policy: str,
enforcement_level: str = "hard-mandatory" # advisory, soft-mandatory, hard-mandatory
) -> None: ...
def delete_rgp_policy(self, name: str) -> None: ...
# Endpoint Governing Policies (Enterprise)
def list_egp_policies(self) -> dict: ...
def read_egp_policy(self, name: str) -> dict: ...
def create_or_update_egp_policy(
self,
name: str,
policy: str,
paths: list,
enforcement_level: str = "hard-mandatory"
) -> None: ...
def delete_egp_policies(self, name: str) -> None: ...Configure comprehensive audit logging for compliance and security monitoring.
class Audit:
def list_enabled_audit_devices(self) -> dict: ...
def enable_audit_device(
self,
device_type: str, # file, syslog, socket
description: str = None,
options: dict = None,
path: str = None,
local: bool = None # Enterprise replication
) -> None: ...
def disable_audit_device(self, path: str) -> None: ...
def calculate_hash(
self,
path: str,
input_to_hash: str
) -> dict: ...Monitor and manage secret leases for credential lifecycle and cleanup operations.
class Lease:
def read_lease(self, lease_id: str) -> dict: ...
def list_leases(self, prefix: str) -> dict: ...
def renew_lease(
self,
lease_id: str,
increment: int = None
) -> dict: ...
def revoke_lease(self, lease_id: str) -> None: ...
def revoke_prefix(
self,
prefix: str,
sync: bool = False
) -> None: ...
def revoke_force(
self,
prefix: str,
sync: bool = False
) -> None: ...Manage Vault's encryption keys, root token generation, and rekeying operations.
class Key:
# Encryption Key Management
def get_encryption_key_status(self) -> dict: ...
def rotate_encryption_key(self) -> None: ...
# Root Token Generation
def read_root_generation_progress(self) -> dict: ...
def start_root_token_generation(
self,
key: str = None,
pgp_key: str = None
) -> dict: ...
def generate_root(
self,
key: str,
nonce: str
) -> dict: ...
def cancel_root_generation(self) -> dict: ...
# Rekeying Operations
def read_rekey_progress(self) -> dict: ...
def start_rekey(
self,
secret_shares: int = None,
secret_threshold: int = None,
pgp_keys: list = None,
backup: bool = None,
require_verification: bool = None
) -> dict: ...
def rekey(
self,
key: str,
nonce: str
) -> dict: ...
def rekey_multi(
self,
keys: list,
nonce: str
) -> dict: ...
def cancel_rekey(self) -> dict: ...
def read_backup_keys(self) -> dict: ...
def rekey_verify(
self,
key: str,
nonce: str
) -> dict: ...
def rekey_verify_multi(
self,
keys: list,
nonce: str
) -> dict: ...Monitor Vault health status and manage high availability cluster operations.
class Health:
def read_health_status(
self,
standby_ok: bool = None,
active_code: int = 200,
standby_code: int = 429,
dr_secondary_code: int = 472,
performance_standby_code: int = 473,
sealed_code: int = 503,
uninit_code: int = 501,
method: str = "GET"
) -> dict: ...
class Leader:
def read_leader_status(self) -> dict: ...
def step_down(self) -> None: ...Verify token permissions and capabilities for access control testing.
class Capabilities:
def get_capabilities(
self,
paths: list,
token: str = None,
accessor: str = None
) -> dict: ...Advanced enterprise features for multi-tenancy, rate limiting, and integrated storage.
class Namespace:
def create_namespace(
self,
path: str
) -> None: ...
def list_namespaces(self) -> dict: ...
def delete_namespace(self, path: str) -> None: ...
class Quota:
def list_quotas(self) -> dict: ...
def read_quota(self, name: str) -> dict: ...
def create_or_update_quota(
self,
name: str,
rate: float,
path: str = "",
interval: str = "1s",
block_interval: str = "10s",
inheritable: bool = None
) -> None: ...
def delete_quota(self, name: str) -> None: ...
class Raft:
# Cluster Management
def join_raft_cluster(
self,
leader_api_addr: str,
leader_ca_cert: str = None,
leader_client_cert: str = None,
leader_client_key: str = None,
retry: bool = None,
non_voter: bool = None
) -> dict: ...
def read_raft_config(self) -> dict: ...
def remove_raft_node(self, server_id: str) -> None: ...
# Snapshot Operations
def take_raft_snapshot(self) -> bytes: ...
def restore_raft_snapshot(self, snapshot: bytes) -> None: ...
def force_restore_raft_snapshot(self, snapshot: bytes) -> None: ...
# Auto-Snapshot Configuration
def list_raft_auto_snapshot_configs(self) -> dict: ...
def create_or_update_raft_auto_snapshot_config(
self,
name: str,
interval: str,
retain: int,
path_prefix: str,
storage_type: str, # aws-s3, azure-blob, google-gcs, local
**kwargs # Storage-specific configuration
) -> None: ...
def delete_raft_auto_snapshot_config(self, name: str) -> None: ...
class Wrapping:
def wrap(
self,
payload: dict,
ttl: int = 300
) -> dict: ...
def unwrap(self, token: str) -> dict: ...import hvac
# Connect to uninitialized Vault
client = hvac.Client(url='https://vault.example.com:8200')
# Check initialization status
if not client.sys.init.is_initialized():
# Initialize Vault with 5 key shares, 2 required for unsealing
init_response = client.sys.init.initialize(
secret_shares=5,
secret_threshold=2
)
# Store unseal keys and root token securely
unseal_keys = init_response['keys']
root_token = init_response['root_token']
print(f"Vault initialized. Root token: {root_token}")
print(f"Unseal keys: {unseal_keys}")
# Check seal status and unseal if needed
seal_status = client.sys.seal.read_seal_status()
if seal_status['sealed']:
# Unseal with minimum threshold keys
client.sys.seal.submit_unseal_keys(unseal_keys[:2])
print("Vault unsealed successfully")
# Set root token for administrative operations
client.token = root_token# Enable username/password authentication
client.sys.auth.enable_auth_method(
method_type='userpass',
path='userpass',
description='Username/password authentication for users'
)
# Configure auth method settings
client.sys.auth.tune_auth_method(
path='userpass',
default_lease_ttl='1h',
max_lease_ttl='24h',
token_type='default'
)
# Enable AWS authentication for EC2 instances
client.sys.auth.enable_auth_method(
method_type='aws',
path='aws-ec2',
description='AWS EC2 instance authentication',
config={
'default_lease_ttl': '1h',
'max_lease_ttl': '12h'
}
)
# List all enabled auth methods
auth_methods = client.sys.auth.list_auth_methods()
for path, config in auth_methods['data'].items():
print(f"Auth method: {path} ({config['type']})")# Enable KV v2 secrets engine
client.sys.mount.enable_secrets_engine(
backend_type='kv',
path='secret',
options={'version': '2'},
description='Application secrets storage'
)
# Enable database secrets engine for dynamic credentials
client.sys.mount.enable_secrets_engine(
backend_type='database',
path='database',
description='Dynamic database credentials'
)
# Tune secrets engine settings
client.sys.mount.tune_mount_configuration(
path='database',
default_lease_ttl='1h',
max_lease_ttl='12h'
)
# List all mounted secrets engines
mounts = client.sys.mount.list_mounted_secrets_engines()
for path, config in mounts['data'].items():
print(f"Secrets engine: {path} ({config['type']})")# Create application policy
app_policy = """
path "secret/data/myapp/*" {
capabilities = ["create", "read", "update", "delete", "list"]
}
path "database/creds/readonly" {
capabilities = ["read"]
}
path "auth/token/renew-self" {
capabilities = ["update"]
}
"""
client.sys.policy.create_or_update_policy(
name='myapp-policy',
policy=app_policy
)
# Create read-only policy
readonly_policy = """
path "secret/data/*" {
capabilities = ["read", "list"]
}
"""
client.sys.policy.create_or_update_policy(
name='readonly-policy',
policy=readonly_policy
)
# List all policies
policies = client.sys.policy.list_policies()
print(f"Available policies: {policies['data']['policies']}")# Enable file audit logging
client.sys.audit.enable_audit_device(
device_type='file',
path='file-audit',
options={
'file_path': '/var/log/vault/audit.log',
'log_raw': False
},
description='File-based audit logging'
)
# Enable syslog audit (redundant logging)
client.sys.audit.enable_audit_device(
device_type='syslog',
path='syslog-audit',
options={
'facility': 'AUTH',
'tag': 'vault'
},
description='Syslog audit logging'
)
# List enabled audit devices
audit_devices = client.sys.audit.list_enabled_audit_devices()
for path, config in audit_devices['data'].items():
print(f"Audit device: {path} ({config['type']})")# Check Vault health for load balancer
health = client.sys.health.read_health_status(
standby_ok=True, # Accept standby nodes as healthy
active_code=200, # Active node returns 200
standby_code=200 # Standby node returns 200 (not 429)
)
print(f"Vault cluster ID: {health['cluster_id']}")
print(f"Version: {health['version']}")
print(f"Sealed: {health['sealed']}")
print(f"Standby: {health['standby']}")
# Check leader status for HA clusters
leader_status = client.sys.leader.read_leader_status()
print(f"HA enabled: {leader_status['ha_enabled']}")
if leader_status['ha_enabled']:
print(f"Is leader: {leader_status['is_self']}")
print(f"Leader address: {leader_status['leader_address']}")# Rotate the encryption key (automated background process)
client.sys.key.rotate_encryption_key()
print("Encryption key rotation initiated")
# Check key status
key_status = client.sys.key.get_encryption_key_status()
print(f"Key term: {key_status['term']}")
print(f"Install time: {key_status['install_time']}")
# Emergency root token generation (requires unseal keys)
# Start the process
root_gen_progress = client.sys.key.start_root_token_generation()
nonce = root_gen_progress['nonce']
# Submit unseal keys
for key in unseal_keys[:2]: # Submit threshold number of keys
result = client.sys.key.generate_root(key=key, nonce=nonce)
if result['complete']:
encoded_token = result['encoded_token']
# Decode the token (implementation depends on OTP used)
print(f"New root token generated: {encoded_token}")# Check Raft cluster configuration
raft_config = client.sys.raft.read_raft_config()
for server in raft_config['data']['config']['servers']:
print(f"Node ID: {server['node_id']}, Address: {server['address']}")
# Configure automated snapshots
client.sys.raft.create_or_update_raft_auto_snapshot_config(
name='daily-backup',
interval='24h',
retain=7, # Keep 7 snapshots
path_prefix='vault-backup/',
storage_type='aws-s3',
aws_s3_bucket='vault-backups',
aws_s3_region='us-east-1'
)
# List auto-snapshot configurations
snapshot_configs = client.sys.raft.list_raft_auto_snapshot_configs()
print(f"Auto-snapshot configs: {list(snapshot_configs['data']['keys'])}")Install with Tessl CLI
npx tessl i tessl/pypi-hvac