Open source cloud security assessment tool for AWS, Azure, GCP, and Kubernetes with hundreds of compliance checks.
—
Multi-cloud provider abstraction supporting AWS, Azure, GCP, Kubernetes, GitHub, and Microsoft 365 with standardized authentication, session management, and credential handling. The provider framework enables consistent security assessments across different cloud platforms while maintaining platform-specific optimizations and capabilities.
Abstract base class defining the common interface for all cloud providers.
from abc import ABC, abstractmethod
class Provider(ABC):
"""
Abstract base class for all cloud providers.
Defines the standard interface that all provider implementations
must follow, ensuring consistent behavior across different cloud platforms.
Properties:
- type: str - Provider type identifier (aws, azure, gcp, etc.)
- identity: dict - Provider identity and account information
- session: object - Provider-specific session object
- audit_config: dict - Configuration for audit and compliance settings
"""
@property
@abstractmethod
def type(self) -> str:
"""
Provider type identifier.
Returns:
String identifier for the provider (e.g., 'aws', 'azure', 'gcp')
"""
@property
@abstractmethod
def identity(self) -> dict:
"""
Provider identity information.
Returns:
Dictionary containing account/subscription details and identity info
"""
@property
@abstractmethod
def session(self) -> object:
"""
Provider session object.
Returns:
Provider-specific session object for API interactions
"""
@property
@abstractmethod
def audit_config(self) -> dict:
"""
Audit configuration settings.
Returns:
Dictionary containing audit and compliance configuration
"""
@abstractmethod
def setup_session(self, **kwargs):
"""
Setup provider session with authentication.
Abstract method that each provider implements to establish
authenticated sessions using provider-specific methods.
Parameters:
**kwargs: Provider-specific authentication parameters
Raises:
ProwlerException: On authentication or session setup errors
"""
@abstractmethod
def print_credentials(self):
"""
Print provider credential information.
Abstract method for displaying current authentication
and authorization details in a standardized format.
Returns:
None (prints to stdout)
"""
@abstractmethod
def validate_arguments(self, **kwargs) -> bool:
"""
Validate provider-specific arguments.
Abstract method for validating configuration parameters
and ensuring provider requirements are met.
Parameters:
**kwargs: Provider-specific configuration parameters
Returns:
True if arguments are valid, False otherwise
Raises:
ProwlerException: On validation errors
"""
@staticmethod
def init_global_provider(provider_name: str, **kwargs):
"""
Initialize global provider instance.
Static method for creating and configuring the global
provider instance used throughout Prowler execution.
Parameters:
- provider_name: Name of provider to initialize
**kwargs: Provider-specific initialization parameters
Returns:
None (sets global provider instance)
Raises:
ProwlerException: On provider initialization errors
"""
@staticmethod
def get_global_provider():
"""
Get the current global provider instance.
Returns:
Current global Provider instance or None if not initialized
"""

Amazon Web Services provider implementation with support for multiple authentication methods and AWS Organizations.
class AwsProvider(Provider):
"""
AWS provider implementation with comprehensive authentication and scanning capabilities.
Provides AWS security assessment capabilities with support for multiple
authentication methods, cross-account access, AWS Organizations integration,
resource filtering, and regional scanning.
"""
def __init__(
self,
retries_max_attempts: int = 3,
role_arn: str = None,
session_duration: int = 3600,
external_id: str = None,
role_session_name: str = None,
mfa: bool = False,
profile: str = None,
regions: set = set(),
organizations_role_arn: str = None,
scan_unused_services: bool = False,
resource_tags: list[str] = [],
resource_arn: list[str] = [],
config_path: str = None,
config_content: dict = None,
fixer_config: dict = {},
mutelist_path: str = None,
mutelist_content: dict = None,
aws_access_key_id: str = None,
aws_secret_access_key: str = None,
aws_session_token: str = None,
):
"""
Initialize AWS provider with comprehensive authentication and configuration options.
Parameters:
- retries_max_attempts: int = 3 - Maximum retry attempts for API calls
- role_arn: str = None - IAM role ARN to assume for cross-account access
- session_duration: int = 3600 - Session duration in seconds for assumed roles
- external_id: str = None - External ID for secure cross-account role assumption
- role_session_name: str = None - Custom session name for assumed role sessions
- mfa: bool = False - Whether to prompt for MFA token during authentication
- profile: str = None - AWS profile name from credentials/config files
- regions: set = set() - Set of AWS regions to include in scanning
- organizations_role_arn: str = None - Role ARN for AWS Organizations access
- scan_unused_services: bool = False - Whether to scan services not in use
- resource_tags: list[str] = [] - Resource tag filters for targeted scanning
- resource_arn: list[str] = [] - Specific resource ARNs to audit
- config_path: str = None - Path to custom configuration file
- config_content: dict = None - Configuration dictionary for inline config
- fixer_config: dict = {} - Configuration for automatic remediation
- mutelist_path: str = None - Path to mutelist (allowlist) file
- mutelist_content: dict = None - Mutelist dictionary for inline config
- aws_access_key_id: str = None - AWS access key ID for direct authentication
- aws_secret_access_key: str = None - AWS secret access key
- aws_session_token: str = None - AWS session token for temporary credentials
"""
# Properties
@property
def identity(self) -> 'AWSIdentityInfo':
"""AWS account and user identity information."""
@property
def type(self) -> str:
"""Returns 'aws'."""
@property
def session(self) -> 'AWSSession':
"""AWS session object for API interactions."""
@property
def organizations_metadata(self) -> 'AWSOrganizationsInfo':
"""AWS Organizations metadata and account information."""
@property
def audit_resources(self) -> list:
"""List of specific resources to audit."""
@property
def scan_unused_services(self) -> bool:
"""Whether unused services should be scanned."""
@property
def audit_config(self) -> dict:
"""Audit configuration settings."""
@property
def fixer_config(self) -> dict:
"""Automatic remediation configuration."""
@property
def mutelist(self) -> 'AWSMutelist':
"""AWS-specific mutelist configuration."""
# Instance Methods
def print_credentials(self) -> None:
"""Print current AWS authentication and identity information."""
def generate_regional_clients(self, service: str) -> dict:
"""
Generate boto3 clients for all enabled regions for a service.
Parameters:
- service: str - AWS service name (e.g., 'ec2', 's3', 'iam')
Returns:
dict: Dictionary mapping region names to boto3 service clients
"""
def get_checks_from_input_arn(self) -> set:
"""
Determine which checks to run based on input resource ARNs.
Returns:
set[str]: Set of check names that apply to the specified resources
"""
def get_regions_from_audit_resources(self, audit_resources: list) -> set:
"""
Extract AWS regions from resource ARNs.
Parameters:
- audit_resources: list - List of AWS resource ARNs
Returns:
set[str]: Set of AWS region names extracted from ARNs
"""
def get_tagged_resources(self, resource_tags: list[str]) -> list[str]:
"""
Find resources matching specified tag filters.
Parameters:
- resource_tags: list[str] - List of tag filters in key=value format
Returns:
list[str]: List of resource ARNs matching the tag criteria
"""
def get_default_region(self, service: str) -> str:
"""
Get the default region for an AWS service.
Parameters:
- service: str - AWS service name
Returns:
str: Default region name for the service
"""
def get_global_region(self) -> str:
"""
Get the global region identifier based on AWS partition.
Returns:
str: Global region (us-east-1 for aws, us-gov-east-1 for aws-us-gov, etc.)
"""
def get_aws_enabled_regions(self, current_session: 'Session') -> set:
"""
Get all regions enabled for the AWS account.
Parameters:
- current_session: boto3.Session - Authenticated boto3 session
Returns:
set[str]: Set of enabled AWS region names
"""
def get_checks_to_execute_by_audit_resources(self) -> set[str]:
"""
Filter checks based on audit resources and configuration.
Returns:
set[str]: Set of check names to execute based on resource filters
"""
# Static Methods
@staticmethod
def setup_session(
profile: str = None,
aws_access_key_id: str = None,
aws_secret_access_key: str = None,
aws_session_token: str = None,
region: str = None,
retries_max_attempts: int = 3,
) -> 'Session':
"""
Create and configure a boto3 session.
Parameters:
- profile: str = None - AWS profile name
- aws_access_key_id: str = None - AWS access key ID
- aws_secret_access_key: str = None - AWS secret access key
- aws_session_token: str = None - AWS session token
- region: str = None - Default AWS region
- retries_max_attempts: int = 3 - Retry configuration
Returns:
boto3.Session: Configured boto3 session object
"""
@staticmethod
def test_connection(
provider_id: str,
credentials: dict = None,
) -> 'Connection':
"""
Test connectivity and permissions for AWS.
Parameters:
- provider_id: str - Provider identifier for testing
- credentials: dict = None - AWS credential parameters
Returns:
Connection: Connection test result with success/failure status
"""
@staticmethod
def assume_role(
session: 'Session',
assumed_role_info: 'AWSAssumeRoleInfo'
) -> 'AWSCredentials':
"""
Assume an IAM role and return temporary credentials.
Parameters:
- session: boto3.Session - Base session for role assumption
- assumed_role_info: AWSAssumeRoleInfo - Role assumption parameters
Returns:
AWSCredentials: Temporary credentials for the assumed role
"""
@staticmethod
def validate_credentials(
session: 'Session',
aws_region: str
) -> 'AWSCallerIdentity':
"""
Validate AWS credentials and return caller identity.
Parameters:
- session: boto3.Session - Session to validate
- aws_region: str - AWS region for STS calls
Returns:
AWSCallerIdentity: Caller identity information
"""
@staticmethod
def get_regions(partition: 'Partition' = None) -> set:
"""
Get all available AWS regions for a partition.
Parameters:
- partition: Partition = None - AWS partition (aws, aws-cn, aws-us-gov)
Returns:
set[str]: Set of available region names
"""
@staticmethod
def get_available_aws_service_regions(
service: str,
partition: str,
audited_regions: set
) -> set:
"""
Get available regions for a specific AWS service.
Parameters:
- service: str - AWS service name
- partition: str - AWS partition
- audited_regions: set - Regions being audited
Returns:
set[str]: Set of regions where the service is available
"""

Microsoft Azure provider implementation with support for service principals, managed identities, and multiple authentication methods.
class AzureProvider(Provider):
"""
Azure provider implementation with multiple authentication methods.
Provides Azure security assessment capabilities with support for Azure CLI
authentication, service principals, browser-based authentication, and
managed identity authentication across multiple subscriptions.
"""
def __init__(
self,
az_cli_auth: bool = False,
sp_env_auth: bool = False,
browser_auth: bool = False,
managed_identity_auth: bool = False,
tenant_id: str = None,
region: str = "AzureCloud",
subscription_ids: list = [],
config_path: str = None,
config_content: dict = None,
fixer_config: dict = {},
mutelist_path: str = None,
mutelist_content: dict = None,
client_id: str = None,
client_secret: str = None,
):
"""
Initialize Azure provider with comprehensive authentication options.
Parameters:
- az_cli_auth: bool = False - Use Azure CLI authentication
- sp_env_auth: bool = False - Use service principal from environment variables
- browser_auth: bool = False - Use interactive browser-based authentication
- managed_identity_auth: bool = False - Use managed identity authentication
- tenant_id: str = None - Azure Active Directory tenant ID
- region: str = "AzureCloud" - Azure cloud environment (AzureCloud, AzureChinaCloud, etc.)
- subscription_ids: list = [] - List of subscription IDs to audit
- config_path: str = None - Path to custom configuration file
- config_content: dict = None - Configuration dictionary for inline config
- fixer_config: dict = {} - Configuration for automatic remediation
- mutelist_path: str = None - Path to mutelist (allowlist) file
- mutelist_content: dict = None - Mutelist dictionary for inline config
- client_id: str = None - Service principal client ID
- client_secret: str = None - Service principal client secret
"""
# Properties
@property
def identity(self) -> 'AzureIdentityInfo':
"""Azure tenant and subscription identity information."""
@property
def type(self) -> str:
"""Returns 'azure'."""
@property
def session(self) -> 'DefaultAzureCredential':
"""Azure credential object for API authentication."""
@property
def region_config(self) -> 'AzureRegionConfig':
"""Azure cloud region configuration."""
@property
def locations(self) -> dict:
"""Available Azure locations by subscription."""
@property
def audit_config(self) -> dict:
"""Audit configuration settings."""
@property
def fixer_config(self) -> dict:
"""Automatic remediation configuration."""
@property
def mutelist(self) -> 'AzureMutelist':
"""Azure-specific mutelist configuration."""
# Instance Methods
def print_credentials(self) -> None:
"""Print current Azure authentication and identity information."""
def get_locations(self) -> dict[str, list[str]]:
"""
Retrieve available locations for each subscription.
Returns:
dict[str, list[str]]: Dictionary mapping subscription IDs to lists of available location names
"""
def get_regions(self, subscription_ids: list[str] = None) -> set:
"""
Get all available regions across specified subscriptions.
Parameters:
- subscription_ids: list[str] = None - Optional list of subscription IDs to check
Returns:
set[str]: Set of available Azure region names
"""
def setup_identity(self, **kwargs) -> 'AzureIdentityInfo':
"""
Setup Azure identity information from authentication context.
Parameters:
**kwargs: Authentication-specific parameters
Returns:
AzureIdentityInfo: Populated identity information object
"""
# Static Methods
@staticmethod
def validate_arguments(**kwargs) -> None:
"""
Validate Azure authentication arguments and configuration.
Parameters:
**kwargs: Authentication and configuration parameters
Raises:
ProwlerException: On validation errors or conflicting authentication methods
"""
@staticmethod
def setup_region_config(region: str) -> 'AzureRegionConfig':
"""
Setup Azure cloud region configuration.
Parameters:
- region: str - Azure cloud environment name
Returns:
AzureRegionConfig: Region configuration object
"""
@staticmethod
def setup_session(**kwargs) -> 'DefaultAzureCredential':
"""
Setup Azure authentication session with specified method.
Parameters:
**kwargs: Authentication parameters for the chosen method
Returns:
DefaultAzureCredential: Azure credential object for API calls
"""
@staticmethod
def test_connection(
provider_id: str,
credentials: dict = None,
) -> 'Connection':
"""
Test connectivity and permissions for Azure.
Parameters:
- provider_id: str - Provider identifier for testing
- credentials: dict = None - Azure credential parameters
Returns:
Connection: Connection test result with success/failure status
"""
@staticmethod
def check_service_principal_creds_env_vars() -> None:
"""
Check for service principal credentials in environment variables.
Validates presence of AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID
environment variables when using service principal authentication.
Raises:
ProwlerException: If required environment variables are missing
"""
@staticmethod
def validate_static_credentials(**kwargs) -> dict:
"""
Validate static credential parameters for service principal authentication.
Parameters:
**kwargs: Credential parameters to validate
Returns:
dict: Validated credential information
Raises:
ProwlerException: On credential validation errors
"""
@staticmethod
def verify_client(tenant_id: str, client_id: str, client_secret: str) -> None:
"""
Verify service principal client credentials.
Parameters:
- tenant_id: str - Azure AD tenant ID
- client_id: str - Service principal client ID
- client_secret: str - Service principal client secret
Raises:
ProwlerException: On authentication failure or invalid credentials
"""

Google Cloud Platform provider implementation with support for service accounts and Application Default Credentials.
class GcpProvider(Provider):
"""
GCP provider implementation with comprehensive project and authentication management.
Provides Google Cloud Platform security assessment capabilities with support for
service account authentication, Application Default Credentials, service account
impersonation, and organization-wide project discovery.
"""
def __init__(
self,
retries_max_attempts: int = None,
organization_id: str = None,
project_ids: list = None,
excluded_project_ids: list = None,
credentials_file: str = None,
impersonate_service_account: str = None,
list_project_ids: bool = False,
config_path: str = None,
config_content: dict = None,
fixer_config: dict = {},
mutelist_path: str = None,
mutelist_content: dict = None,
client_id: str = None,
client_secret: str = None,
refresh_token: str = None,
service_account_key: dict = None,
):
"""
Initialize GCP provider with comprehensive authentication and project management.
Parameters:
- retries_max_attempts: int = None - Maximum retry attempts for API calls
- organization_id: str = None - GCP organization ID for organization-wide scanning
- project_ids: list = None - List of specific project IDs to audit
- excluded_project_ids: list = None - List of project IDs to exclude from auditing
- credentials_file: str = None - Path to service account JSON key file
- impersonate_service_account: str = None - Service account email for impersonation
- list_project_ids: bool = False - Whether to list available project IDs and exit
- config_path: str = None - Path to custom configuration file
- config_content: dict = None - Configuration dictionary for inline config
- fixer_config: dict = {} - Configuration for automatic remediation
- mutelist_path: str = None - Path to mutelist (allowlist) file
- mutelist_content: dict = None - Mutelist dictionary for inline config
- client_id: str = None - OAuth 2.0 client ID
- client_secret: str = None - OAuth 2.0 client secret
- refresh_token: str = None - OAuth 2.0 refresh token
- service_account_key: dict = None - Service account key as dictionary
"""
# Properties
@property
def identity(self) -> 'GCPIdentityInfo':
"""GCP authentication and user identity information."""
@property
def type(self) -> str:
"""Returns 'gcp'."""
@property
def session(self) -> 'Credentials':
"""GCP credentials object for API authentication."""
@property
def projects(self) -> dict:
"""Dictionary of accessible GCP projects keyed by project ID."""
@property
def default_project_id(self) -> str:
"""Default project ID for the current session."""
@property
def impersonated_service_account(self) -> str:
"""Service account being impersonated, if any."""
@property
def project_ids(self) -> list:
"""List of project IDs to be audited."""
@property
def excluded_project_ids(self) -> list:
"""List of project IDs excluded from auditing."""
@property
def audit_config(self) -> dict:
"""Audit configuration settings."""
@property
def fixer_config(self) -> dict:
"""Automatic remediation configuration."""
@property
def mutelist(self) -> 'GCPMutelist':
"""GCP-specific mutelist configuration."""
# Instance Methods
def print_credentials(self) -> None:
"""Print current GCP authentication and identity information."""
def update_projects_with_organizations(self) -> None:
"""
Update project information with organization metadata.
Enriches project objects with organizational context when organization_id
is specified and the authenticated principal has appropriate permissions.
"""
def is_project_matching(self, input_project: str, project_to_match: str) -> bool:
"""
Check if project identifiers match using flexible matching rules.
Parameters:
- input_project: str - Input project identifier (can be project ID or number)
- project_to_match: str - Project identifier to match against
Returns:
bool: True if projects match, False otherwise
"""
def get_regions(self) -> set:
"""
Get all available regions for the configured project IDs.
Returns:
set[str]: Set of available GCP region names
"""
# Static Methods
@staticmethod
def setup_session(**kwargs) -> tuple:
"""
Setup GCP authentication session and return credentials.
Parameters:
**kwargs: Authentication parameters (credentials_file, service_account_key, etc.)
Returns:
tuple: (credentials_object, default_project_id) for API authentication
"""
@staticmethod
def test_connection(
provider_id: str,
credentials: dict = None,
) -> 'Connection':
"""
Test connectivity and permissions for GCP.
Parameters:
- provider_id: str - Provider identifier for testing
- credentials: dict = None - GCP credential parameters
Returns:
Connection: Connection test result with success/failure status
"""
@staticmethod
def get_projects(**kwargs) -> dict[str, 'GCPProject']:
"""
Discover and retrieve accessible GCP projects.
Parameters:
**kwargs: Authentication and filtering parameters
Returns:
dict[str, GCPProject]: Dictionary mapping project IDs to project objects
"""
@staticmethod
def validate_static_arguments(**kwargs) -> dict:
"""
Validate static authentication arguments for GCP.
Parameters:
**kwargs: Authentication parameters to validate
Returns:
dict: Validated authentication information
Raises:
ProwlerException: On validation errors or missing required parameters
"""
@staticmethod
def validate_project_id(provider_id: str, credentials: str = None) -> None:
"""
Validate that a project ID exists and is accessible.
Parameters:
- provider_id: str - GCP project ID to validate
- credentials: str = None - Optional credentials for validation
Raises:
ProwlerException: If project ID is invalid or inaccessible
"""

Similar provider implementations for specialized platforms:
class KubernetesProvider(Provider):
"""
Kubernetes provider for cluster security assessments.
Supports kubeconfig files, in-cluster authentication,
and service account tokens.
"""
class GitHubProvider(Provider):
"""
GitHub provider for repository and organization security.
Supports GitHub tokens, GitHub Apps, and organization scanning.
"""
class M365Provider(Provider):
"""
Microsoft 365 provider for tenant security assessments.
Supports application authentication and delegated permissions.
"""

from prowler.providers.aws.aws_provider import AwsProvider
from prowler.providers.aws.models import AWSCredentials
# Initialize with access keys
aws_provider = AwsProvider(
aws_access_key_id="AKIAIOSFODNN7EXAMPLE",
aws_secret_access_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
regions={"us-east-1"}
)
# Initialize with AWS profile
aws_provider = AwsProvider(
profile="production",
regions={"us-west-2"}
)
# Initialize with role assumption
aws_provider = AwsProvider(
role_arn="arn:aws:iam::123456789012:role/ProwlerRole",
external_id="unique-external-id",
role_session_name="prowler-scan"
)
# Setup session and validate
aws_provider.setup_session()
aws_provider.print_credentials()

from prowler.providers.azure.azure_provider import AzureProvider
# Initialize with service principal
azure_provider = AzureProvider(
client_id="12345678-1234-1234-1234-123456789012",
client_secret="your-client-secret",
tenant_id="87654321-4321-4321-4321-210987654321",
subscription_ids=["abcdef12-3456-7890-abcd-ef1234567890"]
)
# Initialize with managed identity
azure_provider = AzureProvider(
subscription_ids=["abcdef12-3456-7890-abcd-ef1234567890"],
managed_identity_auth=True
)
# Setup and validate
azure_provider.setup_session()
azure_provider.print_credentials()

from prowler.providers.gcp.gcp_provider import GcpProvider
# Initialize with service account file
gcp_provider = GcpProvider(
credentials_file="/path/to/service-account.json",
project_ids=["project-1", "project-2", "project-3"]
)
# Initialize with Application Default Credentials
gcp_provider = GcpProvider(
project_ids=["my-project"]
)
# Setup and validate
gcp_provider.setup_session()
gcp_provider.print_credentials()

from prowler.providers.common.provider import Provider
# Initialize global provider
Provider.init_global_provider("aws", profile="default")
# Get current provider
current_provider = Provider.get_global_provider()
print(f"Current provider type: {current_provider.type}")
print(f"Provider identity: {current_provider.identity}")
# Switch to different provider
Provider.init_global_provider("azure",
subscription_ids=["sub-id"],
managed_identity_auth=True
)

from prowler.providers.aws.aws_provider import AwsProvider
aws_provider = AwsProvider(profile="test")
# Validate configuration before scanning
if aws_provider.validate_arguments():
    print("AWS configuration is valid")
    aws_provider.setup_session()
    # Check permissions
    identity = aws_provider.identity
    print(f"Authenticated as: {identity['arn']}")
    print(f"Account ID: {identity['account_id']}")
else:
    print("AWS configuration validation failed")

from prowler.providers.aws.aws_provider import AwsProvider
# Setup cross-account role assumption
aws_provider = AwsProvider(
role_arn="arn:aws:iam::TARGET-ACCOUNT:role/OrganizationAccessRole",
external_id="shared-external-id",
role_session_name="prowler-cross-account-scan"
)
aws_provider.setup_session()
# Verify cross-account access
identity = aws_provider.identity
print(f"Scanning account: {identity['account_id']}")
print(f"Using role: {identity['arn']}")

Install with Tessl CLI
npx tessl i tessl/pypi-prowler