Build custom Python data sources for Apache Spark using the PySpark DataSource API — batch and streaming readers/writers for external systems. Use this skill whenever someone wants to connect Spark to an external system (database, API, message queue, custom protocol), build a Spark connector or plugin in Python, implement a DataSourceReader or DataSourceWriter, pull data from or push data to a system via Spark, or work with the PySpark DataSource API in any way. Even if they just say "read from X in Spark" or "write DataFrame to Y" and there's no native connector, this skill applies.
Build custom Python data sources for Apache Spark 4.0+ to read from and write to external systems in batch and streaming modes.
You are an experienced Spark developer building custom Python data sources using the PySpark DataSource API. Follow these principles and patterns.
Each data source follows a flat, single-level inheritance structure:
- Batch: DataSourceReader / DataSourceWriter
- Streaming: DataSourceStreamReader / DataSourceStreamWriter

See implementation-template.md for the full annotated skeleton covering all four modes (batch read/write, stream read/write).
These are specific to the PySpark DataSource API and its driver/executor architecture — general Python best practices (clean code, minimal dependencies, no premature abstraction) still apply but aren't repeated here.
Flat single-level inheritance only. PySpark serializes reader/writer instances to ship them to executors. Complex inheritance hierarchies and abstract base classes break serialization and make cross-process debugging painful. Use one shared base class mixed with the PySpark interface (e.g., class YourBatchWriter(YourWriter, DataSourceWriter)).
Import third-party libraries inside executor methods. The read() and write() methods run on remote executor processes that don't share the driver's Python environment. Top-level imports from the driver won't be available on executors — always import libraries like requests or database drivers inside the methods that run on workers.
Minimize dependencies. Every package you add must be installed on all executor nodes in the cluster, not just the driver. Prefer the standard library; when external packages are needed, keep them few and well-known.
No async/await unless the external system's SDK is async-only. The PySpark DataSource API is synchronous, so async adds complexity with no benefit.
Create a Python project using a packaging tool such as uv, poetry, or hatch. Examples use uv (substitute your tool of choice):
```shell
uv init your-datasource
cd your-datasource
uv add pyspark pytest pytest-spark
```

```
your-datasource/
├── pyproject.toml
├── src/
│   └── your_datasource/
│       ├── __init__.py
│       └── datasource.py
└── tests/
    ├── conftest.py
    └── test_datasource.py
```

Run all commands through the packaging tool so they execute within the correct virtual environment:
```shell
uv run pytest              # Run tests
uv run ruff check src/     # Lint
uv run ruff format src/    # Format
uv build                   # Build wheel
```

Partitioning Strategy — choose based on data source characteristics:
Authentication — support multiple methods in priority order:
Type Conversion — map between Spark and external types:
Streaming Offsets — design for exactly-once semantics:
Error Handling — implement retries and resilience:
```python
import pytest
from unittest.mock import patch, Mock

from your_datasource.datasource import YourDataSource


@pytest.fixture
def spark():
    from pyspark.sql import SparkSession
    return SparkSession.builder.master("local[2]").getOrCreate()


def test_data_source_name():
    assert YourDataSource.name() == "your-format"


def test_writer_sends_data(spark):
    # Register the data source before referencing its format name.
    spark.dataSource.register(YourDataSource)
    with patch('requests.post') as mock_post:
        mock_post.return_value = Mock(status_code=200)
        df = spark.createDataFrame([(1, "test")], ["id", "value"])
        df.write.format("your-format").option("url", "http://api").save()
        assert mock_post.called
```

See testing-patterns.md for unit/integration test patterns, fixtures, and running tests.
Study these for real-world patterns:
- Create a Spark data source for reading from MongoDB with sharding support
- Build a streaming connector for RabbitMQ with at-least-once delivery
- Implement a batch writer for Snowflake with staged uploads
- Write a data source for a REST API with OAuth2 authentication and pagination