AHL Research versioned TimeSeries and Tick store for high-performance financial data storage and analysis.

Quality: pending — best-practices review has not yet been performed.
Impact: pending — no eval scenarios have been run.

BSONStore provides raw MongoDB document storage with the complete PyMongo interface for direct database operations. It enables custom document structures, aggregation pipelines, and advanced MongoDB features within Arctic's framework, giving maximum flexibility for unstructured or semi-structured data.
class BSONStore:
    """
    Raw BSON document storage with full MongoDB interface.

    Provides direct access to MongoDB operations within Arctic's
    framework, enabling custom document structures, complex queries,
    and advanced MongoDB features for unstructured data.

    NOTE(review): the methods below are interface stubs documenting the
    PyMongo-compatible API surface; presumably the real implementations
    delegate to an underlying pymongo.Collection — confirm against the
    arctic package source.
    """

    # ------------------------------------------------------------------
    # Query operations: finding and retrieving documents.
    # ------------------------------------------------------------------

    def find(self, *args, **kwargs):
        """
        Find documents matching criteria (PyMongo interface).

        Parameters:
        - filter: Query filter dictionary (default: {})
        - projection: Fields to include/exclude
        - limit: Maximum number of documents to return
        - sort: Sort specification
        - **kwargs: Additional PyMongo find parameters

        Returns:
        pymongo.Cursor: Cursor over matching documents
        """

    def find_one(self, *args, **kwargs):
        """
        Find a single document matching criteria.

        Parameters:
        - filter: Query filter dictionary (default: {})
        - projection: Fields to include/exclude
        - **kwargs: Additional PyMongo find_one parameters

        Returns:
        dict or None: First matching document, or None if not found
        """

    def count(self, filter, **kwargs):
        """
        Count documents matching filter criteria.

        Parameters:
        - filter: Query filter dictionary
        - **kwargs: Additional count parameters

        Returns:
        int: Number of matching documents
        """

    def distinct(self, key, **kwargs):
        """
        Get distinct values for the specified field.

        Parameters:
        - key: Field name to get distinct values for
        - filter: Optional query filter
        - **kwargs: Additional distinct parameters

        Returns:
        list: Distinct values for the field
        """

    # ------------------------------------------------------------------
    # Write operations: inserting, updating, and replacing documents.
    # ------------------------------------------------------------------

    def insert_one(self, document, **kwargs):
        """
        Insert a single document into the collection.

        Parameters:
        - document: Document dictionary to insert
        - **kwargs: Additional insert parameters

        Returns:
        pymongo.results.InsertOneResult: Insert operation result

        Raises:
        - DuplicateKeyError: If the document violates unique constraints
        """

    def insert_many(self, documents, **kwargs):
        """
        Insert multiple documents into the collection.

        Parameters:
        - documents: List of document dictionaries to insert
        - ordered: Whether to stop on first error (default: True)
        - **kwargs: Additional insert parameters

        Returns:
        pymongo.results.InsertManyResult: Insert operation result
        """

    def update_one(self, filter, update, **kwargs):
        """
        Update a single document matching the filter.

        Parameters:
        - filter: Query filter to match the document
        - update: Update operations to apply
        - upsert: Create the document if not found (default: False)
        - **kwargs: Additional update parameters

        Returns:
        pymongo.results.UpdateResult: Update operation result
        """

    def update_many(self, filter, update, **kwargs):
        """
        Update all documents matching the filter.

        Parameters:
        - filter: Query filter to match documents
        - update: Update operations to apply
        - upsert: Create a document if none found (default: False)
        - **kwargs: Additional update parameters

        Returns:
        pymongo.results.UpdateResult: Update operation result
        """

    def replace_one(self, filter, replacement, **kwargs):
        """
        Replace a single document matching the filter.

        Parameters:
        - filter: Query filter to match the document
        - replacement: New document to replace with
        - upsert: Create the document if not found (default: False)
        - **kwargs: Additional replace parameters

        Returns:
        pymongo.results.UpdateResult: Replace operation result
        """

    # ------------------------------------------------------------------
    # Delete operations: removing documents from the collection.
    # ------------------------------------------------------------------

    def delete_one(self, filter, **kwargs):
        """
        Delete a single document matching the filter.

        Parameters:
        - filter: Query filter to match the document
        - **kwargs: Additional delete parameters

        Returns:
        pymongo.results.DeleteResult: Delete operation result
        """

    def delete_many(self, filter, **kwargs):
        """
        Delete all documents matching the filter.

        Parameters:
        - filter: Query filter to match documents
        - **kwargs: Additional delete parameters

        Returns:
        pymongo.results.DeleteResult: Delete operation result
        """

    # ------------------------------------------------------------------
    # Atomic operations: find-and-modify in a single round trip.
    # ------------------------------------------------------------------

    def find_one_and_replace(self, filter, replacement, **kwargs):
        """
        Find a document and replace it atomically.

        Parameters:
        - filter: Query filter to match the document
        - replacement: New document to replace with
        - return_document: Return the original or the updated document
        - upsert: Create the document if not found (default: False)
        - **kwargs: Additional parameters

        Returns:
        dict or None: Original or updated document, per return_document
        """

    def find_one_and_update(self, filter, update, **kwargs):
        """
        Find a document and update it atomically.

        Parameters:
        - filter: Query filter to match the document
        - update: Update operations to apply
        - return_document: Return the original or the updated document
        - upsert: Create the document if not found (default: False)
        - **kwargs: Additional parameters

        Returns:
        dict or None: Original or updated document, per return_document
        """

    def find_one_and_delete(self, filter, **kwargs):
        """
        Find a document and delete it atomically.

        Parameters:
        - filter: Query filter to match the document
        - **kwargs: Additional parameters

        Returns:
        dict or None: Deleted document, or None if not found
        """

    # ------------------------------------------------------------------
    # Bulk operations: executing many writes efficiently in one batch.
    # ------------------------------------------------------------------

    def bulk_write(self, requests, **kwargs):
        """
        Execute multiple write operations in a batch.

        Parameters:
        - requests: List of write operation objects
        - ordered: Execute operations in order (default: True)
        - **kwargs: Additional bulk write parameters

        Returns:
        pymongo.results.BulkWriteResult: Bulk operation result
        """

    # ------------------------------------------------------------------
    # Aggregation: complex data processing via the MongoDB pipeline.
    # ------------------------------------------------------------------

    def aggregate(self, pipeline, **kwargs):
        """
        Execute an aggregation pipeline for complex data processing.

        Parameters:
        - pipeline: List of aggregation stage dictionaries
        - allowDiskUse: Allow disk usage for large operations
        - **kwargs: Additional aggregation parameters

        Returns:
        pymongo.CommandCursor: Cursor over aggregation results
        """

    # ------------------------------------------------------------------
    # Index management: creating and inspecting indexes.
    # ------------------------------------------------------------------

    def create_index(self, keys, **kwargs):
        """
        Create a database index for improved query performance.

        Parameters:
        - keys: Index specification (field names and direction)
        - unique: Create a unique index (default: False)
        - sparse: Create a sparse index (default: False)
        - **kwargs: Additional index parameters

        Returns:
        str: Index name
        """

    def drop_index(self, index_or_name):
        """
        Drop a database index.

        Parameters:
        - index_or_name: Index name or specification to drop
        """

    def index_information(self):
        """
        Get information about all indexes on the collection.

        Returns:
        dict: Index information including names, keys, and options
        """

    # ------------------------------------------------------------------
    # Store management: sharding and statistics.
    # ------------------------------------------------------------------

    def enable_sharding(self):
        """
        Enable MongoDB sharding for the collection.

        Enables horizontal scaling across multiple MongoDB instances
        for handling large document collections.
        """

    def stats(self):
        """
        Get BSON store statistics and performance metrics.

        Returns:
        dict: Collection statistics including document counts and storage size
        """
"""Usage examples for Arctic's BSONStore library type.

Demonstrates document CRUD, atomic operations, bulk writes, aggregation
pipelines, index management, and advanced query patterns.

NOTE(review): requires the `arctic` package and a MongoDB instance
reachable at localhost:27017.
"""
from datetime import datetime, timedelta

import pymongo
from pymongo import DeleteOne, InsertOne, UpdateOne

from arctic import Arctic, BSON_STORE

# --- Setup: initialize a BSON_STORE library -----------------------------
arctic_conn = Arctic('mongodb://localhost:27017')
arctic_conn.initialize_library('documents', BSON_STORE)
doc_store = arctic_conn['documents']

# --- Insert documents ---------------------------------------------------
documents = [
    {
        'user_id': 'user123',
        'event': 'login',
        'timestamp': datetime.now(),
        'metadata': {'ip': '192.168.1.1', 'device': 'mobile'},
    },
    {
        'user_id': 'user456',
        'event': 'purchase',
        'timestamp': datetime.now(),
        'amount': 99.99,
        'product_id': 'prod789',
    },
]

# Insert a single document.
result = doc_store.insert_one(documents[0])
print(f"Inserted document ID: {result.inserted_id}")

# Insert multiple documents.
result = doc_store.insert_many(documents)
print(f"Inserted {len(result.inserted_ids)} documents")

# --- Queries ------------------------------------------------------------
# Find all documents.
all_docs = list(doc_store.find())
print(f"Total documents: {len(all_docs)}")

# Find with a filter.
login_events = list(doc_store.find({'event': 'login'}))
print(f"Login events: {len(login_events)}")

# Find with a projection (specific fields only).
user_events = list(doc_store.find(
    {'user_id': 'user123'},
    {'event': 1, 'timestamp': 1, '_id': 0},
))

# Find one document, most recent first.
recent_purchase = doc_store.find_one(
    {'event': 'purchase'},
    sort=[('timestamp', pymongo.DESCENDING)],
)

# Count documents.
purchase_count = doc_store.count({'event': 'purchase'})
print(f"Total purchases: {purchase_count}")

# Get distinct values.
unique_events = doc_store.distinct('event')
print(f"Event types: {unique_events}")

# --- Updates ------------------------------------------------------------
# Update a single document.
doc_store.update_one(
    {'user_id': 'user123', 'event': 'login'},
    {'$set': {'processed': True}},
)

# Update multiple documents.
doc_store.update_many(
    {'event': 'login'},
    {'$set': {'category': 'authentication'}},
)

# Upsert (insert if not exists).
doc_store.update_one(
    {'user_id': 'user999', 'event': 'signup'},
    {
        '$set': {
            'timestamp': datetime.now(),
            'source': 'web',
        }
    },
    upsert=True,
)

# Replace an entire document.
doc_store.replace_one(
    {'user_id': 'user456'},
    {
        'user_id': 'user456',
        'event': 'purchase',
        'timestamp': datetime.now(),
        'amount': 149.99,
        'product_id': 'prod999',
        'status': 'completed',
    },
)

# --- Atomic operations --------------------------------------------------
# Atomic find-and-modify.
updated_doc = doc_store.find_one_and_update(
    {'user_id': 'user123'},
    {'$inc': {'login_count': 1}},
    return_document=pymongo.ReturnDocument.AFTER,
    upsert=True,
)

# Atomic find-and-delete.
deleted_doc = doc_store.find_one_and_delete(
    {'event': 'temp_event'}
)

# --- Bulk operations ----------------------------------------------------
bulk_ops = [
    InsertOne({'user_id': 'bulk1', 'event': 'test'}),
    UpdateOne({'user_id': 'bulk1'}, {'$set': {'processed': True}}),
    DeleteOne({'user_id': 'old_user'}),
]
result = doc_store.bulk_write(bulk_ops)
print(f"Bulk operations result: {result.bulk_api_result}")

# --- Aggregation for analytics ------------------------------------------
pipeline = [
    # Group by event type and count.
    {
        '$group': {
            '_id': '$event',
            'count': {'$sum': 1},
            'avg_amount': {'$avg': '$amount'},
        }
    },
    # Sort by count, descending.
    {
        '$sort': {'count': -1}
    },
    # Add computed fields.
    {
        '$addFields': {
            'percentage': {
                '$multiply': [
                    {'$divide': ['$count', {'$sum': '$count'}]},
                    100,
                ]
            }
        }
    },
]
results = list(doc_store.aggregate(pipeline))
for result in results:
    print(f"Event: {result['_id']}, Count: {result['count']}")

# Time-based aggregation: daily event counts and revenue.
daily_stats = list(doc_store.aggregate([
    {
        '$group': {
            '_id': {
                'year': {'$year': '$timestamp'},
                'month': {'$month': '$timestamp'},
                'day': {'$dayOfMonth': '$timestamp'},
            },
            'events': {'$sum': 1},
            'revenue': {'$sum': '$amount'},
        }
    },
    {
        '$sort': {'_id': 1}
    },
]))

# --- Index management ---------------------------------------------------
# Create indexes for better query performance.
doc_store.create_index('user_id')
doc_store.create_index('event')
doc_store.create_index([('timestamp', pymongo.DESCENDING)])

# Create a compound index.
doc_store.create_index([
    ('user_id', pymongo.ASCENDING),
    ('timestamp', pymongo.DESCENDING),
])

# Create a unique index.
doc_store.create_index('transaction_id', unique=True)

# Inspect index information.
indexes = doc_store.index_information()
for index_name, index_info in indexes.items():
    print(f"Index: {index_name}, Keys: {index_info['key']}")

# Enable sharding for large collections.
doc_store.enable_sharding()

# --- Advanced query patterns --------------------------------------------
# Date-range queries.
yesterday = datetime.now() - timedelta(days=1)
recent_docs = list(doc_store.find({
    'timestamp': {'$gte': yesterday}
}))

# Complex filters with operators.
filtered_docs = list(doc_store.find({
    '$and': [
        {'event': {'$in': ['purchase', 'refund']}},
        {'amount': {'$gt': 50}},
        {'timestamp': {'$gte': yesterday}},
    ]
}))

# Text search (requires a text index).
doc_store.create_index([('description', 'text')])
text_results = list(doc_store.find({
    '$text': {'$search': 'important event'}
}))

# Regular-expression queries.
pattern_docs = list(doc_store.find({
    'user_id': {'$regex': '^user[0-9]+$'}
}))

# Array and nested-document queries.
nested_docs = list(doc_store.find({
    'metadata.device': 'mobile',
    'tags': {'$in': ['premium', 'vip']},
}))

# Install with the Tessl CLI:
#   npx tessl i tessl/pypi-arctic