diff options
author | hozan23 <hozan23@karyontech.net> | 2024-07-15 13:16:01 +0200 |
---|---|---|
committer | hozan23 <hozan23@karyontech.net> | 2024-07-15 13:16:01 +0200 |
commit | e15d3e6fd20b3f87abaad7ddec1c88b0e66419f9 (patch) | |
tree | 7976f6993e4f6b3646f5bd6954189346d5ffd330 /core | |
parent | 6c65232d741229635151671708556b9af7ef75ac (diff) |
p2p: Major refactoring of the handshake protocol
Introduce a new protocol InitProtocol which can be used as the core protocol
for initializing a connection with a peer.
Move the handshake logic from the PeerPool module to the protocols directory and
build a handshake protocol that implements InitProtocol trait.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/error.rs | 3 | ||||
-rw-r--r-- | core/src/event.rs | 58 |
2 files changed, 47 insertions, 14 deletions
diff --git a/core/src/error.rs b/core/src/error.rs index 2b8f641..754feb5 100644 --- a/core/src/error.rs +++ b/core/src/error.rs @@ -16,6 +16,9 @@ pub enum Error { #[error("Path Not Found Error: {0}")] PathNotFound(&'static str), + #[error("Event Emit Error: {0}")] + EventEmitError(String), + #[cfg(feature = "crypto")] #[error(transparent)] Ed25519(#[from] ed25519_dalek::ed25519::Error), diff --git a/core/src/event.rs b/core/src/event.rs index a4e356b..709dbd9 100644 --- a/core/src/event.rs +++ b/core/src/event.rs @@ -10,7 +10,7 @@ use chrono::{DateTime, Utc}; use futures_util::stream::{FuturesUnordered, StreamExt}; use log::{debug, error}; -use crate::{async_runtime::lock::Mutex, util::random_32, Result}; +use crate::{async_runtime::lock::Mutex, util::random_32, Error, Result}; const CHANNEL_BUFFER_SIZE: usize = 1000; @@ -111,13 +111,17 @@ where /// /// The event must implement the [`EventValueTopic`] trait to indicate the /// topic of the event. Otherwise, you can use `emit_by_topic()`. - pub async fn emit<E: EventValueTopic<Topic = T> + Clone>(&self, value: &E) { + pub async fn emit<E: EventValueTopic<Topic = T> + Clone>(&self, value: &E) -> Result<()> { let topic = E::topic(); - self.emit_by_topic(&topic, value).await; + self.emit_by_topic(&topic, value).await } /// Emits an event to the listeners. - pub async fn emit_by_topic<E: EventValueAny + EventValue + Clone>(&self, topic: &T, value: &E) { + pub async fn emit_by_topic<E: EventValueAny + EventValue + Clone>( + &self, + topic: &T, + value: &E, + ) -> Result<()> { let value: Arc<dyn EventValueAny> = Arc::new(value.clone()); let event = Event::new(value); @@ -128,7 +132,10 @@ where "Failed to emit an event to a non-existent topic {:?}", topic ); - return; + return Err(Error::EventEmitError(format!( + "Emit an event to a non-existent topic {:?}", + topic, + ))); } let event_ids = topics.get_mut(topic).unwrap(); @@ -136,7 +143,10 @@ where if !event_ids.contains_key(&event_id) { debug!("Failed to emit an event: unknown event id {:?}", event_id); - return; + return Err(Error::EventEmitError(format!( + "Emit an event: unknown event id {}", + event_id, + ))); } let mut results = FuturesUnordered::new(); @@ -159,6 +169,8 @@ where for listener_id in failed_listeners.iter() { listeners.remove(listener_id); } + + Ok(()) } /// Registers a new event listener for the given topic. @@ -166,10 +178,10 @@ where self: &Arc<Self>, topic: &T, ) -> EventListener<T, E> { - let chan = async_channel::bounded(self.listener_buffer_size); - let topics = &mut self.listeners.lock().await; + let chan = async_channel::bounded(self.listener_buffer_size); + if !topics.contains_key(topic) { topics.insert(topic.clone(), HashMap::new()); } @@ -197,6 +209,16 @@ where listener } + /// Remove all topics and event listeners + pub async fn clear(self: &Arc<Self>) { + self.listeners.lock().await.clear(); + } + + /// Unregisters all event listeners for the given topic. + pub async fn unregister_topic(self: &Arc<Self>, topic: &T) { + self.listeners.lock().await.remove(topic); + } + /// Removes the event listener attached to the given topic. async fn remove(&self, topic: &T, event_id: &str, listener_id: &EventListenerID) { let topics = &mut self.listeners.lock().await; @@ -419,10 +441,12 @@ mod tests { event_sys .emit_by_topic(&Topic::TopicA, &A { a_value: 3 }) - .await; + .await + .expect("Emit event"); event_sys .emit_by_topic(&Topic::TopicB, &B { b_value: 5 }) - .await; + .await + .expect("Emit event"); let msg = a_listener.recv().await.unwrap(); assert_eq!(msg, A { a_value: 3 }); @@ -434,13 +458,17 @@ mod tests { let c_listener = event_sys.register::<C>(&Topic::TopicC).await; let d_listener = event_sys.register::<C>(&Topic::TopicD).await; - event_sys.emit(&C { c_value: 10 }).await; + event_sys + .emit(&C { c_value: 10 }) + .await + .expect("Emit event"); let msg = c_listener.recv().await.unwrap(); assert_eq!(msg, C { c_value: 10 }); event_sys .emit_by_topic(&Topic::TopicD, &C { c_value: 10 }) - .await; + .await + .expect("Emit event"); let msg = d_listener.recv().await.unwrap(); assert_eq!(msg, C { c_value: 10 }); @@ -450,14 +478,16 @@ mod tests { event_sys .emit_by_topic(&Topic::TopicE, &E { e_value: 5 }) - .await; + .await + .expect("Emit event"); let msg = e_listener.recv().await.unwrap(); assert_eq!(msg, E { e_value: 5 }); event_sys .emit_by_topic(&Topic::TopicE, &F { f_value: 5 }) - .await; + .await + .expect("Emit event"); let msg = f_listener.recv().await.unwrap(); assert_eq!(msg, F { f_value: 5 }); |