CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-kucoin-python

A Python SDK for KuCoin cryptocurrency exchange API providing REST endpoints implementation, simple authentication handling, response exception handling, and websocket support for trading, market data, user account management, margin trading, lending, and earning features

Pending
Overview
Eval results
Files

docs/websocket.md

WebSocket Streaming

Real-time market data and account updates through WebSocket connections. Provides automatic reconnection, subscription management, and comprehensive event handling for live cryptocurrency data streaming.

Capabilities

WebSocket Client

High-level WebSocket client for easy integration.

class KucoinWsClient:
    """High-level WebSocket client for KuCoin real-time data.

    The usage examples in this document obtain an instance by awaiting the
    ``create`` classmethod and then manage topics with ``subscribe`` and
    ``unsubscribe``.
    """

    def __init__(self):
        """Initialize WebSocket client."""

    @classmethod
    async def create(cls, loop, client, callback, private: bool = False, sock=None) -> "KucoinWsClient":
        """
        Create and initialize WebSocket client.

        This is a coroutine classmethod: call it on the class and await the
        result.

        Args:
            loop: Event loop for async operations
            client: KuCoin API client instance (with authentication)
            callback: Message callback function; the examples in this document
                pass an ``async def`` handler that takes one message argument
            private (bool): Enable private channel access
            sock: Optional socket configuration

        Returns:
            KucoinWsClient: Initialized WebSocket client
        """

    async def subscribe(self, topic: str):
        """
        Subscribe to a WebSocket topic.

        This is a coroutine and must be awaited.

        Args:
            topic (str): Subscription topic (e.g., '/market/ticker:BTC-USDT')
        """

    async def unsubscribe(self, topic: str):
        """
        Unsubscribe from a WebSocket topic.

        This is a coroutine and must be awaited.

        Args:
            topic (str): Topic to unsubscribe from
        """

    @property
    def topics(self) -> list:
        """Get list of currently subscribed topics."""

WebSocket Token Management

Manage authentication tokens for WebSocket connections.

class GetToken:
    """WebSocket token management."""

    def get_ws_token(self, is_private: bool = False) -> dict:
        """
        Get WebSocket token for public or private channels.

        Args:
            is_private (bool): Request private channel token; defaults to
                False, i.e. a public channel token

        Returns:
            dict: Token, endpoint, and connection parameters
        """

Low-Level WebSocket Connection

Direct WebSocket connection management for advanced use cases.

class ConnectWebsocket:
    """Low-level WebSocket connection handler."""

    def __init__(self, loop, client, callback, private: bool = False, sock=None):
        """
        Initialize WebSocket connection.

        Takes the same arguments, in the same order, as
        ``KucoinWsClient.create``.

        Args:
            loop: Event loop for async operations
            client: KuCoin API client instance
            callback: Message callback function
            private (bool): Enable private channel access
            sock: Optional socket configuration
        """

    async def send_message(self, msg: dict, retry_count: int = 0):
        """
        Send message to WebSocket server.

        This is a coroutine and must be awaited.

        Args:
            msg (dict): Message to send
            retry_count (int): Number of retry attempts
                (NOTE(review): unclear from this listing whether this is a
                limit or a running counter -- confirm against the SDK source)
        """

    async def send_ping(self):
        """Send ping message to maintain connection."""

    @property
    def topics(self) -> list:
        """Get list of current subscription topics."""

Usage Examples

Basic Market Data Streaming

import asyncio
import json
from kucoin.ws_client import KucoinWsClient
from kucoin.client import Market

async def message_handler(message):
    """Print a short summary of each ticker or level2 message received.

    Messages without a 'data' entry, and messages whose topic mentions
    neither 'ticker' nor 'level2', are ignored.

    Args:
        message: Decoded WebSocket message (a mapping with optional
            'topic' and 'data' entries).
    """
    # Nothing to report for control messages that carry no payload.
    if 'data' not in message:
        return

    payload = message['data']
    channel = message.get('topic', '')

    if 'ticker' in channel:
        # Ticker update: last traded price for the symbol.
        print(f"Ticker Update - {payload['symbol']}: ${payload['price']}")
        return

    if 'level2' in channel:
        # Order book update: report the first bid/ask level when present.
        print(f"OrderBook Update - {payload['symbol']}")
        bid_levels = payload.get('bids')
        print(f"Best Bid: {bid_levels[0] if bid_levels else 'N/A'}")
        ask_levels = payload.get('asks')
        print(f"Best Ask: {ask_levels[0] if ask_levels else 'N/A'}")

async def main():
    """Stream public BTC-USDT ticker and level2 updates for one minute.

    Creates an unauthenticated ``Market`` client for token generation, opens
    a ``KucoinWsClient`` bound to the running event loop with
    ``message_handler`` as the callback, subscribes to both topics, then
    sleeps for 60 seconds so the callback can receive messages.
    """
    # Initialize market client for token generation
    market = Market()

    # Inside a coroutine the loop is already running; get_running_loop() is
    # the documented way to obtain it (get_event_loop() is discouraged here).
    loop = asyncio.get_running_loop()

    # Create WebSocket client
    ws_client = await KucoinWsClient.create(loop, market, message_handler)

    # Subscribe to BTC-USDT ticker
    await ws_client.subscribe('/market/ticker:BTC-USDT')

    # Subscribe to order book updates
    await ws_client.subscribe('/market/level2:BTC-USDT')

    # Keep connection alive
    await asyncio.sleep(60)  # Stream for 1 minute

# Run the async example: asyncio.run() starts an event loop, runs main() to
# completion, and closes the loop afterwards.
asyncio.run(main())

Private Account Updates

import asyncio
from kucoin.ws_client import KucoinWsClient
from kucoin.client import User

async def private_message_handler(message):
    """Print a short summary of each private account or order message.

    Messages without a 'data' entry, and messages whose topic mentions
    neither 'account' nor 'tradeOrders', are ignored.

    Args:
        message: Decoded WebSocket message (a mapping with optional
            'topic' and 'data' entries).
    """
    # Control messages carry no payload; there is nothing to print for them.
    if 'data' not in message:
        return

    payload = message['data']
    channel = message.get('topic', '')

    if 'account' in channel:
        # Balance change for a single currency.
        print(f"Balance Update - {payload['currency']}: {payload['available']}")
        return

    if 'tradeOrders' in channel:
        # Order lifecycle event: what happened, then where the order stands.
        print(f"Order Update - {payload['symbol']}: {payload['type']} {payload['side']}")
        print(f"Status: {payload['status']}, Size: {payload['size']}")

async def private_streaming():
    """Stream private balance and order updates for five minutes.

    Builds an authenticated ``User`` client (replace the placeholder
    credentials with real ones), opens a ``KucoinWsClient`` with
    ``private=True`` and ``private_message_handler`` as the callback,
    subscribes to the balance and trade-order topics, then sleeps for
    300 seconds so the callback can receive messages.
    """
    # Initialize authenticated client
    user = User(
        key='your-api-key',
        secret='your-api-secret',
        passphrase='your-passphrase',
        is_sandbox=False
    )

    # Inside a coroutine the loop is already running; get_running_loop() is
    # the documented way to obtain it (get_event_loop() is discouraged here).
    loop = asyncio.get_running_loop()

    # Create WebSocket client with private channel access
    ws_client = await KucoinWsClient.create(loop, user, private_message_handler, private=True)

    # Subscribe to private account updates
    await ws_client.subscribe('/account/balance')

    # Subscribe to order updates
    await ws_client.subscribe('/spotMarket/tradeOrders')

    # Keep connection alive
    await asyncio.sleep(300)  # Stream for 5 minutes

# Run private streaming: asyncio.run() starts an event loop, runs
# private_streaming() to completion, and closes the loop afterwards.
asyncio.run(private_streaming())

Advanced Multi-Symbol Streaming

class KuCoinStreamer:
    """Collect the latest ticker and top-of-book data for several symbols.

    Incoming messages are parsed and the most recent values per symbol are
    kept in ``data_buffer`` under the keys ``'<symbol>_ticker'`` and
    ``'<symbol>_orderbook'``.

    NOTE(review): this example constructs ``KucoinWsClient`` directly and
    calls ``_socket(...)`` and ``close()`` on it, none of which appear in the
    ``KucoinWsClient`` interface listed earlier in this document (which
    exposes ``create``, ``subscribe``, ``unsubscribe`` and ``topics``).
    Confirm against the installed SDK before relying on this class.
    """

    def __init__(self, symbols, api_credentials=None):
        """
        Args:
            symbols: Iterable of trading symbols (e.g. 'BTC-USDT').
            api_credentials: Optional mapping expanded as keyword arguments
                into ``KucoinWsClient``; when falsy the client is built
                without arguments.
        """
        self.symbols = symbols
        self.ws_client = KucoinWsClient(**api_credentials) if api_credentials else KucoinWsClient()
        self.connections = {}  # '<symbol>_ticker' / '<symbol>_orderbook' -> value returned by _socket()
        self.data_buffer = {}  # same keys -> latest parsed snapshot dict

    def start_streaming(self):
        """Start streaming for all symbols."""
        for symbol in self.symbols:
            # Subscribe to ticker updates. ``s=symbol`` binds the current
            # symbol at definition time so each callback keeps its own value.
            ticker_topic = f'/market/ticker:{symbol}'
            self.connections[f'{symbol}_ticker'] = self.ws_client._socket(
                ticker_topic,
                lambda msg, s=symbol: self.handle_ticker(msg, s)
            )

            # Subscribe to level2 order book updates
            orderbook_topic = f'/market/level2:{symbol}'
            self.connections[f'{symbol}_orderbook'] = self.ws_client._socket(
                orderbook_topic,
                lambda msg, s=symbol: self.handle_orderbook(msg, s)
            )

    def handle_ticker(self, message, symbol):
        """Parse a JSON ticker message and store it for ``symbol``.

        Only messages whose 'type' is 'message' are processed.

        NOTE(review): 'changeRate' and 'vol' are read here but are not part
        of the TickerData listing in this document -- confirm the payload.
        """
        data = json.loads(message)
        if data['type'] == 'message':
            ticker = data['data']
            self.data_buffer[f'{symbol}_ticker'] = {
                'symbol': symbol,
                'price': float(ticker['price']),
                'change': float(ticker['changeRate']),
                'volume': float(ticker['vol']),
                'timestamp': ticker['time']
            }
            self.process_ticker_update(symbol)

    def handle_orderbook(self, message, symbol):
        """Parse a JSON order book message and store its top levels.

        Only messages whose 'type' is 'message' are processed.
        """
        data = json.loads(message)
        if data['type'] == 'message':
            ob_data = data['data']
            self.data_buffer[f'{symbol}_orderbook'] = {
                'symbol': symbol,
                'bids': ob_data['bids'][:5],  # Top 5 bids
                'asks': ob_data['asks'][:5],  # Top 5 asks
                'timestamp': ob_data['time']
            }
            self.process_orderbook_update(symbol)

    def process_ticker_update(self, symbol):
        """Print the buffered price and change rate for ``symbol``, if any."""
        ticker = self.data_buffer.get(f'{symbol}_ticker')
        if ticker:
            print(f"{symbol}: ${ticker['price']} ({ticker['change']:+.2%})")

    def process_orderbook_update(self, symbol):
        """Print the absolute and relative bid/ask spread for ``symbol``.

        Nothing is printed unless both sides of the buffered book are
        non-empty. The relative spread is expressed against the best bid.
        """
        ob = self.data_buffer.get(f'{symbol}_orderbook')
        if ob and ob['bids'] and ob['asks']:
            spread = float(ob['asks'][0][0]) - float(ob['bids'][0][0])
            spread_pct = spread / float(ob['bids'][0][0]) * 100
            print(f"{symbol} Spread: ${spread:.4f} ({spread_pct:.3f}%)")

    def get_market_snapshot(self):
        """Build a per-symbol summary from the buffered data.

        Symbols that do not yet have both a ticker and an order book entry
        are omitted.

        Returns:
            dict: symbol -> dict with 'price', 'change_24h', 'volume_24h',
            'bid', 'ask' and 'spread'. 'bid' / 'ask' are None when that side
            of the book is empty; 'spread' is None unless both are present.
        """
        snapshot = {}
        for symbol in self.symbols:
            ticker_key = f'{symbol}_ticker'
            ob_key = f'{symbol}_orderbook'

            if ticker_key in self.data_buffer and ob_key in self.data_buffer:
                ticker = self.data_buffer[ticker_key]
                orderbook = self.data_buffer[ob_key]

                bid = float(orderbook['bids'][0][0]) if orderbook['bids'] else None
                ask = float(orderbook['asks'][0][0]) if orderbook['asks'] else None

                # Compare against None explicitly: a price of 0.0 is falsy
                # but is still a real value and must not be read as missing.
                spread = ask - bid if bid is not None and ask is not None else None

                snapshot[symbol] = {
                    'price': ticker['price'],
                    'change_24h': ticker['change'],
                    'volume_24h': ticker['volume'],
                    'bid': bid,
                    'ask': ask,
                    'spread': spread
                }

        return snapshot

    def close(self):
        """Close all connections."""
        self.ws_client.close()

# Usage
# time.sleep() is used below; no other part of this document imports time.
import time

symbols = ['BTC-USDT', 'ETH-USDT', 'ADA-USDT']
streamer = KuCoinStreamer(symbols)
streamer.start_streaming()

try:
    # Stream for 5 minutes
    time.sleep(300)

    # Get final snapshot
    snapshot = streamer.get_market_snapshot()
    print("\nMarket Snapshot:")
    for symbol, data in snapshot.items():
        # 'spread' is None when one side of the book was empty; formatting
        # None with ':.4f' would raise TypeError, so print a placeholder.
        spread = data['spread']
        spread_text = f"${spread:.4f}" if spread is not None else "N/A"
        print(f"{symbol}: ${data['price']} | Spread: {spread_text}")
finally:
    # Always release the connections, even if streaming or printing failed.
    streamer.close()

Real-Time Trading Bot Integration

class TradingBot:
    """Example bot that tracks market data, balances and positions.

    ``market_data`` maps symbol -> latest {'price', 'change', 'timestamp'};
    ``positions`` maps currency -> {'size', 'avg_price'} (plus 'balance'
    once a balance update arrives for an already-tracked currency).

    NOTE(review): this example constructs ``KucoinWsClient`` with positional
    credentials and calls ``_socket(...)`` on it; neither matches the
    ``KucoinWsClient`` interface listed earlier in this document. Confirm
    against the installed SDK before relying on this class.
    """

    def __init__(self, api_key, api_secret, api_passphrase, symbols):
        """
        Args:
            api_key: API key passed to the WebSocket client.
            api_secret: API secret passed to the WebSocket client.
            api_passphrase: API passphrase passed to the WebSocket client.
            symbols: Iterable of trading symbols to monitor.
        """
        self.ws_client = KucoinWsClient(api_key, api_secret, api_passphrase)
        self.symbols = symbols
        self.market_data = {}
        self.positions = {}

    def start_monitoring(self):
        """Start monitoring market data and account updates."""
        # Monitor market data
        for symbol in self.symbols:
            topic = f'/market/ticker:{symbol}'
            self.ws_client._socket(topic, self.handle_market_data)

        # Monitor account balance changes
        self.ws_client._socket('/account/balance', self.handle_balance_update, is_private=True)

        # Monitor order execution
        self.ws_client._socket('/spotMarket/tradeOrders', self.handle_order_update, is_private=True)

    def handle_market_data(self, message):
        """Parse a JSON ticker message, store it and evaluate signals.

        Only messages whose 'type' is 'message' are processed.

        NOTE(review): 'changeRate' is read here but is not part of the
        TickerData listing in this document -- confirm the payload.
        """
        data = json.loads(message)
        if data['type'] == 'message':
            ticker = data['data']
            symbol = ticker['symbol']
            price = float(ticker['price'])

            # Store market data
            self.market_data[symbol] = {
                'price': price,
                'change': float(ticker['changeRate']),
                'timestamp': ticker['time']
            }

            # Check for trading opportunities
            self.check_trading_signals(symbol, price)

    def handle_balance_update(self, message):
        """Parse a JSON balance message and record the available amount.

        The balance is only stored for currencies that already have an entry
        in ``positions``; other currencies are just printed.
        """
        data = json.loads(message)
        if data['type'] == 'message':
            balance_data = data['data']
            currency = balance_data['currency']
            available = float(balance_data['available'])

            print(f"Balance Update: {currency} = {available}")

            # Update position tracking
            if currency in self.positions:
                self.positions[currency]['balance'] = available

    def handle_order_update(self, message):
        """Parse a JSON order message and dispatch fills.

        Messages with status 'match' are forwarded to ``on_order_filled``.
        """
        data = json.loads(message)
        if data['type'] == 'message':
            order_data = data['data']

            print(f"Order Update: {order_data['symbol']} - {order_data['status']}")

            if order_data['status'] == 'match':
                # Order was filled
                self.on_order_filled(order_data)

    def check_trading_signals(self, symbol, price):
        """Print a notice when the stored change rate exceeds +/-5%.

        This is a placeholder for real trading logic. ``price`` is accepted
        for callers' convenience but is not used by this example strategy.
        """
        if symbol not in self.market_data:
            return

        # Simple momentum strategy example
        change = self.market_data[symbol]['change']

        if change > 0.05:  # Price up 5%
            print(f"Strong upward momentum detected for {symbol}")
            # Consider buying logic here

        elif change < -0.05:  # Price down 5%
            print(f"Strong downward momentum detected for {symbol}")
            # Consider selling logic here

    def on_order_filled(self, order_data):
        """Update the base-currency position from a fill.

        The fill price is derived as dealFunds / dealSize (0 when the size
        is not positive). Buys add to the position; any other side reduces it.

        NOTE(review): 'dealSize' and 'dealFunds' are not part of the
        OrderUpdateData listing in this document -- confirm the payload.
        """
        symbol = order_data['symbol']
        side = order_data['side']
        size = float(order_data['dealSize'])
        price = float(order_data['dealFunds']) / size if size > 0 else 0

        print(f"Order Filled: {side} {size} {symbol} at ${price}")

        # Positions are tracked per base currency (the part before the '-').
        base_currency = symbol.split('-')[0]

        if side == 'buy':
            # Bought base currency with quote currency
            self.update_position(base_currency, size, price)
        else:
            # Sold base currency for quote currency
            self.update_position(base_currency, -size, price)

    def update_position(self, currency, size_change, price):
        """Apply a signed size change to a position using average cost.

        - Opening from flat, or crossing through zero to the other side:
          the average price becomes this fill's price.
        - Adding in the existing direction: size-weighted average of the old
          average and the fill price.
        - Reducing without crossing zero: the average price is unchanged
          (a partial exit does not alter the cost of what remains).
        - Ending exactly flat: the average price resets to 0.

        Args:
            currency: Position key (base currency code).
            size_change: Signed quantity; positive adds, negative removes.
            price: Fill price for this change.
        """
        pos = self.positions.setdefault(currency, {'size': 0, 'avg_price': 0})

        old_size = pos['size']
        new_size = old_size + size_change

        if new_size == 0:
            pos['avg_price'] = 0
        elif old_size == 0 or (old_size > 0) != (new_size > 0):
            # Fresh position, or the fill flipped long <-> short: whatever
            # remains was entered at this fill's price.
            pos['avg_price'] = price
        elif (size_change > 0) == (old_size > 0):
            # Adding in the same direction: blend by size.
            pos['avg_price'] = (old_size * pos['avg_price'] + size_change * price) / new_size
        # Otherwise the position is being reduced; keep the average price.

        pos['size'] = new_size

# Usage (commented out for example)
# bot = TradingBot(api_key, api_secret, api_passphrase, ['BTC-USDT', 'ETH-USDT'])
# bot.start_monitoring()

Types

# Informational type aliases: every name below is bound to the built-in
# ``dict``, so the commented key listings are documentation only and are not
# enforced at runtime.

# Token response used to open a WebSocket connection.
WebSocketToken = dict
# {
#   "token": str,            # Connection token
#   "instanceServers": list, # Server endpoints
#   "pingInterval": int,     # Ping interval in ms
#   "pingTimeout": int       # Ping timeout in ms
# }

# Envelope of a WebSocket message; "data" carries the topic-specific payload.
WebSocketMessage = dict
# {
#   "id": str,               # Message ID
#   "type": str,             # Message type ('message', 'welcome', 'ping', 'pong')
#   "topic": str,            # Subscription topic
#   "subject": str,          # Message subject
#   "data": dict             # Payload data
# }

# Payload of a ticker message.
# NOTE(review): the streaming examples in this document also read
# 'changeRate' and 'vol' from ticker payloads; neither is listed here --
# confirm which topic provides them.
TickerData = dict
# {
#   "symbol": str,           # Trading symbol
#   "sequence": str,         # Sequence number
#   "price": str,            # Last price
#   "size": str,             # Last size
#   "bestAsk": str,          # Best ask price
#   "bestAskSize": str,      # Best ask size
#   "bestBid": str,          # Best bid price
#   "bestBidSize": str,      # Best bid size
#   "time": int              # Timestamp
# }

# Payload of an order book message.
OrderBookData = dict
# {
#   "symbol": str,           # Trading symbol
#   "sequence": str,         # Sequence number
#   "asks": list,            # Ask orders [[price, size], ...]
#   "bids": list,            # Bid orders [[price, size], ...]
#   "time": int              # Timestamp
# }

# Payload of an account balance message.
AccountBalanceData = dict
# {
#   "accountId": str,        # Account ID
#   "currency": str,         # Currency
#   "total": str,            # Total balance
#   "available": str,        # Available balance
#   "holds": str,            # Held balance
#   "relationEvent": str,    # Event that caused change
#   "relationEventId": str,  # Event ID
#   "time": int              # Timestamp
# }

# Payload of an order update message.
# NOTE(review): the trading bot example in this document reads 'dealSize'
# and 'dealFunds' from order payloads; neither is listed here -- confirm.
OrderUpdateData = dict
# {
#   "symbol": str,           # Trading symbol
#   "orderType": str,        # Order type
#   "side": str,             # Order side
#   "orderId": str,          # Order ID
#   "type": str,             # Update type
#   "orderTime": int,        # Order timestamp
#   "size": str,             # Order size
#   "filledSize": str,       # Filled size
#   "price": str,            # Order price
#   "clientOid": str,        # Client order ID
#   "remainSize": str,       # Remaining size
#   "status": str,           # Order status
#   "ts": int                # Update timestamp
# }

Install with Tessl CLI

npx tessl i tessl/pypi-kucoin-python

docs

account-management.md

earn-products.md

index.md

lending.md

margin-trading.md

market-data.md

spot-trading.md

websocket.md

tile.json