diff --git a/Cargo.toml b/Cargo.toml index a28e8e4..55470d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,9 @@ readme = "README.md" serde = { version = "1.0", features = ["derive"]} serde_json = "1.0" chrono = { version = "0.4", features = ["serde"]} -reqwest = "0.11" +reqwest = { version = "0.11", features = ["stream"]} +futures-util = "0.3" +async-stream = "0.3.3" base64 = "0.13" [features] diff --git a/src/client.rs b/src/client.rs index 8954ae7..d700863 100644 --- a/src/client.rs +++ b/src/client.rs @@ -190,8 +190,8 @@ impl Client { mod tests { use crate::email::EmailBodyPart; - #[test] - fn test_serialize() { + //#[test] + fn _test_serialize() { println!( "{:?}", serde_json::from_slice::( diff --git a/src/email/mod.rs b/src/email/mod.rs index 70eeb7f..9593fb1 100644 --- a/src/email/mod.rs +++ b/src/email/mod.rs @@ -260,7 +260,7 @@ pub struct EmailHeader { value: String, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum Property { Id, BlobId, diff --git a/src/email_submission/mod.rs b/src/email_submission/mod.rs index 635b81b..7fa90a5 100644 --- a/src/email_submission/mod.rs +++ b/src/email_submission/mod.rs @@ -127,7 +127,7 @@ pub enum Displayed { Yes, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Copy)] pub enum Property { #[serde(rename = "id")] Id, diff --git a/src/event_source/mod.rs b/src/event_source/mod.rs index 9b45ae3..1860794 100644 --- a/src/event_source/mod.rs +++ b/src/event_source/mod.rs @@ -1,4 +1,11 @@ -use crate::core::session::URLParser; +pub mod parser; +pub mod stream; + +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +use crate::{core::session::URLParser, TypeState}; pub enum URLParameter { Types, @@ -16,3 +23,22 @@ impl URLParser for URLParameter { } } } + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct Changes { + changes: HashMap>, +} + +impl Changes { + pub fn account_changes(&mut self, account_id: &str) -> Option> { + self.changes.remove(account_id) + } + + pub fn changed_accounts(&self) -> impl Iterator { + self.changes.keys() + } + + pub fn into_innter(self) -> HashMap> { + self.changes + } +} diff --git a/src/event_source/parser.rs b/src/event_source/parser.rs new file mode 100644 index 0000000..ffcd204 --- /dev/null +++ b/src/event_source/parser.rs @@ -0,0 +1,292 @@ +use crate::StateChange; + +use super::Changes; + +const MAX_EVENT_SIZE: usize = 1024 * 1024; + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum EventType { + Ping, + State, +} + +impl Default for EventType { + fn default() -> Self { + Self::State + } +} + +#[derive(Default, Debug)] +pub struct Event { + pub event: EventType, + pub id: Vec, + pub data: Vec, +} + +#[derive(Debug, Copy, Clone)] +enum EventParserState { + Init, + Comment, + Field, + Value, +} + +impl Default for EventParserState { + fn default() -> Self { + Self::Init + } +} + +#[derive(Default, Debug)] +pub struct EventParser { + state: EventParserState, + field: Vec, + value: Vec, + bytes: Option>, + pos: usize, + result: Event, +} + +impl EventParser { + pub fn push_bytes(&mut self, bytes: Vec) { + self.bytes = Some(bytes); + } + + pub fn needs_bytes(&self) -> bool { + self.bytes.is_none() + } + + pub fn filter_state(&mut self) -> Option> { + #[allow(clippy::while_let_on_iterator)] + while let Some(event) = self.next() { + match event { + Ok(Event { + event: EventType::State, + data, + .. + }) => { + return match serde_json::from_slice::(&data) { + Ok(state_change) => Some(Ok(Changes { + changes: state_change.changed, + })), + Err(err) => Some(Err(err.into())), + }; + } + Ok(Event { + event: EventType::Ping, + .. + }) => { + #[cfg(feature = "debug")] + use std::iter::FromIterator; + #[cfg(feature = "debug")] + return Some(Ok(Changes { + changes: std::collections::HashMap::from_iter([( + "ping".to_string(), + std::collections::HashMap::new(), + )]), + })); + } + Err(err) => return Some(Err(err)), + } + } + None + } +} + +impl Iterator for EventParser { + type Item = crate::Result; + + fn next(&mut self) -> Option { + let bytes = self.bytes.as_ref()?; + + for byte in bytes.get(self.pos..)? { + self.pos += 1; + + match self.state { + EventParserState::Init => match byte { + b':' => { + self.state = EventParserState::Comment; + } + b'\r' | b' ' => (), + b'\n' => { + return Some(Ok(std::mem::take(&mut self.result))); + } + _ => { + self.state = EventParserState::Field; + self.field.push(*byte); + } + }, + EventParserState::Comment => { + if *byte == b'\n' { + self.state = EventParserState::Init; + } + } + EventParserState::Field => match byte { + b'\r' => (), + b'\n' => { + self.state = EventParserState::Init; + self.field.clear(); + } + b':' => { + self.state = EventParserState::Value; + } + _ => { + if self.field.len() >= MAX_EVENT_SIZE { + return Some(Err(crate::Error::Internal( + "EventSource response is too long.".to_string(), + ))); + } + + self.field.push(*byte); + } + }, + EventParserState::Value => match byte { + b'\r' => (), + b' ' if self.value.is_empty() => (), + b'\n' => { + self.state = EventParserState::Init; + match &self.field[..] { + b"id" => { + self.result.id.extend_from_slice(&self.value); + } + b"data" => { + self.result.data.extend_from_slice(&self.value); + } + b"event" => { + if self.value == b"ping" { + self.result.event = EventType::Ping; + } else { + self.result.event = EventType::State; + } + } + _ => { + //ignore + } + } + self.field.clear(); + self.value.clear(); + } + _ => { + if (self.field.len() + self.value.len()) >= MAX_EVENT_SIZE { + return Some(Err(crate::Error::Internal( + "EventSource response is too long.".to_string(), + ))); + } + + self.value.push(*byte); + } + }, + } + } + + self.bytes = None; + self.pos = 0; + + None + } +} + +#[cfg(test)] +mod tests { + + use super::{Event, EventType}; + + #[derive(Debug, PartialEq, Eq)] + struct EventString { + event: EventType, + id: String, + data: String, + } + + impl From for EventString { + fn from(event: Event) -> Self { + Self { + event: event.event, + id: String::from_utf8(event.id).unwrap(), + data: String::from_utf8(event.data).unwrap(), + } + } + } + + #[test] + fn parse() { + let mut parser = super::EventParser::default(); + let mut results = Vec::new(); + + for frame in [ + Vec::from("event: state\nid: 0\ndata: test\n\n"), + Vec::from("event: ping\nid:123\ndata: ping pa"), + Vec::from("yload"), + Vec::from("\n\n"), + Vec::from(":comment\n\n"), + Vec::from("data: YHOO\n"), + Vec::from("data: +2\n"), + Vec::from("data: 10\n\n"), + Vec::from(": test stream\n"), + Vec::from("data: first event\n"), + Vec::from("id: 1\n\n"), + Vec::from("data:second event\n"), + Vec::from("id\n\n"), + Vec::from("data: third event\n\n"), + Vec::from("data:hello\n\ndata: world\n\n"), + ] { + parser.push_bytes(frame); + + #[allow(clippy::while_let_on_iterator)] + while let Some(event) = parser.next() { + results.push(EventString::from(event.unwrap())); + } + } + + assert_eq!( + results, + vec![ + EventString { + event: EventType::State, + id: "0".to_string(), + data: "test".to_string() + }, + EventString { + event: EventType::Ping, + id: "123".to_string(), + data: "ping payload".to_string() + }, + EventString { + event: EventType::State, + id: "".to_string(), + data: "".to_string() + }, + EventString { + event: EventType::State, + id: "".to_string(), + data: "YHOO+210".to_string() + }, + EventString { + event: EventType::State, + id: "1".to_string(), + data: "first event".to_string() + }, + EventString { + event: EventType::State, + id: "".to_string(), + data: "second event".to_string() + }, + EventString { + event: EventType::State, + id: "".to_string(), + data: "third event".to_string() + }, + EventString { + event: EventType::State, + id: "".to_string(), + data: "hello".to_string() + }, + EventString { + event: EventType::State, + id: "".to_string(), + data: "world".to_string() + } + ] + ); + } +} diff --git a/src/event_source/stream.rs b/src/event_source/stream.rs new file mode 100644 index 0000000..5c1f179 --- /dev/null +++ b/src/event_source/stream.rs @@ -0,0 +1,101 @@ +use std::time::Duration; + +use crate::{client::Client, core::session::URLPart, event_source::parser::EventParser, TypeState}; +use async_stream::stream; +use futures_util::{Stream, StreamExt}; +use reqwest::header::{HeaderValue, ACCEPT, CONTENT_TYPE}; + +use super::Changes; + +impl Client { + pub async fn event_source( + &mut self, + mut types: Option>, + close_after_state: bool, + ping: Option, + last_event_id: Option<&str>, + ) -> crate::Result>> { + let mut event_source_url = String::with_capacity(self.session().event_source_url().len()); + + for part in self.event_source_url() { + match part { + URLPart::Value(value) => { + event_source_url.push_str(value); + } + URLPart::Parameter(param) => match param { + super::URLParameter::Types => { + if let Some(types) = Option::take(&mut types) { + event_source_url.push_str( + &types + .into_iter() + .map(|state| state.to_string()) + .collect::>() + .join(","), + ); + } else { + event_source_url.push('*'); + } + } + super::URLParameter::CloseAfter => { + event_source_url.push_str(if close_after_state { "state" } else { "no" }); + } + super::URLParameter::Ping => { + if let Some(ping) = ping { + event_source_url.push_str(&ping.to_string()); + } else { + event_source_url.push('0'); + } + } + }, + } + } + + // Add headers + let mut headers = self.headers().clone(); + headers.remove(CONTENT_TYPE); + headers.insert(ACCEPT, HeaderValue::from_static("text/event-stream")); + if let Some(last_event_id) = last_event_id { + headers.insert( + "Last-Event-ID", + HeaderValue::from_str(last_event_id).unwrap(), + ); + } + + let mut stream = Client::handle_error( + reqwest::Client::builder() + .timeout(Duration::from_millis(self.timeout())) + .default_headers(headers) + .build()? + .get(event_source_url) + .send() + .await?, + ) + .await? + .bytes_stream(); + let mut parser = EventParser::default(); + + // TODO - use poll_next() to avoid pin_mut() call. + Ok(stream! { + loop { + if let Some(changes) = parser.filter_state() { + yield changes; + continue; + } + if let Some(result) = stream.next().await { + match result { + Ok(bytes) => { + parser.push_bytes(bytes.to_vec()); + continue; + } + Err(err) => { + yield Err(err.into()); + break; + } + } + } else { + break; + } + } + }) + } +} diff --git a/src/identity/mod.rs b/src/identity/mod.rs index 3983b0b..fa5c4cc 100644 --- a/src/identity/mod.rs +++ b/src/identity/mod.rs @@ -49,7 +49,7 @@ pub struct Identity { pub may_delete: Option, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Copy)] pub enum Property { #[serde(rename = "id")] Id, diff --git a/src/lib.rs b/src/lib.rs index 706f50b..bb68d72 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,6 +17,8 @@ pub mod push_subscription; pub mod thread; pub mod vacation_response; +pub use futures_util; + #[derive(Debug, Clone, Serialize, Deserialize, Hash, PartialEq, Eq)] pub enum URI { #[serde(rename = "urn:ietf:params:jmap:core")] @@ -100,17 +102,13 @@ pub enum Method { } #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Hash, Clone)] -pub enum Object { - Core, +pub enum TypeState { Mailbox, Thread, Email, EmailDelivery, - SearchSnippet, Identity, EmailSubmission, - VacationResponse, - PushSubscription, } #[derive(Deserialize)] @@ -120,9 +118,9 @@ pub enum StateChangeType { #[derive(Deserialize)] pub struct StateChange { - #[serde(rename(serialize = "@type"))] + #[serde(rename = "@type")] pub type_: StateChangeType, - pub changed: HashMap>, + pub changed: HashMap>, } #[derive(Debug, Clone)] @@ -186,3 +184,16 @@ impl Display for Error { } } } + +impl Display for TypeState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + TypeState::Mailbox => write!(f, "Mailbox"), + TypeState::Thread => write!(f, "Thread"), + TypeState::Email => write!(f, "Email"), + TypeState::EmailDelivery => write!(f, "EmailDelivery"), + TypeState::Identity => write!(f, "Identity"), + TypeState::EmailSubmission => write!(f, "EmailSubmission"), + } + } +} diff --git a/src/mailbox/helpers.rs b/src/mailbox/helpers.rs index dd76d4d..3325f66 100644 --- a/src/mailbox/helpers.rs +++ b/src/mailbox/helpers.rs @@ -120,7 +120,7 @@ impl Client { pub async fn mailbox_query( &mut self, filter: Option>>, - sort: Option>>, + sort: Option>>, ) -> crate::Result { let mut request = self.build(); let query_request = request.query_mailbox(); @@ -132,6 +132,18 @@ impl Client { } request.send_single::().await } + + pub async fn mailbox_changes( + &mut self, + since_state: impl Into, + max_changes: usize, + ) -> crate::Result> { + let mut request = self.build(); + request + .changes_mailbox(since_state) + .max_changes(max_changes); + request.send_single().await + } } impl Request<'_> { diff --git a/src/mailbox/mod.rs b/src/mailbox/mod.rs index 3ca9693..f72b593 100644 --- a/src/mailbox/mod.rs +++ b/src/mailbox/mod.rs @@ -87,12 +87,19 @@ pub struct Mailbox { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum Role { + #[serde(rename = "archive", alias = "ARCHIVE")] Archive, + #[serde(rename = "drafts", alias = "DRAFTS")] Drafts, + #[serde(rename = "importante", alias = "IMPORTANT")] Important, + #[serde(rename = "inbox", alias = "INBOX")] Inbox, + #[serde(rename = "junk", alias = "JUNK")] Junk, + #[serde(rename = "sent", alias = "SENT")] Sent, + #[serde(rename = "trash", alias = "TRASH")] Trash, None, } @@ -127,7 +134,7 @@ pub struct MailboxRights { may_submit: bool, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Copy)] pub enum Property { #[serde(rename = "id")] Id, diff --git a/src/mailbox/query.rs b/src/mailbox/query.rs index df39e22..fdecd56 100644 --- a/src/mailbox/query.rs +++ b/src/mailbox/query.rs @@ -41,12 +41,16 @@ pub enum Comparator { } impl Filter { - pub fn parent_id(value: Option) -> Self { - Filter::ParentId { value } + pub fn parent_id(value: Option>) -> Self { + Filter::ParentId { + value: value.map(Into::into), + } } - pub fn name(value: String) -> Self { - Filter::Name { value } + pub fn name(value: impl Into) -> Self { + Filter::Name { + value: value.into(), + } } pub fn role(value: Role) -> Self { diff --git a/src/mailbox/set.rs b/src/mailbox/set.rs index 094a8be..21e274c 100644 --- a/src/mailbox/set.rs +++ b/src/mailbox/set.rs @@ -13,6 +13,11 @@ impl Mailbox { self } + pub fn parent_id_ref(&mut self, parent_id_ref: &str) -> &mut Self { + self.parent_id = format!("#{}", parent_id_ref).into(); + self + } + pub fn role(&mut self, role: Role) -> &mut Self { if !matches!(role, Role::None) { self.role = Some(role); diff --git a/src/push_subscription/get.rs b/src/push_subscription/get.rs index d3d39ac..8ff90c3 100644 --- a/src/push_subscription/get.rs +++ b/src/push_subscription/get.rs @@ -1,4 +1,4 @@ -use crate::{Get, Object}; +use crate::{Get, TypeState}; use super::{Keys, PushSubscription}; @@ -27,7 +27,7 @@ impl PushSubscription { self.expires.map(|v| v.timestamp()) } - pub fn types(&self) -> Option<&[Object]> { + pub fn types(&self) -> Option<&[TypeState]> { self.types.as_deref() } } diff --git a/src/push_subscription/mod.rs b/src/push_subscription/mod.rs index 8c8d78b..f7a2380 100644 --- a/src/push_subscription/mod.rs +++ b/src/push_subscription/mod.rs @@ -8,7 +8,7 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use crate::core::set::list_not_set; -use crate::{Get, Object}; +use crate::{Get, TypeState}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PushSubscription { @@ -44,10 +44,10 @@ pub struct PushSubscription { #[serde(rename = "types")] #[serde(skip_serializing_if = "list_not_set")] - types: Option>, + types: Option>, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Copy)] pub enum Property { #[serde(rename = "id")] Id, diff --git a/src/push_subscription/set.rs b/src/push_subscription/set.rs index 0cc0347..44e1154 100644 --- a/src/push_subscription/set.rs +++ b/src/push_subscription/set.rs @@ -1,6 +1,6 @@ use crate::{ core::set::{from_timestamp, Create}, - Object, Set, + Set, TypeState, }; use super::{Keys, PushSubscription}; @@ -31,7 +31,7 @@ impl PushSubscription { self } - pub fn types(&mut self, types: Option>) -> &mut Self { + pub fn types(&mut self, types: Option>) -> &mut Self { self.types = types.map(|s| s.collect()); self } diff --git a/src/thread/mod.rs b/src/thread/mod.rs index a2cd68c..5a76957 100644 --- a/src/thread/mod.rs +++ b/src/thread/mod.rs @@ -12,7 +12,7 @@ pub struct Thread { email_ids: Vec, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Copy)] pub enum Property { #[serde(rename = "id")] Id, diff --git a/src/vacation_response/mod.rs b/src/vacation_response/mod.rs index 4e3a920..7e14707 100644 --- a/src/vacation_response/mod.rs +++ b/src/vacation_response/mod.rs @@ -47,7 +47,7 @@ pub struct VacationResponse { html_body: Option, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Copy)] pub enum Property { #[serde(rename = "id")] Id,