aboutsummaryrefslogtreecommitdiff
path: root/p2p
diff options
context:
space:
mode:
Diffstat (limited to 'p2p')
-rw-r--r--p2p/Cargo.toml2
-rw-r--r--p2p/examples/chat.rs24
-rw-r--r--p2p/examples/peer.rs8
-rw-r--r--p2p/examples/shared/mod.rs13
-rw-r--r--p2p/examples/tokio-example/src/main.rs10
5 files changed, 34 insertions, 23 deletions
diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml
index 3f61fee..9c303e9 100644
--- a/p2p/Cargo.toml
+++ b/p2p/Cargo.toml
@@ -59,7 +59,7 @@ tokio-rustls = { version = "0.26.0", features = ["aws-lc-rs"], optional = true }
rustls-pki-types = "1.7.0"
[dev-dependencies]
-async-std = "1.12.0"
+blocking = "1.6.1"
clap = { version = "4.5.7", features = ["derive"] }
ctrlc = "3.4.4"
easy-parallel = "3.3.1"
diff --git a/p2p/examples/chat.rs b/p2p/examples/chat.rs
index 1ad215c..5867c8b 100644
--- a/p2p/examples/chat.rs
+++ b/p2p/examples/chat.rs
@@ -2,7 +2,6 @@ mod shared;
use std::sync::Arc;
-use async_std::io;
use async_trait::async_trait;
use clap::Parser;
use smol::{channel, Executor};
@@ -14,7 +13,7 @@ use karyon_p2p::{
Backend, Config, Error, Peer, Version,
};
-use shared::run_executor;
+use shared::{read_line_async, run_executor};
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
@@ -59,13 +58,11 @@ impl ChatProtocol {
#[async_trait]
impl Protocol for ChatProtocol {
async fn start(self: Arc<Self>) -> Result<(), Error> {
- let stdin = io::stdin();
let task = self.executor.spawn({
let this = self.clone();
async move {
loop {
- let mut input = String::new();
- stdin.read_line(&mut input).await.unwrap();
+ let input = read_line_async().await.expect("Read line from stdin");
let msg = format!("> {}: {}", this.username, input.trim());
this.peer.broadcast(&Self::id(), &msg).await;
}
@@ -74,11 +71,11 @@ impl Protocol for ChatProtocol {
let listener = self.peer.register_listener::<Self>().await;
loop {
- let event = listener.recv().await.unwrap();
+ let event = listener.recv().await.expect("Receive new protocol event");
match event {
ProtocolEvent::Message(msg) => {
- let msg = String::from_utf8(msg).unwrap();
+ let msg = String::from_utf8(msg).expect("Convert received bytes to string");
println!("{msg}");
}
ProtocolEvent::Shutdown => {
@@ -125,8 +122,8 @@ fn main() {
let backend = Backend::new(&key_pair, config, ex.clone().into());
let (ctrlc_s, ctrlc_r) = channel::unbounded();
- let handle = move || ctrlc_s.try_send(()).unwrap();
- ctrlc::set_handler(handle).unwrap();
+ let handle = move || ctrlc_s.try_send(()).expect("Send ctrlc signal");
+ ctrlc::set_handler(handle).expect("ctrlc set handler");
run_executor(
{
@@ -136,13 +133,16 @@ fn main() {
// Attach the ChatProtocol
let c = move |peer| ChatProtocol::new(&username, peer, ex.clone().into());
- backend.attach_protocol::<ChatProtocol>(c).await.unwrap();
+ backend
+ .attach_protocol::<ChatProtocol>(c)
+ .await
+ .expect("Attach chat protocol to the p2p backend");
// Run the backend
- backend.run().await.unwrap();
+ backend.run().await.expect("Run the backend");
// Wait for ctrlc signal
- ctrlc_r.recv().await.unwrap();
+ ctrlc_r.recv().await.expect("Receive ctrlc signal");
// Shutdown the backend
backend.shutdown().await;
diff --git a/p2p/examples/peer.rs b/p2p/examples/peer.rs
index 77bf732..58b9fc6 100644
--- a/p2p/examples/peer.rs
+++ b/p2p/examples/peer.rs
@@ -55,16 +55,16 @@ fn main() {
let backend = Backend::new(&key_pair, config, ex.clone().into());
let (ctrlc_s, ctrlc_r) = channel::unbounded();
- let handle = move || ctrlc_s.try_send(()).unwrap();
- ctrlc::set_handler(handle).unwrap();
+ let handle = move || ctrlc_s.try_send(()).expect("Send ctrlc signal");
+ ctrlc::set_handler(handle).expect("ctrlc set handler");
run_executor(
async {
// Run the backend
- backend.run().await.unwrap();
+ backend.run().await.expect("Run the backend");
// Wait for ctrlc signal
- ctrlc_r.recv().await.unwrap();
+ ctrlc_r.recv().await.expect("Receive ctrlc signal");
// Shutdown the backend
backend.shutdown().await;
diff --git a/p2p/examples/shared/mod.rs b/p2p/examples/shared/mod.rs
index 0e8079c..8c00446 100644
--- a/p2p/examples/shared/mod.rs
+++ b/p2p/examples/shared/mod.rs
@@ -1,8 +1,19 @@
-use std::{num::NonZeroUsize, sync::Arc, thread};
+use std::{io, num::NonZeroUsize, sync::Arc, thread};
+use blocking::unblock;
use easy_parallel::Parallel;
use smol::{channel, future, future::Future, Executor};
+#[allow(dead_code)]
+pub async fn read_line_async() -> Result<String, io::Error> {
+ unblock(|| {
+ let mut input = String::new();
+ std::io::stdin().read_line(&mut input)?;
+ Ok(input)
+ })
+ .await
+}
+
/// Returns an estimate of the default amount of parallelism a program should use.
/// see `std::thread::available_parallelism`
pub fn available_parallelism() -> usize {
diff --git a/p2p/examples/tokio-example/src/main.rs b/p2p/examples/tokio-example/src/main.rs
index 6a81ac6..5858e90 100644
--- a/p2p/examples/tokio-example/src/main.rs
+++ b/p2p/examples/tokio-example/src/main.rs
@@ -57,22 +57,22 @@ fn main() {
.worker_threads(available_parallelism())
.enable_all()
.build()
- .unwrap(),
+ .expect("Build a new tokio runtime"),
);
// Create a new Backend
let backend = Backend::new(&key_pair, config, rt.clone().into());
let (ctrlc_s, ctrlc_r) = async_channel::unbounded();
- let handle = move || ctrlc_s.try_send(()).unwrap();
- ctrlc::set_handler(handle).unwrap();
+ let handle = move || ctrlc_s.try_send(()).expect("Send ctrlc signal");
+ ctrlc::set_handler(handle).expect("ctrlc set handler");
rt.block_on(async {
// Run the backend
- backend.run().await.unwrap();
+ backend.run().await.expect("Run the backend");
// Wait for ctrlc signal
- ctrlc_r.recv().await.unwrap();
+ ctrlc_r.recv().await.expect("Receive ctrlc signal");
// Shutdown the backend
backend.shutdown().await;