From 849d827486c75b2ab223d7b0e638dbb5b74d4d1d Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 9 Nov 2023 11:38:19 +0300 Subject: rename crates --- p2p/examples/chat.rs | 141 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 141 insertions(+) create mode 100644 p2p/examples/chat.rs (limited to 'p2p/examples/chat.rs') diff --git a/p2p/examples/chat.rs b/p2p/examples/chat.rs new file mode 100644 index 0000000..4358362 --- /dev/null +++ b/p2p/examples/chat.rs @@ -0,0 +1,141 @@ +use std::sync::Arc; + +use async_std::io; +use async_trait::async_trait; +use clap::Parser; +use smol::{channel, future, Executor}; + +use karyons_net::{Endpoint, Port}; + +use karyons_p2p::{ + protocol::{ArcProtocol, Protocol, ProtocolEvent, ProtocolID}, + ArcPeer, Backend, Config, P2pError, PeerID, Version, +}; + +#[derive(Parser)] +#[command(author, version, about, long_about = None)] +struct Cli { + /// Optional list of bootstrap peers to start the seeding process. + #[arg(short)] + bootstrap_peers: Vec, + + /// Optional list of peer endpoints for manual connections. + #[arg(short)] + peer_endpoints: Vec, + + /// Optional endpoint for accepting incoming connections. + #[arg(short)] + listen_endpoint: Option, + + /// Optional TCP/UDP port for the discovery service. + #[arg(short)] + discovery_port: Option, + + /// Username + #[arg(long)] + username: String, +} + +pub struct ChatProtocol { + username: String, + peer: ArcPeer, +} + +impl ChatProtocol { + fn new(username: &str, peer: ArcPeer) -> ArcProtocol { + Arc::new(Self { + peer, + username: username.to_string(), + }) + } +} + +#[async_trait] +impl Protocol for ChatProtocol { + async fn start(self: Arc, ex: Arc>) -> Result<(), P2pError> { + let selfc = self.clone(); + let stdin = io::stdin(); + let task = ex.spawn(async move { + loop { + let mut input = String::new(); + stdin.read_line(&mut input).await.unwrap(); + let msg = format!("> {}: {}", selfc.username, input.trim()); + selfc.peer.broadcast(&Self::id(), &msg).await; + } + }); + + let listener = self.peer.register_listener::().await; + loop { + let event = listener.recv().await.unwrap(); + + match event { + ProtocolEvent::Message(msg) => { + let msg = String::from_utf8(msg).unwrap(); + println!("{msg}"); + } + ProtocolEvent::Shutdown => { + break; + } + } + } + + task.cancel().await; + listener.cancel().await; + Ok(()) + } + + fn version() -> Result { + "0.1.0, 0.1.0".parse() + } + + fn id() -> ProtocolID { + "CHAT".into() + } +} + +fn main() { + env_logger::init(); + let cli = Cli::parse(); + + // Create a PeerID based on the username. + let peer_id = PeerID::new(cli.username.as_bytes()); + + // Create the configuration for the backend. + let config = Config { + listen_endpoint: cli.listen_endpoint, + peer_endpoints: cli.peer_endpoints, + bootstrap_peers: cli.bootstrap_peers, + discovery_port: cli.discovery_port.unwrap_or(0), + ..Default::default() + }; + + // Create a new Backend + let backend = Backend::new(peer_id, config); + + let (ctrlc_s, ctrlc_r) = channel::unbounded(); + let handle = move || ctrlc_s.try_send(()).unwrap(); + ctrlc::set_handler(handle).unwrap(); + + // Create a new Executor + let ex = Arc::new(Executor::new()); + + let ex_cloned = ex.clone(); + let task = ex.spawn(async { + let username = cli.username; + + // Attach the ChatProtocol + let c = move |peer| ChatProtocol::new(&username, peer); + backend.attach_protocol::(c).await.unwrap(); + + // Run the backend + backend.run(ex_cloned).await.unwrap(); + + // Wait for ctrlc signal + ctrlc_r.recv().await.unwrap(); + + // Shutdown the backend + backend.shutdown().await; + }); + + future::block_on(ex.run(task)); +} -- cgit v1.2.3