import copy
import datetime
import multiprocessing
import pathlib
from typing import Union
import os
import pickle
# For testing coverage reports
try:
from pytest_cov.embed import cleanup_on_sigterm
except ImportError:
pass
else:
cleanup_on_sigterm()
from .ensemble_tools import mute
from .job import Job
from .schedulers import Scheduler
from .simulation import Simulation
from .ensemble import EnsembleSimulation
from .teams import parallel_teams_run, assign_teams
def integer_coercable(val):
try:
int(str(val))
integer_coercable = True
except ValueError:
integer_coercable = False
return integer_coercable
def translate_forcing_dirs(forcing_dir, member, init_time):
# Rules for both forcing_dirs and restart_dirs:
# 1) A dot or a null string (are identical pathlib.Path objects and) mean "do nothing"
# with respect to the default path in the domain.
# 2) An existing path/file is kept.
# 3) A negative integer is units hours, pointing to a previous cast in the cycle.
# 4) Other wise, value error raised.
if integer_coercable(forcing_dir):
if int(str(forcing_dir)) > 0:
raise ValueError('Only non-negative integers can be used to specify forcing_dirs')
forcing_cast_time = init_time + datetime.timedelta(hours=int(str(forcing_dir)))
# The last line is a bit hacky.
if not hasattr(member, 'number'):
forcing_dir = pathlib.Path(
'../cast_' +
forcing_cast_time.strftime('%Y%m%d%H') +
'/' +
pathlib.Path(member.base_hrldas_namelist['noahlsm_offline']['indir']).name
)
else:
forcing_dir = pathlib.Path(
'../../cast_' +
forcing_cast_time.strftime('%Y%m%d%H') +
'/' + member.run_dir + '/' +
pathlib.Path(member.base_hrldas_namelist['noahlsm_offline']['indir']).name
)
else:
if forcing_dir == pathlib.Path(''):
return None
elif forcing_dir.exists():
forcing_dir = forcing_dir.resolve()
else:
raise ValueError("No such forcing directory: " + str(forcing_dir))
member.base_hrldas_namelist['noahlsm_offline']['indir'] = str(forcing_dir)
return None
def translate_restart_dirs(restart_dir, member, init_time):
# Rules for both forcing_dirs and restart_dirs:
# 1) A dot or a null string (are identical pathlib.Path objects and) mean "do nothing"
# with respect to the default path in the domain.
# 2) An existing path/file is used/kept (a non-existent path is not, should give error).
# 3) A negative integer is units hours, pointing to a previous cast in the cycle.
# 4) Other wise, value error raised.
if integer_coercable(restart_dir):
if int(str(restart_dir)) > 0:
raise ValueError('Only non-negative integers can be used to specify restart_dirs')
forcing_cast_time = init_time + datetime.timedelta(hours=int(str(restart_dir)))
if not hasattr(member, 'number'):
restart_dir = pathlib.Path('../cast_' + forcing_cast_time.strftime('%Y%m%d%H'))
else:
restart_dir = pathlib.Path(
'../../cast_' + forcing_cast_time.strftime('%Y%m%d%H') + '/' + member.run_dir
)
else:
if restart_dir == pathlib.Path(''):
return None
elif restart_dir.exists():
restart_dir = restart_dir.resolve()
else:
raise ValueError("No such restart directory: " + str(restart_dir))
member.base_hrldas_namelist['noahlsm_offline']['restart_filename_requested'] = \
str(restart_dir / init_time.strftime('RESTART.%Y%m%d%H_DOMAIN1'))
member.base_hydro_namelist['hydro_nlist']['restart_file'] = \
str(restart_dir / init_time.strftime('HYDRO_RST.%Y-%m-%d_%H:00_DOMAIN1'))
if 'nudging_nlist' in member.base_hydro_namelist.keys() and \
'nudginglastobsfile' in member.base_hydro_namelist['nudging_nlist'].keys():
member.base_hydro_namelist['nudging_nlist']['nudginglastobsfile'] = \
str(restart_dir / init_time.strftime('nudgingLastObs.%Y-%m-%d_%H:%M:%S.nc'))
return None
def parallel_compose_casts(arg_dict):
"""Parallelizable function to compose casts of a CycleSimulation."""
cast = copy.deepcopy(arg_dict['prototype'])
cast.init_time = arg_dict['init_time']
cast.run_dir = str(pathlib.Path('cast_' + cast.init_time.strftime('%Y%m%d%H')))
cast.forcing_dir = arg_dict['forcing_dir']
cast.restart_dir = arg_dict['restart_dir']
if isinstance(cast, Simulation):
translate_forcing_dirs(cast.forcing_dir, cast, cast.init_time)
translate_restart_dirs(cast.restart_dir, cast, cast.init_time)
else:
for forcing_dir, restart_dir, member in zip(
cast.forcing_dir, cast.restart_dir, cast.members
):
translate_forcing_dirs(forcing_dir, member, cast.init_time)
translate_restart_dirs(restart_dir, member, cast.init_time)
job = copy.deepcopy(arg_dict['job'])
khour = job.model_end_time - job.model_start_time
job.model_start_time = arg_dict['init_time']
job.model_end_time = arg_dict['init_time'] + khour
cast.add(job)
if arg_dict['scheduler'] is not None:
cast.add(arg_dict['scheduler'])
orig_dir = os.getcwd()
os.mkdir(cast.run_dir)
os.chdir(cast.run_dir)
if isinstance(cast, Simulation):
cast.compose()
else:
cast.compose(rm_members_from_memory=arg_dict['rm_members_from_memory'])
# The Simulation object clean up.
if 'model' in dir(cast):
del cast.model
if 'domain' in dir(cast):
del cast.domain
if 'output' in dir(cast):
del cast.output
if isinstance(arg_dict['prototype'], Simulation):
cast.pickle('WrfHydroSim.pkl')
else:
cast.pickle('WrfHydroEns.pkl')
os.chdir(orig_dir)
return cast
def parallel_run_casts(arg_dict):
"""Parallelizable function to run an Cycle."""
if type(arg_dict['cast']) is str:
os.chdir(str(pathlib.Path(arg_dict['compose_dir']) / arg_dict['cast']))
else:
os.chdir(str(pathlib.Path(arg_dict['compose_dir']) / arg_dict['cast'].run_dir))
pkl_file = pathlib.Path("WrfHydroSim.pkl")
if not pkl_file.exists():
pkl_file = pathlib.Path("WrfHydroEns.pkl")
cast_pkl = pickle.load(pkl_file.open("rb"))
exit_status = cast_pkl.run()
return exit_status
[docs]
class CycleSimulation(object):
"""Class for a WRF-Hydro CycleSimulation object. The Cycle Simulation object is used to
orchestrate a set of 'N' WRF-Hydro simulations, referred to as 'casts', which only differ
in their 1) restart times and 2) their forcings.
"""
[docs]
def __init__(
self,
init_times: list,
restart_dirs: list,
forcing_dirs: list = [],
ncores: int = 1
):
""" Instantiate a Cycle object.
Args:
init_times: A required list of datetime.datetime objects which specify the
restart time of each cast in the cycle. (Same for deterministic
and ensemble cycle simultions).
restart_dirs:
Deterministic: a required list of either strings or pathlib.Path objects.
Ensemble: a required list of lists. The outer list is for the cycles
"casts" requested in init_times. The inner list is for each ensemble member
in the cast.
The following rules are applied to the individual entires:
1) A dot or a null string (are identical pathlib.Path objects and) mean
"do nothing" with respect to the default path in the domain.
2) An existing path/file is used/kept (a non-existent path is not, gives
an error).
3) A negative integer in units hours, pointing to a previous cast in the
cycle.
4) Other wise, value error raised.
forcing_dirs: optional
Deterministic: list of either strings or pathlib.Path objects
Ensemble: A list of lists, as for restart_dirs.
See restart_dirs for usage rules.
ncores: integer number of cores for running parallelizable methods (not the
casts themselves). For an ensemble cycle, setting this value > 1 will
force the ensemble.ncores = 1.
"""
self.casts = []
"""list: a list of 'casts' which are the individual simulations in the cycle object."""
self._init_times = []
"""list: required list of datetime.datetime objects which specify the restart time of
each cast in the cycle."""
self._restart_dirs = []
"""list: required list of either strings or pathlib.Path objects where the
following rules are applied:
1) A dot or a null string (are identical pathlib.Path objects and) mean "do nothing"
with respect to the default path in the domain.
2) An existing path/file is kept.
3) A negative integer is units hours, pointing to a previous cast in the cycle.
4) Other wise, value error raised.
"""
self._forcing_dirs = []
"""list: optional list of either strings or pathlib.Path objects. See
_restart_dirs for usage rules."""
self._job = None
"""list: a list containing Job objects"""
self._scheduler = None
"""Scheduler: A scheduler object to use for each Job in self.jobs"""
self.ncores = ncores
"""ncores: integer number of cores for running parallelizable methods."""
self._addinittimes(init_times)
self._addrestartdirs(restart_dirs)
if forcing_dirs != []:
self._addforcingdirs(forcing_dirs)
def __len__(self):
return(len(self._init_times))
# # The "canonical" name for len
# @property
# def N(self):
# return(self.__len__())
# Metadata to store with the "cast" simulations, conceptually this
# data belongs to the casts:
# 1) cast time
# 2) cast directory
# 3) cast forcing directory
# 4) restart dirs?
# JLM todo: check/revise this...
def add(
self,
obj: Union[Simulation, EnsembleSimulation, Scheduler, Job]
):
"""Add an approparite object to an CycleSimulation, such as a Simulation, Job, or
Scheduler.
Args:
obj: the object to add.
"""
if isinstance(obj, Simulation):
self._addsimulation(obj)
elif isinstance(obj, EnsembleSimulation):
self._addensemble(obj)
elif issubclass(type(obj), Scheduler):
self._addscheduler(obj)
elif isinstance(obj, Job):
self._addjob(obj)
else:
raise TypeError('Object is not of a type expected for a CycleSimulation.')
def _addinittimes(self, init_times: list):
"""Private method to add init times to a CycleSimulation
Args:
init_times: a list of datetime.datetime objects.
"""
if not all([isinstance(ii, datetime.datetime) for ii in init_times]):
raise ValueError('List object not all datetime.datetime objects, as expected')
self._init_times = copy.deepcopy(init_times)
def _add_restart_forcing_dirs(self, dirs_list, identifier):
"""Private method to common to adding forcing and restart directories
Args:
dirs_list: deterministic: a list of dirs, ensemble: a list of lists of dirs
identifier: string for error messages to identify if restart or forcing dirs
are problematic.
"""
# Check the length
def check_len_init(the_list):
if len(self._init_times) != len(the_list):
raise ValueError("Length of " + identifier + " does not match that of init_times.")
def int_to_str(var):
if type(var) is int:
return str(var)
return var
deterministic_types = [str, int, pathlib.Path, pathlib.PosixPath]
ensemble_types = [list]
if all([type(ii) in deterministic_types for ii in dirs_list]):
check_len_init(dirs_list)
return_list = [pathlib.Path(int_to_str(cc)) for cc in dirs_list]
elif all([type(ii) in ensemble_types for ii in dirs_list]):
check_len_init(dirs_list)
return_list = []
for rr in dirs_list:
if len(rr) != len(dirs_list[0]):
raise ValueError("Inconsistent ensemble length by implied by " + identifier)
# The ensemble length is unknown, it's implied by len(rr)
if all([type(ii) in deterministic_types for ii in rr]):
return_list.append([pathlib.Path(int_to_str(cc)) for cc in rr])
else:
raise ValueError("Types in ensemble " + identifier + " argument "
"are not appropriate.")
else:
raise ValueError("Types in " + identifier + " argument are not appropriate.")
return return_list
def _addforcingdirs(self, forcing_dirs: list):
"""Private method to add forcing dirs to a Cycle.
Args:
forcing_dirs: a list of str objects.
"""
self._forcing_dirs = self._add_restart_forcing_dirs(forcing_dirs, 'forcing_dirs')
def _addrestartdirs(self, restart_dirs: list):
"""Private method to add init times to a CycleSimulation
Args:
restart_dirs: deterministic cycle takes a list of str objects, an
ensemble cycle takes a list (for each cycle) of lists of str objects (for the
ensemble).
"""
self._restart_dirs = self._add_restart_forcing_dirs(restart_dirs, 'restart_dirs')
def _addscheduler(self, scheduler: Scheduler):
"""Private method to add a Scheduler to an CycleSimulation
Args:
scheduler: The Scheduler to add
"""
self._scheduler = copy.deepcopy(scheduler)
def _addjob(self, job: Job):
"""Private method to add a job to an CycleSimulation
Args:
job: The job to add
"""
self._job = copy.deepcopy(job)
self._job.restart = True
def _addsimulation(
self,
sim: Simulation
):
"""Private method to add a Simulation to an EnsembleSimulation
Args:
sim: The Simulation to add
"""
sim_copy = copy.deepcopy(sim)
# Ensure that the jobs and scheduler are empty and None
sim_copy.jobs = []
sim_copy.scheduler = None
self._simulation = sim_copy
def _addensemble(
self,
ens: EnsembleSimulation
):
"""Private method to add a Simulation to an EnsembleSimulation
Args:
ens: The EnsembleSimulation to add
"""
if not all([isinstance(ii, list) for ii in self._restart_dirs]) or \
not all([isinstance(ii, list) for ii in self._forcing_dirs]):
raise ValueError("An ensemble cycle simulation requires the restart_dirs to be "
"a list of lists.")
common_msg = "Ensemble to add has inconsistent length with existing cycle"
if len(self._restart_dirs) > 0 and len(ens) != len(self._restart_dirs[0]):
raise ValueError(common_msg + " restart_dirs")
if len(self._forcing_dirs) > 0 and len(ens) != len(self._forcing_dirs[0]):
raise ValueError(common_msg + " forcing_dirs")
ens_copy = copy.deepcopy(ens)
# Ensure that the jobs and scheduler are empty and None
ens_copy.jobs = []
ens_copy.scheduler = None
# Dont let multiprocessing use multiprocessing.
if self.ncores > 1:
ens_copy.ncores = 1
self._ensemble = ens_copy
def compose(
self,
symlink_domain: bool = True,
force: bool = False,
check_nlst_warn: bool = False,
rm_casts_from_memory: bool = True,
rm_members_from_memory: bool = True
):
"""Cycle compose (directories and files to disk)
Args:
symlink_domain: Symlink the domain files rather than copy
force: Compose into directory even if not empty. This is considered bad practice but
is necessary in certain circumstances.
rm_casts_from_memory: Most applications will remove the casts from the
ensemble object upon compose. Testing and other reasons may keep them around.
check_nlst_warn: Allow the namelist checking/validation to only result in warnings.
This is also not great practice, but necessary in certain circumstances.
"""
current_dir = pathlib.Path(os.getcwd())
current_dir_files = list(current_dir.rglob('*'))
if len(current_dir_files) > 0 and force is False:
raise FileExistsError('Unable to compose, current working directory is not empty. \n'
'Change working directory to an empty directory with os.chdir()')
if '_simulation' in dir(self):
cast_prototype = '_simulation'
else:
if '_ensemble' not in dir(self):
raise ValueError("The cycle does not contain a _simulation or an _ensemble.")
cast_prototype = '_ensemble'
if len(self) < 1:
raise ValueError("There are no casts (init_times) to compose.")
self._compose_dir = pathlib.Path(os.getcwd())
# Allowing forcing_dirs to be optional.
if self._forcing_dirs == []:
if cast_prototype == '_simulation':
self._forcing_dirs = [pathlib.Path('.')] * len(self)
else:
self._forcing_dirs = \
[([pathlib.Path('.') for _ in range(len(self.__dict__[cast_prototype]))])
for cc in range(len(self))]
# An ensemble must have a compiled model.
if cast_prototype == '_simulation':
# compile the model (once) before setting up the casts.
if self._simulation.model.compile_log is None:
comp_dir = self._compose_dir / 'compile'
self._simulation.model.compile(comp_dir)
# Set the ensemble jobs on the casts before composing (this is a loop over the jobs).
if self.ncores == 1:
self.casts = [
parallel_compose_casts(
{
'prototype': self.__dict__[cast_prototype],
'init_time': init_time,
'restart_dir': restart_dir,
'forcing_dir': forcing_dir,
'job': self._job,
'scheduler': self._scheduler,
'rm_members_from_memory': rm_members_from_memory,
}
) for init_time, restart_dir, forcing_dir in zip(
self._init_times,
self._restart_dirs,
self._forcing_dirs
)
]
else:
# Set the pool for the following parallelizable operations
with multiprocessing.Pool(self.ncores, initializer=mute) as pool:
self.casts = pool.map(
parallel_compose_casts,
({
'prototype': self.__dict__[cast_prototype],
'init_time': init_time,
'restart_dir': restart_dir,
'forcing_dir': forcing_dir,
'job': self._job,
'scheduler': self._scheduler,
'rm_members_from_memory': rm_members_from_memory,
} for init_time, restart_dir, forcing_dir in zip(
self._init_times,
self._restart_dirs,
self._forcing_dirs
)
)
)
# Return from indivdual compose.
os.chdir(self._compose_dir)
# After successful compose, delete the members from memory and replace with
# their relative dirs, if requested
if rm_casts_from_memory:
self.rm_casts()
# Remove bloaty atts
if '_simulation' in dir(self):
del self._simulation
if '_ensemble' in dir(self):
del self._ensemble
del self._job
def rm_casts(self):
"""Remove members from memory, replace with their paths."""
run_dirs = [cc.run_dir for cc in self.casts]
self.casts = run_dirs
def run(
self,
n_concurrent: int = 1,
teams: bool = False,
teams_exe_cmd: str = None,
teams_exe_cmd_nproc: int = None,
teams_node_file: dict = None,
env: dict = None,
teams_dict: dict = None
):
"""Run the cycle of simulations.
Inputs:
n_concurrent: int = 1, Only used for non-team runs.
teams: bool = False, Use teams?
teams_exe_cmd: str, The mpi-specific syntax needed. For
example: 'mpirun --host {hostname} -np {nproc} {cmd}'
teams_exe_cmd_nproc: int, The number of cores per model/wrf_hydro
simulation to be run.
teams_node_file: dict = None, Optional file that acts like a node
file. It is not currently implemented but the key specifies the
scheduler format that the file follows. An example pbs node
file is in tests/data and this argument is used here to test
without a sched.
env: dict = None, optional envionment to pass to the run.
teams_dict: dict, Skip the arguments if you already have a
teams_dict to use (backwards compatibility)
Outputs: 0 for success.
"""
# Save the ensemble object out to the ensemble directory before run
# The object does not change with the run.
path = pathlib.Path(self._compose_dir).joinpath('WrfHydroCycle.pkl')
self.pickle(path)
if teams or teams_dict is not None:
if teams_dict is None and teams_exe_cmd is None:
raise ValueError("The teams_exe_cmd is required for using teams.")
if teams_dict is None:
teams_dict = assign_teams(
self,
teams_exe_cmd=teams_exe_cmd,
teams_exe_cmd_nproc=teams_exe_cmd_nproc,
teams_node_file=teams_node_file,
env=env
)
with multiprocessing.Pool(len(teams_dict), initializer=mute) as pool:
exit_codes = pool.map(
parallel_teams_run,
(
{'obj_name': 'casts',
'team_dict': team_dict,
'compose_dir': self._compose_dir,
'env': env}
for (key, team_dict) in teams_dict.items()
)
)
# # Keep around for serial testing/debugging
# exit_codes = [
# parallel_teams_run(
# {'obj_name': 'casts',
# 'team_dict': team_dict,
# 'compose_dir': self._compose_dir,
# 'env': env})
# for (key, team_dict) in teams_dict.items()
# ]
exit_code = int(not all([list(ee.values())[0] == 0 for ee in exit_codes]))
elif n_concurrent > 1:
with multiprocessing.Pool(n_concurrent, initializer=mute) as pool:
exit_codes = pool.map(
parallel_run_casts,
({'cast': cc, 'compose_dir': self._compose_dir} for cc in self.casts)
)
exit_code = int(not all([ee == 0 for ee in exit_codes]))
else:
# Keep the following for debugging: Run it without pool.map
exit_codes = [
parallel_run_casts({'cast': cc, 'compose_dir': self._compose_dir})
for cc in self.casts
]
exit_code = int(not all([ee == 0 for ee in exit_codes]))
# Return to the cycle dir.
os.chdir(self._compose_dir)
return exit_code
def pickle(self, path: str):
"""Pickle ensemble sim object to specified file path
Args:
path: The file path for pickle
"""
path = pathlib.Path(path)
with path.open(mode='wb') as f:
pickle.dump(self, f, 2)