Google Cloud Monitoring API client library for collecting, analyzing, and alerting on metrics, events, and metadata from cloud and on-premise sources.
npx @tessl/cli install tessl/pypi-google-cloud-monitoring@2.27.0
google-cloud-monitoring is a comprehensive Python client library for Google Cloud Monitoring (formerly Stackdriver Monitoring) that enables developers to interact with Google's cloud monitoring and observability platform. It offers complete API coverage for collecting, analyzing, and alerting on metrics, events, and metadata from various cloud and on-premise sources including Google Cloud Platform, AWS, hybrid systems, and over 150 application components through BindPlane integration.
pip install google-cloud-monitoring
from google.cloud import monitoring
Import specific service clients:
from google.cloud.monitoring import AlertPolicyServiceClient
from google.cloud.monitoring import MetricServiceClient
from google.cloud.monitoring import NotificationChannelServiceClient
Import types and data structures:
from google.cloud.monitoring import AlertPolicy
from google.cloud.monitoring import TimeSeries
from google.cloud.monitoring import TimeInterval
from google.cloud.monitoring import QueryServiceClient
from google.cloud.monitoring import MetricServiceClient
from google.cloud.monitoring import TimeSeries, Point, TimeInterval
from google.api_core.datetime_helpers import DatetimeWithNanoseconds
import time
# Initialize the client (uses default credentials)
client = MetricServiceClient()
# Define project and time series data
project_id = "my-project"  # Replace with your Google Cloud project ID
project_name = f"projects/{project_id}"
# Split the current wall-clock time into whole seconds plus the leftover
# nanoseconds, the two components used to build the point's end time.
now = time.time()
seconds = int(now)
nanos = int((now - seconds) * 10**9)
# Create a time series for a custom metric reported by a GCE instance
series = TimeSeries()
series.metric.type = "custom.googleapis.com/my_metric"
series.resource.type = "gce_instance"
series.resource.labels["instance_id"] = "my-instance"
series.resource.labels["zone"] = "us-central1-a"
# Add a data point. The interval and point are built from mappings rather than
# by assigning point.interval.end_time.seconds / .nanos: reading a timestamp
# field returns a converted datetime copy, so attribute assignment on it never
# reaches the underlying message.
interval = TimeInterval({"end_time": {"seconds": seconds, "nanos": nanos}})
point = Point({"interval": interval, "value": {"double_value": 3.14}})
series.points = [point]
# Write the time series data
client.create_time_series(name=project_name, time_series=[series])
The Google Cloud Monitoring library follows a service-oriented architecture with specialized clients for different monitoring capabilities:
Manages alert policies that define conditions for generating alerts based on metrics and logs. Supports creating, updating, deleting, and listing alert policies with complex condition logic.
class AlertPolicyServiceClient:
def list_alert_policies(self, request=None, *, name=None, retry=None, timeout=None, metadata=()) -> pagers.ListAlertPoliciesPager: ...
def get_alert_policy(self, request=None, *, name=None, retry=None, timeout=None, metadata=()) -> alert.AlertPolicy: ...
def create_alert_policy(self, request=None, *, name=None, alert_policy=None, retry=None, timeout=None, metadata=()) -> alert.AlertPolicy: ...
def update_alert_policy(self, request=None, *, update_mask=None, alert_policy=None, retry=None, timeout=None, metadata=()) -> alert.AlertPolicy: ...
def delete_alert_policy(self, request=None, *, name=None, retry=None, timeout=None, metadata=()) -> None: ...
class AlertPolicyServiceAsyncClient:
async def list_alert_policies(self, request=None, *, name=None, retry=None, timeout=None, metadata=()) -> pagers.ListAlertPoliciesAsyncPager: ...
async def get_alert_policy(self, request=None, *, name=None, retry=None, timeout=None, metadata=()) -> alert.AlertPolicy: ...
async def create_alert_policy(self, request=None, *, name=None, alert_policy=None, retry=None, timeout=None, metadata=()) -> alert.AlertPolicy: ...
async def update_alert_policy(self, request=None, *, update_mask=None, alert_policy=None, retry=None, timeout=None, metadata=()) -> alert.AlertPolicy: ...
async def delete_alert_policy(self, request=None, *, name=None, retry=None, timeout=None, metadata=()) -> None: ...
Manages groups for organizing and categorizing monitored resources. Groups enable bulk operations and simplified resource management across your monitoring infrastructure.
class GroupServiceClient:
def list_groups(self, request=None, *, name=None, retry=None, timeout=None, metadata=()) -> pagers.ListGroupsPager: ...
def get_group(self, request=None, *, name=None, retry=None, timeout=None, metadata=()) -> group.Group: ...
def create_group(self, request=None, *, name=None, group=None, retry=None, timeout=None, metadata=()) -> group.Group: ...
def update_group(self, request=None, *, group=None, retry=None, timeout=None, metadata=()) -> group.Group: ...
def delete_group(self, request=None, *, name=None, retry=None, timeout=None, metadata=()) -> None: ...
def list_group_members(self, request=None, *, name=None, retry=None, timeout=None, metadata=()) -> pagers.ListGroupMembersPager: ...
Core functionality for working with metrics, time series data, and monitored resources. Handles metric creation, data ingestion, querying, and resource descriptor management.
class MetricServiceClient:
def list_metric_descriptors(self, request=None, *, name=None, retry=None, timeout=None, metadata=()) -> pagers.ListMetricDescriptorsPager: ...
def get_metric_descriptor(self, request=None, *, name=None, retry=None, timeout=None, metadata=()) -> metric_pb2.MetricDescriptor: ...
def create_metric_descriptor(self, request=None, *, name=None, metric_descriptor=None, retry=None, timeout=None, metadata=()) -> metric_pb2.MetricDescriptor: ...
def delete_metric_descriptor(self, request=None, *, name=None, retry=None, timeout=None, metadata=()) -> None: ...
def list_time_series(self, request=None, *, name=None, filter=None, interval=None, view=None, retry=None, timeout=None, metadata=()) -> pagers.ListTimeSeriesPager: ...
def create_time_series(self, request=None, *, name=None, time_series=None, retry=None, timeout=None, metadata=()) -> None: ...
Manages notification channels for delivering alert notifications via email, SMS, Slack, PagerDuty, and other supported channels with verification and configuration management.
class NotificationChannelServiceClient:
def list_notification_channels(self, request=None, *, name=None, retry=None, timeout=None, metadata=()) -> pagers.ListNotificationChannelsPager: ...
def get_notification_channel(self, request=None, *, name=None, retry=None, timeout=None, metadata=()) -> notification_pb2.NotificationChannel: ...
def create_notification_channel(self, request=None, *, name=None, notification_channel=None, retry=None, timeout=None, metadata=()) -> notification_pb2.NotificationChannel: ...
def update_notification_channel(self, request=None, *, update_mask=None, notification_channel=None, retry=None, timeout=None, metadata=()) -> notification_pb2.NotificationChannel: ...
def delete_notification_channel(self, request=None, *, name=None, force=None, retry=None, timeout=None, metadata=()) -> None: ...
def verify_notification_channel(self, request=None, *, name=None, code=None, retry=None, timeout=None, metadata=()) -> notification_pb2.NotificationChannel: ...
Notification Channel Management
Provides time series querying capabilities using Monitoring Query Language (MQL). This service is deprecated in favor of PromQL-based querying.
class QueryServiceClient:
def query_time_series(self, request=None, *, retry=None, timeout=None, metadata=()) -> pagers.QueryTimeSeriesPager: ...
class QueryServiceAsyncClient:
async def query_time_series(self, request=None, *, retry=None, timeout=None, metadata=()) -> pagers.QueryTimeSeriesAsyncPager: ...
Note: This service is deprecated. Use PromQL queries with the Monitoring API instead.
Manages services and Service Level Objectives (SLOs) for service-oriented monitoring, including SLI definitions, error budgets, and service health tracking.
class ServiceMonitoringServiceClient:
def create_service(self, request=None, *, parent=None, service=None, retry=None, timeout=None, metadata=()) -> service.Service: ...
def get_service(self, request=None, *, name=None, retry=None, timeout=None, metadata=()) -> service.Service: ...
def list_services(self, request=None, *, parent=None, retry=None, timeout=None, metadata=()) -> pagers.ListServicesPager: ...
def update_service(self, request=None, *, service=None, retry=None, timeout=None, metadata=()) -> service.Service: ...
def delete_service(self, request=None, *, name=None, retry=None, timeout=None, metadata=()) -> None: ...
def create_service_level_objective(self, request=None, *, parent=None, service_level_objective=None, retry=None, timeout=None, metadata=()) -> service.ServiceLevelObjective: ...
def get_service_level_objective(self, request=None, *, name=None, retry=None, timeout=None, metadata=()) -> service.ServiceLevelObjective: ...
def list_service_level_objectives(self, request=None, *, parent=None, retry=None, timeout=None, metadata=()) -> pagers.ListServiceLevelObjectivesPager: ...
def update_service_level_objective(self, request=None, *, service_level_objective=None, retry=None, timeout=None, metadata=()) -> service.ServiceLevelObjective: ...
def delete_service_level_objective(self, request=None, *, name=None, retry=None, timeout=None, metadata=()) -> None: ...
Manages uptime check configurations for monitoring the availability and response time of HTTP/HTTPS endpoints, TCP services, and other network resources.
class UptimeCheckServiceClient:
def list_uptime_check_configs(self, request=None, *, parent=None, retry=None, timeout=None, metadata=()) -> pagers.ListUptimeCheckConfigsPager: ...
def get_uptime_check_config(self, request=None, *, name=None, retry=None, timeout=None, metadata=()) -> uptime.UptimeCheckConfig: ...
def create_uptime_check_config(self, request=None, *, parent=None, uptime_check_config=None, retry=None, timeout=None, metadata=()) -> uptime.UptimeCheckConfig: ...
def update_uptime_check_config(self, request=None, *, uptime_check_config=None, retry=None, timeout=None, metadata=()) -> uptime.UptimeCheckConfig: ...
def delete_uptime_check_config(self, request=None, *, name=None, retry=None, timeout=None, metadata=()) -> None: ...
def list_uptime_check_ips(self, request=None, *, retry=None, timeout=None, metadata=()) -> pagers.ListUptimeCheckIpsPager: ...
Temporarily suppresses alert notifications using snooze functionality, allowing planned maintenance windows and temporary alert silencing with configurable duration and criteria.
class SnoozeServiceClient:
def create_snooze(self, request=None, *, parent=None, snooze=None, retry=None, timeout=None, metadata=()) -> snooze.Snooze: ...
def list_snoozes(self, request=None, *, parent=None, retry=None, timeout=None, metadata=()) -> pagers.ListSnoozesPager: ...
def get_snooze(self, request=None, *, name=None, retry=None, timeout=None, metadata=()) -> snooze.Snooze: ...
def update_snooze(self, request=None, *, snooze=None, update_mask=None, retry=None, timeout=None, metadata=()) -> snooze.Snooze: ...
Core data types used across all monitoring services:
class TimeInterval:
end_time: Timestamp
start_time: Timestamp
class TypedValue:
bool_value: bool
int64_value: int
double_value: float
string_value: str
distribution_value: Distribution
class Aggregation:
alignment_period: Duration
per_series_aligner: Aligner
cross_series_reducer: Reducer
group_by_fields: List[str]
class ComparisonType(enum.Enum):
COMPARISON_GREATER = 1
COMPARISON_GREATER_EQUAL = 2
COMPARISON_LESS = 3
COMPARISON_LESS_EQUAL = 4
COMPARISON_EQUAL = 5
COMPARISON_NOT_EQUAL = 6
class ServiceTier(enum.Enum):
SERVICE_TIER_BASIC = 1
SERVICE_TIER_PREMIUM = 2
class DroppedLabels:
label: Dict[str, str] # Labels that were dropped
class SpanContext:
span_name: str # Span name for tracing integration
class TextLocator:
source: str # Source text
start_position: int # Start position
end_position: int # End position
class LabelValue:
bool_value: bool # Boolean label value
int64_value: int # Integer label value
string_value: str # String label value
class QueryError:
locator: TextLocator # Error location
message: str # Error message
class QueryErrorList:
errors: List[QueryError] # List of query errors
class TimeSeriesData:
label_values: List[LabelValue] # Label values
point_data: List[Point.PointData] # Point data
class TimeSeriesDescriptor:
label_descriptors: List[LabelDescriptor] # Label descriptors
point_descriptor: Point.PointDescriptor # Point descriptor
class QueryTimeSeriesRequest:
name: str # Required. Project name
query: str # Required. MQL query string
page_size: int # Maximum results per page
page_token: str # Page token for pagination
class QueryTimeSeriesResponse:
time_series_descriptor: TimeSeriesDescriptor # Time series metadata
time_series_data: List[TimeSeriesData] # Time series data
next_page_token: str # Next page token
partial_errors: List[Status] # Partial errors
The library supports multiple authentication methods:
# Default credentials (recommended)
from google.cloud.monitoring import MetricServiceClient
client = MetricServiceClient()
# Service account file
client = MetricServiceClient.from_service_account_file("path/to/credentials.json")
# Service account info (dict)
client = MetricServiceClient.from_service_account_info(credentials_dict)
# Explicit credentials
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_file("path/to/file.json")
client = MetricServiceClient(credentials=credentials)
All service methods can raise Google Cloud exceptions:
from google.api_core import exceptions
from google.cloud.monitoring import MetricServiceClient
# Create a client using default credentials.
client = MetricServiceClient()
try:
    # Look up a metric descriptor by its full resource name.
    metric = client.get_metric_descriptor(name="projects/my-project/metricDescriptors/invalid")
except exceptions.NotFound:
    print("Metric descriptor not found")
except exceptions.PermissionDenied:
    print("Insufficient permissions")
except exceptions.GoogleAPIError as e:
    # Listed last so the more specific handlers above get the first chance.
    print(f"API error: {e}")