Python SDK for interacting with Globus web APIs including Transfer, Auth, and other research data management services
—
Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run.
Metadata indexing and search capabilities for research data discovery with support for custom schemas, faceted search, and real-time indexing. The Search service enables powerful data discovery across distributed research collections with rich metadata search and filtering capabilities.
Core client for search index management, data indexing, and query operations with comprehensive search functionality and metadata management.
class SearchClient(BaseClient):
    """
    Client for Globus Search service operations.

    Provides methods for index management, data ingestion, and search queries
    with support for both simple and advanced search capabilities including
    filters, facets, and complex query structures.
    """

    def __init__(
        self,
        *,
        app: GlobusApp | None = None,
        authorizer: GlobusAuthorizer | None = None,
        environment: str | None = None,
        base_url: str | None = None,
        **kwargs,
    ) -> None: ...

    # -- Index management --------------------------------------------------
    # Create, configure, and manage search indices for organizing and
    # discovering research data with custom schemas and policies.

    def create_index(
        self,
        display_name: str,
        description: str,
    ) -> GlobusHTTPResponse:
        """
        Create a new search index.

        Creates a new index for storing and searching metadata documents.
        New indices default to trial status and may have usage limitations.

        Parameters:
        - display_name: Human-readable name for the index
        - description: Detailed description of the index purpose and content

        Returns:
        GlobusHTTPResponse with created index details including ID
        """

    def get_index(
        self,
        index_id: str | UUID,
        *,
        query_params: dict[str, Any] | None = None,
    ) -> GlobusHTTPResponse:
        """
        Get index configuration and metadata.

        Returns complete index information including schema, statistics,
        access policies, and configuration settings.

        Parameters:
        - index_id: UUID of the index to retrieve
        - query_params: Additional query parameters

        Returns:
        GlobusHTTPResponse with index configuration and statistics
        """

    def get_index_list(
        self,
        *,
        query_params: dict[str, Any] | None = None,
    ) -> IndexListResponse:
        """
        List accessible search indices.

        Returns all indices the user can access including owned indices
        and indices shared with appropriate permissions.

        Parameters:
        - query_params: Additional query parameters for filtering

        Returns:
        IndexListResponse with paginated index listings
        """

    def delete_index(self, index_id: str | UUID) -> GlobusHTTPResponse:
        """
        Mark an index for deletion.

        Sets index status to "delete-pending". Actual deletion happens
        asynchronously and may take time to complete fully.

        Parameters:
        - index_id: UUID of index to delete

        Returns:
        GlobusHTTPResponse confirming deletion request
        """

    # -- Data ingestion ----------------------------------------------------
    # Add, update, and remove data from search indices with support for
    # batch operations and real-time indexing.

    def ingest(
        self,
        index_id: str | UUID,
        data: dict[str, Any],
    ) -> GlobusHTTPResponse:
        """
        Ingest data into a search index.

        Adds or updates documents in the index as an asynchronous task.
        Data can be provided as a single document or list of documents
        with flexible schema support for metadata organization.

        Parameters:
        - index_id: UUID of the target index
        - data: Document(s) to ingest, can be single dict or list of dicts

        Returns:
        GlobusHTTPResponse with ingestion task information
        """

    def delete_by_query(
        self,
        index_id: str | UUID,
        data: dict[str, Any],
    ) -> GlobusHTTPResponse:
        """
        Delete documents matching a query.

        Removes all documents that match the specified query criteria
        as an asynchronous task, enabling bulk deletion operations.

        Parameters:
        - index_id: UUID of the index
        - data: Query specification for documents to delete

        Returns:
        GlobusHTTPResponse with deletion task information
        """

    def get_task(
        self,
        task_id: str | UUID,
        *,
        query_params: dict[str, Any] | None = None,
    ) -> GlobusHTTPResponse:
        """
        Get status of an indexing task.

        Returns current status and results of ingestion or deletion tasks,
        useful for monitoring asynchronous operations.

        Parameters:
        - task_id: UUID of the task to check
        - query_params: Additional query parameters

        Returns:
        GlobusHTTPResponse with task status and results
        """

    def get_task_list(
        self,
        *,
        query_params: dict[str, Any] | None = None,
    ) -> GlobusHTTPResponse:
        """
        List indexing tasks with optional filtering.

        Returns tasks for monitoring ingestion and deletion operations
        across all accessible indices.

        Parameters:
        - query_params: Query parameters for filtering tasks

        Returns:
        GlobusHTTPResponse with task listings
        """

    # -- Search ------------------------------------------------------------
    # Perform powerful searches with support for simple queries, advanced
    # syntax, filters, facets, and result pagination.

    def search(
        self,
        index_id: str | UUID,
        q: str,
        *,
        offset: int = 0,
        limit: int = 10,
        advanced: bool = True,
        query_params: dict[str, Any] | None = None,
    ) -> GlobusHTTPResponse:
        """
        Perform a simple search query.

        Executes a text search against the index with basic parameters
        for straightforward search operations.

        Parameters:
        - index_id: UUID of the index to search
        - q: Query string for search
        - offset: Starting position for results (pagination)
        - limit: Maximum number of results to return
        - advanced: Enable advanced query syntax (default: True)
        - query_params: Additional query parameters

        Returns:
        GlobusHTTPResponse with search results and metadata
        """

    def post_search(
        self,
        index_id: str | UUID,
        data: dict[str, Any] | SearchQuery,
        *,
        offset: int | None = None,
        limit: int | None = None,
        query_params: dict[str, Any] | None = None,
    ) -> GlobusHTTPResponse:
        """
        Perform an advanced search with complex query structure.

        Supports sophisticated queries including filters, facets,
        sorting, and other advanced search features using POST body.

        Parameters:
        - index_id: UUID of the index to search
        - data: Complex search query specification or SearchQuery object
        - offset: Starting position for results
        - limit: Maximum results to return
        - query_params: Additional parameters

        Returns:
        GlobusHTTPResponse with comprehensive search results
        """

    def scroll_search(
        self,
        index_id: str | UUID,
        data: SearchScrollQuery | dict[str, Any],
        *,
        query_params: dict[str, Any] | None = None,
    ) -> GlobusHTTPResponse:
        """
        Perform a scroll search for large result sets.

        Uses scroll API for efficiently retrieving large numbers of
        search results without traditional pagination limitations.

        Parameters:
        - index_id: UUID of the index to search
        - data: Scroll query specification
        - query_params: Additional parameters

        Returns:
        GlobusHTTPResponse with scroll results and continuation token
        """


# Type-safe query construction with chainable API for building complex
# search queries with filters, facets, and advanced options.
class SearchQuery(PayloadWrapper):
    """
    Modern search query builder for constructing complex search requests.

    Provides a fluent API for building queries with filters, facets,
    sorting, and other advanced search features with type safety.
    """

    def __init__(
        self,
        q: str | None = None,
        *,
        offset: int | None = None,
        limit: int | None = None,
        advanced: bool | None = None,
        **kwargs,
    ) -> None: ...

    def set_query(self, query: str) -> SearchQuery:
        """
        Set the main query string.

        Parameters:
        - query: Text query to search for

        Returns:
        Self for method chaining
        """

    def set_limit(self, limit: int) -> SearchQuery:
        """
        Set maximum number of results to return.

        Parameters:
        - limit: Maximum results per page

        Returns:
        Self for method chaining
        """

    def set_offset(self, offset: int) -> SearchQuery:
        """
        Set starting position for results (pagination).

        Parameters:
        - offset: Starting result position

        Returns:
        Self for method chaining
        """

    def set_advanced(self, advanced: bool) -> SearchQuery:
        """
        Enable or disable advanced query syntax.

        Parameters:
        - advanced: Whether to use advanced query parsing

        Returns:
        Self for method chaining
        """

    def add_filter(
        self,
        field_name: str,
        values: list[str],
        *,
        type: str = "match_all",
        additional_fields: dict[str, Any] | None = None,
    ) -> SearchQuery:
        """
        Add a filter to constrain search results.

        Parameters:
        - field_name: Field to filter on
        - values: Values to match in the filter
        - type: Filter type (match_all, match_any, range, etc.)
        - additional_fields: Additional filter configuration

        Returns:
        Self for method chaining
        """

    def add_facet(
        self,
        name: str,
        field_name: str,
        *,
        type: str = "terms",
        size: int | None = None,
        additional_fields: dict[str, Any] | None = None,
    ) -> SearchQuery:
        """
        Add a facet for result aggregation.

        Parameters:
        - name: Name for the facet in results
        - field_name: Field to facet on
        - type: Facet type (terms, date_histogram, etc.)
        - size: Maximum facet values to return
        - additional_fields: Additional facet configuration

        Returns:
        Self for method chaining
        """

    def add_sort(
        self,
        field_name: str,
        order: str = "asc",
    ) -> SearchQuery:
        """
        Add sort criteria to results.

        Parameters:
        - field_name: Field to sort by
        - order: Sort order (asc, desc)

        Returns:
        Self for method chaining
        """

    def set_field_list(self, fields: list[str]) -> SearchQuery:
        """
        Specify which fields to return in results.

        Parameters:
        - fields: List of field names to include

        Returns:
        Self for method chaining
        """
class SearchQueryV1(SearchQuery):
    """
    Legacy search query builder for API v1 compatibility.

    Maintains compatibility with older search API versions
    while providing similar functionality to modern SearchQuery.
    """
class SearchScrollQuery(PayloadWrapper):
    """
    Query builder for scroll-based search operations.

    Designed for efficiently retrieving large result sets using
    the scroll API pattern for deep pagination.
    """

    def __init__(
        self,
        q: str | None = None,
        *,
        limit: int | None = None,
        advanced: bool | None = None,
        scroll: str | None = None,
        scroll_id: str | None = None,
        **kwargs,
    ) -> None: ...

    def set_scroll_size(self, size: int) -> SearchScrollQuery:
        """Set the scroll window size for batch retrieval."""

    def set_scroll_id(self, scroll_id: str) -> SearchScrollQuery:
        """Set scroll ID for continuing a scroll operation."""


# Specialized response classes providing enhanced access to search results
# and index listings with iteration support.
class IndexListResponse(GlobusHTTPResponse):
    """
    Response class for index listing operations.

    Provides enhanced access to index listings with metadata
    and convenient iteration over available indices.
    """

    def __iter__(self) -> Iterator[dict[str, Any]]:
        """Iterate over index records."""


# Search-specific error handling for indexing operations and query processing.
class SearchAPIError(GlobusAPIError):
    """
    Error class for Search service API errors.

    Provides enhanced error handling for search-specific error
    conditions including indexing failures and query syntax errors.
    """


# Usage example: creating an index and ingesting documents.
from globus_sdk import SearchClient
# Initialize search client
search_client = SearchClient(authorizer=authorizer)

# Create a new index for research data
index_response = search_client.create_index(
    display_name="Climate Research Data",
    description="Searchable metadata for climate research datasets",
)
index_id = index_response["id"]

# Ingest sample data documents
documents = [
    {
        "subject": "Temperature Measurements - Station A",
        "description": "Daily temperature recordings from weather station A",
        "creator": "Climate Research Lab",
        "date_created": "2024-01-15",
        "keywords": ["temperature", "climate", "weather"],
        "data_type": "time-series",
        "location": {"lat": 40.7128, "lon": -74.0060},
        "file_format": "CSV",
        "size_mb": 15.2,
    },
    {
        "subject": "Precipitation Data - Regional Survey",
        "description": "Monthly precipitation measurements across the region",
        "creator": "Weather Monitoring Network",
        "date_created": "2024-02-01",
        "keywords": ["precipitation", "rainfall", "climate", "regional"],
        "data_type": "geospatial",
        "location": {"lat": 41.8781, "lon": -87.6298},
        "file_format": "NetCDF",
        "size_mb": 45.7,
    },
]

# Ingest documents into the index (GMetaList is the Search ingest envelope)
ingest_response = search_client.ingest(
    index_id,
    {"ingest_type": "GMetaList", "ingest_data": {"gmeta": documents}},
)
task_id = ingest_response["task_id"]
print(f"Ingestion task started: {task_id}")

from globus_sdk import SearchQuery
# Build a complex search query using the fluent builder API
query = (
    SearchQuery("climate temperature")
    .set_limit(20)
    .set_advanced(True)
    .add_filter("data_type", ["time-series", "geospatial"], type="match_any")
    .add_filter("size_mb", ["0", "50"], type="range")
    .add_facet("creator", "creator", size=10)
    .add_facet("keywords", "keywords", size=20)
    .add_facet("location_facet", "location.country", size=15)
    .add_sort("date_created", "desc")
    .set_field_list(["subject", "description", "creator", "date_created", "keywords"])
)

# Execute the search
results = search_client.post_search(index_id, query)
print(f"Found {results['total']} results")

# Process search results
for hit in results["gmeta"]:
    content = hit["content"][0]
    print(f"Title: {content['subject']}")
    print(f"Creator: {content['creator']}")
    print(f"Keywords: {', '.join(content.get('keywords', []))}")
    print("---")

# Process facets for building user interface
facets = results.get("facet_results", [])
for facet in facets:
    print(f"\nFacet: {facet['name']}")
    for bucket in facet["buckets"]:
        print(f" {bucket['value']}: {bucket['count']}")

# Simple keyword search
simple_results = search_client.search(
    index_id,
    q="temperature climate data",
    limit=10,
    offset=0,
    advanced=True,
)
print(f"Simple search found {simple_results['total']} results")
for hit in simple_results["gmeta"]:
    content = hit["content"][0]
    print(f"- {content['subject']}")

from globus_sdk import SearchScrollQuery
# Create scroll query for large datasets
scroll_query = SearchScrollQuery("*")  # Match all documents
scroll_query.set_scroll_size(100)  # Retrieve 100 at a time

# Start scrolling
scroll_response = search_client.scroll_search(index_id, scroll_query)
all_results = []
scroll_id = scroll_response.get("scroll_id")

# Continue scrolling until no more results
while scroll_response.get("gmeta"):
    all_results.extend(scroll_response["gmeta"])
    print(f"Retrieved {len(scroll_response['gmeta'])} more results")
    if not scroll_id:
        break
    # Continue with next batch using the continuation token
    continue_query = SearchScrollQuery().set_scroll_id(scroll_id)
    scroll_response = search_client.scroll_search(index_id, continue_query)
    scroll_id = scroll_response.get("scroll_id")

print(f"Total results retrieved: {len(all_results)}")

# Update existing documents (replace by ID)
updated_doc = {
    "subject": "Temperature Measurements - Station A (Updated)",
    "description": "Updated daily temperature recordings with quality control",
    "creator": "Climate Research Lab",
    "date_created": "2024-01-15",
    "date_modified": "2024-03-01",
    "keywords": ["temperature", "climate", "weather", "quality-controlled"],
    "data_type": "time-series",
    "version": "2.0",
}

# Ingest updated document (same ID will replace)
update_response = search_client.ingest(
    index_id,
    {"ingest_type": "GMetaList", "ingest_data": {"gmeta": [updated_doc]}},
)

# Delete documents matching criteria
delete_query = {
    "q": "*",
    "filters": [
        {
            "field_name": "data_type",
            "values": ["obsolete"],
            "type": "match_any",
        }
    ],
}
delete_response = search_client.delete_by_query(index_id, delete_query)
print(f"Deletion task: {delete_response['task_id']}")

import time
import time


# Monitor ingestion/deletion tasks
def wait_for_task(search_client, task_id, timeout=300):
    """
    Poll a Search task until it reaches a terminal state.

    Parameters:
    - search_client: SearchClient used to query task status
    - task_id: UUID of the task to monitor
    - timeout: Maximum seconds to wait before giving up (default: 300)

    Returns:
    Final task status response whose "state" is SUCCESS or FAILED

    Raises:
    TimeoutError: if the task does not finish within `timeout` seconds
    """
    start_time = time.time()
    while time.time() - start_time < timeout:
        task_status = search_client.get_task(task_id)
        status = task_status.get("state", "PENDING")
        print(f"Task {task_id}: {status}")
        if status in ["SUCCESS", "FAILED"]:
            return task_status
        time.sleep(5)  # poll interval between status checks
    raise TimeoutError(f"Task {task_id} did not complete within {timeout} seconds")


# Wait for ingestion to complete
try:
    final_status = wait_for_task(search_client, task_id)
    if final_status["state"] == "SUCCESS":
        print("Ingestion completed successfully")
    else:
        print(f"Ingestion failed: {final_status.get('message')}")
except TimeoutError as e:
    print(e)

# List all accessible indices
indices = search_client.get_index_list()
for index in indices:
    print(f"Index: {index['display_name']} ({index['id']})")
    print(f" Description: {index['description']}")
    print(f" Status: {index.get('status', 'active')}")
    print(f" Document count: {index.get('size', 'unknown')}")

# Get detailed index information
index_details = search_client.get_index(index_id)
print(f"Index created: {index_details['creation_date']}")
print(f"Index permissions: {index_details.get('permissions', [])}")

# List recent tasks for monitoring
tasks = search_client.get_task_list()
for task in tasks.get("tasks", []):
    print(f"Task {task['task_id']}: {task['state']} - {task.get('task_type', 'unknown')}")

# Geographic search with location filters
geo_query = (
    SearchQuery("research station")
    .add_filter("location.country", ["United States", "Canada"])
    .add_filter("coordinates", ["-90,-180", "90,180"], type="geo_bounding_box")
    .add_facet("country_facet", "location.country")
    .add_sort("date_created", "desc")
)

# Date range search
date_query = (
    SearchQuery("climate data")
    .add_filter("date_created", ["2024-01-01", "2024-12-31"], type="date_range")
    .add_facet(
        "monthly",
        "date_created",
        type="date_histogram",
        additional_fields={"interval": "month"},
    )
)

# Full-text search with boosting
boosted_query = (
    SearchQuery("temperature precipitation climate")
    .set_advanced(True)
    .add_sort("_score", "desc")  # Sort by relevance
    .set_field_list(["subject", "description", "creator", "keywords", "_score"])
)

# Execute queries
geo_results = search_client.post_search(index_id, geo_query)
date_results = search_client.post_search(index_id, date_query)
relevance_results = search_client.post_search(index_id, boosted_query)

# Install with Tessl CLI
npx tessl i tessl/pypi-globus-sdk