Skip to content

Commit

Permalink
ensure mp pools use fork explicility
Browse files Browse the repository at this point in the history
  • Loading branch information
robertjwilson committed Nov 29, 2023
1 parent 8a60a11 commit eaba1de
Show file tree
Hide file tree
Showing 3 changed files with 866 additions and 836 deletions.
49 changes: 2 additions & 47 deletions nctoolkit/anomaly.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from nctoolkit.temp_file import temp_file
from nctoolkit.session import remove_safe, session_info, nc_safe_par, nc_safe
from nctoolkit.api import update_options
from nctoolkit.runners import ann_anomaly

if platform.system() == "Linux":
import multiprocessing as mp
Expand All @@ -25,52 +26,6 @@
manager = Manager()


def ann_anomaly(
ff, baseline, metric, window, align, precision, new_files, new_commands, nc_safe
):
"""
Function to calculate the anomaly for a single file
"""
# throw error if baseline is not valid
orig_safe = copy.deepcopy(nc_safe)
target = temp_file("nc")
nc_safe.append(target)
try:

# create the target file
# generate the cdo command
if metric == "absolute":
cdo_command = (
f"cdo -sub -runmean,{window} -yearmean {ff} -timmean "
f"-selyear,{baseline[0]}/{baseline[1]} {ff} {target}"
)
else:
cdo_command = (
f"cdo -div -runmean,{window} -yearmean {ff} -timmean "
f"-selyear,{baseline[0]}/{baseline[1]} {ff} {target}"
)

# run the command and save the temp file

cdo_command = tidy_command(cdo_command)
cdo_command = cdo_command.replace(
"cdo ", f"cdo --timestat_date {align} "
).replace(" ", " ")

target = run_cdo(cdo_command, target, precision=precision)
if target not in nc_safe:
nc_safe.append(target)

# update the new files and commands
new_files.append(target)
new_commands.append(cdo_command)
except BaseException as e:
try:
nc_safe.remove(target)
except:
pass
return e


def annual_anomaly(self, baseline=None, metric="absolute", window=1, align="right"):
"""
Expand Down Expand Up @@ -188,7 +143,7 @@ def annual_anomaly(self, baseline=None, metric="absolute", window=1, align="righ
target_list = []
results = dict()

pool = mp.pool.Pool(cores)
pool = mp.get_context('fork').Pool(cores)
precision = copy.deepcopy(self._precision)
for ff in self:
results[ff] = pool.apply_async(
Expand Down
Loading

0 comments on commit eaba1de

Please sign in to comment.