Highly concurrent networking library
—
System for transparently replacing standard library modules with green cooperative versions. Monkey patching enables existing code to work with eventlet's cooperative threading model without modification.
Replace standard library modules globally with green cooperative versions.
def monkey_patch(os=True, select=True, socket=True, thread=True,
time=True, ssl=True, httplib=False,
subprocess=False, all=None, profile=False,
aggressive=True):
"""
Globally patch system modules to be greenthread-friendly.
Parameters:
- os: bool, patch os module (default: True)
- select: bool, patch select module (default: True)
- socket: bool, patch socket module (default: True)
- thread: bool, patch thread module (default: True)
- time: bool, patch time module (default: True)
- ssl: bool, patch ssl module (default: True)
- httplib: bool, patch http.client module (default: False)
- subprocess: bool, patch subprocess module (default: False)
- all: bool, patch all available modules if True
- profile: bool, enable profiling of patched modules
- aggressive: bool, whether to be aggressive about patching
Returns:
None
Note:
Should be called early in program execution, before importing
other modules that might use the standard library modules.
"""

Import specific modules with green versions without global patching.
def import_patched(modulename, *additional_modules, **kw_additional_modules):
"""
Import a module with green versions of standard library components.
Parameters:
- modulename: str, name of module to import with patching
- *additional_modules: additional module names to patch
- **kw_additional_modules: keyword arguments for module-specific options
Returns:
The imported module with green versions
Example:
urllib2 = import_patched('urllib2')
"""

Check if modules have been monkey patched.
def is_monkey_patched(module):
"""
Check if a module has been monkey patched.
Parameters:
- module: str or module object to check
Returns:
bool: True if module is patched, False otherwise
"""

Direct injection of green modules for advanced use cases.
def inject(module_name, new_globals, *additional_modules, **kw_additional_modules):
"""
Base method for injecting greenified modules into other modules.
Parameters:
- module_name: str, name of module to inject into
- new_globals: dict, global variables to inject
- *additional_modules: additional modules to process
- **kw_additional_modules: keyword arguments for modules
Returns:
The modified module
"""

import eventlet
# Enable monkey patching at the start of your program
# This must be done before importing other modules
eventlet.monkey_patch()
# Now you can use standard library modules that will be cooperative
import socket
import urllib.request
import threading
import time
def fetch_url(url):
    """Fetch ``url`` and return a one-line, human-readable summary.

    Uses ``urllib.request`` as imported by this script; the surrounding
    example calls ``eventlet.monkey_patch()`` first so that the request is
    meant to run on green sockets.

    Parameters:
    - url: str, address to fetch

    Returns:
    str: "Fetched <n> bytes from <url>" on success, or
    "Error fetching <url>: <exception>" when anything goes wrong.
    """
    try:
        # Close the response deterministically instead of leaving the
        # underlying connection open until garbage collection.
        with urllib.request.urlopen(url) as response:
            data = response.read()
    except Exception as e:
        # Deliberate catch-all: every failure is reported as a string so the
        # caller waiting on the greenthread always receives a printable result.
        return f"Error fetching {url}: {e}"
    return f"Fetched {len(data)} bytes from {url}"
def main():
    """Fetch a fixed list of URLs, one greenthread per URL, and print each summary."""
    targets = [
        'http://example.com',
        'http://httpbin.org/delay/1',
        'http://httpbin.org/delay/2',
        'http://httpbin.org/json'
    ]
    # Start every fetch before waiting on any of them.
    workers = [eventlet.spawn(fetch_url, target) for target in targets]
    # wait() returns fetch_url's result; output follows spawn order.
    for worker in workers:
        print(worker.wait())
if __name__ == "__main__":
    main()

import eventlet
# Only patch specific modules.
# NOTE(review): upstream eventlet's monkey_patch() rejects keyword arguments
# it does not know, and `ssl` is not one of its switches (ssl is patched
# together with socket) -- confirm against the installed eventlet version.
eventlet.monkey_patch(socket=True, time=True, thread=False)
import socket
import time
import threading # This will still be regular threading
def test_selective_patching():
    """Print the patch status of socket, time and threading, then exercise each.

    Creates a socket, sleeps for one second and runs a short-lived
    ``threading.Thread``, printing what it observes along the way.
    Returns nothing.
    """
    # Check patching status
    print(f"Socket patched: {eventlet.patcher.is_monkey_patched('socket')}")
    print(f"Time patched: {eventlet.patcher.is_monkey_patched('time')}")
    print(f"Threading patched: {eventlet.patcher.is_monkey_patched('threading')}")
    # Socket operations will be green
    sock = socket.socket()
    try:
        print(f"Socket type: {type(sock)}")
    finally:
        # The socket exists only to show its type; release the descriptor.
        sock.close()
    # Time.sleep will yield to other greenthreads
    start = time.time()
    time.sleep(1)
    elapsed = time.time() - start
    print(f"Sleep took {elapsed:.2f} seconds")
    # Threading will still create OS threads
    thread = threading.Thread(target=lambda: print("OS thread"))
    thread.start()
    thread.join()
if __name__ == "__main__":
    test_selective_patching()

import eventlet
# Import specific modules with green versions
# without global monkey patching
green_urllib = eventlet.import_patched('urllib.request')
green_socket = eventlet.import_patched('socket')
def fetch_with_green_urllib(url):
    """Download ``url`` through the module-level ``green_urllib``.

    Parameters:
    - url: str, address to fetch

    Returns:
    The response body as returned by ``read()`` on success, or the string
    "Error: <exception>" on failure.
    """
    try:
        # Context manager closes the response even if read() raises.
        with green_urllib.urlopen(url) as response:
            return response.read()
    except Exception as e:
        # Failures are reported as a string; the caller tells the two cases
        # apart with isinstance(result, bytes).
        return f"Error: {e}"
def server_with_green_socket():
    """Accept connections on localhost:8080 forever, one greenthread per client.

    Uses the module-level ``green_socket``. Each accepted connection is
    handed to ``handle_client`` via ``eventlet.spawn``. The function only
    returns by raising.
    """
    server_sock = green_socket.socket()
    try:
        server_sock.bind(('localhost', 8080))
        server_sock.listen(5)
        print("Server listening with green socket")
        while True:
            client_sock, addr = server_sock.accept()
            eventlet.spawn(handle_client, client_sock, addr)
    finally:
        # Release the listening socket if bind/listen/accept fails or the
        # greenthread is killed.
        server_sock.close()
def handle_client(sock, addr):
    """Echo one message back to a connected client, then close the socket.

    Parameters:
    - sock: connected socket object for the client
    - addr: client address (not used by the handler)

    Reads at most 1024 bytes and replies with ``b"Echo: "`` followed by the
    bytes received. The socket is closed even if the exchange raises.
    """
    try:
        data = sock.recv(1024)
        # sendall() keeps writing until the whole reply is sent; plain
        # send() may transmit only part of it.
        sock.sendall(b"Echo: " + data)
    finally:
        sock.close()
if __name__ == "__main__":
    # Can use both approaches simultaneously: the server runs in its own
    # greenthread while this one performs a fetch.
    eventlet.spawn(server_with_green_socket)
    result = fetch_with_green_urllib('http://example.com')
    print(f"Fetched: {len(result) if isinstance(result, bytes) else result}")

import eventlet
# Monkey patch before importing database modules
eventlet.monkey_patch()
import psycopg2 # PostgreSQL adapter - now cooperative
import mysql.connector # MySQL adapter - now cooperative
def query_postgresql(query):
    """Run ``query`` against the local PostgreSQL ``testdb`` database.

    Parameters:
    - query: str, SQL statement to execute

    Returns:
    The rows from ``cursor.fetchall()`` on success, or the string
    "PostgreSQL error: <exception>" on failure.
    """
    try:
        conn = psycopg2.connect(
            host="localhost",
            database="testdb",
            user="user",
            password="password"
        )
        # Nested try/finally so the cursor and connection are released even
        # when execute() or fetchall() raises.
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(query)
                return cursor.fetchall()
            finally:
                cursor.close()
        finally:
            conn.close()
    except Exception as e:
        return f"PostgreSQL error: {e}"
def query_mysql(query):
    """Run ``query`` against the local MySQL ``testdb`` database.

    Parameters:
    - query: str, SQL statement to execute

    Returns:
    The rows from ``cursor.fetchall()`` on success, or the string
    "MySQL error: <exception>" on failure.
    """
    try:
        conn = mysql.connector.connect(
            host="localhost",
            database="testdb",
            user="user",
            password="password"
        )
        # Nested try/finally so the cursor and connection are released even
        # when execute() or fetchall() raises.
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(query)
                return cursor.fetchall()
            finally:
                cursor.close()
        finally:
            conn.close()
    except Exception as e:
        return f"MySQL error: {e}"
def concurrent_database_queries():
    """Run the same four queries against PostgreSQL and MySQL in greenthreads and print the results."""
    statements = [
        "SELECT COUNT(*) FROM users",
        "SELECT COUNT(*) FROM orders",
        "SELECT COUNT(*) FROM products",
        "SELECT AVG(price) FROM products"
    ]
    # All PostgreSQL greenthreads are spawned first, then all MySQL ones.
    pg_workers = [eventlet.spawn(query_postgresql, sql) for sql in statements]
    mysql_workers = [eventlet.spawn(query_mysql, sql) for sql in statements]
    # Report each backend's results in spawn order.
    print("PostgreSQL results:")
    for worker in pg_workers:
        print(f" {worker.wait()}")
    print("MySQL results:")
    for worker in mysql_workers:
        print(f" {worker.wait()}")
if __name__ == "__main__":
    concurrent_database_queries()

import eventlet
# Enable monkey patching for web scraping
eventlet.monkey_patch()
import urllib.request
import urllib.parse
import json
import time
def scrape_url(url):
    """Fetch ``url`` and describe the outcome as a dict.

    Parameters:
    - url: str, address to fetch (10 second timeout)

    Returns:
    dict with 'url', 'status', 'size' and 'time' on success, or with
    'url', 'error' and 'time' when any exception occurs. 'time' is the
    elapsed wall-clock time in seconds.
    """
    start_time = time.time()
    try:
        request = urllib.request.Request(url)
        request.add_header('User-Agent', 'Eventlet Scraper 1.0')
        # Context manager closes the response even if reading fails.
        with urllib.request.urlopen(request, timeout=10) as response:
            data = response.read()
            status = response.getcode()
    except Exception as e:
        # Failures are reported in the result rather than raised, so one bad
        # URL does not break the caller's collection loop.
        elapsed = time.time() - start_time
        return {
            'url': url,
            'error': str(e),
            'time': elapsed
        }
    elapsed = time.time() - start_time
    return {
        'url': url,
        'status': status,
        'size': len(data),
        'time': elapsed
    }
def concurrent_scraping():
    """Scrape a fixed list of URLs in parallel greenthreads and print a report."""
    urls = [
        'http://example.com',
        'http://httpbin.org/delay/1',
        'http://httpbin.org/delay/2',
        'http://httpbin.org/json',
        'http://httpbin.org/user-agent',
        'http://httpbin.org/headers',
        'http://httpbin.org/ip',
        'http://httpbin.org/status/200'
    ]
    print(f"Starting concurrent scraping of {len(urls)} URLs...")
    start_time = time.time()
    # One greenthread per URL, all started before the first wait().
    workers = [eventlet.spawn(scrape_url, url) for url in urls]
    # Results are consumed in spawn order, not completion order.
    results = []
    for worker in workers:
        outcome = worker.wait()
        results.append(outcome)
        if 'error' not in outcome:
            print(f"✅ {outcome['url']}: {outcome['status']} - {outcome['size']} bytes ({outcome['time']:.2f}s)")
        else:
            print(f"❌ {outcome['url']}: {outcome['error']} ({outcome['time']:.2f}s)")
    total_time = time.time() - start_time
    successful = sum(1 for outcome in results if 'error' not in outcome)
    print(f"\nCompleted {len(urls)} requests in {total_time:.2f}s")
    print(f"Success rate: {successful}/{len(urls)}")
if __name__ == "__main__":
    concurrent_scraping()

import eventlet
# Enable monkey patching for HTTP libraries
eventlet.monkey_patch()
import urllib.request
import urllib.parse
import http.cookiejar
import json
class GreenHTTPSession:
    """HTTP session built on urllib with a shared cookie jar.

    Every request goes through one opener that carries an
    ``HTTPCookieProcessor`` and a fixed User-Agent header.
    """

    def __init__(self):
        """Create the cookie jar and the opener that uses it."""
        self.cookie_jar = http.cookiejar.CookieJar()
        self.opener = urllib.request.build_opener(
            urllib.request.HTTPCookieProcessor(self.cookie_jar)
        )
        self.opener.addheaders = [('User-Agent', 'GreenHTTPSession/1.0')]

    def _send(self, request, headers):
        """Apply ``headers`` to ``request``, send it and summarise the response."""
        if headers:
            for key, value in headers.items():
                request.add_header(key, value)
        # Everything needed is read inside the with-block, so the response
        # can be closed before returning.
        with self.opener.open(request) as response:
            return {
                'status': response.getcode(),
                'headers': dict(response.headers),
                'data': response.read(),
                'url': response.geturl()
            }

    def get(self, url, headers=None):
        """Perform a GET request.

        Parameters:
        - url: str, address to request
        - headers: optional dict of extra request headers

        Returns:
        dict with 'status', 'headers', 'data' and 'url'.
        """
        return self._send(urllib.request.Request(url), headers)

    def post(self, url, data=None, headers=None):
        """Perform a POST request.

        Parameters:
        - url: str, address to request
        - data: request body; a dict is form-encoded to UTF-8 bytes,
          anything else is passed through unchanged
        - headers: optional dict of extra request headers

        Returns:
        dict with 'status', 'headers', 'data' and 'url'.
        """
        if isinstance(data, dict):
            data = urllib.parse.urlencode(data).encode('utf-8')
        request = urllib.request.Request(url, data=data, method='POST')
        return self._send(request, headers)
def test_http_session():
    """Run the same three-request sequence from three greenthreads sharing one session."""
    session = GreenHTTPSession()

    def make_requests():
        """Issue a POST followed by two GETs and report the outcome as a string."""
        try:
            # Login request (sets cookies)
            credentials = {'username': 'test', 'password': 'test'}
            login_response = session.post('http://httpbin.org/post', credentials)
            print(f"Login: {login_response['status']}")
            # Authenticated requests (uses cookies)
            auth_response = session.get('http://httpbin.org/cookies')
            print(f"Auth check: {auth_response['status']}")
            # API call
            json_headers = {'Accept': 'application/json'}
            api_response = session.get('http://httpbin.org/json', headers=json_headers)
            print(f"API call: {api_response['status']}")
        except Exception as e:
            return f"Session error: {e}"
        return "Session requests completed"

    # Three greenthreads, all using the single session created above.
    workers = [eventlet.spawn(make_requests) for _ in range(3)]
    # Wait for each one and print the string it returned.
    for worker in workers:
        print(worker.wait())
if __name__ == "__main__":
    test_http_session()

import eventlet
# 1. Monkey patch FIRST, before any other imports
eventlet.monkey_patch()
# 2. Then import standard library modules
import socket
import urllib.request
import threading
import time
# 3. Then import third-party modules
import requests # Will use patched socket
import psycopg2 # Will use patched socket
# 4. Finally import your application modules
import myapp.models
import myapp.views

import os
import eventlet
# Only enable monkey patching in certain environments
# Patching is opt-in: it happens only when EVENTLET_ENABLED is "true"
# (case-insensitive); an unset variable counts as disabled.
patching_requested = os.environ.get('EVENTLET_ENABLED', '').lower() == 'true'
if patching_requested:
    print("Enabling eventlet monkey patching")
    eventlet.monkey_patch()
else:
    print("Running without eventlet")
# Rest of application can work with or without patching
import socket
import time
def main():
    """Create a socket, sleep for one second and print a status line.

    Uses only ``socket`` and ``time`` as imported by the script, so the same
    code runs whether or not the patching branch above was taken.
    """
    # Code works the same either way
    sock = socket.socket()
    try:
        time.sleep(1)
        print("Application running")
    finally:
        # Release the descriptor instead of leaving it to garbage collection.
        sock.close()
if __name__ == "__main__":
    main()

import eventlet
eventlet.monkey_patch()
def verify_patching():
    """Print a patch-status report, then smoke-test socket creation and time.sleep."""
    import socket
    import time
    import threading
    import ssl

    print("Monkey patching verification:")
    # NOTE(review): is_monkey_patched() may key off monkey_patch() argument
    # names (e.g. 'thread') rather than module names such as 'threading' or
    # 'ssl' -- confirm before trusting a "NOT PATCHED" line for those two.
    for module_name in ('socket', 'time', 'threading', 'ssl'):
        if eventlet.patcher.is_monkey_patched(module_name):
            status = "✅ PATCHED"
        else:
            status = "❌ NOT PATCHED"
        print(f" {module_name}: {status}")

    # Test that patched modules work
    print("\nTesting patched functionality:")

    # Socket: creating and closing one must not raise.
    try:
        probe = socket.socket()
        print(" Socket creation: ✅")
        probe.close()
    except Exception as e:
        print(f" Socket creation: ❌ {e}")

    # time.sleep: a short sleep must return; report how long it took.
    try:
        began = time.time()
        time.sleep(0.1)
        elapsed = time.time() - began
        print(f" Time.sleep: ✅ ({elapsed:.3f}s)")
    except Exception as e:
        print(f" Time.sleep: ❌ {e}")
if __name__ == "__main__":
    verify_patching()

Install with Tessl CLI
npx tessl i tessl/pypi-eventlet