Airbyte source connector for extracting and synchronizing RSS feed data into data warehouses
npx @tessl/cli install tessl/pypi-airbyte-source-rss@1.0.0

An Airbyte source connector for extracting and synchronizing RSS feed data into data warehouses and other destinations. This connector uses the Airbyte CDK with a declarative YAML configuration approach to parse RSS feeds, extract structured data, and enable incremental synchronization based on publication timestamps.
pip install airbyte-source-rss (or via Poetry: poetry add airbyte-source-rss)

from source_rss import SourceRss
from source_rss.run import run
from source_rss.components import CustomExtractor

Required for launching connector:
from airbyte_cdk.entrypoint import launch

from source_rss import SourceRss
from airbyte_cdk.entrypoint import launch
import sys
# Create and launch the source connector
source = SourceRss()
launch(source, sys.argv[1:])

# Get connector specification (returns JSON schema for configuration)
poetry run source-rss spec
# Test connection with config (validates RSS URL accessibility)
poetry run source-rss check --config config.json
# Discover available streams (returns items stream schema)
poetry run source-rss discover --config config.json
# Extract data (reads RSS feed items according to catalog configuration)
poetry run source-rss read --config config.json --catalog catalog.json
# Extract with state for incremental sync
poetry run source-rss read --config config.json --catalog catalog.json --state state.json

{
  "url": "https://example.com/rss.xml"
}

The connector follows Airbyte's declarative configuration pattern.
The connector extracts RSS items with fields like title, description, author, publication date, and other metadata, supporting incremental synchronization based on publication timestamps.
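The timestamp-based incremental behaviour described above can be illustrated with a minimal sketch. It assumes items are plain dictionaries whose `published` value uses the `%Y-%m-%dT%H:%M:%S%z` cursor format this document lists for the `published` field; `filter_new_items` and the sample items are hypothetical and are not part of `source_rss`.

```python
from datetime import datetime
from typing import Any, Dict, List

# Cursor format documented for the `published` field.
CURSOR_FORMAT = "%Y-%m-%dT%H:%M:%S%z"


def filter_new_items(items: List[Dict[str, Any]], start_time: str) -> List[Dict[str, Any]]:
    """Keep items published at or after start_time.

    Hypothetical helper for illustration only; not the connector's code.
    """
    cutoff = datetime.strptime(start_time, CURSOR_FORMAT)
    return [
        item
        for item in items
        if datetime.strptime(item["published"], CURSOR_FORMAT) >= cutoff
    ]


# Made-up sample items, oldest first.
items = [
    {"title": "Older post", "published": "2024-01-01T08:00:00+0000"},
    {"title": "Newer post", "published": "2024-01-03T12:30:00+0000"},
]
new_items = filter_new_items(items, "2024-01-02T00:00:00+0000")
```

Comparing timezone-aware `datetime` objects rather than raw strings keeps the check correct when feeds report different UTC offsets.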
Main Airbyte source connector class that provides RSS feed extraction functionality.
class SourceRss(YamlDeclarativeSource):
    """
    Declarative source connector for RSS feeds.
    Inherits from YamlDeclarativeSource and loads configuration from manifest.yaml.
    """

    def __init__(self):
        """Initialize SourceRss with manifest.yaml configuration."""

Custom extractor for parsing RSS feed responses and transforming them into structured records.
class CustomExtractor(RecordExtractor):
    """
    Custom RSS feed parser and record extractor.
    Processes RSS feed XML responses and extracts structured item data
    with timestamp-based filtering for incremental synchronization.
    Uses feedparser library for robust RSS/Atom parsing and pytz for timezone handling.
    """

    def extract_records(self, response: requests.Response, **kwargs) -> List[Mapping[str, Any]]:
        """
        Extract and transform RSS feed items from HTTP response.
        Parses RSS/Atom feed XML using feedparser, converts items to structured records,
        applies timestamp filtering for incremental sync, and handles timezone conversions.

        Args:
            response (requests.Response): HTTP response containing RSS feed XML
            **kwargs: Additional extraction parameters

        Returns:
            List[Mapping[str, Any]]: List of extracted RSS items as dictionaries

        Extracted Fields:
            - title (str, optional): RSS item title
            - link (str, optional): RSS item URL
            - description (str, optional): RSS item description/content
            - author (str, optional): RSS item author
            - category (str, optional): RSS item category
            - comments (str, optional): RSS item comments URL
            - enclosure (str, optional): RSS item enclosure/attachment
            - guid (str, optional): RSS item unique identifier
            - published (str): RSS item publication date in ISO format with UTC timezone

        Processing:
            - Extracts items from feed.entries in reverse order (oldest first)
            - Converts published_parsed timestamps to UTC ISO format
            - Filters items based on feed-level publication date for incremental sync
            - Handles missing fields gracefully (sets to null)
        """

Main entry point function for launching the connector.
def run():
    """
    Create SourceRss instance and launch connector via airbyte_cdk.entrypoint.launch.
    Uses sys.argv[1:] for command line arguments processing.
    Supports standard Airbyte connector commands: spec, check, discover, read.
    """

The connector provides a single stream called items with the following characteristics:
# RSS Items Stream Schema
{
    "type": "object",
    "additionalProperties": True,
    "required": ["published"],
    "properties": {
        "title": {"type": ["null", "string"]},
        "link": {"type": ["null", "string"]},
        "description": {"type": ["null", "string"]},
        "author": {"type": ["null", "string"]},
        "category": {"type": ["null", "string"]},
        "comments": {"type": ["null", "string"]},
        "enclosure": {"type": ["null", "string"]},
        "guid": {"type": ["null", "string"]},
        "published": {"type": "string", "format": "date-time"}
    }
}

The connector supports both full refresh and incremental synchronization:
- Sync modes: full_refresh, incremental
- Destination sync modes: overwrite, append

The connector supports incremental synchronization using the published field as cursor:
- Cursor field: published (datetime)
- Datetime format: %Y-%m-%dT%H:%M:%S%z
- Filter condition: published >= stream_interval['start_time']

The package includes integration test support:
import pytest
# Connector acceptance test fixture
@pytest.fixture(scope="session", autouse=True)
def connector_setup():
    """
    Placeholder fixture for external resources that acceptance tests might require.
    """

- sample_config.json: Example configuration with NASA RSS feed
- configured_catalog.json: Stream catalog configuration
- invalid_config.json: Invalid configuration for negative testing
- sample_state.json: Example state for incremental sync testing

from source_rss import SourceRss
from airbyte_cdk.entrypoint import launch
# Initialize connector
source = SourceRss()
# Configuration for RSS feed
config = {
"url": "https://www.nasa.gov/rss/dyn/breaking_news.rss"
}
# The connector will extract RSS items with fields:
# - title, link, description
# - author, category, comments
# - enclosure, guid
# - published (ISO datetime)

from source_rss.components import CustomExtractor
import requests
# Create custom extractor instance
extractor = CustomExtractor()
# Process RSS feed response
response = requests.get("https://example.com/feed.rss")
records = extractor.extract_records(response)
# Each record contains RSS item fields
for record in records:
    print(f"Title: {record.get('title')}")
    print(f"Published: {record.get('published')}")
    print(f"Link: {record.get('link')}")

# Build connector image
airbyte-ci connectors --name=source-rss build
# Run connector commands
docker run --rm airbyte/source-rss:dev spec
docker run --rm -v $(pwd)/config:/config airbyte/source-rss:dev check --config /config/config.json
docker run --rm -v $(pwd)/config:/config airbyte/source-rss:dev discover --config /config/config.json
docker run --rm -v $(pwd)/config:/config -v $(pwd)/catalog:/catalog airbyte/source-rss:dev read --config /config/config.json --catalog /catalog/catalog.json

The connector handles common RSS parsing scenarios:
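Two of the scenarios this document describes, setting missing fields to null and converting `published_parsed` timestamps to UTC ISO format, might look roughly like the sketch below. It uses only the standard library and assumes an entry behaves like a mapping whose `published_parsed` value is a UTC `time.struct_time`, which is how feedparser exposes parsed dates. `normalize_entry`, `to_utc_iso`, and `ITEM_KEYS` are hypothetical names, and this is not the connector's actual implementation.

```python
import calendar
import time
from datetime import datetime, timezone
from typing import Any, Dict, Mapping, Optional

# Optional item fields listed in the stream schema.
ITEM_KEYS = ["title", "link", "description", "author", "category", "comments", "enclosure", "guid"]


def to_utc_iso(parsed: time.struct_time) -> str:
    """Convert a UTC struct_time into an ISO 8601 string with a UTC offset."""
    seconds = calendar.timegm(parsed)  # interprets the struct_time as UTC
    return datetime.fromtimestamp(seconds, tz=timezone.utc).isoformat()


def normalize_entry(entry: Mapping[str, Any]) -> Dict[str, Optional[str]]:
    """Build a record from a feed entry; absent optional fields become None.

    Hypothetical helper; assumes `published_parsed` is present, since the
    schema marks `published` as required.
    """
    record: Dict[str, Optional[str]] = {key: entry.get(key) for key in ITEM_KEYS}
    record["published"] = to_utc_iso(entry["published_parsed"])
    return record


# Made-up entry that carries only a title and a parsed timestamp (Unix epoch).
entry = {"title": "Launch update", "published_parsed": time.gmtime(0)}
record = normalize_entry(entry)
```

`calendar.timegm` is used instead of `time.mktime` because the latter would interpret the struct as local time and shift the result on machines that are not set to UTC.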
# Core Airbyte framework
airbyte-cdk = "^0"
# RSS/Atom feed parsing
feedparser = "6.0.10"
# Timezone handling
pytz = "2022.6"

# Testing framework
pytest = "*"
pytest-mock = "*"
# HTTP mocking for tests
requests-mock = "*"