WebSocket

Real-time market data via WebSocket connections

WebSocket

The WebSocket client provides real-time streaming of market data, including prices, trades, and orderbook updates.

Access

from dflow import DFlowClient
 
client = DFlowClient()
ws = client.ws

Connection

Async Context Manager

async with client.ws as ws:
    await ws.subscribe_prices(["market-id"])
    async for price in ws.prices():
        print(price)

Manual Connection

await client.ws.connect()
# ... use the connection
await client.ws.disconnect()

Subscriptions

subscribe_prices

Subscribe to price updates for specified markets.

await ws.subscribe_prices(market_ids: list[str])

subscribe_trades

Subscribe to trade updates for specified markets.

await ws.subscribe_trades(market_ids: list[str])

subscribe_orderbook

Subscribe to orderbook updates for specified markets.

await ws.subscribe_orderbook(market_ids: list[str])

unsubscribe

Unsubscribe from updates for specified markets.

await ws.unsubscribe(market_ids: list[str])

Event Handlers

Callback Style

ws.on_price(callback: Callable[[PriceUpdate], None])
ws.on_trade(callback: Callable[[TradeUpdate], None])
ws.on_orderbook(callback: Callable[[OrderbookUpdate], None])
ws.on_error(callback: Callable[[Exception], None])

Example

def handle_price(price: PriceUpdate):
    print(f"Price update: {price.market_id} = ${price.yes_price}")
 
def handle_error(error: Exception):
    print(f"WebSocket error: {error}")
 
client.ws.on_price(handle_price)
client.ws.on_error(handle_error)
 
await client.ws.connect()
await client.ws.subscribe_prices(["market-1", "market-2"])

Async Iteration

async with client.ws as ws:
    await ws.subscribe_prices(["market-id"])
    
    async for update in ws.messages():
        if isinstance(update, PriceUpdate):
            print(f"Price: {update.yes_price}")
        elif isinstance(update, TradeUpdate):
            print(f"Trade: {update.size} @ {update.price}")

Complete Example

import asyncio
from dflow import DFlowClient, PriceUpdate, TradeUpdate
 
async def main():
    client = DFlowClient()
    
    async with client.ws as ws:
        # Subscribe to multiple data types
        await ws.subscribe_prices(["BTCUSD-25JAN-100000"])
        await ws.subscribe_trades(["BTCUSD-25JAN-100000"])
        
        # Process updates
        async for update in ws.messages():
            if isinstance(update, PriceUpdate):
                print(f"[PRICE] {update.market_id}: Yes=${update.yes_price}")
            elif isinstance(update, TradeUpdate):
                print(f"[TRADE] {update.side} {update.size} @ ${update.price}")
 
if __name__ == "__main__":
    asyncio.run(main())

Types

PriceUpdate

class PriceUpdate(BaseModel):
    market_id: str
    yes_price: float
    no_price: float
    timestamp: str

TradeUpdate

class TradeUpdate(BaseModel):
    market_id: str
    price: float
    size: float
    side: str
    timestamp: str

OrderbookUpdate

class OrderbookUpdate(BaseModel):
    market_id: str
    bids: list[OrderbookLevel]
    asks: list[OrderbookLevel]
    timestamp: str

Configuration

WebSocketOptions

from dflow import WebSocketOptions
 
options = WebSocketOptions(
    reconnect=True,
    reconnect_interval=5000,  # ms
    max_reconnect_attempts=10,
    ping_interval=30000  # ms
)
 
client = DFlowClient(websocket_options=options)

On this page