diff options
author | hozan23 <hozan23@proton.me> | 2023-11-09 11:38:19 +0300 |
---|---|---|
committer | hozan23 <hozan23@proton.me> | 2023-11-09 11:38:19 +0300 |
commit | 849d827486c75b2ab223d7b0e638dbb5b74d4d1d (patch) | |
tree | 41cd3babc37147ec4a40cab8ce8ae31c91cce33b /karyons_core/src/event.rs | |
parent | de1354525895ffbad18f90a5246fd65157f7449e (diff) |
rename crates
Diffstat (limited to 'karyons_core/src/event.rs')
-rw-r--r-- | karyons_core/src/event.rs | 451 |
1 files changed, 0 insertions, 451 deletions
diff --git a/karyons_core/src/event.rs b/karyons_core/src/event.rs deleted file mode 100644 index b856385..0000000 --- a/karyons_core/src/event.rs +++ /dev/null @@ -1,451 +0,0 @@ -use std::{ - any::Any, - collections::HashMap, - marker::PhantomData, - sync::{Arc, Weak}, -}; - -use chrono::{DateTime, Utc}; -use log::{error, trace}; -use smol::{ - channel::{Receiver, Sender}, - lock::Mutex, -}; - -use crate::{utils::random_16, Result}; - -pub type ArcEventSys<T> = Arc<EventSys<T>>; -pub type WeakEventSys<T> = Weak<EventSys<T>>; -pub type EventListenerID = u16; - -type Listeners<T> = HashMap<T, HashMap<String, HashMap<EventListenerID, Sender<Event>>>>; - -/// EventSys supports event emission to registered listeners based on topics. -/// # Example -/// -/// ``` -/// use karyons_core::event::{EventSys, EventValueTopic, EventValue}; -/// -/// async { -/// let event_sys = EventSys::new(); -/// -/// #[derive(Hash, PartialEq, Eq, Debug, Clone)] -/// enum Topic { -/// TopicA, -/// TopicB, -/// } -/// -/// #[derive(Clone, Debug, PartialEq)] -/// struct A(usize); -/// -/// impl EventValue for A { -/// fn id() -> &'static str { -/// "A" -/// } -/// } -/// -/// let listener = event_sys.register::<A>(&Topic::TopicA).await; -/// -/// event_sys.emit_by_topic(&Topic::TopicA, &A(3)) .await; -/// let msg: A = listener.recv().await.unwrap(); -/// -/// #[derive(Clone, Debug, PartialEq)] -/// struct B(usize); -/// -/// impl EventValue for B { -/// fn id() -> &'static str { -/// "B" -/// } -/// } -/// -/// impl EventValueTopic for B { -/// type Topic = Topic; -/// fn topic() -> Self::Topic{ -/// Topic::TopicB -/// } -/// } -/// -/// let listener = event_sys.register::<B>(&Topic::TopicB).await; -/// -/// event_sys.emit(&B(3)) .await; -/// let msg: B = listener.recv().await.unwrap(); -/// -/// // .... -/// }; -/// -/// ``` -/// -pub struct EventSys<T> { - listeners: Mutex<Listeners<T>>, -} - -impl<T> EventSys<T> -where - T: std::hash::Hash + Eq + std::fmt::Debug + Clone, -{ - /// Creates a new `EventSys` - pub fn new() -> ArcEventSys<T> { - Arc::new(Self { - listeners: Mutex::new(HashMap::new()), - }) - } - - /// Emits an event to the listeners. - /// - /// The event must implement the `EventValueTopic` trait to indicate the - /// topic of the event. Otherwise, you can use `emit_by_topic()`. - pub async fn emit<E: EventValueTopic<Topic = T> + Clone>(&self, value: &E) { - let topic = E::topic(); - self.emit_by_topic(&topic, value).await; - } - - /// Emits an event to the listeners. - pub async fn emit_by_topic<E: EventValueAny + EventValue + Clone>(&self, topic: &T, value: &E) { - let value: Arc<dyn EventValueAny> = Arc::new(value.clone()); - let event = Event::new(value); - - let mut topics = self.listeners.lock().await; - - if !topics.contains_key(topic) { - error!("Failed to emit an event to a non-existent topic"); - return; - } - - let event_ids = topics.get_mut(topic).unwrap(); - let event_id = E::id().to_string(); - - if !event_ids.contains_key(&event_id) { - error!("Failed to emit an event to a non-existent event id"); - return; - } - - let mut failed_listeners = vec![]; - - let listeners = event_ids.get_mut(&event_id).unwrap(); - for (listener_id, listener) in listeners.iter() { - if let Err(err) = listener.send(event.clone()).await { - trace!("Failed to emit event for topic {:?}: {}", topic, err); - failed_listeners.push(*listener_id); - } - } - - for listener_id in failed_listeners.iter() { - listeners.remove(listener_id); - } - } - - /// Registers a new event listener for the given topic. - pub async fn register<E: EventValueAny + EventValue + Clone>( - self: &Arc<Self>, - topic: &T, - ) -> EventListener<T, E> { - let chan = smol::channel::unbounded(); - - let topics = &mut self.listeners.lock().await; - - if !topics.contains_key(topic) { - topics.insert(topic.clone(), HashMap::new()); - } - - let event_ids = topics.get_mut(topic).unwrap(); - let event_id = E::id().to_string(); - - if !event_ids.contains_key(&event_id) { - event_ids.insert(event_id.clone(), HashMap::new()); - } - - let listeners = event_ids.get_mut(&event_id).unwrap(); - - let mut listener_id = random_16(); - while listeners.contains_key(&listener_id) { - listener_id = random_16(); - } - - let listener = - EventListener::new(listener_id, Arc::downgrade(self), chan.1, &event_id, topic); - - listeners.insert(listener_id, chan.0); - - listener - } - - /// Removes an event listener attached to the given topic. - async fn remove(&self, topic: &T, event_id: &str, listener_id: &EventListenerID) { - let topics = &mut self.listeners.lock().await; - if !topics.contains_key(topic) { - error!("Failed to remove a non-existent topic"); - return; - } - - let event_ids = topics.get_mut(topic).unwrap(); - if !event_ids.contains_key(event_id) { - error!("Failed to remove a non-existent event id"); - return; - } - - let listeners = event_ids.get_mut(event_id).unwrap(); - if listeners.remove(listener_id).is_none() { - error!("Failed to remove a non-existent event listener"); - } - } -} - -/// EventListener listens for and receives events from the `EventSys`. -pub struct EventListener<T, E> { - id: EventListenerID, - recv_chan: Receiver<Event>, - event_sys: WeakEventSys<T>, - event_id: String, - topic: T, - phantom: PhantomData<E>, -} - -impl<T, E> EventListener<T, E> -where - T: std::hash::Hash + Eq + Clone + std::fmt::Debug, - E: EventValueAny + Clone + EventValue, -{ - /// Create a new event listener. - fn new( - id: EventListenerID, - event_sys: WeakEventSys<T>, - recv_chan: Receiver<Event>, - event_id: &str, - topic: &T, - ) -> EventListener<T, E> { - Self { - id, - recv_chan, - event_sys, - event_id: event_id.to_string(), - topic: topic.clone(), - phantom: PhantomData, - } - } - - /// Receive the next event. - pub async fn recv(&self) -> Result<E> { - match self.recv_chan.recv().await { - Ok(event) => match ((*event.value).value_as_any()).downcast_ref::<E>() { - Some(v) => Ok(v.clone()), - None => unreachable!("Error when attempting to downcast the event value."), - }, - Err(err) => { - error!("Failed to receive new event: {err}"); - self.cancel().await; - Err(err.into()) - } - } - } - - /// Cancels the listener and removes it from the `EventSys`. - pub async fn cancel(&self) { - self.event_sys() - .remove(&self.topic, &self.event_id, &self.id) - .await; - } - - /// Returns the topic for this event listener. - pub async fn topic(&self) -> &T { - &self.topic - } - - /// Returns the event id for this event listener. - pub async fn event_id(&self) -> &String { - &self.event_id - } - - fn event_sys(&self) -> ArcEventSys<T> { - self.event_sys.upgrade().unwrap() - } -} - -/// An event within the `EventSys`. -#[derive(Clone, Debug)] -pub struct Event { - /// The time at which the event was created. - created_at: DateTime<Utc>, - /// The value of the Event. - value: Arc<dyn EventValueAny>, -} - -impl Event { - /// Creates a new Event. - pub fn new(value: Arc<dyn EventValueAny>) -> Self { - Self { - created_at: Utc::now(), - value, - } - } -} - -impl std::fmt::Display for Event { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{}: {:?}", self.created_at, self.value) - } -} - -pub trait EventValueAny: Any + Send + Sync + std::fmt::Debug { - fn value_as_any(&self) -> &dyn Any; -} - -impl<T: Send + Sync + std::fmt::Debug + Any> EventValueAny for T { - fn value_as_any(&self) -> &dyn Any { - self - } -} - -pub trait EventValue: EventValueAny { - fn id() -> &'static str - where - Self: Sized; -} - -pub trait EventValueTopic: EventValueAny + EventValue { - type Topic; - fn topic() -> Self::Topic - where - Self: Sized; -} - -#[cfg(test)] -mod tests { - use super::*; - - #[derive(Hash, PartialEq, Eq, Debug, Clone)] - enum Topic { - TopicA, - TopicB, - TopicC, - TopicD, - TopicE, - } - - #[derive(Clone, Debug, PartialEq)] - struct A { - a_value: usize, - } - - #[derive(Clone, Debug, PartialEq)] - struct B { - b_value: usize, - } - - #[derive(Clone, Debug, PartialEq)] - struct C { - c_value: usize, - } - - #[derive(Clone, Debug, PartialEq)] - struct D { - d_value: usize, - } - - #[derive(Clone, Debug, PartialEq)] - struct E { - e_value: usize, - } - - #[derive(Clone, Debug, PartialEq)] - struct F { - f_value: usize, - } - - impl EventValue for A { - fn id() -> &'static str { - "A" - } - } - - impl EventValue for B { - fn id() -> &'static str { - "B" - } - } - - impl EventValue for C { - fn id() -> &'static str { - "C" - } - } - - impl EventValue for D { - fn id() -> &'static str { - "D" - } - } - - impl EventValue for E { - fn id() -> &'static str { - "E" - } - } - - impl EventValue for F { - fn id() -> &'static str { - "F" - } - } - - impl EventValueTopic for C { - type Topic = Topic; - fn topic() -> Self::Topic { - Topic::TopicC - } - } - - #[test] - fn test_event_sys() { - smol::block_on(async move { - let event_sys = EventSys::<Topic>::new(); - - let a_listener = event_sys.register::<A>(&Topic::TopicA).await; - let b_listener = event_sys.register::<B>(&Topic::TopicB).await; - - event_sys - .emit_by_topic(&Topic::TopicA, &A { a_value: 3 }) - .await; - event_sys - .emit_by_topic(&Topic::TopicB, &B { b_value: 5 }) - .await; - - let msg = a_listener.recv().await.unwrap(); - assert_eq!(msg, A { a_value: 3 }); - - let msg = b_listener.recv().await.unwrap(); - assert_eq!(msg, B { b_value: 5 }); - - // register the same event type to different topics - let c_listener = event_sys.register::<C>(&Topic::TopicC).await; - let d_listener = event_sys.register::<C>(&Topic::TopicD).await; - - event_sys.emit(&C { c_value: 10 }).await; - let msg = c_listener.recv().await.unwrap(); - assert_eq!(msg, C { c_value: 10 }); - - event_sys - .emit_by_topic(&Topic::TopicD, &C { c_value: 10 }) - .await; - let msg = d_listener.recv().await.unwrap(); - assert_eq!(msg, C { c_value: 10 }); - - // register different event types to the same topic - let e_listener = event_sys.register::<E>(&Topic::TopicE).await; - let f_listener = event_sys.register::<F>(&Topic::TopicE).await; - - event_sys - .emit_by_topic(&Topic::TopicE, &E { e_value: 5 }) - .await; - - let msg = e_listener.recv().await.unwrap(); - assert_eq!(msg, E { e_value: 5 }); - - event_sys - .emit_by_topic(&Topic::TopicE, &F { f_value: 5 }) - .await; - - let msg = f_listener.recv().await.unwrap(); - assert_eq!(msg, F { f_value: 5 }); - }); - } -} |