CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-arctic

AHL Research Versioned TimeSeries and Tick store for high-performance financial data storage and analysis

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

bson-store.mddocs/

BSON Store Operations

Raw MongoDB document storage providing full PyMongo interface for direct database operations. Enables custom document structures, aggregation pipelines, and advanced MongoDB features within Arctic's framework for maximum flexibility with unstructured or semi-structured data.

Capabilities

BSONStore Class

Direct MongoDB document storage with complete PyMongo interface for custom data structures and advanced operations.

class BSONStore:
    """
    Raw BSON document storage with full MongoDB interface.
    
    Provides direct access to MongoDB operations within Arctic's
    framework, enabling custom document structures, complex queries,
    and advanced MongoDB features for unstructured data.
    """

Document Query Operations

Standard MongoDB query operations for finding and retrieving documents.

def find(self, *args, **kwargs):
    """
    Find documents matching criteria (PyMongo interface).
    
    Parameters:
    - filter: Query filter dictionary (default: {})
    - projection: Fields to include/exclude
    - limit: Maximum number of documents to return
    - sort: Sort specification
    - **kwargs: Additional PyMongo find parameters
    
    Returns:
    pymongo.Cursor: Cursor over matching documents
    """

def find_one(self, *args, **kwargs):
    """
    Find single document matching criteria.
    
    Parameters:
    - filter: Query filter dictionary (default: {})
    - projection: Fields to include/exclude
    - **kwargs: Additional PyMongo find_one parameters
    
    Returns:
    dict or None: First matching document or None if not found
    """

def count(self, filter, **kwargs):
    """
    Count documents matching filter criteria.
    
    Parameters:
    - filter: Query filter dictionary
    - **kwargs: Additional count parameters
    
    Returns:
    int: Number of matching documents
    """

def distinct(self, key, **kwargs):
    """
    Get distinct values for specified field.
    
    Parameters:
    - key: Field name to get distinct values for
    - filter: Optional query filter
    - **kwargs: Additional distinct parameters
    
    Returns:
    List of distinct values
    """

Document Write Operations

Methods for inserting, updating, and replacing documents in the collection.

def insert_one(self, document, **kwargs):
    """
    Insert single document into collection.
    
    Parameters:
    - document: Document dictionary to insert
    - **kwargs: Additional insert parameters
    
    Returns:
    pymongo.results.InsertOneResult: Insert operation result
    
    Raises:
    - DuplicateKeyError: If document violates unique constraints
    """

def insert_many(self, documents, **kwargs):
    """
    Insert multiple documents into collection.
    
    Parameters:
    - documents: List of document dictionaries to insert
    - ordered: Whether to stop on first error (default: True)
    - **kwargs: Additional insert parameters
    
    Returns:
    pymongo.results.InsertManyResult: Insert operation result
    """

def update_one(self, filter, update, **kwargs):
    """
    Update single document matching filter.
    
    Parameters:
    - filter: Query filter to match document
    - update: Update operations to apply
    - upsert: Create document if not found (default: False)
    - **kwargs: Additional update parameters
    
    Returns:
    pymongo.results.UpdateResult: Update operation result
    """

def update_many(self, filter, update, **kwargs):
    """
    Update all documents matching filter.
    
    Parameters:
    - filter: Query filter to match documents
    - update: Update operations to apply
    - upsert: Create document if not found (default: False)
    - **kwargs: Additional update parameters
    
    Returns:
    pymongo.results.UpdateResult: Update operation result
    """

def replace_one(self, filter, replacement, **kwargs):
    """
    Replace single document matching filter.
    
    Parameters:
    - filter: Query filter to match document
    - replacement: New document to replace with
    - upsert: Create document if not found (default: False)
    - **kwargs: Additional replace parameters
    
    Returns:
    pymongo.results.UpdateResult: Replace operation result
    """

Document Delete Operations

Methods for removing documents from the collection.

def delete_one(self, filter, **kwargs):
    """
    Delete single document matching filter.
    
    Parameters:
    - filter: Query filter to match document
    - **kwargs: Additional delete parameters
    
    Returns:
    pymongo.results.DeleteResult: Delete operation result
    """

def delete_many(self, filter, **kwargs):
    """
    Delete all documents matching filter.
    
    Parameters:
    - filter: Query filter to match documents
    - **kwargs: Additional delete parameters
    
    Returns:
    pymongo.results.DeleteResult: Delete operation result
    """

Advanced Query Operations

Methods for atomic operations and complex document modifications.

def find_one_and_replace(self, filter, replacement, **kwargs):
    """
    Find document and replace it atomically.
    
    Parameters:
    - filter: Query filter to match document
    - replacement: New document to replace with
    - return_document: Return original or updated document
    - upsert: Create document if not found (default: False)
    - **kwargs: Additional parameters
    
    Returns:
    dict or None: The original document or the replacement, depending on return_document
    """

def find_one_and_update(self, filter, update, **kwargs):
    """
    Find document and update it atomically.
    
    Parameters:
    - filter: Query filter to match document
    - update: Update operations to apply
    - return_document: Return original or updated document
    - upsert: Create document if not found (default: False)
    - **kwargs: Additional parameters
    
    Returns:
    dict or None: Original or updated document based on return_document
    """

def find_one_and_delete(self, filter, **kwargs):
    """
    Find document and delete it atomically.
    
    Parameters:
    - filter: Query filter to match document
    - **kwargs: Additional parameters
    
    Returns:
    dict or None: Deleted document or None if not found
    """

Bulk Operations

Methods for executing multiple operations efficiently in batches.

def bulk_write(self, requests, **kwargs):
    """
    Execute multiple write operations in batch.
    
    Parameters:
    - requests: List of write operation objects
    - ordered: Execute operations in order (default: True)
    - **kwargs: Additional bulk write parameters
    
    Returns:
    pymongo.results.BulkWriteResult: Bulk operation result
    """

Aggregation Operations

Methods for complex data processing and analysis using MongoDB aggregation framework.

def aggregate(self, pipeline, **kwargs):
    """
    Execute aggregation pipeline for complex data processing.
    
    Parameters:
    - pipeline: List of aggregation stage dictionaries
    - allowDiskUse: Allow disk usage for large operations
    - **kwargs: Additional aggregation parameters
    
    Returns:
    pymongo.CommandCursor: Cursor over aggregation results
    """

Index Management

Methods for creating and managing database indexes for query performance.

def create_index(self, keys, **kwargs):
    """
    Create database index for improved query performance.
    
    Parameters:
    - keys: Index specification (field names and direction)
    - unique: Create unique index (default: False)
    - sparse: Create sparse index (default: False)
    - **kwargs: Additional index parameters
    
    Returns:
    str: Index name
    """

def drop_index(self, index_or_name):
    """
    Drop database index.
    
    Parameters:
    - index_or_name: Index name or specification to drop
    """

def index_information(self):
    """
    Get information about all indexes on collection.
    
    Returns:
    dict: Index information including names, keys, and options
    """

Store Management

Methods for managing the BSON store and getting statistics.

def enable_sharding(self):
    """
    Enable MongoDB sharding for the collection.
    
    Enables horizontal scaling across multiple MongoDB instances
    for handling large document collections.
    """

def stats(self):
    """
    Get BSON store statistics and performance metrics.
    
    Returns:
    dict: Collection statistics including document counts, storage size
    """

Usage Examples

Basic Document Operations

from arctic import Arctic, BSON_STORE
from datetime import datetime
import pymongo

# Setup BSON store
arctic_conn = Arctic('mongodb://localhost:27017')
arctic_conn.initialize_library('documents', BSON_STORE)
doc_store = arctic_conn['documents']

# Insert documents
documents = [
    {
        'user_id': 'user123',
        'event': 'login',
        'timestamp': datetime.now(),
        'metadata': {'ip': '192.168.1.1', 'device': 'mobile'}
    },
    {
        'user_id': 'user456', 
        'event': 'purchase',
        'timestamp': datetime.now(),
        'amount': 99.99,
        'product_id': 'prod789'
    }
]

# Insert single document
result = doc_store.insert_one(documents[0])
print(f"Inserted document ID: {result.inserted_id}")

# Insert the remaining documents. Note: insert_one added an _id to
# documents[0] in place, so re-inserting the full list would raise
# DuplicateKeyError.
result = doc_store.insert_many(documents[1:])
print(f"Inserted {len(result.inserted_ids)} documents")

Querying Documents

# Find all documents
all_docs = list(doc_store.find())
print(f"Total documents: {len(all_docs)}")

# Find with filter
login_events = list(doc_store.find({'event': 'login'}))
print(f"Login events: {len(login_events)}")

# Find with projection (specific fields only)
user_events = list(doc_store.find(
    {'user_id': 'user123'},
    {'event': 1, 'timestamp': 1, '_id': 0}
))

# Find one document
recent_purchase = doc_store.find_one(
    {'event': 'purchase'},
    sort=[('timestamp', pymongo.DESCENDING)]
)

# Count documents
purchase_count = doc_store.count({'event': 'purchase'})
print(f"Total purchases: {purchase_count}")

# Get distinct values
unique_events = doc_store.distinct('event')
print(f"Event types: {unique_events}")

Updating Documents

# Update single document
doc_store.update_one(
    {'user_id': 'user123', 'event': 'login'},
    {'$set': {'processed': True}}
)

# Update multiple documents
doc_store.update_many(
    {'event': 'login'},
    {'$set': {'category': 'authentication'}}
)

# Upsert (insert if not exists)
doc_store.update_one(
    {'user_id': 'user999', 'event': 'signup'},
    {
        '$set': {
            'timestamp': datetime.now(),
            'source': 'web'
        }
    },
    upsert=True
)

# Replace entire document
doc_store.replace_one(
    {'user_id': 'user456'},
    {
        'user_id': 'user456',
        'event': 'purchase',
        'timestamp': datetime.now(),
        'amount': 149.99,
        'product_id': 'prod999',
        'status': 'completed'
    }
)

Advanced Operations

# Atomic find and modify
updated_doc = doc_store.find_one_and_update(
    {'user_id': 'user123'},
    {'$inc': {'login_count': 1}},
    return_document=pymongo.ReturnDocument.AFTER,
    upsert=True
)

# Atomic find and delete
deleted_doc = doc_store.find_one_and_delete(
    {'event': 'temp_event'}
)

# Bulk operations
from pymongo import InsertOne, UpdateOne, DeleteOne

bulk_ops = [
    InsertOne({'user_id': 'bulk1', 'event': 'test'}),
    UpdateOne({'user_id': 'bulk1'}, {'$set': {'processed': True}}),
    DeleteOne({'user_id': 'old_user'})
]

result = doc_store.bulk_write(bulk_ops)
print(f"Bulk operations result: {result.bulk_api_result}")

Aggregation Pipeline

# Complex aggregation for analytics
pipeline = [
    # Group by event type and count
    {
        '$group': {
            '_id': '$event',
            'count': {'$sum': 1},
            'avg_amount': {'$avg': '$amount'}
        }
    },
    # Sort by count descending
    {
        '$sort': {'count': -1}
    },
    # Add computed fields. A percentage of the grand total needs the sum
    # across all groups; inside $addFields, {'$sum': '$count'} is evaluated
    # per document and would always yield 100%. $setWindowFields
    # (MongoDB 5.0+) exposes the cross-document total to every document.
    {
        '$setWindowFields': {
            'output': {'total_count': {'$sum': '$count'}}
        }
    },
    {
        '$addFields': {
            'percentage': {
                '$multiply': [
                    {'$divide': ['$count', '$total_count']},
                    100
                ]
            }
        }
    }
]

results = list(doc_store.aggregate(pipeline))
for result in results:
    print(f"Event: {result['_id']}, Count: {result['count']}")

# Time-based aggregation
daily_stats = list(doc_store.aggregate([
    {
        '$group': {
            '_id': {
                'year': {'$year': '$timestamp'},
                'month': {'$month': '$timestamp'},
                'day': {'$dayOfMonth': '$timestamp'}
            },
            'events': {'$sum': 1},
            'revenue': {'$sum': '$amount'}
        }
    },
    {
        '$sort': {'_id': 1}
    }
]))

Index Management

# Create indexes for better query performance
doc_store.create_index('user_id')
doc_store.create_index('event')
doc_store.create_index([('timestamp', pymongo.DESCENDING)])

# Create compound index
doc_store.create_index([
    ('user_id', pymongo.ASCENDING),
    ('timestamp', pymongo.DESCENDING)
])

# Create unique index
doc_store.create_index('transaction_id', unique=True)

# Get index information
indexes = doc_store.index_information()
for index_name, index_info in indexes.items():
    print(f"Index: {index_name}, Keys: {index_info['key']}")

# Enable sharding for large collections
doc_store.enable_sharding()

Complex Queries and Filtering

from datetime import timedelta

# Date range queries
yesterday = datetime.now() - timedelta(days=1) 
recent_docs = list(doc_store.find({
    'timestamp': {'$gte': yesterday}
}))

# Complex filters with operators
filtered_docs = list(doc_store.find({
    '$and': [
        {'event': {'$in': ['purchase', 'refund']}},
        {'amount': {'$gt': 50}},
        {'timestamp': {'$gte': yesterday}}
    ]
}))

# Text search (requires text index)
doc_store.create_index([('description', 'text')])
text_results = list(doc_store.find({
    '$text': {'$search': 'important event'}
}))

# Regular expression queries
pattern_docs = list(doc_store.find({
    'user_id': {'$regex': '^user[0-9]+$'}
}))

# Array and nested document queries
nested_docs = list(doc_store.find({
    'metadata.device': 'mobile',
    'tags': {'$in': ['premium', 'vip']}
}))

Install with Tessl CLI

npx tessl i tessl/pypi-arctic

docs

arctic-connection.md

async-operations.md

bson-store.md

chunk-store.md

date-utilities.md

index.md

tick-store.md

version-store.md

tile.json