From c6ad385d6f7cc5480b537742af4f5ac7923eb4c0 Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Mon, 26 Aug 2024 17:50:30 +0200 Subject: [PATCH] feat: add progress support, backed by the database --- Cargo.lock | 176 ++++++------------ Cargo.toml | 10 +- entity/src/importer.rs | 3 + migration/src/lib.rs | 2 + migration/src/m0000570_add_import_progress.rs | 43 +++++ modules/importer/src/endpoints.rs | 1 + modules/importer/src/model/mod.rs | 139 +++++++++++--- .../src/runner/clearly_defined/walker.rs | 18 +- modules/importer/src/runner/context.rs | 2 +- modules/importer/src/runner/csaf/mod.rs | 9 +- modules/importer/src/server/context.rs | 51 ++++- modules/importer/src/server/mod.rs | 1 + modules/importer/src/server/progress.rs | 79 ++++++++ modules/importer/src/service.rs | 29 ++- modules/importer/src/test.rs | 2 + .../src/service/sbom/clearly_defined.rs | 10 +- openapi.yaml | 41 ++++ xtask/src/dataset.rs | 4 +- 18 files changed, 443 insertions(+), 177 deletions(-) create mode 100644 migration/src/m0000570_add_import_progress.rs create mode 100644 modules/importer/src/server/progress.rs diff --git a/Cargo.lock b/Cargo.lock index 773a90f0..b5b6a809 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -387,7 +387,6 @@ dependencies = [ "cfg-if", "getrandom", "once_cell", - "serde", "version_check", "zerocopy", ] @@ -914,6 +913,12 @@ dependencies = [ "generic-array 0.14.7", ] +[[package]] +name = "borrow-or-share" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3eeab4423108c5d7c744f4d234de88d18d636100093ae04caf4825134b9c3a32" + [[package]] name = "borsh" version = "1.5.1" @@ -1007,12 +1012,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "bytecount" -version = "0.6.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ce89b21cab1437276d2650d57e971f9d548a2d9037cc231abdc0562b97498ce" - [[package]] name = "byteorder" version = "1.5.0" @@ -1485,9 +1484,9 @@ dependencies = [ [[package]] name = "csaf-walker" -version = "0.8.11" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "185ccd5f4a9c890814c7c77e73f2276c6b0a4f8f7d6ad059c0edd0ada38c5c41" +checksum = "729ca0fefe4fb44730031106b1ebc21504002068f722e52ccf18f6edef585032" dependencies = [ "anyhow", "async-trait", @@ -1498,7 +1497,7 @@ dependencies = [ "csv", "digest", "filetime", - "fluent-uri", + "fluent-uri 0.2.0", "futures", "hickory-resolver", "html-escape", @@ -1593,18 +1592,17 @@ dependencies = [ [[package]] name = "cyclonedx-bom" -version = "0.6.2" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a114dd99ed051f1481d8d35acd455f77469f026b783fe08074763fdf1701506" +checksum = "fa4bc2b1186cf894ec0192863b5d6e6164de1a39ab66d2a281a4157b4ea45138" dependencies = [ "base64 0.21.7", "cyclonedx-bom-macros", - "fluent-uri", + "fluent-uri 0.1.4", "indexmap 2.3.0", - "jsonschema", "once_cell", "ordered-float 4.2.2", - "packageurl", + "purl", "regex", "serde", "serde_json", @@ -2055,16 +2053,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "fancy-regex" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b95f7c0680e4142284cf8b22c14a476e87d61b004a3a0861872b32ef7ead40a2" -dependencies = [ - "bit-set", - "regex", -] - [[package]] name = "fast_chemail" version = "0.9.6" @@ -2142,6 +2130,16 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "fluent-uri" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d77395429e0ce700a8378be6625660a4aa00ca5dc5cd1527193ebd0946cc9b3" +dependencies = [ + "borrow-or-share", + "ref-cast", +] + [[package]] name = "flume" version = "0.11.0" @@ -2183,16 +2181,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "fraction" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3027ae1df8d41b4bed2241c8fdad4acc1e7af60c8e17743534b545e77182d678" -dependencies = [ - "lazy_static", - "num", -] - [[package]] name = "funty" version = "2.0.0" @@ -3202,15 +3190,6 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" -[[package]] -name = "iso8601" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "924e5d73ea28f59011fec52a0d12185d496a9b075d360657aed2a5707f701153" -dependencies = [ - "nom", -] - [[package]] name = "itertools" version = "0.10.5" @@ -3294,34 +3273,6 @@ dependencies = [ "thiserror", ] -[[package]] -name = "jsonschema" -version = "0.17.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a071f4f7efc9a9118dfb627a0a94ef247986e1ab8606a4c806ae2b3aa3b6978" -dependencies = [ - "ahash 0.8.11", - "anyhow", - "base64 0.21.7", - "bytecount", - "fancy-regex", - "fraction", - "getrandom", - "iso8601", - "itoa", - "memchr", - "num-cmp", - "once_cell", - "parking_lot 0.12.3", - "percent-encoding", - "regex", - "serde", - "serde_json", - "time", - "url", - "uuid", -] - [[package]] name = "lalrpop" version = "0.20.2" @@ -3807,20 +3758,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "num" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35bd024e8b2ff75562e5f34e7f4905839deb4b22955ef5e73d2fea1b9813cb23" -dependencies = [ - "num-bigint", - "num-complex", - "num-integer", - "num-iter", - "num-rational", - "num-traits", -] - [[package]] name = "num-bigint" version = "0.4.6" @@ -3848,21 +3785,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "num-cmp" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63335b2e2c34fae2fb0aa2cecfd9f0832a1e24b3b32ecec612c3426d46dc8aaa" - -[[package]] -name = "num-complex" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495" -dependencies = [ - "num-traits", -] - [[package]] name = "num-conv" version = "0.1.0" @@ -3899,17 +3821,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "num-rational" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824" -dependencies = [ - "num-bigint", - "num-integer", - "num-traits", -] - [[package]] name = "num-traits" version = "0.2.19" @@ -4842,6 +4753,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "purl" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c14fe28c8495f7eaf77a6e6106966f95211c0a2404b9da50d248fc32af3a3f14" +dependencies = [ + "hex", + "percent-encoding", + "thiserror", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -5009,6 +4931,26 @@ dependencies = [ "thiserror", ] +[[package]] +name = "ref-cast" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccf0a6f84d5f1d581da8b41b47ec8600871962f2a528115b542b362d4b744931" +dependencies = [ + "ref-cast-impl", +] + +[[package]] +name = "ref-cast-impl" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bcc303e793d3734489387d205e9b186fac9c6cfacedd98cbb2e8a5943595f3e6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.72", +] + [[package]] name = "regex" version = "1.10.6" @@ -5588,9 +5530,9 @@ dependencies = [ [[package]] name = "sbom-walker" -version = "0.8.11" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fac8a4b6ebbcf8bcb90e924b5dcaff013b740270a1bba6e3bc0c855c026b979" +checksum = "d0f2fa9c1cd1452c16a54b01e1939b6a7617cc9228913aa5752c83bf0e3e8443" dependencies = [ "anyhow", "async-trait", @@ -5600,7 +5542,7 @@ dependencies = [ "cyclonedx-bom", "digest", "filetime", - "fluent-uri", + "fluent-uri 0.2.0", "futures", "http 1.1.0", "humantime", @@ -7957,9 +7899,9 @@ dependencies = [ [[package]] name = "walker-common" -version = "0.8.11" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e959ae72402644df7c1c6d393a294d6aff17dc830dc99a872d3f2ea0a4140fc" +checksum = "cf77a96b67a43ce73c7f678bda475cc231c8c4c5976f8afe2ab67c3877e236dc" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 791311ac..52fd68d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,9 +51,9 @@ clap = "4" concat-idents = "1" cpe = "0.1.3" csaf = { version = "0.5.0", default-features = false } -csaf-walker = { version = "0.8.0", default-features = false } +csaf-walker = { version = "0.9.0", default-features = false } cve = "0.3.1" -cyclonedx-bom = "0.6.1" +cyclonedx-bom = "0.7.0" env_logger = "0.11.0" futures = "0.3.30" futures-util = "0.3" @@ -94,7 +94,7 @@ reqwest = "0.12" ring = "0.17.8" rstest = "0.22" rust-s3 = "0.34" -sbom-walker = { version = "0.8.6", default-features = false, features = ["crypto-openssl", "cyclonedx-bom", "spdx-rs"] } +sbom-walker = { version = "0.9.0", default-features = false, features = ["crypto-openssl", "cyclonedx-bom", "spdx-rs"] } schemars = "0.8" sea-orm = "~1.0" # See https://www.sea-ql.org/blog/2024-08-04-sea-orm-1.0/#release-planning sea-orm-migration = "~1.0" @@ -130,8 +130,8 @@ utoipa-redoc = { version = "4.0.0", features = ["actix-web"] } utoipa-swagger-ui = "7.1.0" uuid = "1.7.0" walkdir = "2.5" -walker-common = "0.8.0" -walker-extras = "0.8.0" +walker-common = "0.9.0" +walker-extras = "0.9.0" trustify-auth = { path = "common/auth", features = ["actix", "swagger"] } trustify-common = { path = "common" } diff --git a/entity/src/importer.rs b/entity/src/importer.rs index aa509e31..6f6170a4 100644 --- a/entity/src/importer.rs +++ b/entity/src/importer.rs @@ -14,6 +14,9 @@ pub struct Model { pub last_run: Option, pub last_error: Option, + pub progress_current: Option, + pub progress_total: Option, + /// an importer specific continuation token pub continuation: Option, diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 0c97d480..39282085 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -69,6 +69,7 @@ mod m0000545_create_purl_license_assertion; mod m0000550_create_cpe_license_assertion; mod m0000560_alter_vulnerability_cwe_column; mod m0000565_alter_advisory_vulnerability_cwe_column; +mod m0000570_add_import_progress; pub struct Migrator; @@ -144,6 +145,7 @@ impl MigratorTrait for Migrator { Box::new(m0000550_create_cpe_license_assertion::Migration), Box::new(m0000560_alter_vulnerability_cwe_column::Migration), Box::new(m0000565_alter_advisory_vulnerability_cwe_column::Migration), + Box::new(m0000570_add_import_progress::Migration), ] } } diff --git a/migration/src/m0000570_add_import_progress.rs b/migration/src/m0000570_add_import_progress.rs new file mode 100644 index 00000000..a8084647 --- /dev/null +++ b/migration/src/m0000570_add_import_progress.rs @@ -0,0 +1,43 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + for col in [Importer::ProgressCurrent, Importer::ProgressTotal] { + manager + .alter_table( + Table::alter() + .table(Importer::Table) + .add_column(ColumnDef::new(col).integer().null()) + .to_owned(), + ) + .await?; + } + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + for col in [Importer::ProgressCurrent, Importer::ProgressTotal] { + manager + .alter_table( + Table::alter() + .table(Importer::Table) + .drop_column(col) + .to_owned(), + ) + .await?; + } + + Ok(()) + } +} + +#[derive(DeriveIden)] +enum Importer { + Table, + ProgressCurrent, + ProgressTotal, +} diff --git a/modules/importer/src/endpoints.rs b/modules/importer/src/endpoints.rs index 5598cbe2..acdebf10 100644 --- a/modules/importer/src/endpoints.rs +++ b/modules/importer/src/endpoints.rs @@ -43,6 +43,7 @@ pub fn configure(svc: &mut web::ServiceConfig, db: Database) { crate::model::ImporterReport, crate::model::OsvImporter, crate::model::PaginatedImporterReport, + crate::model::Progress, crate::model::RevisionedImporter, crate::model::SbomImporter, crate::model::State, diff --git a/modules/importer/src/model/mod.rs b/modules/importer/src/model/mod.rs index d74840a9..89902db8 100644 --- a/modules/importer/src/model/mod.rs +++ b/modules/importer/src/model/mod.rs @@ -10,8 +10,10 @@ pub use cve::*; pub use osv::*; pub use sbom::*; -use std::ops::{Deref, DerefMut}; -use std::time::Duration; +use std::{ + ops::{Deref, DerefMut}, + time::Duration, +}; use time::OffsetDateTime; use trustify_common::{model::Revisioned, paginated, revisioned}; use trustify_entity::{ @@ -22,7 +24,7 @@ use trustify_entity::{ use url::Url; use utoipa::ToSchema; -#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize, ToSchema)] +#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize, ToSchema)] pub struct Importer { pub name: String, #[serde(flatten)] @@ -54,7 +56,7 @@ impl From for importer::State { } } -#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize, ToSchema)] +#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct ImporterData { pub configuration: ImporterConfiguration, @@ -80,11 +82,32 @@ pub struct ImporterData { #[serde(default, skip_serializing_if = "Option::is_none")] pub last_error: Option, + /// The current progress, if available. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub progress: Option, + /// The continuation token of the importer. #[serde(default, skip_serializing_if = "serde_json::Value::is_null")] pub continuation: serde_json::Value, } +#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize, ToSchema)] +pub struct Progress { + /// The current processed items. + pub current: u32, + /// The total number of items to be processed. + pub total: u32, + /// Progress in percent (0..=1) + pub percent: f32, + /// The average processing rate (per second). + pub rate: f32, + /// The estimated remaining time in seconds. + pub estimated_seconds_remaining: u64, + /// The estimated time of completion. + #[serde(with = "time::serde::rfc3339")] + pub estimated_completion: OffsetDateTime, +} + #[derive( Clone, Debug, @@ -176,6 +199,8 @@ impl TryFrom for Importer { last_success, last_run, last_error, + progress_current, + progress_total, continuation, revision: _, }: Model, @@ -188,6 +213,12 @@ impl TryFrom for Importer { last_success, last_run, last_error, + progress: into_progress( + last_change, + OffsetDateTime::now_utc(), + progress_current, + progress_total, + ), continuation: continuation.unwrap_or_default(), configuration: serde_json::from_value(configuration)?, }, @@ -200,33 +231,11 @@ revisioned!(Importer); impl TryFrom for RevisionedImporter { type Error = serde_json::Error; - fn try_from( - Model { - name, - configuration, - state, - last_change, - last_success, - last_run, - last_error, - continuation, - revision, - }: Model, - ) -> Result { + fn try_from(value: Model) -> Result { + let revision = value.revision.to_string(); Ok(Self(Revisioned { - value: Importer { - name, - data: ImporterData { - state: state.into(), - last_change, - last_success, - last_run, - last_error, - continuation: continuation.unwrap_or_default(), - configuration: serde_json::from_value(configuration)?, - }, - }, - revision: revision.to_string(), + value: value.try_into()?, + revision, })) } } @@ -265,3 +274,73 @@ impl From for ImporterReport { } } } + +/// Create the progress information from the progress state +fn into_progress( + start: OffsetDateTime, + now: OffsetDateTime, + current: Option, + total: Option, +) -> Option { + // elapsed time in seconds + let elapsed = (now - start).as_seconds_f32(); + + // current and total progress information + let current = current? as u32; + let total = total? as u32; + + if current > total || total == 0 { + return None; + } + + // calculate rate and ETA + let total_f = total as f32; + let rate = current as f32 / elapsed; + let remaining = (total - current) as f32; + let estimated_seconds_remaining = (remaining / rate) as u64; + + // return result + Some(Progress { + current, + total, + percent: current as f32 / total_f, + rate, + estimated_seconds_remaining, + estimated_completion: now + Duration::from_secs(estimated_seconds_remaining), + }) +} + +#[cfg(test)] +mod test { + use super::*; + use time::macros::datetime; + + #[test] + fn progress() { + let start = datetime!(2024-01-01 00:00:00 UTC); + let now = datetime!(2024-01-01 00:00:10 UTC); + assert_eq!( + into_progress(start, now, Some(15), Some(100)), + Some(Progress { + current: 15, + total: 100, + percent: 0.15, + rate: 1.5, + estimated_seconds_remaining: 56, + estimated_completion: datetime!(2024-01-01 00:01:06 UTC), + }) + ) + } + + #[test] + fn progress_none() { + let start = datetime!(2024-01-01 00:00:00 UTC); + let now = datetime!(2024-01-01 00:00:10 UTC); + assert_eq!(into_progress(start, now, None, None), None); + assert_eq!(into_progress(start, now, Some(1), None), None); + assert_eq!(into_progress(start, now, None, Some(1)), None); + + assert_eq!(into_progress(start, now, Some(10), Some(1)), None); + assert_eq!(into_progress(start, now, Some(0), Some(0)), None); + } +} diff --git a/modules/importer/src/runner/clearly_defined/walker.rs b/modules/importer/src/runner/clearly_defined/walker.rs index 9af73425..7c13a72b 100644 --- a/modules/importer/src/runner/clearly_defined/walker.rs +++ b/modules/importer/src/runner/clearly_defined/walker.rs @@ -1,13 +1,15 @@ -use crate::model::ClearlyDefinedPackageType; -use crate::runner::{ - common::{ - processing_error::ProcessingError, - walker::{ - CallbackError, Callbacks, Continuation, Error, GitWalker, Handler, HandlerError, - WorkingDirectory, +use crate::{ + model::ClearlyDefinedPackageType, + runner::{ + common::{ + processing_error::ProcessingError, + walker::{ + CallbackError, Callbacks, Continuation, Error, GitWalker, Handler, HandlerError, + WorkingDirectory, + }, }, + progress::Progress, }, - progress::Progress, }; use std::collections::HashSet; use std::io::Read; diff --git a/modules/importer/src/runner/context.rs b/modules/importer/src/runner/context.rs index bbf0355e..abe878e9 100644 --- a/modules/importer/src/runner/context.rs +++ b/modules/importer/src/runner/context.rs @@ -40,5 +40,5 @@ pub trait RunContext: Debug + Send { } } - fn progress(&self, #[allow(unused)] name: String) -> impl Progress + Send + 'static {} + fn progress(&self, #[allow(unused)] message: String) -> impl Progress + Send + 'static {} } diff --git a/modules/importer/src/runner/csaf/mod.rs b/modules/importer/src/runner/csaf/mod.rs index cba5619a..122d3ef0 100644 --- a/modules/importer/src/runner/csaf/mod.rs +++ b/modules/importer/src/runner/csaf/mod.rs @@ -1,6 +1,7 @@ mod report; pub mod storage; +use crate::server::context::WalkerProgress; use crate::{ model::CsafImporter, runner::{ @@ -33,6 +34,12 @@ impl super::ImportRunner { importer: CsafImporter, last_success: Option, ) -> Result { + // progress reporting + + let progress = context.progress(format!("Import CSF: {}", importer.source)); + + // report + let report = Arc::new(Mutex::new(ReportBuilder::new())); let fetcher = @@ -78,8 +85,8 @@ impl super::ImportRunner { // walker - // FIXME: track progress Walker::new(source) + .with_progress(WalkerProgress(progress)) .walk(filter) .await // if the walker fails, we record the outcome as part of the report, but skip any diff --git a/modules/importer/src/server/context.rs b/modules/importer/src/server/context.rs index 80400207..a361e320 100644 --- a/modules/importer/src/server/context.rs +++ b/modules/importer/src/server/context.rs @@ -1,8 +1,9 @@ use crate::{ runner::{ context::RunContext, - progress::{Progress, TracingProgress}, + progress::{Progress, ProgressInstance}, }, + server::progress::ServiceProgress, service::ImporterService, }; use std::{ @@ -18,6 +19,7 @@ pub struct ServiceRunContext { /// The name of the import job name: String, state: Mutex, + service: ImporterService, } impl ServiceRunContext { @@ -25,10 +27,11 @@ impl ServiceRunContext { Self { name: name.clone(), state: Mutex::new(CheckCancellation::new( - service, + service.clone(), name, Duration::from_secs(60), )), + service, } } } @@ -42,10 +45,10 @@ impl RunContext for ServiceRunContext { self.state.lock().await.check().await } - fn progress(&self, name: String) -> impl Progress + Send + 'static { - TracingProgress { - name, - period: Duration::from_secs(60), + fn progress(&self, _message: String) -> impl Progress + Send + 'static { + ServiceProgress { + name: self.name.clone(), + service: self.service.clone(), } } } @@ -96,3 +99,39 @@ impl CheckCancellation { .unwrap_or(true)) } } + +pub struct WalkerProgress

(pub P) +where + P: Progress; + +impl

walker_common::progress::Progress for WalkerProgress

+where + P: Progress, +{ + type Instance = WalkerProgressInstance

; + + fn start(&self, work: usize) -> Self::Instance { + WalkerProgressInstance(self.0.start(work)) + } +} + +pub struct WalkerProgressInstance

(P::Instance) +where + P: Progress; + +impl

walker_common::progress::ProgressBar for WalkerProgressInstance

+where + P: Progress, +{ + async fn increment(&mut self, work: usize) { + ProgressInstance::increment(&mut self.0, work).await; + } + + async fn finish(self) { + ProgressInstance::finish(self.0).await; + } + + async fn set_message(&mut self, _msg: String) { + // we don't support that + } +} diff --git a/modules/importer/src/server/mod.rs b/modules/importer/src/server/mod.rs index b3dd634e..73416267 100644 --- a/modules/importer/src/server/mod.rs +++ b/modules/importer/src/server/mod.rs @@ -1,4 +1,5 @@ pub mod context; +pub(crate) mod progress; use crate::{ model::{Importer, State}, diff --git a/modules/importer/src/server/progress.rs b/modules/importer/src/server/progress.rs new file mode 100644 index 00000000..44e8cfc3 --- /dev/null +++ b/modules/importer/src/server/progress.rs @@ -0,0 +1,79 @@ +use crate::{ + runner::progress::{Progress, ProgressInstance, TracingProgress, TracingProgressInstance}, + service::ImporterService, +}; +use std::time::{Duration, Instant}; + +/// [`Progress`] implementation for using the import service. +pub struct ServiceProgress { + pub name: String, + pub service: ImporterService, +} + +const FLUSH_PERIOD: Duration = Duration::from_secs(15); + +impl Progress for ServiceProgress { + type Instance = ServiceProgressInstance; + + fn start(&self, work: usize) -> Self::Instance { + ServiceProgressInstance { + name: self.name.clone(), + service: self.service.clone(), + current: 0, + total: work, + last_flush: Instant::now() - FLUSH_PERIOD, + tracing: TracingProgress { + name: self.name.clone(), + period: FLUSH_PERIOD, + } + .start(work), + } + } +} + +pub struct ServiceProgressInstance { + name: String, + service: ImporterService, + current: usize, + total: usize, + last_flush: Instant, + tracing: TracingProgressInstance, +} + +impl ServiceProgressInstance { + /// flush the state to the database + async fn flush(&self) { + let current = self.current.min(self.total); + + tracing::debug!( + importer = self.name, + current, + total = self.total, + "Updating progress" + ); + + let _ = self + .service + .update_progress(&self.name, None, current as u32, self.total as u32) + .await; + } +} + +impl ProgressInstance for ServiceProgressInstance { + async fn increment(&mut self, work: usize) { + self.tracing.increment(work).await; + + self.current += work; + if self.last_flush.elapsed() > FLUSH_PERIOD { + self.last_flush = Instant::now(); + self.flush().await; + } + } + + async fn finish(mut self) { + self.current = self.total; + self.flush().await; + + self.tracing.finish().await; + } +} diff --git a/modules/importer/src/service.rs b/modules/importer/src/service.rs index 641446cc..a867b217 100644 --- a/modules/importer/src/service.rs +++ b/modules/importer/src/service.rs @@ -4,7 +4,7 @@ use sea_orm::{ ActiveModelTrait, ActiveValue::Set, ColumnTrait, ConnectionTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, TransactionTrait, }; -use sea_query::{Alias, Expr, SimpleExpr}; +use sea_query::{Alias, Expr, Nullable, SimpleExpr}; use time::OffsetDateTime; use tracing::instrument; use trustify_common::{ @@ -93,6 +93,9 @@ impl ImporterService { last_run: Set(None), last_error: Set(None), + progress_current: Set(None), + progress_total: Set(None), + continuation: Set(None), configuration: Set(serde_json::to_value(configuration)?), @@ -219,6 +222,8 @@ impl ImporterService { importer::Column::State, Expr::value(importer::State::Waiting), ), + (importer::Column::ProgressCurrent, Expr::value(i32::null())), + (importer::Column::ProgressTotal, Expr::value(i32::null())), (importer::Column::LastChange, Expr::value(now)), (importer::Column::Continuation, Expr::value(continuation)), ]; @@ -302,6 +307,8 @@ impl ImporterService { importer::Column::LastRun, Expr::value(None::), ), + (importer::Column::ProgressCurrent, Expr::value(i32::null())), + (importer::Column::ProgressTotal, Expr::value(i32::null())), ( importer::Column::Continuation, Expr::value(None::), @@ -311,6 +318,26 @@ impl ImporterService { .await } + #[instrument(skip(self))] + pub async fn update_progress( + &self, + name: &str, + expected_revision: Option<&str>, + current: u32, + total: u32, + ) -> Result<(), Error> { + self.update( + &self.db, + name, + expected_revision, + vec![ + (importer::Column::ProgressCurrent, Expr::value(current)), + (importer::Column::ProgressTotal, Expr::value(total)), + ], + ) + .await + } + #[instrument(skip(self))] pub async fn delete(&self, name: &str, expected_revision: Option<&str>) -> Result { let mut delete = importer::Entity::delete_many().filter(importer::Column::Name.eq(name)); diff --git a/modules/importer/src/test.rs b/modules/importer/src/test.rs index 4ab0ff73..49612bf0 100644 --- a/modules/importer/src/test.rs +++ b/modules/importer/src/test.rs @@ -41,6 +41,7 @@ fn mock_importer(result: &Importer, source: impl Into) -> Importer { last_success: None, last_error: None, last_run: None, + progress: None, continuation: serde_json::Value::Null, }, } @@ -87,6 +88,7 @@ async fn test_default(ctx: TrustifyContext) { last_success: None, last_run: None, last_error: None, + progress: None, continuation: serde_json::Value::Null, } }] diff --git a/modules/ingestor/src/service/sbom/clearly_defined.rs b/modules/ingestor/src/service/sbom/clearly_defined.rs index 3cdd3302..708a9e94 100644 --- a/modules/ingestor/src/service/sbom/clearly_defined.rs +++ b/modules/ingestor/src/service/sbom/clearly_defined.rs @@ -1,9 +1,7 @@ -use crate::graph::sbom::clearly_defined::Curation; -use crate::graph::Graph; -use crate::model::IngestResult; -use crate::service::Error; -use trustify_common::hashing::Digests; -use trustify_common::id::Id; +use crate::{ + graph::sbom::clearly_defined::Curation, graph::Graph, model::IngestResult, service::Error, +}; +use trustify_common::{hashing::Digests, id::Id}; use trustify_entity::labels::Labels; pub struct ClearlyDefinedLoader<'g> { diff --git a/openapi.yaml b/openapi.yaml index 75c4206f..f4778025 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -1826,6 +1826,10 @@ components: format: date-time description: The last successful run nullable: true + progress: + allOf: + - $ref: '#/components/schemas/Progress' + nullable: true state: $ref: '#/components/schemas/State' ImporterReport: @@ -2247,6 +2251,43 @@ components: nullable: true version: type: string + Progress: + type: object + required: + - current + - total + - percent + - rate + - estimated_seconds_remaining + - estimated_completion + properties: + current: + type: integer + format: int32 + description: The current processed items. + minimum: 0 + estimated_completion: + type: string + format: date-time + description: The estimated time of completion. + estimated_seconds_remaining: + type: integer + format: int64 + description: The estimated remaining time in seconds. + minimum: 0 + percent: + type: number + format: float + description: Progress in percent (0..=1) + rate: + type: number + format: float + description: The average processing rate (per second). + total: + type: integer + format: int32 + description: The total number of items to be processed. + minimum: 0 Purl: type: string format: uri diff --git a/xtask/src/dataset.rs b/xtask/src/dataset.rs index 9c6280ab..1745ef04 100644 --- a/xtask/src/dataset.rs +++ b/xtask/src/dataset.rs @@ -193,9 +193,9 @@ impl RunContext for Context { false } - fn progress(&self, name: String) -> impl Progress + Send + 'static { + fn progress(&self, message: String) -> impl Progress + Send + 'static { TracingProgress { - name: format!("{}: {name}", self.name), + name: format!("{}: {message}", self.name), period: Duration::from_secs(15), } }