tessl/pypi-airbyte-source-mailchimp

Airbyte source connector that enables data extraction from Mailchimp's API for email marketing analytics and audience management

Custom Data Processing

Specialized record extraction and transformation for complex Mailchimp API responses, particularly email activity data. Provides custom extractors that handle nested data structures and flatten complex API responses for downstream consumption.

Capabilities

Email Activity Record Extractor

A custom extractor for email activity data. Each email record in the API response carries a nested activity array, and the extractor flattens that array so that every activity event becomes its own record.

class MailChimpRecordExtractorEmailActivity(DpathExtractor):
    """
    Custom record extractor for email activity data.
    
    Extends DpathExtractor to handle complex nested activity structures
    from Mailchimp's email activity API responses. Flattens activity
    arrays into individual records while preserving parent record data.
    """
    
    def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
        """
        Extract and flatten email activity records from API response.
        
        Parameters:
        - response: HTTP response from Mailchimp email activity API
        
        Returns:
        Iterator of flattened activity records
        
        Process:
        1. Extracts base records using parent DpathExtractor
        2. Iterates through each record's 'activity' array
        3. Flattens activity items by merging with parent record data
        4. Yields individual activity records with complete context
        
        Each output record contains both the email context and
        specific activity details (opens, clicks, bounces, etc.).
        """

Data Transformation Details

Email Activity Flattening

The email activity API returns nested structures where each email record contains an array of activity events. The custom extractor transforms this structure:

Input Structure:

{
  "email_id": "abc123",
  "campaign_id": "xyz789", 
  "list_id": "list456",
  "email_address": "user@example.com",
  "activity": [
    {
      "action": "open",
      "timestamp": "2023-01-01T12:00:00+00:00",
      "url": null,
      "ip": "192.168.1.1"
    },
    {
      "action": "click", 
      "timestamp": "2023-01-01T12:05:00+00:00",
      "url": "https://example.com/link",
      "ip": "192.168.1.1"
    }
  ]
}

Output Records:

[
  {
    "email_id": "abc123",
    "campaign_id": "xyz789",
    "list_id": "list456", 
    "email_address": "user@example.com",
    "action": "open",
    "timestamp": "2023-01-01T12:00:00+00:00",
    "url": null,
    "ip": "192.168.1.1"
  },
  {
    "email_id": "abc123",
    "campaign_id": "xyz789",
    "list_id": "list456",
    "email_address": "user@example.com", 
    "action": "click",
    "timestamp": "2023-01-01T12:05:00+00:00",
    "url": "https://example.com/link",
    "ip": "192.168.1.1"
  }
]
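The transformation above can be reproduced without the Airbyte CDK. The following is a minimal standalone sketch, not the connector's own code; `flatten_email_activity` is a hypothetical helper name used only for illustration.

```python
from typing import Any, Dict, Iterable, Iterator, Mapping


def flatten_email_activity(emails: Iterable[Mapping[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Yield one record per activity event, copying the parent email fields."""
    for email in emails:
        # Copy every parent field except the nested array itself.
        parent = {key: value for key, value in email.items() if key != "activity"}
        for event in email.get("activity", []):
            # Activity keys are applied last, so they win on a name clash.
            yield {**parent, **event}


sample_email = {
    "email_id": "abc123",
    "campaign_id": "xyz789",
    "list_id": "list456",
    "email_address": "user@example.com",
    "activity": [
        {"action": "open", "timestamp": "2023-01-01T12:00:00+00:00", "url": None, "ip": "192.168.1.1"},
        {"action": "click", "timestamp": "2023-01-01T12:05:00+00:00", "url": "https://example.com/link", "ip": "192.168.1.1"},
    ],
}

flattened = list(flatten_email_activity([sample_email]))
```

One consequence of this approach is that an email whose activity array is empty contributes no output records at all.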

Usage Examples

Custom Extractor Implementation

import requests
from typing import Any, Iterable, Mapping
from airbyte_cdk.sources.declarative.extractors import DpathExtractor

class MailChimpRecordExtractorEmailActivity(DpathExtractor):
    def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
        # Get base records from parent extractor
        records = super().extract_records(response=response)
        
        # Flatten each record's activity array
        for record in records:
            activity_items = record.pop("activity", [])
            for activity_item in activity_items:
                # Merge parent record data with activity item
                yield {**record, **activity_item}
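The implementation above assumes that `activity` is either absent or a list. If a record ever carried an explicit null for that key, `record.pop("activity", [])` would return `None` and the inner loop would raise `TypeError`. This document does not say whether Mailchimp returns null here, so the variant below is only a hypothetical defensive sketch. It is written as a plain function (`flatten_records_defensively` is an illustrative name) so that it runs without `airbyte_cdk` installed.

```python
from typing import Any, Dict, Iterable, Iterator, MutableMapping


def flatten_records_defensively(
    records: Iterable[MutableMapping[str, Any]],
) -> Iterator[Dict[str, Any]]:
    for record in records:
        # `or []` covers both a missing key and an explicit null value.
        activity_items = record.pop("activity", None) or []
        for activity_item in activity_items:
            yield {**record, **activity_item}


records = [
    {"email_id": "a", "activity": None},  # assumed edge case: explicit null
    {"email_id": "b"},                    # assumed edge case: key missing
    {"email_id": "c", "activity": [{"action": "open"}]},
]
result = list(flatten_records_defensively(records))
```

Only the third record produces output in this sketch; the first two are skipped rather than raising.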

Integration with Declarative Framework

The custom extractor is configured in the manifest.yaml for the email_activity stream:

email_activity_stream:
  type: DeclarativeStream
  retriever:
    record_selector:
      type: RecordSelector
      extractor:
        type: CustomRecordExtractor
        class_name: source_mailchimp.components.MailChimpRecordExtractorEmailActivity
        field_path: ["emails"]
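In this configuration, `field_path: ["emails"]` tells the parent `DpathExtractor` which part of the response body holds the records. The sketch below imitates that lookup with plain dictionary access. `select_field_path` is a hypothetical stand-in rather than the CDK's implementation, and the `total_items` field in the sample body is an assumed response shape.

```python
from typing import Any, List, Mapping, Sequence


def select_field_path(body: Mapping[str, Any], field_path: Sequence[str]) -> List[Any]:
    """Walk `field_path` through nested dicts; return [] if any key is missing."""
    current: Any = body
    for key in field_path:
        if not isinstance(current, dict) or key not in current:
            return []
        current = current[key]
    # Normalize a single object to a one-element list.
    return current if isinstance(current, list) else [current]


response_body = {
    "emails": [
        {"email_id": "abc123", "activity": [{"action": "open"}]},
        {"email_id": "def456", "activity": []},
    ],
    "total_items": 2,  # assumed field, ignored by the lookup
}

# These are the "base records" that the custom extractor then flattens.
base_records = select_field_path(response_body, ["emails"])
```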

Record Processing Flow

# Context: these steps run inside
# MailChimpRecordExtractorEmailActivity.extract_records(self, response)

# 1. API response received with nested activity data

# 2. Base DpathExtractor processes response structure
base_records = super().extract_records(response=response)

# 3. Custom extractor flattens activity arrays
for record in base_records:
    activities = record.pop("activity", [])  # Remove activity array from the parent record

    # 4. Create individual records for each activity
    for activity in activities:
        flattened_record = {**record, **activity}  # Activity keys override parent keys on a clash
        yield flattened_record  # Output individual activity record

Benefits

Enhanced Data Structure

  • Simplified Analytics: Each activity becomes a separate record for easier analysis
  • Preserved Context: Parent email data maintained in each activity record
  • Consistent Schema: Flattened structure matches expected downstream formats
  • Complete Attribution: Full email and campaign context for each activity event
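As a rough illustration of the analytics benefit, the sketch below aggregates a few flattened records using only the standard library. The sample records are made up for the example and trimmed to the fields it needs.

```python
from collections import Counter

# Illustrative flattened records (invented sample data).
flattened_records = [
    {"email_id": "abc123", "campaign_id": "xyz789", "action": "open"},
    {"email_id": "abc123", "campaign_id": "xyz789", "action": "click"},
    {"email_id": "def456", "campaign_id": "xyz789", "action": "open"},
]

# Count events per action type; no nested traversal is needed.
actions_by_type = Counter(record["action"] for record in flattened_records)

# Distinct recipients who opened, using the parent context kept on each record.
unique_openers = {
    record["email_id"] for record in flattened_records if record["action"] == "open"
}
```

With the nested input structure, the same counts would require an inner loop over each email's activity array.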

Performance Optimization

  • Streaming Processing: Generator-based extraction for memory efficiency
  • Lazy Evaluation: Records processed on-demand during iteration
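  • Incremental Compatibility: Each flattened record carries its own top-level timestamp, which suits timestamp-based incremental sync
  • Error Isolation: Each activity item is merged independently of the others; the extractor shown above does not itself catch exceptions, so any handling of malformed items has to happen elsewhere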
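The lazy-evaluation point can be checked with a small standalone sketch. It assumes the upstream source of parent records is itself a generator; whether the CDK's parent extractor behaves that way depends on the CDK version and is not confirmed here. The names `parent_records`, `flatten`, and `consumed` are illustrative.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List

consumed: List[str] = []  # tracks which parent records have been pulled so far


def parent_records() -> Iterator[Dict[str, Any]]:
    for email_id in ("a", "b", "c"):
        consumed.append(email_id)
        yield {"email_id": email_id, "activity": [{"action": "open"}, {"action": "click"}]}


def flatten(records: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    for record in records:
        for item in record.pop("activity", []):
            yield {**record, **item}


# Take only the first two flattened records; later parents are never requested.
first_two = list(islice(flatten(parent_records()), 2))
```

After this runs, `consumed` contains only the first parent, because both requested records come from that parent's activity array.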

Install with Tessl CLI

npx tessl i tessl/pypi-airbyte-source-mailchimp
