Airbyte source connector for extracting data from Pipedrive CRM with comprehensive stream support and incremental sync capabilities
pkg:docker/airbyte/source-pipedrive@2.4.x
npx @tessl/cli install tessl/docker-airbyte-source-pipedrive@2.4.0
A manifest-only Airbyte source connector that extracts data from Pipedrive CRM using its REST API. The connector is a declarative, configuration-based data integration tool that runs within the base source-declarative-manifest Docker image. It extracts deals, activities, persons, organizations, pipelines, stages, users, and custom fields.
docker pull airbyte/source-pipedrive:2.4.0
docker.io/airbyte/source-declarative-manifest:6.33.1
# Get connector specification
docker run --rm airbyte/source-pipedrive:2.4.0 spec
# Test connection
docker run --rm -v $(pwd)/config:/config airbyte/source-pipedrive:2.4.0 check --config /config/config.json
# Discover available streams
docker run --rm -v $(pwd)/config:/config airbyte/source-pipedrive:2.4.0 discover --config /config/config.json
# Extract data
docker run --rm -v $(pwd)/config:/config -v $(pwd)/catalog:/catalog airbyte/source-pipedrive:2.4.0 read --config /config/config.json --catalog /catalog/configured_catalog.json
{
  "api_token": "your_pipedrive_api_token",
  "replication_start_date": "2017-01-25 00:00:00Z"
}
The connector uses Airbyte's low-code connector development kit (CDK) with a declarative YAML-based architecture. It includes:
manifest.yaml
Handles connector configuration, authentication, and connection testing.
# Connection specification
spec:
  type: Spec
  connection_specification:
    type: object
    required:
      - api_token
      - replication_start_date
    properties:
      api_token:
        type: string
        description: The Pipedrive API Token
        airbyte_secret: true
      replication_start_date:
        type: string
        description: UTC date and time in the format 2017-01-25T00:00:00Z
Extracts primary business entities from Pipedrive with incremental sync capabilities.
# Example core entity stream structure
deals:
  type: DeclarativeStream
  name: deals
  primary_key: [id]
  retriever:
    type: SimpleRetriever
    requester:
      path: v1/recents
      request_parameters:
        api_token: "{{ config['api_token'] }}"
        items: deal
  incremental_sync:
    type: DatetimeBasedCursor
    cursor_field: update_time
Supported entities: deals, persons, activities, notes, files, products, leads
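The incremental behaviour configured above can be pictured with a small standalone sketch. It is not the CDK's DatetimeBasedCursor, only an illustration of the idea: keep records whose update_time is at or after the saved cursor, then advance the cursor to the newest value seen. The timestamp layout, the helper names, and the sample records are assumptions made for this example.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, List, Tuple

# Assumed timestamp layout for update_time; the real API format may differ.
UPDATE_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def parse_update_time(value: str) -> datetime:
    """Parse an update_time string and treat it as UTC."""
    return datetime.strptime(value, UPDATE_TIME_FORMAT).replace(tzinfo=timezone.utc)


def incremental_read(
    records: Iterable[Dict[str, Any]], cursor_value: str
) -> Tuple[List[Dict[str, Any]], str]:
    """Return records updated at or after cursor_value, plus the advanced cursor."""
    cursor = parse_update_time(cursor_value)
    emitted: List[Dict[str, Any]] = []
    newest = cursor
    for record in records:
        updated = parse_update_time(record["update_time"])
        if updated >= cursor:
            emitted.append(record)
            newest = max(newest, updated)
    return emitted, newest.strftime(UPDATE_TIME_FORMAT)


# Hypothetical sample records, not real Pipedrive data.
sample = [
    {"id": 1, "update_time": "2017-01-20 08:00:00"},
    {"id": 2, "update_time": "2017-02-01 12:30:00"},
    {"id": 3, "update_time": "2017-03-15 09:45:00"},
]
emitted, new_cursor = incremental_read(sample, "2017-01-25 00:00:00")
```

On the first sync the cursor would start at replication_start_date. Later syncs would start from the cursor value saved in state, so unchanged records are not read again.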
Extracts Pipedrive configuration metadata and custom field definitions.
# Example metadata stream structure
deal_fields:
  type: DeclarativeStream
  name: deal_fields
  retriever:
    type: SimpleRetriever
    requester:
      path: v1/dealFields
Includes: deal_fields, organization_fields, person_fields, activity_fields, activity_types, product_fields, pipelines, stages, users, roles, permission_sets, currencies, lead_labels, goals, filters
Metadata and Configuration Streams
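One common downstream use of the field-definition streams is translating field keys in entity records into readable names. The sketch below assumes each deal_fields record carries a `key` and a `name` attribute and that custom fields show up in deal records under opaque keys. The helper names and all sample values are hypothetical.

```python
from typing import Any, Dict, List


def build_key_to_name(field_definitions: List[Dict[str, Any]]) -> Dict[str, str]:
    """Map each field key to its display name (assumes 'key' and 'name' attributes)."""
    return {definition["key"]: definition["name"] for definition in field_definitions}


def rename_fields(record: Dict[str, Any], key_to_name: Dict[str, str]) -> Dict[str, Any]:
    """Return a copy of record with known keys replaced by display names."""
    return {key_to_name.get(key, key): value for key, value in record.items()}


# Hypothetical field definitions and deal record, for illustration only.
deal_fields = [
    {"key": "title", "name": "Title"},
    {"key": "hypothetical_custom_key_01", "name": "Contract type"},
]
deal = {"id": 7, "title": "Renewal", "hypothetical_custom_key_01": "Annual"}

readable = rename_fields(deal, build_key_to_name(deal_fields))
```

Keys without a matching definition (here `id`) pass through unchanged, which keeps the transformation safe to apply when the definitions are incomplete.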
Handles relationships between entities and communication data.
# Example relationship stream
deal_products:
  type: DeclarativeStream
  name: deal_products
  retriever:
    type: SimpleRetriever
    requester:
      path: v1/deals/{parent_id}/products
    partition_router:
      type: SubstreamPartitionRouter
      parent_stream_configs:
        - stream: deals
Includes: deal_products, mail, mailThreads
Relationship and Communication Streams
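The parent/child relationship in the deal_products example can be sketched as follows: each parent deal record yields one partition, and the partition's parent_id is substituted into the request path. This is a simplified model of what a substream partition router does, not the CDK implementation. The function name, the choice of `id` as the parent key, and the sample parents are assumptions.

```python
from typing import Any, Dict, Iterable, Iterator

# Path template taken from the deal_products example above.
PATH_TEMPLATE = "v1/deals/{parent_id}/products"


def partition_paths(
    parent_records: Iterable[Dict[str, Any]], parent_key: str = "id"
) -> Iterator[Dict[str, Any]]:
    """Yield one partition per parent record, with the request path filled in."""
    for parent in parent_records:
        parent_id = parent[parent_key]
        yield {
            "parent_id": parent_id,
            "path": PATH_TEMPLATE.format(parent_id=parent_id),
        }


# Hypothetical parent deals, for illustration only.
parent_deals = [{"id": 101}, {"id": 205}]
slices = list(partition_paths(parent_deals))
```

Each resulting path would be requested separately, so the number of API calls for a substream grows with the number of parent records.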
Python components for handling Pipedrive's API inconsistencies.
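To make the null-handling idea concrete, here is a minimal standalone sketch that works on an already-decoded JSON payload rather than on a requests.Response. It assumes records sit in a list under field_path and that each item may wrap its record in a nested field that is sometimes null. Falling back to the wrapping item is one plausible policy, not a statement of what the connector's extractor does, and the function name and sample payload are hypothetical.

```python
from typing import Any, Dict, List


def extract_null_checked(
    payload: Dict[str, Any], field_path: List[str], nullable_nested_field: str
) -> List[Dict[str, Any]]:
    """Return nested records, falling back to the wrapping item when the nested field is null."""
    node: Any = payload
    for key in field_path:
        # A missing or null container means there is nothing to extract.
        if not isinstance(node, dict) or node.get(key) is None:
            return []
        node = node[key]
    if not isinstance(node, list):
        return []
    records: List[Dict[str, Any]] = []
    for item in node:
        nested = item.get(nullable_nested_field)
        records.append(nested if nested is not None else item)
    return records


# Hypothetical payload shape, for illustration only.
payload = {
    "success": True,
    "data": [
        {"item": "deal", "id": 1, "data": {"id": 1, "title": "First deal"}},
        {"item": "deal", "id": 2, "data": None},
    ],
}
records = extract_null_checked(payload, ["data"], "data")
```

The second item has no nested record, so the wrapping object is returned instead and its `id` is still available to the sync.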
@dataclass
class NullCheckedDpathExtractor(RecordExtractor):
    """Custom extractor for Pipedrive's inconsistent API responses."""

    field_path: List[Union[InterpolatedString, str]]
    nullable_nested_field: Union[InterpolatedString, str]
    config: Config

    def extract_records(self, response: requests.Response) -> List[Mapping[str, Any]]:
        """Extract records, handling null data fields."""

# Base stream configuration
DeclarativeStream:
  type: object
  properties:
    type:
      const: DeclarativeStream
    name: string
    primary_key: array
    retriever: SimpleRetriever
    incremental_sync: DatetimeBasedCursor
    schema_loader: InlineSchemaLoader
# Request configuration
SimpleRetriever:
  type: object
  properties:
    type:
      const: SimpleRetriever
    requester: HttpRequester
    record_selector: RecordSelector
    paginator: DefaultPaginator
# Authentication and pagination
HttpRequester:
  type: object
  properties:
    url_base: string
    path: string
    http_method: string
    request_parameters: object
# Incremental sync configuration
DatetimeBasedCursor:
  type: object
  properties:
    type:
      const: DatetimeBasedCursor
    cursor_field: string
    cursor_datetime_formats: array
    datetime_format: string
    start_datetime: MinMaxDatetime
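
The sample configuration writes replication_start_date with a space separator, while the specification's description uses a `T` separator. A list of accepted layouts, in the spirit of cursor_datetime_formats, lets both be parsed. The sketch below is a standalone illustration using Python's standard library. The two format strings and the function name are assumptions, not values taken from the connector's manifest.

```python
from datetime import datetime, timezone
from typing import Sequence

# Assumed candidate layouts covering both spellings that appear in this document.
CANDIDATE_FORMATS = ("%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%d %H:%M:%SZ")


def parse_start_datetime(
    value: str, formats: Sequence[str] = CANDIDATE_FORMATS
) -> datetime:
    """Try each format in order and return the first successful parse as UTC."""
    for fmt in formats:
        try:
            return datetime.strptime(value, fmt).replace(tzinfo=timezone.utc)
        except ValueError:
            continue
    raise ValueError(f"{value!r} matches none of {list(formats)}")


from_config = parse_start_datetime("2017-01-25 00:00:00Z")
from_spec = parse_start_datetime("2017-01-25T00:00:00Z")
```

Both spellings resolve to the same instant, and a value that matches neither layout raises ValueError instead of being silently accepted.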