# Custom Data Processing

Specialized record extraction and transformation for complex Mailchimp API responses, particularly email activity data. Provides custom extractors that handle nested data structures and flatten complex API responses for downstream consumption.

## Capabilities

### Email Activity Record Extractor

Custom extractor for processing email activity data with nested activity arrays that need to be flattened into individual records.

```python { .api }
class MailChimpRecordExtractorEmailActivity(DpathExtractor):
    """
    Custom record extractor for email activity data.

    Extends DpathExtractor to handle complex nested activity structures
    from Mailchimp's email activity API responses. Flattens activity
    arrays into individual records while preserving parent record data.
    """

    def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
        """
        Extract and flatten email activity records from API response.

        Parameters:
        - response: HTTP response from Mailchimp email activity API

        Returns:
        Iterator of flattened activity records

        Process:
        1. Extracts base records using parent DpathExtractor
        2. Iterates through each record's 'activity' array
        3. Flattens activity items by merging with parent record data
        4. Yields individual activity records with complete context

        Each output record contains both the email context and
        specific activity details (opens, clicks, bounces, etc.).
        """
```

## Data Transformation Details

### Email Activity Flattening

The email activity API returns nested structures where each email record contains an array of activity events. The custom extractor transforms this structure:

**Input Structure:**
```json
{
  "email_id": "abc123",
  "campaign_id": "xyz789",
  "list_id": "list456",
  "email_address": "user@example.com",
  "activity": [
    {
      "action": "open",
      "timestamp": "2023-01-01T12:00:00+00:00",
      "url": null,
      "ip": "192.168.1.1"
    },
    {
      "action": "click",
      "timestamp": "2023-01-01T12:05:00+00:00",
      "url": "https://example.com/link",
      "ip": "192.168.1.1"
    }
  ]
}
```

**Output Records:**
```json
[
  {
    "email_id": "abc123",
    "campaign_id": "xyz789",
    "list_id": "list456",
    "email_address": "user@example.com",
    "action": "open",
    "timestamp": "2023-01-01T12:00:00+00:00",
    "url": null,
    "ip": "192.168.1.1"
  },
  {
    "email_id": "abc123",
    "campaign_id": "xyz789",
    "list_id": "list456",
    "email_address": "user@example.com",
    "action": "click",
    "timestamp": "2023-01-01T12:05:00+00:00",
    "url": "https://example.com/link",
    "ip": "192.168.1.1"
  }
]
```
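
The transformation can be reproduced without the Airbyte CDK. The following is a minimal sketch, assuming the base records arrive as plain dictionaries; `flatten_activity` is a hypothetical helper name used only for illustration and is not part of the connector.

```python
from typing import Any, Dict, Iterable, Iterator


def flatten_activity(records: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Yield one flat record per activity event (hypothetical helper)."""
    for record in records:
        # Work on a shallow copy so the caller's dictionary keeps its "activity" key.
        parent = dict(record)
        for item in parent.pop("activity", []):
            # Parent fields first, then the activity fields.
            yield {**parent, **item}


email = {
    "email_id": "abc123",
    "campaign_id": "xyz789",
    "list_id": "list456",
    "email_address": "user@example.com",
    "activity": [
        {"action": "open", "timestamp": "2023-01-01T12:00:00+00:00",
         "url": None, "ip": "192.168.1.1"},
        {"action": "click", "timestamp": "2023-01-01T12:05:00+00:00",
         "url": "https://example.com/link", "ip": "192.168.1.1"},
    ],
}

flat = list(flatten_activity([email]))
print(len(flat))          # 2
print(flat[1]["action"])  # click
```

Unlike the connector's extractor, this sketch copies each parent record before removing `activity`, so the input is left untouched. That is a choice made for the example, not a statement about the connector.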

## Usage Examples

### Custom Extractor Implementation

```python
import requests
from typing import Any, Iterable, Mapping
from airbyte_cdk.sources.declarative.extractors import DpathExtractor

class MailChimpRecordExtractorEmailActivity(DpathExtractor):
    def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
        # Get base records from parent extractor
        records = super().extract_records(response=response)

        # Flatten each record's activity array
        for record in records:
            activity_items = record.pop("activity", [])
            for activity_item in activity_items:
                # Merge parent record data with activity item
                yield {**record, **activity_item}
```
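
Two consequences of this merge logic are easy to miss: a parent record with an empty or missing `activity` array produces no output at all, and on a key collision the activity item's value overrides the parent's. The sketch below reuses the same loop outside the CDK to show both. The `flatten` function name, the sample records, and the `timestamp` key on the parent are made up for illustration; real Mailchimp responses may never contain such a collision.

```python
from typing import Any, Dict, Iterable, Iterator


def flatten(records: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    # Same loop as the extractor, minus the CDK base class.
    for record in records:
        for activity_item in record.pop("activity", []):
            yield {**record, **activity_item}


records = [
    {"email_id": "a", "activity": []},   # empty array: nothing is emitted
    {"email_id": "b"},                   # no "activity" key: nothing is emitted
    {
        "email_id": "c",
        "timestamp": "parent",           # hypothetical colliding key
        "activity": [{"action": "open", "timestamp": "event"}],
    },
]

result = list(flatten(records))
print(result)
# [{'email_id': 'c', 'timestamp': 'event', 'action': 'open'}]
```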

### Integration with Declarative Framework

The custom extractor is configured in `manifest.yaml` for the `email_activity` stream:

```yaml
email_activity_stream:
  type: DeclarativeStream
  retriever:
    record_selector:
      type: RecordSelector
      extractor:
        type: CustomRecordExtractor
        class_name: source_mailchimp.components.MailChimpRecordExtractorEmailActivity
        field_path: ["emails"]
```

### Record Processing Flow

```python
# Annotated body of MailChimpRecordExtractorEmailActivity.extract_records
def extract_records(self, response):
    # 1. API response received with nested activity data

    # 2. Base DpathExtractor processes response structure
    base_records = super().extract_records(response)

    # 3. Custom extractor flattens activity arrays
    for record in base_records:
        activities = record.pop("activity", [])  # Remove activity array

        # 4. Create individual records for each activity
        for activity in activities:
            flattened_record = {**record, **activity}  # Merge data
            yield flattened_record  # Output individual activity record
```

## Benefits

### Enhanced Data Structure

- **Simplified Analytics**: Each activity becomes a separate record for easier analysis
- **Preserved Context**: Parent email data maintained in each activity record
- **Consistent Schema**: Flattened structure matches expected downstream formats
- **Complete Attribution**: Full email and campaign context for each activity event
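
As an illustration of the analytics point, flat records can be aggregated directly, with no need to walk a nested array first. This is a minimal sketch over hand-written sample records; the `def456` email ID is invented for the example, and only the fields needed for the count are shown.

```python
from collections import Counter

# Already-flattened activity records (sample data, trimmed to three fields).
flattened = [
    {"email_id": "abc123", "campaign_id": "xyz789", "action": "open"},
    {"email_id": "abc123", "campaign_id": "xyz789", "action": "click"},
    {"email_id": "def456", "campaign_id": "xyz789", "action": "open"},
]

# Count events per (campaign, action) pair in a single pass.
actions_per_campaign = Counter((r["campaign_id"], r["action"]) for r in flattened)

print(actions_per_campaign[("xyz789", "open")])   # 2
print(actions_per_campaign[("xyz789", "click")])  # 1
```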

### Performance Optimization

- **Streaming Processing**: Generator-based extraction for memory efficiency
- **Lazy Evaluation**: Records processed on-demand during iteration
- **Incremental Compatibility**: Works with timestamp-based incremental sync
- **Error Isolation**: Failed activity items don't impact other records
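
The streaming and lazy-evaluation points follow from the extractor being a generator. The sketch below, which assumes a stand-in `parent_records` source and a CDK-free `flatten` function (both hypothetical names), shows that taking the first three flattened records pulls only two parent records from the source, even though a thousand are available.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List

consumed: List[int] = []  # indices of parent records pulled from the source so far


def parent_records() -> Iterator[Dict[str, Any]]:
    # Stand-in for the base extractor's output: 1000 emails, two events each.
    for i in range(1000):
        consumed.append(i)
        yield {"email_id": f"id-{i}", "activity": [{"action": "open"}, {"action": "click"}]}


def flatten(records: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    for record in records:
        for item in record.pop("activity", []):
            yield {**record, **item}


# Only as many parent records are read as are needed to produce three outputs.
first_three = list(islice(flatten(parent_records()), 3))

print([r["action"] for r in first_three])  # ['open', 'click', 'open']
print(consumed)                            # [0, 1]
```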