Skip to content

Commit

Permalink
Add PostGIS routing datasource
Browse files Browse the repository at this point in the history
  • Loading branch information
pka committed Aug 16, 2023
1 parent 7f35e56 commit 856bc53
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 40 deletions.
2 changes: 1 addition & 1 deletion bbox-core/src/pg_ds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl PgDatasource {
}
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct DsPostgisCfg {
pub url: String,
Expand Down
2 changes: 1 addition & 1 deletion bbox-routing-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ futures = { workspace = true }
geo = "0.19.0"
geo-types = "0.7.6"
geojson = "0.22.3"
geozero = { workspace = true, features = [ "with-gpkg" ] }
geozero = { workspace = true, features = [ "with-gpkg", "with-postgis-sqlx" ] }
log = { workspace = true }
polyline = "0.9.0"
rstar = "0.9.2"
Expand Down
14 changes: 14 additions & 0 deletions bbox-routing-server/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use bbox_core::config::from_config_opt_or_exit;
use bbox_core::pg_ds::DsPostgisCfg;
use serde::Deserialize;

#[derive(Deserialize, Default, Debug)]
Expand All @@ -13,8 +14,21 @@ pub struct RoutingServerCfg {
pub struct RoutingServiceCfg {
pub profile: Option<String>,
pub gpkg: String,
pub postgis: Option<DsPostgisCfg>,
/// Edge table
pub table: String,
/// Node/Vertices table
pub node_table: Option<String>,
/// Geometry column
pub geom: String,
/// Node ID column in node table
pub node_id: Option<String>,
/// Cost column (default: geodesic line length)
pub cost: Option<String>,
/// Column with source node ID
pub node_src: Option<String>,
/// Column with destination (target) node ID
pub node_dst: Option<String>,
}

impl RoutingServerCfg {
Expand Down
84 changes: 72 additions & 12 deletions bbox-routing-server/src/ds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,52 @@ use crate::config::RoutingServiceCfg;
use crate::engine::NodeIndex;
use crate::error::Result;
use async_trait::async_trait;
use bbox_core::pg_ds::PgDatasource;
use fast_paths::InputGraph;
use futures::TryStreamExt;
use geo::prelude::GeodesicLength;
use geo::LineString;
use geo::{LineString, Point};
use geozero::wkb;
use log::info;
use sqlx::sqlite::SqliteConnection;
use sqlx::{Connection, Row};
use std::convert::TryFrom;

#[async_trait]
pub trait RouterDs {
fn cache_name(&self) -> String;
/// Create routing graph from GeoPackage line geometries
async fn load(&self) -> Result<(InputGraph, NodeIndex)>;
pub trait RouterDs: Send {
fn cache_name(&self) -> &str;
/// Load edges and nodes from datasource
async fn load(&self) -> Result<GraphData>;
}

pub async fn ds_from_config(config: &RoutingServiceCfg) -> Result<impl RouterDs> {
Ok(GpkgLinesDs(config.clone()))
pub type GraphData = (InputGraph, NodeIndex);

pub async fn ds_from_config(config: &RoutingServiceCfg) -> Result<Box<dyn RouterDs>> {
let ds = if config.postgis.is_some() {
Box::new(PgRouteTablesDs(config.clone())) as Box<dyn RouterDs>
} else {
Box::new(GpkgLinesDs(config.clone())) as Box<dyn RouterDs>
};
Ok(ds)
}

/// GPKG routing source
pub struct GpkgLinesDs(RoutingServiceCfg);

#[async_trait]
impl RouterDs for GpkgLinesDs {
fn cache_name(&self) -> String {
self.0.gpkg.clone()
fn cache_name(&self) -> &str {
&self.0.gpkg
}
/// Create routing graph from GeoPackage line geometries
async fn load(&self) -> Result<(InputGraph, NodeIndex)> {
/// Load from GeoPackage line geometries
async fn load(&self) -> Result<GraphData> {
info!("Reading routing graph from {}", self.0.gpkg);
let mut index = NodeIndex::new();
let mut input_graph = InputGraph::new();

let geom = self.0.geom.as_str();
let mut conn = SqliteConnection::connect(&format!("sqlite://{}", self.0.gpkg)).await?;
let sql = format!("SELECT {geom} FROM {}", self.0.table);
let sql = format!(r#"SELECT "{geom}" FROM "{}""#, self.0.table);
let mut rows = sqlx::query(&sql).fetch(&mut conn);

while let Some(row) = rows.try_next().await? {
Expand All @@ -57,3 +66,54 @@ impl RouterDs for GpkgLinesDs {
Ok((input_graph, index))
}
}

/// PostGIS routing source
pub struct PgRouteTablesDs(RoutingServiceCfg);

#[async_trait]
impl RouterDs for PgRouteTablesDs {
fn cache_name(&self) -> &str {
&self.0.table
}
/// Load from PostGIS routing tables
async fn load(&self) -> Result<GraphData> {
let url = &self.0.postgis.as_ref().unwrap().url;
let geom = self.0.geom.as_str();
let cost = self.0.cost.as_ref().unwrap();
let table = self.0.table.clone();
let node_table = self.0.node_table.as_ref().unwrap();
let node_id = self.0.node_id.as_ref().unwrap();
let node_src = self.0.node_src.as_ref().unwrap();
let node_dst = self.0.node_dst.as_ref().unwrap();

info!("Reading routing graph from {url}");
let mut index = NodeIndex::new();
let mut input_graph = InputGraph::new();
let db = PgDatasource::new_pool(url).await.unwrap();
let sql = format!(
r#"
SELECT e.{node_src} AS src, e.{node_dst} AS dst, e.{cost} AS cost,
nsrc."{geom}" AS geom_src, ndst."{geom}" AS geom_dst
FROM "{table}" e
JOIN "{node_table}" nsrc ON nsrc.{node_id} = e.{node_src}
JOIN "{node_table}" ndst ON ndst.{node_id} = e.{node_dst}
"#
);
let mut rows = sqlx::query(&sql).fetch(&db.pool);
while let Some(row) = rows.try_next().await? {
let src_id: i32 = row.try_get("src")?;
let dst_id: i32 = row.try_get("dst")?;
let weight: f64 = row.try_get("cost")?;
let wkb: wkb::Decode<geo::Geometry<f64>> = row.try_get("geom_src")?;
let geom = wkb.geometry.unwrap();
let src = Point::try_from(geom).unwrap();
let _ = index.insert(src.x(), src.y(), src_id as usize);
let wkb: wkb::Decode<geo::Geometry<f64>> = row.try_get("geom_dst")?;
let geom = wkb.geometry.unwrap();
let dst = Point::try_from(geom).unwrap();
let _ = index.insert(dst.x(), dst.y(), dst_id as usize);
input_graph.add_edge_bidir(src_id as usize, dst_id as usize, weight.ceil() as usize);
}
Ok((input_graph, index))
}
}
87 changes: 61 additions & 26 deletions bbox-routing-server/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use log::info;
use rstar::primitives::GeomWithData;
use rstar::RTree;
use serde_json::json;
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufReader, BufWriter, Write};
use std::path::Path;
Expand All @@ -14,34 +15,42 @@ use std::path::Path;
#[derive(Clone)]
pub struct NodeIndex {
tree: RTree<Node>,
/// lookup by node id for route result output
nodes: NodeLookup,
/// node id generation
next_node_id: usize,
node_coords: Vec<(f64, f64)>,
}

type NodeLookup = HashMap<usize, (f64, f64)>;

/// Node coordinates and id
type Node = GeomWithData<[f64; 2], usize>;

impl NodeIndex {
pub fn new() -> Self {
NodeIndex {
tree: RTree::new(),
nodes: Default::default(),
next_node_id: 0,
node_coords: Vec::new(),
}
}
fn bulk_load(node_coords: Vec<(f64, f64)>) -> Self {
let nodes = node_coords
fn bulk_load(nodes: NodeLookup) -> Self {
let rtree_nodes = nodes
.iter()
.enumerate()
.map(|(id, (x, y))| Node::new([*x, *y], id))
.map(|(id, (x, y))| Node::new([*x, *y], *id))
.collect::<Vec<_>>();
let tree = RTree::bulk_load(nodes);
let tree = RTree::bulk_load(rtree_nodes);
let next_node_id = nodes.keys().max().unwrap_or(&0) + 1;
NodeIndex {
tree,
next_node_id: node_coords.len(),
node_coords,
nodes,
next_node_id,
}
}
/// Lookup node coordinates
pub fn get_coord(&self, id: usize) -> Option<&(f64, f64)> {
self.nodes.get(&id)
}
/// Find or insert node
pub fn entry(&mut self, x: f64, y: f64) -> usize {
let coord = [x, y];
Expand All @@ -50,11 +59,24 @@ impl NodeIndex {
} else {
let id = self.next_node_id;
self.tree.insert(Node::new(coord, id));
self.node_coords.push((x, y));
self.nodes.insert(id, (x, y));
self.next_node_id += 1;
id
}
}
/// Insert node with given id (returns true, if new node is inserted)
pub fn insert(&mut self, x: f64, y: f64, id: usize) -> bool {
if self.nodes.contains_key(&id) {
// or: self.tree.contains(&node)
false
} else {
let coord = [x, y];
let node = Node::new(coord, id);
self.tree.insert(node);
self.nodes.insert(id, (x, y));
true
}
}
/// Find nearest node within max distance
fn find(&self, x: f64, y: f64) -> Option<usize> {
let max = 0.01; // ~ 10km CH
Expand All @@ -76,12 +98,12 @@ pub struct Router {
impl Router {
pub async fn from_config(config: &RoutingServiceCfg) -> Result<Self> {
let ds = ds_from_config(config).await?;
let cache_name = ds.cache_name().clone();
let cache_name = ds.cache_name().to_string();
let router = if Router::cache_exists(&cache_name) {
info!("Reading routing graph from disk");
Router::from_disk(&cache_name)?
} else {
let router = Router::from_ds(&ds).await?;
let router = Router::from_ds(ds).await?;
info!("Saving routing graph");
router.save_to_disk(&cache_name).unwrap();
router
Expand All @@ -92,15 +114,15 @@ impl Router {

fn cache_exists(base_name: &str) -> bool {
// TODO: check if cache is up-to-date!
Path::new(&format!("{base_name}.coords.bin")).exists()
Path::new(&format!("{base_name}.nodes.bin")).exists()
}

fn from_disk(base_name: &str) -> Result<Self> {
let fname = format!("{base_name}.coords.bin");
let fname = format!("{base_name}.nodes.bin");
let reader = BufReader::new(File::open(fname)?);
let node_coords: Vec<(f64, f64)> = bincode::deserialize_from(reader).unwrap();
let nodes: NodeLookup = bincode::deserialize_from(reader).unwrap();

let index = NodeIndex::bulk_load(node_coords);
let index = NodeIndex::bulk_load(nodes);

let fname = format!("{base_name}.graph.bin");
let reader = BufReader::new(File::open(fname)?);
Expand All @@ -115,16 +137,17 @@ impl Router {
let writer = BufWriter::new(File::create(fname)?);
bincode::serialize_into(writer, &self.graph)?;

let fname = format!("{base_name}.coords.bin");
let fname = format!("{base_name}.nodes.bin");
let writer = BufWriter::new(File::create(fname)?);
bincode::serialize_into(writer, &self.index.node_coords)?;
bincode::serialize_into(writer, &self.index.nodes)?;

Ok(())
}

/// Create routing graph from GeoPackage line geometries
pub async fn from_ds(ds: &impl RouterDs) -> Result<Self> {
let (mut input_graph, index) = ds.load().await?;
pub async fn from_ds(ds: Box<dyn RouterDs>) -> Result<Self> {
let load = ds.load();
let (mut input_graph, index) = load.await?;

info!("Peparing routing graph");
input_graph.freeze();
Expand Down Expand Up @@ -169,7 +192,7 @@ impl Router {
pub fn path_to_geojson(&self, paths: Vec<ShortestPath>) -> serde_json::Value {
let features = paths.iter().map(|p| {
let coords = p.get_nodes().iter().map(|node_id| {
let (x, y) = self.index.node_coords[*node_id];
let (x, y) = self.index.get_coord(*node_id).unwrap();
json!([x, y])
}).collect::<Vec<_>>();
json!({"type": "Feature", "geometry": {"type": "LineString", "coordinates": coords}})
Expand All @@ -183,7 +206,7 @@ impl Router {
pub fn path_to_valhalla_json(&self, paths: Vec<ShortestPath>) -> serde_json::Value {
let coords = paths.iter().flat_map(|p| {
p.get_nodes().iter().map(|node_id| {
let (x, y) = self.index.node_coords[*node_id];
let (x, y) = *self.index.get_coord(*node_id).unwrap();
geo_types::Coord { x, y }
})
});
Expand All @@ -207,8 +230,8 @@ impl Router {
#[allow(dead_code)]
pub fn fast_graph_to_geojson(&self, out: &mut dyn Write) {
let features = self.graph.edges_fwd.iter().map(|edge| {
let (x1, y1) = self.index.node_coords[edge.base_node];
let (x2, y2) = self.index.node_coords[edge.adj_node];
let (x1, y1) = self.index.get_coord(edge.base_node).unwrap();
let (x2, y2) = self.index.get_coord(edge.adj_node).unwrap();
let weight = edge.weight;
format!(r#"{{"type": "Feature", "geometry": {{"type": "LineString", "coordinates": [[{x1}, {y1}],[{x2}, {y2}]] }}, "properties": {{"weight": {weight}}} }}"#)
}).collect::<Vec<_>>().join(",\n");
Expand Down Expand Up @@ -237,7 +260,7 @@ pub mod tests {
..Default::default()
};
let ds = ds_from_config(&cfg).await.unwrap();
Router::from_ds(&ds).await.unwrap()
Router::from_ds(ds).await.unwrap()
}

#[tokio::test]
Expand All @@ -256,10 +279,22 @@ pub mod tests {
let weight = p.get_weight();
let nodes = p.get_nodes();
dbg!(&weight, &nodes);
assert_eq!(nodes.len(), 3);
}
Err(e) => {
println!("{e}")
assert!(false, "{e}");
}
}
}

#[tokio::test]
async fn multi() {
let router = router("../assets/railway-test.gpkg", "flows", "geom").await;

let shortest_path = router.calc_path_multiple_sources_and_targets(
vec![(9.352133533333333, 47.09350116666666)],
vec![(9.3422712, 47.1011887)],
);
assert_eq!(shortest_path.unwrap().get_nodes().len(), 3);
}
}

0 comments on commit 856bc53

Please sign in to comment.