or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md
tile.json

tessl/pypi-airbyte-source-rss

Airbyte source connector for extracting and synchronizing RSS feed data into data warehouses

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/airbyte-source-rss@1.0.x

To install, run

npx @tessl/cli install tessl/pypi-airbyte-source-rss@1.0.0

index.mddocs/

Airbyte Source RSS

An Airbyte source connector for extracting and synchronizing RSS feed data into data warehouses and other destinations. This connector uses the Airbyte CDK with a declarative YAML configuration approach to parse RSS feeds, extract structured data, and enable incremental synchronization based on publication timestamps.

Package Information

  • Package Name: airbyte-source-rss
  • Package Type: PyPI
  • Language: Python
  • Installation: pip install airbyte-source-rss (or via Poetry: poetry add airbyte-source-rss)
  • Version: 1.0.31
  • Dependencies: airbyte-cdk, feedparser, pytz

Core Imports

from source_rss import SourceRss
from source_rss.run import run
from source_rss.components import CustomExtractor

Required for launching connector:

from airbyte_cdk.entrypoint import launch

Basic Usage

Using as Airbyte Connector

from source_rss import SourceRss
from airbyte_cdk.entrypoint import launch
import sys

# Create and launch the source connector
source = SourceRss()
launch(source, sys.argv[1:])

Command Line Usage

# Get connector specification (returns JSON schema for configuration)
poetry run source-rss spec

# Test connection with config (validates RSS URL accessibility)
poetry run source-rss check --config config.json

# Discover available streams (returns items stream schema)
poetry run source-rss discover --config config.json

# Extract data (reads RSS feed items according to catalog configuration)
poetry run source-rss read --config config.json --catalog catalog.json

# Extract with state for incremental sync
poetry run source-rss read --config config.json --catalog catalog.json --state state.json

Configuration

{
  "url": "https://example.com/rss.xml"
}

Architecture

The connector follows Airbyte's declarative configuration pattern:

  • SourceRss: Main connector class extending YamlDeclarativeSource
  • CustomExtractor: RSS-specific data extraction logic
  • Manifest YAML: Declarative configuration defining streams, schema, and incremental sync
  • Poetry: Dependency management and packaging

The connector extracts RSS items with fields like title, description, author, publication date, and other metadata, supporting incremental synchronization based on publication timestamps.

Capabilities

Source Connector

Main Airbyte source connector class that provides RSS feed extraction functionality.

class SourceRss(YamlDeclarativeSource):
    """
    Declarative source connector for RSS feeds.
    
    Inherits from YamlDeclarativeSource and loads configuration from manifest.yaml.
    """
    
    def __init__(self):
        """Initialize SourceRss with manifest.yaml configuration."""

RSS Data Extraction

Custom extractor for parsing RSS feed responses and transforming them into structured records.

class CustomExtractor(RecordExtractor):
    """
    Custom RSS feed parser and record extractor.
    
    Processes RSS feed XML responses and extracts structured item data
    with timestamp-based filtering for incremental synchronization.
    Uses feedparser library for robust RSS/Atom parsing and pytz for timezone handling.
    """
    
    def extract_records(self, response: requests.Response, **kwargs) -> List[Mapping[str, Any]]:
        """
        Extract and transform RSS feed items from HTTP response.
        
        Parses RSS/Atom feed XML using feedparser, converts items to structured records,
        applies timestamp filtering for incremental sync, and handles timezone conversions.
        
        Args:
            response (requests.Response): HTTP response containing RSS feed XML
            **kwargs: Additional extraction parameters
            
        Returns:
            List[Mapping[str, Any]]: List of extracted RSS items as dictionaries
            
        Extracted Fields:
            - title (str, optional): RSS item title
            - link (str, optional): RSS item URL
            - description (str, optional): RSS item description/content  
            - author (str, optional): RSS item author
            - category (str, optional): RSS item category
            - comments (str, optional): RSS item comments URL
            - enclosure (str, optional): RSS item enclosure/attachment
            - guid (str, optional): RSS item unique identifier
            - published (str): RSS item publication date in ISO format with UTC timezone
            
        Processing:
            - Extracts items from feed.entries in reverse order (oldest first)
            - Converts published_parsed timestamps to UTC ISO format
            - Filters items based on feed-level publication date for incremental sync
            - Handles missing fields gracefully (sets to null)
        """

Entry Point Function

Main entry point function for launching the connector.

def run():
    """
    Create SourceRss instance and launch connector via airbyte_cdk.entrypoint.launch.
    
    Uses sys.argv[1:] for command line arguments processing.
    Supports standard Airbyte connector commands: spec, check, discover, read.
    """

Stream Configuration

The connector provides a single stream called items with the following characteristics:

Stream Schema

# RSS Items Stream Schema
{
    "type": "object",
    "additionalProperties": True,
    "required": ["published"],
    "properties": {
        "title": {"type": ["null", "string"]},
        "link": {"type": ["null", "string"]}, 
        "description": {"type": ["null", "string"]},
        "author": {"type": ["null", "string"]},
        "category": {"type": ["null", "string"]},
        "comments": {"type": ["null", "string"]},
        "enclosure": {"type": ["null", "string"]},
        "guid": {"type": ["null", "string"]},
        "published": {"type": "string", "format": "date-time"}
    }
}

Synchronization Modes

The connector supports both full refresh and incremental synchronization:

  • Supported Sync Modes: full_refresh, incremental
  • Destination Sync Modes: overwrite, append

Incremental Synchronization

The connector supports incremental synchronization using the published field as cursor:

  • Cursor Field: published (datetime)
  • Datetime Format: %Y-%m-%dT%H:%M:%S%z
  • Default Window: Last 23 hours from current time
  • Filtering: Records filtered by published >= stream_interval['start_time']

Integration Testing

The package includes integration test support:

import pytest

# Connector acceptance test fixture
@pytest.fixture(scope="session", autouse=True)
def connector_setup():
    """
    Placeholder fixture for external resources that acceptance test might require.
    """

Test Configuration Files

  • sample_config.json: Example configuration with NASA RSS feed
  • configured_catalog.json: Stream catalog configuration
  • invalid_config.json: Invalid configuration for negative testing
  • sample_state.json: Example state for incremental sync testing

Usage Examples

Basic RSS Feed Extraction

from source_rss import SourceRss
from airbyte_cdk.entrypoint import launch

# Initialize connector
source = SourceRss()

# Configuration for RSS feed
config = {
    "url": "https://www.nasa.gov/rss/dyn/breaking_news.rss"
}

# The connector will extract RSS items with fields:
# - title, link, description
# - author, category, comments  
# - enclosure, guid
# - published (ISO datetime)

Custom Extraction Logic

from source_rss.components import CustomExtractor
import requests

# Create custom extractor instance
extractor = CustomExtractor()

# Process RSS feed response
response = requests.get("https://example.com/feed.rss")
records = extractor.extract_records(response)

# Each record contains RSS item fields
for record in records:
    print(f"Title: {record.get('title')}")
    print(f"Published: {record.get('published')}")
    print(f"Link: {record.get('link')}")

Docker Usage

# Build connector image
airbyte-ci connectors --name=source-rss build

# Run connector commands
docker run --rm airbyte/source-rss:dev spec
docker run --rm -v $(pwd)/config:/config airbyte/source-rss:dev check --config /config/config.json
docker run --rm -v $(pwd)/config:/config airbyte/source-rss:dev discover --config /config/config.json
docker run --rm -v $(pwd)/config:/config -v $(pwd)/catalog:/catalog airbyte/source-rss:dev read --config /config/config.json --catalog /catalog/catalog.json

Error Handling

The connector handles common RSS parsing scenarios:

  • Missing Fields: Optional RSS fields default to null if not present
  • Date Parsing: Handles various RSS date formats and timezone conversions
  • Feed Parsing: Uses feedparser library for robust RSS/Atom feed parsing
  • HTTP Errors: Standard HTTP error handling via Airbyte CDK
  • Invalid XML: feedparser handles malformed RSS feeds gracefully

Dependencies

Core Dependencies

# Core Airbyte framework
airbyte-cdk = "^0"

# RSS/Atom feed parsing
feedparser = "6.0.10" 

# Timezone handling
pytz = "2022.6"

Development Dependencies

# Testing framework
pytest = "*"
pytest-mock = "*"

# HTTP mocking for tests  
requests-mock = "*"