# Airbyte Source RSS

An Airbyte source connector for extracting and synchronizing RSS feed data into data warehouses and other destinations. This connector uses the Airbyte CDK with a declarative YAML configuration approach to parse RSS feeds, extract structured data, and enable incremental synchronization based on publication timestamps.

## Package Information

- **Package Name**: airbyte-source-rss
- **Package Type**: PyPI
- **Language**: Python
- **Installation**: `pip install airbyte-source-rss` (or via Poetry: `poetry add airbyte-source-rss`)
- **Version**: 1.0.31
- **Dependencies**: airbyte-cdk, feedparser, pytz

## Core Imports

```python
from source_rss import SourceRss
from source_rss.run import run
from source_rss.components import CustomExtractor
```

Required for launching the connector:

```python
from airbyte_cdk.entrypoint import launch
```

## Basic Usage

### Using as Airbyte Connector

```python
from source_rss import SourceRss
from airbyte_cdk.entrypoint import launch
import sys

# Create and launch the source connector
source = SourceRss()
launch(source, sys.argv[1:])
```

### Command Line Usage

```bash
# Get connector specification (returns JSON schema for configuration)
poetry run source-rss spec

# Test connection with config (validates RSS URL accessibility)
poetry run source-rss check --config config.json

# Discover available streams (returns items stream schema)
poetry run source-rss discover --config config.json

# Extract data (reads RSS feed items according to catalog configuration)
poetry run source-rss read --config config.json --catalog catalog.json

# Extract with state for incremental sync
poetry run source-rss read --config config.json --catalog catalog.json --state state.json
```
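
The `read` command writes Airbyte protocol messages to standard output as one JSON object per line. As a rough sketch of how that output could be consumed, the helper below keeps only `RECORD` messages for the `items` stream and yields their `data` payloads. The name `iter_records` is hypothetical (it is not part of the connector), and the sample lines are made up for illustration.

```python
import json
from typing import Any, Dict, Iterable, Iterator


def iter_records(lines: Iterable[str], stream: str = "items") -> Iterator[Dict[str, Any]]:
    """Yield the data payload of each RECORD message for the given stream."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            # Skip anything that is not a JSON message (e.g. stray log text).
            continue
        if not isinstance(message, dict) or message.get("type") != "RECORD":
            continue
        record = message.get("record", {})
        if record.get("stream") == stream:
            yield record.get("data", {})


# Made-up sample output: one LOG message and one RECORD message.
sample_output = [
    '{"type": "LOG", "log": {"level": "INFO", "message": "Starting sync"}}',
    '{"type": "RECORD", "record": {"stream": "items", '
    '"data": {"title": "Hello", "published": "2024-01-15T12:30:00+00:00"}, '
    '"emitted_at": 1705321800000}}',
]

records = list(iter_records(sample_output))
for item in records:
    print(item["title"], item["published"])
```

In practice the lines could come from `sys.stdin` when the command's output is piped into such a script.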

### Configuration

```json
{
  "url": "https://example.com/rss.xml"
}
```
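
The only setting shown above is `url`. A minimal sketch of generating that configuration from Python, with a basic sanity check on the URL, might look like this. `build_config` is a hypothetical helper, and the scheme check is an assumption made for illustration rather than a rule the connector is known to enforce.

```python
import json
from urllib.parse import urlparse


def build_config(url: str) -> dict:
    """Return a connector config dict after a basic http(s) URL check."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        raise ValueError(f"Expected an http(s) feed URL, got: {url!r}")
    return {"url": url}


config = build_config("https://example.com/rss.xml")
config_json = json.dumps(config, indent=2)
print(config_json)
```

Writing `config_json` to `config.json` would produce a file suitable for the `--config` flag used in the commands above.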

## Architecture

The connector follows Airbyte's declarative configuration pattern:

- **SourceRss**: Main connector class extending YamlDeclarativeSource
- **CustomExtractor**: RSS-specific data extraction logic
- **Manifest YAML**: Declarative configuration defining streams, schema, and incremental sync
- **Poetry**: Dependency management and packaging

The connector extracts RSS items with fields like title, description, author, publication date, and other metadata, supporting incremental synchronization based on publication timestamps.

## Capabilities

### Source Connector

Main Airbyte source connector class that provides RSS feed extraction functionality.

```python { .api }
class SourceRss(YamlDeclarativeSource):
    """
    Declarative source connector for RSS feeds.

    Inherits from YamlDeclarativeSource and loads configuration from manifest.yaml.
    """

    def __init__(self):
        """Initialize SourceRss with manifest.yaml configuration."""
```

### RSS Data Extraction

Custom extractor for parsing RSS feed responses and transforming them into structured records.

```python { .api }
class CustomExtractor(RecordExtractor):
    """
    Custom RSS feed parser and record extractor.

    Processes RSS feed XML responses and extracts structured item data
    with timestamp-based filtering for incremental synchronization.
    Uses feedparser library for robust RSS/Atom parsing and pytz for timezone handling.
    """

    def extract_records(self, response: requests.Response, **kwargs) -> List[Mapping[str, Any]]:
        """
        Extract and transform RSS feed items from HTTP response.

        Parses RSS/Atom feed XML using feedparser, converts items to structured records,
        applies timestamp filtering for incremental sync, and handles timezone conversions.

        Args:
            response (requests.Response): HTTP response containing RSS feed XML
            **kwargs: Additional extraction parameters

        Returns:
            List[Mapping[str, Any]]: List of extracted RSS items as dictionaries

        Extracted Fields:
            - title (str, optional): RSS item title
            - link (str, optional): RSS item URL
            - description (str, optional): RSS item description/content
            - author (str, optional): RSS item author
            - category (str, optional): RSS item category
            - comments (str, optional): RSS item comments URL
            - enclosure (str, optional): RSS item enclosure/attachment
            - guid (str, optional): RSS item unique identifier
            - published (str): RSS item publication date in ISO format with UTC timezone

        Processing:
            - Extracts items from feed.entries in reverse order (oldest first)
            - Converts published_parsed timestamps to UTC ISO format
            - Filters items based on feed-level publication date for incremental sync
            - Handles missing fields gracefully (sets to null)
        """
```
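
Two of the processing steps listed above, defaulting missing fields to null and converting `published_parsed` to a UTC ISO string, can be illustrated with the standard library alone. This is a sketch under stated assumptions, not the connector's actual code: `to_record` and `ITEM_KEYS` are hypothetical names, a plain dict stands in for a feedparser entry, and `timezone.utc` is used in place of pytz.

```python
import time
from calendar import timegm
from datetime import datetime, timezone
from typing import Any, Dict, Mapping

# Optional fields taken from the "Extracted Fields" list above.
ITEM_KEYS = ["title", "link", "description", "author", "category", "comments", "enclosure", "guid"]


def to_record(entry: Mapping[str, Any]) -> Dict[str, Any]:
    """Map a feed entry to a flat record, using None for missing optional fields."""
    record: Dict[str, Any] = {key: entry.get(key) for key in ITEM_KEYS}
    # published_parsed is treated as a UTC time tuple; timegm interprets it as UTC.
    published = datetime.fromtimestamp(timegm(entry["published_parsed"]), tz=timezone.utc)
    record["published"] = published.isoformat()
    return record


# A plain dict standing in for a feedparser entry (assumption for illustration).
entry = {
    "title": "Example item",
    "link": "https://example.com/items/1",
    "published_parsed": time.struct_time((2024, 1, 15, 12, 30, 0, 0, 15, 0)),
}

record = to_record(entry)
print(record["published"])  # 2024-01-15T12:30:00+00:00
print(record["author"])     # None
```

Using `calendar.timegm` rather than `time.mktime` matters here: `mktime` would interpret the tuple in the local timezone and shift the result on machines that are not set to UTC.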

### Entry Point Function

Main entry point function for launching the connector.

```python { .api }
def run():
    """
    Create SourceRss instance and launch connector via airbyte_cdk.entrypoint.launch.

    Uses sys.argv[1:] for command line arguments processing.
    Supports standard Airbyte connector commands: spec, check, discover, read.
    """
```

## Stream Configuration

The connector provides a single stream called `items` with the following characteristics:

### Stream Schema

```python { .api }
# RSS Items Stream Schema
{
    "type": "object",
    "additionalProperties": True,
    "required": ["published"],
    "properties": {
        "title": {"type": ["null", "string"]},
        "link": {"type": ["null", "string"]},
        "description": {"type": ["null", "string"]},
        "author": {"type": ["null", "string"]},
        "category": {"type": ["null", "string"]},
        "comments": {"type": ["null", "string"]},
        "enclosure": {"type": ["null", "string"]},
        "guid": {"type": ["null", "string"]},
        "published": {"type": "string", "format": "date-time"}
    }
}
```
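
To make the schema's constraints concrete, here is a small hand-rolled check that mirrors them: `published` is required and must be a string, while the other listed fields may be a string or null. It is only a sketch; `validate_item` is a hypothetical helper, it does not verify the `date-time` format, and a full JSON Schema validator would be the more thorough option.

```python
from typing import Any, List, Mapping

OPTIONAL_STRING_FIELDS = [
    "title", "link", "description", "author",
    "category", "comments", "enclosure", "guid",
]


def validate_item(record: Mapping[str, Any]) -> List[str]:
    """Return a list of human-readable problems; an empty list means the record passes."""
    errors: List[str] = []
    if not isinstance(record.get("published"), str):
        errors.append("published is required and must be a string")
    for field in OPTIONAL_STRING_FIELDS:
        value = record.get(field)
        if value is not None and not isinstance(value, str):
            errors.append(f"{field} must be a string or null")
    return errors


good = {"title": None, "published": "2024-01-15T12:30:00+00:00"}
bad = {"title": 5}
print(validate_item(good))
print(validate_item(bad))
```

Because the schema sets `additionalProperties` to true, the check deliberately ignores any keys it does not know about.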

### Synchronization Modes

The connector supports both full refresh and incremental synchronization:

- **Supported Sync Modes**: `full_refresh`, `incremental`
- **Destination Sync Modes**: `overwrite`, `append`
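
A sync mode is selected per stream in the catalog passed via `--catalog`, which pairs a `sync_mode` with a `destination_sync_mode`. The sketch below builds such a catalog for the `items` stream using the general Airbyte configured-catalog layout. `build_catalog` is a hypothetical helper, and `json_schema` is left empty for brevity.

```python
import json

SUPPORTED_SYNC_MODES = ["full_refresh", "incremental"]
DESTINATION_SYNC_MODES = ["overwrite", "append"]


def build_catalog(sync_mode: str = "incremental", destination_sync_mode: str = "append") -> dict:
    """Build a configured catalog dict for the single `items` stream."""
    if sync_mode not in SUPPORTED_SYNC_MODES:
        raise ValueError(f"Unsupported sync mode: {sync_mode}")
    if destination_sync_mode not in DESTINATION_SYNC_MODES:
        raise ValueError(f"Unsupported destination sync mode: {destination_sync_mode}")
    configured_stream = {
        "stream": {
            "name": "items",
            "json_schema": {},  # left empty for brevity
            "supported_sync_modes": SUPPORTED_SYNC_MODES,
        },
        "sync_mode": sync_mode,
        "destination_sync_mode": destination_sync_mode,
    }
    if sync_mode == "incremental":
        # The cursor is expressed as a path, here a single top-level field.
        configured_stream["cursor_field"] = ["published"]
    return {"streams": [configured_stream]}


catalog = build_catalog()
print(json.dumps(catalog, indent=2))
```

Calling `build_catalog("full_refresh", "overwrite")` would give the catalog for a full reload instead.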

### Incremental Synchronization

The connector supports incremental synchronization using the `published` field as the cursor:

- **Cursor Field**: `published` (datetime)
- **Datetime Format**: `%Y-%m-%dT%H:%M:%S%z`
- **Default Window**: Last 23 hours from the current time
- **Filtering**: Records filtered by `published >= stream_interval['start_time']`
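
The comparison described in this list can be sketched in a few lines of plain Python. This only illustrates the `published >= start_time` rule with the stated datetime format and 23-hour default; it is not the connector's implementation, and `filter_new_records` is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

CURSOR_FORMAT = "%Y-%m-%dT%H:%M:%S%z"


def filter_new_records(
    records: List[Dict[str, Any]],
    start_time: Optional[datetime] = None,
    now: Optional[datetime] = None,
) -> List[Dict[str, Any]]:
    """Keep records whose `published` value is at or after the window start."""
    now = now or datetime.now(timezone.utc)
    if start_time is None:
        # Default window: the 23 hours before `now`.
        start_time = now - timedelta(hours=23)
    return [
        record
        for record in records
        if datetime.strptime(record["published"], CURSOR_FORMAT) >= start_time
    ]


# Fixed "now" so the example is reproducible; the window starts at 2024-01-15T13:00 UTC.
now = datetime(2024, 1, 16, 12, 0, tzinfo=timezone.utc)
records = [
    {"title": "old", "published": "2024-01-15T12:30:00+00:00"},
    {"title": "boundary", "published": "2024-01-15T13:00:00+00:00"},
    {"title": "recent", "published": "2024-01-16T09:00:00+0000"},
]

fresh = filter_new_records(records, now=now)
print([record["title"] for record in fresh])  # ['boundary', 'recent']
```

The `%z` directive accepts offsets both with and without a colon (`+00:00` and `+0000`) on Python 3.7 and later, which is why both spellings parse in the sample data.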

## Integration Testing

The package includes integration test support:

```python { .api }
import pytest

# Connector acceptance test fixture
@pytest.fixture(scope="session", autouse=True)
def connector_setup():
    """
    Placeholder fixture for external resources that acceptance tests might require.
    """
```

### Test Configuration Files

- `sample_config.json`: Example configuration with NASA RSS feed
- `configured_catalog.json`: Stream catalog configuration
- `invalid_config.json`: Invalid configuration for negative testing
- `sample_state.json`: Example state for incremental sync testing

## Usage Examples

### Basic RSS Feed Extraction

```python
from source_rss import SourceRss

# Initialize connector
source = SourceRss()

# Configuration for RSS feed (normally saved as config.json and passed via --config)
config = {
    "url": "https://www.nasa.gov/rss/dyn/breaking_news.rss"
}

# The connector will extract RSS items with fields:
# - title, link, description
# - author, category, comments
# - enclosure, guid
# - published (ISO datetime)
```

### Custom Extraction Logic

```python
from source_rss.components import CustomExtractor
import requests

# Create custom extractor instance
extractor = CustomExtractor()

# Fetch the RSS feed and fail fast on HTTP errors
response = requests.get("https://example.com/feed.rss", timeout=30)
response.raise_for_status()

# Process RSS feed response
records = extractor.extract_records(response)

# Each record contains RSS item fields
for record in records:
    print(f"Title: {record.get('title')}")
    print(f"Published: {record.get('published')}")
    print(f"Link: {record.get('link')}")
```

### Docker Usage

```bash
# Build connector image
airbyte-ci connectors --name=source-rss build

# Run connector commands (paths are quoted so directories with spaces still mount correctly)
docker run --rm airbyte/source-rss:dev spec
docker run --rm -v "$(pwd)/config:/config" airbyte/source-rss:dev check --config /config/config.json
docker run --rm -v "$(pwd)/config:/config" airbyte/source-rss:dev discover --config /config/config.json
docker run --rm -v "$(pwd)/config:/config" -v "$(pwd)/catalog:/catalog" airbyte/source-rss:dev read --config /config/config.json --catalog /catalog/catalog.json
```

## Error Handling

The connector handles common RSS parsing scenarios:

- **Missing Fields**: Optional RSS fields default to null if not present
- **Date Parsing**: Handles various RSS date formats and timezone conversions
- **Feed Parsing**: Uses feedparser library for robust RSS/Atom feed parsing
- **HTTP Errors**: Standard HTTP error handling via Airbyte CDK
- **Invalid XML**: feedparser handles malformed RSS feeds gracefully

## Dependencies

### Core Dependencies

```toml
# Core Airbyte framework
airbyte-cdk = "^0"

# RSS/Atom feed parsing
feedparser = "6.0.10"

# Timezone handling
pytz = "2022.6"
```

### Development Dependencies

```toml
# Testing framework
pytest = "*"
pytest-mock = "*"

# HTTP mocking for tests
requests-mock = "*"
```