From 4dc6ae61c66d2ecedb3dbd519dde89e8afc727a9 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Sun, 30 Jun 2024 01:13:45 +0200 Subject: p2p: remove async-std from dev dependencies & clean up examples --- p2p/examples/chat.rs | 24 ++++++++++++------------ p2p/examples/peer.rs | 8 ++++---- p2p/examples/shared/mod.rs | 13 ++++++++++++- p2p/examples/tokio-example/src/main.rs | 10 +++++----- 4 files changed, 33 insertions(+), 22 deletions(-) (limited to 'p2p/examples') diff --git a/p2p/examples/chat.rs b/p2p/examples/chat.rs index 1ad215c..5867c8b 100644 --- a/p2p/examples/chat.rs +++ b/p2p/examples/chat.rs @@ -2,7 +2,6 @@ mod shared; use std::sync::Arc; -use async_std::io; use async_trait::async_trait; use clap::Parser; use smol::{channel, Executor}; @@ -14,7 +13,7 @@ use karyon_p2p::{ Backend, Config, Error, Peer, Version, }; -use shared::run_executor; +use shared::{read_line_async, run_executor}; #[derive(Parser)] #[command(author, version, about, long_about = None)] @@ -59,13 +58,11 @@ impl ChatProtocol { #[async_trait] impl Protocol for ChatProtocol { async fn start(self: Arc) -> Result<(), Error> { - let stdin = io::stdin(); let task = self.executor.spawn({ let this = self.clone(); async move { loop { - let mut input = String::new(); - stdin.read_line(&mut input).await.unwrap(); + let input = read_line_async().await.expect("Read line from stdin"); let msg = format!("> {}: {}", this.username, input.trim()); this.peer.broadcast(&Self::id(), &msg).await; } @@ -74,11 +71,11 @@ impl Protocol for ChatProtocol { let listener = self.peer.register_listener::().await; loop { - let event = listener.recv().await.unwrap(); + let event = listener.recv().await.expect("Receive new protocol event"); match event { ProtocolEvent::Message(msg) => { - let msg = String::from_utf8(msg).unwrap(); + let msg = String::from_utf8(msg).expect("Convert received bytes to string"); println!("{msg}"); } ProtocolEvent::Shutdown => { @@ -125,8 +122,8 @@ fn main() { let backend = Backend::new(&key_pair, config, ex.clone().into()); let (ctrlc_s, ctrlc_r) = channel::unbounded(); - let handle = move || ctrlc_s.try_send(()).unwrap(); - ctrlc::set_handler(handle).unwrap(); + let handle = move || ctrlc_s.try_send(()).expect("Send ctrlc signal"); + ctrlc::set_handler(handle).expect("ctrlc set handler"); run_executor( { @@ -136,13 +133,16 @@ fn main() { // Attach the ChatProtocol let c = move |peer| ChatProtocol::new(&username, peer, ex.clone().into()); - backend.attach_protocol::(c).await.unwrap(); + backend + .attach_protocol::(c) + .await + .expect("Attach chat protocol to the p2p backend"); // Run the backend - backend.run().await.unwrap(); + backend.run().await.expect("Run the backend"); // Wait for ctrlc signal - ctrlc_r.recv().await.unwrap(); + ctrlc_r.recv().await.expect("Receive ctrlc signal"); // Shutdown the backend backend.shutdown().await; diff --git a/p2p/examples/peer.rs b/p2p/examples/peer.rs index 77bf732..58b9fc6 100644 --- a/p2p/examples/peer.rs +++ b/p2p/examples/peer.rs @@ -55,16 +55,16 @@ fn main() { let backend = Backend::new(&key_pair, config, ex.clone().into()); let (ctrlc_s, ctrlc_r) = channel::unbounded(); - let handle = move || ctrlc_s.try_send(()).unwrap(); - ctrlc::set_handler(handle).unwrap(); + let handle = move || ctrlc_s.try_send(()).expect("Send ctrlc signal"); + ctrlc::set_handler(handle).expect("ctrlc set handler"); run_executor( async { // Run the backend - backend.run().await.unwrap(); + backend.run().await.expect("Run the backend"); // Wait for ctrlc signal - ctrlc_r.recv().await.unwrap(); + ctrlc_r.recv().await.expect("Receive ctrlc signal"); // Shutdown the backend backend.shutdown().await; diff --git a/p2p/examples/shared/mod.rs b/p2p/examples/shared/mod.rs index 0e8079c..8c00446 100644 --- a/p2p/examples/shared/mod.rs +++ b/p2p/examples/shared/mod.rs @@ -1,8 +1,19 @@ -use std::{num::NonZeroUsize, sync::Arc, thread}; +use std::{io, num::NonZeroUsize, sync::Arc, thread}; +use blocking::unblock; use easy_parallel::Parallel; use smol::{channel, future, future::Future, Executor}; +#[allow(dead_code)] +pub async fn read_line_async() -> Result { + unblock(|| { + let mut input = String::new(); + std::io::stdin().read_line(&mut input)?; + Ok(input) + }) + .await +} + /// Returns an estimate of the default amount of parallelism a program should use. /// see `std::thread::available_parallelism` pub fn available_parallelism() -> usize { diff --git a/p2p/examples/tokio-example/src/main.rs b/p2p/examples/tokio-example/src/main.rs index 6a81ac6..5858e90 100644 --- a/p2p/examples/tokio-example/src/main.rs +++ b/p2p/examples/tokio-example/src/main.rs @@ -57,22 +57,22 @@ fn main() { .worker_threads(available_parallelism()) .enable_all() .build() - .unwrap(), + .expect("Build a new tokio runtime"), ); // Create a new Backend let backend = Backend::new(&key_pair, config, rt.clone().into()); let (ctrlc_s, ctrlc_r) = async_channel::unbounded(); - let handle = move || ctrlc_s.try_send(()).unwrap(); - ctrlc::set_handler(handle).unwrap(); + let handle = move || ctrlc_s.try_send(()).expect("Send ctrlc signal"); + ctrlc::set_handler(handle).expect("ctrlc set handler"); rt.block_on(async { // Run the backend - backend.run().await.unwrap(); + backend.run().await.expect("Run the backend"); // Wait for ctrlc signal - ctrlc_r.recv().await.unwrap(); + ctrlc_r.recv().await.expect("Receive ctrlc signal"); // Shutdown the backend backend.shutdown().await; -- cgit v1.2.3