Python client for OpenSearch providing comprehensive search, indexing, and cluster management capabilities
—
Authentication mechanisms and security features for secure connections to OpenSearch clusters, including AWS IAM integration, SSL/TLS configuration, and credential management.
AWS Signature Version 4 authentication for OpenSearch Service and self-managed clusters.
class RequestsAWSV4SignerAuth:
def __init__(self, credentials, region, service='es'):
"""
AWS V4 signature authentication for requests-based connections.
Parameters:
- credentials: AWS credentials (boto3 credentials object)
- region (str): AWS region
- service (str): AWS service name (default: 'es')
"""
class Urllib3AWSV4SignerAuth:
def __init__(self, credentials, region, service='es'):
"""
AWS V4 signature authentication for urllib3-based connections.
Parameters:
- credentials: AWS credentials (boto3 credentials object)
- region (str): AWS region
- service (str): AWS service name (default: 'es')
"""
class AWSV4SignerAsyncAuth:
def __init__(self, credentials, region, service='es'):
"""
AWS V4 signature authentication for async connections.
Parameters:
- credentials: AWS credentials (boto3 credentials object)
- region (str): AWS region
- service (str): AWS service name (default: 'es')
"""Authentication options available during client initialization.
class OpenSearch:
def __init__(
self,
hosts=None,
http_auth=None,
use_ssl=False,
verify_certs=True,
ssl_context=None,
ssl_show_warn=True,
ssl_assert_hostname=None,
ssl_assert_fingerprint=None,
ca_certs=None,
client_cert=None,
client_key=None,
**kwargs
):
"""
Initialize OpenSearch client with authentication and SSL settings.
Authentication Parameters:
- http_auth (tuple/callable): HTTP authentication (username, password) or auth handler
- use_ssl (bool): Use HTTPS connections (default: False)
- verify_certs (bool): Verify SSL certificates (default: True)
- ssl_context: Custom SSL context
- ssl_show_warn (bool): Show SSL warnings (default: True)
- ssl_assert_hostname (str/bool): Assert SSL hostname
- ssl_assert_fingerprint (str): Assert SSL certificate fingerprint
- ca_certs (str): Path to CA certificate bundle
- client_cert (str): Path to client certificate
- client_key (str): Path to client private key
"""Secure connection configuration options.
import ssl
from opensearchpy import OpenSearch
# Custom SSL context
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
client = OpenSearch(
hosts=[{'host': 'localhost', 'port': 9200}],
use_ssl=True,
ssl_context=ssl_context
)from opensearchpy import OpenSearch
# Basic username/password authentication
client = OpenSearch(
hosts=[{'host': 'localhost', 'port': 9200}],
http_auth=('username', 'password'),
use_ssl=True,
verify_certs=True
)
# Test authentication
response = client.info()
print(f"Cluster: {response['cluster_name']}")import boto3
from opensearchpy import OpenSearch, RequestsAWSV4SignerAuth
# Get AWS credentials from default credential chain
credentials = boto3.Session().get_credentials()
auth = RequestsAWSV4SignerAuth(credentials, 'us-east-1')
# Create client with AWS authentication
client = OpenSearch(
hosts=[{'host': 'my-domain.us-east-1.es.amazonaws.com', 'port': 443}],
http_auth=auth,
use_ssl=True,
verify_certs=True,
ssl_assert_hostname=False,
ssl_show_warn=False
)
# Test connection
response = client.cluster.health()
print(f"Cluster status: {response['status']}")import boto3
from botocore.credentials import Credentials
from opensearchpy import OpenSearch, RequestsAWSV4SignerAuth
# Create custom credentials
credentials = Credentials(
access_key='AKIAIOSFODNN7EXAMPLE',
secret_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
token='session-token' # Optional for temporary credentials
)
auth = RequestsAWSV4SignerAuth(credentials, 'us-west-2', service='es')
client = OpenSearch(
hosts=[{'host': 'search-mydomain.us-west-2.es.amazonaws.com', 'port': 443}],
http_auth=auth,
use_ssl=True,
verify_certs=True
)from opensearchpy import OpenSearch
# Client certificate authentication
client = OpenSearch(
hosts=[{'host': 'secure-cluster.example.com', 'port': 9200}],
use_ssl=True,
verify_certs=True,
ca_certs='/path/to/ca-bundle.crt',
client_cert='/path/to/client.crt',
client_key='/path/to/client.key',
ssl_assert_hostname=True
)
# Test secure connection
response = client.ping()
print(f"Connection successful: {response}")import ssl
from opensearchpy import OpenSearch
# Create custom SSL context
ssl_context = ssl.create_default_context()
ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
ssl_context.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS')
# Load custom CA certificate
ssl_context.load_verify_locations('/path/to/custom-ca.pem')
# Load client certificate
ssl_context.load_cert_chain('/path/to/client.crt', '/path/to/client.key')
client = OpenSearch(
hosts=[{'host': 'secure-cluster.example.com', 'port': 9200}],
use_ssl=True,
ssl_context=ssl_context
)import os
from opensearchpy import OpenSearch
# Configuration from environment variables
host = os.getenv('OPENSEARCH_HOST', 'localhost')
port = int(os.getenv('OPENSEARCH_PORT', '9200'))
username = os.getenv('OPENSEARCH_USERNAME')
password = os.getenv('OPENSEARCH_PASSWORD')
use_ssl = os.getenv('OPENSEARCH_USE_SSL', 'false').lower() == 'true'
auth = (username, password) if username and password else None
client = OpenSearch(
hosts=[{'host': host, 'port': port}],
http_auth=auth,
use_ssl=use_ssl,
verify_certs=use_ssl,
ssl_show_warn=False
)from opensearchpy import OpenSearch
class MultiAuthClient:
def __init__(self):
self.clients = {}
def add_basic_auth_client(self, name, host, username, password):
"""Add client with basic authentication."""
self.clients[name] = OpenSearch(
hosts=[{'host': host, 'port': 9200}],
http_auth=(username, password),
use_ssl=True,
verify_certs=True
)
def add_aws_client(self, name, host, region):
"""Add client with AWS authentication."""
import boto3
from opensearchpy import RequestsAWSV4SignerAuth
credentials = boto3.Session().get_credentials()
auth = RequestsAWSV4SignerAuth(credentials, region)
self.clients[name] = OpenSearch(
hosts=[{'host': host, 'port': 443}],
http_auth=auth,
use_ssl=True,
verify_certs=True
)
def get_client(self, name):
"""Get client by name."""
return self.clients.get(name)
# Usage
auth_manager = MultiAuthClient()
auth_manager.add_basic_auth_client('local', 'localhost', 'admin', 'admin')
auth_manager.add_aws_client('aws', 'search-domain.us-east-1.es.amazonaws.com', 'us-east-1')
local_client = auth_manager.get_client('local')
aws_client = auth_manager.get_client('aws')from opensearchpy import OpenSearch
from opensearchpy.connection import create_ssl_context
# Multiple hosts with authentication
hosts = [
{'host': 'node1.cluster.com', 'port': 9200},
{'host': 'node2.cluster.com', 'port': 9200},
{'host': 'node3.cluster.com', 'port': 9200}
]
# Custom SSL context for all connections
ssl_context = create_ssl_context(
cafile='/path/to/ca.pem',
certfile='/path/to/client.crt',
keyfile='/path/to/client.key'
)
client = OpenSearch(
hosts=hosts,
http_auth=('cluster_user', 'cluster_password'),
use_ssl=True,
ssl_context=ssl_context,
# Connection pool settings
maxsize=25,
max_retries=3,
retry_on_timeout=True,
# Health check settings
sniff_on_start=True,
sniff_on_connection_fail=True,
sniffer_timeout=60
)import asyncio
import boto3
from opensearchpy import AsyncOpenSearch, AWSV4SignerAsyncAuth
async def async_auth_example():
# AWS async authentication
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAsyncAuth(credentials, 'us-east-1')
client = AsyncOpenSearch(
hosts=[{'host': 'search-domain.us-east-1.es.amazonaws.com', 'port': 443}],
http_auth=auth,
use_ssl=True,
verify_certs=True
)
try:
# Test async connection
response = await client.info()
print(f"Async connection successful: {response['cluster_name']}")
# Perform async operations
health = await client.cluster.health()
print(f"Cluster health: {health['status']}")
finally:
await client.close()
# Run async example
asyncio.run(async_auth_example())from opensearchpy import OpenSearch
import requests
class CustomTokenAuth:
def __init__(self, token_url, client_id, client_secret):
self.token_url = token_url
self.client_id = client_id
self.client_secret = client_secret
self.token = None
self.token_expires = 0
def get_token(self):
"""Get or refresh access token."""
import time
if self.token and time.time() < self.token_expires:
return self.token
# Request new token
response = requests.post(self.token_url, {
'grant_type': 'client_credentials',
'client_id': self.client_id,
'client_secret': self.client_secret
})
token_data = response.json()
self.token = token_data['access_token']
self.token_expires = time.time() + token_data['expires_in'] - 60
return self.token
def __call__(self, r):
"""Add authorization header to request."""
token = self.get_token()
r.headers['Authorization'] = f'Bearer {token}'
return r
# Use custom authentication
auth = CustomTokenAuth(
token_url='https://oauth.example.com/token',
client_id='your-client-id',
client_secret='your-client-secret'
)
client = OpenSearch(
hosts=[{'host': 'secure-cluster.example.com', 'port': 9200}],
http_auth=auth,
use_ssl=True,
verify_certs=True
)Install with Tessl CLI
npx tessl i tessl/pypi-opensearch-py