From 93802339c25668229781b7286699d8b9192045ec Mon Sep 17 00:00:00 2001 From: Mauro D Date: Tue, 17 May 2022 14:41:16 +0000 Subject: [PATCH] WebSockets implementation. --- Cargo.toml | 6 +- src/client.rs | 14 ++ src/client_ws.rs | 285 +++++++++++++++++++++++++++++++ src/core/error.rs | 23 +++ src/core/request.rs | 11 +- src/core/response.rs | 28 ++- src/core/session.rs | 68 +++++++- src/event_source/mod.rs | 4 + src/event_source/stream.rs | 10 +- src/lib.rs | 40 ++++- src/push_subscription/helpers.rs | 15 +- src/push_subscription/set.rs | 4 +- 12 files changed, 487 insertions(+), 21 deletions(-) create mode 100644 src/client_ws.rs diff --git a/Cargo.toml b/Cargo.toml index 9ede690..ba54ebc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,10 +17,14 @@ serde_json = "1.0" chrono = { version = "0.4", features = ["serde"]} reqwest = { version = "0.11", default-features = false, features = ["stream", "rustls-tls"]} futures-util = "0.3" -async-stream = "0.3.3" +async-stream = "0.3" base64 = "0.13" +tokio-tungstenite = { version = "0.17", features = ["rustls-tls-webpki-roots"], optional = true} +tokio = { version = "1.16", default-features = false, features = ["io-util"], optional = true } [features] +default = [] +websockets = ["tokio", "tokio-tungstenite"] debug = [] [profile.bench] diff --git a/src/client.rs b/src/client.rs index d700863..d8d7a68 100644 --- a/src/client.rs +++ b/src/client.rs @@ -28,6 +28,8 @@ pub struct Client { timeout: u64, headers: header::HeaderMap, default_account_id: String, + #[cfg(feature = "websockets")] + ws: Option, } impl Client { @@ -77,6 +79,8 @@ impl Client { timeout: DEFAULT_TIMEOUT_MS, headers, default_account_id, + #[cfg(feature = "websockets")] + ws: None, }) } @@ -184,6 +188,16 @@ impl Client { Err(Error::Server(format!("{}", response.status()))) } } + + #[cfg(feature = "websockets")] + pub fn set_ws_stream(&mut self, ws: crate::client_ws::WsStream) { + self.ws = Some(ws); + } + + #[cfg(feature = "websockets")] + pub fn ws_stream(&mut self) -> Option<&mut crate::client_ws::WsStream> { + self.ws.as_mut() + } } #[cfg(test)] diff --git a/src/client_ws.rs b/src/client_ws.rs new file mode 100644 index 0000000..c8b9d93 --- /dev/null +++ b/src/client_ws.rs @@ -0,0 +1,285 @@ +use std::{collections::HashMap, pin::Pin}; + +use futures_util::{stream::SplitSink, SinkExt, Stream, StreamExt}; +use serde::{Deserialize, Serialize}; +use tokio::net::TcpStream; +use tokio_tungstenite::{ + tungstenite::{client::IntoClientRequest, Message}, + MaybeTlsStream, WebSocketStream, +}; + +use crate::{ + client::Client, + core::{ + error::{ProblemDetails, ProblemType}, + request::{Arguments, Request}, + response::{MethodResponse, Response}, + }, + event_source::Changes, + Method, StateChangeType, TypeState, URI, +}; + +#[derive(Debug, Serialize)] +struct WebSocketRequest { + #[serde(rename = "@type")] + pub _type: WebSocketRequestType, + + #[serde(skip_serializing_if = "Option::is_none")] + pub id: Option, + + using: Vec, + + #[serde(rename = "methodCalls")] + method_calls: Vec<(Method, Arguments, String)>, + + #[serde(rename = "createdIds")] + #[serde(skip_serializing_if = "Option::is_none")] + created_ids: Option>, +} + +#[derive(Debug, Deserialize)] +pub struct WebSocketResponse { + #[serde(rename = "@type")] + _type: WebSocketResponseType, + + #[serde(rename = "requestId")] + request_id: Option, + + #[serde(rename = "methodResponses")] + method_responses: Vec, + + #[serde(rename = "createdIds")] + created_ids: Option>, + + #[serde(rename = "sessionState")] + session_state: String, +} + +#[derive(Debug, Serialize, Deserialize)] +enum WebSocketResponseType { + Response, +} + +#[derive(Debug, Serialize)] +struct WebSocketPushEnable { + #[serde(rename = "@type")] + _type: WebSocketPushEnableType, + + #[serde(rename = "dataTypes")] + data_types: Option>, + + #[serde(rename = "pushState")] + #[serde(skip_serializing_if = "Option::is_none")] + push_state: Option, +} + +#[derive(Debug, Serialize)] +struct WebSocketPushDisable { + #[serde(rename = "@type")] + _type: WebSocketPushDisableType, +} + +#[derive(Debug, Serialize)] +enum WebSocketRequestType { + Request, +} + +#[derive(Debug, Serialize)] +enum WebSocketPushEnableType { + WebSocketPushEnable, +} + +#[derive(Debug, Serialize)] +enum WebSocketPushDisableType { + WebSocketPushDisable, +} + +#[derive(Serialize, Deserialize, Debug)] +pub enum WebSocketStateChangeType { + StateChange, +} + +#[derive(Deserialize, Debug)] +pub struct WebSocketStateChange { + #[serde(rename = "@type")] + pub type_: WebSocketStateChangeType, + + pub changed: HashMap>, + + #[serde(rename = "pushState")] + push_state: Option, +} + +#[derive(Debug, Deserialize)] +pub struct WebSocketProblem { + #[serde(rename = "@type")] + pub type_: WebSocketProblemType, + + #[serde(rename = "requestId")] + pub request_id: Option, + + #[serde(rename = "type")] + p_type: ProblemType, + status: Option, + title: Option, + detail: Option, + limit: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +pub enum WebSocketProblemType { + Problem, +} + +#[derive(Debug, Deserialize)] +#[serde(untagged)] +enum WebSocketMessage_ { + Response(WebSocketResponse), + StateChange(WebSocketStateChange), + Error(WebSocketProblem), +} + +#[derive(Debug)] +pub enum WebSocketMessage { + Response(Response), + StateChange(Changes), +} + +pub struct WsStream { + tx: SplitSink>, Message>, + req_id: usize, +} + +impl Client { + pub async fn connect_ws( + &mut self, + ) -> crate::Result>>>> { + let capabilities = self.session().websocket_capabilities().ok_or_else(|| { + crate::Error::Internal( + "JMAP server does not advertise any websocket capabilities.".to_string(), + ) + })?; + + let mut request = capabilities.url().into_client_request()?; + request + .headers_mut() + .insert("Authorization", "Bearer 123".parse().unwrap()); //TODO implement + + let (stream, _) = tokio_tungstenite::connect_async(request).await?; + let (tx, mut rx) = stream.split(); + + self.set_ws_stream(WsStream { tx, req_id: 0 }); + + Ok(Box::pin(async_stream::stream! { + while let Some(message) = rx.next().await { + match message { + Ok(message) if message.is_text() => { + match serde_json::from_slice::(&message.into_data()) { + Ok(message) => match message { + WebSocketMessage_::Response(response) => { + yield Ok(WebSocketMessage::Response(Response::new( + response.method_responses, + response.created_ids, + response.session_state, + response.request_id, + ))) + } + WebSocketMessage_::StateChange(changes) => { + yield Ok(WebSocketMessage::StateChange(Changes::new( + changes.push_state, + changes.changed, + ))) + } + WebSocketMessage_::Error(err) => yield Err(ProblemDetails::from(err).into()), + }, + Err(err) => yield Err(err.into()), + } + } + Ok(_) => (), + Err(err) => yield Err(err.into()), + } + } + })) + } + + pub async fn send_ws(&mut self, request: Request<'_>) -> crate::Result { + let ws = self + .ws_stream() + .ok_or_else(|| crate::Error::Internal("Websocket stream not set.".to_string()))?; + + // Assing request id + let request_id = ws.req_id.to_string(); + ws.req_id += 1; + + ws.tx + .send(Message::text( + serde_json::to_string(&WebSocketRequest { + _type: WebSocketRequestType::Request, + id: request_id.clone().into(), + using: request.using, + method_calls: request.method_calls, + created_ids: request.created_ids, + }) + .unwrap_or_default(), + )) + .await?; + + Ok(request_id) + } + + pub async fn enable_push_ws( + &mut self, + data_types: Option>, + push_state: Option>, + ) -> crate::Result<()> { + self.ws_stream() + .ok_or_else(|| crate::Error::Internal("Websocket stream not set.".to_string()))? + .tx + .send(Message::text( + serde_json::to_string(&WebSocketPushEnable { + _type: WebSocketPushEnableType::WebSocketPushEnable, + data_types: data_types.map(|it| it.into_iter().collect()), + push_state: push_state.map(|it| it.into()), + }) + .unwrap_or_default(), + )) + .await + .map_err(|err| err.into()) + } + + pub async fn disable_push_ws(&mut self) -> crate::Result<()> { + self.ws_stream() + .ok_or_else(|| crate::Error::Internal("Websocket stream not set.".to_string()))? + .tx + .send(Message::text( + serde_json::to_string(&WebSocketPushDisable { + _type: WebSocketPushDisableType::WebSocketPushDisable, + }) + .unwrap_or_default(), + )) + .await + .map_err(|err| err.into()) + } + + pub async fn ws_ping(&mut self) -> crate::Result<()> { + self.ws_stream() + .ok_or_else(|| crate::Error::Internal("Websocket stream not set.".to_string()))? + .tx + .send(Message::Ping(vec![])) + .await + .map_err(|err| err.into()) + } +} + +impl From for ProblemDetails { + fn from(problem: WebSocketProblem) -> Self { + ProblemDetails::new( + problem.p_type, + problem.status, + problem.title, + problem.detail, + problem.limit, + problem.request_id, + ) + } +} diff --git a/src/core/error.rs b/src/core/error.rs index 4f69f1f..192fd8b 100644 --- a/src/core/error.rs +++ b/src/core/error.rs @@ -10,6 +10,7 @@ pub struct ProblemDetails { title: Option, detail: Option, limit: Option, + request_id: Option, } #[derive(Debug, Deserialize)] @@ -75,6 +76,24 @@ pub enum MethodErrorType { } impl ProblemDetails { + pub fn new( + p_type: ProblemType, + status: Option, + title: Option, + detail: Option, + limit: Option, + request_id: Option, + ) -> Self { + ProblemDetails { + p_type, + status, + title, + detail, + limit, + request_id, + } + } + pub fn error(&self) -> &ProblemType { &self.p_type } @@ -94,6 +113,10 @@ impl ProblemDetails { pub fn limit(&self) -> Option { self.limit } + + pub fn request_id(&self) -> Option<&str> { + self.request_id.as_deref() + } } impl MethodError { diff --git a/src/core/request.rs b/src/core/request.rs index 4f51f7a..fea906f 100644 --- a/src/core/request.rs +++ b/src/core/request.rs @@ -33,14 +33,14 @@ pub struct Request<'x> { #[serde(skip)] default_account_id: String, - using: Vec, + pub using: Vec, #[serde(rename = "methodCalls")] - method_calls: Vec<(Method, Arguments, String)>, + pub method_calls: Vec<(Method, Arguments, String)>, #[serde(rename = "createdIds")] #[serde(skip_serializing_if = "Option::is_none")] - created_ids: Option>, + pub created_ids: Option>, } #[derive(Debug, Clone, Serialize)] @@ -416,6 +416,11 @@ impl<'x> Request<'x> { Option::take(&mut self.client).unwrap().send(&self).await } + #[cfg(feature = "websockets")] + pub async fn send_ws(mut self) -> crate::Result { + Option::take(&mut self.client).unwrap().send_ws(self).await + } + pub async fn send_single(mut self) -> crate::Result where T: DeserializeOwned, diff --git a/src/core/response.rs b/src/core/response.rs index 91f5081..5353b96 100644 --- a/src/core/response.rs +++ b/src/core/response.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use crate::{ blob::copy::CopyBlobResponse, @@ -19,7 +19,7 @@ use super::{ query::QueryResponse, query_changes::QueryChangesResponse, set::SetResponse, }; -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Serialize)] pub struct Response { #[serde(rename = "methodResponses")] method_responses: Vec, @@ -29,9 +29,25 @@ pub struct Response { #[serde(rename = "sessionState")] session_state: String, + + request_id: Option, } impl Response { + pub fn new( + method_responses: Vec, + created_ids: Option>, + session_state: String, + request_id: Option, + ) -> Self { + Response { + method_responses, + created_ids, + session_state, + request_id, + } + } + pub fn method_responses(&self) -> &[T] { self.method_responses.as_ref() } @@ -40,6 +56,10 @@ impl Response { self.method_responses } + pub fn unwrap_method_response(mut self) -> T { + self.method_responses.pop().unwrap() + } + pub fn created_ids(&self) -> Option> { self.created_ids.as_ref().map(|map| map.iter()) } @@ -47,6 +67,10 @@ impl Response { pub fn session_state(&self) -> &str { &self.session_state } + + pub fn request_id(&self) -> Option<&str> { + self.request_id.as_deref() + } } impl Response { diff --git a/src/core/session.rs b/src/core/session.rs index 2f9f099..0910aae 100644 --- a/src/core/session.rs +++ b/src/core/session.rs @@ -2,7 +2,10 @@ use std::collections::HashMap; use serde::{Deserialize, Serialize}; -use crate::email::{MailCapabilities, SubmissionCapabilities}; +use crate::{ + email::{MailCapabilities, SubmissionCapabilities}, + URI, +}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Session { @@ -55,6 +58,7 @@ pub enum Capabilities { Core(CoreCapabilities), Mail(MailCapabilities), Submission(SubmissionCapabilities), + WebSocket(WebSocketCapabilities), Empty(EmptyCapabilities), Other(serde_json::Value), } @@ -86,6 +90,14 @@ pub struct CoreCapabilities { collation_algorithms: Vec, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WebSocketCapabilities { + #[serde(rename = "url")] + url: String, + #[serde(rename = "supportsPush")] + supports_push: bool, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct EmptyCapabilities {} @@ -94,8 +106,48 @@ impl Session { self.capabilities.keys() } - pub fn capability(&self, capability: &str) -> Option<&Capabilities> { - self.capabilities.get(capability) + pub fn capability(&self, capability: impl AsRef) -> Option<&Capabilities> { + self.capabilities.get(capability.as_ref()) + } + + pub fn has_capability(&self, capability: impl AsRef) -> bool { + self.capabilities.contains_key(capability.as_ref()) + } + + pub fn websocket_capabilities(&self) -> Option<&WebSocketCapabilities> { + self.capabilities + .get(URI::WebSocket.as_ref()) + .and_then(|v| match v { + Capabilities::WebSocket(capabilities) => Some(capabilities), + _ => None, + }) + } + + pub fn core_capabilities(&self) -> Option<&CoreCapabilities> { + self.capabilities + .get(URI::Core.as_ref()) + .and_then(|v| match v { + Capabilities::Core(capabilities) => Some(capabilities), + _ => None, + }) + } + + pub fn mail_capabilities(&self) -> Option<&MailCapabilities> { + self.capabilities + .get(URI::Mail.as_ref()) + .and_then(|v| match v { + Capabilities::Mail(capabilities) => Some(capabilities), + _ => None, + }) + } + + pub fn submission_capabilities(&self) -> Option<&SubmissionCapabilities> { + self.capabilities + .get(URI::Submission.as_ref()) + .and_then(|v| match v { + Capabilities::Submission(capabilities) => Some(capabilities), + _ => None, + }) } pub fn accounts(&self) -> impl Iterator { @@ -191,6 +243,16 @@ impl CoreCapabilities { } } +impl WebSocketCapabilities { + pub fn url(&self) -> &str { + &self.url + } + + pub fn supports_push(&self) -> bool { + self.supports_push + } +} + pub trait URLParser: Sized { fn parse(value: &str) -> Option; } diff --git a/src/event_source/mod.rs b/src/event_source/mod.rs index 67e2108..fde0133 100644 --- a/src/event_source/mod.rs +++ b/src/event_source/mod.rs @@ -31,6 +31,10 @@ pub struct Changes { } impl Changes { + pub fn new(id: Option, changes: HashMap>) -> Self { + Self { id, changes } + } + pub fn id(&self) -> Option<&str> { self.id.as_deref() } diff --git a/src/event_source/stream.rs b/src/event_source/stream.rs index 5c1f179..7e83cb5 100644 --- a/src/event_source/stream.rs +++ b/src/event_source/stream.rs @@ -1,7 +1,6 @@ -use std::time::Duration; +use std::{pin::Pin, time::Duration}; use crate::{client::Client, core::session::URLPart, event_source::parser::EventParser, TypeState}; -use async_stream::stream; use futures_util::{Stream, StreamExt}; use reqwest::header::{HeaderValue, ACCEPT, CONTENT_TYPE}; @@ -14,7 +13,7 @@ impl Client { close_after_state: bool, ping: Option, last_event_id: Option<&str>, - ) -> crate::Result>> { + ) -> crate::Result>>>> { let mut event_source_url = String::with_capacity(self.session().event_source_url().len()); for part in self.event_source_url() { @@ -74,8 +73,7 @@ impl Client { .bytes_stream(); let mut parser = EventParser::default(); - // TODO - use poll_next() to avoid pin_mut() call. - Ok(stream! { + Ok(Box::pin(async_stream::stream! { loop { if let Some(changes) = parser.filter_state() { yield changes; @@ -96,6 +94,6 @@ impl Client { break; } } - }) + })) } } diff --git a/src/lib.rs b/src/lib.rs index bb68d72..6e0c89c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,7 +17,8 @@ pub mod push_subscription; pub mod thread; pub mod vacation_response; -pub use futures_util; +#[cfg(feature = "websockets")] +pub mod client_ws; #[derive(Debug, Clone, Serialize, Deserialize, Hash, PartialEq, Eq)] pub enum URI { @@ -33,6 +34,22 @@ pub enum URI { Contacts, #[serde(rename = "urn:ietf:params:jmap:calendars")] Calendars, + #[serde(rename = "urn:ietf:params:jmap:websocket")] + WebSocket, +} + +impl AsRef for URI { + fn as_ref(&self) -> &str { + match self { + URI::Core => "urn:ietf:params:jmap:core", + URI::Mail => "urn:ietf:params:jmap:mail", + URI::Submission => "urn:ietf:params:jmap:submission", + URI::VacationResponse => "urn:ietf:params:jmap:vacationresponse", + URI::Contacts => "urn:ietf:params:jmap:contacts", + URI::Calendars => "urn:ietf:params:jmap:calendars", + URI::WebSocket => "urn:ietf:params:jmap:websocket", + } + } } #[derive(Debug, Clone, Copy, Serialize, Deserialize)] @@ -111,12 +128,12 @@ pub enum TypeState { EmailSubmission, } -#[derive(Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub enum StateChangeType { StateChange, } -#[derive(Deserialize)] +#[derive(Debug, Deserialize)] pub struct StateChange { #[serde(rename = "@type")] pub type_: StateChangeType, @@ -139,6 +156,8 @@ pub enum Error { Server(String), Method(MethodError), Set(SetError), + #[cfg(feature = "websockets")] + WebSocket(tokio_tungstenite::tungstenite::error::Error), } impl From for Error { @@ -159,6 +178,12 @@ impl From for Error { } } +impl From for Error { + fn from(e: ProblemDetails) -> Self { + Error::Problem(e) + } +} + impl From> for Error { fn from(e: SetError) -> Self { Error::Set(e) @@ -171,6 +196,13 @@ impl From<&str> for Error { } } +#[cfg(feature = "websockets")] +impl From for Error { + fn from(e: tokio_tungstenite::tungstenite::error::Error) -> Self { + Error::WebSocket(e) + } +} + impl Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -181,6 +213,8 @@ impl Display for Error { Error::Server(e) => write!(f, "Server error: {}", e), Error::Method(e) => write!(f, "Method error: {}", e), Error::Set(e) => write!(f, "Set error: {}", e), + #[cfg(feature = "websockets")] + Error::WebSocket(e) => write!(f, "WebSocket error: {}", e), } } } diff --git a/src/push_subscription/helpers.rs b/src/push_subscription/helpers.rs index 2bedbdf..241bfdb 100644 --- a/src/push_subscription/helpers.rs +++ b/src/push_subscription/helpers.rs @@ -6,7 +6,7 @@ use crate::{ response::{PushSubscriptionGetResponse, PushSubscriptionSetResponse}, set::{Create, SetRequest}, }, - Method, Set, + Method, Set, TypeState, }; use super::{Keys, PushSubscription}; @@ -52,6 +52,19 @@ impl Client { .updated(id) } + pub async fn push_subscription_update_types( + &mut self, + id: &str, + types: Option>, + ) -> crate::Result> { + let mut request = self.build(); + request.set_push_subscription().update(id).types(types); + request + .send_single::() + .await? + .updated(id) + } + pub async fn push_subscription_destroy(&mut self, id: &str) -> crate::Result<()> { let mut request = self.build(); request.set_push_subscription().destroy([id]); diff --git a/src/push_subscription/set.rs b/src/push_subscription/set.rs index 44e1154..09c0401 100644 --- a/src/push_subscription/set.rs +++ b/src/push_subscription/set.rs @@ -31,8 +31,8 @@ impl PushSubscription { self } - pub fn types(&mut self, types: Option>) -> &mut Self { - self.types = types.map(|s| s.collect()); + pub fn types(&mut self, types: Option>) -> &mut Self { + self.types = types.map(|s| s.into_iter().collect()); self } }