aboutsummaryrefslogtreecommitdiff
path: root/core/src/event.rs
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/event.rs')
-rw-r--r--core/src/event.rs74
1 files changed, 50 insertions, 24 deletions
diff --git a/core/src/event.rs b/core/src/event.rs
index 771d661..1632df3 100644
--- a/core/src/event.rs
+++ b/core/src/event.rs
@@ -7,17 +7,19 @@ use std::{
use async_channel::{Receiver, Sender};
use chrono::{DateTime, Utc};
+use futures_util::stream::{FuturesUnordered, StreamExt};
use log::{debug, error};
-use crate::{async_runtime::lock::Mutex, util::random_16, Result};
+use crate::{async_runtime::lock::Mutex, util::random_32, Result};
+
+const CHANNEL_BUFFER_SIZE: usize = 1000;
pub type ArcEventSys<T> = Arc<EventSys<T>>;
-pub type WeakEventSys<T> = Weak<EventSys<T>>;
-pub type EventListenerID = u16;
+pub type EventListenerID = u32;
type Listeners<T> = HashMap<T, HashMap<String, HashMap<EventListenerID, Sender<Event>>>>;
-/// EventSys supports event emission to registered listeners based on topics.
+/// EventSys emits events to registered listeners based on topics.
/// # Example
///
/// ```
@@ -74,22 +76,41 @@ type Listeners<T> = HashMap<T, HashMap<String, HashMap<EventListenerID, Sender<E
///
pub struct EventSys<T> {
listeners: Mutex<Listeners<T>>,
+ listener_buffer_size: usize,
}
impl<T> EventSys<T>
where
T: std::hash::Hash + Eq + std::fmt::Debug + Clone,
{
- /// Creates a new `EventSys`
+ /// Creates a new [`EventSys`]
pub fn new() -> ArcEventSys<T> {
Arc::new(Self {
listeners: Mutex::new(HashMap::new()),
+ listener_buffer_size: CHANNEL_BUFFER_SIZE,
+ })
+ }
+
+ /// Creates a new [`EventSys`] with the provided buffer size for the
+ /// [`EventListener`] channel.
+ ///
+ /// This is important to control the memory used by the listener channel.
+ /// If the consumer for the event listener can't keep up with the new events
+ /// coming, then the channel buffer will fill with new events, and if the
+ /// buffer is full, the emit function will block until the listener
+ /// starts to consume the buffered events.
+ ///
+ /// If `size` is zero, this function will panic.
+ pub fn with_buffer_size(size: usize) -> ArcEventSys<T> {
+ Arc::new(Self {
+ listeners: Mutex::new(HashMap::new()),
+ listener_buffer_size: size,
})
}
/// Emits an event to the listeners.
///
- /// The event must implement the `EventValueTopic` trait to indicate the
+ /// The event must implement the [`EventValueTopic`] trait to indicate the
/// topic of the event. Otherwise, you can use `emit_by_topic()`.
pub async fn emit<E: EventValueTopic<Topic = T> + Clone>(&self, value: &E) {
let topic = E::topic();
@@ -115,22 +136,26 @@ where
let event_id = E::id().to_string();
if !event_ids.contains_key(&event_id) {
- debug!(
- "Failed to emit an event to a non-existent event id: {:?}",
- event_id
- );
+ debug!("Failed to emit an event: unknown event id {:?}", event_id);
return;
}
- let mut failed_listeners = vec![];
+ let mut results = FuturesUnordered::new();
let listeners = event_ids.get_mut(&event_id).unwrap();
for (listener_id, listener) in listeners.iter() {
- if let Err(err) = listener.send(event.clone()).await {
+ let result = async { (*listener_id, listener.send(event.clone()).await) };
+ results.push(result);
+ }
+
+ let mut failed_listeners = vec![];
+ while let Some((id, fut_err)) = results.next().await {
+ if let Err(err) = fut_err {
debug!("Failed to emit event for topic {:?}: {}", topic, err);
- failed_listeners.push(*listener_id);
+ failed_listeners.push(id);
}
}
+ drop(results);
for listener_id in failed_listeners.iter() {
listeners.remove(listener_id);
@@ -142,7 +167,7 @@ where
self: &Arc<Self>,
topic: &T,
) -> EventListener<T, E> {
- let chan = async_channel::unbounded();
+ let chan = async_channel::bounded(self.listener_buffer_size);
let topics = &mut self.listeners.lock().await;
@@ -159,9 +184,10 @@ where
let listeners = event_ids.get_mut(&event_id).unwrap();
- let mut listener_id = random_16();
+ let mut listener_id = random_32();
+ // Generate a new one if listener_id already exists
while listeners.contains_key(&listener_id) {
- listener_id = random_16();
+ listener_id = random_32();
}
let listener =
@@ -197,7 +223,7 @@ where
pub struct EventListener<T, E> {
id: EventListenerID,
recv_chan: Receiver<Event>,
- event_sys: WeakEventSys<T>,
+ event_sys: Weak<EventSys<T>>,
event_id: String,
topic: T,
phantom: PhantomData<E>,
@@ -208,10 +234,10 @@ where
T: std::hash::Hash + Eq + Clone + std::fmt::Debug,
E: EventValueAny + Clone + EventValue,
{
- /// Create a new event listener.
+ /// Creates a new [`EventListener`].
fn new(
id: EventListenerID,
- event_sys: WeakEventSys<T>,
+ event_sys: Weak<EventSys<T>>,
recv_chan: Receiver<Event>,
event_id: &str,
topic: &T,
@@ -226,12 +252,12 @@ where
}
}
- /// Receive the next event.
+ /// Receives the next event.
pub async fn recv(&self) -> Result<E> {
match self.recv_chan.recv().await {
Ok(event) => match ((*event.value).value_as_any()).downcast_ref::<E>() {
Some(v) => Ok(v.clone()),
- None => unreachable!("Error when attempting to downcast the event value."),
+ None => unreachable!("Failed to downcast the event value."),
},
Err(err) => {
error!("Failed to receive new event: {err}");
@@ -241,7 +267,7 @@ where
}
}
- /// Cancels the listener and removes it from the `EventSys`.
+ /// Cancels the event listener and removes it from the [`EventSys`].
pub async fn cancel(&self) {
if let Some(es) = self.event_sys.upgrade() {
es.remove(&self.topic, &self.event_id, &self.id).await;
@@ -249,12 +275,12 @@ where
}
/// Returns the topic for this event listener.
- pub async fn topic(&self) -> &T {
+ pub fn topic(&self) -> &T {
&self.topic
}
/// Returns the event id for this event listener.
- pub async fn event_id(&self) -> &String {
+ pub fn event_id(&self) -> &String {
&self.event_id
}
}