ctdo-status/src/main.rs

113 lines
3.4 KiB
Rust

use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use actix_web::{App, error, get, HttpServer, Responder, Result, web};
use actix_web::error::ErrorInternalServerError;
use actix_web::web::Data;
use futures::{executor::block_on, stream::StreamExt};
use paho_mqtt as mqtt;
use tokio::spawn;
use status::*;
mod status;
#[get("/api/spaceapi/v13")]
async fn api_spaceapi_v13(info: web::Data<Arc<RwLock<SharedInfo>>>) -> Result<impl Responder> {
let info = info.read().unwrap();
let info = SharedInfo {
power_usage: info.power_usage,
open: info.open
};
let status = build_status_v13(info).map_err(|e| ErrorInternalServerError(e))?;
Ok(web::Json(status))
}
/// Serves the current space status as JSON on `/api/spaceapi/v14`.
///
/// NOTE(review): this handler builds its response with `build_status_v13`,
/// exactly like the v13 route — confirm whether a v14-specific builder was
/// intended here.
///
/// # Errors
/// A failure reported by the status builder is turned into an internal server
/// error response.
///
/// # Panics
/// Panics if the shared `RwLock` is poisoned.
#[get("/api/spaceapi/v14")]
async fn api_spaceapi_v14(info: web::Data<Arc<RwLock<SharedInfo>>>) -> Result<impl Responder> {
    // The read guard lives until the end of the handler.
    let current = info.read().unwrap();
    let power_usage = current.power_usage;
    let open = current.open;
    match build_status_v13(SharedInfo { power_usage, open }) {
        Ok(status) => Ok(web::Json(status)),
        Err(cause) => Err(ErrorInternalServerError(cause)),
    }
}
/// State shared between the MQTT listener and the HTTP handlers.
///
/// The type is small and plain data, so it derives `Copy`: a snapshot can be
/// taken with `*guard` instead of copying each field by hand, and `Default`
/// yields the "nothing known yet" state (`power_usage == 0`, `open == false`).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
struct SharedInfo {
    /// Latest value parsed from the `/status/flukso/powerinW` topic
    /// (presumably watts, going by the topic name — confirm with the publisher).
    power_usage: u64,
    /// Whether the space is open. It starts out `false`; nothing in this file
    /// updates it.
    open: bool,
}
/// MQTT topics the client subscribes to after connecting.
const TOPICS: &[&str] = &["/status/flukso/powerinW"];
/// QoS levels passed to `subscribe_many` together with `TOPICS`.
/// NOTE(review): presumably one entry per topic, in the same order — keep both
/// slices the same length when adding topics.
const QOS: &[i32] = &[1];
#[tokio::main] // or
async fn main() -> std::io::Result<()> {
let shared_info: Arc<RwLock<SharedInfo>> = Arc::new(RwLock::new(SharedInfo {
power_usage: 0,
open: false
}));
let create_opts = mqtt::CreateOptionsBuilder::new()
.server_uri("mqtt.ctdo.de")
.client_id("ctdo-status")
.finalize();
let mut cli = mqtt::AsyncClient::new(create_opts).expect("Error creating the client");
spawn(async move {
// Get message stream before connecting.
let mut strm = cli.get_stream(25);
let conn_opts = mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(30))
.mqtt_version(mqtt::MQTT_VERSION_3_1_1)
.clean_session(false)
.finalize();
println!("Connecting to the MQTT server...");
cli.connect(conn_opts).await?;
println!("Subscribing to topics: {:?}", TOPICS);
cli.subscribe_many(TOPICS, QOS).await?;
while let Some(msg_opt) = strm.next().await {
if let Some(msg) = msg_opt {
match msg.topic() {
"/status/flukso/powerinW" => {
let power_usage = msg.payload_str().parse::<u64>()
.map_err(|e| {
println!("invalid power value: {}", e);
0
})?;
let mut lock = shared_info.write().unwrap();
lock.power_usage = power_usage;
}
_ => println!("{}", msg)
}
} else {
println!("Lost connection. Attempting reconnect.");
while let Err(err) = cli.reconnect().await {
println!("Error reconnecting...");
tokio::time::sleep(Duration::from_millis(1000)).await;
}
}
}
Ok::<(), mqtt::Error>(())
});
HttpServer::new(|| {
App::new()
.app_data()
.service(api_spaceapi_v13)
.service(api_spaceapi_v14)
})
.bind(("127.0.0.1", 8080))?
.run()
.await
}