Python wrapper for the GitLab REST and GraphQL APIs.
—
Continuous integration and deployment pipeline control, job management, runner administration, and artifact handling. Comprehensive coverage of GitLab's CI/CD system including pipelines, jobs, runners, variables, and deployment workflows.
class ProjectPipeline(RefreshMixin, ObjectDeleteMixin, RESTObject):
"""
CI/CD pipeline object with comprehensive lifecycle management.
Key Attributes:
- id: Pipeline ID
- iid: Internal pipeline ID
- status: Pipeline status ("created", "waiting_for_resource", "preparing", "pending", "running", "success", "failed", "canceled", "skipped", "manual", "scheduled")
- source: Pipeline trigger source ("push", "web", "trigger", "schedule", "api", "external", "pipeline", "chat", "webide", "merge_request_event", "external_pull_request_event", "parent_pipeline", "ondemand_dast_scan", "ondemand_dast_validation")
- ref: Git reference (branch/tag)
- sha: Commit SHA
- before_sha: Previous commit SHA
- tag: Tag flag
- yaml_errors: YAML validation errors
- user: User who triggered pipeline
- created_at: Creation timestamp
- updated_at: Update timestamp
- started_at: Start timestamp
- finished_at: Completion timestamp
- committed_at: Commit timestamp
- duration: Pipeline duration in seconds
- queued_duration: Queue duration in seconds
- coverage: Code coverage percentage
- web_url: Pipeline web URL
"""
def cancel(self) -> dict:
"""
Cancel the pipeline.
Returns:
Dictionary with cancellation result
Raises:
GitlabPipelineCancelError: If cancellation fails
"""
def retry(self) -> dict:
"""
Retry the pipeline.
Returns:
Dictionary with retry result
Raises:
GitlabPipelineRetryError: If retry fails
"""
def play(self, job_variables: dict | None = None) -> dict:
"""
Play/trigger manual pipeline.
Parameters:
- job_variables: Variables to pass to jobs
Returns:
Dictionary with play result
Raises:
GitlabPipelinePlayError: If play fails
"""
def delete(self) -> None:
"""Delete pipeline and its jobs."""
class ProjectPipelineManager(CRUDMixin[ProjectPipeline]):
"""Manager for project pipelines."""
def list(
self,
scope: str | None = None,
status: str | None = None,
ref: str | None = None,
sha: str | None = None,
yaml_errors: bool | None = None,
name: str | None = None,
username: str | None = None,
updated_after: str | None = None,
updated_before: str | None = None,
order_by: str | None = None,
sort: str | None = None,
source: str | None = None,
**kwargs
) -> list[ProjectPipeline]:
"""List project pipelines with filtering."""
def create(self, data: dict, **kwargs) -> ProjectPipeline:
"""
Create new pipeline.
Required fields:
- ref: Git reference (branch/tag/commit)
Optional fields:
- variables: List of pipeline variables
"""

class ProjectJob(RefreshMixin, RESTObject):
"""
CI/CD job object with execution control.
Key Attributes:
- id: Job ID
- name: Job name
- stage: Pipeline stage
- status: Job status ("created", "pending", "running", "success", "failed", "canceled", "skipped", "manual")
- created_at: Creation timestamp
- started_at: Start timestamp
- finished_at: Completion timestamp
- duration: Job duration in seconds
- queued_duration: Queue duration in seconds
- user: User information
- commit: Commit information
- pipeline: Pipeline information
- web_url: Job web URL
- artifacts: Artifacts information
- artifacts_file: Artifacts file information
- artifacts_expire_at: Artifacts expiration
- tag_list: Runner tags
- runner: Runner information
- coverage: Code coverage percentage
- allow_failure: Allow failure flag
- failure_reason: Failure reason
- erased_at: Erasure timestamp
- erased_by: User who erased job
"""
def cancel(self) -> dict:
"""
Cancel the job.
Returns:
Dictionary with cancellation result
Raises:
GitlabJobCancelError: If cancellation fails
"""
def retry(self) -> dict:
"""
Retry the job.
Returns:
Dictionary with retry result
Raises:
GitlabJobRetryError: If retry fails
"""
def play(self, job_variables_attributes: list[dict] | None = None) -> dict:
"""
Play/trigger manual job.
Parameters:
- job_variables_attributes: List of job variables
Returns:
Dictionary with play result
Raises:
GitlabJobPlayError: If play fails
"""
def erase(self) -> dict:
"""
Erase job artifacts and trace.
Returns:
Dictionary with erase result
Raises:
GitlabJobEraseError: If erasure fails
"""
def keep_artifacts(self) -> dict:
"""Keep job artifacts (prevent expiration)."""
def delete_artifacts(self) -> dict:
"""Delete job artifacts."""
def trace(self) -> bytes:
"""Get job execution trace/log."""
class ProjectJobManager(RetrieveMixin[ProjectJob], ListMixin[ProjectJob]):
"""Manager for project jobs."""
def list(
self,
scope: str | None = None,
include_retried: bool | None = None,
**kwargs
) -> list[ProjectJob]:
"""
List project jobs.
Parameters:
- scope: Job scope ("created", "pending", "running", "failed", "success", "canceled", "skipped", "manual")
- include_retried: Include retried jobs
"""

class Runner(SaveMixin, ObjectDeleteMixin, RESTObject):
"""
GitLab runner object with configuration management.
Key Attributes:
- id: Runner ID
- description: Runner description
- active: Active status
- paused: Paused status
- is_shared: Shared runner flag
- runner_type: Runner type ("instance_type", "group_type", "project_type")
- name: Runner name
- online: Online status
- status: Runner status
- ip_address: Runner IP address
- tag_list: Runner tags
- version: Runner version
- revision: Runner revision
- platform: Runner platform
- architecture: Runner architecture
- contacted_at: Last contact timestamp
- token: Registration token (masked)
- locked: Locked to project flag
- access_level: Access level
- maximum_timeout: Maximum job timeout
- groups: Associated groups
- projects: Associated projects
"""
def save(self) -> None:
"""Save runner configuration changes."""
def delete(self) -> None:
"""Delete runner permanently."""
class RunnerManager(CRUDMixin[Runner]):
"""Manager for GitLab runners."""
def list(
self,
scope: str | None = None,
type: str | None = None,
status: str | None = None,
paused: bool | None = None,
tag_list: list[str] | None = None,
**kwargs
) -> list[Runner]:
"""
List runners with filtering.
Parameters:
- scope: Runner scope ("active", "paused", "online", "offline")
- type: Runner type ("instance_type", "group_type", "project_type")
- status: Runner status ("online", "offline", "stale", "never_contacted")
- paused: Filter by paused status
- tag_list: Filter by runner tags
"""
class ProjectRunner(RESTObject):
"""Project-specific runner information."""
class ProjectRunnerManager(ListMixin[ProjectRunner], CreateMixin[ProjectRunner], DeleteMixin[ProjectRunner]):
"""Manager for project runners."""
def create(self, data: dict, **kwargs) -> ProjectRunner:
"""
Enable runner for project.
Required fields:
- runner_id: Runner ID to enable
"""

class ProjectVariable(SaveMixin, ObjectDeleteMixin, RESTObject):
"""
Project CI/CD variable object.
Attributes:
- key: Variable name
- value: Variable value (may be masked)
- variable_type: Variable type ("env_var", "file")
- protected: Protected variable flag
- masked: Masked variable flag
- raw: Raw variable flag
- environment_scope: Environment scope
- description: Variable description
"""
def save(self) -> None:
"""Save variable changes."""
def delete(self) -> None:
"""Delete variable."""
class ProjectVariableManager(CRUDMixin[ProjectVariable]):
"""Manager for project CI/CD variables."""
def create(self, data: dict, **kwargs) -> ProjectVariable:
"""
Create new variable.
Required fields:
- key: Variable name
- value: Variable value
Optional fields:
- variable_type: Variable type ("env_var", "file")
- protected: Protect variable (accessible only in protected branches/tags)
- masked: Mask variable in logs
- raw: Treat as raw variable
- environment_scope: Environment scope pattern
- description: Variable description
"""

class ProjectTrigger(SaveMixin, ObjectDeleteMixin, RESTObject):
"""
Pipeline trigger token object.
Attributes:
- id: Trigger ID
- description: Trigger description
- created_at: Creation timestamp
- last_used: Last usage timestamp
- token: Trigger token
- updated_at: Update timestamp
- owner: Token owner
"""
def save(self) -> None:
"""Save trigger changes."""
def delete(self) -> None:
"""Delete trigger."""
class ProjectTriggerManager(CRUDMixin[ProjectTrigger]):
"""Manager for pipeline triggers."""
def create(self, data: dict, **kwargs) -> ProjectTrigger:
"""
Create new trigger.
Required fields:
- description: Trigger description
"""

class ProjectPipelineSchedule(SaveMixin, ObjectDeleteMixin, RESTObject):
"""
Pipeline schedule object for automated pipeline execution.
Attributes:
- id: Schedule ID
- description: Schedule description
- ref: Git reference
- cron: Cron expression
- cron_timezone: Timezone
- next_run_at: Next execution timestamp
- active: Active status
- created_at: Creation timestamp
- updated_at: Update timestamp
- owner: Schedule owner
- last_pipeline: Last pipeline information
"""
def save(self) -> None:
"""Save schedule changes."""
def delete(self) -> None:
"""Delete schedule."""
def play(self) -> dict:
"""Trigger schedule immediately."""
def take_ownership(self) -> dict:
"""Take ownership of schedule."""
class ProjectPipelineScheduleManager(CRUDMixin[ProjectPipelineSchedule]):
"""Manager for pipeline schedules."""
def create(self, data: dict, **kwargs) -> ProjectPipelineSchedule:
"""
Create new schedule.
Required fields:
- description: Schedule description
- ref: Git reference
- cron: Cron expression
Optional fields:
- cron_timezone: Timezone (default: UTC)
- active: Active status (default: true)
"""

import gitlab
gl = gitlab.Gitlab("https://gitlab.example.com", private_token="your-token")
project = gl.projects.get(123)
# Pipeline management
pipelines = project.pipelines.list(
status="running",
ref="main",
order_by="updated_at"
)
# Create pipeline
pipeline_data = {
"ref": "main",
"variables": [
{"key": "DEPLOY_ENV", "value": "staging"},
{"key": "DEBUG", "value": "true"}
]
}
pipeline = project.pipelines.create(pipeline_data)
# Pipeline operations
pipeline.cancel()
pipeline.retry()
# Job management
jobs = project.jobs.list(scope="failed")
for job in jobs:
print(f"Job {job.name}: {job.status}")
if job.status == "failed":
job.retry()
# Get job trace
job = project.jobs.get(456)
trace = job.trace()
print(trace.decode('utf-8'))
# Job operations
job.cancel()
job.play()
job.erase()
job.keep_artifacts()
# Runner management
runners = gl.runners.list(scope="online", type="project_type")
# Enable runner for project
project.runners.create({"runner_id": 789})
# CI/CD Variables
variables = project.variables.list()
# Create variable
var_data = {
"key": "DATABASE_URL",
"value": "postgresql://user:pass@host/db",
"protected": True,
"masked": True,
"environment_scope": "production"
}
variable = project.variables.create(var_data)
# Update variable
variable.value = "new_value"
variable.save()
# Pipeline triggers
triggers = project.triggers.list()
# Create trigger
trigger_data = {"description": "External deployment trigger"}
trigger = project.triggers.create(trigger_data)
# Pipeline schedules
schedules = project.pipelineschedules.list()
# Create schedule
schedule_data = {
"description": "Nightly build",
"ref": "main",
"cron": "0 2 * * *", # 2 AM daily
"cron_timezone": "UTC",
"active": True
}
schedule = project.pipelineschedules.create(schedule_data)
# Schedule operations
schedule.play() # Run immediately
schedule.take_ownership()
# Pipeline artifacts
artifacts = pipeline.jobs.list()
for job in artifacts:
if job.artifacts_file:
print(f"Job {job.name} has artifacts")
job.delete_artifacts()

class GitlabPipelineCancelError(GitlabCancelError):
"""Raised when pipeline cancellation fails."""
pass
class GitlabPipelineRetryError(GitlabRetryError):
"""Raised when pipeline retry fails."""
pass
class GitlabPipelinePlayError(GitlabRetryError):
"""Raised when pipeline play fails."""
pass
class GitlabJobCancelError(GitlabCancelError):
"""Raised when job cancellation fails."""
pass
class GitlabJobRetryError(GitlabRetryError):
"""Raised when job retry fails."""
pass
class GitlabJobPlayError(GitlabRetryError):
"""Raised when job play fails."""
pass
class GitlabJobEraseError(GitlabRetryError):
"""Raised when job erasure fails."""
pass

Example error handling:
try:
pipeline.cancel()
except gitlab.GitlabPipelineCancelError as e:
print(f"Cannot cancel pipeline: {e}")
try:
job.retry()
except gitlab.GitlabJobRetryError as e:
print(f"Job retry failed: {e}")

Install with Tessl CLI
npx tessl i tessl/pypi-python-gitlab