Hi jonte i have implemented the code as you asked, this is the current version
import { EventEmitter } from "events";
import { BinanceSocket } from "../../../clients/binance/WebSocketClient";
import { publicProcedure } from "../../../routers/trpc";
import { throwError } from "../../../utils/binance/errorHandlers";
import {
CandlestickInput,
CandlestickStreamSchema,
type KlineInterval,
} from "../../../validators/binance/anaysis/candlesticksValidator";
import type { Candlestick } from "../../../types/analysis";
const ee = new EventEmitter();
// Mapping of valid intervals and their durations in milliseconds
const validIntervals: Record<KlineInterval, number> = {
"5m": 5 * 60 * 1000 - 1,
"15m": 15 * 60 * 1000,
"30m": 30 * 60 * 1000,
"1h": 60 * 60 * 1000,
"4h": 4 * 60 * 60 * 1000,
"1d": 24 * 60 * 60 * 1000,
"1w": 7 * 24 * 60 * 60 * 1000,
};
export const candlestickStream = publicProcedure
.input(CandlestickInput)
.subscription(async ({ input }) => {
const expectedInterval = input.interval;
const subscribeWebSocket = () => {
new BinanceSocket(
`ws/${input.symbol.toLowerCase()}@kline_${expectedInterval}`,
ee
);
};
subscribeWebSocket();
return observable<Candlestick>((emit) => {
const update = (data: Buffer) => {
try {
const jsonData = JSON.parse(data.toString());
const { k } = CandlestickStreamSchema.parse(jsonData);
// Calculate the interval based on openTime and closeTime
const intervalDuration = k.T - k.t;
// Find the matching interval
let detectedInterval = null;
for (const interval in validIntervals) {
if (
intervalDuration === validIntervals[interval as KlineInterval]
) {
detectedInterval = interval;
break;
}
}
if (!detectedInterval) {
// Interval mismatch, resubscribe
console.log(
`Received Kline with unrecognized interval. Resubscribing...`
);
console.log({
expectedInterval,
intervalDuration,
detectedInterval,
});
subscribeWebSocket();
return;
}
if (detectedInterval !== expectedInterval) {
// Interval mismatch, resubscribe
console.log(
`Received Kline with interval ${detectedInterval}, expected ${expectedInterval}. Resubscribing...`
);
console.log({
expectedInterval,
intervalDuration,
detectedInterval,
});
subscribeWebSocket();
return;
}
const newCandlestick: Candlestick = {
openTime: k.t,
closeTime: k.T,
open: Number(k.o),
close: Number(k.c),
high: Number(k.h),
low: Number(k.l),
baseVolume: Number(k.v),
tradesCount: k.n,
isClosed: k.x,
};
console.log(
`emitting new candlestick of interval ${expectedInterval} time: ${new Date().toISOString()}`
);
emit.next(newCandlestick);
} catch (error) {
if (error instanceof Error) {
emit.error(error);
throwError(
error,
`ws/${input.symbol.toLowerCase()}@kline_${input.interval}`
);
}
}
};
ee.on("update", update);
return () => {
ee.off("update", update);
};
});
});
as you can see i have subtracted one on the interval i am testing because i was getting those console logs:
Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '5m',
[0] | intervalDuration: 299999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '5m',
[0] | intervalDuration: 299999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '5m',
[0] | intervalDuration: 299999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '5m',
[0] | intervalDuration: 299999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '5m',
[0] | intervalDuration: 299999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '5m',
[0] | intervalDuration: 299999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '5m',
[0] | intervalDuration: 299999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '5m',
[0] | intervalDuration: 299999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '5m',
[0] | intervalDuration: 299999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '5m',
[0] | intervalDuration: 299999,
[0] | detectedInterval: null
[0] | }
[0] | WebSocket connected
[0] | file:///Users/seven/Dev/trading_spot/packages/api/src/utils/binance/errorHandlers.ts:58
[0] | throw new TRPCError({
[0] | ^
[0] |
[0] | TRPCError: An unexpected error occurred on ws/ethbtc@kline_5m : AggregateError
[0] | at throwError (file:///Users/seven/Dev/trading_spot/packages/api/src/utils/binance/errorHandlers.ts:58:11)
[0] | at BinanceSocket.onError (file:///Users/seven/Dev/trading_spot/packages/api/src/clients/binance/WebSocketClient.ts:31:13)
[0] | at WebSocket.emit (node:events:514:28)
[0] | at emitErrorAndClose (/Users/seven/Dev/trading_spot/node_modules/ws/lib/websocket.js:1016:13)
[0] | at ClientRequest.<anonymous> (/Users/seven/Dev/trading_spot/node_modules/ws/lib/websocket.js:864:5)
[0] | at ClientRequest.emit (node:events:514:28)
[0] | at TLSSocket.socketErrorListener (node:_http_client:495:9)
[0] | at TLSSocket.emit (node:events:514:28)
[0] | at emitErrorNT (node:internal/streams/destroy:151:8)
[0] | at emitErrorCloseNT (node:internal/streams/destroy:116:3) {
[0] | code: 'INTERNAL_SERVER_ERROR',
[0] | [cause]: undefined
[0] | }
[0] |
[0] | Node.js v20.5.0
[0] | [nodemon] app crashed - waiting for file changes before starting...
[0] | [nodemon] restarting due to changes...
[0] | [nodemon] starting `node --experimental-specifier-resolution=node --loader ts-node/esm src/app.ts`
[0] | (node:41724) ExperimentalWarning: Custom ESM Loaders is an experimental feature and might change at any time
[0] | (Use `node --trace-warnings ...` to show where the warning was created)
advise me please, i don’t have much exp with websockets, if i resubscribe without cleaning up previous connection those would stack up? I was getting more and more console logs and eventually the server crashed. After subtracting 1 from the equation it seems not to mismatch the candles, hovewer those rarely come in 2 sec interval as you can see on the console logs below:
emitting new candlestick of interval 5m time: 2023-09-15T05:04:35.869Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:05:00.082Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:05:02.546Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:05:13.594Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:05:18.560Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:05:43.623Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:06:44.129Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:07:26.885Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:07:29.551Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:07:40.072Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:07:43.827Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:07:46.449Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:07:55.345Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:08:00.836Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:08:10.356Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:09:17.105Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:09:21.554Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:09:43.257Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:09:55.104Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:10:00.080Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:10:02.560Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:10:07.088Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:10:10.666Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:10:13.549Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:10:25.127Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:10:31.250Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:10:52.926Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:10:57.529Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:11:15.895Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:11:27.469Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:11:30.196Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:11:32.565Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:11:35.562Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:11:38.564Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:11:55.627Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:12:11.499Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:12:13.567Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:12:16.599Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:12:21.019Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:12:47.497Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:12:51.161Z
[0] | emitting new candlestick of interval 5m time: 2023-09-15T05:12:53.566Z
I will keep monitoring this endpoint and leave the console logs, plase tell me if this 1ms gap is it expected, and should data be comming in regular 2s interval or this randomness is expected? Below i paste logs from all intervals i use to document if there is a mismatch
15min:
WebSocket connected
[0] | WebSocket connected
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '15m',
[0] | intervalDuration: 899999,
[0] | detectedInterval: null
[0] | }
[0] | WebSocket connected
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '15m',
[0] | intervalDuration: 899999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '15m',
[0] | intervalDuration: 899999,
[0] | detectedInterval: null
[0] | }
[0] | WebSocket connected
[0] | WebSocket connected
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '15m',
[0] | intervalDuration: 899999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '15m',
[0] | intervalDuration: 899999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '15m',
[0] | intervalDuration: 899999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '15m',
[0] | intervalDuration: 899999,
[0] | detectedInterval: null
[0] | }
WARNING HERE! MISMATCH DETECTED on 30min
trading_spot_backend
[0] | WebSocket connected
[0] | WebSocket connected
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '30m',
[0] | intervalDuration: 899999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '30m',
[0] | intervalDuration: 1799999,
[0] | detectedInterval: null
[0] | }
[0] | WebSocket connected
[0] | WebSocket connected
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '30m',
[0] | intervalDuration: 1799999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '30m',
[0] | intervalDuration: 899999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '30m',
[0] | intervalDuration: 1799999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '30m',
[0] | intervalDuration: 1799999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '30m',
[0] | intervalDuration: 1799999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '30m',
[0] | intervalDuration: 899999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '30m',
[0] | intervalDuration: 1799999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '30m',
[0] | intervalDuration: 1799999,
[0] | detectedInterval: null
[0] | }
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
1h:
| WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '1h',
[0] | intervalDuration: 3599999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '1h',
[0] | intervalDuration: 3599999,
[0] | detectedInterval: null
[0] | }
[0] | WebSocket connected
[0] | WebSocket connected
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '1h',
[0] | intervalDuration: 3599999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '1h',
[0] | intervalDuration: 3599999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '1h',
[0] | intervalDuration: 3599999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '1h',
[0] | intervalDuration: 3599999,
[0] | detectedInterval: null
[0] | }
4h:
{
[0] | expectedInterval: '4h',
[0] | intervalDuration: 14399999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '4h',
[0] | intervalDuration: 14399999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '4h',
[0] | intervalDuration: 14399999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '4h',
[0] | intervalDuration: 14399999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '4h',
[0] | intervalDuration: 14399999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '4h',
[0] | intervalDuration: 14399999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '4h',
[0] | intervalDuration: 14399999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '4h',
[0] | intervalDuration: 14399999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '4h',
[0] | intervalDuration: 14399999,
[0] | detectedInterval: null
[0] | }
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
1d:
WebSocket connected
[0] | WebSocket connected
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '1d',
[0] | intervalDuration: 86399999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '1d',
[0] | intervalDuration: 86399999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '1d',
[0] | intervalDuration: 86399999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '1d',
[0] | intervalDuration: 86399999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '1d',
[0] | intervalDuration: 86399999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '1d',
[0] | intervalDuration: 86399999,
[0] | detectedInterval: null
[0] | }
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | WebSocket connected
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '1d',
[0] | intervalDuration: 86399999,
[0] | detectedInterval: null
[0] | }
[0] | Received Kline with unrecognized interval. Resubscribing...
[0] | {
[0] | expectedInterval: '1d',
[0] | intervalDuration: 86399999,
[0] | detectedInterval: null
[0] | }
I know that this is not enough data tell for sure but last time i had this mismatch i changed my interval from 1h to 5m and i had mismatch between those 2 intervals, now i had mismatch between 15 and 30 min after changing between those 2 intervals so there seems to be a pattern here, if Binance collects data about the intervals that users are using this part of code may be to blame. And again plaese tell me what should i be expecting about this irregularity in the time that candle data comes (rarely this is 2s interval), and if this 99999 ending is something that should be alerting me that something is wrong with the data i receive.
Hope this helps, if i can help any further please let me know.