aboutsummaryrefslogtreecommitdiff
path: root/p2p/src
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/src')
-rw-r--r--p2p/src/backend.rs17
-rw-r--r--p2p/src/discovery/mod.rs4
-rw-r--r--p2p/src/lib.rs4
-rw-r--r--p2p/src/monitor/mod.rs6
-rw-r--r--p2p/src/peer/mod.rs18
-rw-r--r--p2p/src/peer_pool.rs15
-rw-r--r--p2p/src/protocol.rs14
-rw-r--r--p2p/src/protocols/ping.rs8
8 files changed, 33 insertions, 53 deletions
diff --git a/p2p/src/backend.rs b/p2p/src/backend.rs
index f21d70b..4d24be6 100644
--- a/p2p/src/backend.rs
+++ b/p2p/src/backend.rs
@@ -6,17 +6,10 @@ use karyon_core::{async_runtime::Executor, crypto::KeyPair};
use karyon_net::Endpoint;
use crate::{
- config::Config,
- conn_queue::ConnQueue,
- discovery::{ArcDiscovery, Discovery},
- monitor::Monitor,
- peer_pool::PeerPool,
- protocol::{ArcProtocol, Protocol},
- ArcPeer, PeerID, Result,
+ config::Config, conn_queue::ConnQueue, discovery::Discovery, monitor::Monitor, peer::Peer,
+ peer_pool::PeerPool, protocol::Protocol, PeerID, Result,
};
-pub type ArcBackend = Arc<Backend>;
-
/// Backend serves as the central entry point for initiating and managing
/// the P2P network.
pub struct Backend {
@@ -33,7 +26,7 @@ pub struct Backend {
monitor: Arc<Monitor>,
/// Discovery instance.
- discovery: ArcDiscovery,
+ discovery: Arc<Discovery>,
/// PeerPool instance.
peer_pool: Arc<PeerPool>,
@@ -41,7 +34,7 @@ pub struct Backend {
impl Backend {
/// Creates a new Backend.
- pub fn new(key_pair: &KeyPair, config: Config, ex: Executor) -> ArcBackend {
+ pub fn new(key_pair: &KeyPair, config: Config, ex: Executor) -> Arc<Backend> {
let config = Arc::new(config);
let monitor = Arc::new(Monitor::new(config.clone()));
let conn_queue = ConnQueue::new();
@@ -87,7 +80,7 @@ impl Backend {
/// Attach a custom protocol to the network
pub async fn attach_protocol<P: Protocol>(
&self,
- c: impl Fn(ArcPeer) -> ArcProtocol + Send + Sync + 'static,
+ c: impl Fn(Arc<Peer>) -> Arc<dyn Protocol> + Send + Sync + 'static,
) -> Result<()> {
self.peer_pool.attach_protocol::<P>(Box::new(c)).await
}
diff --git a/p2p/src/discovery/mod.rs b/p2p/src/discovery/mod.rs
index 99f880d..dae4d3f 100644
--- a/p2p/src/discovery/mod.rs
+++ b/p2p/src/discovery/mod.rs
@@ -32,8 +32,6 @@ use crate::{
use lookup::LookupService;
use refresh::RefreshService;
-pub type ArcDiscovery = Arc<Discovery>;
-
pub struct Discovery {
/// Routing table
table: Arc<RoutingTable>,
@@ -69,7 +67,7 @@ impl Discovery {
config: Arc<Config>,
monitor: Arc<Monitor>,
ex: Executor,
- ) -> ArcDiscovery {
+ ) -> Arc<Discovery> {
let table = Arc::new(RoutingTable::new(peer_id.0));
let refresh_service = Arc::new(RefreshService::new(
diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs
index ebc51d8..b21a353 100644
--- a/p2p/src/lib.rs
+++ b/p2p/src/lib.rs
@@ -61,9 +61,9 @@ pub mod monitor;
/// [`Read More`](./protocol/trait.Protocol.html)
pub mod protocol;
-pub use backend::{ArcBackend, Backend};
+pub use backend::Backend;
pub use config::Config;
-pub use peer::{ArcPeer, PeerID};
+pub use peer::{Peer, PeerID};
pub use version::Version;
pub mod endpoint {
diff --git a/p2p/src/monitor/mod.rs b/p2p/src/monitor/mod.rs
index 34d252e..4ecb431 100644
--- a/p2p/src/monitor/mod.rs
+++ b/p2p/src/monitor/mod.rs
@@ -2,7 +2,7 @@ mod event;
use std::sync::Arc;
-use karyon_core::event::{ArcEventSys, EventListener, EventSys, EventValue, EventValueTopic};
+use karyon_core::event::{EventListener, EventSys, EventValue, EventValueTopic};
use karyon_net::Endpoint;
@@ -15,7 +15,7 @@ use crate::{Config, PeerID};
/// Responsible for network and system monitoring.
///
-/// It use pub-sub pattern to notify the subscribers with new events.
+/// It use event emitter to notify the registerd listeners about new events.
///
/// # Example
///
@@ -45,7 +45,7 @@ use crate::{Config, PeerID};
/// };
/// ```
pub struct Monitor {
- event_sys: ArcEventSys<MonitorTopic>,
+ event_sys: Arc<EventSys<MonitorTopic>>,
config: Arc<Config>,
}
diff --git a/p2p/src/peer/mod.rs b/p2p/src/peer/mod.rs
index 0d42da3..2068789 100644
--- a/p2p/src/peer/mod.rs
+++ b/p2p/src/peer/mod.rs
@@ -2,7 +2,7 @@ mod peer_id;
pub use peer_id::PeerID;
-use std::sync::Arc;
+use std::sync::{Arc, Weak};
use async_channel::{Receiver, Sender};
use bincode::{Decode, Encode};
@@ -11,7 +11,7 @@ use log::{error, trace};
use karyon_core::{
async_runtime::{lock::RwLock, Executor},
async_util::{select, Either, TaskGroup, TaskResult},
- event::{ArcEventSys, EventListener, EventSys},
+ event::{EventListener, EventSys},
util::{decode, encode},
};
@@ -20,19 +20,17 @@ use karyon_net::{Conn, Endpoint};
use crate::{
conn_queue::ConnDirection,
message::{NetMsg, NetMsgCmd, ProtocolMsg, ShutdownMsg},
- peer_pool::{ArcPeerPool, WeakPeerPool},
+ peer_pool::PeerPool,
protocol::{Protocol, ProtocolEvent, ProtocolID},
Config, Error, Result,
};
-pub type ArcPeer = Arc<Peer>;
-
pub struct Peer {
/// Peer's ID
id: PeerID,
/// A weak pointer to `PeerPool`
- peer_pool: WeakPeerPool,
+ peer_pool: Weak<PeerPool>,
/// Holds the peer connection
conn: Conn<NetMsg>,
@@ -47,7 +45,7 @@ pub struct Peer {
protocol_ids: RwLock<Vec<ProtocolID>>,
/// `EventSys` responsible for sending events to the protocols.
- protocol_events: ArcEventSys<ProtocolID>,
+ protocol_events: Arc<EventSys<ProtocolID>>,
/// This channel is used to send a stop signal to the read loop.
stop_chan: (Sender<Result<()>>, Receiver<Result<()>>),
@@ -59,13 +57,13 @@ pub struct Peer {
impl Peer {
/// Creates a new peer
pub fn new(
- peer_pool: WeakPeerPool,
+ peer_pool: Weak<PeerPool>,
id: &PeerID,
conn: Conn<NetMsg>,
remote_endpoint: Endpoint,
conn_direction: ConnDirection,
ex: Executor,
- ) -> ArcPeer {
+ ) -> Arc<Peer> {
Arc::new(Peer {
id: id.clone(),
peer_pool,
@@ -228,7 +226,7 @@ impl Peer {
}
}
- fn peer_pool(&self) -> ArcPeerPool {
+ fn peer_pool(&self) -> Arc<PeerPool> {
self.peer_pool.upgrade().unwrap()
}
}
diff --git a/p2p/src/peer_pool.rs b/p2p/src/peer_pool.rs
index f8dda66..02a74ac 100644
--- a/p2p/src/peer_pool.rs
+++ b/p2p/src/peer_pool.rs
@@ -1,8 +1,4 @@
-use std::{
- collections::HashMap,
- sync::{Arc, Weak},
- time::Duration,
-};
+use std::{collections::HashMap, sync::Arc, time::Duration};
use async_channel::Sender;
use bincode::{Decode, Encode};
@@ -21,16 +17,13 @@ use crate::{
conn_queue::{ConnDirection, ConnQueue},
message::{get_msg_payload, NetMsg, NetMsgCmd, VerAckMsg, VerMsg},
monitor::{Monitor, PPEvent},
- peer::{ArcPeer, Peer, PeerID},
+ peer::Peer,
protocol::{Protocol, ProtocolConstructor, ProtocolID},
protocols::PingProtocol,
version::{version_match, Version, VersionInt},
- Error, Result,
+ Error, PeerID, Result,
};
-pub type ArcPeerPool = Arc<PeerPool>;
-pub type WeakPeerPool = Weak<PeerPool>;
-
pub struct PeerPool {
/// Peer's ID
pub id: PeerID,
@@ -39,7 +32,7 @@ pub struct PeerPool {
conn_queue: Arc<ConnQueue>,
/// Holds the running peers.
- peers: RwLock<HashMap<PeerID, ArcPeer>>,
+ peers: RwLock<HashMap<PeerID, Arc<Peer>>>,
/// Hashmap contains protocol constructors.
pub(crate) protocols: RwLock<HashMap<ProtocolID, Box<ProtocolConstructor>>>,
diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs
index d42f092..021844f 100644
--- a/p2p/src/protocol.rs
+++ b/p2p/src/protocol.rs
@@ -4,11 +4,9 @@ use async_trait::async_trait;
use karyon_core::event::EventValue;
-use crate::{peer::ArcPeer, version::Version, Result};
+use crate::{peer::Peer, version::Version, Result};
-pub type ArcProtocol = Arc<dyn Protocol>;
-
-pub type ProtocolConstructor = dyn Fn(ArcPeer) -> Arc<dyn Protocol> + Send + Sync;
+pub type ProtocolConstructor = dyn Fn(Arc<Peer>) -> Arc<dyn Protocol> + Send + Sync;
pub type ProtocolID = String;
@@ -38,17 +36,17 @@ impl EventValue for ProtocolEvent {
/// use smol::Executor;
///
/// use karyon_p2p::{
-/// protocol::{ArcProtocol, Protocol, ProtocolID, ProtocolEvent},
-/// Backend, PeerID, Config, Version, Error, ArcPeer,
+/// protocol::{Protocol, ProtocolID, ProtocolEvent},
+/// Backend, PeerID, Config, Version, Error, Peer,
/// keypair::{KeyPair, KeyPairType},
/// };
///
/// pub struct NewProtocol {
-/// peer: ArcPeer,
+/// peer: Arc<Peer>,
/// }
///
/// impl NewProtocol {
-/// fn new(peer: ArcPeer) -> ArcProtocol {
+/// fn new(peer: Arc<Peer>) -> Arc<dyn Protocol> {
/// Arc::new(Self {
/// peer,
/// })
diff --git a/p2p/src/protocols/ping.rs b/p2p/src/protocols/ping.rs
index 654644a..ef1b54e 100644
--- a/p2p/src/protocols/ping.rs
+++ b/p2p/src/protocols/ping.rs
@@ -16,8 +16,8 @@ use karyon_core::{
use karyon_net::Error as NetError;
use crate::{
- peer::ArcPeer,
- protocol::{ArcProtocol, Protocol, ProtocolEvent, ProtocolID},
+ peer::Peer,
+ protocol::{Protocol, ProtocolEvent, ProtocolID},
version::Version,
Result,
};
@@ -31,7 +31,7 @@ enum PingProtocolMsg {
}
pub struct PingProtocol {
- peer: ArcPeer,
+ peer: Arc<Peer>,
ping_interval: u64,
ping_timeout: u64,
task_group: TaskGroup,
@@ -39,7 +39,7 @@ pub struct PingProtocol {
impl PingProtocol {
#[allow(clippy::new_ret_no_self)]
- pub fn new(peer: ArcPeer, executor: Executor) -> ArcProtocol {
+ pub fn new(peer: Arc<Peer>, executor: Executor) -> Arc<dyn Protocol> {
let ping_interval = peer.config().ping_interval;
let ping_timeout = peer.config().ping_timeout;
Arc::new(Self {