Google Cloud Key Management Service client library for managing cryptographic keys in the cloud
—
The KeyManagementServiceClient provides comprehensive cryptographic key management capabilities including key creation, lifecycle management, and cryptographic operations. This is the primary interface for most Google Cloud KMS operations.
Key rings are top-level containers that group related cryptographic keys within a specific location.
def list_key_rings(self, request: ListKeyRingsRequest) -> ListKeyRingsResponse:
"""
Lists KeyRings in a location.
Parameters:
- request.parent: Required. Location path (projects/{project}/locations/{location})
- request.page_size: Optional. Maximum number of results per page
- request.page_token: Optional. Token for pagination
- request.filter: Optional. Filter expression
- request.order_by: Optional. Sorting order
Returns:
List of KeyRing objects with pagination support
"""
def get_key_ring(self, request: GetKeyRingRequest) -> KeyRing:
"""
Returns metadata for a given KeyRing.
Parameters:
- request.name: Required. KeyRing resource name
Returns:
KeyRing object with metadata
"""
def create_key_ring(self, request: CreateKeyRingRequest) -> KeyRing:
"""
Creates a new KeyRing in a given Project and Location.
Parameters:
- request.parent: Required. Location path
- request.key_ring_id: Required. Unique ID for the KeyRing
- request.key_ring: Required. KeyRing object to create
Returns:
Created KeyRing object
"""

Crypto keys define the cryptographic material and configuration for encryption operations.
def list_crypto_keys(self, request: ListCryptoKeysRequest) -> ListCryptoKeysResponse:
"""
Lists CryptoKeys in a KeyRing.
Parameters:
- request.parent: Required. KeyRing path
- request.page_size: Optional. Maximum results per page
- request.page_token: Optional. Pagination token
- request.filter: Optional. Filter expression
- request.order_by: Optional. Sorting order
- request.version_view: Optional. CryptoKeyVersionView level
Returns:
List of CryptoKey objects with pagination
"""
def get_crypto_key(self, request: GetCryptoKeyRequest) -> CryptoKey:
"""
Returns metadata for a given CryptoKey and its primary CryptoKeyVersion.
Parameters:
- request.name: Required. CryptoKey resource name
- request.version_view: Optional. Version view level
Returns:
CryptoKey object with metadata
"""
def create_crypto_key(self, request: CreateCryptoKeyRequest) -> CryptoKey:
"""
Creates a new CryptoKey within a KeyRing.
Parameters:
- request.parent: Required. KeyRing path
- request.crypto_key_id: Required. Unique ID for the CryptoKey
- request.crypto_key: Required. CryptoKey object to create
- request.skip_initial_version_creation: Optional. Skip creating initial version
Returns:
Created CryptoKey object
"""
def update_crypto_key(self, request: UpdateCryptoKeyRequest) -> CryptoKey:
"""
Updates a CryptoKey.
Parameters:
- request.crypto_key: Required. Updated CryptoKey object
- request.update_mask: Required. Fields to update
Returns:
Updated CryptoKey object
"""
def update_crypto_key_primary_version(self, request: UpdateCryptoKeyPrimaryVersionRequest) -> CryptoKey:
"""
Updates the version of a CryptoKey used in Encrypt operations.
Parameters:
- request.name: Required. CryptoKey resource name
- request.crypto_key_version_id: Required. Version ID to set as primary
Returns:
Updated CryptoKey object
"""

Crypto key versions contain the actual cryptographic material and can be managed independently.
def list_crypto_key_versions(self, request: ListCryptoKeyVersionsRequest) -> ListCryptoKeyVersionsResponse:
"""
Lists CryptoKeyVersions for a CryptoKey.
Parameters:
- request.parent: Required. CryptoKey path
- request.page_size: Optional. Maximum results per page
- request.page_token: Optional. Pagination token
- request.filter: Optional. Filter expression
- request.order_by: Optional. Sorting order
- request.view: Optional. CryptoKeyVersionView level
Returns:
List of CryptoKeyVersion objects with pagination
"""
def get_crypto_key_version(self, request: GetCryptoKeyVersionRequest) -> CryptoKeyVersion:
"""
Returns metadata for a given CryptoKeyVersion.
Parameters:
- request.name: Required. CryptoKeyVersion resource name
- request.view: Optional. Version view level
Returns:
CryptoKeyVersion object with metadata
"""
def create_crypto_key_version(self, request: CreateCryptoKeyVersionRequest) -> CryptoKeyVersion:
"""
Creates a new CryptoKeyVersion in a CryptoKey.
Parameters:
- request.parent: Required. CryptoKey path
- request.crypto_key_version: Required. CryptoKeyVersion to create
Returns:
Created CryptoKeyVersion object
"""
def update_crypto_key_version(self, request: UpdateCryptoKeyVersionRequest) -> CryptoKeyVersion:
"""
Updates a CryptoKeyVersion's metadata.
Parameters:
- request.crypto_key_version: Required. Updated CryptoKeyVersion
- request.update_mask: Required. Fields to update
Returns:
Updated CryptoKeyVersion object
"""
def destroy_crypto_key_version(self, request: DestroyCryptoKeyVersionRequest) -> CryptoKeyVersion:
"""
Schedules a CryptoKeyVersion for destruction.
Parameters:
- request.name: Required. CryptoKeyVersion resource name
Returns:
Updated CryptoKeyVersion with DESTROY_SCHEDULED state
"""
def restore_crypto_key_version(self, request: RestoreCryptoKeyVersionRequest) -> CryptoKeyVersion:
"""
Restores a CryptoKeyVersion in DESTROY_SCHEDULED state.
Parameters:
- request.name: Required. CryptoKeyVersion resource name
Returns:
Restored CryptoKeyVersion with DISABLED state
"""

Standard encrypt and decrypt operations using symmetric keys.
def encrypt(self, request: EncryptRequest) -> EncryptResponse:
"""
Encrypts data using a CryptoKey or CryptoKeyVersion.
Parameters:
- request.name: Required. CryptoKey or CryptoKeyVersion resource name
- request.plaintext: Required. Data to encrypt (max 64KiB)
- request.additional_authenticated_data: Optional. AAD for authenticated encryption
- request.plaintext_crc32c: Optional. CRC32C checksum of plaintext
- request.additional_authenticated_data_crc32c: Optional. CRC32C checksum of AAD
Returns:
EncryptResponse with ciphertext and metadata
"""
def decrypt(self, request: DecryptRequest) -> DecryptResponse:
"""
Decrypts data that was protected by Encrypt.
Parameters:
- request.name: Required. CryptoKey resource name
- request.ciphertext: Required. Encrypted data
- request.additional_authenticated_data: Optional. AAD used during encryption
- request.ciphertext_crc32c: Optional. CRC32C checksum of ciphertext
- request.additional_authenticated_data_crc32c: Optional. CRC32C checksum of AAD
Returns:
DecryptResponse with plaintext and metadata
"""

Low-level cryptographic operations using portable primitives.
def raw_encrypt(self, request: RawEncryptRequest) -> RawEncryptResponse:
"""
Encrypts data using portable cryptographic primitives.
Parameters:
- request.name: Required. CryptoKeyVersion resource name
- request.plaintext: Required. Data to encrypt
- request.additional_authenticated_data: Optional. AAD for authenticated encryption
- request.plaintext_crc32c: Optional. CRC32C checksum
- request.additional_authenticated_data_crc32c: Optional. CRC32C checksum
- request.initialization_vector: Optional. Initialization vector (AES-GCM, AES-CBC, AES-CTR)
Returns:
RawEncryptResponse with ciphertext and metadata
"""
def raw_decrypt(self, request: RawDecryptRequest) -> RawDecryptResponse:
"""
Decrypts data using raw cryptographic mechanisms.
Parameters:
- request.name: Required. CryptoKeyVersion resource name
- request.ciphertext: Required. Encrypted data
- request.additional_authenticated_data: Optional. AAD used during encryption
- request.initialization_vector: Required. IV used during encryption
- request.tag_length: Optional. Authentication tag length (AES-GCM). NOTE(review): the Cloud KMS API reference says an unset value falls back to the algorithm default (16 for AES-GCM), so this field was previously mislabeled as Required - confirm
- request.ciphertext_crc32c: Optional. CRC32C checksum of ciphertext
- request.additional_authenticated_data_crc32c: Optional. CRC32C checksum of AAD
- request.initialization_vector_crc32c: Optional. CRC32C checksum of the initialization vector
Returns:
RawDecryptResponse with plaintext and metadata
"""

Digital signing and asymmetric decryption using asymmetric keys.
def asymmetric_sign(self, request: AsymmetricSignRequest) -> AsymmetricSignResponse:
"""
Signs data using a CryptoKeyVersion with ASYMMETRIC_SIGN purpose.
Parameters:
- request.name: Required. CryptoKeyVersion resource name
- request.digest: Optional. Pre-computed digest to sign
- request.digest_crc32c: Optional. CRC32C checksum of digest
- request.data: Optional. Raw data to sign (alternative to digest)
- request.data_crc32c: Optional. CRC32C checksum of data
Returns:
AsymmetricSignResponse with signature and metadata
"""
def asymmetric_decrypt(self, request: AsymmetricDecryptRequest) -> AsymmetricDecryptResponse:
"""
Decrypts data using asymmetric keys.
Parameters:
- request.name: Required. CryptoKeyVersion resource name
- request.ciphertext: Required. Data encrypted with public key
- request.ciphertext_crc32c: Optional. CRC32C checksum of ciphertext
Returns:
AsymmetricDecryptResponse with plaintext and metadata
"""
def get_public_key(self, request: GetPublicKeyRequest) -> PublicKey:
"""
Returns the public key for asymmetric CryptoKeyVersions.
Parameters:
- request.name: Required. CryptoKeyVersion resource name
Returns:
PublicKey object with key data and metadata
"""

Message Authentication Code signing and verification.
def mac_sign(self, request: MacSignRequest) -> MacSignResponse:
"""
Signs data using a CryptoKeyVersion with MAC purpose.
Parameters:
- request.name: Required. CryptoKeyVersion resource name
- request.data: Required. Data to sign
- request.data_crc32c: Optional. CRC32C checksum of data
Returns:
MacSignResponse with MAC tag
"""
def mac_verify(self, request: MacVerifyRequest) -> MacVerifyResponse:
"""
Verifies MAC tag using a CryptoKeyVersion with MAC purpose.
Parameters:
- request.name: Required. CryptoKeyVersion resource name
- request.data: Required. Original data
- request.data_crc32c: Optional. CRC32C checksum of data
- request.mac: Required. MAC tag to verify
- request.mac_crc32c: Optional. CRC32C checksum of MAC
Returns:
MacVerifyResponse with verification result
"""

Import pre-existing key material into Google Cloud KMS.
def list_import_jobs(self, request: ListImportJobsRequest) -> ListImportJobsResponse:
"""
Lists ImportJobs in a KeyRing.
Parameters:
- request.parent: Required. KeyRing path
- request.page_size: Optional. Maximum results per page
- request.page_token: Optional. Pagination token
- request.filter: Optional. Filter expression
- request.order_by: Optional. Sorting order
Returns:
List of ImportJob objects with pagination
"""
def get_import_job(self, request: GetImportJobRequest) -> ImportJob:
"""
Returns metadata for a given ImportJob.
Parameters:
- request.name: Required. ImportJob resource name
Returns:
ImportJob object with metadata
"""
def create_import_job(self, request: CreateImportJobRequest) -> ImportJob:
"""
Creates a new ImportJob within a KeyRing.
Parameters:
- request.parent: Required. KeyRing path
- request.import_job_id: Required. Unique ID for ImportJob
- request.import_job: Required. ImportJob object to create
Returns:
Created ImportJob object
"""
def import_crypto_key_version(self, request: ImportCryptoKeyVersionRequest) -> CryptoKeyVersion:
"""
Imports wrapped key material into a CryptoKeyVersion.
Parameters:
- request.parent: Required. CryptoKey path
- request.crypto_key_version: Optional. CryptoKeyVersion template
- request.algorithm: Required. Cryptographic algorithm
- request.import_job: Required. ImportJob resource name
- request.wrapped_key: Optional. RSA AES wrapped key material
- request.rsa_aes_wrapped_key: Optional. RSA AES key wrap data
Returns:
Imported CryptoKeyVersion object
"""

Additional utility functions for random number generation and resource management.
def generate_random_bytes(self, request: GenerateRandomBytesRequest) -> GenerateRandomBytesResponse:
"""
Generates random bytes using Cloud KMS randomness source.
Parameters:
- request.location: Required. Location path for randomness generation
- request.length_bytes: Required. Number of random bytes to generate (8-1024 per the Cloud KMS API reference; the previously documented "1-1024" range looks wrong - NOTE(review): confirm)
- request.protection_level: Optional. Protection level for randomness generation
Returns:
GenerateRandomBytesResponse with random data
"""

Identity and Access Management operations for controlling access to KMS resources.
def set_iam_policy(self, request: iam_policy_pb2.SetIamPolicyRequest) -> policy_pb2.Policy:
"""
Sets the IAM access control policy on a KMS resource.
Parameters:
- request.resource: Required. Resource name to set policy for
- request.policy: Required. IAM policy to set
Returns:
Updated IAM policy object
"""
def get_iam_policy(self, request: iam_policy_pb2.GetIamPolicyRequest) -> policy_pb2.Policy:
"""
Gets the IAM access control policy for a KMS resource.
Parameters:
- request.resource: Required. Resource name to get policy for
- request.options: Optional. Policy options
Returns:
Current IAM policy object or empty policy if none exists
"""
def test_iam_permissions(self, request: iam_policy_pb2.TestIamPermissionsRequest) -> iam_policy_pb2.TestIamPermissionsResponse:
"""
Tests specified IAM permissions against the IAM access control policy.
Parameters:
- request.resource: Required. Resource name to test permissions for
- request.permissions: Required. List of permissions to test
Returns:
List of permissions that the caller has on the resource
"""

Long-running operations management for monitoring asynchronous tasks.
def get_operation(self, request: operations_pb2.GetOperationRequest) -> operations_pb2.Operation:
"""
Gets the latest state of a long-running operation.
Parameters:
- request.name: Required. Operation resource name
Returns:
Operation object with current state and metadata
"""

Location discovery and information retrieval for Cloud KMS.
def list_locations(self, request: locations_pb2.ListLocationsRequest) -> locations_pb2.ListLocationsResponse:
"""
Lists information about supported locations for Cloud KMS.
Parameters:
- request.name: Required. Project resource name
- request.filter: Optional. Location filter expression
- request.page_size: Optional. Maximum results per page
- request.page_token: Optional. Pagination token
Returns:
List of available locations with pagination
"""
def get_location(self, request: locations_pb2.GetLocationRequest) -> locations_pb2.Location:
"""
Gets information about a specific location.
Parameters:
- request.name: Required. Location resource name
Returns:
Location object with detailed information
"""

Utility methods for constructing and parsing Google Cloud KMS resource paths. These helper methods simplify resource name management and are essential for building proper resource references.
@classmethod
def crypto_key_path(cls, project: str, location: str, key_ring: str, crypto_key: str) -> str:
"""
Constructs a crypto key resource path.
Parameters:
- project: GCP project ID
- location: Location (e.g., 'global', 'us-central1')
- key_ring: Key ring ID
- crypto_key: Crypto key ID
Returns:
Formatted resource path string
"""
@classmethod
def crypto_key_version_path(cls, project: str, location: str, key_ring: str, crypto_key: str, crypto_key_version: str) -> str:
"""
Constructs a crypto key version resource path.
Parameters:
- project: GCP project ID
- location: Location
- key_ring: Key ring ID
- crypto_key: Crypto key ID
- crypto_key_version: Crypto key version ID
Returns:
Formatted resource path string
"""
@classmethod
def key_ring_path(cls, project: str, location: str, key_ring: str) -> str:
"""
Constructs a key ring resource path.
Parameters:
- project: GCP project ID
- location: Location
- key_ring: Key ring ID
Returns:
Formatted resource path string
"""
@classmethod
def import_job_path(cls, project: str, location: str, key_ring: str, import_job: str) -> str:
"""
Constructs an import job resource path.
Parameters:
- project: GCP project ID
- location: Location
- key_ring: Key ring ID
- import_job: Import job ID
Returns:
Formatted resource path string
"""
@classmethod
def public_key_path(cls, project: str, location: str, key_ring: str, crypto_key: str, crypto_key_version: str) -> str:
"""
Constructs a public key resource path.
Parameters:
- project: GCP project ID
- location: Location
- key_ring: Key ring ID
- crypto_key: Crypto key ID
- crypto_key_version: Crypto key version ID
Returns:
Formatted resource path string
"""
@classmethod
def parse_crypto_key_path(cls, path: str) -> Dict[str, str]:
"""
Parses a crypto key resource path into its components.
Parameters:
- path: Crypto key resource path
Returns:
Dictionary with 'project', 'location', 'key_ring', 'crypto_key' keys
"""
@classmethod
def parse_crypto_key_version_path(cls, path: str) -> Dict[str, str]:
"""
Parses a crypto key version resource path into its components.
Parameters:
- path: Crypto key version resource path
Returns:
Dictionary with 'project', 'location', 'key_ring', 'crypto_key', 'crypto_key_version' keys
"""
@classmethod
def parse_key_ring_path(cls, path: str) -> Dict[str, str]:
"""
Parses a key ring resource path into its components.
Parameters:
- path: Key ring resource path
Returns:
Dictionary with 'project', 'location', 'key_ring' keys
"""
@classmethod
def parse_import_job_path(cls, path: str) -> Dict[str, str]:
"""
Parses an import job resource path into its components.
Parameters:
- path: Import job resource path
Returns:
Dictionary with 'project', 'location', 'key_ring', 'import_job' keys
"""
@classmethod
def parse_public_key_path(cls, path: str) -> Dict[str, str]:
"""
Parses a public key resource path into its components.
Parameters:
- path: Public key resource path
Returns:
Dictionary with 'project', 'location', 'key_ring', 'crypto_key', 'crypto_key_version' keys
"""

Standard path construction methods for common Google Cloud resource types.
@classmethod
def common_project_path(cls, project: str) -> str:
"""Returns a project resource path."""
@classmethod
def common_location_path(cls, project: str, location: str) -> str:
"""Returns a location resource path."""
@classmethod
def common_billing_account_path(cls, billing_account: str) -> str:
"""Returns a billing account resource path."""
@classmethod
def common_folder_path(cls, folder: str) -> str:
"""Returns a folder resource path."""
@classmethod
def common_organization_path(cls, organization: str) -> str:
"""Returns an organization resource path."""

from google.cloud import kms
# Placeholder identifiers used below -- replace with your own project and location.
# (The original snippet referenced project_id / location_id without defining them,
# which raises NameError when the example is run as written.)
project_id = "my-project"
location_id = "global"

# Initialize client
client = kms.KeyManagementServiceClient()

# Create key ring; the parent is the location path
# (projects/{project}/locations/{location})
location_name = f"projects/{project_id}/locations/{location_id}"
key_ring = client.create_key_ring(
    request={
        "parent": location_name,
        "key_ring_id": "example-keyring",
        "key_ring": {},
    }
)

# Create symmetric encryption key inside the key ring just created
crypto_key = client.create_crypto_key(
    request={
        "parent": key_ring.name,
        "crypto_key_id": "symmetric-key",
        "crypto_key": {
            "purpose": kms.CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT,
        },
    }
)

# Encrypt data with the CryptoKey (its primary version is used)
plaintext = b"Sensitive data to encrypt"
encrypt_response = client.encrypt(
    request={
        "name": crypto_key.name,
        "plaintext": plaintext,
    }
)

# Decrypt data; the round trip should give back the original plaintext
decrypt_response = client.decrypt(
    request={
        "name": crypto_key.name,
        "ciphertext": encrypt_response.ciphertext,
    }
)
assert decrypt_response.plaintext == plaintext

from google.cloud import kms
import hashlib

# Initialize the client and name the parent key ring so this snippet runs on its
# own (the original relied on `client` and `key_ring` left over from another
# example). Replace the placeholder IDs; the key ring must already exist.
client = kms.KeyManagementServiceClient()
key_ring_name = client.key_ring_path(
    project="my-project",
    location="global",
    key_ring="example-keyring",
)

# Create asymmetric signing key
crypto_key = client.create_crypto_key(
    request={
        "parent": key_ring_name,
        "crypto_key_id": "signing-key",
        "crypto_key": {
            "purpose": kms.CryptoKey.CryptoKeyPurpose.ASYMMETRIC_SIGN,
            "version_template": {
                "algorithm": kms.CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PSS_2048_SHA256,
            },
        },
    }
)

# Both calls below target the first version of the new key.
# NOTE(review): a freshly created asymmetric key version may still be generating;
# signing presumably requires it to be ENABLED -- confirm, and poll
# get_crypto_key_version before signing if so.
version_name = f"{crypto_key.name}/cryptoKeyVersions/1"

# Sign data: the SHA-256 digest is computed locally and only the digest is sent
data = b"Data to sign"
digest = kms.Digest()
digest.sha256 = hashlib.sha256(data).digest()
sign_response = client.asymmetric_sign(
    request={
        "name": version_name,
        "digest": digest,
    }
)

# Get public key for verification
public_key = client.get_public_key(
    request={"name": version_name}
)
print(f"Signature: {sign_response.signature.hex()}")
print(f"Public key: {public_key.pem}")

from google.cloud import kms
# Initialize client
client = kms.KeyManagementServiceClient()

# Identifiers shared by every path built in this example
ids = {"project": "my-project", "location": "global", "key_ring": "my-keyring"}

# Construct resource paths from the shared identifiers
key_ring_path = client.key_ring_path(**ids)
crypto_key_path = client.crypto_key_path(crypto_key="my-key", **ids)

# Parse a resource path back into its named components and show each one
path_components = client.parse_crypto_key_path(crypto_key_path)
print(f"Project: {path_components['project']}")
print(f"Location: {path_components['location']}")
print(f"Key Ring: {path_components['key_ring']}")
print(f"Crypto Key: {path_components['crypto_key']}")

# Use paths in API calls: the key ring's parent is the location path
key_ring_request = {
    "parent": client.common_location_path(ids["project"], ids["location"]),
    "key_ring_id": "my-keyring",
    "key_ring": {},
}
key_ring = client.create_key_ring(request=key_ring_request)
crypto_key = client.create_crypto_key(
request={
"parent": key_ring_path,
"crypto_key_id": "my-key",
"crypto_key": {
"purpose": kms.CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT,
},
}
)

Install with Tessl CLI
npx tessl i tessl/pypi-google-cloud-kms