mqtt: Add MqttClient::connect method

Separating the `connect` call out of the `MqttClient::new` function
makes is such that we do not have to create a new object for each
iteration of the initial connection loop.  Instead, we just create one
object and repeatedly call its `connect` method until it succeeds
dev/ci
Dustin 2022-12-30 14:39:10 -06:00
parent cef128d1ef
commit 41c87d87af
2 changed files with 19 additions and 18 deletions

View File

@ -3,20 +3,21 @@ use std::time::Duration;
pub use paho_mqtt::Error; pub use paho_mqtt::Error;
use paho_mqtt::{ use paho_mqtt::{
AsyncClient, AsyncReceiver, ConnectOptions, ConnectOptionsBuilder, AsyncClient, AsyncReceiver, ConnectOptions, ConnectOptionsBuilder,
CreateOptionsBuilder, Message, SslOptionsBuilder, CreateOptionsBuilder, Message, ServerResponse, SslOptionsBuilder,
}; };
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use tracing::{info, trace}; use tracing::{info, trace};
use crate::config::Configuration; use crate::config::Configuration;
pub struct MqttClient { pub struct MqttClient<'a> {
config: &'a Configuration,
client: AsyncClient, client: AsyncClient,
stream: AsyncReceiver<Option<Message>>, stream: AsyncReceiver<Option<Message>>,
} }
impl MqttClient { impl<'a> MqttClient<'a> {
pub async fn new(config: &Configuration) -> Result<MqttClient, Error> { pub fn new(config: &'a Configuration) -> Result<Self, Error> {
let uri = format!( let uri = format!(
"{}://{}:{}", "{}://{}:{}",
if config.mqtt.tls { "ssl" } else { "tcp" }, if config.mqtt.tls { "ssl" } else { "tcp" },
@ -28,10 +29,13 @@ impl MqttClient {
CreateOptionsBuilder::new().server_uri(uri).finalize(); CreateOptionsBuilder::new().server_uri(uri).finalize();
let mut client = AsyncClient::new(client_opts)?; let mut client = AsyncClient::new(client_opts)?;
let stream = client.get_stream(10); let stream = client.get_stream(10);
client.connect(Self::conn_opts(config)?).await?; Ok(Self { config, client, stream })
info!("Successfully connected to MQTT broker"); }
Ok(Self { client, stream }) pub async fn connect(&mut self) -> Result<ServerResponse, Error> {
let res = self.client.connect(self.conn_opts()?).await?;
info!("Successfully connected to MQTT broker");
Ok(res)
} }
pub async fn run(mut self) { pub async fn run(mut self) {
@ -41,20 +45,20 @@ impl MqttClient {
} }
} }
fn conn_opts(config: &Configuration) -> Result<ConnectOptions, Error> { fn conn_opts(&self) -> Result<ConnectOptions, Error> {
let mut conn_opts = ConnectOptionsBuilder::new(); let mut conn_opts = ConnectOptionsBuilder::new();
conn_opts.automatic_reconnect( conn_opts.automatic_reconnect(
Duration::from_millis(500), Duration::from_millis(500),
Duration::from_secs(30), Duration::from_secs(30),
); );
if config.mqtt.tls { if self.config.mqtt.tls {
let ssl_opts = SslOptionsBuilder::new() let ssl_opts = SslOptionsBuilder::new()
.trust_store(&config.mqtt.ca_file)? .trust_store(&self.config.mqtt.ca_file)?
.finalize(); .finalize();
conn_opts.ssl_options(ssl_opts); conn_opts.ssl_options(ssl_opts);
} }
if let [Some(username), Some(password)] = if let [Some(username), Some(password)] =
[&config.mqtt.username, &config.mqtt.password] [&self.config.mqtt.username, &self.config.mqtt.password]
{ {
conn_opts.user_name(username).password(password); conn_opts.user_name(username).password(password);
} }

View File

@ -2,11 +2,11 @@ use std::time::Duration;
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
use crate::mqtt::MqttClient;
use crate::browser::{Browser, BrowserError}; use crate::browser::{Browser, BrowserError};
use crate::config::Configuration; use crate::config::Configuration;
use crate::marionette::error::ConnectionError; use crate::marionette::error::ConnectionError;
use crate::marionette::Marionette; use crate::marionette::Marionette;
use crate::mqtt::MqttClient;
#[derive(Debug)] #[derive(Debug)]
pub enum SessionError { pub enum SessionError {
@ -84,13 +84,10 @@ impl Session {
} }
pub async fn run(&self) { pub async fn run(&self) {
let client; let mut client = MqttClient::new(&self.config).unwrap();
loop { loop {
match MqttClient::new(&self.config).await { match client.connect().await {
Ok(c) => { Ok(_) => break,
client = c;
break;
}
Err(e) => warn!("Failed to connect to MQTT server: {}", e), Err(e) => warn!("Failed to connect to MQTT server: {}", e),
} }
tokio::time::sleep(Duration::from_secs(1)).await; tokio::time::sleep(Duration::from_secs(1)).await;