How can I add ping pong to this code so that the stream is not interrupted?
use websocket::client::ClientBuilder;
use websocket::OwnedMessage;
use std::time::Instant;
use std::str::FromStr;
use futures::prelude::*;
use influxdb2::models::DataPoint;
use influxdb2::Client;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = ClientBuilder::new("wss://fstream.binance.com/stream")
.unwrap()
.connect(None)
.expect("Failed to connect");
let host = "http://localhost:8086";
let org = "wannafly";
let token = "QPjlMBmeh-fANqZpkC_sScPbW8weLxsw9yl8Q8s7PsUiATo43_plZSathKcRHJ6NOBx9UB3Uh05RixzG4ZT7yA==";
let bucket = "Ticks";
let clientdb = Client::new(host, org, token);
let mut prev_time = Instant::now();
let done = false;
client
.send_message(&OwnedMessage::Text(
r#"{"method": "SUBSCRIBE", "params": ["!ticker@arr"], "id": 1}"#.to_string(),
))
.expect("Failed to send message");
println!("новый цикл: {:?}", prev_time);
for message in client.incoming_messages() {
println!("message: {:?}", message);
println!("внутри цикла");
let message = match message {
Ok(message) => message,
Err(err) => {
println!("Failed to receive message: {:?}", err);
continue;
}
};
match message {
OwnedMessage::Text(text) => {
let tick: serde_json::Value =
serde_json::from_str(&text).expect("Failed to parse JSON");
if let Some(data) = tick.get("data") {
if let Some(tickers) = data.as_array() {
println!("------------------------");
let mut n = 0;
for ticker in tickers {
n = n + 1;
let mut symbol: String;
let mut price: String;
if let Some(s) = ticker.get("s") {
symbol = s.to_string();
symbol = symbol.replace("\"", "");
} else {
symbol = "none".to_string();
}
if let Some(p) = ticker.get("c") {
price = p.to_string();
price = price.replace("\"", "");
match f64::from_str(&price) {
Ok(price) => {
println!("symbol: {}:{:?}", symbol, price);
let points = vec![
DataPoint::builder("Binance")
.tag("Symbol", symbol)
.field("Price", price)
.build()?,
];
clientdb.write(&bucket, stream::iter(points)).await?;
if done {
break;
}
}
Err(err) => {
println!("Price error: {:?}", err);
}
}
} else {
continue;
}
}
let current_time = Instant::now();
let time_diff = current_time.duration_since(prev_time);
prev_time = current_time;
println!("count: {}", n);
println!("Time interval: {:?}", time_diff);
}
}
}
OwnedMessage::Close(_) => {
println!("Connection closed");
break;
}
_ => {}
}
}
println!("конец цикла");
Ok(())
}