aboutsummaryrefslogtreecommitdiff
path: root/p2p/examples/monitor
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/examples/monitor')
-rw-r--r--p2p/examples/monitor/src/service.rs51
1 files changed, 21 insertions, 30 deletions
diff --git a/p2p/examples/monitor/src/service.rs b/p2p/examples/monitor/src/service.rs
index 15ce8da..bc6ab81 100644
--- a/p2p/examples/monitor/src/service.rs
+++ b/p2p/examples/monitor/src/service.rs
@@ -4,10 +4,13 @@ use futures::stream::{FuturesUnordered, StreamExt};
use log::{debug, error};
use ringbuffer::{AllocRingBuffer, RingBuffer};
use serde::{Deserialize, Serialize};
+use serde_json::Value;
use smol::{lock::Mutex, Executor};
use karyon_core::async_util::{TaskGroup, TaskResult};
-use karyon_jsonrpc::{message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, Subscription};
+use karyon_jsonrpc::{
+ message::SubscriptionID, rpc_impl, rpc_pubsub_impl, Channel, RPCResult, Subscription,
+};
use karyon_p2p::{monitor::MonitorTopic, ArcBackend, Result};
const EVENT_BUFFER_SIZE: usize = 60;
@@ -82,31 +85,19 @@ impl MonitorRPC {
#[rpc_impl]
impl MonitorRPC {
- pub async fn ping(
- &self,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ pub async fn ping(&self, _params: Value) -> RPCResult<Value> {
Ok(serde_json::json!(Pong {}))
}
- pub async fn peer_id(
- &self,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ pub async fn peer_id(&self, _params: Value) -> RPCResult<Value> {
Ok(serde_json::json!(self.backend.peer_id().to_string()))
}
- pub async fn inbound_connection(
- &self,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ pub async fn inbound_connection(&self, _params: Value) -> RPCResult<Value> {
Ok(serde_json::json!(self.backend.inbound_slots()))
}
- pub async fn outbound_connection(
- &self,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ pub async fn outbound_connection(&self, _params: Value) -> RPCResult<Value> {
Ok(serde_json::json!(self.backend.outbound_slots()))
}
}
@@ -117,8 +108,8 @@ impl MonitorRPC {
&self,
chan: Arc<Channel>,
method: String,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ _params: Value,
+ ) -> RPCResult<Value> {
let sub = chan.new_subscription(&method).await;
let sub_id = sub.id;
@@ -131,8 +122,8 @@ impl MonitorRPC {
&self,
chan: Arc<Channel>,
method: String,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ _params: Value,
+ ) -> RPCResult<Value> {
let sub = chan.new_subscription(&method).await;
let sub_id = sub.id;
@@ -145,8 +136,8 @@ impl MonitorRPC {
&self,
chan: Arc<Channel>,
method: String,
- _params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ _params: Value,
+ ) -> RPCResult<Value> {
let sub = chan.new_subscription(&method).await;
let sub_id = sub.id;
@@ -159,8 +150,8 @@ impl MonitorRPC {
&self,
chan: Arc<Channel>,
_method: String,
- params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ params: Value,
+ ) -> RPCResult<Value> {
let sub_id: SubscriptionID = serde_json::from_value(params)?;
chan.remove_subscription(&sub_id).await;
Ok(serde_json::json!(true))
@@ -170,8 +161,8 @@ impl MonitorRPC {
&self,
chan: Arc<Channel>,
_method: String,
- params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ params: Value,
+ ) -> RPCResult<Value> {
let sub_id: SubscriptionID = serde_json::from_value(params)?;
chan.remove_subscription(&sub_id).await;
Ok(serde_json::json!(true))
@@ -181,8 +172,8 @@ impl MonitorRPC {
&self,
chan: Arc<Channel>,
_method: String,
- params: serde_json::Value,
- ) -> karyon_jsonrpc::Result<serde_json::Value> {
+ params: Value,
+ ) -> RPCResult<Value> {
let sub_id: SubscriptionID = serde_json::from_value(params)?;
chan.remove_subscription(&sub_id).await;
Ok(serde_json::json!(true))
@@ -194,7 +185,7 @@ struct Pong {}
struct Subscriptions {
subs: Mutex<HashMap<MonitorTopic, HashMap<SubscriptionID, Subscription>>>,
- buffer: Mutex<HashMap<MonitorTopic, AllocRingBuffer<serde_json::Value>>>,
+ buffer: Mutex<HashMap<MonitorTopic, AllocRingBuffer<Value>>>,
task_group: TaskGroup,
}