tessl/pypi-multitasking

Non-blocking Python methods using decorators

Describes
pypipkg:pypi/multitasking@0.0.x

To install, run

npx @tessl/cli install tessl/pypi-multitasking@0.0.0

# MultiTasking

A lightweight Python library that turns synchronous Python functions into non-blocking ones with a simple decorator. It provides thread-based and process-based execution engines, automatic pool management, semaphore-controlled concurrency limits, and task monitoring.

## Package Information

- **Package Name**: multitasking
- **Language**: Python
- **Installation**: `pip install multitasking`

## Core Imports

```python
import multitasking
```

For specific functionality:

```python
from multitasking import task, wait_for_tasks, set_max_threads, createPool
```

## Basic Usage

```python
import multitasking
import time

# Convert any function to asynchronous using the @task decorator
@multitasking.task
def fetch_data(url_id):
    # Simulate API call or I/O operation
    time.sleep(2)
    print(f"Fetched data from URL {url_id}")
    return f"Data from {url_id}"

# These calls run concurrently, not sequentially
for i in range(5):
    fetch_data(i)

# Wait for all tasks to complete
multitasking.wait_for_tasks()
print("All tasks completed!")
```

## Architecture

MultiTasking uses a pool-based architecture for managing concurrent task execution:

- **Global Configuration**: Central `config` dictionary tracks pools, tasks, and settings
- **Execution Pools**: Named pools with semaphore-controlled concurrency limits
- **Task Decorator**: `@task` decorator converts functions to async execution
- **Engine Selection**: Choose between threading (I/O-bound) or multiprocessing (CPU-bound)
- **Lifecycle Management**: Automatic task tracking and graceful shutdown capabilities
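
To make the pool model above concrete, here is a minimal sketch built only on the standard library. It is not MultiTasking's source: `pool`, `tasks`, `background`, and `work` are hypothetical names chosen for illustration. It shows how a semaphore can cap how many decorated calls run at once while each call returns immediately with a thread handle.

```python
import threading
import time
from functools import wraps

# Hypothetical stand-in for one pool entry: the semaphore caps concurrency at 2.
pool = threading.Semaphore(2)
tasks = []  # every started thread, so the caller can join them later


def background(fn):
    """Run fn in a thread; at most two bodies execute at the same time."""

    def gated(*args, **kwargs):
        with pool:  # blocks while two other workers hold the semaphore
            fn(*args, **kwargs)

    @wraps(fn)
    def launcher(*args, **kwargs):
        thread = threading.Thread(target=gated, args=args, kwargs=kwargs)
        tasks.append(thread)
        thread.start()
        return thread  # the caller gets the thread, not fn's return value

    return launcher


results = []


@background
def work(n):
    time.sleep(0.05)
    results.append(n * n)  # appending from several threads is safe in CPython


for i in range(4):
    work(i)

for thread in tasks:
    thread.join()

print(sorted(results))
```

Because the launcher hands back a thread rather than the function's result, this sketch collects output in a shared list; a queue would serve the same purpose.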

## Capabilities

### Task Decoration and Execution

Core functionality for converting synchronous functions to asynchronous tasks using the main `@task` decorator.

```python { .api }
def task(callee: Callable[..., Any]) -> Callable[..., Optional[Union[Thread, Process]]]:
    """
    Decorator that converts a function into an asynchronous task.

    Returns:
        Decorated function that returns Thread/Process object or None
    """
```

[Task Management](./task-management.md)

### Pool Configuration and Management

Create and configure execution pools with specific thread counts and engine types for different workloads.

```python { .api }
def createPool(name: str = "main", threads: Optional[int] = None, engine: Optional[str] = None) -> None:
    """Create a new execution pool with specified configuration."""

def getPool(name: Optional[str] = None) -> Dict[str, Union[str, int]]:
    """Retrieve information about an execution pool."""
```

[Pool Management](./pool-management.md)

### Global Configuration

Configure the maximum concurrent threads and execution engine for all new pools and tasks.

```python { .api }
def set_max_threads(threads: Optional[int] = None) -> None:
    """Configure the maximum number of concurrent threads/processes."""

def set_engine(kind: str = "") -> None:
    """Configure the execution engine for new pools."""
```

[Configuration](./configuration.md)
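
As an illustration of engine selection, the sketch below assumes an engine name is mapped to a worker class that is then instantiated the same way for either engine. `pick_engine` is a hypothetical helper, not part of the MultiTasking API. `Thread` and `Process` come from the standard library and share the same `target`/`args` constructor arguments and `start()`/`join()` methods.

```python
from multiprocessing import Process
from threading import Thread


def pick_engine(kind: str):
    """Hypothetical helper: map an engine name to a worker class."""
    return Process if "process" in kind.lower() else Thread


messages = []

# Thread and Process accept the same constructor arguments, so code that
# launches work does not need to know which engine was chosen.
engine = pick_engine("thread")
worker = engine(target=messages.append, args=("ran in a " + engine.__name__,))
worker.start()
worker.join()

print(messages)
```

With a process engine the worker runs in a separate interpreter, so appending to a list in the parent would not be visible there. Results then have to travel through a queue or pipe.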

### Task Monitoring and Control

Monitor active tasks, wait for completion, and control execution lifecycle.

```python { .api }
def get_active_tasks() -> List[Union[Thread, Process]]:
    """Retrieve only the currently running tasks."""

def wait_for_tasks(sleep: float = 0) -> bool:
    """Block until all background tasks complete execution."""

def killall(self: Any = None, cls: Any = None) -> None:
    """Emergency shutdown function that terminates the entire program."""
```

[Monitoring and Control](./monitoring-control.md)
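
The monitoring calls above can be pictured with plain threads. The sketch below is an assumption about the general shape, not the library's implementation: `active` and `wait_for` are hypothetical helpers that filter live threads and poll until none remain, with `sleep` as the polling interval.

```python
import threading
import time


def active(threads):
    """Hypothetical helper: keep only the threads that are still running."""
    return [t for t in threads if t.is_alive()]


def wait_for(threads, sleep=0.0):
    """Hypothetical helper: poll until every thread has finished."""
    while active(threads):
        time.sleep(sleep)
    return True


workers = [threading.Thread(target=time.sleep, args=(0.05,)) for _ in range(3)]
for worker in workers:
    worker.start()

finished = wait_for(workers, sleep=0.01)
print(finished, len(active(workers)))
```

A non-zero `sleep` trades a little latency for less busy-waiting while the tasks finish.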

## Types

```python { .api }
class PoolConfig(TypedDict):
    """Type definition for execution pool configuration."""
    pool: Optional[Semaphore]  # Controls concurrent task execution
    engine: Union[type[Thread], type[Process]]  # Execution engine
    name: str  # Human-readable pool identifier
    threads: int  # Maximum concurrent tasks (0 = unlimited)

class Config(TypedDict):
    """Type definition for global multitasking configuration."""
    CPU_CORES: int  # Number of CPU cores detected
    ENGINE: str  # Default engine type ("thread" or "process")
    MAX_THREADS: int  # Global maximum thread/process count
    KILL_RECEIVED: bool  # Signal to stop accepting new tasks
    TASKS: List[Union[Thread, Process]]  # All created tasks
    POOLS: Dict[str, PoolConfig]  # Named execution pools
    POOL_NAME: str  # Currently active pool name

# Global configuration instance
config: Config
```

## Constants

```python { .api }
__version__: str = "0.0.12"
```