Hey guys! Here is my task: I need to fetch candlestick data for all Binance Futures coins in real time and save it somewhere.
Right now I do it this way: for each Binance Futures coin I create a thread in which I open a websocket connection, and then I write the data from each new candle message to a database.
The problem is that a delay builds up over time, perhaps because of multithreading, perhaps because of the lock around the database access, or maybe because the program needs to process ALL new messages. For example, by the time data for minute +2 has appeared, some coins are still recording candle data for minute 0 (the launch minute). How can this be fixed?
Here is my code:
def websocket_connection(coin, exchange):
    """Subscribe to the 1m continuous-kline stream of ``coin`` and block on it.

    Only the exchange whose string form is 'Binance' is handled; for any
    other value the function returns without opening a connection. Every
    received frame is handed to ``on_message``.
    """
    if str(exchange) != 'Binance':
        return

    # One dedicated stream per coin, addressed by its lowercase symbol.
    stream_url = f'wss://fstream.binance.com/ws/{coin.lower()}usdt_perpetual@continuousKline_1m'
    app = websocket.WebSocketApp(stream_url, on_message=on_message)
    app.run_forever()
def on_message(ws, msg):
    """Store the candle carried by one continuous-kline message in SQLite.

    Args:
        ws: The websocket app that delivered the message (unused).
        msg: JSON text of a continuous-kline event, either bare or wrapped in
            the ``{"stream": ..., "data": ...}`` envelope of a combined stream.

    The candle is keyed by (coin, timestamp) in ``Binance_candles``. The first
    message for a key inserts the row and stamps ``first_candle``; every later
    message overwrites the prices and stamps ``last_candle``. Frames that do
    not carry a kline object are ignored.
    """
    # Imported here because no module-level import block is visible.
    import json

    event = json.loads(msg)
    if isinstance(event, dict) and isinstance(event.get("data"), dict):
        # Combined streams nest the actual event under "data".
        event = event["data"]
    kline = event.get("k") if isinstance(event, dict) else None
    if not isinstance(kline, dict):
        return

    # Kept as text so the stored key matches rows written by earlier versions.
    timestamp = str(kline["T"])
    # Local-time label of the candle, truncated to the minute.
    formatted_string = datetime.datetime.fromtimestamp(
        int(timestamp) / 1000
    ).strftime("%Y-%m-%d %H:%M")
    coin = event["ps"].split('USDT')[0]
    open_price = float(kline["o"])
    high_price = float(kline["h"])
    low_price = float(kline["l"])
    close_price = float(kline["c"])
    print(formatted_string, coin, close_price)

    connection = sqlite3.connect('smart', check_same_thread=False)
    try:
        with lock:
            cursor = connection.cursor()
            candle_time = str(datetime.datetime.now())
            # Try the update first; rowcount tells us whether the candle
            # already existed, which saves a separate SELECT under the lock.
            cursor.execute("""UPDATE Binance_candles SET open = ?, high = ?, low = ?, close = ?, last_candle = ? WHERE coin = ? AND timestamp = ?""",(open_price, high_price, low_price, close_price, candle_time, coin, timestamp))
            if cursor.rowcount == 0:
                cursor.execute("""INSERT INTO Binance_candles (coin, open, high, low, close, timestamp, datetime, first_candle) VALUES (?, ?, ?, ?, ?, ?,?,?)""",(coin, open_price, high_price, low_price, close_price, timestamp, formatted_string, candle_time))
            connection.commit()
    finally:
        # A connection is opened per message, so it must be released per message.
        connection.close()
# Launch one worker thread per coin; each worker owns its own websocket.
for coin in coins:
    t = threading.Thread(
        target=websocket_connection,
        args=(coin, 'Binance'),
    )
    t.start()
    threads.append(t)

# Keep the main thread alive until every worker has returned.
for t in threads:
    t.join()
So, I googled a lot and found that I can use just one websocket connection with multiple streams inside it. So I tried this:
# Read the coin symbols: the first column of every row in the Binance table.
cur.execute('SELECT * FROM Binance')
rows = cur.fetchall()
coins = [row[0] for row in rows]
print(coins)

# Build the combined-stream URL with one stream name per coin, separated by
# "/". Joining (rather than appending and trimming the last character) keeps
# the "streams=" prefix intact even when the coin list is empty.
socket_url = "wss://fstream.binance.com/stream?streams=" + "/".join(
    f"{coin.lower()}usdt_perpetual@continuousKline_1m" for coin in coins
)
print(socket_url)

# A single connection carries every subscribed stream.
ws = websocket.WebSocketApp(socket_url, on_open=on_open, on_close=on_close, on_message=on_message)
ws.run_forever()
But again it leads to nothing, maybe because of a problem with one of the coins inside coins (if I create a stream for 3 or even 10 coins such as BTC, ETH, BCH, SUI, etc., everything works fine).