Issues processing depth data for multiple currency pairs

Hi guys,

I've already been working on this for two days and couldn't resolve the problem. I have a similar script, and when I isolate it for one pair, let's say BTCUSDT, everything is fine: depth and trade data come in and are processed and saved to my drive as they should be. But when I adjust the script for multiple pairs, it simply can't process the incoming depth data.

I have included error print statements, especially in the depth-data processing part, and this came out in my command prompt:

Received depth message for zrxusdt: {"lastUpdateId":1994301986,"bids":[["0.78610000","414.00000000"],["0.78600000","670.00000000"],["0.7…
Missing 'b' or 'a' in depth data for zrxusdt: {'lastUpdateId': 1994301986, 'bids': [['0.78610000', '414.00000000'], ['0.78600000', '670.00000000'], ['0.78590000', '1849.00000000'], ['0.78580000', '1605.00000000'], ['0.78570000', '6839.00000000']], 'asks': [['0.78630000', '896.00000000'], ['0.78640000', '15.00000000'], ['0.78650000', '222.00000000'], ['0.78660000', '2018.00000000'], ['0.78670000', '431.00000000']]}
Processed trade data for zrxusdt

It says 'b' or 'a' is missing, which can't be true, because I have fetched the JSON-format data for btcusdt and everything is there. So for some reason the code can't recognize 'a' and 'b' in the depth data. According to the printouts everything is coming in correctly, but it just can't be processed. The script generates depth data files just as it does for trade data, but of course the depth files have no content because of the problem.
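
While writing this up, I notice that the printed payload actually has the keys 'lastUpdateId', 'bids' and 'asks', while my code looks for 'b' and 'a'. If I read the Binance docs right, the plain @depth diff stream is the one that uses the short keys 'b'/'a' (plus an event time 'E'), whereas the @depth5 partial-book stream I subscribe to sends 'bids'/'asks' and no event time at all. Here is a small sketch of how both layouts could be handled in one place (the field names are from the docs; the helper itself is just my idea and untested):

def extract_depth(data):
    """Return (bids, asks, event_time) for either depth payload layout."""
    # Diff stream (<symbol>@depth): short keys 'b'/'a' plus event time 'E'.
    if 'b' in data and 'a' in data:
        return data['b'], data['a'], data.get('E')
    # Partial-book stream (<symbol>@depth5): full keys 'bids'/'asks', no 'E'.
    if 'bids' in data and 'asks' in data:
        return data['bids'], data['asks'], None
    raise ValueError(f"Unrecognized depth payload, keys: {list(data)}")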

I am new to this, I am working on it with passion, and I would appreciate your help.

import json
import asyncio
import aiohttp
import aiofiles
import csv
import os
from concurrent.futures import ThreadPoolExecutor

async def process_trade_data(data, symbol, folder_path):
    filepath_trade = os.path.join(folder_path, f"{symbol}_trade_data.csv")
    async with aiofiles.open(filepath_trade, mode='a', newline='') as file:
        # csv.writer over an aiofiles handle: writerow() returns the pending
        # async write of the formatted row, which is why it is awaited below
        writer = csv.writer(file)
        await writer.writerow([data['E'], symbol, data['p'], data['q']])
        print(f"Processed trade data for {symbol}")

async def process_depth_data(data, symbol, folder_path):
    filepath_depth = os.path.join(folder_path, f"{symbol}_depth_data.csv")
    try:
        async with aiofiles.open(filepath_depth, mode='a') as file:
            # NOTE: the printout above shows the payload keys are actually
            # 'bids'/'asks', so these lookups fall back to the empty default
            bids = data.get('b', [])
            asks = data.get('a', [])
            print(f"Bids for {symbol}: {bids}, Asks for {symbol}: {asks}")  # New debug print

            for bid in bids:
                row = f"{data.get('E', 'NA')},{symbol},'bid',{','.join(bid)}\n"
                await file.write(row)
            for ask in asks:
                row = f"{data.get('E', 'NA')},{symbol},'ask',{','.join(ask)}\n"
                await file.write(row)
            print(f"Processed depth data for {symbol}")

    except Exception as e:
        print(f"Error processing depth data for {symbol}: {e}")

async def fetch_data(session, symbol, folder_path, executor):
    trade_url = f"wss://stream.binance.com:9443/ws/{symbol}@trade"
    depth_url = f"wss://stream.binance.com:9443/ws/{symbol}@depth5@100ms"

    async with session.ws_connect(trade_url) as trade_ws, session.ws_connect(depth_url) as depth_ws:
        print(f"Connected to {symbol} trade and depth streams")
        while True:
            trade_msg = await trade_ws.receive()
            depth_msg = await depth_ws.receive()

            if trade_msg.type == aiohttp.WSMsgType.TEXT:
                trade_data = json.loads(trade_msg.data)
                print(f"Received trade message for {symbol}: {trade_msg.data[:100]}...")
                loop = asyncio.get_running_loop()
                asyncio.run_coroutine_threadsafe(process_trade_data(trade_data, symbol, folder_path), loop)

            if depth_msg.type == aiohttp.WSMsgType.TEXT:
                depth_data = json.loads(depth_msg.data)
                print(f"Received depth message for {symbol}: {depth_msg.data[:100]}...")
                loop = asyncio.get_running_loop()
                asyncio.run_coroutine_threadsafe(process_depth_data(depth_data, symbol, folder_path), loop)

def fetch_trading_pairs(folder_path, limit=10):
    trading_pairs = []
    for filename in os.listdir(folder_path):
        if '_historical_data.csv' in filename:
            pair_name = filename.replace('_historical_data.csv', '').lower()
            trading_pairs.append(pair_name)
            if len(trading_pairs) >= limit:
                break
    return trading_pairs

async def main():
    folder_path = "G:\\My Drive\\Binance_Testrun_Realtime"
    os.makedirs(folder_path, exist_ok=True)
    trading_pairs = fetch_trading_pairs('G:\\My Drive\\Binance_USDT')
    stop = asyncio.Event()

    executor = ThreadPoolExecutor(max_workers=10)  # Adjust max_workers as needed
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_data(session, symbol, folder_path, executor) for symbol in trading_pairs]
        await asyncio.gather(*tasks)

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Script stopped by user.")


Kind regards

Guys, I have an updated version of the code now, and it finally processes the depth data. But when I checked the depth files, for some reason the timestamp is missing while everything else is there. I am very close now, I guess.
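
My guess as to why: the @depth5 partial-book payload only contains lastUpdateId, bids and asks, so there is no 'E' event-time field at all, and data.get('E', 'NA') always falls back to 'NA'. If that is right, one workaround would be to stamp each row with the local receive time instead; a minimal sketch of what I mean (time.time() is my own addition, not something Binance sends):

import time

async def process_depth_data(data, symbol, folder_path):
    filepath_depth = os.path.join(folder_path, f"{symbol}_depth_data.csv")
    # @depth5 carries no event time, so record the local receive time in ms.
    received_ms = int(time.time() * 1000)
    with open(filepath_depth, 'a', newline='') as file:
        writer = csv.writer(file)
        for bid in data.get('bids', []):
            writer.writerow([received_ms, symbol, 'bid'] + bid)
        for ask in data.get('asks', []):
            writer.writerow([received_ms, symbol, 'ask'] + ask)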

Here is the code:

import json
import asyncio
import aiohttp
import csv
import os
from concurrent.futures import ThreadPoolExecutor

async def process_trade_data(data, symbol, folder_path):
    filepath_trade = os.path.join(folder_path, f"{symbol}_trade_data.csv")
    print(f"Processing trade data for {symbol}")
    with open(filepath_trade, 'a', newline='') as file:
        writer = csv.writer(file)
        writer.writerow([data['E'], data['s'], data['p'], data['q']])

async def process_depth_data(data, symbol, folder_path):
    filepath_depth = os.path.join(folder_path, f"{symbol}_depth_data.csv")
    try:
        print(f"Attempting to process depth data for {symbol}")
        print(f"Depth data for {symbol}: {data}")
        
        bids = data.get('bids')
        asks = data.get('asks')
        print(f"'bids' content for {symbol}: {bids}, 'asks' content for {symbol}: {asks}")

        with open(filepath_depth, 'a', newline='') as file:
            writer = csv.writer(file)
            if bids is not None and asks is not None:
                for bid in bids:
                    writer.writerow([data.get('E', 'NA'), symbol, 'bid'] + bid)
                for ask in asks:
                    writer.writerow([data.get('E', 'NA'), symbol, 'ask'] + ask)
                print(f"Processed depth data for {symbol}")
            else:
                print(f"Incomplete depth data received for {symbol}: {data}")
    except Exception as e:
        print(f"Error processing depth data for {symbol}: {e}")

# Function to fetch data for each pair
async def fetch_data(session, symbol, folder_path, executor):
    trade_url = f"wss://stream.binance.com:9443/ws/{symbol}@trade"
    depth_url = f"wss://stream.binance.com:9443/ws/{symbol}@depth5@100ms"
    loop = asyncio.get_running_loop()

    print(f"Starting WebSocket connection for {symbol}")
    async with session.ws_connect(trade_url) as trade_ws, session.ws_connect(depth_url) as depth_ws:
        print(f"WebSocket connected for {symbol}")
        while True:
            trade_msg = await trade_ws.receive()
            depth_msg = await depth_ws.receive()

            print(f"Received message for {symbol}. Trade message type: {trade_msg.type}, Depth message type: {depth_msg.type}")

            if trade_msg.type == aiohttp.WSMsgType.TEXT:
                trade_data = json.loads(trade_msg.data)
                asyncio.run_coroutine_threadsafe(process_trade_data(trade_data, symbol, folder_path), loop)

            if depth_msg.type == aiohttp.WSMsgType.TEXT:
                depth_data = json.loads(depth_msg.data)
                asyncio.run_coroutine_threadsafe(process_depth_data(depth_data, symbol, folder_path), loop)

# Function to get trading pairs
def fetch_trading_pairs(folder_path, limit=5):  # Limiting the number of pairs for testing
    trading_pairs = []
    for filename in os.listdir(folder_path):
        if '_historical_data.csv' in filename:
            pair_name = filename.replace('_historical_data.csv', '').lower()
            trading_pairs.append(pair_name)
            if len(trading_pairs) >= limit:
                break
    return trading_pairs

# Main function
async def main():
    folder_path = "G:\\My Drive\\Binance_Testrun_Realtime"
    os.makedirs(folder_path, exist_ok=True)

    trading_pairs = fetch_trading_pairs('G:\\My Drive\\Binance_USDT')
    executor = ThreadPoolExecutor(max_workers=10)

    print(f"Starting data collection for {len(trading_pairs)} pairs")
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_data(session, symbol, folder_path, executor) for symbol in trading_pairs]
        await asyncio.gather(*tasks)
    print("Data collection completed")

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Script stopped by user.")

