Skip to content

Commit

Permalink
refactor: removed reqwest dependecy, httppullcl is now using attohttpc
Browse files Browse the repository at this point in the history
  • Loading branch information
gh0st42 committed Feb 28, 2024
1 parent 261df45 commit 3ea8852
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 201 deletions.
121 changes: 0 additions & 121 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion core/dtn7/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ num-derive = "0.4.2"
num-traits = "0.2.15"
thiserror = "1.0.31"
dtn7-codegen = { path = "../codegen", version = "0.1.2" }
reqwest = { version = "0.11.13", default-features = false}
sha1 = "0.10.5"
glob-match = "0.2.1"

Expand Down
156 changes: 77 additions & 79 deletions core/dtn7/src/cla/httppull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,101 +21,99 @@ pub struct HttpPullConvergenceLayer {
tx: mpsc::Sender<super::ClaCmd>,
}

/// pulls missing bundles from node
/// addr can be either an IP or a DNS name
async fn http_pull_from_node(
eid: EndpointID,
addr: IpAddr,
addr: String,
port: u16,
local_digest: String,
) -> TransferResult {
let now = std::time::Instant::now();
let mut transfers = 0;

let client = reqwest::Client::new();

debug!("pulling bundles from {} / {}", eid, addr);

// get digest of remote node
let response = client
.get(&format!("http://{}:{}/status/bundles/digest", addr, port))
.send()
.await;
let digest = match response {
Ok(digest) => digest.text().await.unwrap(),
Err(e) => {
error!("could not get digest from remote: {}", e);
return TransferResult::Failure;
}
};
if digest == local_digest {
debug!("no new bundles on remote");
return TransferResult::Successful;
} else {
debug!(
"remote ({}) has new bundles (remote: {} vs local: {})",
eid, digest, local_digest
);
}
// get list of bundles from remote node
let response = client
.get(&format!("http://{}:{}/status/bundles", addr, port))
.send()
.await;
let bid_list = match response {
Ok(bid_list) => bid_list.text().await.unwrap(),
Err(e) => {
error!("could not get bundle ID list from remote: {}", e);
return TransferResult::Failure;
}
};
let bids: Vec<String> = serde_json::from_str(&bid_list).unwrap();

// calculate missing bundles
let mut missing = Vec::new();

for bid in bids {
if !store_has_item(&bid) {
missing.push(bid);
}
}

// fetch missing bundles from remote node
for bid in missing {
transfers += 1;
let response = client
.get(&format!("http://{}:{}/download?{}", addr, port, bid))
.send()
.await;
let bundle_buf = match response {
Ok(bundle) => bundle.bytes().await.unwrap(),
tokio::task::spawn_blocking(move || {
// get digest of remote node
let response =
attohttpc::get(format!("http://{}:{}/status/bundles/digest", addr, port)).send();
let digest = match response {
Ok(digest) => digest.text().unwrap(),
Err(e) => {
error!("could not get bundle from remote: {}", e);
error!("could not get digest from remote: {}", e);
//bail!("could not get digest from remote: {}", e);
return TransferResult::Failure;
}
};
let bundle = match bp7::Bundle::try_from(bundle_buf.as_ref()) {
Ok(bundle) => bundle,
if digest == local_digest {
debug!("no new bundles on remote");
return TransferResult::Successful;
} else {
debug!(
"remote ({}) has new bundles (remote: {} vs local: {})",
eid, digest, local_digest
);
}
// get list of bundles from remote node
let response = attohttpc::get(format!("http://{}:{}/status/bundles", addr, port)).send();
let bid_list = match response {
Ok(bid_list) => bid_list.text().unwrap(),
Err(e) => {
warn!("could not parse bundle from remote: {}", e);
continue;
error!("could not get bundle ID list from remote: {}", e);
return TransferResult::Failure;
}
};
info!("Downloaded bundle: {} from {}", bundle.id(), addr);
{
tokio::spawn(async move {
if let Err(err) = crate::core::processing::receive(bundle).await {
error!("Failed to process bundle: {}", err);
let bids: Vec<String> = serde_json::from_str(&bid_list).unwrap();

// calculate missing bundles
let mut missing = Vec::new();

for bid in bids {
if !store_has_item(&bid) {
missing.push(bid);
}
}

// fetch missing bundles from remote node
for bid in missing {
transfers += 1;
let response =
attohttpc::get(format!("http://{}:{}/download?{}", addr, port, bid)).send();

let bundle_buf = match response {
Ok(bundle) => bundle.bytes().unwrap(),
Err(e) => {
error!("could not get bundle from remote: {}", e);
return TransferResult::Failure;
}
};
let bundle = match bp7::Bundle::try_from(bundle_buf.as_ref()) {
Ok(bundle) => bundle,
Err(e) => {
warn!("could not parse bundle from remote: {}", e);
continue;
}
});
};
info!("Downloaded bundle: {} from {}", bundle.id(), addr);
{
tokio::spawn(async move {
if let Err(err) = crate::core::processing::receive(bundle).await {
error!("Failed to process bundle: {}", err);
}
});
}
}
}
debug!(
"finished pulling {} bundles from {} / {} in {:?}",
transfers,
eid,
addr,
now.elapsed()
);
TransferResult::Successful
debug!(
"finished pulling {} bundles from {} / {} in {:?}",
transfers,
eid,
addr,
now.elapsed()
);
TransferResult::Successful
})
.await
.unwrap()
}
async fn http_pull_bundles() {
debug!("pulling bundles from peers");
Expand All @@ -138,10 +136,10 @@ async fn http_pull_bundles() {
}
if CONFIG.lock().parallel_bundle_processing {
tokio::spawn(async move {
http_pull_from_node(peer.eid, ipaddr, port, local_digest).await;
http_pull_from_node(peer.eid, ipaddr.to_string(), port, local_digest).await;
});
} else {
http_pull_from_node(peer.eid, ipaddr, port, local_digest).await;
http_pull_from_node(peer.eid, ipaddr.to_string(), port, local_digest).await;
}
}
}
Expand Down

0 comments on commit 3ea8852

Please sign in to comment.