Hi guys,
I´ve already been working on this for 2 days and couldn´t resolve the problem. I have a similar script and when I isolate the script for one pair let´s say BTCUSDT everything is fine. Depth and Trade data coming in and they are processed and saved on my drive as it should. But when I adjust the script for multipairs the script simply can´t process the incoming depth data .
I have included printing error messages especially in the part of process depth data and this came from my prompt:
Received depth message for zrxusdt: {“lastUpdateId”:1994301986,“bids”:[[“0.78610000”,“414.00000000”],[“0.78600000”,“670.00000000”],["0.7…
Missing ‘b’ or ‘a’ in depth data for zrxusdt: {‘lastUpdateId’: 1994301986, ‘bids’: [[‘0.78610000’, ‘414.00000000’], [‘0.78600000’, ‘670.00000000’], [‘0.78590000’, ‘1849.00000000’], [‘0.78580000’, ‘1605.00000000’], [‘0.78570000’, ‘6839.00000000’]], ‘asks’: [[‘0.78630000’, ‘896.00000000’], [‘0.78640000’, ‘15.00000000’], [‘0.78650000’, ‘222.00000000’], [‘0.78660000’, ‘2018.00000000’], [‘0.78670000’, ‘431.00000000’]]}
Processed trade data for zrxusdt
It says b or a is missing which can´t be true because I have fetched the JSON format data for btcusdt and everything is there. So for some reasons the code can´t recognize a and b in the depth data. Accoprding to printings everything is coming in correctly but it just can´t be processed. The script generates depth data files as it does for trade data but of course the depth files have no content because of the problem.
I am new to this and working with passion on this and would appreciate your help.
import json
import asyncio
import aiohttp
import aiofiles
import csv
import os
from concurrent.futures import ThreadPoolExecutor
async def process_trade_data(data, symbol, folder_path):
filepath_trade = os.path.join(folder_path, f"{symbol}_trade_data.csv")
async with aiofiles.open(filepath_trade, mode='a', newline='') as file:
writer = csv.writer(file)
await writer.writerow([data['E'], symbol, data['p'], data['q']])
print(f"Processed trade data for {symbol}")
async def process_depth_data(data, symbol, folder_path):
filepath_depth = os.path.join(folder_path, f"{symbol}_depth_data.csv")
try:
async with aiofiles.open(filepath_depth, mode='a') as file:
bids = data.get('b', [])
asks = data.get('a', [])
print(f"Bids for {symbol}: {bids}, Asks for {symbol}: {asks}") # New debug print
for bid in bids:
row = f"{data.get('E', 'NA')},{symbol},'bid',{','.join(bid)}\n"
await file.write(row)
for ask in asks:
row = f"{data.get('E', 'NA')},{symbol},'ask',{','.join(ask)}\n"
await file.write(row)
print(f"Processed depth data for {symbol}")
except Exception as e:
print(f"Error processing depth data for {symbol}: {e}")
async def fetch_data(session, symbol, folder_path, executor):
trade_url = f"wss://stream.binance.com:9443/ws/{symbol}@trade"
depth_url = f"wss://stream.binance.com:9443/ws/{symbol}@depth5@100ms"
async with session.ws_connect(trade_url) as trade_ws, session.ws_connect(depth_url) as depth_ws:
print(f"Connected to {symbol} trade and depth streams")
while True:
trade_msg = await trade_ws.receive()
depth_msg = await depth_ws.receive()
if trade_msg.type == aiohttp.WSMsgType.TEXT:
trade_data = json.loads(trade_msg.data)
print(f"Received trade message for {symbol}: {trade_msg.data[:100]}...")
loop = asyncio.get_running_loop()
asyncio.run_coroutine_threadsafe(process_trade_data(trade_data, symbol, folder_path), loop)
if depth_msg.type == aiohttp.WSMsgType.TEXT:
depth_data = json.loads(depth_msg.data)
print(f"Received depth message for {symbol}: {depth_msg.data[:100]}...")
loop = asyncio.get_running_loop()
asyncio.run_coroutine_threadsafe(process_depth_data(depth_data, symbol, folder_path), loop)
def fetch_trading_pairs(folder_path, limit=10):
trading_pairs = []
for filename in os.listdir(folder_path):
if '_historical_data.csv' in filename:
pair_name = filename.replace('_historical_data.csv', '').lower()
trading_pairs.append(pair_name)
if len(trading_pairs) >= limit:
break
return trading_pairs
async def main():
folder_path = "G:\\My Drive\\Binance_Testrun_Realtime"
os.makedirs(folder_path, exist_ok=True)
trading_pairs = fetch_trading_pairs('G:\\My Drive\\Binance_USDT')
stop = asyncio.Event()
executor = ThreadPoolExecutor(max_workers=10) # Adjust max_workers as needed
async with aiohttp.ClientSession() as session:
tasks = [fetch_data(session, symbol, folder_path, executor) for symbol in trading_pairs]
await asyncio.gather(*tasks)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("Script stopped by user.")
Kr