Seed using store_writer from service
pka committed Nov 7, 2023
1 parent 15e7f00 commit f3c9814
Showing 5 changed files with 56 additions and 52 deletions.
63 changes: 32 additions & 31 deletions bbox-tile-server/src/cli.rs
@@ -1,8 +1,7 @@
use crate::config::TileStoreCfg;
use crate::service::{ServiceError, TileService};
use crate::store::{
files::FileStore, s3::S3Store, s3putfiles, BoxRead, CacheLayout, TileReader, TileStoreType,
TileWriter,
files::FileStore, s3::S3Store, s3putfiles, BoxRead, CacheLayout, TileStoreType, TileWriter,
};
use bbox_core::config::error_exit;
use clap::{Args, Parser};
@@ -184,6 +183,11 @@ impl TileService {
tms.xy_bbox()
};

// We set up different pipelines for certain scenarios.
// Examples:
// map service source -> tile store writer
// map service source -> temporary file writer -> s3 store writer

let s3_writer = if args.s3_path.is_some() {
Some(
S3Store::from_args(args, &format)
@@ -195,6 +199,18 @@
} else {
None
};
let tmp_file_writer = if s3_writer.is_some() {
let file_dir = args
.base_dir
.as_ref()
.map(|d| PathBuf::from(&d))
.unwrap_or_else(|| TempDir::new().unwrap().into_path());
Some(FileStore::new(file_dir.clone(), format))
} else {
None
};

let tile_writer = tileset.store_writer.clone().unwrap();

// Keep a queue of tasks waiting for parallel async execution (size >= #cores).
let threads = args.threads.unwrap_or(num_cpus::get());
@@ -206,28 +222,10 @@ impl TileService {
let task_queue_size = writer_task_count + threads * 2;
let mut tasks = Vec::with_capacity(task_queue_size);

let file_dir = args
.base_dir
.as_ref()
.map(|d| PathBuf::from(&d))
.or_else(|| {
if let TileStoreCfg::Files(fc) = &cache_cfg {
Some(fc.base_dir.clone())
} else {
None
}
})
.unwrap_or_else(|| TempDir::new().unwrap().into_path());
let file_writer = FileStore::new(file_dir.clone(), format);

let (tx_s3, rx_s3) = async_channel::bounded(task_queue_size);

if let Some(s3_writer) = s3_writer {
// let file_dir = args
// .base_dir
// .as_ref()
// .map(|d| PathBuf::from(&d))
// .unwrap_or_else(|| TempDir::new().unwrap().into_path());
let file_dir = tmp_file_writer.clone().unwrap().base_dir;
info!(
"Writing tiles to {s3_writer:?} (temporary dir: {})",
file_dir.to_string_lossy()
@@ -243,15 +241,13 @@
}));
}
debug!("{} S3 writer tasks spawned", tasks.len());
} else {
info!("Writing tiles to {}", file_dir.to_string_lossy());
}

// let (tx_cache, rx_cache) = async_channel::bounded(task_queue_size);
// for _ in 0..writer_task_count {
// let service = self.clone();
// let tileset = args.tileset.clone();
// let file_writer = file_writer.clone();
// let tile_writer = tile_writer.clone();
// let suffix = suffix.clone();
// let rx_cache = rx_cache.clone();
// tasks.push(task::spawn(async move {
@@ -260,7 +256,7 @@ impl TileService {
// let tile = service.read_tile(&tileset, &tile, &suffix).await.unwrap();
// let input: BoxRead = Box::new(tile.body);

// file_writer.put_tile(path, input).await.unwrap();
// tile_writer.put_tile(path, input).await.unwrap();
// // tx_s3.send(path.clone()).await.unwrap();
// }
// }));
@@ -276,23 +272,28 @@
let path = CacheLayout::Zxy.path_string(&PathBuf::new(), &xyz, &format);
progress.set_message(path.clone());
progress.inc(1);
let cache_exists = file_writer.exists(&xyz).await;
let cache_exists = tile_writer.exists(&xyz).await;
if cache_exists && !overwrite {
continue;
}
cnt += 1;
// TODO: we should not clone for each tile, only for a pool of tasks
let service = self.clone();
let tileset = args.tileset.clone();
let file_writer = file_writer.clone();
let tmp_file_writer = tmp_file_writer.clone();
let tile_writer = tile_writer.clone();
let tx_s3 = tx_s3.clone();
tasks.push(task::spawn(async move {
let tile = service.read_tile(&tileset, &xyz, &format).await.unwrap();
let input: BoxRead = Box::new(tile.body);

file_writer.put_tile(&xyz, input).await.unwrap();
if writer_task_count > 0 {
tx_s3.send(xyz).await.unwrap();
if let Some(file_writer) = tmp_file_writer {
file_writer.put_tile(&xyz, input).await.unwrap();
if writer_task_count > 0 {
tx_s3.send(xyz).await.unwrap();
}
} else {
tile_writer.put_tile(&xyz, input).await.unwrap();
}
}));
if tasks.len() >= task_queue_size {
@@ -310,7 +311,7 @@
futures_util::future::join_all(tasks).await;

// Remove temporary directories
if args.base_dir.is_none() {
if let Some(file_writer) = tmp_file_writer {
file_writer.remove_dir_all()?;
}

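Condensed, the per-tile write path introduced above branches on whether an S3 target is configured: with S3, tiles are staged through the temporary FileStore and handed to the S3 upload tasks via the channel; otherwise they go straight to the tileset's store_writer. A rough sketch of that branch (illustrative only, not literal commit code; the free-function framing, parameter types, and import paths are assumptions):

use crate::store::{files::FileStore, BoxRead, TileWriter};
use tile_grid::Xyz;

// Sketch of the seeding write path; error handling is reduced to unwrap(),
// matching the seeding loop above.
async fn write_tile(
    xyz: Xyz,
    input: BoxRead,
    tmp_file_writer: &Option<FileStore>,
    tile_writer: &dyn TileWriter,
    tx_s3: &async_channel::Sender<Xyz>,
    s3_writer_tasks: usize,
) {
    if let Some(file_writer) = tmp_file_writer {
        // S3 scenario: stage the tile on disk, then notify an upload task.
        file_writer.put_tile(&xyz, input).await.unwrap();
        if s3_writer_tasks > 0 {
            tx_s3.send(xyz).await.unwrap();
        }
    } else {
        // Direct scenario: write through the tileset's configured store writer.
        tile_writer.put_tile(&xyz, input).await.unwrap();
    }
}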
8 changes: 4 additions & 4 deletions bbox-tile-server/src/store/files.rs
@@ -44,6 +44,10 @@ impl TileStoreType for FileStore {

#[async_trait]
impl TileWriter for FileStore {
async fn exists(&self, tile: &Xyz) -> bool {
let p = CacheLayout::Zxy.path(&self.base_dir, tile, &self.format);
p.exists()
}
async fn put_tile(&self, tile: &Xyz, mut input: BoxRead) -> Result<(), TileStoreError> {
let fullpath = CacheLayout::Zxy.path(&self.base_dir, tile, &self.format);
let p = fullpath.as_path();
@@ -61,10 +65,6 @@ impl TileWriter for FileStore {

#[async_trait]
impl TileReader for FileStore {
async fn exists(&self, tile: &Xyz) -> bool {
let p = CacheLayout::Zxy.path(&self.base_dir, tile, &self.format);
p.exists()
}
async fn get_tile(&self, tile: &Xyz) -> Result<Option<TileResponse>, TileStoreError> {
let p = CacheLayout::Zxy.path(&self.base_dir, tile, &self.format);
if let Ok(f) = File::open(p) {
19 changes: 11 additions & 8 deletions bbox-tile-server/src/store/mbtiles.rs
@@ -60,6 +60,16 @@ impl TileStoreType for MbtilesStore {

#[async_trait]
impl TileWriter for MbtilesStore {
async fn exists(&self, tile: &Xyz) -> bool {
match self
.mbt
.get_tile(tile.z, tile.x as u32, tile.y as u32)
.await
{
Ok(None) | Err(_) => false,
Ok(_) => true,
}
}
async fn put_tile(&self, tile: &Xyz, mut input: BoxRead) -> Result<(), TileStoreError> {
let mut bytes: Vec<u8> = Vec::new();
input.read_to_end(&mut bytes).ok(); // TODO: map_err
@@ -76,20 +86,13 @@ impl TileWriter for MbtilesStore {
.bind(bytes)
.execute(&mut *conn)
.await
.unwrap();
.unwrap(); // TODO
Ok(())
}
}

#[async_trait]
impl TileReader for MbtilesStore {
async fn exists(&self, tile: &Xyz) -> bool {
self.mbt
.get_tile(tile.z, tile.x as u32, tile.y as u32)
.await
.ok()
.is_some()
}
async fn get_tile(&self, tile: &Xyz) -> Result<Option<TileResponse>, TileStoreError> {
let resp = if let Some(content) = self
.mbt
10 changes: 5 additions & 5 deletions bbox-tile-server/src/store/mod.rs
@@ -45,15 +45,15 @@ pub trait TileStoreType {

#[async_trait]
pub trait TileWriter: DynClone + Send + Sync {
/// Check for tile in cache
async fn exists(&self, tile: &Xyz) -> bool;
async fn put_tile(&self, tile: &Xyz, input: BoxRead) -> Result<(), TileStoreError>;
}

clone_trait_object!(TileWriter);

#[async_trait]
pub trait TileReader: DynClone + Send + Sync {
/// Check for tile in cache
async fn exists(&self, tile: &Xyz) -> bool;
/// Lookup tile and return Read stream, if found
async fn get_tile(&self, tile: &Xyz) -> Result<Option<TileResponse>, TileStoreError>;
}
@@ -92,16 +92,16 @@ pub struct NoStore;

#[async_trait]
impl TileWriter for NoStore {
async fn exists(&self, _tile: &Xyz) -> bool {
false
}
async fn put_tile(&self, _tile: &Xyz, mut _input: BoxRead) -> Result<(), TileStoreError> {
Ok(())
}
}

#[async_trait]
impl TileReader for NoStore {
async fn exists(&self, _tile: &Xyz) -> bool {
false
}
async fn get_tile(&self, _tile: &Xyz) -> Result<Option<TileResponse>, TileStoreError> {
Ok(None)
}
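With exists() moved from TileReader to TileWriter, every cache backend usable as a seeding target now answers "is this tile already cached?" on the writer side, as the other backends in this commit do. For orientation, a minimal illustrative implementation against the updated trait (MemoryStore, its field types, and the import paths are invented for this example and are not part of the crate):

use std::collections::HashMap;
use std::io::Read;
use std::sync::{Arc, Mutex};

use async_trait::async_trait;

use crate::store::{BoxRead, TileStoreError, TileWriter};
use tile_grid::Xyz;

/// Hypothetical in-memory tile store, for illustration only.
#[derive(Clone)]
struct MemoryStore {
    tiles: Arc<Mutex<HashMap<(u8, u64, u64), Vec<u8>>>>,
}

#[async_trait]
impl TileWriter for MemoryStore {
    async fn exists(&self, tile: &Xyz) -> bool {
        // Seeding calls this to skip tiles that are already cached.
        self.tiles.lock().unwrap().contains_key(&(tile.z, tile.x, tile.y))
    }
    async fn put_tile(&self, tile: &Xyz, mut input: BoxRead) -> Result<(), TileStoreError> {
        let mut bytes = Vec::new();
        input.read_to_end(&mut bytes).ok(); // TODO: map_err, as in MbtilesStore
        self.tiles.lock().unwrap().insert((tile.z, tile.x, tile.y), bytes);
        Ok(())
    }
}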
8 changes: 4 additions & 4 deletions bbox-tile-server/src/store/s3.rs
@@ -75,6 +75,10 @@ impl TileStoreType for S3Store {

#[async_trait]
impl TileWriter for S3Store {
async fn exists(&self, _tile: &Xyz) -> bool {
// 2nd level cache lookup is not supported
false
}
async fn put_tile(&self, tile: &Xyz, input: BoxRead) -> Result<(), TileStoreError> {
let key = CacheLayout::Zxy.path_string(&PathBuf::new(), tile, &self.format);
self.put_data(key, input).await
@@ -123,10 +127,6 @@

#[async_trait]
impl TileReader for S3Store {
async fn exists(&self, _tile: &Xyz) -> bool {
// 2nd level cache lookup is not supported
false
}
async fn get_tile(&self, _tile: &Xyz) -> Result<Option<TileResponse>, TileStoreError> {
// 2nd level cache lookup is not supported
Ok(None)
