author     hozan23 <hozan23@karyontech.net>    2024-06-14 02:04:43 +0200
committer  hozan23 <hozan23@karyontech.net>    2024-06-14 02:04:43 +0200
commit     0c0699c0460c1b149915729223eec701bde481df (patch)
tree       b9ada7e1bd288684eb8fdf690ea1034520ab9f16 /p2p
parent     60a947f6e857f0aa5ae5e8c3b0a183577f74a9f4 (diff)
p2p: WIP implement rpc server for the p2p monitor
Diffstat (limited to 'p2p')
-rw-r--r--  p2p/examples/monitor/Cargo.lock      |  43
-rw-r--r--  p2p/examples/monitor/Cargo.toml      |   1
-rw-r--r--  p2p/examples/monitor/src/client.rs   |  36
-rw-r--r--  p2p/examples/monitor/src/main.rs     | 240
-rw-r--r--  p2p/examples/monitor/src/service.rs  | 309
5 files changed, 391 insertions(+), 238 deletions(-)
diff --git a/p2p/examples/monitor/Cargo.lock b/p2p/examples/monitor/Cargo.lock
index d881b93..25defa2 100644
--- a/p2p/examples/monitor/Cargo.lock
+++ b/p2p/examples/monitor/Cargo.lock
@@ -931,12 +931,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
+name = "futures"
+version = "0.3.30"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-executor",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
name = "futures-channel"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
dependencies = [
"futures-core",
+ "futures-sink",
]
[[package]]
@@ -946,6 +962,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
+name = "futures-executor"
+version = "0.3.30"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
+dependencies = [
+ "futures-core",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
name = "futures-io"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -980,6 +1007,17 @@ dependencies = [
]
[[package]]
+name = "futures-macro"
+version = "0.3.30"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.66",
+]
+
+[[package]]
name = "futures-rustls"
version = "0.25.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1008,9 +1046,13 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
+ "futures-channel",
"futures-core",
+ "futures-io",
+ "futures-macro",
"futures-sink",
"futures-task",
+ "memchr",
"pin-project-lite",
"pin-utils",
"slab",
@@ -1386,6 +1428,7 @@ dependencies = [
"ctrlc",
"easy-parallel",
"env_logger",
+ "futures",
"karyon_core",
"karyon_jsonrpc",
"karyon_p2p",
diff --git a/p2p/examples/monitor/Cargo.toml b/p2p/examples/monitor/Cargo.toml
index b323ed1..52bab15 100644
--- a/p2p/examples/monitor/Cargo.toml
+++ b/p2p/examples/monitor/Cargo.toml
@@ -18,6 +18,7 @@ smol = "2.0.0"
serde_json = "1.0.117"
easy-parallel = "3.3.1"
ringbuffer = "0.15.0"
+futures = "0.3.30"
[[bin]]
name = "client"
diff --git a/p2p/examples/monitor/src/client.rs b/p2p/examples/monitor/src/client.rs
index b81c286..2f4d00d 100644
--- a/p2p/examples/monitor/src/client.rs
+++ b/p2p/examples/monitor/src/client.rs
@@ -1,4 +1,5 @@
use clap::Parser;
+use serde::{Deserialize, Serialize};
use karyon_jsonrpc::Client;
use karyon_p2p::endpoint::Endpoint;
@@ -11,6 +12,9 @@ struct Cli {
rpc_endpoint: Endpoint,
}
+#[derive(Deserialize, Serialize)]
+struct Pong {}
+
fn main() {
smol::block_on(async {
env_logger::init();
@@ -32,18 +36,44 @@ fn main() {
.await
.expect("Subscribe to peer pool events");
+ let (_, sub3) = rpc
+ .subscribe("MonitorRPC.discovery_subscribe", ())
+ .await
+ .expect("Subscribe to discovery events");
+
+ smol::spawn(async move {
+ loop {
+ let event = sub.recv().await.expect("Receive connection event");
+ println!("Receive new connection event: {event}");
+ }
+ })
+ .detach();
+
smol::spawn(async move {
loop {
- let _event = sub.recv().await.expect("Receive connection event");
+ let event = sub2.recv().await.expect("Receive peer pool event");
+ println!("Receive new peerpool event: {event}");
}
})
.detach();
smol::spawn(async move {
loop {
- let _event = sub2.recv().await.expect("Receive peer pool event");
+ let event = sub3.recv().await.expect("Receive discovery event");
+ println!("Receive new discovery event: {event}");
}
})
- .await;
+ .detach();
+
+ // start ping-pong loop
+ loop {
+ smol::Timer::after(std::time::Duration::from_secs(1)).await;
+ let _: Pong = rpc
+ .call("MonitorRPC.ping", ())
+ .await
+ .expect("Receive pong message");
+
+ println!("Receive pong message");
+ }
});
}
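
The client above now polls the new MonitorRPC.ping method once a second and expects an empty Pong {} object back. A minimal sketch (not part of this commit) of how that empty struct round-trips through serde_json, which is all the `let _: Pong = rpc.call(...)` expectation relies on:

use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct Pong {}

fn main() {
    // An empty braced struct serializes to an empty JSON object...
    let value = serde_json::json!(Pong {});
    assert_eq!(value, serde_json::json!({}));
    // ...and the same empty object deserializes back into Pong on the client side.
    let _pong: Pong = serde_json::from_value(value).expect("deserialize Pong");
}
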
diff --git a/p2p/examples/monitor/src/main.rs b/p2p/examples/monitor/src/main.rs
index c57d06c..990f8d2 100644
--- a/p2p/examples/monitor/src/main.rs
+++ b/p2p/examples/monitor/src/main.rs
@@ -1,28 +1,21 @@
+mod service;
mod shared;
use std::sync::Arc;
use clap::Parser;
-use log::error;
-use ringbuffer::{AllocRingBuffer, RingBuffer};
-use serde::{Deserialize, Serialize};
-use smol::{channel, lock::Mutex, Executor};
+use smol::{channel, Executor};
-use karyon_core::async_util::{CondWait, TaskGroup, TaskResult};
-use karyon_jsonrpc::{
- message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, Server, Subscription,
-};
+use karyon_jsonrpc::Server;
use karyon_p2p::{
endpoint::{Endpoint, Port},
keypair::{KeyPair, KeyPairType},
- monitor::{ConnEvent, DiscoveryEvent, PeerPoolEvent},
- ArcBackend, Backend, Config, Error, Result,
+ Backend, Config,
};
+use service::MonitorRPC;
use shared::run_executor;
-const EVENT_BUFFER_SIZE: usize = 30;
-
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
@@ -47,202 +40,6 @@ struct Cli {
discovery_port: Option<Port>,
}
-struct MonitorRPC {
- backend: ArcBackend,
- conn_event_buffer: Arc<Mutex<AllocRingBuffer<ConnEvent>>>,
- pp_event_buffer: Arc<Mutex<AllocRingBuffer<PeerPoolEvent>>>,
- discv_event_buffer: Arc<Mutex<AllocRingBuffer<DiscoveryEvent>>>,
- conn_event_condvar: Arc<CondWait>,
- pp_event_condvar: Arc<CondWait>,
- discv_event_condvar: Arc<CondWait>,
- task_group: TaskGroup,
-}
-
-impl MonitorRPC {
- fn new(backend: ArcBackend, ex: Arc<Executor<'static>>) -> Arc<Self> {
- Arc::new(MonitorRPC {
- backend,
- conn_event_buffer: Arc::new(Mutex::new(AllocRingBuffer::new(EVENT_BUFFER_SIZE))),
- pp_event_buffer: Arc::new(Mutex::new(AllocRingBuffer::new(EVENT_BUFFER_SIZE))),
- discv_event_buffer: Arc::new(Mutex::new(AllocRingBuffer::new(EVENT_BUFFER_SIZE))),
- conn_event_condvar: Arc::new(CondWait::new()),
- pp_event_condvar: Arc::new(CondWait::new()),
- discv_event_condvar: Arc::new(CondWait::new()),
- task_group: TaskGroup::with_executor(ex.into()),
- })
- }
-
- async fn run(&self) -> Result<()> {
- let conn_events = self.backend.monitor().conn_events().await;
- let peer_pool_events = self.backend.monitor().peer_pool_events().await;
- let discovery_events = self.backend.monitor().discovery_events().await;
-
- let conn_event_buffer = self.conn_event_buffer.clone();
- let pp_event_buffer = self.pp_event_buffer.clone();
- let discv_event_buffer = self.discv_event_buffer.clone();
-
- let conn_event_condvar = self.conn_event_condvar.clone();
- let pp_event_condvar = self.pp_event_condvar.clone();
- let discv_event_condvar = self.discv_event_condvar.clone();
-
- let on_failuer = |res: TaskResult<Result<()>>| async move {
- if let TaskResult::Completed(Err(err)) = res {
- error!("Event receive loop: {err}")
- }
- };
-
- self.task_group.spawn(
- async move {
- loop {
- let event = conn_events.recv().await?;
- conn_event_buffer.lock().await.push(event);
- conn_event_condvar.broadcast().await;
- }
- },
- on_failuer,
- );
-
- self.task_group.spawn(
- async move {
- loop {
- let event = peer_pool_events.recv().await?;
- pp_event_buffer.lock().await.push(event);
- pp_event_condvar.broadcast().await;
- }
- },
- on_failuer,
- );
-
- self.task_group.spawn(
- async move {
- loop {
- let event = discovery_events.recv().await?;
- discv_event_buffer.lock().await.push(event);
- discv_event_condvar.broadcast().await;
- }
- },
- on_failuer,
- );
-
- Ok(())
- }
-
- async fn shutdown(&self) {
- self.task_group.cancel().await;
- }
-}
-
-#[rpc_impl]
-impl MonitorRPC {
- async fn peer_id(
- &self,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
- Ok(serde_json::json!(self.backend.peer_id().to_string()))
- }
-
- async fn inbound_connection(
- &self,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
- Ok(serde_json::json!(self.backend.inbound_slots()))
- }
-
- async fn outbound_connection(
- &self,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
- Ok(serde_json::json!(self.backend.outbound_slots()))
- }
-}
-
-#[rpc_pubsub_impl]
-impl MonitorRPC {
- async fn conn_subscribe(
- &self,
- chan: Arc<Channel>,
- method: String,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
- let sub = chan.new_subscription(&method).await;
- let sub_id = sub.id;
-
- let cond_wait = self.conn_event_condvar.clone();
- let buffer = self.conn_event_buffer.clone();
- self.task_group
- .spawn(notify(sub, cond_wait, buffer), notify_failed);
-
- Ok(serde_json::json!(sub_id))
- }
-
- async fn peer_pool_subscribe(
- &self,
- chan: Arc<Channel>,
- method: String,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
- let sub = chan.new_subscription(&method).await;
- let sub_id = sub.id;
-
- let cond_wait = self.pp_event_condvar.clone();
- let buffer = self.pp_event_buffer.clone();
- self.task_group
- .spawn(notify(sub, cond_wait, buffer), notify_failed);
-
- Ok(serde_json::json!(sub_id))
- }
-
- async fn discovery_subscribe(
- &self,
- chan: Arc<Channel>,
- method: String,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
- let sub = chan.new_subscription(&method).await;
- let sub_id = sub.id;
-
- let cond_wait = self.discv_event_condvar.clone();
- let buffer = self.discv_event_buffer.clone();
- self.task_group
- .spawn(notify(sub, cond_wait, buffer), notify_failed);
-
- Ok(serde_json::json!(sub_id))
- }
-
- async fn conn_unsubscribe(
- &self,
- chan: Arc<Channel>,
- _method: String,
- params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
- let sub_id: SubscriptionID = serde_json::from_value(params)?;
- chan.remove_subscription(&sub_id).await;
- Ok(serde_json::json!(true))
- }
-
- async fn peer_pool_unsubscribe(
- &self,
- chan: Arc<Channel>,
- _method: String,
- params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
- let sub_id: SubscriptionID = serde_json::from_value(params)?;
- chan.remove_subscription(&sub_id).await;
- Ok(serde_json::json!(true))
- }
-
- async fn discovery_unsubscribe(
- &self,
- chan: Arc<Channel>,
- _method: String,
- params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
- let sub_id: SubscriptionID = serde_json::from_value(params)?;
- chan.remove_subscription(&sub_id).await;
- Ok(serde_json::json!(true))
- }
-}
-
fn main() {
env_logger::init();
@@ -309,30 +106,3 @@ fn main() {
ex,
);
}
-
-async fn notify<T: Serialize + Deserialize<'static> + Clone>(
- sub: Subscription,
- cond_wait: Arc<CondWait>,
- buffer: Arc<Mutex<AllocRingBuffer<T>>>,
-) -> Result<()> {
- for event in buffer.lock().await.iter() {
- if let Err(err) = sub.notify(serde_json::json!(event)).await {
- return Err(Error::Other(format!("failed to notify: {err}")));
- }
- }
- loop {
- cond_wait.wait().await;
- cond_wait.reset().await;
- if let Some(event) = buffer.lock().await.back().cloned() {
- if let Err(err) = sub.notify(serde_json::json!(event)).await {
- return Err(Error::Other(format!("failed to notify: {err}")));
- }
- }
- }
-}
-
-async fn notify_failed(result: TaskResult<Result<()>>) {
- if let TaskResult::Completed(Err(err)) = result {
- error!("Error: {err}");
- }
-}
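
The per-topic event buffers removed from main.rs above reappear in the new service.rs below, still backed by the ringbuffer crate. A minimal sketch (not part of this commit, assuming the ringbuffer 0.15 API used here; the capacity 3 is only illustrative) of the fixed-capacity behaviour those buffers rely on:

use ringbuffer::{AllocRingBuffer, RingBuffer};

fn main() {
    // A fixed-capacity ring buffer: once full, each push overwrites the oldest entry.
    // The monitor keeps one such buffer of the last EVENT_BUFFER_SIZE events per topic.
    let mut buf: AllocRingBuffer<u32> = AllocRingBuffer::new(3);
    for event in 0..5 {
        buf.push(event);
    }
    // Only the three most recent events survive; the oldest ones were dropped.
    let kept: Vec<u32> = buf.iter().copied().collect();
    assert_eq!(kept.len(), 3);
    assert!(kept.contains(&4) && !kept.contains(&0));
}
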
diff --git a/p2p/examples/monitor/src/service.rs b/p2p/examples/monitor/src/service.rs
new file mode 100644
index 0000000..15ce8da
--- /dev/null
+++ b/p2p/examples/monitor/src/service.rs
@@ -0,0 +1,309 @@
+use std::{collections::HashMap, sync::Arc};
+
+use futures::stream::{FuturesUnordered, StreamExt};
+use log::{debug, error};
+use ringbuffer::{AllocRingBuffer, RingBuffer};
+use serde::{Deserialize, Serialize};
+use smol::{lock::Mutex, Executor};
+
+use karyon_core::async_util::{TaskGroup, TaskResult};
+use karyon_jsonrpc::{message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, Subscription};
+use karyon_p2p::{monitor::MonitorTopic, ArcBackend, Result};
+
+const EVENT_BUFFER_SIZE: usize = 60;
+
+pub struct MonitorRPC {
+ backend: ArcBackend,
+ subs: Arc<Subscriptions>,
+ task_group: TaskGroup,
+}
+
+impl MonitorRPC {
+ pub fn new(backend: ArcBackend, ex: Arc<Executor<'static>>) -> Arc<Self> {
+ Arc::new(MonitorRPC {
+ backend,
+ task_group: TaskGroup::with_executor(ex.clone().into()),
+ subs: Subscriptions::new(ex),
+ })
+ }
+
+ pub async fn run(self: &Arc<Self>) -> Result<()> {
+ let conn_events = self.backend.monitor().conn_events().await;
+ let peer_pool_events = self.backend.monitor().peer_pool_events().await;
+ let discovery_events = self.backend.monitor().discovery_events().await;
+
+ let on_failuer = |res: TaskResult<Result<()>>| async move {
+ if let TaskResult::Completed(Err(err)) = res {
+ error!("Event receive loop: {err}")
+ }
+ };
+
+ let selfc = self.clone();
+ self.task_group.spawn(
+ async move {
+ loop {
+ let event = conn_events.recv().await?;
+ selfc.subs.notify(MonitorTopic::Conn, event).await;
+ }
+ },
+ on_failuer,
+ );
+
+ let selfc = self.clone();
+ self.task_group.spawn(
+ async move {
+ loop {
+ let event = peer_pool_events.recv().await?;
+ selfc.subs.notify(MonitorTopic::PeerPool, event).await;
+ }
+ },
+ on_failuer,
+ );
+
+ let selfc = self.clone();
+ self.task_group.spawn(
+ async move {
+ loop {
+ let event = discovery_events.recv().await?;
+ selfc.subs.notify(MonitorTopic::Discovery, event).await;
+ }
+ },
+ on_failuer,
+ );
+
+ Ok(())
+ }
+
+ pub async fn shutdown(&self) {
+ self.task_group.cancel().await;
+ self.subs.stop().await;
+ }
+}
+
+#[rpc_impl]
+impl MonitorRPC {
+ pub async fn ping(
+ &self,
+ _params: serde_json::Value,
+ ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ Ok(serde_json::json!(Pong {}))
+ }
+
+ pub async fn peer_id(
+ &self,
+ _params: serde_json::Value,
+ ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ Ok(serde_json::json!(self.backend.peer_id().to_string()))
+ }
+
+ pub async fn inbound_connection(
+ &self,
+ _params: serde_json::Value,
+ ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ Ok(serde_json::json!(self.backend.inbound_slots()))
+ }
+
+ pub async fn outbound_connection(
+ &self,
+ _params: serde_json::Value,
+ ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ Ok(serde_json::json!(self.backend.outbound_slots()))
+ }
+}
+
+#[rpc_pubsub_impl]
+impl MonitorRPC {
+ pub async fn conn_subscribe(
+ &self,
+ chan: Arc<Channel>,
+ method: String,
+ _params: serde_json::Value,
+ ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ let sub = chan.new_subscription(&method).await;
+ let sub_id = sub.id;
+
+ self.subs.add(MonitorTopic::Conn, sub).await;
+
+ Ok(serde_json::json!(sub_id))
+ }
+
+ pub async fn peer_pool_subscribe(
+ &self,
+ chan: Arc<Channel>,
+ method: String,
+ _params: serde_json::Value,
+ ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ let sub = chan.new_subscription(&method).await;
+ let sub_id = sub.id;
+
+ self.subs.add(MonitorTopic::PeerPool, sub).await;
+
+ Ok(serde_json::json!(sub_id))
+ }
+
+ pub async fn discovery_subscribe(
+ &self,
+ chan: Arc<Channel>,
+ method: String,
+ _params: serde_json::Value,
+ ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ let sub = chan.new_subscription(&method).await;
+ let sub_id = sub.id;
+
+ self.subs.add(MonitorTopic::Discovery, sub).await;
+
+ Ok(serde_json::json!(sub_id))
+ }
+
+ pub async fn conn_unsubscribe(
+ &self,
+ chan: Arc<Channel>,
+ _method: String,
+ params: serde_json::Value,
+ ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ let sub_id: SubscriptionID = serde_json::from_value(params)?;
+ chan.remove_subscription(&sub_id).await;
+ Ok(serde_json::json!(true))
+ }
+
+ pub async fn peer_pool_unsubscribe(
+ &self,
+ chan: Arc<Channel>,
+ _method: String,
+ params: serde_json::Value,
+ ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ let sub_id: SubscriptionID = serde_json::from_value(params)?;
+ chan.remove_subscription(&sub_id).await;
+ Ok(serde_json::json!(true))
+ }
+
+ pub async fn discovery_unsubscribe(
+ &self,
+ chan: Arc<Channel>,
+ _method: String,
+ params: serde_json::Value,
+ ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ let sub_id: SubscriptionID = serde_json::from_value(params)?;
+ chan.remove_subscription(&sub_id).await;
+ Ok(serde_json::json!(true))
+ }
+}
+
+#[derive(Deserialize, Serialize)]
+struct Pong {}
+
+struct Subscriptions {
+ subs: Mutex<HashMap<MonitorTopic, HashMap<SubscriptionID, Subscription>>>,
+ buffer: Mutex<HashMap<MonitorTopic, AllocRingBuffer<serde_json::Value>>>,
+ task_group: TaskGroup,
+}
+
+impl Subscriptions {
+ fn new(ex: Arc<Executor<'static>>) -> Arc<Self> {
+ let mut subs = HashMap::new();
+ subs.insert(MonitorTopic::Conn, HashMap::new());
+ subs.insert(MonitorTopic::PeerPool, HashMap::new());
+ subs.insert(MonitorTopic::Discovery, HashMap::new());
+
+ let mut buffer = HashMap::new();
+ buffer.insert(MonitorTopic::Conn, AllocRingBuffer::new(EVENT_BUFFER_SIZE));
+ buffer.insert(
+ MonitorTopic::PeerPool,
+ AllocRingBuffer::new(EVENT_BUFFER_SIZE),
+ );
+ buffer.insert(
+ MonitorTopic::Discovery,
+ AllocRingBuffer::new(EVENT_BUFFER_SIZE),
+ );
+
+ Arc::new(Self {
+ subs: Mutex::new(subs),
+ buffer: Mutex::new(buffer),
+ task_group: TaskGroup::with_executor(ex.into()),
+ })
+ }
+
+ /// Adds the subscription to the subs map according to the given type
+ async fn add(self: &Arc<Self>, ty: MonitorTopic, sub: Subscription) {
+ match self.subs.lock().await.get_mut(&ty) {
+ Some(subs) => {
+ subs.insert(sub.id, sub.clone());
+ }
+ None => todo!(),
+ }
+ // Send old events in the buffer to the subscriber
+ self.send_old_events(ty, sub).await;
+ }
+
+ /// Notifies all subscribers
+ async fn notify<T: Serialize>(&self, ty: MonitorTopic, event: T) {
+ let event = serde_json::json!(event);
+ // Add the new event to the ringbuffer
+ match self.buffer.lock().await.get_mut(&ty) {
+ Some(events) => events.push(event.clone()),
+ None => todo!(),
+ }
+
+ // Notify the subscribers
+ match self.subs.lock().await.get_mut(&ty) {
+ Some(subs) => {
+ let mut fulist = FuturesUnordered::new();
+
+ for sub in subs.values() {
+ let fu = async { (sub.id, sub.notify(event.clone()).await) };
+ fulist.push(fu)
+ }
+
+ let mut cleanup_list = vec![];
+ while let Some((sub_id, result)) = fulist.next().await {
+ if let Err(err) = result {
+ error!("Failed to notify the subscription: {:?} {sub_id} {err}", ty);
+ cleanup_list.push(sub_id);
+ }
+ }
+ drop(fulist);
+
+ for sub_id in cleanup_list {
+ subs.remove(&sub_id);
+ }
+ }
+ None => todo!(),
+ }
+ }
+
+ /// Sends old events in the ringbuffer to the new subscriber.
+ async fn send_old_events(self: &Arc<Self>, ty: MonitorTopic, sub: Subscription) {
+ let ty_cloned = ty.clone();
+ let sub_id = sub.id;
+ let on_complete = move |res: TaskResult<()>| async move {
+ debug!("Send old events: {:?} {:?} {res}", ty_cloned, sub_id);
+ };
+
+ let selfc = self.clone();
+ self.task_group.spawn(
+ async move {
+ match selfc.buffer.lock().await.get_mut(&ty) {
+ Some(events) => {
+ let mut fu = FuturesUnordered::new();
+
+ for event in events.iter().rev() {
+ fu.push(sub.notify(event.clone()))
+ }
+
+ while let Some(result) = fu.next().await {
+ if result.is_err() {
+ return;
+ }
+ }
+ }
+ None => todo!(),
+ }
+ },
+ on_complete,
+ );
+ }
+
+ async fn stop(&self) {
+ self.task_group.cancel().await;
+ }
+}
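
The fan-out in Subscriptions::notify above pushes one notify future per subscriber into a FuturesUnordered, then removes every subscriber whose notification failed. A minimal self-contained sketch of that pattern (not part of this commit; FakeSub is a hypothetical stand-in for karyon_jsonrpc::Subscription):

use std::collections::HashMap;

use futures::stream::{FuturesUnordered, StreamExt};

// Hypothetical stand-in for a subscription; the real one notifies a remote peer.
struct FakeSub {
    healthy: bool,
}

impl FakeSub {
    async fn notify(&self, _event: serde_json::Value) -> Result<(), String> {
        if self.healthy {
            Ok(())
        } else {
            Err("subscription closed".into())
        }
    }
}

// Notify all subscribers concurrently, then drop the ones that failed,
// mirroring the cleanup loop in Subscriptions::notify.
async fn notify_all(subs: &mut HashMap<u32, FakeSub>, event: serde_json::Value) {
    let mut fulist = FuturesUnordered::new();
    for (id, sub) in subs.iter() {
        let id = *id;
        let ev = event.clone();
        fulist.push(async move { (id, sub.notify(ev).await) });
    }

    let mut cleanup_list = vec![];
    while let Some((id, result)) = fulist.next().await {
        if result.is_err() {
            cleanup_list.push(id);
        }
    }
    // Drop the futures (and their borrows of `subs`) before mutating the map.
    drop(fulist);

    for id in cleanup_list {
        subs.remove(&id);
    }
}

fn main() {
    smol::block_on(async {
        let mut subs = HashMap::new();
        subs.insert(1, FakeSub { healthy: true });
        subs.insert(2, FakeSub { healthy: false });
        notify_all(&mut subs, serde_json::json!({ "event": "demo" })).await;
        assert!(subs.contains_key(&1) && !subs.contains_key(&2));
    });
}
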