From f3c98141018ed3a342a11a827f76c6989e5fb4ec Mon Sep 17 00:00:00 2001 From: Pirmin Kalberer Date: Tue, 7 Nov 2023 17:58:35 +0100 Subject: [PATCH] Seed using store_writer from service --- bbox-tile-server/src/cli.rs | 63 ++++++++++++++------------- bbox-tile-server/src/store/files.rs | 8 ++-- bbox-tile-server/src/store/mbtiles.rs | 19 ++++---- bbox-tile-server/src/store/mod.rs | 10 ++--- bbox-tile-server/src/store/s3.rs | 8 ++-- 5 files changed, 56 insertions(+), 52 deletions(-) diff --git a/bbox-tile-server/src/cli.rs b/bbox-tile-server/src/cli.rs index b21bba77..3bdb1a01 100644 --- a/bbox-tile-server/src/cli.rs +++ b/bbox-tile-server/src/cli.rs @@ -1,8 +1,7 @@ use crate::config::TileStoreCfg; use crate::service::{ServiceError, TileService}; use crate::store::{ - files::FileStore, s3::S3Store, s3putfiles, BoxRead, CacheLayout, TileReader, TileStoreType, - TileWriter, + files::FileStore, s3::S3Store, s3putfiles, BoxRead, CacheLayout, TileStoreType, TileWriter, }; use bbox_core::config::error_exit; use clap::{Args, Parser}; @@ -184,6 +183,11 @@ impl TileService { tms.xy_bbox() }; + // We setup different pipelines for certain scenarios. + // Examples: + // map service source -> tile store writer + // map service source -> temporary file writer -> s3 store writer + let s3_writer = if args.s3_path.is_some() { Some( S3Store::from_args(args, &format) @@ -195,6 +199,18 @@ impl TileService { } else { None }; + let tmp_file_writer = if s3_writer.is_some() { + let file_dir = args + .base_dir + .as_ref() + .map(|d| PathBuf::from(&d)) + .unwrap_or_else(|| TempDir::new().unwrap().into_path()); + Some(FileStore::new(file_dir.clone(), format)) + } else { + None + }; + + let tile_writer = tileset.store_writer.clone().unwrap(); // Keep a queue of tasks waiting for parallel async execution (size >= #cores). let threads = args.threads.unwrap_or(num_cpus::get()); @@ -206,28 +222,10 @@ impl TileService { let task_queue_size = writer_task_count + threads * 2; let mut tasks = Vec::with_capacity(task_queue_size); - let file_dir = args - .base_dir - .as_ref() - .map(|d| PathBuf::from(&d)) - .or_else(|| { - if let TileStoreCfg::Files(fc) = &cache_cfg { - Some(fc.base_dir.clone()) - } else { - None - } - }) - .unwrap_or_else(|| TempDir::new().unwrap().into_path()); - let file_writer = FileStore::new(file_dir.clone(), format); - let (tx_s3, rx_s3) = async_channel::bounded(task_queue_size); if let Some(s3_writer) = s3_writer { - // let file_dir = args - // .base_dir - // .as_ref() - // .map(|d| PathBuf::from(&d)) - // .unwrap_or_else(|| TempDir::new().unwrap().into_path()); + let file_dir = tmp_file_writer.clone().unwrap().base_dir; info!( "Writing tiles to {s3_writer:?} (temporary dir: {})", file_dir.to_string_lossy() @@ -243,15 +241,13 @@ impl TileService { })); } debug!("{} S3 writer tasks spawned", tasks.len()); - } else { - info!("Writing tiles to {}", file_dir.to_string_lossy()); } // let (tx_cache, rx_cache) = async_channel::bounded(task_queue_size); // for _ in 0..writer_task_count { // let service = self.clone(); // let tileset = args.tileset.clone(); - // let file_writer = file_writer.clone(); + // let tile_writer = tile_writer.clone(); // let suffix = suffix.clone(); // let rx_cache = rx_cache.clone(); // tasks.push(task::spawn(async move { @@ -260,7 +256,7 @@ impl TileService { // let tile = service.read_tile(&tileset, &tile, &suffix).await.unwrap(); // let input: BoxRead = Box::new(tile.body); - // file_writer.put_tile(path, input).await.unwrap(); + // tile_writer.put_tile(path, input).await.unwrap(); // // tx_s3.send(path.clone()).await.unwrap(); // } // })); @@ -276,7 +272,7 @@ impl TileService { let path = CacheLayout::Zxy.path_string(&PathBuf::new(), &xyz, &format); progress.set_message(path.clone()); progress.inc(1); - let cache_exists = file_writer.exists(&xyz).await; + let cache_exists = tile_writer.exists(&xyz).await; if cache_exists && !overwrite { continue; } @@ -284,15 +280,20 @@ impl TileService { // TODO: we should not clone for each tile, only for a pool of tasks let service = self.clone(); let tileset = args.tileset.clone(); - let file_writer = file_writer.clone(); + let tmp_file_writer = tmp_file_writer.clone(); + let tile_writer = tile_writer.clone(); let tx_s3 = tx_s3.clone(); tasks.push(task::spawn(async move { let tile = service.read_tile(&tileset, &xyz, &format).await.unwrap(); let input: BoxRead = Box::new(tile.body); - file_writer.put_tile(&xyz, input).await.unwrap(); - if writer_task_count > 0 { - tx_s3.send(xyz).await.unwrap(); + if let Some(file_writer) = tmp_file_writer { + file_writer.put_tile(&xyz, input).await.unwrap(); + if writer_task_count > 0 { + tx_s3.send(xyz).await.unwrap(); + } + } else { + tile_writer.put_tile(&xyz, input).await.unwrap(); } })); if tasks.len() >= task_queue_size { @@ -310,7 +311,7 @@ impl TileService { futures_util::future::join_all(tasks).await; // Remove temporary directories - if args.base_dir.is_none() { + if let Some(file_writer) = tmp_file_writer { file_writer.remove_dir_all()?; } diff --git a/bbox-tile-server/src/store/files.rs b/bbox-tile-server/src/store/files.rs index caac801b..2df6031c 100644 --- a/bbox-tile-server/src/store/files.rs +++ b/bbox-tile-server/src/store/files.rs @@ -44,6 +44,10 @@ impl TileStoreType for FileStore { #[async_trait] impl TileWriter for FileStore { + async fn exists(&self, tile: &Xyz) -> bool { + let p = CacheLayout::Zxy.path(&self.base_dir, tile, &self.format); + p.exists() + } async fn put_tile(&self, tile: &Xyz, mut input: BoxRead) -> Result<(), TileStoreError> { let fullpath = CacheLayout::Zxy.path(&self.base_dir, tile, &self.format); let p = fullpath.as_path(); @@ -61,10 +65,6 @@ impl TileWriter for FileStore { #[async_trait] impl TileReader for FileStore { - async fn exists(&self, tile: &Xyz) -> bool { - let p = CacheLayout::Zxy.path(&self.base_dir, tile, &self.format); - p.exists() - } async fn get_tile(&self, tile: &Xyz) -> Result, TileStoreError> { let p = CacheLayout::Zxy.path(&self.base_dir, tile, &self.format); if let Ok(f) = File::open(p) { diff --git a/bbox-tile-server/src/store/mbtiles.rs b/bbox-tile-server/src/store/mbtiles.rs index 9c2c23a5..b70636e8 100644 --- a/bbox-tile-server/src/store/mbtiles.rs +++ b/bbox-tile-server/src/store/mbtiles.rs @@ -60,6 +60,16 @@ impl TileStoreType for MbtilesStore { #[async_trait] impl TileWriter for MbtilesStore { + async fn exists(&self, tile: &Xyz) -> bool { + match self + .mbt + .get_tile(tile.z, tile.x as u32, tile.y as u32) + .await + { + Ok(None) | Err(_) => false, + Ok(_) => true, + } + } async fn put_tile(&self, tile: &Xyz, mut input: BoxRead) -> Result<(), TileStoreError> { let mut bytes: Vec = Vec::new(); input.read_to_end(&mut bytes).ok(); // TODO: map_err @@ -76,20 +86,13 @@ impl TileWriter for MbtilesStore { .bind(bytes) .execute(&mut *conn) .await - .unwrap(); + .unwrap(); // TODO Ok(()) } } #[async_trait] impl TileReader for MbtilesStore { - async fn exists(&self, tile: &Xyz) -> bool { - self.mbt - .get_tile(tile.z, tile.x as u32, tile.y as u32) - .await - .ok() - .is_some() - } async fn get_tile(&self, tile: &Xyz) -> Result, TileStoreError> { let resp = if let Some(content) = self .mbt diff --git a/bbox-tile-server/src/store/mod.rs b/bbox-tile-server/src/store/mod.rs index 5a532d02..3fa0bd00 100644 --- a/bbox-tile-server/src/store/mod.rs +++ b/bbox-tile-server/src/store/mod.rs @@ -45,6 +45,8 @@ pub trait TileStoreType { #[async_trait] pub trait TileWriter: DynClone + Send + Sync { + /// Check for tile in cache + async fn exists(&self, tile: &Xyz) -> bool; async fn put_tile(&self, tile: &Xyz, input: BoxRead) -> Result<(), TileStoreError>; } @@ -52,8 +54,6 @@ clone_trait_object!(TileWriter); #[async_trait] pub trait TileReader: DynClone + Send + Sync { - /// Check for tile in cache - async fn exists(&self, tile: &Xyz) -> bool; /// Lookup tile and return Read stream, if found async fn get_tile(&self, tile: &Xyz) -> Result, TileStoreError>; } @@ -92,6 +92,9 @@ pub struct NoStore; #[async_trait] impl TileWriter for NoStore { + async fn exists(&self, _tile: &Xyz) -> bool { + false + } async fn put_tile(&self, _tile: &Xyz, mut _input: BoxRead) -> Result<(), TileStoreError> { Ok(()) } @@ -99,9 +102,6 @@ impl TileWriter for NoStore { #[async_trait] impl TileReader for NoStore { - async fn exists(&self, _tile: &Xyz) -> bool { - false - } async fn get_tile(&self, _tile: &Xyz) -> Result, TileStoreError> { Ok(None) } diff --git a/bbox-tile-server/src/store/s3.rs b/bbox-tile-server/src/store/s3.rs index 15259f83..22aea9f7 100644 --- a/bbox-tile-server/src/store/s3.rs +++ b/bbox-tile-server/src/store/s3.rs @@ -75,6 +75,10 @@ impl TileStoreType for S3Store { #[async_trait] impl TileWriter for S3Store { + async fn exists(&self, _tile: &Xyz) -> bool { + // 2nd level cache lookup is not supported + false + } async fn put_tile(&self, tile: &Xyz, input: BoxRead) -> Result<(), TileStoreError> { let key = CacheLayout::Zxy.path_string(&PathBuf::new(), tile, &self.format); self.put_data(key, input).await @@ -123,10 +127,6 @@ impl S3Store { #[async_trait] impl TileReader for S3Store { - async fn exists(&self, _tile: &Xyz) -> bool { - // 2nd level cache lookup is not supported - false - } async fn get_tile(&self, _tile: &Xyz) -> Result, TileStoreError> { // 2nd level cache lookup is not supported Ok(None)