Binance Websocket @aggTrade

Hello everyone,

I want to monitor @aggTrade via WebSockets over a 15-minute time interval. Within those 15 minutes, the buys and/or sells should be accumulated. After the 15 minutes, an automatic reset should take place and the calculation starts over.
My problem: when the WebSocket connection is interrupted (timeout, sent 1011 (internal error) keepalive ping timeout; no close frame received), the accumulated data is lost as well, and I no longer have a current state.

Does anyone have a solution for this problem?

Hey,
As outlined in the documentation, you must respond to a ping frame sent by the server within 10 minutes to maintain the connection. Use the pong frame to keep the connection alive: Web Socket API | Binance Open Platform

Hello,

thanks for the reply!
I already have that in the script.
Nevertheless, many connections still get closed.

async with websockets.connect(ticker_url, ping_interval=20, ping_timeout=20, close_timeout=30) as ticker_ws, \
        websockets.connect(trade_url, ping_interval=20, ping_timeout=20, close_timeout=30) as trade_ws:

Do I understand it correctly?

You want to open a connection, subscribe to <symbol>@aggTrade, wait for 15 minutes, and only then receive buffered events?

Hi,

Not quite. The script should not wait until the 15-minute period ends and then receive the events, but rather monitor continuously.

The idea was: continuous monitoring of the accumulated volume values of buy or sell trades within a 15-minute time interval. The events should be displayed immediately, regardless of whether the 15 minutes have passed or not. For example: I am interested in buy or sell trades that are larger than $100,000. When I start the WebSocket connection, the script begins to calculate the accumulated volume values of buy or sell trades. But if the connection suddenly breaks, the collected data is also lost, and the script starts the calculation again after the connection is re-established. Then I have several problems at the same time: The calculation is not correct, and the 15-minute time interval is not correct.

I hope I was able to explain it reasonably well :slight_smile:

I see. That’s the correct approach then :+1: I was afraid you were not receiving data from the connection for an extended time, which can cause a timeout and disconnect.

Another common reason for such timeouts is too much data. If the client is not able to keep up with the stream, the backlog of messages will grow, which eventually leads to issues. The client can send its ping and the server may respond with a pong, but there are hundreds of thousands of messages yet to receive and process before you get to that response.

You can monitor the difference between the current time and "E" or "T" in the events.

I am not quite sure what you mean by that?

If it’s your script discarding data because of the exception, then don’t do that :slight_smile: Handle exceptions.
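
Something like this, as a rough sketch (the handle_event callback and the 5-second backoff are just placeholders), keeps the accumulated totals alive across reconnects:

import asyncio
import websockets

async def listen_forever(url, handle_event):
    # Keep the accumulator outside this function so a reconnect
    # does not reset it; only the socket is recreated.
    while True:
        try:
            async with websockets.connect(url, ping_interval=20, ping_timeout=20) as ws:
                async for message in ws:
                    handle_event(message)
        except (websockets.ConnectionClosed, OSError) as exc:
            print(f"connection lost ({exc!r}), reconnecting in 5 s")
            await asyncio.sleep(5)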

If you mean that you’re losing all events that happened while you were disconnected, then yeah, that’s natural. You can query the missing aggtrades via the REST API, for example: GET /api/v3/aggTrades. Note the last "a" you received, then the first one you get after reconnection, and query the missing ones via the API.
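
For illustration, a rough sketch of that gap-fill using the requests library (the function name and pagination details are mine, not from the docs):

import requests

def fetch_missing_aggtrades(symbol, last_seen_id, first_new_id):
    # Fill the gap between the last "a" seen before the drop and the
    # first "a" received after reconnecting (both exclusive).
    missing = []
    from_id = last_seen_id + 1
    while from_id < first_new_id:
        resp = requests.get(
            "https://api.binance.com/api/v3/aggTrades",
            params={"symbol": symbol, "fromId": from_id, "limit": 1000},
        )
        resp.raise_for_status()
        batch = resp.json()
        if not batch:
            break
        missing.extend(t for t in batch if t["a"] < first_new_id)
        from_id = batch[-1]["a"] + 1
    return missing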

By the way, if you’re interested only in total volume over 15 minutes, not individual trades, and can tolerate 1-second latency, then you might want to look into <symbol>@kline_15m streams.

Correct, the events collected up to the disconnection are lost. I’m not sure if I can retrieve the missing aggtrades via the REST API. The reason is that I monitor all USDT symbols on Binance. And that can, of course, exceed the number of requests allowed via the REST API. So, I built logic into the script to store the accumulated values as temporary data every 10 minutes. After 10 minutes, the temporary data is deleted. But it doesn’t work so well yet :slight_smile:

What do you mean by that?

By the way, if you’re interested only in total volume over 15 minutes, not individual trades, and can tolerate 1-second latency, then you might want to look into <symbol>@kline_15m streams.
I understand what you mean. Nevertheless, I actually want to monitor the buy and sell volumes in USDT :slight_smile:

P.S.: I feared that the number of USDT symbols might be causing the connection to break. Nevertheless, I want to monitor all USDT symbols.

O-o-oh! Yes, in this case you definitely need a bigger boat, so to speak. I’m afraid listening to too many streams using too few connections is exactly what’s causing the overload.

  • Use multiple connections, with a smaller subset of stream subscriptions in each (see the sketch after this list). Keep scaling until you can keep up.
  • Do the minimum possible work in the WebSocket listener between receiving events. Process events asynchronously.
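
For the first point, a rough sketch of how the sharding could look with Binance combined streams (assuming a usdt_symbols list as in your script; the chunk size of 50 is arbitrary, tune it until the connections keep up):

def chunk(symbols, size):
    # Split the full symbol list into subsets, one per connection.
    for i in range(0, len(symbols), size):
        yield symbols[i:i + size]

# Combined streams have the form
# wss://stream.binance.com:9443/stream?streams=<stream1>/<stream2>/...
urls = [
    "wss://stream.binance.com:9443/stream?streams="
    + "/".join(f"{s.lower()}@aggTrade" for s in subset)
    for subset in chunk(usdt_symbols, 50)
]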

Compare the time when you receive events with "E" (the time when the event was generated). If the delay keeps growing, you are not keeping up with the event stream. You need to reduce the number of subscriptions per connection, or process the events faster.
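
As a sketch (the 5-second threshold is arbitrary; for combined streams, unwrap event["data"] first):

import time

def check_lag(event):
    # "E" is the event time in milliseconds; compare it with the local clock.
    lag_ms = time.time() * 1000 - event["E"]
    if lag_ms > 5000:
        print(f"{event.get('s')}: {lag_ms:.0f} ms behind the stream")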

Kline events include base ("v") and quote asset volume ("q"). You can use "q" to get the trading volume in USDT.

It’s more a question of whether you need per-trade resolution in real time, or whether a 2-second delay is good enough for you. The event rate on kline streams is bounded, making it easier to keep up with them.

Thank you very much for the detailed explanation! :handshake:

I already used that in my script:

from multiprocessing import Manager, Process

# fetch_usdt_symbols() and monitor_symbols_subset() are defined elsewhere in the script
def main():
    usdt_symbols = fetch_usdt_symbols()
    num_processes = 4
    chunk_size = len(usdt_symbols) // num_processes

    with Manager() as manager:
        shared_data = manager.dict()

        processes = []
        for i in range(num_processes):
            start_idx = i * chunk_size
            end_idx = (i + 1) * chunk_size if i != num_processes - 1 else len(usdt_symbols)
            symbols_subset = usdt_symbols[start_idx:end_idx]
            p = Process(target=monitor_symbols_subset, args=(symbols_subset, shared_data))
            processes.append(p)
            p.start()

        for p in processes:
            p.join()

Kline events include base ("v") and quote asset volume ("q"). You can use "q" to get the trading volume in USDT.

It’s more a question of whether you need per-trade resolution in real time, or whether a 2-second delay is good enough for you. The event rate on kline streams is bounded, making it easier to keep up with them.

By default, Kline streams provide aggregated trading data in the form of candles (open, high, low, close) over a certain period of time, but they do not directly indicate the buy or sell volume separately.

But it is important for me to distinguish whether it was a buy or a sell :slight_smile:

Nice! :+1: That covers sharding. You might want to try increasing the number of processes, see if that helps.

What about processing itself? If you want to ensure you never lose the connection, you’d want to do the absolute minimum in the process that receives from the socket: just add the received buffer to a queue and that’s it. No processing, likely no parsing either; that should happen on some other thread.
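
A minimal sketch of that split, assuming asyncio (the names are placeholders):

import asyncio
import json
import websockets

async def reader(url, queue):
    # Bare minimum on the socket: receive and enqueue, nothing else.
    async with websockets.connect(url, ping_interval=20, ping_timeout=20) as ws:
        async for raw in ws:
            await queue.put(raw)

async def worker(queue):
    # Parsing and accumulation happen here, off the socket's critical path.
    while True:
        raw = await queue.get()
        event = json.loads(raw)
        # ... update the accumulated volumes from `event` ...
        queue.task_done()

async def main(url):
    queue = asyncio.Queue()
    await asyncio.gather(reader(url, queue), worker(queue))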

Also, remember that past a certain scale you might need to graduate from Python.

Kline events include both total volume as well as taker buy volume. Subtract buy volume from total volume to get sell volume.
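
In code, roughly (field names as in the kline stream payload):

def kline_buy_sell_quote_volume(event):
    # "q" = total quote asset volume, "Q" = taker buy quote asset volume
    k = event["k"]
    buy = float(k["Q"])           # taker buy volume in USDT
    sell = float(k["q"]) - buy    # the remainder is taker sell volume
    return buy, sell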

A very crude but effective solution that I implemented some time back, when I was constantly getting disconnections, was to save the information to a CSV file and keep a counter: whenever the counter is 1, the CSV file is opened, read, and its data is loaded into the list. For example, if you keep a list where you store data for a 15-minute time window, and a disconnection happens, the data will already be stored in the CSV file.
On reconnection, the data is read from the CSV file and loaded back into your list, and your calculation continues.
But I have a feeling that I am missing something here, which you might have tried to explain in your query.
Do you think this, or some modified form of it, would help? Instead of a CSV file you could use a DB.
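
Roughly along these lines, as a sketch (the file name and the {symbol: (buy, sell)} layout are just placeholders):

import csv

SNAPSHOT_FILE = "volumes_snapshot.csv"

def save_snapshot(volumes):
    # volumes: {symbol: (buy_volume, sell_volume)}
    with open(SNAPSHOT_FILE, "w", newline="") as f:
        writer = csv.writer(f)
        for symbol, (buy, sell) in volumes.items():
            writer.writerow([symbol, buy, sell])

def load_snapshot():
    # Called after a reconnect to restore the accumulated totals.
    volumes = {}
    try:
        with open(SNAPSHOT_FILE, newline="") as f:
            for symbol, buy, sell in csv.reader(f):
                volumes[symbol] = (float(buy), float(sell))
    except FileNotFoundError:
        pass
    return volumes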

Hello,

Thank you very much for the answer!

I actually tried the database, but unfortunately it didn’t help much. The values were wrong.
I’m currently using data caching in the script as a buffer, based on deque and defaultdict, to store trading data for each symbol. This doesn’t work properly either. The data is displayed incorrectly: the accumulated values are either lower or higher than they actually are when I compare them with the exchange. A few hundred dollars would be no problem, but in some cases the difference is more than 10,000 dollars.

import asyncio
import time
from collections import defaultdict, deque

class Trades:
    def __init__(self, buffer_time):
        # buffer_time in minutes, converted to milliseconds
        self.buffer_time = buffer_time * 60 * 1000
        self.trades_buffer = defaultdict(deque)

    async def remove_old_trades(self):
        while True:
            current_time = time.time() * 1000
            for symbol in list(self.trades_buffer.keys()):
                while self.trades_buffer[symbol] and self.trades_buffer[symbol][0]['T'] < current_time - self.buffer_time:
                    self.trades_buffer[symbol].popleft()
            await asyncio.sleep(2)

    def accumulate_volume(self, symbol, trade_data):
        self.trades_buffer[symbol].append(trade_data)
        # Compute buy and sell volume from the buffered trades
        buy_volume = sum(float(t['q']) * float(t['p']) for t in self.trades_buffer[symbol] if not t['m'])
        sell_volume = sum(float(t['q']) * float(t['p']) for t in self.trades_buffer[symbol] if t['m'])
        return buy_volume, sell_volume

Looks like you want to compute the volume over a sliding window, similar to what ticker events produce. However, those are computed over 24 hours, not 1 minute as in this snippet. There are no 1-minute ticker streams, though. And 1-minute klines are not exactly that either.

What are the “actual” values you compare to?

One optimization I can suggest is to adjust the volume when you add and remove trades, rather than recomputing it from scratch. It looks like you iterate over all accumulated trades every time you add a new one, which makes the computation effectively quadratic. Instead:

  • when you add a new trade, compute its volume and add it to the total volume
  • when you remove an old trade, compute its volume and subtract it from the total volume

That way you can maintain the volume for a sliding window more efficiently.
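
As a sketch of the idea (the class name is made up; one instance per symbol, fields as in @aggTrade events):

from collections import deque

class SlidingVolume:
    def __init__(self, window_ms):
        self.window_ms = window_ms
        self.trades = deque()   # (timestamp, is_sell, volume) tuples
        self.buy = 0.0
        self.sell = 0.0

    def add(self, trade):
        # O(1): adjust the running totals instead of re-summing the buffer.
        vol = float(trade["p"]) * float(trade["q"])
        self.trades.append((trade["T"], trade["m"], vol))
        if trade["m"]:
            self.sell += vol
        else:
            self.buy += vol

    def expire(self, now_ms):
        # Drop trades that left the window and subtract their volume.
        while self.trades and self.trades[0][0] < now_ms - self.window_ms:
            _, is_sell, vol = self.trades.popleft()
            if is_sell:
                self.sell -= vol
            else:
                self.buy -= vol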

The “actual” accumulated values can be taken as an example from coinglass.com by simply analyzing the 15-minute time interval. When the values are accumulated, they must continuously increase. However, in the bot.log file, I observed that the accumulated values are not correctly stored (so to speak).

Example:

First 5 minutes (connection stable):

3 trades are recorded:

  • Trade 1: purchase of 50,000 USDT
  • Trade 2: purchase of 30,000 USDT
  • Trade 3: sale of 40,000 USDT

Accumulation after 5 minutes: purchase volume = 80,000 USDT, sale volume = 40,000 USDT.

Connection loss (for 2 minutes):

During this time, we miss all trades that could have been recorded. Important: the previously collected data (purchase = 80,000 USDT, sale = 40,000 USDT) remains in memory.

Connection restoration:

After the restoration, new trades are immediately recorded. Let’s say the following new trades are recorded:

  • Trade 4: sale of 60,000 USDT
  • Trade 5: purchase of 50,000 USDT

Accumulation after another 8 minutes: purchase volume = 130,000 USDT (80,000 + 50,000), sale volume = 100,000 USDT (40,000 + 60,000).

Result after 15 minutes:

At the end of the 15 minutes, we have correctly accumulated: purchase = 130,000 USDT, sale = 100,000 USDT.

This is the ideal case. The reality looks different: either there are duplicate values, or after the connection loss the data is lost and recalculated from scratch within the 15 minutes, showing lower values than the “actual” ones.

Thank you! I tried it, but I still get incorrect values. I will now try to reduce the number of symbols per connection and increase the number of processes.

I need to try to avoid connection drops.