Skip to content

Commit

Permalink
Add PMTiles store and tile datasource
Browse files Browse the repository at this point in the history
Seeding is not working, because writer is cloned for each tile.
  • Loading branch information
pka committed Nov 8, 2023
1 parent 77f287e commit f09dacb
Show file tree
Hide file tree
Showing 10 changed files with 657 additions and 17 deletions.
389 changes: 379 additions & 10 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions bbox-tile-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ martin-mbtiles = { package = "mbtiles", version = "0.7.2", default-features = fa
martin-tile-utils = "0.1.3"
num_cpus = { workspace = true }
once_cell = { workspace = true }
pmtiles = { version = "0.3.1", features = ["mmap-async-tokio"] }
pmtiles2 = { version = "0.2.2", default-features = false }
prometheus = { workspace = true }
reqwest = { workspace = true }
rusoto_core = { version = "0.48.0", default-features = false, features = ["rustls"] }
Expand Down
8 changes: 5 additions & 3 deletions bbox-tile-server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ impl TileService {
None
};

let tile_writer = tileset.store_writer.clone().unwrap();
let mut tile_writer = tileset.store_writer.clone().unwrap();

// Keep a queue of tasks waiting for parallel async execution (size >= #cores).
let threads = args.threads.unwrap_or(num_cpus::get());
Expand Down Expand Up @@ -281,7 +281,7 @@ impl TileService {
let service = self.clone();
let tileset = args.tileset.clone();
let tmp_file_writer = tmp_file_writer.clone();
let tile_writer = tile_writer.clone();
let mut tile_writer = tile_writer.clone();
let tx_s3 = tx_s3.clone();
tasks.push(task::spawn(async move {
let tile = service.read_tile(&tileset, &xyz, &format).await.unwrap();
Expand All @@ -293,7 +293,7 @@ impl TileService {
tx_s3.send(xyz).await.unwrap();
}
} else {
tile_writer.put_tile(&xyz, input).await.unwrap();
tile_writer.put_tile_mut(&xyz, input).await.unwrap();
}
}));
if tasks.len() >= task_queue_size {
Expand All @@ -310,6 +310,8 @@ impl TileService {
// Wait for remaining writer tasks
futures_util::future::join_all(tasks).await;

tile_writer.finalize()?;

// Remove temporary directories
if let Some(file_writer) = tmp_file_writer {
file_writer.remove_dir_all()?;
Expand Down
9 changes: 9 additions & 0 deletions bbox-tile-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ pub enum SourceParamCfg {
Postgis(PostgisSourceParamsCfg),
#[serde(rename = "mbtiles")]
Mbtiles(MbtilesStoreCfg),
#[serde(rename = "pmtiles")]
Pmtiles(PmtilesStoreCfg),
}

#[derive(Deserialize, Clone, Debug)]
Expand Down Expand Up @@ -195,6 +197,7 @@ pub enum TileStoreCfg {
Files(FileStoreCfg),
S3(S3StoreCfg),
Mbtiles(MbtilesStoreCfg),
Pmtiles(PmtilesStoreCfg),
}

#[derive(Deserialize, Clone, Debug)]
Expand All @@ -219,6 +222,12 @@ pub struct MbtilesStoreCfg {
pub path: PathBuf,
}

#[derive(Deserialize, Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct PmtilesStoreCfg {
pub path: PathBuf,
}

impl TileserverCfg {
pub fn from_config(cli: &ArgMatches) -> Self {
let mut cfg: TileserverCfg = from_config_root_or_exit();
Expand Down
9 changes: 9 additions & 0 deletions bbox-tile-server/src/datasource/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod mbtiles;
mod mvt;
pub mod pmtiles;
pub mod postgis;
mod postgis_queries;
#[cfg(feature = "map-server")]
Expand All @@ -9,6 +10,7 @@ pub mod wms_http;
use crate::config::{SourceParamCfg, TileSetCfg};
use crate::service::TileService;
use crate::store::mbtiles::MbtilesStore;
use crate::store::pmtiles::PmtilesStoreReader;
use async_trait::async_trait;
use bbox_core::config::{error_exit, DatasourceCfg, NamedDatasourceCfg};
use bbox_core::endpoints::TileResponse;
Expand Down Expand Up @@ -47,6 +49,8 @@ pub enum TileSourceError {
WmsHttpError(#[from] reqwest::Error),
#[error(transparent)]
MbtilesError(#[from] martin_mbtiles::MbtError),
#[error(transparent)]
PmtilesError(#[from] ::pmtiles::error::Error),
}

#[derive(PartialEq, Clone, Debug)]
Expand Down Expand Up @@ -268,6 +272,11 @@ impl Datasources {
.await
.unwrap_or_else(error_exit),
),
SourceParamCfg::Pmtiles(cfg) => Box::new(
PmtilesStoreReader::from_config(cfg)
.await
.unwrap_or_else(error_exit),
),
}
}
}
Expand Down
54 changes: 54 additions & 0 deletions bbox-tile-server/src/datasource/pmtiles.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use crate::datasource::{
wms_fcgi::HttpRequestParams, LayerInfo, SourceType, TileRead, TileResponse, TileSourceError,
};
use crate::service::TileService;
use crate::store::pmtiles::PmtilesStoreReader;
use crate::store::TileReader;
use async_trait::async_trait;
use bbox_core::Format;
use log::info;
use tile_grid::Xyz;
use tilejson::tilejson;
use tilejson::TileJSON;

#[async_trait]
impl TileRead for PmtilesStoreReader {
async fn xyz_request(
&self,
_service: &TileService,
_tms_id: &str,
tile: &Xyz,
_format: &Format,
_request_params: HttpRequestParams<'_>,
) -> Result<TileResponse, TileSourceError> {
if let Some(tile) = self
.get_tile(tile)
.await
.map_err(|_| TileSourceError::TileXyzError)?
{
Ok(tile)
} else {
Err(TileSourceError::TileXyzError) // TODO: check for empty tile?
}
}
fn source_type(&self) -> SourceType {
SourceType::Vector //TODO
}
async fn tilejson(&self, format: &Format) -> Result<TileJSON, TileSourceError> {
info!(
"Metadata {}: {}",
self.path.display(),
self.get_metadata().await?
);
let mut tj = tilejson! { tiles: vec![] };
tj.other
.insert("format".to_string(), format.file_suffix().into());
Ok(tj)
}
async fn layers(&self) -> Result<Vec<LayerInfo>, TileSourceError> {
Ok(vec![LayerInfo {
name: self.path.to_string_lossy().to_string(), // TODO: file name only
geometry_type: None,
}])
}
}
6 changes: 3 additions & 3 deletions bbox-tile-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ impl TileService {
pub fn tileset(&self, tileset: &str) -> Option<&TileSet> {
self.tilesets.get(tileset)
}
#[allow(dead_code)]
pub fn source(&self, tileset: &str) -> Option<&dyn TileRead> {
self.tilesets.source(tileset)
}
Expand Down Expand Up @@ -330,6 +329,7 @@ impl TileService {
}
}
// Request tile and write into cache
// let tms = tileset.tms.clone();
let mut tiledata = tileset
.source
.xyz_request(self, &tileset.tms, tile, format, request_params)
Expand All @@ -353,7 +353,7 @@ impl TileService {
Ok(Some(tiledata))
}
}
/// TileJSON layer metadata (https://github.com/mapbox/tilejson-spec)
/// TileJSON layer metadata (<https://github.com/mapbox/tilejson-spec>)
pub async fn tilejson(&self, tileset: &str, base_url: &str) -> Result<TileJSON, ServiceError> {
let ts = self
.tileset(tileset)
Expand Down Expand Up @@ -381,7 +381,7 @@ impl TileService {
Ok(ts.source.mbtiles_metadata(&ts.config, &ts.format).await?)
}

/// Autogenerated Style JSON (https://www.mapbox.com/mapbox-gl-style-spec/)
/// Autogenerated Style JSON (<https://www.mapbox.com/mapbox-gl-style-spec/>)
pub async fn stylejson(
&self,
tileset: &str,
Expand Down
31 changes: 30 additions & 1 deletion bbox-tile-server/src/store/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod files;
pub mod mbtiles;
pub mod pmtiles;
pub mod s3;
pub mod s3putfiles;

Expand All @@ -8,12 +9,14 @@ use crate::config::TileStoreCfg;
use crate::mbtiles_ds::Error as MbtilesDsError;
use crate::store::files::FileStore;
use crate::store::mbtiles::MbtilesStore;
use crate::store::pmtiles::{PmtilesStoreReader, PmtilesStoreWriter};
use crate::store::s3::{S3Store, S3StoreError};
use async_trait::async_trait;
use bbox_core::config::error_exit;
use bbox_core::endpoints::TileResponse;
use bbox_core::Format;
use dyn_clone::{clone_trait_object, DynClone};
use log::warn;
use martin_mbtiles::MbtError;
use martin_mbtiles::Metadata;
use std::io::Read;
Expand All @@ -28,12 +31,16 @@ pub enum TileStoreError {
FileError(PathBuf, #[source] std::io::Error),
#[error("Missing argument: {0}")]
ArgMissing(String),
#[error("Operation not supported on readonly data store")]
ReadOnly,
#[error(transparent)]
S3StoreError(#[from] S3StoreError),
#[error(transparent)]
MbtilesDsError(#[from] MbtilesDsError),
#[error(transparent)]
MbtError(#[from] MbtError),
#[error(transparent)]
PmtilesError(#[from] ::pmtiles::error::Error),
}

#[async_trait]
Expand All @@ -45,9 +52,21 @@ pub trait TileStoreType {

#[async_trait]
pub trait TileWriter: DynClone + Send + Sync {
/// Check for tile in cache
/// Check for existing tile
/// Must not be implemented for cases where generating a tile is less expensive than checking
// Method should probably return date of last change if known
async fn exists(&self, tile: &Xyz) -> bool;
/// Write tile into store
async fn put_tile(&self, tile: &Xyz, input: BoxRead) -> Result<(), TileStoreError>;
/// Write tile into store requiring &mut self
async fn put_tile_mut(&mut self, tile: &Xyz, input: BoxRead) -> Result<(), TileStoreError> {
// Most implementations support writing without &mut self
self.put_tile(tile, input).await
}
/// Finalize writing
fn finalize(&mut self) -> Result<(), TileStoreError> {
Ok(())
}
}

clone_trait_object!(TileWriter);
Expand Down Expand Up @@ -122,6 +141,15 @@ pub async fn store_reader_from_config(
.await
.unwrap_or_else(error_exit),
),
TileStoreCfg::Pmtiles(cfg) => {
if let Ok(reader) = PmtilesStoreReader::from_config(cfg).await {
Box::new(reader)
} else {
// We continue, because for seeding into a new file, the reader cannot be created and is not needed
warn!("Couldn't open PmtilesStoreReader {}", cfg.path.display());
Box::new(NoStore)
}
}
}
}

Expand All @@ -141,5 +169,6 @@ pub async fn store_writer_from_config(
.await
.unwrap_or_else(error_exit),
),
TileStoreCfg::Pmtiles(cfg) => Box::new(PmtilesStoreWriter::from_config(cfg, format)),
}
}
Loading

0 comments on commit f09dacb

Please sign in to comment.