tessl/pypi-airbyte-source-rss

Airbyte source connector for extracting and synchronizing RSS feed data into data warehouses

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: pypipkg:pypi/airbyte-source-rss@1.0.x

To install, run `npx @tessl/cli install tessl/pypi-airbyte-source-rss@1.0.0`.

# Airbyte Source RSS

An Airbyte source connector for extracting and synchronizing RSS feed data into data warehouses and other destinations. This connector uses the Airbyte CDK with a declarative YAML configuration approach to parse RSS feeds, extract structured data, and enable incremental synchronization based on publication timestamps.

## Package Information

- **Package Name**: airbyte-source-rss
- **Package Type**: PyPI
- **Language**: Python
- **Installation**: `pip install airbyte-source-rss` (or via Poetry: `poetry add airbyte-source-rss`)
- **Version**: 1.0.31
- **Dependencies**: airbyte-cdk, feedparser, pytz

## Core Imports

```python
from source_rss import SourceRss
from source_rss.run import run
from source_rss.components import CustomExtractor
```

Required for launching the connector:

```python
from airbyte_cdk.entrypoint import launch
```

## Basic Usage

### Using as Airbyte Connector

```python
from source_rss import SourceRss
from airbyte_cdk.entrypoint import launch
import sys

# Create and launch the source connector
source = SourceRss()
launch(source, sys.argv[1:])
```

### Command Line Usage

```bash
# Get connector specification (returns JSON schema for configuration)
poetry run source-rss spec

# Test connection with config (validates RSS URL accessibility)
poetry run source-rss check --config config.json

# Discover available streams (returns items stream schema)
poetry run source-rss discover --config config.json

# Extract data (reads RSS feed items according to catalog configuration)
poetry run source-rss read --config config.json --catalog catalog.json

# Extract with state for incremental sync
poetry run source-rss read --config config.json --catalog catalog.json --state state.json
```

### Configuration

```json
{
  "url": "https://example.com/rss.xml"
}
```
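
As a rough illustration of producing this file programmatically, the sketch below builds the configuration dictionary and serializes it to JSON. The `build_config` helper and its URL check are hypothetical and not part of the connector; only the `url` key comes from the configuration shown above.

```python
import json
from urllib.parse import urlparse


def build_config(url: str) -> dict:
    """Hypothetical helper: return a config dict after a basic URL sanity check."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        raise ValueError(f"Expected an absolute HTTP(S) URL, got {url!r}")
    return {"url": url}


if __name__ == "__main__":
    # Print JSON that could be saved as config.json and passed via --config.
    print(json.dumps(build_config("https://example.com/rss.xml"), indent=2))
```

Rejecting obviously malformed URLs before running `check` is only a convenience; the connector's own `check` command remains the authoritative test of whether the feed is reachable.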

## Architecture

The connector follows Airbyte's declarative configuration pattern:

- **SourceRss**: Main connector class extending YamlDeclarativeSource
- **CustomExtractor**: RSS-specific data extraction logic
- **Manifest YAML**: Declarative configuration defining streams, schema, and incremental sync
- **Poetry**: Dependency management and packaging

The connector extracts RSS items with fields like title, description, author, publication date, and other metadata, supporting incremental synchronization based on publication timestamps.

## Capabilities

### Source Connector

Main Airbyte source connector class that provides RSS feed extraction functionality.

```python { .api }
class SourceRss(YamlDeclarativeSource):
    """
    Declarative source connector for RSS feeds.

    Inherits from YamlDeclarativeSource and loads configuration from manifest.yaml.
    """

    def __init__(self):
        """Initialize SourceRss with manifest.yaml configuration."""
```

### RSS Data Extraction

Custom extractor for parsing RSS feed responses and transforming them into structured records.

```python { .api }
class CustomExtractor(RecordExtractor):
    """
    Custom RSS feed parser and record extractor.

    Processes RSS feed XML responses and extracts structured item data
    with timestamp-based filtering for incremental synchronization.
    Uses feedparser library for robust RSS/Atom parsing and pytz for timezone handling.
    """

    def extract_records(self, response: requests.Response, **kwargs) -> List[Mapping[str, Any]]:
        """
        Extract and transform RSS feed items from HTTP response.

        Parses RSS/Atom feed XML using feedparser, converts items to structured records,
        applies timestamp filtering for incremental sync, and handles timezone conversions.

        Args:
            response (requests.Response): HTTP response containing RSS feed XML
            **kwargs: Additional extraction parameters

        Returns:
            List[Mapping[str, Any]]: List of extracted RSS items as dictionaries

        Extracted Fields:
            - title (str, optional): RSS item title
            - link (str, optional): RSS item URL
            - description (str, optional): RSS item description/content
            - author (str, optional): RSS item author
            - category (str, optional): RSS item category
            - comments (str, optional): RSS item comments URL
            - enclosure (str, optional): RSS item enclosure/attachment
            - guid (str, optional): RSS item unique identifier
            - published (str): RSS item publication date in ISO format with UTC timezone

        Processing:
            - Extracts items from feed.entries in reverse order (oldest first)
            - Converts published_parsed timestamps to UTC ISO format
            - Filters items based on feed-level publication date for incremental sync
            - Handles missing fields gracefully (sets to null)
        """
```
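
The timestamp conversion described in the docstring can be sketched as follows. This is a minimal illustration, not the connector's actual code: it assumes `published_parsed` is a `time.struct_time` expressed in UTC (which is how feedparser exposes parsed dates), and it uses the standard library's `timezone.utc` in place of pytz so the example has no third-party dependencies. The `published_to_iso` name is hypothetical.

```python
import time
from calendar import timegm
from datetime import datetime, timezone
from typing import Optional


def published_to_iso(published_parsed: Optional[time.struct_time]) -> Optional[str]:
    """Convert a UTC struct_time into an ISO 8601 string with a UTC offset."""
    if published_parsed is None:
        # The entry carried no parseable publication date.
        return None
    # timegm interprets the struct_time as UTC (time.mktime would assume local time).
    seconds = timegm(published_parsed)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).isoformat()


if __name__ == "__main__":
    parsed = time.struct_time((2024, 1, 15, 12, 30, 0, 0, 15, 0))
    print(published_to_iso(parsed))  # 2024-01-15T12:30:00+00:00
```

Using `calendar.timegm` rather than `time.mktime` matters here: the latter would shift the result by the local machine's UTC offset.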

### Entry Point Function

Main entry point function for launching the connector.

```python { .api }
def run():
    """
    Create SourceRss instance and launch connector via airbyte_cdk.entrypoint.launch.

    Uses sys.argv[1:] for command line arguments processing.
    Supports standard Airbyte connector commands: spec, check, discover, read.
    """
```

## Stream Configuration

The connector provides a single stream called `items` with the following characteristics:

### Stream Schema

```python { .api }
# RSS Items Stream Schema
{
    "type": "object",
    "additionalProperties": True,
    "required": ["published"],
    "properties": {
        "title": {"type": ["null", "string"]},
        "link": {"type": ["null", "string"]},
        "description": {"type": ["null", "string"]},
        "author": {"type": ["null", "string"]},
        "category": {"type": ["null", "string"]},
        "comments": {"type": ["null", "string"]},
        "enclosure": {"type": ["null", "string"]},
        "guid": {"type": ["null", "string"]},
        "published": {"type": "string", "format": "date-time"}
    }
}
```
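
To make the schema's constraints concrete, the sketch below checks a record against them by hand using only the standard library. It is an illustrative approximation rather than a JSON Schema validator: the `check_item` helper is hypothetical, and it treats `format: date-time` as a hard requirement, which a standards-compliant validator would not necessarily do.

```python
from datetime import datetime
from typing import Any, List, Mapping

# Fields the schema declares as ["null", "string"].
NULLABLE_STRING_FIELDS = (
    "title", "link", "description", "author",
    "category", "comments", "enclosure", "guid",
)


def check_item(record: Mapping[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the record looks valid."""
    problems: List[str] = []
    published = record.get("published")
    if not isinstance(published, str):
        problems.append("published is required and must be a string")
    else:
        try:
            datetime.fromisoformat(published)
        except ValueError:
            problems.append("published is not an ISO 8601 date-time")
    for field in NULLABLE_STRING_FIELDS:
        value = record.get(field)
        if value is not None and not isinstance(value, str):
            problems.append(f"{field} must be a string or null")
    return problems


if __name__ == "__main__":
    print(check_item({"published": "2024-01-15T12:30:00+00:00", "title": None}))
```

Because the schema sets `additionalProperties` to true, the check deliberately ignores any keys beyond the nine listed ones.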

### Synchronization Modes

The connector supports both full refresh and incremental synchronization:

- **Supported Sync Modes**: `full_refresh`, `incremental`
- **Destination Sync Modes**: `overwrite`, `append`
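
These modes are selected per stream in the catalog file passed to `read`. The sketch below builds such a catalog for the `items` stream. It assumes the general Airbyte configured-catalog layout (`streams`, `stream`, `sync_mode`, `destination_sync_mode`, `cursor_field`); the `build_configured_catalog` helper is hypothetical, the empty `json_schema` is a placeholder, and the connector's own `configured_catalog.json` may differ in detail.

```python
import json

SUPPORTED_SYNC_MODES = ("full_refresh", "incremental")
DESTINATION_SYNC_MODES = ("overwrite", "append")


def build_configured_catalog(sync_mode: str = "incremental",
                             destination_sync_mode: str = "append") -> dict:
    """Hypothetical helper: build a configured catalog for the `items` stream."""
    if sync_mode not in SUPPORTED_SYNC_MODES:
        raise ValueError(f"Unsupported sync mode: {sync_mode!r}")
    if destination_sync_mode not in DESTINATION_SYNC_MODES:
        raise ValueError(f"Unsupported destination sync mode: {destination_sync_mode!r}")
    entry = {
        "stream": {
            "name": "items",
            "json_schema": {},  # placeholder; `discover` returns the real schema
            "supported_sync_modes": list(SUPPORTED_SYNC_MODES),
        },
        "sync_mode": sync_mode,
        "destination_sync_mode": destination_sync_mode,
    }
    if sync_mode == "incremental":
        # The cursor is only meaningful for incremental syncs.
        entry["cursor_field"] = ["published"]
    return {"streams": [entry]}


if __name__ == "__main__":
    print(json.dumps(build_configured_catalog(), indent=2))
```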

### Incremental Synchronization

The connector supports incremental synchronization using the `published` field as the cursor:

- **Cursor Field**: `published` (datetime)
- **Datetime Format**: `%Y-%m-%dT%H:%M:%S%z`
- **Default Window**: The last 23 hours before the current time
- **Filtering**: Records are kept when `published >= stream_interval['start_time']`
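
The filtering rule above can be sketched in plain Python. This is an illustration under stated assumptions, not the manifest's actual logic: the `default_start_time` and `filter_new` helpers are hypothetical, and the start time is passed in directly rather than read from `stream_interval`.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List

CURSOR_FORMAT = "%Y-%m-%dT%H:%M:%S%z"
DEFAULT_WINDOW = timedelta(hours=23)


def default_start_time(now: datetime) -> datetime:
    """Start of the default window: 23 hours before `now`."""
    return now - DEFAULT_WINDOW


def filter_new(records: List[Dict[str, Any]], start_time: datetime) -> List[Dict[str, Any]]:
    """Keep records whose `published` cursor value is at or after `start_time`."""
    kept = []
    for record in records:
        published = datetime.strptime(record["published"], CURSOR_FORMAT)
        if published >= start_time:
            kept.append(record)
    return kept


if __name__ == "__main__":
    now = datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc)
    records = [
        {"title": "too old", "published": "2024-01-14T12:59:59+00:00"},
        {"title": "on the boundary", "published": "2024-01-14T13:00:00+00:00"},
        {"title": "recent", "published": "2024-01-15T11:00:00+00:00"},
    ]
    for record in filter_new(records, default_start_time(now)):
        print(record["title"])
```

The comparison is inclusive, so an item published exactly at the start of the window is kept. Both sides are timezone-aware datetimes, which avoids errors from mixing naive and aware values.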

## Integration Testing

The package includes integration test support:

```python { .api }
import pytest

# Connector acceptance test fixture
@pytest.fixture(scope="session", autouse=True)
def connector_setup():
    """
    Placeholder fixture for external resources that acceptance test might require.
    """
```

### Test Configuration Files

- `sample_config.json`: Example configuration with NASA RSS feed
- `configured_catalog.json`: Stream catalog configuration
- `invalid_config.json`: Invalid configuration for negative testing
- `sample_state.json`: Example state for incremental sync testing

## Usage Examples

### Basic RSS Feed Extraction

```python
from source_rss import SourceRss
from airbyte_cdk.entrypoint import launch

# Initialize connector
source = SourceRss()

# Configuration for RSS feed
config = {
    "url": "https://www.nasa.gov/rss/dyn/breaking_news.rss"
}

# The connector will extract RSS items with fields:
# - title, link, description
# - author, category, comments
# - enclosure, guid
# - published (ISO datetime)
```

### Custom Extraction Logic

```python
from source_rss.components import CustomExtractor
import requests

# Create custom extractor instance
extractor = CustomExtractor()

# Fetch the RSS feed; the timeout keeps a stalled server from hanging the script
response = requests.get("https://example.com/feed.rss", timeout=30)
response.raise_for_status()  # stop early on HTTP errors
records = extractor.extract_records(response)

# Each record contains RSS item fields
for record in records:
    print(f"Title: {record.get('title')}")
    print(f"Published: {record.get('published')}")
    print(f"Link: {record.get('link')}")
```

### Docker Usage

```bash
# Build connector image
airbyte-ci connectors --name=source-rss build

# Run connector commands
docker run --rm airbyte/source-rss:dev spec
docker run --rm -v $(pwd)/config:/config airbyte/source-rss:dev check --config /config/config.json
docker run --rm -v $(pwd)/config:/config airbyte/source-rss:dev discover --config /config/config.json
docker run --rm -v $(pwd)/config:/config -v $(pwd)/catalog:/catalog airbyte/source-rss:dev read --config /config/config.json --catalog /catalog/catalog.json
```

## Error Handling

The connector handles common RSS parsing scenarios:

- **Missing Fields**: Optional RSS fields default to null if not present
- **Date Parsing**: Handles various RSS date formats and converts them to UTC
- **Feed Parsing**: Uses the feedparser library for robust RSS/Atom feed parsing
- **HTTP Errors**: Standard HTTP error handling via the Airbyte CDK
- **Invalid XML**: feedparser handles malformed RSS feeds gracefully
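
The missing-field behaviour in the first bullet can be illustrated with a short sketch. It is an assumption-laden approximation rather than the connector's code: the `normalize_item` helper is hypothetical, and a plain dictionary stands in for a parsed feed entry.

```python
from typing import Any, Dict, Mapping

# Optional item fields listed in the stream schema.
ITEM_FIELDS = (
    "title", "link", "description", "author",
    "category", "comments", "enclosure", "guid",
)


def normalize_item(entry: Mapping[str, Any]) -> Dict[str, Any]:
    """Copy the known item fields, using None for any that are absent."""
    return {field: entry.get(field) for field in ITEM_FIELDS}


if __name__ == "__main__":
    print(normalize_item({"title": "Hello", "link": "https://example.com/1"}))
```

Emitting an explicit `None` (serialized as JSON `null`) for absent fields keeps every record's shape consistent with the nullable types declared in the schema.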

## Dependencies

### Core Dependencies

```toml
# Core Airbyte framework
airbyte-cdk = "^0"

# RSS/Atom feed parsing
feedparser = "6.0.10"

# Timezone handling
pytz = "2022.6"
```

### Development Dependencies

```toml
# Testing framework
pytest = "*"
pytest-mock = "*"

# HTTP mocking for tests
requests-mock = "*"
```