aboutsummaryrefslogtreecommitdiff
path: root/p2p/examples/chat.rs
diff options
context:
space:
mode:
authorhozan23 <hozan23@proton.me>2023-11-09 11:38:19 +0300
committerhozan23 <hozan23@proton.me>2023-11-09 11:38:19 +0300
commit849d827486c75b2ab223d7b0e638dbb5b74d4d1d (patch)
tree41cd3babc37147ec4a40cab8ce8ae31c91cce33b /p2p/examples/chat.rs
parentde1354525895ffbad18f90a5246fd65157f7449e (diff)
rename crates
Diffstat (limited to 'p2p/examples/chat.rs')
-rw-r--r--p2p/examples/chat.rs141
1 files changed, 141 insertions, 0 deletions
diff --git a/p2p/examples/chat.rs b/p2p/examples/chat.rs
new file mode 100644
index 0000000..4358362
--- /dev/null
+++ b/p2p/examples/chat.rs
@@ -0,0 +1,141 @@
+use std::sync::Arc;
+
+use async_std::io;
+use async_trait::async_trait;
+use clap::Parser;
+use smol::{channel, future, Executor};
+
+use karyons_net::{Endpoint, Port};
+
+use karyons_p2p::{
+ protocol::{ArcProtocol, Protocol, ProtocolEvent, ProtocolID},
+ ArcPeer, Backend, Config, P2pError, PeerID, Version,
+};
+
+#[derive(Parser)]
+#[command(author, version, about, long_about = None)]
+struct Cli {
+ /// Optional list of bootstrap peers to start the seeding process.
+ #[arg(short)]
+ bootstrap_peers: Vec<Endpoint>,
+
+ /// Optional list of peer endpoints for manual connections.
+ #[arg(short)]
+ peer_endpoints: Vec<Endpoint>,
+
+ /// Optional endpoint for accepting incoming connections.
+ #[arg(short)]
+ listen_endpoint: Option<Endpoint>,
+
+ /// Optional TCP/UDP port for the discovery service.
+ #[arg(short)]
+ discovery_port: Option<Port>,
+
+ /// Username
+ #[arg(long)]
+ username: String,
+}
+
+pub struct ChatProtocol {
+ username: String,
+ peer: ArcPeer,
+}
+
+impl ChatProtocol {
+ fn new(username: &str, peer: ArcPeer) -> ArcProtocol {
+ Arc::new(Self {
+ peer,
+ username: username.to_string(),
+ })
+ }
+}
+
+#[async_trait]
+impl Protocol for ChatProtocol {
+ async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<(), P2pError> {
+ let selfc = self.clone();
+ let stdin = io::stdin();
+ let task = ex.spawn(async move {
+ loop {
+ let mut input = String::new();
+ stdin.read_line(&mut input).await.unwrap();
+ let msg = format!("> {}: {}", selfc.username, input.trim());
+ selfc.peer.broadcast(&Self::id(), &msg).await;
+ }
+ });
+
+ let listener = self.peer.register_listener::<Self>().await;
+ loop {
+ let event = listener.recv().await.unwrap();
+
+ match event {
+ ProtocolEvent::Message(msg) => {
+ let msg = String::from_utf8(msg).unwrap();
+ println!("{msg}");
+ }
+ ProtocolEvent::Shutdown => {
+ break;
+ }
+ }
+ }
+
+ task.cancel().await;
+ listener.cancel().await;
+ Ok(())
+ }
+
+ fn version() -> Result<Version, P2pError> {
+ "0.1.0, 0.1.0".parse()
+ }
+
+ fn id() -> ProtocolID {
+ "CHAT".into()
+ }
+}
+
+fn main() {
+ env_logger::init();
+ let cli = Cli::parse();
+
+ // Create a PeerID based on the username.
+ let peer_id = PeerID::new(cli.username.as_bytes());
+
+ // Create the configuration for the backend.
+ let config = Config {
+ listen_endpoint: cli.listen_endpoint,
+ peer_endpoints: cli.peer_endpoints,
+ bootstrap_peers: cli.bootstrap_peers,
+ discovery_port: cli.discovery_port.unwrap_or(0),
+ ..Default::default()
+ };
+
+ // Create a new Backend
+ let backend = Backend::new(peer_id, config);
+
+ let (ctrlc_s, ctrlc_r) = channel::unbounded();
+ let handle = move || ctrlc_s.try_send(()).unwrap();
+ ctrlc::set_handler(handle).unwrap();
+
+ // Create a new Executor
+ let ex = Arc::new(Executor::new());
+
+ let ex_cloned = ex.clone();
+ let task = ex.spawn(async {
+ let username = cli.username;
+
+ // Attach the ChatProtocol
+ let c = move |peer| ChatProtocol::new(&username, peer);
+ backend.attach_protocol::<ChatProtocol>(c).await.unwrap();
+
+ // Run the backend
+ backend.run(ex_cloned).await.unwrap();
+
+ // Wait for ctrlc signal
+ ctrlc_r.recv().await.unwrap();
+
+ // Shutdown the backend
+ backend.shutdown().await;
+ });
+
+ future::block_on(ex.run(task));
+}