A complete Python library for interacting with the XRP Ledger blockchain, providing transaction creation, account management, and comprehensive XRPL protocol support.
Connect to XRPL networks using JSON-RPC or WebSocket protocols with both synchronous and asynchronous interfaces. Clients provide the network communication layer for all XRPL operations.
Blocking clients for straightforward request-response communication patterns.
from xrpl.clients import JsonRpcClient, WebsocketClient
class JsonRpcClient:
    """Blocking JSON-RPC client for XRPL nodes."""

    def __init__(self, url: str):
        """
        Create a JSON-RPC client.

        Args:
            url: Address of the XRPL JSON-RPC endpoint
        """

    def request(self, request):
        """
        Submit a request to the XRPL node.

        Args:
            request: Request object (from xrpl.models.requests)

        Returns:
            Response object carrying the result data

        Raises:
            XRPLRequestFailureException: If the request fails
        """
class WebsocketClient:
    """Blocking WebSocket client for XRPL nodes."""

    def __init__(self, url: str):
        """
        Create a WebSocket client.

        Args:
            url: Address of the XRPL WebSocket endpoint
        """

    def send(self, request):
        """
        Submit a request over the WebSocket connection.

        Args:
            request: Request object

        Returns:
            Response object
        """

    def open(self):
        """Open the WebSocket connection."""

    def close(self):
        """Close the WebSocket connection."""

    def is_open(self) -> bool:
        """Report whether the WebSocket connection is open."""


# Non-blocking clients for high-performance concurrent operations.
from xrpl.asyncio.clients import AsyncJsonRpcClient, AsyncWebsocketClient
class AsyncJsonRpcClient:
    """Non-blocking JSON-RPC client for XRPL nodes."""

    def __init__(self, url: str):
        """
        Create an async JSON-RPC client.

        Args:
            url: Address of the XRPL JSON-RPC endpoint
        """

    async def request(self, request):
        """
        Submit a request to the XRPL node without blocking.

        Args:
            request: Request object

        Returns:
            Response object carrying the result data
        """

    async def close(self):
        """Shut the client down and release its resources."""
class AsyncWebsocketClient:
    """Non-blocking WebSocket client for XRPL nodes."""

    def __init__(self, url: str):
        """
        Create an async WebSocket client.

        Args:
            url: Address of the XRPL WebSocket endpoint
        """

    async def send(self, request):
        """
        Submit a request over the WebSocket without blocking.

        Args:
            request: Request object

        Returns:
            Response object
        """

    async def open(self):
        """Open the WebSocket connection asynchronously."""

    async def close(self):
        """Close the WebSocket connection asynchronously."""

    def is_open(self) -> bool:
        """Report whether the WebSocket connection is open."""


# Utilities for processing client responses and converting between formats.
from xrpl.clients.utils import (
json_to_response, request_to_json_rpc,
request_to_websocket, websocket_to_response
)
def json_to_response(json_object: dict):
    """
    Build a Response object from a JSON response.

    Args:
        json_object: Raw JSON response received from the server

    Returns:
        Structured Response object
    """
def request_to_json_rpc(request):
    """
    Render a Request object in JSON-RPC format.

    Args:
        request: Request object to convert

    Returns:
        JSON-RPC formatted request
    """
def request_to_websocket(request):
    """
    Render a Request object in WebSocket format.

    Args:
        request: Request object to convert

    Returns:
        WebSocket formatted request
    """
def websocket_to_response(websocket_response: dict):
    """
    Build a Response object from a WebSocket response.

    Args:
        websocket_response: Raw WebSocket response

    Returns:
        Structured Response object
    """


# JSON-RPC endpoints
mainnet_rpc_urls = [
    "https://s1.ripple.com:51234",
    "https://s2.ripple.com:51234",
    "https://xrplcluster.com",
    "https://xrpl.ws",
]

# WebSocket endpoints
mainnet_ws_urls = [
    "wss://s1.ripple.com:443",
    "wss://s2.ripple.com:443",
    "wss://xrplcluster.com",
    "wss://xrpl.ws",
]

# JSON-RPC endpoints
testnet_rpc_urls = [
    "https://s.altnet.rippletest.net:51234",
    "https://testnet.xrpl-labs.com",
]

# WebSocket endpoints
testnet_ws_urls = [
    "wss://s.altnet.rippletest.net:443",
    "wss://testnet.xrpl-labs.com",
]

# JSON-RPC endpoints
devnet_rpc_urls = [
    "https://s.devnet.rippletest.net:51234",
]

# WebSocket endpoints
devnet_ws_urls = [
    "wss://s.devnet.rippletest.net:443",
]

from xrpl.clients import JsonRpcClient
from xrpl.models.requests import AccountInfo, Ledger

# Connect to testnet
client = JsonRpcClient("https://s.altnet.rippletest.net:51234")

# Make account info request
account_request = AccountInfo(account="rN7n7otQDd6FczFgLdSqtcsAUxDkw6fzRH")
response = client.request(account_request)
if response.is_successful():
    account_data = response.result["account_data"]
    print(f"Account balance: {account_data['Balance']} drops")
    print(f"Sequence number: {account_data['Sequence']}")
else:
    print(f"Request failed: {response.result}")

# Get current ledger info
ledger_request = Ledger(ledger_index="current")
ledger_response = client.request(ledger_request)
if ledger_response.is_successful():
    ledger_result = ledger_response.result
    # The open ("current") ledger is reported under "ledger_current_index";
    # "ledger_index" only appears for closed ledgers, so it is the fallback.
    current_index = ledger_result.get(
        "ledger_current_index", ledger_result.get("ledger_index")
    )
    print(f"Current ledger: {current_index}")
else:
    # Check for failure here too, instead of indexing into an error payload
    print(f"Request failed: {ledger_response.result}")

from xrpl.clients import WebsocketClient
from xrpl.models.requests import Subscribe, AccountInfo
import time

# Connect to testnet WebSocket
client = WebsocketClient("wss://s.altnet.rippletest.net:443")

try:
    # The connection must be opened before anything is sent
    client.open()

    # Subscribe to the ledger stream
    response = client.send(Subscribe(streams=["ledger"]))
    print(f"Subscription result: {response.result}")

    # Query an account over the same connection
    account_response = client.send(
        AccountInfo(account="rN7n7otQDd6FczFgLdSqtcsAUxDkw6fzRH")
    )
    print(f"Account info: {account_response.result}")

    # Hold the connection open for a while.
    # NOTE(review): nothing below reads messages from the subscription; the
    # sleep only keeps the socket open — confirm how stream updates are
    # meant to be consumed.
    print("Listening for ledger updates...")
    time.sleep(10)  # Listen for 10 seconds
finally:
    # Always release the socket, even when a request above fails
    client.close()

import asyncio
from xrpl.asyncio.clients import AsyncJsonRpcClient
from xrpl.models.requests import AccountInfo, ServerInfo, Fee
async def get_multiple_data(client):
    """
    Fetch account, server and fee information concurrently.

    Args:
        client: Object whose awaitable ``request`` method is called once
            per request

    Returns:
        Dict with keys "account", "server" and "fees", each holding the
        ``result`` of the matching response
    """
    # Build every request up front, in the same order as the result keys
    labels = ("account", "server", "fees")
    requests = (
        AccountInfo(account="rN7n7otQDd6FczFgLdSqtcsAUxDkw6fzRH"),
        ServerInfo(),
        Fee(),
    )

    # gather() runs the three requests together and keeps argument order
    responses = await asyncio.gather(*(client.request(req) for req in requests))

    return {label: reply.result for label, reply in zip(labels, responses)}
async def main():
    """Fetch account, server and fee data from the testnet endpoint and print it."""
    rpc_client = AsyncJsonRpcClient("https://s.altnet.rippletest.net:51234")

    try:
        # The three lookups are issued together inside get_multiple_data
        results = await get_multiple_data(rpc_client)

        print("Account data:", results["account"]["account_data"]["Balance"])
        print("Server version:", results["server"]["info"]["build_version"])
        print("Current fees:", results["fees"]["drops"])
    finally:
        # Close the client whether or not the lookups succeeded
        await rpc_client.close()


# Run async example
asyncio.run(main())

from xrpl.clients import JsonRpcClient, WebsocketClient
from xrpl.asyncio.clients import AsyncJsonRpcClient
import time
class ManagedClient:
    """
    Wrapper for client connection management.

    Holds the endpoint URL, the client type ("jsonrpc" or "websocket") and
    the underlying client, which is ``None`` while disconnected.
    """

    def __init__(self, url: str, client_type: str = "jsonrpc"):
        """
        Store the connection settings without connecting.

        Args:
            url: Endpoint URL handed to the underlying client
            client_type: "jsonrpc" (default) or "websocket"
        """
        self.url = url
        self.client_type = client_type
        self.client = None

    def connect(self):
        """
        Establish the connection and return the underlying client.

        Returns:
            The created client; a WebSocket client is opened first

        Raises:
            ValueError: If ``client_type`` is neither "jsonrpc" nor "websocket"
        """
        if self.client_type == "jsonrpc":
            self.client = JsonRpcClient(self.url)
        elif self.client_type == "websocket":
            self.client = WebsocketClient(self.url)
            self.client.open()
        else:
            # Fail loudly instead of printing "Connected" and returning None
            raise ValueError(f"Unsupported client type: {self.client_type!r}")

        print(f"Connected to {self.url}")
        return self.client

    def disconnect(self):
        """Close the connection (WebSocket only) and drop the client."""
        if self.client and self.client_type == "websocket":
            self.client.close()
        self.client = None
        print("Disconnected")

    def is_connected(self) -> bool:
        """
        Check connection status.

        Returns:
            False when no client exists. For WebSocket, whether the client
            reports itself open; otherwise True once a client exists.
        """
        if self.client is None:
            # Always return a real bool, never None, when nothing is connected
            return False
        if self.client_type == "websocket":
            return bool(self.client.is_open())
        return True
# Usage
from xrpl.models.requests import Ping

manager = ManagedClient("wss://s.altnet.rippletest.net:443", "websocket")

try:
    client = manager.connect()

    # Use client
    ping_response = client.send(Ping())
    print(f"Ping successful: {ping_response.is_successful()}")
finally:
    # disconnect() tolerates a client that never connected
    manager.disconnect()

from xrpl.clients import JsonRpcClient
from xrpl.models.requests import AccountInfo
import time
import random
def request_with_retry(client, request, max_retries: int = 3, backoff_factor: float = 1.0):
    """
    Make request with exponential backoff retry logic.

    Args:
        client: Object exposing ``request(request)`` that returns a response
            with ``is_successful()`` and ``result``
        request: Request passed unchanged to ``client.request``
        max_retries: Number of retries after the first attempt
        backoff_factor: Base delay in seconds, doubled on every retry

    Returns:
        The first successful response

    Raises:
        Exception: After ``max_retries + 1`` failed attempts. When the last
            attempt raised, that error is attached as ``__cause__``.
    """
    last_error = None

    for attempt in range(max_retries + 1):
        try:
            response = client.request(request)
        except Exception as e:
            # Deliberately broad: any failure at this boundary is retried
            last_error = e
            print(f"Network error (attempt {attempt + 1}): {e}")
        else:
            if response.is_successful():
                return response
            # The node answered, so no exception is pending for this attempt
            last_error = None
            print(f"Request failed (attempt {attempt + 1}): {response.result}")

        # Don't sleep after the last attempt
        if attempt < max_retries:
            # Exponential backoff plus up to one second of random jitter
            sleep_time = backoff_factor * (2 ** attempt) + random.uniform(0, 1)
            print(f"Retrying in {sleep_time:.2f} seconds...")
            time.sleep(sleep_time)

    # Chain the underlying error so callers can see why the attempts failed
    raise Exception(f"Request failed after {max_retries + 1} attempts") from last_error
# Usage with multiple endpoints, listed in the order they are tried
endpoints = [
    "https://s.altnet.rippletest.net:51234",
    "https://testnet.xrpl-labs.com",
]
def request_with_failover(request, endpoints: list):
    """
    Make request with automatic endpoint failover.

    Each endpoint is tried in order through ``request_with_retry``; the
    first successful response wins.

    Args:
        request: Request passed to ``request_with_retry``
        endpoints: JSON-RPC endpoint URLs, in the order to try them

    Returns:
        The response from the first endpoint that succeeds

    Raises:
        ValueError: If ``endpoints`` is empty
        Exception: If every endpoint fails; the last endpoint's error is
            attached as ``__cause__``
    """
    if not endpoints:
        # An empty list used to fall through and return None silently
        raise ValueError("No endpoints provided")

    last_error = None
    for position, endpoint in enumerate(endpoints, start=1):
        try:
            print(f"Trying endpoint {position}: {endpoint}")
            client = JsonRpcClient(endpoint)
            return request_with_retry(client, request)
        except Exception as e:
            # Keep the failure and move on to the next endpoint
            last_error = e
            print(f"Endpoint {position} failed: {e}")

    raise Exception("All endpoints failed") from last_error
# Example usage: look up one account, trying each URL in `endpoints` in turn
account_request = AccountInfo(account="rN7n7otQDd6FczFgLdSqtcsAUxDkw6fzRH")
response = request_with_failover(account_request, endpoints)
print(f"Success: {response.result}")

class XRPLRequestFailureException(XRPLException):
    """Exception raised when XRPL network requests fail."""

Install with Tessl CLI:

npx tessl i tessl/pypi-xrpl-py