Overview
I am doing market activity analysis and have hit a challenge processing Binance WebSocket aggregate trade (aggTrade) data: when it arrives over the WebSocket, a single market trade is split into many separate chunks.
Problem Description
For example, on ALPHAUSDT the exchange shows a single trade of 37,101 units at 20:58:41, but in the WebSocket stream that trade arrives as multiple separate chunks with varying parameters. The challenge is to correctly identify these chunks and merge them back into the original trades.
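To make the splitting concrete, here is a hypothetical illustration. The field values are invented for this example (they are not the real ALPHAUSDT messages); only the quantities are chosen so that they sum to the 37,101 units above:

# Three hypothetical aggTrade chunks produced by ONE market order
# (all values invented for illustration):
chunks = [
    {"p": "0.0551", "q": "12000", "f": 500100, "l": 500104, "T": 1700000321001, "m": False},
    {"p": "0.0552", "q": "15101", "f": 500105, "l": 500109, "T": 1700000321003, "m": False},
    {"p": "0.0553", "q": "10000", "f": 500110, "l": 500112, "T": 1700000321007, "m": False},
]
# 12000 + 15101 + 10000 = 37101, the size shown as a single trade in the UI.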
Current Implementation
import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)


@dataclass
class TradeChunk:
    """A single aggTrade message as received from the stream."""
    price: float
    quantity: float
    timestamp: int        # trade time ("T"), in milliseconds
    is_maker_buyer: bool  # "m" flag
    f_id: int             # first trade ID ("f")
    l_id: int             # last trade ID ("l")


@dataclass
class Trade:
    """An original trade reconstructed from one or more chunks."""
    price: float
    quantity: float
    trade_value: float
    timestamp: int
    is_maker_buyer: bool


@dataclass
class PendingTrade:
    chunks: List[TradeChunk] = field(default_factory=list)

    def to_trade(self) -> Trade:
        last_chunk = self.chunks[-1]
        total_quantity = sum(chunk.quantity for chunk in self.chunks)
        total_value = sum(chunk.price * chunk.quantity for chunk in self.chunks)
        return Trade(price=last_chunk.price, quantity=total_quantity,
                     trade_value=total_value, timestamp=last_chunk.timestamp,
                     is_maker_buyer=last_chunk.is_maker_buyer)


class TradeProcessor:
    def __init__(self):
        self.pending_chunks: Dict[str, PendingTrade] = {}
        self.processing_lock = asyncio.Lock()

    @staticmethod
    def should_append_to_trade(chunk: TradeChunk, last_chunk: Optional[TradeChunk]) -> bool:
        if last_chunk is None:
            return True
        # Here should be the logic for determining whether a chunk
        # belongs to the current trade; this is exactly the missing piece.
        return False

    def create_trade(self, symbol: str, chunk: TradeChunk) -> Optional[PendingTrade]:
        """Buffer the chunk; return the completed PendingTrade once a new trade starts."""
        if symbol not in self.pending_chunks:
            self.pending_chunks[symbol] = PendingTrade()
        pending = self.pending_chunks[symbol]
        last_chunk = pending.chunks[-1] if pending.chunks else None
        if self.should_append_to_trade(chunk, last_chunk):
            pending.chunks.append(chunk)
            return None
        # The incoming chunk opens a new trade: flush the buffered one.
        completed = self.pending_chunks.pop(symbol)
        self.pending_chunks[symbol] = PendingTrade([chunk])
        return completed

    async def handle_message(self, message: dict, market_type: str):
        try:
            data, symbol = message["data"], message["data"]["s"]
            chunk = TradeChunk(price=float(data["p"]), quantity=float(data["q"]),
                               timestamp=data["T"], is_maker_buyer=data["m"],
                               f_id=data["f"], l_id=data["l"])
            async with self.processing_lock:
                pending_trade = self.create_trade(symbol, chunk)
                if pending_trade:
                    trade = pending_trade.to_trade()
                    logger.info(f"Processing trade with quantity {trade.quantity} at time "
                                f"{datetime.fromtimestamp(trade.timestamp / 1000).strftime('%H:%M:%S')}")
        except Exception as e:
            logger.error(f"Error processing trade message: {e}", exc_info=True)
Currently, I group chunks using time intervals (sketched below). This works for low-activity pairs like ALPHAUSDT, where there are clear time gaps between different trades. However, for high-activity pairs like WLDUSDT, with 20-50 chunks per second arriving from different trades, this approach fails.
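Concretely, my time-interval version of should_append_to_trade boils down to something like this (shown as a standalone function; the 100 ms threshold is an example value, and it is exactly the knob that needs per-pair tuning):

MAX_GAP_MS = 100  # example threshold, not a constant documented by Binance

def should_append_to_trade(chunk: TradeChunk, last_chunk: Optional[TradeChunk]) -> bool:
    if last_chunk is None:
        return True
    # Treat chunks as part of the same trade while they arrive
    # close together in time.
    return chunk.timestamp - last_chunk.timestamp <= MAX_GAP_MS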
Challenges with Parameters
The Binance WebSocket API delivers the aggregate trade (aggTrade) stream in the following format:
{
"e": "aggTrade", // Event type
"E": 123456789, // Event time
"s": "BTCUSDT", // Symbol
"a": 5933014, // Aggregate trade ID
"p": "0.001", // Price
"q": "100", // Quantity
"f": 100, // First trade ID
"l": 105, // Last trade ID
"T": 123456785, // Trade time
"m": true, // Is the buyer the market maker?
}
- Event type (e): Does not provide information for grouping, just indicates the type of event.
- Event time (E): Can vary significantly between chunks of the same trade. The gap between chunks of a single trade may even exceed the gap between chunks of different trades, making this timestamp an unreliable grouping criterion.
- Symbol (s): Only identifies a trading pair, does not help in chunk grouping.
- Aggregate trade ID (a): Unique for each chunk, even within a single trade, making it useless for grouping.
- Price (p): Can vary between chunks of the same trade, as an order can be executed at different price levels.
- Quantity (q): Represents only a portion of the total volume of a trade, does not give information about belonging to a specific trade.
- First/Last trade ID (f/l): Always contiguous, even across different trades: the last trade ID of one chunk is exactly one less than the first trade ID of the next chunk, regardless of which trade each chunk belongs to (see the check after this list).
- Trade time (T): Like Event time, can vary between chunks of the same trade and does not provide a reliable criterion for grouping.
- Is buyer maker (m): Can vary between chunks of the same trade, since a large order can be executed against orders of different market participants.
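The f/l point is easy to verify empirically. A minimal check (is_contiguous is a helper I wrote for this test, not part of the API):

def is_contiguous(prev: TradeChunk, nxt: TradeChunk) -> bool:
    """True when nxt directly continues prev's trade-ID range."""
    return nxt.f_id == prev.l_id + 1

In my logs this returns True for every consecutive chunk pair of a symbol, whether or not the two chunks came from the same original order, so contiguity alone cannot mark trade boundaries.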
Creating individual time intervals for each trading pair is not a solution either (see the sketch after this list), as:
- Trading activity constantly changes
- Requires manual parameter adjustments
- Needs continuous market monitoring
- Not scalable for 200+ trading pairs
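To illustrate the last two points: even an "automatic" per-pair threshold just moves the knobs around. A sketch of what I mean, using a rolling median of recent inter-chunk time gaps (the window size and multiplier are arbitrary choices that would themselves need tuning):

from collections import defaultdict, deque
from statistics import median

class AdaptiveGap:
    """Per-symbol time threshold derived from recent inter-chunk gaps.
    Sketch only: window and multiplier remain hand-picked parameters."""
    def __init__(self, window: int = 200, multiplier: float = 3.0):
        self.gaps = defaultdict(lambda: deque(maxlen=window))
        self.multiplier = multiplier

    def update_and_get(self, symbol: str, gap_ms: int) -> float:
        history = self.gaps[symbol]
        history.append(gap_ms)
        return self.multiplier * median(history)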
Question
I need a universal algorithm that can correctly group chunks belonging to the same trade, regardless of a pair's activity level. Either that, or perhaps I am misinterpreting how the API works. Are there any patterns or parameters in the stream data that could help solve this, or is there something else I should be using instead?
Any insights or suggestions would be greatly appreciated.