Skip to content

Commit

Permalink
Merge pull request #3 from samply/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
davidmscholz committed Jul 29, 2024
2 parents 08553bc + d273026 commit f687f27
Show file tree
Hide file tree
Showing 14 changed files with 537 additions and 381 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@
.env
.vscode
Cargo.lock
.clj-kondo/
.lsp/
Dockerfile_test
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ dotenv = "0.15"
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"]}
tokio-postgres = "0.7.10"
diesel = { version = "2.2.0", features = ["postgres", "r2d2"] }
tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread", "full", "time", "signal"]}
sqlx = { version = "0.7", features = [ "runtime-tokio", "postgres"] }
anyhow = "1.0.58"
chrono = "0.4"

# Logging
tracing = "0.1.37"
Expand Down
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
**FHIR2SQL**
=====================

A Rust-based application that synchronizes FHIR (Fast Healthcare Interoperability Resources) data with a PostgreSQL database.

**Overview**
------------

This application connects to a FHIR server, retrieves data, and syncs it with a PostgreSQL database. It uses the `sqlx` library for database interactions and `reqwest` for making HTTP requests to the FHIR server. The application is designed to run continuously, syncing data at regular intervals.

**Features**
------------

* Connects to a PostgreSQL database and creates necessary tables and triggers if they don't exist
* Retrieves FHIR data from a specified server and syncs it with the PostgreSQL database
* Supports regular syncing at a specified interval
* Handles errors and retries connections to the FHIR server and PostgreSQL database
* Supports graceful shutdown on SIGTERM and SIGINT signals

**Components**
--------------

* `main.rs`: The main application entry point
* `db_utils.rs`: Database utility functions for connecting to PostgreSQL and creating tables and triggers
* `models.rs`: Data models for FHIR resources and database interactions
* `graceful_shutdown.rs`: Functions for handling graceful shutdown on SIGTERM and SIGINT signals

**Getting Started**
-------------------

To use this application, you'll need to:

1. Install Rust and the required dependencies
2. Configure the application by setting environment variables for the FHIR server URL, PostgreSQL connection details, and syncing interval
3. Run the application using `cargo run`
6 changes: 0 additions & 6 deletions diesel.toml

This file was deleted.

Empty file removed migrations/.keep
Empty file.
6 changes: 0 additions & 6 deletions migrations/00000000000000_diesel_initial_setup/down.sql

This file was deleted.

36 changes: 0 additions & 36 deletions migrations/00000000000000_diesel_initial_setup/up.sql

This file was deleted.

4 changes: 0 additions & 4 deletions migrations/2024-06-18-075834_create_tables/down.sql

This file was deleted.

43 changes: 0 additions & 43 deletions migrations/2024-06-18-075834_create_tables/up.sql

This file was deleted.

144 changes: 144 additions & 0 deletions src/db_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
use sqlx::{postgres::PgPoolOptions, PgPool, Row};
use tracing::{error, info};
use anyhow::anyhow;
use reqwest::Client;

pub async fn get_pg_connection_pool(pg_url: &str, num_attempts: u32) -> Result<PgPool, anyhow::Error> {
info!("Trying to establish a PostgreSQL connection pool");

let mut attempts = 0;
let mut err: Option<anyhow::Error> = None;

while attempts < num_attempts {
info!("Attempt to connect to PostgreSQL {} of {}", attempts + 1, num_attempts);
match PgPoolOptions::new()
.max_connections(10)
.connect(&pg_url)
.await
{
Ok(pg_con_pool) => {
info!("PostgreSQL connection successfull");
return Ok(pg_con_pool)
},
Err(e) => {
error!("Failed to connect to PostgreSQL. Attempt {} of {}: {}", attempts + 1, num_attempts, e);
err = Some(anyhow!(e));
}
}
attempts += 1;
tokio::time::sleep(std::time::Duration::from_secs(5)).await; //@todo: move param somewhere else?
}
Err(err.unwrap_or_else(|| anyhow!("Failed to connect to PostgreSQL")))
}


pub async fn check_blaze_connection(blaze_base_url: &str, num_attempts: u32) -> Result<bool, anyhow::Error> {
info!("Attempting to connect to Blaze");

let mut attempts = 0;
let mut err: Option<anyhow::Error> = None;
let client = Client::new();

while attempts < num_attempts {
info!("Attempt to connect to Blaze {} of {}", attempts + 1, num_attempts);
match client.get(format!("{}/health", blaze_base_url)).send().await {
Ok(_) => {
info!("Blaze connection successfull");
return Ok(true)
},
Err(e) => {
error!("Failed to connect to Blaze. Attempt {} of {}: {}", attempts + 1, num_attempts, e);
err = Some(anyhow!(e));
}
}
attempts += 1;
tokio::time::sleep(std::time::Duration::from_secs(5)).await; //@todo: move param somewhere else?
}
Err(err.unwrap_or_else(|| anyhow!("Failed to connect to Blaze")))

}


//function that checks wheter a given list of required tables exist in pg
pub async fn pred_tables_exist(pg_con_pool: &PgPool, table_names: &Vec<&str>) -> Result<bool, anyhow::Error> {
info!("Checking whether PostgreSQL tables exist");

let table_query: &str = r#"select table_name from information_schema.tables;"#;

let rows = sqlx::query(table_query)
.fetch_all(pg_con_pool)
.await
.map_err(|err| {
error!("Failed to execute query: {}", err);
anyhow::Error::new(err)
})?;

let pg_table_names: Vec<String> = rows.into_iter().map(|row| row.get(0)).collect();
let all_tables_exist = table_names.iter().all(|table_name| pg_table_names.contains(&table_name.to_string()));


Ok(all_tables_exist)
}

//create necessary tables and triggers in pg if they don't exist yet
pub async fn create_tables(pg_con_pool: &PgPool) -> Result<(), anyhow::Error> {
info!("Creating PostgreSQL tables");

let create_tables_queries = vec![
"CREATE TABLE IF NOT EXISTS patient (
id SERIAL PRIMARY KEY,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
resource JSONB NOT NULL
)",
"CREATE TABLE IF NOT EXISTS specimen (
id SERIAL PRIMARY KEY,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
resource JSONB NOT NULL
)",
"CREATE TABLE IF NOT EXISTS condition (
id SERIAL PRIMARY KEY,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
resource JSONB NOT NULL
)",
"CREATE TABLE IF NOT EXISTS observation (
id SERIAL PRIMARY KEY,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
resource JSONB NOT NULL
)",
"CREATE OR REPLACE FUNCTION update_last_updated()
RETURNS TRIGGER AS $$
BEGIN
NEW.last_updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;",
"CREATE TRIGGER update_last_updated_trigger
BEFORE UPDATE ON patient
FOR EACH ROW
EXECUTE PROCEDURE update_last_updated();",
"CREATE TRIGGER update_last_updated_trigger
BEFORE UPDATE ON specimen
FOR EACH ROW
EXECUTE PROCEDURE update_last_updated();",
"CREATE TRIGGER update_last_updated_trigger
BEFORE UPDATE ON condition
FOR EACH ROW
EXECUTE PROCEDURE update_last_updated();",
"CREATE TRIGGER update_last_updated_trigger
BEFORE UPDATE ON observation
FOR EACH ROW
EXECUTE PROCEDURE update_last_updated();",
];

for query in create_tables_queries {
sqlx::query(query)
.execute(pg_con_pool)
.await?;
}

Ok(())
}
16 changes: 16 additions & 0 deletions src/graceful_shutdown.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use tracing::info;

pub async fn wait_for_signal() {
use tokio::signal::unix::{signal,SignalKind};
let mut sigterm = signal(SignalKind::terminate())
.expect("Unable to register shutdown handler");
let mut sigint = signal(SignalKind::interrupt())
.expect("Unable to register shutdown handler");
let signal = tokio::select! {
_ = sigterm.recv() => "SIGTERM",
_ = sigint.recv() => "SIGINT"
};
// The following does not print in docker-compose setups but it does when run individually.
// Probably a docker-compose error.
info!("Received signal ({signal}) - shutting down gracefully.");
}
Loading

0 comments on commit f687f27

Please sign in to comment.