Highly concurrent networking library
—
Cooperative versions of Python standard library modules that work seamlessly with eventlet's green threading model. The eventlet.green package provides drop-in replacements for standard library modules.
Essential modules that provide the foundation for cooperative I/O operations.
# Socket operations
import eventlet.green.socket as socket
# SSL/TLS support
import eventlet.green.ssl as ssl
# I/O multiplexing
import eventlet.green.select as select
# Threading primitives
import eventlet.green.threading as threading
# Time operations
import eventlet.green.time as time
# OS operations
import eventlet.green.os as os
# Process management
import eventlet.green.subprocess as subprocess

Green versions of HTTP client and server modules for web applications.
# HTTP client operations
import eventlet.green.http.client as http_client
# HTTP server operations
import eventlet.green.http.server as http_server
# HTTP cookie handling
import eventlet.green.http.cookies as http_cookies
# HTTP cookie jar
import eventlet.green.http.cookiejar as http_cookiejar

Green versions of URL parsing and request handling modules.
# URL parsing utilities
import eventlet.green.urllib.parse as urllib_parse
# URL request handling
import eventlet.green.urllib.request as urllib_request
# URL error handling
import eventlet.green.urllib.error as urllib_error
# Base urllib functionality
import eventlet.green.urllib as urllib

Green versions of queue and threading data structures.
# Queue operations (thread-safe)
import eventlet.green.Queue as Queue

Other standard library modules with cooperative versions.
# File I/O operations
import eventlet.green.io as io
# Regular expressions
import eventlet.green.re as re
# Database interfaces
import eventlet.green.dbm as dbm
# Compression utilities
import eventlet.green.gzip as gzip
# Archive utilities
import eventlet.green.tarfile as tarfile
import eventlet.green.zipfile as zipfile

import eventlet
from eventlet.green import socket
import time
def tcp_server():
    """Run a TCP echo server on localhost:8080 using the green socket module.

    Accepts connections forever and hands each one to ``handle_tcp_client``
    in its own greenthread. The listening socket is closed when the accept
    loop exits for any reason (error or the greenthread being killed).
    """
    server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Allow quick restarts without waiting for the old socket to time out.
        server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_sock.bind(('localhost', 8080))
        server_sock.listen(5)
        print("Green TCP server listening on localhost:8080")
        while True:
            client_sock, addr = server_sock.accept()  # Cooperative accept
            eventlet.spawn(handle_tcp_client, client_sock, addr)
    finally:
        server_sock.close()
def handle_tcp_client(sock, addr):
    """Echo everything received on *sock* back to the peer until it disconnects.

    Args:
        sock: Connected socket-like object; it is always closed before this
            function returns.
        addr: Peer address, used only in the log output.
    """
    print(f"Client connected: {addr}")
    try:
        while True:
            data = sock.recv(1024)  # Cooperative recv
            if not data:
                # An empty read means the peer closed the connection.
                break
            # sendall() keeps writing until the whole buffer has gone out;
            # send() may transmit only part of it and silently drop the rest.
            sock.sendall(data)  # Echo back - cooperative send
    except Exception as e:
        print(f"Client error: {e}")
    finally:
        sock.close()
        print(f"Client disconnected: {addr}")
def tcp_client():
    """Connect to the echo server on localhost:8080 and exchange five messages.

    Each message is sent, the reply is printed, and the greenthread then
    yields for one second. Errors are printed rather than raised; the socket
    is always closed.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.connect(('localhost', 8080))  # Cooperative connect
        for i in range(5):
            message = f"Message {i}"
            # sendall() guarantees the full message is written.
            sock.sendall(message.encode())  # Cooperative send
            response = sock.recv(1024)  # Cooperative recv
            print(f"Sent: {message}, Received: {response.decode()}")
            # The file imports the plain stdlib ``time`` module, whose sleep()
            # would block every greenthread; eventlet.sleep() yields instead.
            eventlet.sleep(1)  # Cooperative sleep
    except Exception as e:
        print(f"Client error: {e}")
    finally:
        sock.close()
if __name__ == "__main__":
    # Start server
    eventlet.spawn(tcp_server)
    # Wait a moment for server to start
    eventlet.sleep(0.5)
    # Start multiple clients concurrently
    for i in range(3):
        eventlet.spawn(tcp_client)
    # Keep the program running
    eventlet.sleep(10)


import eventlet
# ``from pkg import a.b`` is not valid Python syntax. Import the green urllib
# submodules explicitly so that ``urllib.request`` / ``urllib.parse`` /
# ``urllib.error`` resolve as attributes of the green ``urllib`` package.
import eventlet.green.urllib.error
import eventlet.green.urllib.parse
import eventlet.green.urllib.request
from eventlet.green import urllib
from eventlet.green.http import client as http_client
import json
def fetch_with_urllib(url):
    """Fetch *url* with urllib and summarize the response.

    Args:
        url: URL to request.

    Returns:
        On success, a dict with keys ``url``, ``status``, ``headers`` and
        ``data`` (the first 200 characters of the UTF-8 decoded body).
        If urllib raises ``URLError``, a dict with keys ``url`` and ``error``.
    """
    try:
        request = urllib.request.Request(url)
        request.add_header('User-Agent', 'Eventlet Green Client')
        # This will be cooperative I/O. The ``with`` block closes the
        # response (and its underlying connection) even if reading fails.
        with urllib.request.urlopen(request, timeout=10) as response:
            data = response.read()
            return {
                'url': url,
                'status': response.getcode(),
                'headers': dict(response.headers),
                'data': data.decode('utf-8')[:200]  # First 200 chars
            }
    except urllib.error.URLError as e:
        return {'url': url, 'error': str(e)}
def fetch_with_http_client(host, path):
    """Issue a GET request for *path* on *host* using the HTTP client directly.

    Args:
        host: Host (optionally ``host:port``) to connect to.
        path: Request path, e.g. ``'/ip'``.

    Returns:
        On success, a dict with keys ``host``, ``path``, ``status``,
        ``reason``, ``headers`` and ``data`` (UTF-8 decoded body).
        On any exception, a dict with keys ``host``, ``path`` and ``error``.
    """
    try:
        # Create green HTTP connection
        conn = http_client.HTTPConnection(host, timeout=10)
        try:
            # Make request - cooperative I/O
            conn.request('GET', path, headers={
                'User-Agent': 'Eventlet HTTP Client',
                'Accept': 'application/json'
            })
            # Get response - cooperative I/O
            response = conn.getresponse()
            data = response.read()
        finally:
            # Close the connection on failure too, not only on success.
            conn.close()
        return {
            'host': host,
            'path': path,
            'status': response.status,
            'reason': response.reason,
            'headers': dict(response.getheaders()),
            'data': data.decode('utf-8')
        }
    except Exception as e:
        return {'host': host, 'path': path, 'error': str(e)}
def concurrent_http_requests():
    """Fire off a batch of HTTP requests in greenthreads and report each outcome.

    One greenthread is spawned per URL for ``fetch_with_urllib`` and one per
    (host, path) pair for ``fetch_with_http_client``; results are then
    collected in spawn order and printed with a success/failure marker.
    """
    # Targets fetched through urllib
    targets_for_urllib = (
        'http://httpbin.org/json',
        'http://httpbin.org/user-agent',
        'http://httpbin.org/headers',
        'http://example.com',
    )
    # (host, path) targets fetched through the raw HTTP client
    targets_for_client = (
        ('httpbin.org', '/ip'),
        ('httpbin.org', '/delay/1'),
        ('httpbin.org', '/status/200'),
        ('example.com', '/'),
    )

    print("Starting concurrent HTTP requests...")

    # Launch everything first so all requests are in flight together.
    urllib_jobs = [
        eventlet.spawn(fetch_with_urllib, target)
        for target in targets_for_urllib
    ]
    client_jobs = [
        eventlet.spawn(fetch_with_http_client, *pair)
        for pair in targets_for_client
    ]

    print("\nUrllib results:")
    for job in urllib_jobs:
        outcome = job.wait()
        failed = 'error' in outcome
        marker = "❌" if failed else "✅"
        detail = outcome['error'] if failed else outcome['status']
        print(f"{marker} {outcome['url']}: {detail}")

    print("\nHTTP client results:")
    for job in client_jobs:
        outcome = job.wait()
        failed = 'error' in outcome
        marker = "❌" if failed else "✅"
        detail = outcome['error'] if failed else outcome['status']
        print(f"{marker} {outcome['host']}{outcome['path']}: {detail}")
if __name__ == "__main__":
    concurrent_http_requests()


import eventlet
from eventlet.green import socket, ssl
import time
def ssl_server():
    """Run a TLS echo server on localhost:8443 using the green ssl module.

    Requires ``server.crt`` and ``server.key`` in the working directory.
    Each successfully wrapped connection is handed to ``handle_ssl_client``
    in its own greenthread. A failed handshake is logged and the accept loop
    continues. The listening socket is closed when the loop exits.
    """
    # Create server socket
    server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_sock.bind(('localhost', 8443))
        server_sock.listen(5)
        # Create SSL context
        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        context.load_cert_chain('server.crt', 'server.key')  # Requires cert files
        print("Green SSL server listening on localhost:8443")
        while True:
            client_sock, addr = server_sock.accept()  # Cooperative accept
            try:
                # Wrap with SSL - cooperative handshake
                ssl_sock = context.wrap_socket(client_sock, server_side=True)
            except OSError as e:
                # ssl.SSLError is an OSError subclass, so this also covers
                # plain socket failures (e.g. the peer resetting the
                # connection mid-handshake) that would otherwise end the loop.
                print(f"SSL handshake failed for {addr}: {e}")
                client_sock.close()
                continue
            eventlet.spawn(handle_ssl_client, ssl_sock, addr)
    finally:
        server_sock.close()
def handle_ssl_client(ssl_sock, addr):
    """Echo data received on a TLS connection back to the peer, prefixed.

    Every chunk read is returned as ``b"SSL Echo: " + chunk`` until the peer
    closes the connection.

    Args:
        ssl_sock: Connected, already-wrapped socket-like object; always closed
            before this function returns.
        addr: Peer address, used only in the log output.
    """
    print(f"SSL client connected: {addr}")
    try:
        # Report the negotiated cipher suite, if any.
        cipher = ssl_sock.cipher()
        print(f"Client {addr} - Cipher: {cipher[0] if cipher else 'None'}")
        while True:
            # Cooperative SSL read
            data = ssl_sock.recv(1024)
            if not data:
                break
            # Cooperative SSL write; sendall() avoids truncated replies that
            # a single send() can produce.
            ssl_sock.sendall(b"SSL Echo: " + data)
    except Exception as e:
        print(f"SSL client error: {e}")
    finally:
        ssl_sock.close()
        print(f"SSL client disconnected: {addr}")
def ssl_client():
    """Connect to the TLS echo server on localhost:8443 and send three messages.

    Certificate verification is disabled so the example works with a
    self-signed test certificate. Errors are printed rather than raised, and
    whichever socket was created is always closed.
    """
    # Pre-bind both names so the ``finally`` clause is safe even when the
    # connect or the handshake fails before ``ssl_sock`` is assigned.
    sock = None
    ssl_sock = None
    try:
        # Create socket and SSL context
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        context = ssl.create_default_context()
        context.check_hostname = False  # For self-signed certs
        context.verify_mode = ssl.CERT_NONE  # For testing
        # Connect and wrap with SSL
        sock.connect(('localhost', 8443))  # Cooperative connect
        ssl_sock = context.wrap_socket(sock)  # Cooperative handshake
        print(f"SSL connected - Cipher: {ssl_sock.cipher()[0]}")
        # Send messages
        for i in range(3):
            message = f"Secure message {i}"
            ssl_sock.sendall(message.encode())  # Cooperative SSL send
            response = ssl_sock.recv(1024)  # Cooperative SSL recv
            print(f"Sent: {message}, Received: {response.decode()}")
            eventlet.sleep(1)
    except Exception as e:
        print(f"SSL client error: {e}")
    finally:
        # Close the wrapped socket when the handshake succeeded, otherwise
        # the raw socket, otherwise nothing.
        to_close = ssl_sock if ssl_sock is not None else sock
        if to_close is not None:
            to_close.close()
# Note: This example requires SSL certificate files
# Generate test certificates with:
# openssl req -x509 -newkey rsa:4096 -keyout server.key -out server.crt -days 365 -nodes

import eventlet
from eventlet.green import threading, time
from eventlet.green import Queue
def green_threading_example():
    """Demonstrate a producer/consumer hand-off with green Lock and Condition.

    Two producers add 3 and 4 items to a shared list; two consumers each try
    to process up to 5 items. Because fewer items are produced than the
    consumers ask for, a consumer stops once no new item has arrived within
    one second instead of waiting forever. The function returns when every
    greenthread has finished.
    """
    # Green threading objects work with eventlet
    lock = threading.Lock()
    condition = threading.Condition(lock)
    shared_data = {'counter': 0, 'items': []}

    def producer(name, count):
        """Append *count* items to the shared list, notifying consumers."""
        for i in range(count):
            with condition:
                shared_data['counter'] += 1
                shared_data['items'].append(f"{name}-item-{i}")
                print(f"Producer {name} added item {i} (total: {shared_data['counter']})")
                condition.notify_all()  # Wake up consumers
            # Sleep outside the lock so consumers can run in the meantime.
            time.sleep(0.1)  # Cooperative sleep

    def consumer(name):
        """Process up to 5 items, giving up when the supply dries out."""
        processed = 0
        while processed < 5:  # Process 5 items
            with condition:
                while not shared_data['items']:
                    print(f"Consumer {name} waiting for items...")
                    # wait() returns False when the timeout expired; re-check
                    # the list in case an item arrived right at the deadline.
                    notified = condition.wait(timeout=1.0)  # Cooperative wait
                    if not notified and not shared_data['items']:
                        print(f"Consumer {name} gave up waiting")
                        return
                item = shared_data['items'].pop(0)
                shared_data['counter'] -= 1
                processed += 1
                print(f"Consumer {name} processed {item} (remaining: {shared_data['counter']})")
            time.sleep(0.2)  # Simulate processing time
        print(f"Consumer {name} finished")

    # Start producers and consumers
    workers = [
        eventlet.spawn(producer, "P1", 3),
        eventlet.spawn(producer, "P2", 4),
        eventlet.spawn(consumer, "C1"),
        eventlet.spawn(consumer, "C2"),
    ]
    # Wait for actual completion instead of sleeping a fixed time.
    for worker in workers:
        worker.wait()
def green_queue_example():
    """Demonstrate producers and consumers sharing a bounded green Queue.

    Two producers put three items each; two consumers take up to five items
    each and stop early if nothing arrives within two seconds. The function
    returns once the producers are done, every queued item has been marked
    done, and both consumers have exited.
    """
    q = Queue.Queue(maxsize=5)  # Bounded queue

    def producer(name, items):
        """Add items to queue"""
        for item in items:
            print(f"Producer {name} adding {item}")
            q.put(item)  # Cooperative put (may block if queue full)
            time.sleep(0.1)
        print(f"Producer {name} finished")

    def consumer(name):
        """Get items from queue"""
        items_processed = 0
        while items_processed < 5:
            try:
                # Cooperative get with timeout
                item = q.get(timeout=2.0)
            except Queue.Empty:
                print(f"Consumer {name} timed out waiting")
                break
            print(f"Consumer {name} got {item}")
            q.task_done()  # Mark task as done
            items_processed += 1
            time.sleep(0.2)  # Simulate processing
        print(f"Consumer {name} finished")

    # Start producers
    producers = [
        eventlet.spawn(producer, "P1", ["apple", "banana", "cherry"]),
        eventlet.spawn(producer, "P2", ["orange", "grape", "kiwi"]),
    ]
    # Start consumers
    consumers = [
        eventlet.spawn(consumer, "C1"),
        eventlet.spawn(consumer, "C2"),
    ]
    # Wait for all tasks to complete. Joining directly (rather than in a
    # greenthread nobody waits on) makes the wait effective.
    for gt in producers:
        gt.wait()
    q.join()  # Wait for all items processed
    for gt in consumers:
        gt.wait()
if __name__ == "__main__":
    print("=== Green Threading Example ===")
    green_threading_example()
    print("\n=== Green Queue Example ===")
    green_queue_example()


import eventlet
from eventlet.green import subprocess
import os
def run_green_subprocess():
    """Launch a fixed set of commands concurrently and print what each produced.

    Every command runs in its own greenthread through the green subprocess
    module. Once all have been started, results are gathered in launch order
    and printed with return code, stdout and stderr (or the error message if
    the command could not be run).
    """

    def run_command(name, command):
        """Execute *command* and return a dict describing the outcome."""
        try:
            print(f"Starting {name}: {' '.join(command)}")
            # Create subprocess - cooperative
            proc = subprocess.Popen(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
            # Wait for completion - cooperative
            out_text, err_text = proc.communicate()
            outcome = {'name': name, 'command': command}
            outcome['returncode'] = proc.returncode
            outcome['stdout'] = out_text.strip()
            outcome['stderr'] = err_text.strip()
            return outcome
        except Exception as exc:
            return {'name': name, 'command': command, 'error': str(exc)}

    # Commands to run concurrently
    command_table = (
        ("list_files", ["ls", "-la"]),
        ("current_date", ["date"]),
        ("system_info", ["uname", "-a"]),
        ("disk_usage", ["df", "-h"]),
        ("memory_info", ["free", "-h"]),
    )

    print("Running multiple commands concurrently...")

    # Start all commands concurrently
    pending = [
        eventlet.spawn(run_command, label, argv)
        for label, argv in command_table
    ]

    # Collect results
    outcomes = [job.wait() for job in pending]

    # Display results
    print("\nCommand Results:")
    for outcome in outcomes:
        print(f"\n--- {outcome['name']} ---")
        if 'error' in outcome:
            print(f"Error: {outcome['error']}")
            continue
        print(f"Return code: {outcome['returncode']}")
        if outcome['stdout']:
            print(f"Output:\n{outcome['stdout']}")
        if outcome['stderr']:
            print(f"Error output:\n{outcome['stderr']}")
def interactive_subprocess():
    """Hold a line-by-line conversation with a child Python process.

    A child interpreter running a small echo loop is started with piped
    stdin/stdout. Each message is written to the child and its ``Echo: ...``
    reply is read back, until ``quit`` is sent. The function returns once the
    conversation greenthread has finished; the child is killed if it is still
    running after an error.
    """
    import sys  # local import: only needed to locate the running interpreter

    # Echo loop executed by the child. Built line by line so its indentation
    # is explicit and cannot be lost.
    echo_script = "\n".join([
        "import sys",
        "while True:",
        "    try:",
        "        line = input()",
        "        if line == \"quit\":",
        "            break",
        "        print(f\"Echo: {line}\")",
        "        sys.stdout.flush()",
        "    except EOFError:",
        "        break",
    ])

    def chat_with_process():
        """Interactive communication with subprocess"""
        process = None
        try:
            # Start a Python subprocess for interaction. sys.executable is the
            # interpreter running this program, so no ``python3`` binary needs
            # to be on PATH.
            process = subprocess.Popen(
                [sys.executable, '-c', echo_script],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                bufsize=0  # Unbuffered
            )
            # Send messages and read responses
            messages = ["Hello", "World", "How are you?", "quit"]
            for message in messages:
                print(f"Sending: {message}")
                process.stdin.write(message + "\n")
                process.stdin.flush()
                if message == "quit":
                    break
                # Read response - cooperative
                response = process.stdout.readline().strip()
                print(f"Received: {response}")
                eventlet.sleep(0.5)
            # Wait for process to finish
            process.wait()
            print("Interactive subprocess finished")
        except Exception as e:
            print(f"Interactive subprocess error: {e}")
        finally:
            if process is not None:
                # Do not leave a stray child behind if the chat failed midway.
                if process.poll() is None:
                    process.kill()
                    process.wait()
                for pipe in (process.stdin, process.stdout, process.stderr):
                    if pipe is not None:
                        pipe.close()

    # Wait for the conversation itself rather than sleeping a fixed time.
    eventlet.spawn(chat_with_process).wait()
if __name__ == "__main__":
    print("=== Green Subprocess Example ===")
    run_green_subprocess()
    print("\n=== Interactive Subprocess Example ===")
    interactive_subprocess()


import eventlet
from eventlet.green import io, gzip, tarfile
import tempfile
import os
def green_file_operations():
    """Read and gzip-compress a few temporary files in concurrent greenthreads.

    Three temporary text files of 1000 lines each are created. One
    greenthread per file counts its lines and characters, and another
    compresses it to ``<file>.gz``. Results are printed, and all temporary
    files are removed afterwards.

    NOTE(review): this uses the ``io`` and ``gzip`` names that the file
    imports from ``eventlet.green`` -- confirm that the installed eventlet
    actually provides those modules.
    """

    def read_large_file(filename):
        """Count lines and characters in *filename*, yielding periodically."""
        try:
            with io.open(filename, 'r') as f:
                line_count = 0
                char_count = 0
                # Read line by line - cooperative
                for line in f:
                    line_count += 1
                    char_count += len(line)
                    # Yield occasionally for other greenthreads
                    if line_count % 1000 == 0:
                        eventlet.sleep(0)
            return {
                'filename': filename,
                'lines': line_count,
                'characters': char_count
            }
        except Exception as e:
            return {'filename': filename, 'error': str(e)}

    def compress_file(input_file, output_file):
        """Gzip *input_file* into *output_file* and report the size change."""
        try:
            with io.open(input_file, 'rb') as f_in:
                with gzip.open(output_file, 'wb') as f_out:
                    # Copy data in chunks - cooperative
                    while True:
                        chunk = f_in.read(8192)
                        if not chunk:
                            break
                        f_out.write(chunk)
                        eventlet.sleep(0)  # Yield for other greenthreads
            # Sizes are read only after both files are closed, so the gzip
            # stream has been flushed completely.
            orig_size = os.path.getsize(input_file)
            comp_size = os.path.getsize(output_file)
            # An empty input would otherwise divide by zero.
            ratio = (orig_size - comp_size) / orig_size * 100 if orig_size else 0.0
            return {
                'input_file': input_file,
                'output_file': output_file,
                'original_size': orig_size,
                'compressed_size': comp_size,
                'compression_ratio': ratio
            }
        except Exception as e:
            return {'input_file': input_file, 'error': str(e)}

    # Create test files
    test_files = []
    for i in range(3):
        fd, filepath = tempfile.mkstemp(suffix=f'_test_{i}.txt')
        # Register the path first so cleanup covers it even if writing fails.
        test_files.append(filepath)
        with os.fdopen(fd, 'w') as f:
            # Write test data
            for j in range(1000):
                f.write(f"Line {j} in file {i} - some test data here\n")

    try:
        print("Processing files concurrently...")
        # Read files concurrently
        read_gts = [eventlet.spawn(read_large_file, filepath) for filepath in test_files]
        # Compress files concurrently
        compress_gts = [
            eventlet.spawn(compress_file, filepath, filepath + '.gz')
            for filepath in test_files
        ]
        # Collect read results
        print("\nFile reading results:")
        for gt in read_gts:
            result = gt.wait()
            if 'error' in result:
                print(f"❌ {result['filename']}: {result['error']}")
            else:
                print(f"✅ {result['filename']}: {result['lines']} lines, {result['characters']} chars")
        # Collect compression results
        print("\nFile compression results:")
        for gt in compress_gts:
            result = gt.wait()
            if 'error' in result:
                print(f"❌ {result['input_file']}: {result['error']}")
            else:
                print(f"✅ {result['input_file']}: {result['compression_ratio']:.1f}% compression")
    finally:
        # Clean up test files. Each path is removed independently so one
        # failure does not leave the others behind.
        for filepath in test_files:
            for leftover in (filepath, filepath + '.gz'):
                try:
                    os.unlink(leftover)
                except FileNotFoundError:
                    pass  # never created or already gone
                except OSError as e:
                    print(f"Could not remove {leftover}: {e}")
if __name__ == "__main__":
    green_file_operations()

The eventlet.green package includes cooperative versions of most Python standard library modules. Key modules include:

- socket, ssl, select, io
- http.client, http.server, urllib.*
- threading, subprocess, Queue
- os, time, signal
- dbm, gzip, tarfile, zipfile
- ftplib, smtplib, poplib, imaplib

Each green module provides the same API as its standard library counterpart but with cooperative I/O that works with eventlet's event loop.
Install with Tessl CLI
npx tessl i tessl/pypi-eventlet