CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-geopy

Python Geocoding Toolbox providing comprehensive geocoding services and geodesic distance calculations

Pending
Overview
Eval results
Files

async-support.mddocs/

Async Support

Geopy provides comprehensive asynchronous geocoding support using aiohttp adapter for high-performance applications requiring concurrent geocoding requests. All geocoder classes support async operations with minimal code changes.

Capabilities

Async Adapter

HTTP adapter for asynchronous operations using aiohttp.

from geopy.adapters import AioHTTPAdapter

class AioHTTPAdapter:
    """
    HTTP adapter using aiohttp for asynchronous requests.
    Requires: pip install "geopy[aiohttp]"
    """
    
    def __init__(self, proxy_info=None, ssl_context=None):
        """
        Initialize AioHTTP adapter.
        
        Parameters:
        - proxy_info: Proxy configuration (optional)
        - ssl_context: SSL context for HTTPS requests (optional)
        """
    
    async def get_text(self, url, timeout, headers):
        """
        Perform async GET request returning text.
        
        Parameters:
        - url (str): Request URL
        - timeout (float): Request timeout in seconds
        - headers (dict): HTTP headers
        
        Returns:
        str: Response text
        """
    
    async def get_json(self, url, timeout, headers):
        """
        Perform async GET request returning parsed JSON.
        
        Parameters:
        - url (str): Request URL
        - timeout (float): Request timeout in seconds
        - headers (dict): HTTP headers
        
        Returns:
        dict: Parsed JSON response
        """

Async Geocoder Usage

All geocoder classes support async operations when initialized with AioHTTPAdapter.

# Example with Nominatim - same pattern applies to all geocoders
from geopy.geocoders import Nominatim
from geopy.adapters import AioHTTPAdapter

async def async_geocoding_example():
    async with Nominatim(
        user_agent="async_app",
        adapter_factory=AioHTTPAdapter
    ) as geolocator:
        # All geocoder methods become async when using async adapter
        location = await geolocator.geocode("New York City")
        reverse_location = await geolocator.reverse("40.7128, -74.0060")
        return location, reverse_location

Async Rate Limiter

Rate limiting functionality for asynchronous geocoding operations.

from geopy.extra.rate_limiter import AsyncRateLimiter

class AsyncRateLimiter:
    """
    Rate limiting wrapper for asynchronous geocoding functions.
    """
    
    def __init__(self, func, min_delay_seconds=0.0, max_retries=2, 
                 error_wait_seconds=5.0, swallow_exceptions=True,
                 return_value_on_exception=None):
        """
        Initialize async rate limiter.
        
        Parameters:
        - func: Async function to wrap
        - min_delay_seconds (float): Minimum delay between calls
        - max_retries (int): Number of retry attempts on errors
        - error_wait_seconds (float): Wait time after errors
        - swallow_exceptions (bool): Whether to suppress final exceptions
        - return_value_on_exception: Return value when exceptions are swallowed
        """
    
    async def __call__(self, *args, **kwargs):
        """Execute rate-limited async function call"""

Usage Examples

Basic Async Geocoding

import asyncio
from geopy.geocoders import Nominatim
from geopy.adapters import AioHTTPAdapter

async def basic_async_geocoding():
    """Basic async geocoding example"""
    
    async with Nominatim(
        user_agent="basic_async_app",
        adapter_factory=AioHTTPAdapter
    ) as geolocator:
        
        # Forward geocoding
        location = await geolocator.geocode("Paris, France")
        print(f"Forward: {location.address if location else 'Not found'}")
        
        # Reverse geocoding
        if location:
            reverse = await geolocator.reverse(f"{location.latitude}, {location.longitude}")
            print(f"Reverse: {reverse.address if reverse else 'Not found'}")

# Run async function
asyncio.run(basic_async_geocoding())

Concurrent Geocoding

import asyncio
from geopy.geocoders import Nominatim
from geopy.adapters import AioHTTPAdapter

async def concurrent_geocoding():
    """Geocode multiple addresses concurrently"""
    
    addresses = [
        "New York City, USA",
        "London, UK", 
        "Tokyo, Japan",
        "Sydney, Australia",
        "São Paulo, Brazil",
        "Mumbai, India",
        "Cairo, Egypt",
        "Berlin, Germany"
    ]
    
    async with Nominatim(
        user_agent="concurrent_app",
        adapter_factory=AioHTTPAdapter
    ) as geolocator:
        
        # Create tasks for concurrent execution
        tasks = [
            geolocator.geocode(address)
            for address in addresses
        ]
        
        # Execute all tasks concurrently
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Process results
        for address, result in zip(addresses, results):
            if isinstance(result, Exception):
                print(f"Error geocoding '{address}': {result}")
            elif result:
                print(f"{address} -> {result.latitude:.4f}, {result.longitude:.4f}")
            else:
                print(f"No results for '{address}'")

asyncio.run(concurrent_geocoding())

Async with Rate Limiting

import asyncio
from geopy.geocoders import Nominatim
from geopy.adapters import AioHTTPAdapter
from geopy.extra.rate_limiter import AsyncRateLimiter

async def rate_limited_async_geocoding():
    """Async geocoding with rate limiting"""
    
    addresses = [
        "Times Square, New York",
        "Eiffel Tower, Paris", 
        "Big Ben, London",
        "Colosseum, Rome",
        "Statue of Liberty, New York",
        "Golden Gate Bridge, San Francisco",
        "Sydney Opera House, Australia",
        "Machu Picchu, Peru"
    ]
    
    async with Nominatim(
        user_agent="rate_limited_app",
        adapter_factory=AioHTTPAdapter
    ) as geolocator:
        
        # Wrap geocoder with rate limiter (1 second minimum delay)
        rate_limited_geocode = AsyncRateLimiter(
            geolocator.geocode,
            min_delay_seconds=1.0,
            max_retries=3,
            error_wait_seconds=2.0
        )
        
        # Process addresses with rate limiting
        results = []
        for address in addresses:
            try:
                result = await rate_limited_geocode(address)
                results.append((address, result))
                print(f"✓ Geocoded: {address}")
            except Exception as e:
                print(f"✗ Failed: {address} - {e}")
                results.append((address, None))
        
        return results

asyncio.run(rate_limited_async_geocoding())

Multiple Service Async Comparison

import asyncio
from geopy.geocoders import Nominatim, Photon, OpenCage
from geopy.adapters import AioHTTPAdapter

async def compare_services_async():
    """Compare multiple geocoding services asynchronously"""
    
    address = "Central Park, New York City"
    
    # Initialize multiple geocoders
    geocoders = {
        'Nominatim': Nominatim(user_agent="comparison_app", adapter_factory=AioHTTPAdapter),
        'Photon': Photon(user_agent="comparison_app", adapter_factory=AioHTTPAdapter),
        # Add more services as needed (with proper API keys)
    }
    
    async def geocode_with_service(name, geocoder, query):
        """Geocode with a specific service"""
        try:
            async with geocoder:
                result = await geocoder.geocode(query)
                return name, result
        except Exception as e:
            return name, f"Error: {e}"
    
    # Create tasks for all services
    tasks = [
        geocode_with_service(name, geocoder, address)
        for name, geocoder in geocoders.items()
    ]
    
    # Execute concurrently
    results = await asyncio.gather(*tasks)
    
    # Display results
    print(f"Geocoding results for '{address}':")
    for service_name, result in results:
        if isinstance(result, str):  # Error
            print(f"{service_name}: {result}")
        elif result:
            print(f"{service_name}: {result.address}")
            print(f"  Coordinates: {result.latitude:.6f}, {result.longitude:.6f}")
        else:
            print(f"{service_name}: No results")
        print()

asyncio.run(compare_services_async())

Async Batch Processing

import asyncio
from geopy.geocoders import Nominatim
from geopy.adapters import AioHTTPAdapter
from geopy.extra.rate_limiter import AsyncRateLimiter
import csv
import json

class AsyncBatchGeocoder:
    """Batch geocoding processor with async support"""
    
    def __init__(self, geocoder_class, batch_size=10, delay=1.0, **geocoder_kwargs):
        self.geocoder_class = geocoder_class
        self.geocoder_kwargs = geocoder_kwargs
        self.batch_size = batch_size
        self.delay = delay
        
    async def geocode_batch(self, addresses):
        """Geocode a batch of addresses"""
        
        async with self.geocoder_class(
            adapter_factory=AioHTTPAdapter,
            **self.geocoder_kwargs
        ) as geolocator:
            
            # Set up rate limiting
            rate_limited_geocode = AsyncRateLimiter(
                geolocator.geocode,
                min_delay_seconds=self.delay,
                max_retries=3
            )
            
            # Process in batches to avoid overwhelming the service
            results = []
            for i in range(0, len(addresses), self.batch_size):
                batch = addresses[i:i + self.batch_size]
                print(f"Processing batch {i//self.batch_size + 1}/{(len(addresses)-1)//self.batch_size + 1}")
                
                # Process batch concurrently
                batch_tasks = [
                    self._geocode_single(rate_limited_geocode, addr, idx + i)
                    for idx, addr in enumerate(batch)
                ]
                
                batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
                results.extend(batch_results)
                
                # Brief pause between batches
                if i + self.batch_size < len(addresses):
                    await asyncio.sleep(1)
            
            return results
    
    async def _geocode_single(self, geocoder_func, address, index):
        """Geocode a single address with error handling"""
        try:
            result = await geocoder_func(address)
            return {
                'index': index,
                'input_address': address,
                'status': 'success' if result else 'no_results',
                'result_address': result.address if result else None,
                'latitude': result.latitude if result else None,
                'longitude': result.longitude if result else None,
                'raw': result.raw if result else None
            }
        except Exception as e:
            return {
                'index': index,
                'input_address': address,
                'status': 'error',
                'error': str(e),
                'result_address': None,
                'latitude': None,
                'longitude': None,
                'raw': None
            }

async def batch_geocoding_example():
    """Example of batch geocoding from CSV file"""
    
    # Sample data (normally would read from CSV)
    addresses = [
        "1600 Amphitheatre Parkway, Mountain View, CA",
        "1 Apple Park Way, Cupertino, CA", 
        "350 Fifth Avenue, New York, NY",
        "Times Square, New York, NY",
        "Golden Gate Bridge, San Francisco, CA",
        "Space Needle, Seattle, WA",
        "Willis Tower, Chicago, IL",
        "Hollywood Sign, Los Angeles, CA"
    ]
    
    # Initialize batch geocoder
    batch_geocoder = AsyncBatchGeocoder(
        Nominatim,
        batch_size=3,
        delay=1.0,
        user_agent="batch_geocoding_app"
    )
    
    # Process addresses
    print(f"Starting batch geocoding of {len(addresses)} addresses...")
    results = await batch_geocoder.geocode_batch(addresses)
    
    # Analyze results
    successful = sum(1 for r in results if r['status'] == 'success')
    no_results = sum(1 for r in results if r['status'] == 'no_results')
    errors = sum(1 for r in results if r['status'] == 'error')
    
    print(f"\nBatch geocoding completed:")
    print(f"  Successful: {successful}")
    print(f"  No results: {no_results}")
    print(f"  Errors: {errors}")
    
    # Save results to file
    with open('geocoding_results.json', 'w') as f:
        json.dump(results, f, indent=2)
    
    print("Results saved to geocoding_results.json")
    
    return results

# Run batch processing
asyncio.run(batch_geocoding_example())

Async Error Handling

import asyncio
from geopy.geocoders import Nominatim
from geopy.adapters import AioHTTPAdapter
from geopy.exc import *

async def robust_async_geocoding():
    """Async geocoding with comprehensive error handling"""
    
    async def safe_geocode(geolocator, query, max_retries=3):
        """Safely geocode with retries and error handling"""
        
        for attempt in range(max_retries):
            try:
                result = await geolocator.geocode(query, timeout=10)
                return {'status': 'success', 'result': result, 'query': query}
                
            except GeocoderRateLimited as e:
                if attempt == max_retries - 1:
                    return {'status': 'rate_limited', 'error': str(e), 'query': query}
                
                wait_time = e.retry_after if e.retry_after else 2 ** attempt
                print(f"Rate limited for '{query}', waiting {wait_time}s...")
                await asyncio.sleep(wait_time)
                
            except GeocoderTimedOut as e:
                if attempt == max_retries - 1:
                    return {'status': 'timeout', 'error': str(e), 'query': query}
                
                print(f"Timeout for '{query}', retrying...")
                await asyncio.sleep(1)
                
            except GeocoderServiceError as e:
                return {'status': 'service_error', 'error': str(e), 'query': query}
                
            except Exception as e:
                return {'status': 'unexpected_error', 'error': str(e), 'query': query}
        
        return {'status': 'max_retries_exceeded', 'query': query}
    
    queries = [
        "Valid Address, New York, NY",
        "Invalid Address 123456789",
        "London, UK",
        "Tokyo, Japan"
    ]
    
    async with Nominatim(
        user_agent="robust_async_app", 
        adapter_factory=AioHTTPAdapter
    ) as geolocator:
        
        # Process all queries concurrently
        tasks = [safe_geocode(geolocator, query) for query in queries]
        results = await asyncio.gather(*tasks)
        
        # Analyze results
        for result in results:
            status = result['status']
            query = result['query']
            
            if status == 'success':
                location = result['result']
                if location:
                    print(f"✓ {query} -> {location.address}")
                else:
                    print(f"○ {query} -> No results")
            else:
                error = result.get('error', 'Unknown error')
                print(f"✗ {query} -> {status}: {error}")

asyncio.run(robust_async_geocoding())

Performance Monitoring

import asyncio
import time
from geopy.geocoders import Nominatim
from geopy.adapters import AioHTTPAdapter

async def async_performance_test():
    """Compare sync vs async performance"""
    
    addresses = [
        "New York, NY", "Los Angeles, CA", "Chicago, IL", 
        "Houston, TX", "Phoenix, AZ", "Philadelphia, PA",
        "San Antonio, TX", "San Diego, CA", "Dallas, TX",
        "San Jose, CA"
    ]
    
    # Async performance test
    start_time = time.time()
    
    async with Nominatim(
        user_agent="performance_test",
        adapter_factory=AioHTTPAdapter
    ) as geolocator:
        
        tasks = [geolocator.geocode(addr) for addr in addresses]
        async_results = await asyncio.gather(*tasks, return_exceptions=True)
    
    async_time = time.time() - start_time
    
    # Sync performance test (for comparison)
    start_time = time.time()
    sync_geolocator = Nominatim(user_agent="performance_test")
    sync_results = []
    
    for addr in addresses:
        try:
            result = sync_geolocator.geocode(addr)
            sync_results.append(result)
        except Exception as e:
            sync_results.append(e)
    
    sync_time = time.time() - start_time
    
    # Results
    async_success = sum(1 for r in async_results if hasattr(r, 'address'))
    sync_success = sum(1 for r in sync_results if hasattr(r, 'address'))
    
    print(f"Performance Comparison:")
    print(f"Async: {async_time:.2f}s ({async_success}/{len(addresses)} successful)")
    print(f"Sync:  {sync_time:.2f}s ({sync_success}/{len(addresses)} successful)")
    print(f"Speedup: {sync_time/async_time:.2f}x")

asyncio.run(async_performance_test())

Install with Tessl CLI

npx tessl i tessl/pypi-geopy

docs

async-support.md

core-data-types.md

distance-calculations.md

error-handling.md

geocoding-services.md

index.md

rate-limiting.md

tile.json