A Python SDK for KuCoin cryptocurrency exchange API providing REST endpoints implementation, simple authentication handling, response exception handling, and websocket support for trading, market data, user account management, margin trading, lending, and earning features
—
Real-time market data and account updates through WebSocket connections. Provides automatic reconnection, subscription management, and comprehensive event handling for live cryptocurrency data streaming.
High-level WebSocket client for easy integration.
class KucoinWsClient:
"""High-level WebSocket client for KuCoin real-time data."""
def __init__(self):
"""Initialize WebSocket client."""
@classmethod
async def create(cls, loop, client, callback, private: bool = False, sock=None):
"""
Create and initialize WebSocket client.
Args:
loop: Event loop for async operations
client: KuCoin API client instance (with authentication)
callback: Message callback function
private (bool): Enable private channel access
sock: Optional socket configuration
Returns:
KucoinWsClient: Initialized WebSocket client
"""
async def subscribe(self, topic: str):
"""
Subscribe to a WebSocket topic.
Args:
topic (str): Subscription topic (e.g., '/market/ticker:BTC-USDT')
"""
async def unsubscribe(self, topic: str):
"""
Unsubscribe from a WebSocket topic.
Args:
topic (str): Topic to unsubscribe from
"""
@property
def topics(self):
"""Get list of currently subscribed topics."""Manage authentication tokens for WebSocket connections.
class GetToken:
"""WebSocket token management."""
def get_ws_token(self, is_private: bool = False):
"""
Get WebSocket token for public or private channels.
Args:
is_private (bool): Request private channel token
Returns:
dict: Token, endpoint, and connection parameters
"""Direct WebSocket connection management for advanced use cases.
class ConnectWebsocket:
"""Low-level WebSocket connection handler."""
def __init__(self, loop, client, callback, private: bool = False, sock=None):
"""
Initialize WebSocket connection.
Args:
loop: Event loop for async operations
client: KuCoin API client instance
callback: Message callback function
private (bool): Enable private channel access
sock: Optional socket configuration
"""
async def send_message(self, msg: dict, retry_count: int = 0):
"""
Send message to WebSocket server.
Args:
msg (dict): Message to send
retry_count (int): Number of retry attempts
"""
async def send_ping(self):
"""Send ping message to maintain connection."""
@property
def topics(self):
"""Get list of current subscription topics."""import asyncio
import json
from kucoin.ws_client import KucoinWsClient
from kucoin.client import Market
async def message_handler(message):
"""Handle incoming WebSocket messages."""
if 'data' in message:
data = message['data']
topic = message.get('topic', '')
if 'ticker' in topic:
# Handle ticker updates
ticker_data = data
print(f"Ticker Update - {ticker_data['symbol']}: ${ticker_data['price']}")
elif 'level2' in topic:
# Handle order book updates
ob_data = data
print(f"OrderBook Update - {ob_data['symbol']}")
print(f"Best Bid: {ob_data['bids'][0] if ob_data.get('bids') else 'N/A'}")
print(f"Best Ask: {ob_data['asks'][0] if ob_data.get('asks') else 'N/A'}")
async def main():
# Initialize market client for token generation
market = Market()
# Create WebSocket client
loop = asyncio.get_event_loop()
ws_client = await KucoinWsClient.create(loop, market, message_handler)
# Subscribe to BTC-USDT ticker
await ws_client.subscribe('/market/ticker:BTC-USDT')
# Subscribe to order book updates
await ws_client.subscribe('/market/level2:BTC-USDT')
# Keep connection alive
await asyncio.sleep(60) # Stream for 1 minute
# Run the async example
asyncio.run(main())import asyncio
from kucoin.ws_client import KucoinWsClient
from kucoin.client import User
async def private_message_handler(message):
"""Handle private channel messages."""
if 'data' in message:
data = message['data']
topic = message.get('topic', '')
if 'account' in topic:
# Handle account balance updates
account_data = data
print(f"Balance Update - {account_data['currency']}: {account_data['available']}")
elif 'tradeOrders' in topic:
# Handle order updates
order_data = data
print(f"Order Update - {order_data['symbol']}: {order_data['type']} {order_data['side']}")
print(f"Status: {order_data['status']}, Size: {order_data['size']}")
async def private_streaming():
# Initialize authenticated client
user = User(
key='your-api-key',
secret='your-api-secret',
passphrase='your-passphrase',
is_sandbox=False
)
# Create WebSocket client with private channel access
loop = asyncio.get_event_loop()
ws_client = await KucoinWsClient.create(loop, user, private_message_handler, private=True)
# Subscribe to private account updates
await ws_client.subscribe('/account/balance')
# Subscribe to order updates
await ws_client.subscribe('/spotMarket/tradeOrders')
# Keep connection alive
await asyncio.sleep(300) # Stream for 5 minutes
# Run private streaming
asyncio.run(private_streaming())class KuCoinStreamer:
def __init__(self, symbols, api_credentials=None):
self.symbols = symbols
self.ws_client = KucoinWsClient(**api_credentials) if api_credentials else KucoinWsClient()
self.connections = {}
self.data_buffer = {}
def start_streaming(self):
"""Start streaming for all symbols."""
for symbol in self.symbols:
# Subscribe to ticker updates
ticker_topic = f'/market/ticker:{symbol}'
self.connections[f'{symbol}_ticker'] = self.ws_client._socket(
ticker_topic,
lambda msg, s=symbol: self.handle_ticker(msg, s)
)
# Subscribe to level2 order book updates
orderbook_topic = f'/market/level2:{symbol}'
self.connections[f'{symbol}_orderbook'] = self.ws_client._socket(
orderbook_topic,
lambda msg, s=symbol: self.handle_orderbook(msg, s)
)
def handle_ticker(self, message, symbol):
"""Handle ticker updates."""
data = json.loads(message)
if data['type'] == 'message':
ticker = data['data']
self.data_buffer[f'{symbol}_ticker'] = {
'symbol': symbol,
'price': float(ticker['price']),
'change': float(ticker['changeRate']),
'volume': float(ticker['vol']),
'timestamp': ticker['time']
}
self.process_ticker_update(symbol)
def handle_orderbook(self, message, symbol):
"""Handle order book updates."""
data = json.loads(message)
if data['type'] == 'message':
ob_data = data['data']
self.data_buffer[f'{symbol}_orderbook'] = {
'symbol': symbol,
'bids': ob_data['bids'][:5], # Top 5 bids
'asks': ob_data['asks'][:5], # Top 5 asks
'timestamp': ob_data['time']
}
self.process_orderbook_update(symbol)
def process_ticker_update(self, symbol):
"""Process ticker data for analysis."""
ticker = self.data_buffer.get(f'{symbol}_ticker')
if ticker:
print(f"{symbol}: ${ticker['price']} ({ticker['change']:+.2%})")
def process_orderbook_update(self, symbol):
"""Process order book data."""
ob = self.data_buffer.get(f'{symbol}_orderbook')
if ob and ob['bids'] and ob['asks']:
spread = float(ob['asks'][0][0]) - float(ob['bids'][0][0])
spread_pct = spread / float(ob['bids'][0][0]) * 100
print(f"{symbol} Spread: ${spread:.4f} ({spread_pct:.3f}%)")
def get_market_snapshot(self):
"""Get current market snapshot."""
snapshot = {}
for symbol in self.symbols:
ticker_key = f'{symbol}_ticker'
ob_key = f'{symbol}_orderbook'
if ticker_key in self.data_buffer and ob_key in self.data_buffer:
ticker = self.data_buffer[ticker_key]
orderbook = self.data_buffer[ob_key]
snapshot[symbol] = {
'price': ticker['price'],
'change_24h': ticker['change'],
'volume_24h': ticker['volume'],
'bid': float(orderbook['bids'][0][0]) if orderbook['bids'] else None,
'ask': float(orderbook['asks'][0][0]) if orderbook['asks'] else None,
'spread': None
}
if snapshot[symbol]['bid'] and snapshot[symbol]['ask']:
snapshot[symbol]['spread'] = snapshot[symbol]['ask'] - snapshot[symbol]['bid']
return snapshot
def close(self):
"""Close all connections."""
self.ws_client.close()
# Usage
symbols = ['BTC-USDT', 'ETH-USDT', 'ADA-USDT']
streamer = KuCoinStreamer(symbols)
streamer.start_streaming()
# Stream for 5 minutes
time.sleep(300)
# Get final snapshot
snapshot = streamer.get_market_snapshot()
print("\nMarket Snapshot:")
for symbol, data in snapshot.items():
print(f"{symbol}: ${data['price']} | Spread: ${data['spread']:.4f}")
streamer.close()class TradingBot:
def __init__(self, api_key, api_secret, api_passphrase, symbols):
self.ws_client = KucoinWsClient(api_key, api_secret, api_passphrase)
self.symbols = symbols
self.market_data = {}
self.positions = {}
def start_monitoring(self):
"""Start monitoring market data and account updates."""
# Monitor market data
for symbol in self.symbols:
topic = f'/market/ticker:{symbol}'
self.ws_client._socket(topic, self.handle_market_data)
# Monitor account balance changes
self.ws_client._socket('/account/balance', self.handle_balance_update, is_private=True)
# Monitor order execution
self.ws_client._socket('/spotMarket/tradeOrders', self.handle_order_update, is_private=True)
def handle_market_data(self, message):
"""Process market data for trading decisions."""
data = json.loads(message)
if data['type'] == 'message':
ticker = data['data']
symbol = ticker['symbol']
price = float(ticker['price'])
# Store market data
self.market_data[symbol] = {
'price': price,
'change': float(ticker['changeRate']),
'timestamp': ticker['time']
}
# Check for trading opportunities
self.check_trading_signals(symbol, price)
def handle_balance_update(self, message):
"""Handle account balance changes."""
data = json.loads(message)
if data['type'] == 'message':
balance_data = data['data']
currency = balance_data['currency']
available = float(balance_data['available'])
print(f"Balance Update: {currency} = {available}")
# Update position tracking
if currency in self.positions:
self.positions[currency]['balance'] = available
def handle_order_update(self, message):
"""Handle order execution updates."""
data = json.loads(message)
if data['type'] == 'message':
order_data = data['data']
print(f"Order Update: {order_data['symbol']} - {order_data['status']}")
if order_data['status'] == 'match':
# Order was filled
self.on_order_filled(order_data)
def check_trading_signals(self, symbol, price):
"""Check for trading opportunities."""
# Implement your trading logic here
# This is just a simple example
if symbol not in self.market_data:
return
# Simple momentum strategy example
change = self.market_data[symbol]['change']
if change > 0.05: # Price up 5%
print(f"Strong upward momentum detected for {symbol}")
# Consider buying logic here
elif change < -0.05: # Price down 5%
print(f"Strong downward momentum detected for {symbol}")
# Consider selling logic here
def on_order_filled(self, order_data):
"""Handle order fill events."""
symbol = order_data['symbol']
side = order_data['side']
size = float(order_data['dealSize'])
price = float(order_data['dealFunds']) / size if size > 0 else 0
print(f"Order Filled: {side} {size} {symbol} at ${price}")
# Update position tracking
base_currency = symbol.split('-')[0]
quote_currency = symbol.split('-')[1]
if side == 'buy':
# Bought base currency with quote currency
self.update_position(base_currency, size, price)
else:
# Sold base currency for quote currency
self.update_position(base_currency, -size, price)
def update_position(self, currency, size_change, price):
"""Update position tracking."""
if currency not in self.positions:
self.positions[currency] = {'size': 0, 'avg_price': 0}
pos = self.positions[currency]
if pos['size'] == 0:
# New position
pos['size'] = size_change
pos['avg_price'] = price
else:
# Update existing position
total_value = pos['size'] * pos['avg_price'] + size_change * price
pos['size'] += size_change
if pos['size'] != 0:
pos['avg_price'] = total_value / pos['size']
else:
pos['avg_price'] = 0
# # Usage (commented out for example)
# bot = TradingBot(api_key, api_secret, api_passphrase, ['BTC-USDT', 'ETH-USDT'])
# bot.start_monitoring()WebSocketToken = dict
# {
# "token": str, # Connection token
# "instanceServers": list, # Server endpoints
# "pingInterval": int, # Ping interval in ms
# "pingTimeout": int # Ping timeout in ms
# }
WebSocketMessage = dict
# {
# "id": str, # Message ID
# "type": str, # Message type ('message', 'welcome', 'ping', 'pong')
# "topic": str, # Subscription topic
# "subject": str, # Message subject
# "data": dict # Payload data
# }
TickerData = dict
# {
# "symbol": str, # Trading symbol
# "sequence": str, # Sequence number
# "price": str, # Last price
# "size": str, # Last size
# "bestAsk": str, # Best ask price
# "bestAskSize": str, # Best ask size
# "bestBid": str, # Best bid price
# "bestBidSize": str, # Best bid size
# "time": int # Timestamp
# }
OrderBookData = dict
# {
# "symbol": str, # Trading symbol
# "sequence": str, # Sequence number
# "asks": list, # Ask orders [[price, size], ...]
# "bids": list, # Bid orders [[price, size], ...]
# "time": int # Timestamp
# }
AccountBalanceData = dict
# {
# "accountId": str, # Account ID
# "currency": str, # Currency
# "total": str, # Total balance
# "available": str, # Available balance
# "holds": str, # Held balance
# "relationEvent": str, # Event that caused change
# "relationEventId": str, # Event ID
# "time": int # Timestamp
# }
OrderUpdateData = dict
# {
# "symbol": str, # Trading symbol
# "orderType": str, # Order type
# "side": str, # Order side
# "orderId": str, # Order ID
# "type": str, # Update type
# "orderTime": int, # Order timestamp
# "size": str, # Order size
# "filledSize": str, # Filled size
# "price": str, # Order price
# "clientOid": str, # Client order ID
# "remainSize": str, # Remaining size
# "status": str, # Order status
# "ts": int # Update timestamp
# }Install with Tessl CLI
npx tessl i tessl/pypi-kucoin-python