From e15d3e6fd20b3f87abaad7ddec1c88b0e66419f9 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Mon, 15 Jul 2024 13:16:01 +0200 Subject: p2p: Major refactoring of the handshake protocol Introduce a new protocol InitProtocol which can be used as the core protocol for initializing a connection with a peer. Move the handshake logic from the PeerPool module to the protocols directory and build a handshake protocol that implements InitProtocol trait. --- core/src/error.rs | 3 +++ core/src/event.rs | 58 +++++++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 47 insertions(+), 14 deletions(-) (limited to 'core') diff --git a/core/src/error.rs b/core/src/error.rs index 2b8f641..754feb5 100644 --- a/core/src/error.rs +++ b/core/src/error.rs @@ -16,6 +16,9 @@ pub enum Error { #[error("Path Not Found Error: {0}")] PathNotFound(&'static str), + #[error("Event Emit Error: {0}")] + EventEmitError(String), + #[cfg(feature = "crypto")] #[error(transparent)] Ed25519(#[from] ed25519_dalek::ed25519::Error), diff --git a/core/src/event.rs b/core/src/event.rs index a4e356b..709dbd9 100644 --- a/core/src/event.rs +++ b/core/src/event.rs @@ -10,7 +10,7 @@ use chrono::{DateTime, Utc}; use futures_util::stream::{FuturesUnordered, StreamExt}; use log::{debug, error}; -use crate::{async_runtime::lock::Mutex, util::random_32, Result}; +use crate::{async_runtime::lock::Mutex, util::random_32, Error, Result}; const CHANNEL_BUFFER_SIZE: usize = 1000; @@ -111,13 +111,17 @@ where /// /// The event must implement the [`EventValueTopic`] trait to indicate the /// topic of the event. Otherwise, you can use `emit_by_topic()`. - pub async fn emit + Clone>(&self, value: &E) { + pub async fn emit + Clone>(&self, value: &E) -> Result<()> { let topic = E::topic(); - self.emit_by_topic(&topic, value).await; + self.emit_by_topic(&topic, value).await } /// Emits an event to the listeners. - pub async fn emit_by_topic(&self, topic: &T, value: &E) { + pub async fn emit_by_topic( + &self, + topic: &T, + value: &E, + ) -> Result<()> { let value: Arc = Arc::new(value.clone()); let event = Event::new(value); @@ -128,7 +132,10 @@ where "Failed to emit an event to a non-existent topic {:?}", topic ); - return; + return Err(Error::EventEmitError(format!( + "Emit an event to a non-existent topic {:?}", + topic, + ))); } let event_ids = topics.get_mut(topic).unwrap(); @@ -136,7 +143,10 @@ where if !event_ids.contains_key(&event_id) { debug!("Failed to emit an event: unknown event id {:?}", event_id); - return; + return Err(Error::EventEmitError(format!( + "Emit an event: unknown event id {}", + event_id, + ))); } let mut results = FuturesUnordered::new(); @@ -159,6 +169,8 @@ where for listener_id in failed_listeners.iter() { listeners.remove(listener_id); } + + Ok(()) } /// Registers a new event listener for the given topic. @@ -166,10 +178,10 @@ where self: &Arc, topic: &T, ) -> EventListener { - let chan = async_channel::bounded(self.listener_buffer_size); - let topics = &mut self.listeners.lock().await; + let chan = async_channel::bounded(self.listener_buffer_size); + if !topics.contains_key(topic) { topics.insert(topic.clone(), HashMap::new()); } @@ -197,6 +209,16 @@ where listener } + /// Remove all topics and event listeners + pub async fn clear(self: &Arc) { + self.listeners.lock().await.clear(); + } + + /// Unregisters all event listeners for the given topic. + pub async fn unregister_topic(self: &Arc, topic: &T) { + self.listeners.lock().await.remove(topic); + } + /// Removes the event listener attached to the given topic. async fn remove(&self, topic: &T, event_id: &str, listener_id: &EventListenerID) { let topics = &mut self.listeners.lock().await; @@ -419,10 +441,12 @@ mod tests { event_sys .emit_by_topic(&Topic::TopicA, &A { a_value: 3 }) - .await; + .await + .expect("Emit event"); event_sys .emit_by_topic(&Topic::TopicB, &B { b_value: 5 }) - .await; + .await + .expect("Emit event"); let msg = a_listener.recv().await.unwrap(); assert_eq!(msg, A { a_value: 3 }); @@ -434,13 +458,17 @@ mod tests { let c_listener = event_sys.register::(&Topic::TopicC).await; let d_listener = event_sys.register::(&Topic::TopicD).await; - event_sys.emit(&C { c_value: 10 }).await; + event_sys + .emit(&C { c_value: 10 }) + .await + .expect("Emit event"); let msg = c_listener.recv().await.unwrap(); assert_eq!(msg, C { c_value: 10 }); event_sys .emit_by_topic(&Topic::TopicD, &C { c_value: 10 }) - .await; + .await + .expect("Emit event"); let msg = d_listener.recv().await.unwrap(); assert_eq!(msg, C { c_value: 10 }); @@ -450,14 +478,16 @@ mod tests { event_sys .emit_by_topic(&Topic::TopicE, &E { e_value: 5 }) - .await; + .await + .expect("Emit event"); let msg = e_listener.recv().await.unwrap(); assert_eq!(msg, E { e_value: 5 }); event_sys .emit_by_topic(&Topic::TopicE, &F { f_value: 5 }) - .await; + .await + .expect("Emit event"); let msg = f_listener.recv().await.unwrap(); assert_eq!(msg, F { f_value: 5 }); -- cgit v1.2.3