use std::time::Duration; use paho_mqtt as mqtt; use futures::{executor::block_on, stream::StreamExt}; use actix_web::{App, get, HttpServer, Responder}; use serde_json; use status::*; mod status; #[get("/api/spaceapi/v13")] async fn api_spaceapi_v13() -> impl Responder { serde_json::to_string(&build_status_v13()) } #[get("/api/spaceapi/v14")] async fn api_spaceapi_v14() -> impl Responder { serde_json::to_string(&build_status_v14()) } const TOPICS: &[&str] = &["/status/flukso/powerinW"]; const QOS: &[i32] = &[1]; #[tokio::main] // or async fn main() -> std::io::Result<()> { let create_opts = mqtt::CreateOptionsBuilder::new() .server_uri("mqtt.ctdo.de") .client_id("ctdo-status") .finalize(); let mut cli = mqtt::AsyncClient::new(create_opts).expect("Error creating the client"); if let Err(err) = block_on(async { // Get message stream before connecting. let mut strm = cli.get_stream(25); // Define the set of options for the connection let conn_opts = mqtt::ConnectOptionsBuilder::new() .keep_alive_interval(Duration::from_secs(30)) .mqtt_version(mqtt::MQTT_VERSION_3_1_1) .clean_session(false) .finalize(); println!("Connecting to the MQTT server..."); cli.connect(conn_opts).await?; println!("Subscribing to topics: {:?}", TOPICS); cli.subscribe_many(TOPICS, QOS).await?; while let Some(msg_opt) = strm.next().await { if let Some(msg) = msg_opt { println!("{}", msg); } else { println!("Lost connection. Attempting reconnect."); while let Err(err) = cli.reconnect().await { println!("Error reconnecting..."); tokio::time::delay_for(Duration::from_millis(1000)).await; } } } Ok::<(), mqtt::Error>(()) }) { panic!("{}", err); } HttpServer::new(|| { App::new() .service(api_spaceapi_v13) .service(api_spaceapi_v14) }) .bind(("127.0.0.1", 8080))? .run() .await }