Trying to create Multiple Web Socket Streams to use separate streams for price, order book, and kline data to distribute load and improve responsiveness.
Below test_websockets.js file works.
const Binance = require('node-binance-api');
const log = console.log;
// Load environment variables (ensure .env has BINANCE_API_KEY and BINANCE_API_SECRET for testnet)
require('dotenv').config();
// Initialize Binance API with testnet options
const binance = new Binance().options({
APIKEY: process.env.BINANCE_API_KEY,
APISECRET: process.env.BINANCE_API_SECRET,
test: true, // Use testnet
urls: {
base: 'https://testnet.binance.vision/api/',
combineStream: 'wss://testnet.binance.vision/stream?streams=',
stream: 'wss://testnet.binance.vision/ws/'
}
});
const symbol = 'ETHUSDT';
const interval = '15m';
// Function to test WebSocket subscriptions
function testWebSockets() {
binance.websockets.terminate(symbol);
// Test bookTickers
binance.websockets.bookTickers(symbol, (data) => {
try {
log(`📡 Received bookTicker data for ${symbol}: ${JSON.stringify(data)}`);
} catch (err) {
log(`❌ Error in bookTicker handler: ${err.message}`);
}
}, (error) => {
log(`⚠️ bookTicker connection error for ${symbol}: ${error ? error.message : 'Unknown error'}`);
});
// Test candlesticks
binance.websockets.candlesticks(symbol, interval, (data) => {
try {
log(`📡 Received candlestick data for ${symbol}: ${JSON.stringify(data)}`);
} catch (err) {
log(`❌ Error in candlestick handler: ${err.message}`);
}
}, (error) => {
log(`⚠️ Candlestick connection error for ${symbol}: ${error ? error.message : 'Unknown error'}`);
});
// Test depth
binance.websockets.depth(symbol, (data) => {
try {
log(`📊 Received depth data for ${symbol}: ${JSON.stringify(data)}`);
} catch (err) {
log(`❌ Error in depth handler: ${err.message}`);
}
}, (error) => {
log(`⚠️ Depth connection error for ${symbol}: ${error ? error.message : 'Unknown error'}`);
});
}
// Start the test
log(`🔍 Starting WebSocket test for ${symbol} at ${new Date().toISOString()}`);
testWebSockets();
Screenshot is attached, but fails when used as follows :-
// ======== WEB SOCKET RECONNECTION ========
function subscribeToWebSocket() {
binance.websockets.terminate(currentSymbol);
const streams = [`${currentSymbol.toLowerCase()}@bookTicker`, `${currentSymbol.toLowerCase()}@kline_${currentInterval}`, `${currentSymbol.toLowerCase()}@depth`];
log(`🔗 Subscribing to WebSocket streams: ${streams.join(', ')}`);
binance.websockets.subscribe(streams, (data) => {
try {
if (data.e === 'kline') {
const candle = data.k;
ohlcvData.push({ time: candle.t, open: parseFloat(candle.o), high: parseFloat(candle.h), low: parseFloat(candle.l), close: parseFloat(candle.c), volume: parseFloat(candle.v) });
if (ohlcvData.length > 100) ohlcvData.shift();
} else if (data.e === 'bookTicker') {
currentPrice = validatePrice(parseFloat(data.b));
priceHistory.push({ time: new Date().toISOString(), price: currentPrice });
if (priceHistory.length > 1000) priceHistory.shift();
} else if (data.e === 'depthUpdate') {
// Handle order book depth data
}
if (!INITIAL_PRICE) {
INITIAL_PRICE = currentPrice;
basePriceBuyBOT = currentPrice;
basePriceSellBOT = currentPrice;
lowestPriceBuyBot = currentPrice;
highestPriceSellBot = currentPrice;
log(`🎯 Initial price set: ${currentPrice.toFixed(pricePrecision)}`);
}
updateTrailingStops();
const dataScore = calculateDataScore();
if (lowestPriceBuyBot > currentPrice) {
lowestPriceBuyBot = currentPrice;
log(`🔄 BuyBOT: Updated lowestPriceBuyBot to ${lowestPriceBuyBot.toFixed(pricePrecision)}`);
} else if (lowestPriceBuyBot < currentPrice && dataScore > indicatorConfig.buy_score_activate) {
maybeExecute('buy', basePriceBuyBOT, currentPrice);
}
if (highestPriceSellBot < currentPrice) {
highestPriceSellBot = currentPrice;
log(`🔄 SellBOT: Updated highestPriceSellBot to ${highestPriceSellBot.toFixed(pricePrecision)}`);
} else if (highestPriceSellBot > currentPrice && dataScore < indicatorConfig.sell_score_activate) {
maybeExecute('sell', basePriceSellBOT, currentPrice);
}
if (Date.now() % 5000 < 100) {
const buyOrders = openOrders.filter(o => o.mode === 'buy' && o.botType === 'buyBot');
const sellOrders = openOrders.filter(o => o.mode === 'sell' && o.botType === 'sellBot');
saveJSON(DASHBOARD_FILE, {
timestamp: new Date().toISOString(),
symbol: currentSymbol,
price: currentPrice,
priceUSDT: BTCtoUSDT(currentPrice).toFixed(2),
buyPoints: buyOrders.length,
sellPoints: sellOrders.length,
valueBuy: buyOrders.reduce((sum, o) => sum + o.value, 0),
valueSell: sellOrders.reduce((sum, o) => sum + o.value, 0),
valueBuyUSDT: BTCtoUSDT(buyOrders.reduce((sum, o) => sum + o.value, 0)).toFixed(2),
valueSellUSDT: BTCtoUSDT(sellOrders.reduce((sum, o) => sum + o.value, 0)).toFixed(2),
initialPrice: INITIAL_PRICE,
basePriceBuyBOT,
lowestPriceBuyBot,
basePriceSellBOT,
highestPriceSellBot,
pointStep,
pointsToIgnore,
makerFee: fees.maker,
takerFee: fees.taker,
maxBuyOrderValue: maxBuyOrderValue.toFixed(6),
maxSellOrderValue: maxSellOrderValue.toFixed(6),
maxOpenExposure: maxOpenExposure.toFixed(6),
maxOpenExposureUSDT: BTCtoUSDT(maxOpenExposure).toFixed(2),
status: `BUY[${buyOrders.length}] SELL[${sellOrders.length}] | Price: ${currentPrice.toFixed(pricePrecision)} BTC (${BTCtoUSDT(currentPrice).toFixed(2)} USDT)`,
lastTrade: tradeHistory.at(-1) || null,
activeStops: activeStopOrders.size,
priceAnomalies: priceHistory.length - priceHistory.filter(p => p.price === currentPrice).length,
priceHistory: priceHistory,
accountBalances: accountData?.balances?.reduce((acc, b) => {
if (parseFloat(b.free) > 0 || parseFloat(b.locked) > 0) {
acc[b.asset] = { free: parseFloat(b.free), locked: parseFloat(b.locked) };
}
return acc;
}, {}) || {}
});
}
} catch (err) {
log(`❌ Error in WebSocket handler: ${err.message}`);
setTimeout(subscribeToWebSocket, 5000);
}
}, (error) => {
log(`⚠️ WebSocket connection error: ${error.message}`);
setTimeout(subscribeToWebSocket, 5000);
});
}
Screenschot is attached. How to implement it?