API logger work #603

Open · wants to merge 16 commits into main
153 changes: 153 additions & 0 deletions backend/api_logger.py
@@ -0,0 +1,153 @@
import datetime
import os
import re
import httpx
import time
from socket import gethostname
from typing import Dict, List, Optional

import pytz
from starlette.requests import Request

from backend.config import get_schema_name
from backend.db.utils import get_db_connection, insert_from_dict, run_sql
from backend.utils import dump


class Api_logger:
    def __init__(self):
        pass

    async def start_rpt(self, request: Request, params: Dict):
        self.start_time = time.time()
        rpt = {}
        url = request.url
        api_call = url.components[2][1:]  # strip leading /
        rpt['api_call'] = api_call

        eastern = pytz.timezone('US/Eastern')
        rpt['timestamp'] = datetime.datetime.now(eastern).isoformat()

        rpt['host'] = os.getenv('HOSTENV', gethostname())

        rpt['client'] = await client_location(request)

        rpt['schema'] = get_schema_name()

        rpt_params = {}
        for k, v in params.items():
            if type(v) == list:
                if len(v) > 20:
                    # change any params with len > 20 to just log the len
                    rpt_params[k + '_len'] = len(v)
                elif k == 'codeset_ids' or k == 'id':
                    # put codeset_ids in a separate column (is this going to be helpful?)
                    codeset_ids = v

                    if len(v) == 1 and type(codeset_ids[0]) == str:
                        codeset_ids = codeset_ids[0].split('|')

                    codeset_ids = [int(x) for x in codeset_ids]
                    rpt['codeset_ids'] = codeset_ids
                else:
                    rpt_params[k] = v
            else:
                raise Exception(f"don't know how to log {k}: {dump(v)}")

        # everything but codeset_ids just gets dumped into the rpt
        params_list = []
        for k, v in rpt_params.items():
            params_list.append(f'{k}: {v}')

        rpt['params'] = '; '.join(params_list)
        self.rpt = rpt
        with get_db_connection() as con:
            insert_from_dict(con, 'public.api_runs', rpt, skip_if_already_exists=False)


    async def finish(self, rows: List = None):
        # avoid a mutable default argument; report a row count rather than dumping the rows
        rows = rows if rows is not None else []
        if rows:
            row_ct = len(rows) if isinstance(rows, list) else rows
            self.rpt['result'] = f'{row_ct} rows'
        else:
            self.rpt['result'] = 'Success'

        await self.complete_log_record()


    async def complete_log_record(self):
        end_time = time.time()
        process_seconds = end_time - self.start_time
        self.rpt['process_seconds'] = process_seconds

        with get_db_connection() as con:
            run_sql(con, """
                UPDATE public.api_runs
                SET process_seconds = :process_seconds, result = :result
                WHERE timestamp = :timestamp""", self.rpt)
            # using timestamp as a primary key. not the best practice, I know, but with microsecond granularity
            # (e.g., 2023-10-31T13:32:23.934211), it seems like it should be safe


    async def log_error(self, e):
        self.rpt['result'] = f'Error: {e}'
        await self.complete_log_record()

async def client_location(request: Request) -> str:
    # rpt['client'] = request.client.host -- this gives a local (169.154) IP on azure
    # chatgpt recommends:
    forwarded_for: Optional[str] = request.headers.get('X-Forwarded-For')
    if forwarded_for:
        # The header can contain multiple IP addresses, so take the first one
        ip = forwarded_for.split(',')[0]
    else:
        ip = request.client.host

    ip = re.sub(':.*', '', ip)

    ipstack_key = os.getenv('API_STACK_KEY', None)

    if ip != '127.0.0.1' and ipstack_key:
        """
        Example ipstack request/response:
        http://api.ipstack.com/134.201.250.155?access_key=7a6f9d6d72d68a1452b643eb58cd8ee7&format=1
        {
            "ip": "134.201.250.155",
            "type": "ipv4",
            "continent_code": "NA",
            "continent_name": "North America",
            "country_code": "US",
            "country_name": "United States",
            "region_code": "CA",
            "region_name": "California",
            "city": "San Fernando",
            "zip": "91344",
            "latitude": 34.293949127197266,
            "longitude": -118.50763702392578,
            "location": {
                "geoname_id": 5391945,
                "capital": "Washington D.C.",
                "languages": [
                    {"code": "en", "name": "English", "native": "English"}
                ],
                "country_flag": "https://assets.ipstack.com/flags/us.svg",
                "country_flag_emoji": "🇺🇸",
                "country_flag_emoji_unicode": "U+1F1FA U+1F1F8",
                "calling_code": "1",
                "is_eu": false
            }
        }
        """

        loc_url = f"http://api.ipstack.com/{ip}?access_key={ipstack_key}"

        async with httpx.AsyncClient() as client:
            response = await client.get(loc_url)
            # response.json is a method, so truth-testing it is always True; check the status instead
            if response.status_code == 200:
                loc_obj = response.json()
                location = f"{ip}: {loc_obj['city']}, {loc_obj['region_name']}"
                return location

    return ip
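
For reviewers, a minimal sketch of how a route could wire this logger in. The route path, parameter default, and variable names here are hypothetical illustrations, not part of this diff; only the Api_logger methods (start_rpt, finish, log_error) come from the file above.

# Hypothetical sketch, not part of this PR: a route wraps its work in
# start_rpt / finish / log_error so each call lands in public.api_runs.
from fastapi import APIRouter, Request
from backend.api_logger import Api_logger

router = APIRouter()

@router.get("/example-csets")  # hypothetical path, for illustration only
async def example_csets(request: Request, codeset_ids: str):
    """codeset_ids arrives as a pipe-delimited string, e.g. '123|456'."""
    rpt = Api_logger()
    # start_rpt expects list-valued params; a single 'a|b' string element gets split and int-ified
    await rpt.start_rpt(request, params={'codeset_ids': [codeset_ids]})
    try:
        rows = []  # placeholder for the real query results
        await rpt.finish(rows=rows)
        return rows
    except Exception as e:
        await rpt.log_error(e)
        raise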
87 changes: 10 additions & 77 deletions backend/app.py
@@ -3,26 +3,18 @@
Resources
- https://github.com/tiangolo/fastapi
"""
import os
from pathlib import Path
from typing import List, Optional

import uvicorn
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
# from starlette.requests import Request
import time
import datetime
from socket import gethostname

from backend.routes import cset_crud, db, graph
from backend.db.config import override_schema, get_schema_name
from backend.db.utils import insert_from_dict, get_db_connection
from backend.config import override_schema

PROJECT_DIR = Path(os.path.dirname(__file__)).parent
# users on the same server
APP = FastAPI()
# APP = FastAPI()
APP = FastAPI(client_max_size=100_000_000) # trying this, but it shouldn't be necessary
APP.include_router(cset_crud.router)
# APP.include_router(oak.router)
APP.include_router(graph.router)
@@ -35,71 +27,16 @@
)
APP.add_middleware(GZipMiddleware, minimum_size=1000)

@APP.middleware("http")
async def set_schema_globally_and_log_calls(request: Request, call_next):
    """
    This is middleware and will be EXECUTED ON EVERY API CALL
    Its purpose is to log TermHub usage to help us prioritize performance improvements

    Also, if a schema is provided, it will be used to override CONFIG['schema']
    """

    url = request.url
    query_params = request.query_params  # Extracting query params as a dict

    codeset_ids = query_params.getlist("codeset_ids")
    if not codeset_ids:
        print(f"No codeset_ids provided, not sure what monitoring to do, if any for {url}")
        return await call_next(request)
    if len(codeset_ids) == 1 and type(codeset_ids[0]) == str:
        codeset_ids = codeset_ids[0].split('|')
    codeset_ids = [int(x) for x in codeset_ids]

    start_time = time.time()

    rpt = {}

    rpt['host'] = os.getenv('HOSTENV', gethostname())

    # rpt['client'] = request.client.host -- this gives a local (169.154) IP on azure
    # chatgpt recommends:
    forwarded_for: Optional[str] = request.headers.get('X-Forwarded-For')
    if forwarded_for:
        # The header can contain multiple IP addresses, so take the first one
        rpt['client'] = forwarded_for.split(',')[0]
    else:
        rpt['client'] = request.client.host
@APP.middleware("http")
async def set_schema_globally(request: Request, call_next):
    print(request.url)

    schema = query_params.get("schema")
    schema = request.query_params.get("schema")
    if schema:
        override_schema(schema)

    schema = get_schema_name()
    rpt['schema'] = schema

    api_call = url.components[2][1:]  # string leading /
    rpt['api_call'] = api_call


    if api_call == 'concept-ids-by-codeset-id':
        rpt['related_codeset_ids'] = len(codeset_ids)
    else:
        rpt['codeset_ids'] = codeset_ids

    print(f"Request: {request.url} {request.method} {schema} {codeset_ids}")

    response = await call_next(request)  # Proceed with the request

    end_time = time.time()
    process_seconds = end_time - start_time

    rpt['timestamp'] = datetime.datetime.now().isoformat()
    rpt['process_seconds'] = process_seconds

    with get_db_connection() as con:
        insert_from_dict(con, 'public.api_runs', rpt, skip_if_already_exists=False)

    response.headers["X-Process-Time"] = str(process_seconds)
    response = await call_next(request)
    return response


@@ -115,6 +52,7 @@ def read_root():
    url_list = [{"path": route.path, "name": route.name} for route in APP.routes]
    return url_list


# CACHE_FILE = "cache.pickle"
#
#
@@ -160,9 +98,4 @@ def read_root():


if __name__ == '__main__':
    run()


def monitor_request(request: Request, codeset_ids: List[int]) -> None:

    pass
    run()
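
A quick sketch of what the slimmed-down middleware now does from a caller's perspective: per-request logging has moved into Api_logger, and the middleware only applies an optional ?schema= override before the route runs. The endpoint, port, and schema value below are hypothetical examples, not taken from this PR.

# Hypothetical call, for illustration: any request may carry ?schema=..., which the
# middleware passes to override_schema() before the route handler executes.
import httpx

resp = httpx.get(
    "http://localhost:8000/example-csets",  # hypothetical local dev URL
    params={"codeset_ids": "123|456", "schema": "example_schema"},
)
print(resp.status_code)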
51 changes: 51 additions & 0 deletions backend/config.py
@@ -0,0 +1,51 @@
import os
from dotenv import load_dotenv

APP_ROOT = os.path.dirname(os.path.realpath(__file__))
PROJECT_ROOT = os.path.realpath(os.path.join(APP_ROOT, '..'))
ENV_DIR = os.path.join(PROJECT_ROOT, 'env')
OUTPUT_DIR = os.path.join(PROJECT_ROOT, 'output')
ENV_FILE = os.path.join(ENV_DIR, '.env')
load_dotenv(ENV_FILE)

DB_DIR = os.path.dirname(os.path.realpath(__file__))
BACKEND_DIR = os.path.join(DB_DIR, '..')
DOCS_DIR = os.path.join(PROJECT_ROOT, 'docs')
TERMHUB_CSETS_PATH = os.path.join(PROJECT_ROOT, 'termhub-csets')
DATASETS_PATH = os.path.join(TERMHUB_CSETS_PATH, 'datasets', 'prepped_files')
OBJECTS_PATH = os.path.join(TERMHUB_CSETS_PATH, 'objects')
DDL_JINJA_PATH_PATTERN = os.path.join(DB_DIR, 'ddl-*.jinja.sql')

CONFIG = {
    'server': os.getenv('TERMHUB_DB_SERVER'),
    'driver': os.getenv('TERMHUB_DB_DRIVER'),
    'host': os.getenv('TERMHUB_DB_HOST'),
    'user': os.getenv('TERMHUB_DB_USER'),
    'db': os.getenv('TERMHUB_DB_DB'),
    'schema': os.getenv('TERMHUB_DB_SCHEMA'),
    'pass': os.getenv('TERMHUB_DB_PASS'),
    'port': os.getenv('TERMHUB_DB_PORT'),
    'personal_access_token': os.getenv('GH_LIMITED_PERSONAL_ACCESS_TOKEN')
}
CONFIG_LOCAL = {
    'server': os.getenv('TERMHUB_DB_SERVER_LOCAL'),
    'driver': os.getenv('TERMHUB_DB_DRIVER_LOCAL'),
    'host': os.getenv('TERMHUB_DB_HOST_LOCAL'),
    'user': os.getenv('TERMHUB_DB_USER_LOCAL'),
    'db': os.getenv('TERMHUB_DB_DB_LOCAL'),
    'schema': os.getenv('TERMHUB_DB_SCHEMA_LOCAL'),
    'pass': os.getenv('TERMHUB_DB_PASS_LOCAL'),
    'port': os.getenv('TERMHUB_DB_PORT_LOCAL'),
    'personal_access_token': os.getenv('GH_LIMITED_PERSONAL_ACCESS_TOKEN')
}


def override_schema(schema: str):
    if CONFIG['schema'] != schema:
        print(f'Overriding {CONFIG["schema"]} schema to {schema}')
    # always apply the override; only print when the schema actually changes
    CONFIG['schema'] = schema


def get_schema_name():
    return CONFIG['schema']
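
A minimal sketch of the expected behavior of the new config module (the schema name here is made up, and this assumes the unconditional assignment shown above):

# Hypothetical usage: the override mutates the shared CONFIG dict, so every caller of
# get_schema_name() afterwards sees the new schema.
from backend.config import override_schema, get_schema_name

override_schema('example_schema_v2')  # prints the override message if it differs
assert get_schema_name() == 'example_schema_v2'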
2 changes: 1 addition & 1 deletion backend/db/analysis.py
@@ -22,7 +22,7 @@
THIS_DIR = os.path.dirname(__file__)
PROJECT_ROOT = Path(THIS_DIR).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
from backend.db.config import DOCS_DIR
from backend.config import DOCS_DIR
from backend.db.initialize import SCHEMA
from backend.db.utils import get_db_connection, insert_from_dict, list_tables, sql_query

44 changes: 1 addition & 43 deletions backend/db/config.py
@@ -1,49 +1,7 @@
"""Configuration for TermHub database"""
import os
from typing import Dict, List

from dotenv import load_dotenv

DB_DIR = os.path.dirname(os.path.realpath(__file__))
BACKEND_DIR = os.path.join(DB_DIR, '..')
PROJECT_ROOT = os.path.join(BACKEND_DIR, '..')
DOCS_DIR = os.path.join(PROJECT_ROOT, 'docs')
ENV_DIR = os.path.join(PROJECT_ROOT, 'env')
ENV_FILE = os.path.join(ENV_DIR, '.env')
TERMHUB_CSETS_PATH = os.path.join(PROJECT_ROOT, 'termhub-csets')
DATASETS_PATH = os.path.join(TERMHUB_CSETS_PATH, 'datasets', 'prepped_files')
OBJECTS_PATH = os.path.join(TERMHUB_CSETS_PATH, 'objects')
DDL_JINJA_PATH_PATTERN = os.path.join(DB_DIR, 'ddl-*.jinja.sql')
load_dotenv(ENV_FILE)
CONFIG = {
    'server': os.getenv('TERMHUB_DB_SERVER'),
    'driver': os.getenv('TERMHUB_DB_DRIVER'),
    'host': os.getenv('TERMHUB_DB_HOST'),
    'user': os.getenv('TERMHUB_DB_USER'),
    'db': os.getenv('TERMHUB_DB_DB'),
    'schema': os.getenv('TERMHUB_DB_SCHEMA'),
    'pass': os.getenv('TERMHUB_DB_PASS'),
    'port': os.getenv('TERMHUB_DB_PORT'),
    'personal_access_token': os.getenv('GH_LIMITED_PERSONAL_ACCESS_TOKEN')
}
CONFIG_LOCAL = {
    'server': os.getenv('TERMHUB_DB_SERVER_LOCAL'),
    'driver': os.getenv('TERMHUB_DB_DRIVER_LOCAL'),
    'host': os.getenv('TERMHUB_DB_HOST_LOCAL'),
    'user': os.getenv('TERMHUB_DB_USER_LOCAL'),
    'db': os.getenv('TERMHUB_DB_DB_LOCAL'),
    'schema': os.getenv('TERMHUB_DB_SCHEMA_LOCAL'),
    'pass': os.getenv('TERMHUB_DB_PASS_LOCAL'),
    'port': os.getenv('TERMHUB_DB_PORT_LOCAL'),
    'personal_access_token': os.getenv('GH_LIMITED_PERSONAL_ACCESS_TOKEN')
}

def override_schema(schema: str):
    CONFIG['schema'] = schema


def get_schema_name():
    return CONFIG['schema']
from backend.config import CONFIG, CONFIG_LOCAL


def invert_list_dict(d1: Dict[str, List[str]]) -> Dict[str, List[str]]: