mqttmarionette/src/mqtt.rs

240 lines
7.3 KiB
Rust

use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use paho_mqtt::topic_matcher::TopicMatcher;
pub use paho_mqtt::{AsyncClient, Error, Message};
use paho_mqtt::{
AsyncReceiver, ConnectOptions, ConnectOptionsBuilder,
CreateOptionsBuilder, ServerResponse, SslOptionsBuilder,
};
use tokio::sync::{Mutex, Notify};
use tokio_stream::StreamExt;
use tracing::{debug, error, info, trace};
use crate::config::Configuration;
use crate::hass::{self, HassConfig};
/// Callback interface for messages received by [`MqttClient::run`].
///
/// `run` invokes the method that corresponds to the [`MessageType`]
/// registered for the topic a message arrived on.
#[async_trait]
pub trait MessageHandler {
    /// Handles a message received on a `navigate` topic.
    ///
    /// `publisher` can be used to publish messages from within the handler;
    /// `msg` is the received message, including its topic and payload.
    async fn navigate(&mut self, publisher: &MqttPublisher, msg: &Message);
}
/// Kinds of incoming messages, used as the value stored for each
/// subscribed topic filter.
#[derive(Debug)]
pub enum MessageType {
    /// A message received on a `<prefix>/+/navigate` topic.
    Navigate,
}
/// MQTT connection wrapper that subscribes to command topics and dispatches
/// incoming messages to a [`MessageHandler`].
pub struct MqttClient<'a> {
    /// Application configuration (broker address, credentials, topic prefix).
    config: &'a Configuration,
    /// The underlying client, shared with every [`MqttPublisher`] and with
    /// the shutdown task spawned by `run`.
    client: Arc<Mutex<AsyncClient>>,
    /// Stream of incoming messages obtained from the client.
    stream: AsyncReceiver<Option<Message>>,
    /// Maps subscribed topic filters to the kind of message they carry.
    topics: TopicMatcher<MessageType>,
    /// Notified when the application wants the client to shut down.
    stop: Arc<Notify>,
}
impl<'a> MqttClient<'a> {
pub fn new(
config: &'a Configuration,
stop: Arc<Notify>,
) -> Result<Self, Error> {
let uri = format!(
"{}://{}:{}",
if config.mqtt.tls { "ssl" } else { "tcp" },
config.mqtt.host,
config.mqtt.port
);
info!("Connecting to MQTT server {}", uri);
let client_opts = CreateOptionsBuilder::new()
.client_id(&config.unique_id)
.server_uri(uri)
.finalize();
let mut client = AsyncClient::new(client_opts)?;
let stream = client.get_stream(10);
let client = Arc::new(Mutex::new(client));
let topics = TopicMatcher::new();
Ok(Self {
config,
client,
stream,
topics,
stop,
})
}
pub async fn connect(&mut self) -> Result<ServerResponse, Error> {
let opts = self.conn_opts()?;
trace!("Connect options: {:?}", opts);
let res = self.client.lock().await.connect(opts).await?;
info!("Successfully connected to MQTT broker");
Ok(res)
}
pub async fn subscribe(&mut self) -> Result<ServerResponse, Error> {
let prefix = &self.config.mqtt.topic_prefix;
let t_nav = format!("{}/+/navigate", prefix);
let res = self.client.lock().await.subscribe(&t_nav, 0).await?;
self.topics.insert(t_nav, MessageType::Navigate);
Ok(res)
}
pub fn publisher(&mut self) -> MqttPublisher {
MqttPublisher {
config: self.config,
client: self.client.clone(),
}
}
pub async fn run<H>(mut self, mut handler: H)
where
H: MessageHandler,
{
let publisher = MqttPublisher {
config: self.config,
client: self.client.clone(),
};
let msg = self.offline_message();
tokio::spawn(async move {
self.stop.notified().await;
let client = self.client.lock().await;
if let Err(e) = client.publish(msg).await {
error!("Failed to publish offline message: {}", e);
}
client.disconnect(None);
});
while let Some(msg) = self.stream.next().await {
let Some(msg) = msg else {break};
trace!("Received message: {:?}", msg);
for m in self.topics.matches(msg.topic()) {
match m.1 {
MessageType::Navigate => {
handler.navigate(&publisher, &msg).await;
}
}
}
}
}
fn conn_opts(&self) -> Result<ConnectOptions, Error> {
let mut conn_opts = ConnectOptionsBuilder::new();
conn_opts.will_message(self.offline_message());
conn_opts.automatic_reconnect(
Duration::from_millis(500),
Duration::from_secs(30),
);
if self.config.mqtt.tls {
let ssl_opts = SslOptionsBuilder::new()
.trust_store(&self.config.mqtt.ca_file)?
.finalize();
conn_opts.ssl_options(ssl_opts);
}
if let [Some(username), Some(password)] =
[&self.config.mqtt.username, &self.config.mqtt.password]
{
conn_opts.user_name(username).password(password);
}
Ok(conn_opts.finalize())
}
fn offline_message(&self) -> Message {
Message::new_retained(
format!("{}/available", self.config.mqtt.topic_prefix),
"offline",
0,
)
}
}
pub struct MqttPublisher<'a> {
config: &'a Configuration,
client: Arc<tokio::sync::Mutex<AsyncClient>>,
}
impl<'a> MqttPublisher<'a> {
pub async fn publish_title(
&self,
screen: &str,
title: &str,
) -> Result<(), Error> {
let topic =
format!("{}/{}/title", self.config.mqtt.topic_prefix, screen);
let msg = Message::new_retained(topic, title, 0);
(*self.client.lock().await).publish(msg).await?;
Ok(())
}
pub async fn publish_url(
&self,
screen: &str,
url: &str,
) -> Result<(), Error> {
let topic =
format!("{}/{}/url", self.config.mqtt.topic_prefix, screen);
let msg = Message::new_retained(topic, url, 0);
self.client.lock().await.publish(msg).await?;
Ok(())
}
pub async fn publish_config(&self, screen: &str) -> Result<(), Error> {
debug!("Publishing Home Assistant configuration");
let prefix = &self.config.mqtt.topic_prefix;
let availability_topic = format!("{}/available", prefix);
let command_topic = Some(format!("{}/{}/navigate", prefix, screen));
let name = format!("{} URL", screen);
let state_topic = format!("{}/{}/url", prefix, screen);
let key = format!(
"browserhud_{}_{}",
self.config.unique_id,
hass::slugify(screen)
);
let unique_id = format!("text.{}_url", key);
let object_id = unique_id.clone();
let msg = Message::new_retained(&availability_topic, "online", 0);
trace!("Publishing message: {:?}", msg);
self.client.lock().await.publish(msg).await?;
let config = HassConfig {
availability_topic,
command_topic,
name,
state_topic,
unique_id,
object_id,
device: Default::default(),
icon: "mdi:monitor".into(),
retain: Some(true),
};
let msg = Message::new_retained(
format!("homeassistant/text/{}_url/config", key),
serde_json::to_string(&config).unwrap(),
0,
);
trace!("Publishing message: {:?}", msg);
self.client.lock().await.publish(msg).await?;
let unique_id = format!("sensor.{}_title", key);
let object_id = unique_id.clone();
let config = HassConfig {
command_topic: None,
state_topic: format!("{}/{}/title", prefix, screen),
name: format!("{} Title", screen),
unique_id,
object_id,
retain: None,
..config
};
let msg = Message::new_retained(
format!("homeassistant/sensor/{}_title/config", key),
serde_json::to_string(&config).unwrap(),
0,
);
trace!("Publishing message: {:?}", msg);
self.client.lock().await.publish(msg).await?;
info!("Succesfully published Home Assistant config");
Ok(())
}
}