0
# Common Utilities
1
2
Shared utilities including configuration management, resource detection, scheduling, HTTP handling, and general helper functions used across the OpenCensus ecosystem.
3
4
## Capabilities
5
6
### Dynamic Configuration
7
8
Load and configure OpenCensus components dynamically using string expressions and namespace-based imports.
9
10
```python { .api }
11
def load(expr):
12
"""
13
Dynamically import OpenCensus components and evaluate configuration expressions.
14
15
Supports dot-notation imports and component instantiation from string expressions.
16
17
Parameters:
18
- expr: str, import expression (e.g., 'opencensus.trace.samplers.ProbabilitySampler')
19
20
Returns:
21
object: Imported module, class, or function
22
"""
23
24
class Namespace:
25
"""
26
Dynamic import helper for OpenCensus configuration.
27
28
Parameters:
29
- name: str, namespace name
30
- parent: Namespace, parent namespace (optional)
31
"""
32
def __init__(self, name, parent=None): ...
33
34
def __getattr__(self, item):
35
"""
36
Dynamic attribute access for importing submodules.
37
38
Parameters:
39
- item: str, attribute/module name
40
41
Returns:
42
Namespace: Child namespace for further traversal
43
"""
44
45
def __str__(self):
46
"""String representation of namespace path."""
47
48
def __call__(self, *args, **kwargs):
49
"""
50
Instantiate the referenced class/function.
51
52
Parameters:
53
- *args: positional arguments
54
- **kwargs: keyword arguments
55
56
Returns:
57
object: Instantiated object or function result
58
"""
59
60
@classmethod
61
def eval(cls, expr):
62
"""
63
Evaluate configuration expression.
64
65
Parameters:
66
- expr: str, configuration expression
67
68
Returns:
69
object: Evaluated result
70
"""
71
```
72
73
### String and Time Utilities
74
75
General-purpose utilities for string processing, time handling, and data manipulation.
76
77
```python { .api }
78
# Constants
79
UTF8 = 'utf-8'
80
"""str: UTF-8 encoding constant"""
81
82
MAX_LENGTH = 128
83
"""int: Default maximum string length"""
84
85
ISO_DATETIME_REGEX = '%Y-%m-%dT%H:%M:%S.%fZ'
86
"""str: ISO 8601 datetime format string"""
87
88
def get_truncatable_str(str_to_convert):
89
"""
90
Truncate string and record byte count for OpenCensus limits.
91
92
Parameters:
93
- str_to_convert: str, string to process
94
95
Returns:
96
dict: Dictionary with 'value' (truncated string) and 'truncated_byte_count'
97
"""
98
99
def check_str_length(str_to_check, limit=MAX_LENGTH):
100
"""
101
Check and truncate string length if necessary.
102
103
Parameters:
104
- str_to_check: str, string to validate
105
- limit: int, maximum allowed length (default: 128)
106
107
Returns:
108
str: Original string if within limit, truncated string otherwise
109
"""
110
111
def to_iso_str(ts=None):
112
"""
113
Get ISO 8601 string for UTC datetime.
114
115
Parameters:
116
- ts: datetime, timestamp to convert (default: current UTC time)
117
118
Returns:
119
str: ISO 8601 formatted datetime string
120
"""
121
122
def timestamp_to_microseconds(timestamp):
123
"""
124
Convert timestamp string to microseconds since epoch.
125
126
Parameters:
127
- timestamp: str, ISO 8601 timestamp string
128
129
Returns:
130
int: Microseconds since Unix epoch
131
"""
132
```
133
134
### Collection Utilities
135
136
Helper functions for working with iterables, including uniqueness filtering and windowing operations.
137
138
```python { .api }
139
def iuniq(ible):
140
"""
141
Iterator over unique items while preserving order.
142
143
Parameters:
144
- ible: iterable, input sequence
145
146
Yields:
147
object: Unique items from input sequence
148
"""
149
150
def uniq(ible):
151
"""
152
List of unique items while preserving order.
153
154
Parameters:
155
- ible: iterable, input sequence
156
157
Returns:
158
list: List containing unique items from input
159
"""
160
161
def window(ible, length):
162
"""
163
Split iterable into lists of specified length.
164
165
Parameters:
166
- ible: iterable, input sequence
167
- length: int, window size
168
169
Returns:
170
list: List of lists, each containing 'length' items
171
"""
172
173
def get_weakref(func):
174
"""
175
Get weak reference to bound or unbound function.
176
177
Handles both bound methods and regular functions, providing
178
weak references that don't prevent garbage collection.
179
180
Parameters:
181
- func: callable, function or method to reference
182
183
Returns:
184
weakref: Weak reference to the function
185
"""
186
```
187
188
### Resource Detection
189
190
Automatic detection of runtime environment and resource metadata for cloud platforms and containerized environments.
191
192
```python { .api }
193
class Resource:
194
"""
195
Entity description for signal reporting with type and labels.
196
197
Parameters:
198
- type_: str, resource type identifier
199
- labels: dict, key-value resource labels
200
"""
201
def __init__(self, type_=None, labels=None): ...
202
203
def get_type(self):
204
"""
205
Get resource type.
206
207
Returns:
208
str: Resource type identifier
209
"""
210
211
def get_labels(self):
212
"""
213
Get resource labels.
214
215
Returns:
216
dict: Resource labels dictionary
217
"""
218
219
def merge(self, other):
220
"""
221
Merge with another resource, combining labels.
222
223
Parameters:
224
- other: Resource, resource to merge with
225
226
Returns:
227
Resource: New resource with merged labels
228
"""
229
230
def merge_resources(resource_list):
231
"""
232
Merge multiple resources into a single resource.
233
234
Parameters:
235
- resource_list: list, list of Resource objects
236
237
Returns:
238
Resource: Merged resource with combined labels
239
"""
240
241
def check_ascii_256(string):
242
"""
243
Validate ASCII printable characters and length constraints.
244
245
Parameters:
246
- string: str, string to validate
247
248
Returns:
249
bool: True if string contains only ASCII printable chars and is <= 256 chars
250
"""
251
252
def unquote(string):
253
"""
254
Strip surrounding quotes from string.
255
256
Parameters:
257
- string: str, potentially quoted string
258
259
Returns:
260
str: String with surrounding quotes removed
261
"""
262
263
def parse_labels(labels_str):
264
"""
265
Parse label key-value pairs from string format.
266
267
Parses comma-separated key=value pairs, handling quoted values.
268
269
Parameters:
270
- labels_str: str, label string (e.g., "key1=value1,key2=value2")
271
272
Returns:
273
dict: Parsed labels dictionary
274
"""
275
276
def get_from_env():
277
"""
278
Get resource from environment variables.
279
280
Reads OC_RESOURCE_TYPE and OC_RESOURCE_LABELS environment variables
281
to construct a Resource object.
282
283
Returns:
284
Resource: Resource constructed from environment variables
285
"""
286
287
# Environment variable constants
288
OC_RESOURCE_TYPE = 'OC_RESOURCE_TYPE'
289
"""str: Environment variable for resource type"""
290
291
OC_RESOURCE_LABELS = 'OC_RESOURCE_LABELS'
292
"""str: Environment variable for resource labels"""
293
294
def is_gce_environment():
295
"""
296
Check if running on Google Compute Engine.
297
298
Returns:
299
bool: True if running on GCE
300
"""
301
302
def is_aws_environment():
303
"""
304
Check if running on AWS EC2.
305
306
Returns:
307
bool: True if running on AWS EC2
308
"""
309
310
def get_instance():
311
"""
312
Get resource for current cloud environment.
313
314
Automatically detects cloud platform and returns appropriate
315
resource with platform-specific metadata.
316
317
Returns:
318
Resource: Resource representing current environment
319
"""
320
```
321
322
### Scheduling and Background Tasks
323
324
Thread-based scheduling and queue management for background processing and periodic tasks.
325
326
```python { .api }
327
class PeriodicTask:
328
"""
329
Periodic thread execution for background tasks.
330
331
Parameters:
332
- interval: float, execution interval in seconds
333
- function: callable, function to execute periodically
334
- args: tuple, function arguments (optional)
335
- kwargs: dict, function keyword arguments (optional)
336
- name: str, task name for identification (optional)
337
"""
338
def __init__(self, interval, function, args=None, kwargs=None, name=None): ...
339
340
def run(self):
341
"""Start periodic execution in background thread."""
342
343
def cancel(self):
344
"""Cancel periodic execution and stop background thread."""
345
346
class QueueEvent:
347
"""
348
Thread synchronization event for queue coordination.
349
350
Parameters:
351
- name: str, event name for identification
352
"""
353
def __init__(self, name): ...
354
355
def set(self):
356
"""Set the event, releasing waiting threads."""
357
358
def wait(self, timeout=None):
359
"""
360
Wait for event to be set.
361
362
Parameters:
363
- timeout: float, maximum wait time in seconds (optional)
364
365
Returns:
366
bool: True if event was set, False if timeout occurred
367
"""
368
369
class QueueExitEvent:
370
"""
371
Special event for signaling queue shutdown.
372
373
Parameters:
374
- name: str, event name for identification
375
"""
376
def __init__(self, name): ...
377
378
class Queue:
379
"""
380
Thread-safe queue with capacity limits and bulk operations.
381
382
Parameters:
383
- capacity: int, maximum queue capacity
384
"""
385
def __init__(self, capacity): ...
386
387
def gets(self, count, timeout):
388
"""
389
Get multiple items from queue.
390
391
Parameters:
392
- count: int, maximum number of items to retrieve
393
- timeout: float, maximum wait time in seconds
394
395
Returns:
396
list: Retrieved items (may be fewer than requested)
397
"""
398
399
def is_empty(self):
400
"""
401
Check if queue is empty.
402
403
Returns:
404
bool: True if queue contains no items
405
"""
406
407
def flush(self, timeout=None):
408
"""
409
Remove and return all items from queue.
410
411
Parameters:
412
- timeout: float, maximum wait time in seconds (optional)
413
414
Returns:
415
list: All items that were in the queue
416
"""
417
418
def put(self, item, block=True, timeout=None):
419
"""
420
Put item in queue.
421
422
Parameters:
423
- item: object, item to add to queue
424
- block: bool, whether to block if queue is full
425
- timeout: float, maximum wait time if blocking
426
"""
427
428
def puts(self, items, block=True, timeout=None):
429
"""
430
Put multiple items in queue.
431
432
Parameters:
433
- items: list, items to add to queue
434
- block: bool, whether to block if queue is full
435
- timeout: float, maximum wait time if blocking
436
"""
437
```
438
439
### HTTP Utilities
440
441
Simple HTTP client functionality for internal OpenCensus communication and metadata retrieval.
442
443
```python { .api }
444
def get_request(request_url, request_headers=dict()):
445
"""
446
Execute HTTP GET request with timeout and error handling.
447
448
Parameters:
449
- request_url: str, URL to request
450
- request_headers: dict, HTTP headers (default: empty dict)
451
452
Returns:
453
requests.Response: HTTP response object
454
"""
455
456
# Constants
457
_REQUEST_TIMEOUT = 2
458
"""int: Default HTTP request timeout in seconds"""
459
```
460
461
### Compatibility Utilities
462
463
Backports and compatibility layers for supporting different Python versions and environments.
464
465
```python { .api }
466
class WeakMethod:
467
"""
468
Weak reference to bound methods (Python 2.6 compatibility).
469
470
Provides weak references to bound methods that don't prevent
471
garbage collection of the target object.
472
473
Parameters:
474
- meth: method, bound method to reference
475
- callback: callable, cleanup callback (optional)
476
"""
477
def __init__(self, meth, callback=None): ...
478
```
479
480
## Usage Examples
481
482
### Dynamic Configuration
483
484
```python
485
from opencensus.common.configuration import load, Namespace
486
487
# Load components dynamically from string expressions
488
sampler_class = load('opencensus.trace.samplers.ProbabilitySampler')
489
sampler = sampler_class(rate=0.1)
490
491
exporter_class = load('opencensus.trace.exporters.print_exporter.PrintExporter')
492
exporter = exporter_class()
493
494
# Use namespace for fluent configuration
495
oc = Namespace('opencensus')
496
tracer = oc.trace.tracer.Tracer(
497
sampler=oc.trace.samplers.ProbabilitySampler(rate=0.5),
498
exporter=oc.trace.exporters.print_exporter.PrintExporter()
499
)
500
501
# Configuration from strings (useful for config files)
502
config_expr = 'opencensus.trace.samplers.AlwaysOnSampler()'
503
sampler = Namespace.eval(config_expr)
504
```
505
506
### Resource Detection
507
508
```python
509
from opencensus.common.resource import get_from_env, get_instance
510
from opencensus.common.monitored_resource import is_gce_environment, is_aws_environment
511
import os
512
513
# Auto-detect current environment
514
if is_gce_environment():
515
print("Running on Google Compute Engine")
516
resource = get_instance()
517
print(f"Resource type: {resource.get_type()}")
518
print(f"Resource labels: {resource.get_labels()}")
519
520
elif is_aws_environment():
521
print("Running on AWS EC2")
522
resource = get_instance()
523
print(f"Resource type: {resource.get_type()}")
524
print(f"Resource labels: {resource.get_labels()}")
525
526
else:
527
print("Running on unknown platform")
528
529
# Get resource from environment variables
530
os.environ['OC_RESOURCE_TYPE'] = 'k8s_container'
531
os.environ['OC_RESOURCE_LABELS'] = 'cluster_name=prod,namespace=default,pod_name=app-123'
532
533
env_resource = get_from_env()
534
print(f"Environment resource: {env_resource.get_type()}")
535
print(f"Environment labels: {env_resource.get_labels()}")
536
537
# Merge resources
538
from opencensus.common.resource import Resource, merge_resources
539
540
app_resource = Resource('application', {'name': 'my-app', 'version': '1.0'})
541
platform_resource = get_instance()
542
543
merged = merge_resources([app_resource, platform_resource])
544
print(f"Merged resource: {merged.get_labels()}")
545
```
546
547
### Periodic Background Tasks
548
549
```python
550
from opencensus.common.schedule import PeriodicTask, Queue, QueueEvent
551
import time
552
import threading
553
554
# Simple periodic task
555
def heartbeat():
556
print(f"Heartbeat at {time.time()}")
557
558
# Create and start periodic task
559
task = PeriodicTask(
560
interval=5.0, # Every 5 seconds
561
function=heartbeat,
562
name="heartbeat_task"
563
)
564
565
task.run()
566
567
# Let it run for 30 seconds
568
time.sleep(30)
569
570
# Stop the task
571
task.cancel()
572
573
# Advanced example with queue processing
574
work_queue = Queue(capacity=1000)
575
stop_event = QueueEvent("stop_processing")
576
577
def worker():
578
"""Background worker that processes queue items."""
579
while not stop_event.wait(timeout=1.0):
580
try:
581
# Get up to 10 items at once
582
items = work_queue.gets(count=10, timeout=1.0)
583
584
if items:
585
print(f"Processing {len(items)} items")
586
for item in items:
587
process_item(item)
588
589
except Exception as e:
590
print(f"Worker error: {e}")
591
592
def process_item(item):
593
"""Process individual work item."""
594
print(f"Processing: {item}")
595
time.sleep(0.1) # Simulate work
596
597
# Start background worker
598
worker_thread = threading.Thread(target=worker)
599
worker_thread.start()
600
601
# Add work items
602
for i in range(50):
603
work_queue.put(f"work_item_{i}")
604
605
# Let worker process items
606
time.sleep(10)
607
608
# Signal shutdown and wait
609
stop_event.set()
610
worker_thread.join()
611
612
print("All work completed")
613
```
614
615
### String and Data Utilities
616
617
```python
618
from opencensus.common.utils import (
619
get_truncatable_str, check_str_length, to_iso_str,
620
timestamp_to_microseconds, uniq, window
621
)
622
from datetime import datetime
623
624
# String truncation with byte counting
625
long_string = "A" * 200
626
result = get_truncatable_str(long_string)
627
print(f"Truncated: {len(result['value'])} chars")
628
print(f"Bytes truncated: {result['truncated_byte_count']}")
629
630
# Length checking
631
test_strings = ["short", "a" * 100, "a" * 150]
632
for s in test_strings:
633
checked = check_str_length(s, limit=128)
634
print(f"'{s[:10]}...' -> {len(checked)} chars")
635
636
# Time utilities
637
now = datetime.utcnow()
638
iso_string = to_iso_str(now)
639
print(f"ISO format: {iso_string}")
640
641
microseconds = timestamp_to_microseconds(iso_string)
642
print(f"Microseconds: {microseconds}")
643
644
# Collection utilities
645
data = [1, 2, 2, 3, 1, 4, 5, 3]
646
unique_data = uniq(data)
647
print(f"Original: {data}")
648
print(f"Unique: {unique_data}")
649
650
# Window data for batch processing
651
windowed = window(range(15), length=4)
652
print(f"Windowed: {windowed}")
653
# Output: [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14]]
654
```
655
656
### HTTP Requests
657
658
```python
659
from opencensus.common.http_handler import get_request
660
661
# Make HTTP request with timeout
662
try:
663
response = get_request(
664
'https://api.example.com/health',
665
{'User-Agent': 'OpenCensus/1.0', 'Accept': 'application/json'}
666
)
667
668
if response.status_code == 200:
669
print("Service is healthy")
670
print(f"Response: {response.json()}")
671
else:
672
print(f"Service check failed: {response.status_code}")
673
674
except Exception as e:
675
print(f"Request failed: {e}")
676
677
# Fetch metadata (common in cloud environments)
678
def get_instance_metadata():
679
"""Get instance metadata from cloud provider."""
680
try:
681
# Example: GCE metadata
682
response = get_request(
683
'http://metadata.google.internal/computeMetadata/v1/instance/id',
684
{'Metadata-Flavor': 'Google'}
685
)
686
return response.text
687
except:
688
return None
689
690
instance_id = get_instance_metadata()
691
if instance_id:
692
print(f"Instance ID: {instance_id}")
693
```