fix: store documents when ingesting a dataset #810

Merged · 1 commit · Sep 16, 2024

Changes from all commits
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions modules/fundamental/Cargo.toml
@@ -58,6 +58,7 @@ urlencoding = { workspace = true }
 criterion = { workspace = true, features = ["html_reports", "async_tokio"] }
 csaf = { workspace = true }
 packageurl = { workspace = true }
+walkdir = { workspace = true }
 zip = { workspace = true }

 [[bench]]
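
The new dev-dependency supports the dataset test added below, which walks a directory tree to build a zip archive in memory. For reference, a minimal sketch of the walkdir iteration pattern that test relies on (the path here is illustrative):

fn main() -> anyhow::Result<()> {
    // WalkDir yields every entry under the root, depth-first; each item is a
    // Result, so I/O errors surface per entry rather than aborting the walk.
    for entry in walkdir::WalkDir::new("datasets/ds3") {
        let entry = entry?;
        println!("{}", entry.path().display());
    }
    Ok(())
}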
12 changes: 9 additions & 3 deletions modules/fundamental/src/advisory/endpoints/mod.rs
@@ -209,17 +209,23 @@ pub async fn download(
         return Ok(HttpResponse::NotFound().finish());
     };

+    log::debug!("Found document - hashes: {:?}", advisory.head.hashes);
+
     let stream = ingestor
         .get_ref()
         .storage()
         .clone()
         .retrieve(advisory.head.hashes.try_into()?)
         .await
         .map_err(Error::Storage)?
         .map(|stream| stream.map_err(Error::Storage));

     Ok(match stream {
         Some(s) => HttpResponse::Ok().streaming(s),
-        None => HttpResponse::NotFound().finish(),
+        None => {
+            tracing::warn!(
+                uuid = ?advisory.head.uuid,
+                "Found the document, but not its content"
+            );
+            HttpResponse::NotFound().finish()
+        }
     })
 }
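
The endpoint now distinguishes the two ways a download can miss: no advisory row at all, or a row whose content was never stored, which is exactly what dataset ingestion produced before this fix. A minimal, self-contained sketch of that split (hypothetical types, not this project's actual handler):

use actix_web::HttpResponse;

// Hypothetical handler tail: `content` is `Some` when the storage backend
// holds a blob for the advisory's digest, `None` when only the metadata row
// exists.
fn respond(content: Option<Vec<u8>>, uuid: uuid::Uuid) -> HttpResponse {
    match content {
        Some(bytes) => HttpResponse::Ok().body(bytes),
        None => {
            // Metadata without content is a server-side inconsistency worth
            // logging, even though the client still just sees a 404.
            tracing::warn!(uuid = ?uuid, "Found the document, but not its content");
            HttpResponse::NotFound().finish()
        }
    }
}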
88 changes: 88 additions & 0 deletions modules/fundamental/tests/dataset.rs
@@ -0,0 +1,88 @@
#![allow(clippy::unwrap_used)]

use bytes::BytesMut;
use futures_util::StreamExt;
use std::{
    io::{Cursor, Write},
    time::Instant,
};
use test_context::test_context;
use test_log::test;
use tracing::instrument;
use trustify_common::id::Id;
use trustify_module_fundamental::sbom::service::SbomService;
use trustify_module_storage::service::StorageBackend;
use trustify_test_context::TrustifyContext;
use zip::write::FileOptions;

/// Test ingesting a dataset.
#[test_context(TrustifyContext, skip_teardown)]
#[test(tokio::test)]
#[instrument]
async fn ingest(ctx: TrustifyContext) -> anyhow::Result<()> {
    let service = SbomService::new(ctx.db.clone());
    let storage = &ctx.storage;

    let start = Instant::now();

    // create dataset ad-hoc

    let base = ctx.absolute_path("../datasets/ds3")?;
    let mut data = vec![];
    let mut dataset = zip::write::ZipWriter::new(Cursor::new(&mut data));
    for entry in walkdir::WalkDir::new(&base) {
        let entry = entry?;
        let Ok(path) = entry.path().strip_prefix(&base) else {
            continue;
        };

        if entry.file_type().is_file() {
            dataset.start_file_from_path(path, FileOptions::<()>::default())?;
            dataset.write_all(&(std::fs::read(entry.path())?))?;
        } else if entry.file_type().is_dir() {
            dataset.add_directory_from_path(path, FileOptions::<()>::default())?;
        }
    }
    dataset.finish()?;

    // ingest

    let result = ctx.ingestor.ingest_dataset(&data, ()).await?;

    let ingest_time = start.elapsed();

    // check ingest results

    log::info!("ingest: {}", humantime::Duration::from(ingest_time));

    assert!(result.warnings.is_empty());
    assert_eq!(result.files.len(), 64);

    // get a document

    let sbom = &result.files["spdx/quarkus-bom-2.13.8.Final-redhat-00004.json.bz2"];
    assert!(matches!(sbom.id, Id::Uuid(_)));

    let sbom_summary = service.fetch_sbom_summary(sbom.id.clone(), ()).await?;
    assert!(sbom_summary.is_some());
    let sbom_summary = sbom_summary.unwrap();
    assert_eq!(sbom_summary.head.name, "quarkus-bom");

    // test source document

    let stream = storage
        .retrieve(sbom_summary.head.hashes.try_into()?)
        .await?;
    assert!(stream.is_some());
    let mut stream = stream.unwrap();
    let mut content = BytesMut::new();
    while let Some(data) = stream.next().await {
        content.extend(&data?);
    }

    assert_eq!(content.len(), 1174356);

    // done

    Ok(())
}
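
Assuming the crate and test-file names implied by the paths and imports above (package trustify-module-fundamental, test file tests/dataset.rs), this test would be run from the workspace root with something like:

cargo test -p trustify-module-fundamental --test dataset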
13 changes: 11 additions & 2 deletions modules/ingestor/src/service/dataset/mod.rs
@@ -5,6 +5,7 @@ use crate::{
     model::IngestResult,
     service::{Error, Format, Warnings},
 };
+use anyhow::anyhow;
 use bytes::Bytes;
 use sbom_walker::common::compression::decompress;
 use std::{
@@ -13,17 +14,20 @@ use std::{
     str::FromStr,
 };
 use tokio::runtime::Handle;
+use tokio_util::io::ReaderStream;
 use tracing::instrument;
 use trustify_common::hashing::Digests;
 use trustify_entity::labels::Labels;
+use trustify_module_storage::{service::dispatch::DispatchBackend, service::StorageBackend};

 pub struct DatasetLoader<'g> {
     graph: &'g Graph,
+    storage: &'g DispatchBackend,
 }

 impl<'g> DatasetLoader<'g> {
-    pub fn new(graph: &'g Graph) -> Self {
-        Self { graph }
+    pub fn new(graph: &'g Graph, storage: &'g DispatchBackend) -> Self {
+        Self { graph, storage }
     }

     #[instrument(skip(self, buffer), ret)]
@@ -74,6 +78,11 @@ impl<'g> DatasetLoader<'g> {

         let labels = labels.clone().add("datasetFile", &full_name);

+        self.storage
+            .store(ReaderStream::new(&*data))
+            .await
+            .map_err(|err| Error::Storage(anyhow!("{err}")))?;
+
         // We need to box it, to work around async recursion limits
         let result = Box::pin({
             async move {
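
The heart of the fix is the new store call: each dataset entry's bytes now reach the storage backend before the document is ingested, so a later retrieve can find the content. A minimal sketch of why ReaderStream::new(&*data) works, assuming data is the bytes::Bytes buffer suggested by the imports:

use bytes::Bytes;
use futures_util::StreamExt;
use tokio_util::io::ReaderStream;

#[tokio::main]
async fn main() {
    // `Bytes` derefs to `&[u8]`, and `&[u8]` implements `tokio::io::AsyncRead`,
    // so an in-memory buffer can be exposed as a chunked byte stream without
    // copying it first.
    let data = Bytes::from_static(b"document content");
    let mut stream = ReaderStream::new(&*data);
    // A storage backend's `store` would consume the stream chunk by chunk;
    // here we just drain it to show the shape.
    while let Some(chunk) = stream.next().await {
        let chunk = chunk.expect("reads from an in-memory slice cannot fail");
        println!("chunk of {} bytes", chunk.len());
    }
}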
2 changes: 1 addition & 1 deletion modules/ingestor/src/service/mod.rs
@@ -219,7 +219,7 @@ impl IngestorService {
         bytes: &[u8],
         labels: impl Into<Labels> + Debug,
     ) -> Result<DatasetIngestResult, Error> {
-        let loader = DatasetLoader::new(self.graph());
+        let loader = DatasetLoader::new(self.graph(), self.storage());
         loader.load(labels.into(), bytes).await
     }
 }
2 changes: 2 additions & 0 deletions modules/storage/src/service/fs.rs
@@ -127,6 +127,8 @@ impl StorageBackend for FileSystemBackend {
         create_dir_all(&target).await?;
         let target = target.join(hash);

+        log::debug!("Opening file: {}", target.display());
+
         let file = match File::open(&target).await {
             Ok(file) => Some(file),
             Err(err) if err.kind() == ErrorKind::NotFound => None,
8 changes: 6 additions & 2 deletions test-context/src/lib.rs
@@ -5,7 +5,7 @@ use peak_alloc::PeakAlloc;
 use postgresql_embedded::PostgreSQL;
 use std::env;
 use std::io::{Read, Seek};
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
 use test_context::AsyncTestContext;
 use tokio_util::bytes::Bytes;
 use tokio_util::io::{ReaderStream, SyncIoBridge};
@@ -90,6 +90,10 @@ impl TrustifyContext {
             .ingest(&bytes, Format::Unknown, ("source", "TrustifyContext"), None)
             .await?)
     }
+
+    pub fn absolute_path(&self, path: impl AsRef<Path>) -> anyhow::Result<PathBuf> {
+        absolute(path)
+    }
 }

 impl AsyncTestContext for TrustifyContext {
@@ -128,7 +132,7 @@ impl AsyncTestContext for TrustifyContext {
     }
 }

-fn absolute(path: &str) -> Result<PathBuf, anyhow::Error> {
+fn absolute(path: impl AsRef<Path>) -> Result<PathBuf, anyhow::Error> {
     let workspace_root: PathBuf = env!("CARGO_WORKSPACE_ROOT").into();
     let test_data = workspace_root.join("etc/test-data");
     Ok(test_data.join(path))
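
The widened signature, impl AsRef&lt;Path&gt; instead of &str, is a small ergonomic generalization that lets the new absolute_path helper forward any path-like argument. A sketch of the difference (illustrative names, not the project's code):

use std::path::{Path, PathBuf};

// Accepting `impl AsRef<Path>` admits &str, String, &Path, and PathBuf alike,
// where the old `&str` signature allowed only string slices.
fn join_test_data(path: impl AsRef<Path>) -> PathBuf {
    PathBuf::from("etc/test-data").join(path)
}

fn main() {
    let a = join_test_data("../datasets/ds3");
    let b = join_test_data(PathBuf::from("../datasets/ds3"));
    assert_eq!(a, b);
}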