Airbyte source connector for extracting data from Microsoft OneDrive cloud storage with OAuth authentication and file-based streaming capabilities.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
File discovery, enumeration, and reading capabilities for Microsoft OneDrive files including support for nested folder structures, glob pattern matching, shared items access, and efficient streaming with metadata extraction.
Primary class for handling file operations across OneDrive drives and shared items with lazy initialization and caching.
class SourceMicrosoftOneDriveStreamReader(AbstractFileBasedStreamReader):
    """
    Stream reader for file operations across OneDrive drives and shared items.

    Clients are initialized lazily and the drive list is cached.
    This is a signature listing: method bodies are omitted.
    """

    # NOTE(review): presumably the path values treated as the drive root —
    # inferred from the name and values only; confirm against the implementation.
    ROOT_PATH: List[str] = [".", "/"]

    def __init__(self):
        """Initialize the stream reader with lazy-loaded clients."""

    @property
    def config(self) -> SourceMicrosoftOneDriveSpec:
        """Get the current configuration."""

    @config.setter
    def config(self, value: SourceMicrosoftOneDriveSpec):
        """
        Set configuration with type validation.

        Parameters:
        - value: SourceMicrosoftOneDriveSpec - Must be valid configuration spec
        """

    @property
    def auth_client(self):
        """Lazy initialization of the authentication client."""

    @property
    def one_drive_client(self):
        """Lazy initialization of the Microsoft Graph client."""

    def get_access_token(self):
        """Directly fetch a new access token from the auth_client."""

    @property
    def drives(self):
        """
        Retrieves and caches OneDrive drives, including the user's drive.
        Filters to only personal and business drive types.

        Returns:
        List of OneDrive drive objects accessible to authenticated user
        """

Methods for discovering and filtering files across different OneDrive locations.
def get_matching_files(
    self,
    globs: List[str],
    prefix: Optional[str],
    logger: logging.Logger
) -> Iterable[RemoteFile]:
    """
    Retrieve all files matching the specified glob patterns in OneDrive.

    Handles the special case where the drive might be empty by catching
    StopIteration.

    Parameters:
    - globs: List[str] - Glob patterns to match files against
    - prefix: Optional[str] - Optional prefix filter (not used in OneDrive implementation)
    - logger: logging.Logger - Logger for operation tracking

    Returns:
    Iterable[RemoteFile]: Iterator of MicrosoftOneDriveRemoteFile objects

    Raises:
    - AirbyteTracedException: If drive is empty or does not exist

    Implementation:
    Uses a special approach to handle empty drives by checking for StopIteration
    from the files generator and yielding files in two phases.
    """
def get_all_files(self):
    """
    Generator yielding all accessible files based on search scope configuration.

    Handles both accessible drives and shared items based on search_scope setting.

    Yields:
    Tuple[str, str, datetime]: File path, download URL, and last modified time
    """
def get_files_by_drive_name(self, drive_name: str, folder_path: str):
    """
    Yields files from the specified drive and folder path.

    Parameters:
    - drive_name: str - Name of the OneDrive drive to search
    - folder_path: str - Path within the drive to search

    Yields:
    Tuple[str, str, str]: File path, download URL, and last modified datetime string

    NOTE(review): get_all_files documents the third tuple element as a
    datetime, whereas this docstring describes a datetime string — confirm
    which one the implementation actually yields.
    """

Methods for opening and reading OneDrive files with proper encoding support.
def open_file(
    self,
    file: RemoteFile,
    mode: FileReadMode,
    encoding: Optional[str],
    logger: logging.Logger
) -> IOBase:
    """
    Open a OneDrive file for reading using smart-open.

    Parameters:
    - file: RemoteFile - File object with download URL
    - mode: FileReadMode - File reading mode (typically READ)
    - encoding: Optional[str] - Text encoding (e.g., 'utf-8', 'latin-1')
    - logger: logging.Logger - Logger for error tracking

    Returns:
    IOBase: Opened file-like object for reading

    Raises:
    - Exception: If file cannot be opened or accessed
    """

Methods for recursive directory traversal and file enumeration.
def list_directories_and_files(self, root_folder, path: Optional[str] = None):
    """
    Enumerates folders and files starting from a root folder recursively.

    Parameters:
    - root_folder: OneDrive folder object to start enumeration from
    - path: Optional[str] - Current path for building full file paths

    Returns:
    List[Tuple[str, str, str]]: List of (file_path, download_url, last_modified)
    """

Methods for accessing files shared with the authenticated user.
def _get_shared_files_from_all_drives(self, parsed_drive_id: str):
    """
    Get files from shared items across all drives.

    Parameters:
    - parsed_drive_id: str - Drive ID to exclude from results to avoid duplicates

    Yields:
    Tuple[str, str, datetime]: File path, download URL, and last modified time
    """
def _get_shared_drive_object(self, drive_id: str, object_id: str, path: str) -> List[Tuple[str, str, datetime]]:
    """
    Retrieves a list of all nested files under the specified shared object.

    Parameters:
    - drive_id: str - The ID of the drive containing the object
    - object_id: str - The ID of the object to start the search from
    - path: str - Base path for building file paths

    Returns:
    List[Tuple[str, str, datetime]]: File information tuples

    Raises:
    - RuntimeError: If an error occurs during the Microsoft Graph API request
    """

File representation with OneDrive-specific attributes.
class MicrosoftOneDriveRemoteFile(RemoteFile):
    """RemoteFile variant that additionally carries the file's download URL."""

    download_url: str
    """Direct download URL from Microsoft Graph API for file content access."""

from source_microsoft_onedrive.stream_reader import SourceMicrosoftOneDriveStreamReader
from source_microsoft_onedrive.spec import SourceMicrosoftOneDriveSpec
import logging

# Configure stream reader
config = SourceMicrosoftOneDriveSpec(**{
    "credentials": {
        "auth_type": "Client",
        "tenant_id": "your-tenant-id",
        "client_id": "your-client-id",
        "client_secret": "your-client-secret",
        "refresh_token": "your-refresh-token"
    },
    "drive_name": "OneDrive",
    "search_scope": "ACCESSIBLE_DRIVES",
    "folder_path": "Documents"
})
reader = SourceMicrosoftOneDriveStreamReader()
reader.config = config

# Get files matching patterns
logger = logging.getLogger(__name__)
files = reader.get_matching_files(["*.pdf", "*.docx"], None, logger)
# The result is an iterator: this loop consumes it, so call
# get_matching_files() again if the files are needed a second time.
for file in files:
    print(f"File: {file.uri}, Modified: {file.last_modified}")

from airbyte_cdk.sources.file_based.file_based_stream_reader import FileReadMode
# Open and read a file
# get_matching_files() returns a one-shot iterator, so request a fresh one
# here instead of re-looping over an iterator that an earlier loop has
# already drained (which would silently read nothing). A plain-text pattern
# is used because the files are decoded as UTF-8 below.
files = reader.get_matching_files(["*.txt"], None, logger)
for file in files:
    # The context manager closes the remote stream even if read() fails.
    with reader.open_file(file, FileReadMode.READ, "utf-8", logger) as f:
        content = f.read()
        print(f"Content length: {len(content)}")

# Get all files based on search scope
all_files = reader.get_all_files()
# Each item is a (file path, download URL, last modified) tuple.
for file_path, download_url, last_modified in all_files:
    print(f"Path: {file_path}")
    print(f"URL: {download_url}")
    print(f"Modified: {last_modified}")
    print("---")

from airbyte_cdk import AirbyteTracedException
try:
    files = reader.get_matching_files(["*.txt"], None, logger)
    file_list = list(files)  # Convert iterator to list
    print(f"Found {len(file_list)} files")
except AirbyteTracedException as e:
    # The exception's message is optional; fall back to "" so the substring
    # test cannot raise TypeError when no message was supplied.
    if "empty or does not exist" in (e.message or ""):
        print("Drive is empty or inaccessible")
    else:
        print(f"Error: {e.message}")

# Access available drives
drives = reader.drives
for drive in drives:
print(f"Drive: {drive.name}, Type: {drive.drive_type}, ID: {drive.id}")Each discovered file includes:
File operations include comprehensive error handling:
Install with Tessl CLI
npx tessl i tessl/pypi-source-microsoft-onedrive