Skip to content

Commit

Permalink
add graph nodes and channels rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Sep 23, 2024
1 parent 6805b76 commit 97be399
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 14 deletions.
5 changes: 5 additions & 0 deletions src/fiber/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ where
network_graph
}

pub fn chain_hash(&self) -> Hash256 {
self.chain_hash
}

fn load_from_store(&mut self) {
let channels = self.store.get_channels(None);
for channel in channels.iter() {
Expand Down Expand Up @@ -259,6 +263,7 @@ where
// If the channel already exists, we don't need to update it
// FIXME: if other fields is different, we should consider it as malioucious and ban the node?
if channel.one_to_two.is_some() || channel.two_to_one.is_some() {
debug!("channel already exists, ignoring: {:?}", &channel_info);
return;
}
}
Expand Down
106 changes: 92 additions & 14 deletions src/rpc/graph.rs
Original file line number Diff line number Diff line change
@@ -1,41 +1,71 @@
use crate::fiber::serde_utils::EntityHex;
use crate::fiber::types::{Hash256, Pubkey};
use crate::fiber::{
config::AnnouncedNodeName,
graph::{NetworkGraph, NetworkGraphStateStore},
};
use ckb_types::packed::OutPoint;
use jsonrpsee::{core::async_trait, proc_macros::rpc, types::ErrorObjectOwned};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use std::sync::Arc;
use tentacle::multiaddr::MultiAddr;
use tokio::sync::RwLock;

use crate::fiber::{
config::AnnouncedNodeName,
graph::{NetworkGraph, NetworkGraphStateStore},
};

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct GraphNodesParams {
pub limit: Option<usize>,
pub offset: Option<usize>,
}
pub struct GraphNodesParams {}

#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
pub struct NodeInfo {
pub alias: AnnouncedNodeName,
pub addresses: Vec<MultiAddr>,
pub node_id: Pubkey,
pub timestamp: u64,
pub chain_hash: Hash256,
}

#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
pub struct GraphNodesResult {
pub nodes: Vec<NodeInfo>,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct GraphChannelsParams {}

#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
pub struct ChannelInfo {
#[serde_as(as = "EntityHex")]
pub channel_outpoint: OutPoint,
pub funding_tx_block_number: u64,
pub funidng_tx_index: u32,
pub node1: Pubkey,
pub node2: Pubkey,
pub last_updated_timestamp: Option<u64>,
pub created_timestamp: u64,
pub fee_rate: u64,
pub capacity: u128,
pub chain_hash: Hash256,
}

#[derive(Serialize, Deserialize, Clone)]
pub struct GraphChannelsResult {
pub channels: Vec<ChannelInfo>,
}

#[rpc(server)]
pub trait GraphRpc {
#[method(name = "nodes")]
async fn get_nodes(
#[method(name = "graph_nodes")]
async fn graph_nodes(
&self,
params: GraphNodesParams,
) -> Result<GraphNodesResult, ErrorObjectOwned>;

#[method(name = "graph_channels")]
async fn graph_channels(
&self,
params: GraphChannelsParams,
) -> Result<GraphChannelsResult, ErrorObjectOwned>;
}

pub struct GraphRpcServerImpl<S>
Expand Down Expand Up @@ -63,7 +93,7 @@ impl<S> GraphRpcServer for GraphRpcServerImpl<S>
where
S: NetworkGraphStateStore + Clone + Send + Sync + 'static,
{
async fn get_nodes(
async fn graph_nodes(
&self,
_params: GraphNodesParams,
) -> Result<GraphNodesResult, ErrorObjectOwned> {
Expand All @@ -73,8 +103,56 @@ where
.map(|node_info| NodeInfo {
alias: node_info.anouncement_msg.alias,
addresses: node_info.anouncement_msg.addresses.clone(),
node_id: node_info.node_id,
timestamp: node_info.timestamp,
chain_hash: node_info.anouncement_msg.chain_hash,
})
.collect();
Ok(GraphNodesResult { nodes })
}

async fn graph_channels(
&self,
_params: GraphChannelsParams,
) -> Result<GraphChannelsResult, ErrorObjectOwned> {
let network_graph = self.network_graph.read().await;
let chain_hash = network_graph.chain_hash();
let channels = network_graph
.channels()
.map(|channel_info| {
let mut res = vec![];
if let Some(channel_update) = &channel_info.one_to_two {
res.push(ChannelInfo {
channel_outpoint: channel_info.out_point(),
funding_tx_block_number: channel_info.funding_tx_block_number,
funidng_tx_index: channel_info.funding_tx_index,
node1: channel_info.node1(),
node2: channel_info.node2(),
capacity: channel_info.capacity(),
last_updated_timestamp: channel_info.channel_update_one_to_two_timestamp(),
created_timestamp: channel_info.timestamp,
fee_rate: channel_update.fee_rate,
chain_hash,
})
}
if let Some(channel_update) = &channel_info.two_to_one {
res.push(ChannelInfo {
channel_outpoint: channel_info.out_point(),
funding_tx_block_number: channel_info.funding_tx_block_number,
funidng_tx_index: channel_info.funding_tx_index,
node1: channel_info.node1(),
node2: channel_info.node2(),
capacity: channel_info.capacity(),
last_updated_timestamp: channel_info.channel_update_two_to_one_timestamp(),
created_timestamp: channel_info.timestamp,
fee_rate: channel_update.fee_rate,
chain_hash,
})
}
res.into_iter()
})
.flatten()
.collect::<Vec<_>>();
Ok(GraphChannelsResult { channels })
}
}
43 changes: 43 additions & 0 deletions tests/bruno/e2e/router-pay/16-node1-get-nodes.bru
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
meta {
name: Node1 send get_nodes rpc request
type: http
seq: 16
}

post {
url: {{NODE1_RPC_URL}}
body: json
auth: none
}

headers {
Content-Type: application/json
Accept: application/json
}

body:json {
{
"id": "42",
"jsonrpc": "2.0",
"method": "graph_nodes",
"params": [
{
}
]
}
}

assert {
res.body.error: isUndefined
}

script:pre-request {
// sleep for a while
await new Promise(r => setTimeout(r, 1000));
}

script:post-response {
// Sleep for sometime to make sure current operation finishes before next request starts.
await new Promise(r => setTimeout(r, 100));
console.log("get result: ", res.body.result);
}
43 changes: 43 additions & 0 deletions tests/bruno/e2e/router-pay/17-node1-get-channels.bru
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
meta {
name: Node1 send get_channels rpc request
type: http
seq: 17
}

post {
url: {{NODE1_RPC_URL}}
body: json
auth: none
}

headers {
Content-Type: application/json
Accept: application/json
}

body:json {
{
"id": "42",
"jsonrpc": "2.0",
"method": "graph_channels",
"params": [
{
}
]
}
}

assert {
res.body.error: isUndefined
}

script:pre-request {
// sleep for a while
await new Promise(r => setTimeout(r, 1000));
}

script:post-response {
// Sleep for sometime to make sure current operation finishes before next request starts.
await new Promise(r => setTimeout(r, 100));
console.log("get result: ", res.body.result);
}

0 comments on commit 97be399

Please sign in to comment.