Skip to content

Commit

Permalink
Task/store snowmasked hfi rasters (#3809)
Browse files Browse the repository at this point in the history
Store the intermediate raster that's snow masked, not just the ultimate pmtiles vector. This allows us to simplify some of the TPI elevation processing in #3696 

Note: formatting changes caused by ruff on save
  • Loading branch information
conbrad committed Jul 29, 2024
1 parent d8238ac commit 1f7854e
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 87 deletions.
5 changes: 4 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,22 @@
"GDPS",
"GEOGCS",
"geospatial",
"geotiff",
"grib",
"gribs",
"hourlies",
"HRDPS",
"luxon",
"maxx",
"maxy",
"miny",
"luxon",
"ndarray",
"numba",
"osgeo",
"PMTILES",
"polygonize",
"Polygonized",
"polygonizing",
"PRECIP",
"PRIMEM",
"PROJCS",
Expand Down
55 changes: 55 additions & 0 deletions api/app/auto_spatial_advisory/hfi_filepath.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import os
from app.auto_spatial_advisory.run_type import RunType
from datetime import date


def get_pmtiles_filepath(run_date: date, run_type: RunType, filename: str) -> str:
"""
Get the file path for both reading and writing the pmtiles from/to the object store.
Example: {bucket}/psu/pmtiles/hfi/actual/[issue/run_date]/hfi[for_date].pmtiles
:param run_date: The date of the run to process. (when was the hfi file created?)
:param run_type: forecast or actual
:param filename: hfi[for_date].pmtiles -> hfi20230821.pmtiles
:return: s3 bucket key for pmtiles file
"""
pmtiles_filepath = os.path.join("psu", "pmtiles", "hfi", run_type.value, run_date.strftime("%Y-%m-%d"), filename)

return pmtiles_filepath


def get_pmtiles_filename(for_date: date):
"""
Returns the object store filename for a pmtiles file based on a given for_date.
:param for_date: the date the hfi pmtiles is forecasted for
:return: filename string
"""
return f'hfi{for_date.strftime("%Y%m%d")}.pmtiles'


def get_raster_filepath(run_date: date, run_type: RunType, filename: str) -> str:
"""
Get the file path for both reading and writing the tif raster from/to the object store.
Example: {bucket}/psu/rasters/hfi/actual/[issue/run_date]/hfi[for_date].tif
:param run_date: The date of the run to process. (when was the hfi file created?)
:param run_type: forecast or actual
:param filename: hfi[for_date].tif -> hfi20230821.tif
:return: s3 bucket key for raster file
"""
raster_filepath = os.path.join("psu", "rasters", "hfi", run_type.value, run_date.strftime("%Y-%m-%d"), filename)

return raster_filepath


def get_raster_tif_filename(for_date: date) -> str:
"""
Returns the object store filename for a raster tif based on a given for_date.
:param for_date: the date the hfi tif is forecasted for
:return: filename string
"""
return f'snow_masked_hfi{for_date.strftime("%Y%m%d")}.tif'
24 changes: 0 additions & 24 deletions api/app/auto_spatial_advisory/hfi_pmtiles.py

This file was deleted.

103 changes: 57 additions & 46 deletions api/app/auto_spatial_advisory/process_hfi.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import logging
import os
from datetime import date, datetime
from datetime import date, datetime, timedelta
from time import perf_counter
import tempfile
from shapely import wkb, wkt
Expand All @@ -18,15 +18,15 @@
from app.auto_spatial_advisory.run_type import RunType
from app.auto_spatial_advisory.snow import apply_snow_mask
from app.geospatial import NAD83_BC_ALBERS
from app.auto_spatial_advisory.hfi_pmtiles import get_pmtiles_filepath
from app.auto_spatial_advisory.hfi_filepath import get_pmtiles_filename, get_pmtiles_filepath, get_raster_filepath, get_raster_tif_filename
from app.utils.polygonize import polygonize_in_memory
from app.utils.pmtiles import tippecanoe_wrapper, write_geojson
from app.utils.s3 import get_client


logger = logging.getLogger(__name__)

HFI_PMTILES_PERMISSIONS = "public-read"
HFI_GEOSPATIAL_PERMISSIONS = "public-read"
HFI_PMTILES_MIN_ZOOM = 4
HFI_PMTILES_MAX_ZOOM = 11

Expand Down Expand Up @@ -98,57 +98,68 @@ async def process_hfi(run_type: RunType, run_date: date, run_datetime: datetime,

hfi_key = get_s3_key(run_type, run_date, for_date)
logger.info(f"Key to HFI in object storage: {hfi_key}")
with tempfile.TemporaryDirectory() as temp_dir:
temp_filename = os.path.join(temp_dir, "classified.tif")
classify_hfi(hfi_key, temp_filename)
# If something has gone wrong with the collection of snow coverage data and it has not been collected
# within the past 7 days, don't apply an old snow mask, work with the classified hfi data as is
if last_processed_snow is None or last_processed_snow[0].for_date + datetime.timedelta(days=7) < datetime.now():
logger.info("No recently processed snow data found. Proceeding with non-masked hfi data.")
working_hfi_path = temp_filename
else:
working_hfi_path = await apply_snow_mask(temp_filename, last_processed_snow[0], temp_dir)
# Create a snow coverage mask from previously downloaded snow data.
with polygonize_in_memory(working_hfi_path, "hfi", "hfi") as layer:
# We need a geojson file to pass to tippecanoe
temp_geojson = write_geojson(layer, temp_dir)

pmtiles_filename = f'hfi{for_date.strftime("%Y%m%d")}.pmtiles'
temp_pmtiles_filepath = os.path.join(temp_dir, pmtiles_filename)
logger.info(f"Writing pmtiles -- {pmtiles_filename}")
tippecanoe_wrapper(temp_geojson, temp_pmtiles_filepath, min_zoom=HFI_PMTILES_MIN_ZOOM, max_zoom=HFI_PMTILES_MAX_ZOOM)

async with get_client() as (client, bucket):
async with get_client() as (client, bucket):
with tempfile.TemporaryDirectory() as temp_dir:
temp_filename = os.path.join(temp_dir, "classified.tif")
classify_hfi(hfi_key, temp_filename)
# If something has gone wrong with the collection of snow coverage data and it has not been collected
# within the past 7 days, don't apply an old snow mask, work with the classified hfi data as is
if last_processed_snow is None or last_processed_snow[0].for_date + timedelta(days=7) < datetime.now():
logger.info("No recently processed snow data found. Proceeding with non-masked hfi data.")
working_hfi_path = temp_filename
else:
# Create a snow coverage mask from previously downloaded snow data.
working_hfi_path = await apply_snow_mask(temp_filename, last_processed_snow[0], temp_dir)

raster_filename = get_raster_tif_filename(for_date)
raster_key = get_raster_filepath(run_date, run_type, raster_filename)
logger.info(f"Uploading file {raster_filename} to {raster_key}")
await client.put_object(
Bucket=bucket,
Key=raster_key,
ACL=HFI_GEOSPATIAL_PERMISSIONS, # We need these to be accessible to everyone
Body=open(working_hfi_path, "rb"),
)
logger.info("Done uploading %s", raster_key)
with polygonize_in_memory(working_hfi_path, "hfi", "hfi") as layer:
# We need a geojson file to pass to tippecanoe
temp_geojson = write_geojson(layer, temp_dir)

pmtiles_filename = get_pmtiles_filename(for_date)
temp_pmtiles_filepath = os.path.join(temp_dir, pmtiles_filename)
logger.info(f"Writing pmtiles -- {pmtiles_filename}")
tippecanoe_wrapper(temp_geojson, temp_pmtiles_filepath, min_zoom=HFI_PMTILES_MIN_ZOOM, max_zoom=HFI_PMTILES_MAX_ZOOM)

key = get_pmtiles_filepath(run_date, run_type, pmtiles_filename)
logger.info(f"Uploading file {pmtiles_filename} to {key}")

await client.put_object(
Bucket=bucket,
Key=key,
ACL=HFI_PMTILES_PERMISSIONS, # We need these to be accessible to everyone
ACL=HFI_GEOSPATIAL_PERMISSIONS, # We need these to be accessible to everyone
Body=open(temp_pmtiles_filepath, "rb"),
)
logger.info("Done uploading file")

spatial_reference: osr.SpatialReference = layer.GetSpatialRef()
target_srs = osr.SpatialReference()
target_srs.ImportFromEPSG(NAD83_BC_ALBERS)
target_srs.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER)
coordinate_transform = osr.CoordinateTransformation(spatial_reference, target_srs)

async with get_async_write_session_scope() as session:
advisory = await get_hfi_classification_threshold(session, HfiClassificationThresholdEnum.ADVISORY)
warning = await get_hfi_classification_threshold(session, HfiClassificationThresholdEnum.WARNING)

logger.info("Writing HFI advisory zones to API database...")
for i in range(layer.GetFeatureCount()):
# https://gdal.org/api/python/osgeo.ogr.html#osgeo.ogr.Feature
feature: ogr.Feature = layer.GetFeature(i)
obj = create_model_object(feature, advisory, warning, coordinate_transform, run_type, run_datetime, for_date)
await save_hfi(session, obj)

# Store the unique combination of run type, run datetime and for date in the run_parameters table
await save_run_parameters(session, run_type, run_datetime, for_date)
logger.info("Done uploading %s", key)

spatial_reference: osr.SpatialReference = layer.GetSpatialRef()
target_srs = osr.SpatialReference()
target_srs.ImportFromEPSG(NAD83_BC_ALBERS)
target_srs.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER)
coordinate_transform = osr.CoordinateTransformation(spatial_reference, target_srs)

async with get_async_write_session_scope() as session:
advisory = await get_hfi_classification_threshold(session, HfiClassificationThresholdEnum.ADVISORY)
warning = await get_hfi_classification_threshold(session, HfiClassificationThresholdEnum.WARNING)

logger.info("Writing HFI advisory zones to API database...")
for i in range(layer.GetFeatureCount()):
# https://gdal.org/api/python/osgeo.ogr.html#osgeo.ogr.Feature
feature: ogr.Feature = layer.GetFeature(i)
obj = create_model_object(feature, advisory, warning, coordinate_transform, run_type, run_datetime, for_date)
await save_hfi(session, obj)

# Store the unique combination of run type, run datetime and for date in the run_parameters table
await save_run_parameters(session, run_type, run_datetime, for_date)

perf_end = perf_counter()
delta = perf_end - perf_start
Expand Down
31 changes: 15 additions & 16 deletions api/app/utils/polygonize.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
""" Code for polygonizing a geotiff file. """
"""Code for polygonizing a geotiff file."""

import logging
from contextlib import contextmanager
from osgeo import gdal, ogr, osr
Expand All @@ -9,13 +10,13 @@


def _create_in_memory_band(data: np.ndarray, cols, rows, projection, geotransform):
""" Create an in memory data band to represent a single raster layer.
"""Create an in memory data band to represent a single raster layer.
See https://gdal.org/user/raster_data_model.html#raster-band for a complete
description of what a raster band is.
"""
mem_driver = gdal.GetDriverByName('MEM')
mem_driver = gdal.GetDriverByName("MEM")

dataset = mem_driver.Create('memory', cols, rows, 1, gdal.GDT_Byte)
dataset = mem_driver.Create("memory", cols, rows, 1, gdal.GDT_Byte)
dataset.SetProjection(projection)
dataset.SetGeoTransform(geotransform)
band = dataset.GetRasterBand(1)
Expand All @@ -26,7 +27,7 @@ def _create_in_memory_band(data: np.ndarray, cols, rows, projection, geotransfor

@contextmanager
def polygonize_in_memory(geotiff_filename, layer, field) -> ogr.Layer:
""" Given some tiff file, return a polygonized version of it, in memory, as an ogr layer. """
"""Given some tiff file, return a polygonized version of it, in memory, as an ogr layer."""
source: gdal.Dataset = gdal.Open(geotiff_filename, gdal.GA_ReadOnly)

source_band = source.GetRasterBand(1)
Expand All @@ -37,9 +38,7 @@ def polygonize_in_memory(geotiff_filename, layer, field) -> ogr.Layer:

# generate mask data
mask_data = np.where(source_data == nodata_value, False, True)
mask_ds, mask_band = _create_in_memory_band(
mask_data, source_band.XSize, source_band.YSize, source.GetProjection(),
source.GetGeoTransform())
mask_ds, mask_band = _create_in_memory_band(mask_data, source_band.XSize, source_band.YSize, source.GetProjection(), source.GetGeoTransform())

# Create a memory OGR datasource to put results in.
# https://gdal.org/drivers/vector/memory.html#vector-memory
Expand Down Expand Up @@ -71,15 +70,15 @@ def polygonize_geotiff_to_shapefile(raster_source_filename, vector_dest_filename
<vector_dest_filename>.shp, and inserts polygonized contents of source
file into destination file.
"""
if raster_source_filename[-3:] != '.tif':
return f'{raster_source_filename} is an invalid file format for raster source'
if vector_dest_filename[-3:] != '.shp':
vector_dest_filename += '.shp'
if raster_source_filename[-3:] != ".tif":
return f"{raster_source_filename} is an invalid file format for raster source"
if vector_dest_filename[-3:] != ".shp":
vector_dest_filename += ".shp"

source_data = gdal.Open(raster_source_filename, gdal.GA_ReadOnly)
source_band = source_data.GetRasterBand(1)
value = ogr.FieldDefn('Band 1', ogr.OFTInteger)
logger.info('%s raster count %s', raster_source_filename, source_data.RasterCount)
value = ogr.FieldDefn("Band 1", ogr.OFTInteger)
logger.info("%s raster count %s", raster_source_filename, source_data.RasterCount)

driver = ogr.GetDriverByName("ESRI Shapefile")
destination = driver.CreateDataSource(vector_dest_filename)
Expand All @@ -88,7 +87,7 @@ def polygonize_geotiff_to_shapefile(raster_source_filename, vector_dest_filename
dest_layer = destination.CreateLayer(vector_dest_filename, geom_type=ogr.wkbPolygon, srs=dest_srs)
dest_layer.CreateField(value)
# 'Band 1' is the field name on the layer for Fuel Type ID
dest_field = dest_layer.GetLayerDefn().GetFieldIndex('Band 1')
dest_field = dest_layer.GetLayerDefn().GetFieldIndex("Band 1")
gdal.Polygonize(source_band, None, dest_layer, dest_field, [])

return f'Polygonized {raster_source_filename} to {vector_dest_filename}'
return f"Polygonized {raster_source_filename} to {vector_dest_filename}"

0 comments on commit 1f7854e

Please sign in to comment.