aboutsummaryrefslogtreecommitdiff
path: root/core/src/event.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@karyontech.net>2024-07-15 13:16:01 +0200
committerhozan23 <hozan23@karyontech.net>2024-07-15 13:16:01 +0200
commite15d3e6fd20b3f87abaad7ddec1c88b0e66419f9 (patch)
tree7976f6993e4f6b3646f5bd6954189346d5ffd330 /core/src/event.rs
parent6c65232d741229635151671708556b9af7ef75ac (diff)
p2p: Major refactoring of the handshake protocol
Introduce a new protocol InitProtocol which can be used as the core protocol for initializing a connection with a peer. Move the handshake logic from the PeerPool module to the protocols directory and build a handshake protocol that implements InitProtocol trait.
Diffstat (limited to 'core/src/event.rs')
-rw-r--r--core/src/event.rs58
1 files changed, 44 insertions, 14 deletions
diff --git a/core/src/event.rs b/core/src/event.rs
index a4e356b..709dbd9 100644
--- a/core/src/event.rs
+++ b/core/src/event.rs
@@ -10,7 +10,7 @@ use chrono::{DateTime, Utc};
use futures_util::stream::{FuturesUnordered, StreamExt};
use log::{debug, error};
-use crate::{async_runtime::lock::Mutex, util::random_32, Result};
+use crate::{async_runtime::lock::Mutex, util::random_32, Error, Result};
const CHANNEL_BUFFER_SIZE: usize = 1000;
@@ -111,13 +111,17 @@ where
///
/// The event must implement the [`EventValueTopic`] trait to indicate the
/// topic of the event. Otherwise, you can use `emit_by_topic()`.
- pub async fn emit<E: EventValueTopic<Topic = T> + Clone>(&self, value: &E) {
+ pub async fn emit<E: EventValueTopic<Topic = T> + Clone>(&self, value: &E) -> Result<()> {
let topic = E::topic();
- self.emit_by_topic(&topic, value).await;
+ self.emit_by_topic(&topic, value).await
}
/// Emits an event to the listeners.
- pub async fn emit_by_topic<E: EventValueAny + EventValue + Clone>(&self, topic: &T, value: &E) {
+ pub async fn emit_by_topic<E: EventValueAny + EventValue + Clone>(
+ &self,
+ topic: &T,
+ value: &E,
+ ) -> Result<()> {
let value: Arc<dyn EventValueAny> = Arc::new(value.clone());
let event = Event::new(value);
@@ -128,7 +132,10 @@ where
"Failed to emit an event to a non-existent topic {:?}",
topic
);
- return;
+ return Err(Error::EventEmitError(format!(
+ "Emit an event to a non-existent topic {:?}",
+ topic,
+ )));
}
let event_ids = topics.get_mut(topic).unwrap();
@@ -136,7 +143,10 @@ where
if !event_ids.contains_key(&event_id) {
debug!("Failed to emit an event: unknown event id {:?}", event_id);
- return;
+ return Err(Error::EventEmitError(format!(
+ "Emit an event: unknown event id {}",
+ event_id,
+ )));
}
let mut results = FuturesUnordered::new();
@@ -159,6 +169,8 @@ where
for listener_id in failed_listeners.iter() {
listeners.remove(listener_id);
}
+
+ Ok(())
}
/// Registers a new event listener for the given topic.
@@ -166,10 +178,10 @@ where
self: &Arc<Self>,
topic: &T,
) -> EventListener<T, E> {
- let chan = async_channel::bounded(self.listener_buffer_size);
-
let topics = &mut self.listeners.lock().await;
+ let chan = async_channel::bounded(self.listener_buffer_size);
+
if !topics.contains_key(topic) {
topics.insert(topic.clone(), HashMap::new());
}
@@ -197,6 +209,16 @@ where
listener
}
+ /// Remove all topics and event listeners
+ pub async fn clear(self: &Arc<Self>) {
+ self.listeners.lock().await.clear();
+ }
+
+ /// Unregisters all event listeners for the given topic.
+ pub async fn unregister_topic(self: &Arc<Self>, topic: &T) {
+ self.listeners.lock().await.remove(topic);
+ }
+
/// Removes the event listener attached to the given topic.
async fn remove(&self, topic: &T, event_id: &str, listener_id: &EventListenerID) {
let topics = &mut self.listeners.lock().await;
@@ -419,10 +441,12 @@ mod tests {
event_sys
.emit_by_topic(&Topic::TopicA, &A { a_value: 3 })
- .await;
+ .await
+ .expect("Emit event");
event_sys
.emit_by_topic(&Topic::TopicB, &B { b_value: 5 })
- .await;
+ .await
+ .expect("Emit event");
let msg = a_listener.recv().await.unwrap();
assert_eq!(msg, A { a_value: 3 });
@@ -434,13 +458,17 @@ mod tests {
let c_listener = event_sys.register::<C>(&Topic::TopicC).await;
let d_listener = event_sys.register::<C>(&Topic::TopicD).await;
- event_sys.emit(&C { c_value: 10 }).await;
+ event_sys
+ .emit(&C { c_value: 10 })
+ .await
+ .expect("Emit event");
let msg = c_listener.recv().await.unwrap();
assert_eq!(msg, C { c_value: 10 });
event_sys
.emit_by_topic(&Topic::TopicD, &C { c_value: 10 })
- .await;
+ .await
+ .expect("Emit event");
let msg = d_listener.recv().await.unwrap();
assert_eq!(msg, C { c_value: 10 });
@@ -450,14 +478,16 @@ mod tests {
event_sys
.emit_by_topic(&Topic::TopicE, &E { e_value: 5 })
- .await;
+ .await
+ .expect("Emit event");
let msg = e_listener.recv().await.unwrap();
assert_eq!(msg, E { e_value: 5 });
event_sys
.emit_by_topic(&Topic::TopicE, &F { f_value: 5 })
- .await;
+ .await
+ .expect("Emit event");
let msg = f_listener.recv().await.unwrap();
assert_eq!(msg, F { f_value: 5 });