From 4fe665fc8bc6265baf5bfba6b6a5f3ee2dba63dc Mon Sep 17 00:00:00 2001 From: hozan23 Date: Wed, 8 Nov 2023 13:03:27 +0300 Subject: first commit --- karyons_core/src/event.rs | 451 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 451 insertions(+) create mode 100644 karyons_core/src/event.rs (limited to 'karyons_core/src/event.rs') diff --git a/karyons_core/src/event.rs b/karyons_core/src/event.rs new file mode 100644 index 0000000..b856385 --- /dev/null +++ b/karyons_core/src/event.rs @@ -0,0 +1,451 @@ +use std::{ + any::Any, + collections::HashMap, + marker::PhantomData, + sync::{Arc, Weak}, +}; + +use chrono::{DateTime, Utc}; +use log::{error, trace}; +use smol::{ + channel::{Receiver, Sender}, + lock::Mutex, +}; + +use crate::{utils::random_16, Result}; + +pub type ArcEventSys = Arc>; +pub type WeakEventSys = Weak>; +pub type EventListenerID = u16; + +type Listeners = HashMap>>>; + +/// EventSys supports event emission to registered listeners based on topics. +/// # Example +/// +/// ``` +/// use karyons_core::event::{EventSys, EventValueTopic, EventValue}; +/// +/// async { +/// let event_sys = EventSys::new(); +/// +/// #[derive(Hash, PartialEq, Eq, Debug, Clone)] +/// enum Topic { +/// TopicA, +/// TopicB, +/// } +/// +/// #[derive(Clone, Debug, PartialEq)] +/// struct A(usize); +/// +/// impl EventValue for A { +/// fn id() -> &'static str { +/// "A" +/// } +/// } +/// +/// let listener = event_sys.register::(&Topic::TopicA).await; +/// +/// event_sys.emit_by_topic(&Topic::TopicA, &A(3)) .await; +/// let msg: A = listener.recv().await.unwrap(); +/// +/// #[derive(Clone, Debug, PartialEq)] +/// struct B(usize); +/// +/// impl EventValue for B { +/// fn id() -> &'static str { +/// "B" +/// } +/// } +/// +/// impl EventValueTopic for B { +/// type Topic = Topic; +/// fn topic() -> Self::Topic{ +/// Topic::TopicB +/// } +/// } +/// +/// let listener = event_sys.register::(&Topic::TopicB).await; +/// +/// event_sys.emit(&B(3)) .await; +/// let msg: B = listener.recv().await.unwrap(); +/// +/// // .... +/// }; +/// +/// ``` +/// +pub struct EventSys { + listeners: Mutex>, +} + +impl EventSys +where + T: std::hash::Hash + Eq + std::fmt::Debug + Clone, +{ + /// Creates a new `EventSys` + pub fn new() -> ArcEventSys { + Arc::new(Self { + listeners: Mutex::new(HashMap::new()), + }) + } + + /// Emits an event to the listeners. + /// + /// The event must implement the `EventValueTopic` trait to indicate the + /// topic of the event. Otherwise, you can use `emit_by_topic()`. + pub async fn emit + Clone>(&self, value: &E) { + let topic = E::topic(); + self.emit_by_topic(&topic, value).await; + } + + /// Emits an event to the listeners. + pub async fn emit_by_topic(&self, topic: &T, value: &E) { + let value: Arc = Arc::new(value.clone()); + let event = Event::new(value); + + let mut topics = self.listeners.lock().await; + + if !topics.contains_key(topic) { + error!("Failed to emit an event to a non-existent topic"); + return; + } + + let event_ids = topics.get_mut(topic).unwrap(); + let event_id = E::id().to_string(); + + if !event_ids.contains_key(&event_id) { + error!("Failed to emit an event to a non-existent event id"); + return; + } + + let mut failed_listeners = vec![]; + + let listeners = event_ids.get_mut(&event_id).unwrap(); + for (listener_id, listener) in listeners.iter() { + if let Err(err) = listener.send(event.clone()).await { + trace!("Failed to emit event for topic {:?}: {}", topic, err); + failed_listeners.push(*listener_id); + } + } + + for listener_id in failed_listeners.iter() { + listeners.remove(listener_id); + } + } + + /// Registers a new event listener for the given topic. + pub async fn register( + self: &Arc, + topic: &T, + ) -> EventListener { + let chan = smol::channel::unbounded(); + + let topics = &mut self.listeners.lock().await; + + if !topics.contains_key(topic) { + topics.insert(topic.clone(), HashMap::new()); + } + + let event_ids = topics.get_mut(topic).unwrap(); + let event_id = E::id().to_string(); + + if !event_ids.contains_key(&event_id) { + event_ids.insert(event_id.clone(), HashMap::new()); + } + + let listeners = event_ids.get_mut(&event_id).unwrap(); + + let mut listener_id = random_16(); + while listeners.contains_key(&listener_id) { + listener_id = random_16(); + } + + let listener = + EventListener::new(listener_id, Arc::downgrade(self), chan.1, &event_id, topic); + + listeners.insert(listener_id, chan.0); + + listener + } + + /// Removes an event listener attached to the given topic. + async fn remove(&self, topic: &T, event_id: &str, listener_id: &EventListenerID) { + let topics = &mut self.listeners.lock().await; + if !topics.contains_key(topic) { + error!("Failed to remove a non-existent topic"); + return; + } + + let event_ids = topics.get_mut(topic).unwrap(); + if !event_ids.contains_key(event_id) { + error!("Failed to remove a non-existent event id"); + return; + } + + let listeners = event_ids.get_mut(event_id).unwrap(); + if listeners.remove(listener_id).is_none() { + error!("Failed to remove a non-existent event listener"); + } + } +} + +/// EventListener listens for and receives events from the `EventSys`. +pub struct EventListener { + id: EventListenerID, + recv_chan: Receiver, + event_sys: WeakEventSys, + event_id: String, + topic: T, + phantom: PhantomData, +} + +impl EventListener +where + T: std::hash::Hash + Eq + Clone + std::fmt::Debug, + E: EventValueAny + Clone + EventValue, +{ + /// Create a new event listener. + fn new( + id: EventListenerID, + event_sys: WeakEventSys, + recv_chan: Receiver, + event_id: &str, + topic: &T, + ) -> EventListener { + Self { + id, + recv_chan, + event_sys, + event_id: event_id.to_string(), + topic: topic.clone(), + phantom: PhantomData, + } + } + + /// Receive the next event. + pub async fn recv(&self) -> Result { + match self.recv_chan.recv().await { + Ok(event) => match ((*event.value).value_as_any()).downcast_ref::() { + Some(v) => Ok(v.clone()), + None => unreachable!("Error when attempting to downcast the event value."), + }, + Err(err) => { + error!("Failed to receive new event: {err}"); + self.cancel().await; + Err(err.into()) + } + } + } + + /// Cancels the listener and removes it from the `EventSys`. + pub async fn cancel(&self) { + self.event_sys() + .remove(&self.topic, &self.event_id, &self.id) + .await; + } + + /// Returns the topic for this event listener. + pub async fn topic(&self) -> &T { + &self.topic + } + + /// Returns the event id for this event listener. + pub async fn event_id(&self) -> &String { + &self.event_id + } + + fn event_sys(&self) -> ArcEventSys { + self.event_sys.upgrade().unwrap() + } +} + +/// An event within the `EventSys`. +#[derive(Clone, Debug)] +pub struct Event { + /// The time at which the event was created. + created_at: DateTime, + /// The value of the Event. + value: Arc, +} + +impl Event { + /// Creates a new Event. + pub fn new(value: Arc) -> Self { + Self { + created_at: Utc::now(), + value, + } + } +} + +impl std::fmt::Display for Event { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}: {:?}", self.created_at, self.value) + } +} + +pub trait EventValueAny: Any + Send + Sync + std::fmt::Debug { + fn value_as_any(&self) -> &dyn Any; +} + +impl EventValueAny for T { + fn value_as_any(&self) -> &dyn Any { + self + } +} + +pub trait EventValue: EventValueAny { + fn id() -> &'static str + where + Self: Sized; +} + +pub trait EventValueTopic: EventValueAny + EventValue { + type Topic; + fn topic() -> Self::Topic + where + Self: Sized; +} + +#[cfg(test)] +mod tests { + use super::*; + + #[derive(Hash, PartialEq, Eq, Debug, Clone)] + enum Topic { + TopicA, + TopicB, + TopicC, + TopicD, + TopicE, + } + + #[derive(Clone, Debug, PartialEq)] + struct A { + a_value: usize, + } + + #[derive(Clone, Debug, PartialEq)] + struct B { + b_value: usize, + } + + #[derive(Clone, Debug, PartialEq)] + struct C { + c_value: usize, + } + + #[derive(Clone, Debug, PartialEq)] + struct D { + d_value: usize, + } + + #[derive(Clone, Debug, PartialEq)] + struct E { + e_value: usize, + } + + #[derive(Clone, Debug, PartialEq)] + struct F { + f_value: usize, + } + + impl EventValue for A { + fn id() -> &'static str { + "A" + } + } + + impl EventValue for B { + fn id() -> &'static str { + "B" + } + } + + impl EventValue for C { + fn id() -> &'static str { + "C" + } + } + + impl EventValue for D { + fn id() -> &'static str { + "D" + } + } + + impl EventValue for E { + fn id() -> &'static str { + "E" + } + } + + impl EventValue for F { + fn id() -> &'static str { + "F" + } + } + + impl EventValueTopic for C { + type Topic = Topic; + fn topic() -> Self::Topic { + Topic::TopicC + } + } + + #[test] + fn test_event_sys() { + smol::block_on(async move { + let event_sys = EventSys::::new(); + + let a_listener = event_sys.register::(&Topic::TopicA).await; + let b_listener = event_sys.register::(&Topic::TopicB).await; + + event_sys + .emit_by_topic(&Topic::TopicA, &A { a_value: 3 }) + .await; + event_sys + .emit_by_topic(&Topic::TopicB, &B { b_value: 5 }) + .await; + + let msg = a_listener.recv().await.unwrap(); + assert_eq!(msg, A { a_value: 3 }); + + let msg = b_listener.recv().await.unwrap(); + assert_eq!(msg, B { b_value: 5 }); + + // register the same event type to different topics + let c_listener = event_sys.register::(&Topic::TopicC).await; + let d_listener = event_sys.register::(&Topic::TopicD).await; + + event_sys.emit(&C { c_value: 10 }).await; + let msg = c_listener.recv().await.unwrap(); + assert_eq!(msg, C { c_value: 10 }); + + event_sys + .emit_by_topic(&Topic::TopicD, &C { c_value: 10 }) + .await; + let msg = d_listener.recv().await.unwrap(); + assert_eq!(msg, C { c_value: 10 }); + + // register different event types to the same topic + let e_listener = event_sys.register::(&Topic::TopicE).await; + let f_listener = event_sys.register::(&Topic::TopicE).await; + + event_sys + .emit_by_topic(&Topic::TopicE, &E { e_value: 5 }) + .await; + + let msg = e_listener.recv().await.unwrap(); + assert_eq!(msg, E { e_value: 5 }); + + event_sys + .emit_by_topic(&Topic::TopicE, &F { f_value: 5 }) + .await; + + let msg = f_listener.recv().await.unwrap(); + assert_eq!(msg, F { f_value: 5 }); + }); + } +} -- cgit v1.2.3