diff options
Diffstat (limited to 'p2p/src')
-rw-r--r-- | p2p/src/backend.rs | 17 | ||||
-rw-r--r-- | p2p/src/discovery/mod.rs | 4 | ||||
-rw-r--r-- | p2p/src/lib.rs | 4 | ||||
-rw-r--r-- | p2p/src/monitor/mod.rs | 6 | ||||
-rw-r--r-- | p2p/src/peer/mod.rs | 18 | ||||
-rw-r--r-- | p2p/src/peer_pool.rs | 15 | ||||
-rw-r--r-- | p2p/src/protocol.rs | 14 | ||||
-rw-r--r-- | p2p/src/protocols/ping.rs | 8 |
8 files changed, 33 insertions, 53 deletions
diff --git a/p2p/src/backend.rs b/p2p/src/backend.rs index f21d70b..4d24be6 100644 --- a/p2p/src/backend.rs +++ b/p2p/src/backend.rs @@ -6,17 +6,10 @@ use karyon_core::{async_runtime::Executor, crypto::KeyPair}; use karyon_net::Endpoint; use crate::{ - config::Config, - conn_queue::ConnQueue, - discovery::{ArcDiscovery, Discovery}, - monitor::Monitor, - peer_pool::PeerPool, - protocol::{ArcProtocol, Protocol}, - ArcPeer, PeerID, Result, + config::Config, conn_queue::ConnQueue, discovery::Discovery, monitor::Monitor, peer::Peer, + peer_pool::PeerPool, protocol::Protocol, PeerID, Result, }; -pub type ArcBackend = Arc<Backend>; - /// Backend serves as the central entry point for initiating and managing /// the P2P network. pub struct Backend { @@ -33,7 +26,7 @@ pub struct Backend { monitor: Arc<Monitor>, /// Discovery instance. - discovery: ArcDiscovery, + discovery: Arc<Discovery>, /// PeerPool instance. peer_pool: Arc<PeerPool>, @@ -41,7 +34,7 @@ pub struct Backend { impl Backend { /// Creates a new Backend. - pub fn new(key_pair: &KeyPair, config: Config, ex: Executor) -> ArcBackend { + pub fn new(key_pair: &KeyPair, config: Config, ex: Executor) -> Arc<Backend> { let config = Arc::new(config); let monitor = Arc::new(Monitor::new(config.clone())); let conn_queue = ConnQueue::new(); @@ -87,7 +80,7 @@ impl Backend { /// Attach a custom protocol to the network pub async fn attach_protocol<P: Protocol>( &self, - c: impl Fn(ArcPeer) -> ArcProtocol + Send + Sync + 'static, + c: impl Fn(Arc<Peer>) -> Arc<dyn Protocol> + Send + Sync + 'static, ) -> Result<()> { self.peer_pool.attach_protocol::<P>(Box::new(c)).await } diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs index 99f880d..dae4d3f 100644 --- a/p2p/src/discovery/mod.rs +++ b/p2p/src/discovery/mod.rs @@ -32,8 +32,6 @@ use crate::{ use lookup::LookupService; use refresh::RefreshService; -pub type ArcDiscovery = Arc<Discovery>; - pub struct Discovery { /// Routing table table: Arc<RoutingTable>, @@ -69,7 +67,7 @@ impl Discovery { config: Arc<Config>, monitor: Arc<Monitor>, ex: Executor, - ) -> ArcDiscovery { + ) -> Arc<Discovery> { let table = Arc::new(RoutingTable::new(peer_id.0)); let refresh_service = Arc::new(RefreshService::new( diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index ebc51d8..b21a353 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -61,9 +61,9 @@ pub mod monitor; /// [`Read More`](./protocol/trait.Protocol.html) pub mod protocol; -pub use backend::{ArcBackend, Backend}; +pub use backend::Backend; pub use config::Config; -pub use peer::{ArcPeer, PeerID}; +pub use peer::{Peer, PeerID}; pub use version::Version; pub mod endpoint { diff --git a/p2p/src/monitor/mod.rs b/p2p/src/monitor/mod.rs index 34d252e..4ecb431 100644 --- a/p2p/src/monitor/mod.rs +++ b/p2p/src/monitor/mod.rs @@ -2,7 +2,7 @@ mod event; use std::sync::Arc; -use karyon_core::event::{ArcEventSys, EventListener, EventSys, EventValue, EventValueTopic}; +use karyon_core::event::{EventListener, EventSys, EventValue, EventValueTopic}; use karyon_net::Endpoint; @@ -15,7 +15,7 @@ use crate::{Config, PeerID}; /// Responsible for network and system monitoring. /// -/// It use pub-sub pattern to notify the subscribers with new events. +/// It use event emitter to notify the registerd listeners about new events. /// /// # Example /// @@ -45,7 +45,7 @@ use crate::{Config, PeerID}; /// }; /// ``` pub struct Monitor { - event_sys: ArcEventSys<MonitorTopic>, + event_sys: Arc<EventSys<MonitorTopic>>, config: Arc<Config>, } diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs index 0d42da3..2068789 100644 --- a/p2p/src/peer/mod.rs +++ b/p2p/src/peer/mod.rs @@ -2,7 +2,7 @@ mod peer_id; pub use peer_id::PeerID; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use async_channel::{Receiver, Sender}; use bincode::{Decode, Encode}; @@ -11,7 +11,7 @@ use log::{error, trace}; use karyon_core::{ async_runtime::{lock::RwLock, Executor}, async_util::{select, Either, TaskGroup, TaskResult}, - event::{ArcEventSys, EventListener, EventSys}, + event::{EventListener, EventSys}, util::{decode, encode}, }; @@ -20,19 +20,17 @@ use karyon_net::{Conn, Endpoint}; use crate::{ conn_queue::ConnDirection, message::{NetMsg, NetMsgCmd, ProtocolMsg, ShutdownMsg}, - peer_pool::{ArcPeerPool, WeakPeerPool}, + peer_pool::PeerPool, protocol::{Protocol, ProtocolEvent, ProtocolID}, Config, Error, Result, }; -pub type ArcPeer = Arc<Peer>; - pub struct Peer { /// Peer's ID id: PeerID, /// A weak pointer to `PeerPool` - peer_pool: WeakPeerPool, + peer_pool: Weak<PeerPool>, /// Holds the peer connection conn: Conn<NetMsg>, @@ -47,7 +45,7 @@ pub struct Peer { protocol_ids: RwLock<Vec<ProtocolID>>, /// `EventSys` responsible for sending events to the protocols. - protocol_events: ArcEventSys<ProtocolID>, + protocol_events: Arc<EventSys<ProtocolID>>, /// This channel is used to send a stop signal to the read loop. stop_chan: (Sender<Result<()>>, Receiver<Result<()>>), @@ -59,13 +57,13 @@ pub struct Peer { impl Peer { /// Creates a new peer pub fn new( - peer_pool: WeakPeerPool, + peer_pool: Weak<PeerPool>, id: &PeerID, conn: Conn<NetMsg>, remote_endpoint: Endpoint, conn_direction: ConnDirection, ex: Executor, - ) -> ArcPeer { + ) -> Arc<Peer> { Arc::new(Peer { id: id.clone(), peer_pool, @@ -228,7 +226,7 @@ impl Peer { } } - fn peer_pool(&self) -> ArcPeerPool { + fn peer_pool(&self) -> Arc<PeerPool> { self.peer_pool.upgrade().unwrap() } } diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs index f8dda66..02a74ac 100644 --- a/p2p/src/peer_pool.rs +++ b/p2p/src/peer_pool.rs @@ -1,8 +1,4 @@ -use std::{ - collections::HashMap, - sync::{Arc, Weak}, - time::Duration, -}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use async_channel::Sender; use bincode::{Decode, Encode}; @@ -21,16 +17,13 @@ use crate::{ conn_queue::{ConnDirection, ConnQueue}, message::{get_msg_payload, NetMsg, NetMsgCmd, VerAckMsg, VerMsg}, monitor::{Monitor, PPEvent}, - peer::{ArcPeer, Peer, PeerID}, + peer::Peer, protocol::{Protocol, ProtocolConstructor, ProtocolID}, protocols::PingProtocol, version::{version_match, Version, VersionInt}, - Error, Result, + Error, PeerID, Result, }; -pub type ArcPeerPool = Arc<PeerPool>; -pub type WeakPeerPool = Weak<PeerPool>; - pub struct PeerPool { /// Peer's ID pub id: PeerID, @@ -39,7 +32,7 @@ pub struct PeerPool { conn_queue: Arc<ConnQueue>, /// Holds the running peers. - peers: RwLock<HashMap<PeerID, ArcPeer>>, + peers: RwLock<HashMap<PeerID, Arc<Peer>>>, /// Hashmap contains protocol constructors. pub(crate) protocols: RwLock<HashMap<ProtocolID, Box<ProtocolConstructor>>>, diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index d42f092..021844f 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -4,11 +4,9 @@ use async_trait::async_trait; use karyon_core::event::EventValue; -use crate::{peer::ArcPeer, version::Version, Result}; +use crate::{peer::Peer, version::Version, Result}; -pub type ArcProtocol = Arc<dyn Protocol>; - -pub type ProtocolConstructor = dyn Fn(ArcPeer) -> Arc<dyn Protocol> + Send + Sync; +pub type ProtocolConstructor = dyn Fn(Arc<Peer>) -> Arc<dyn Protocol> + Send + Sync; pub type ProtocolID = String; @@ -38,17 +36,17 @@ impl EventValue for ProtocolEvent { /// use smol::Executor; /// /// use karyon_p2p::{ -/// protocol::{ArcProtocol, Protocol, ProtocolID, ProtocolEvent}, -/// Backend, PeerID, Config, Version, Error, ArcPeer, +/// protocol::{Protocol, ProtocolID, ProtocolEvent}, +/// Backend, PeerID, Config, Version, Error, Peer, /// keypair::{KeyPair, KeyPairType}, /// }; /// /// pub struct NewProtocol { -/// peer: ArcPeer, +/// peer: Arc<Peer>, /// } /// /// impl NewProtocol { -/// fn new(peer: ArcPeer) -> ArcProtocol { +/// fn new(peer: Arc<Peer>) -> Arc<dyn Protocol> { /// Arc::new(Self { /// peer, /// }) diff --git a/p2p/src/protocols/ping.rs b/p2p/src/protocols/ping.rs index 654644a..ef1b54e 100644 --- a/p2p/src/protocols/ping.rs +++ b/p2p/src/protocols/ping.rs @@ -16,8 +16,8 @@ use karyon_core::{ use karyon_net::Error as NetError; use crate::{ - peer::ArcPeer, - protocol::{ArcProtocol, Protocol, ProtocolEvent, ProtocolID}, + peer::Peer, + protocol::{Protocol, ProtocolEvent, ProtocolID}, version::Version, Result, }; @@ -31,7 +31,7 @@ enum PingProtocolMsg { } pub struct PingProtocol { - peer: ArcPeer, + peer: Arc<Peer>, ping_interval: u64, ping_timeout: u64, task_group: TaskGroup, @@ -39,7 +39,7 @@ pub struct PingProtocol { impl PingProtocol { #[allow(clippy::new_ret_no_self)] - pub fn new(peer: ArcPeer, executor: Executor) -> ArcProtocol { + pub fn new(peer: Arc<Peer>, executor: Executor) -> Arc<dyn Protocol> { let ping_interval = peer.config().ping_interval; let ping_timeout = peer.config().ping_timeout; Arc::new(Self { |