Source code for wrfhydropy.core.simulation

import copy
import os
import pandas as pd
import pathlib
import pickle
import shutil
from typing import Union
import warnings
import xarray

from .collection import open_whp_dataset

from .domain import Domain
from .ioutils import WrfHydroStatic, \
    WrfHydroTs, \
    check_input_files, \
    check_file_nans, \
    sort_files_by_time
from .job import Job
from .model import Model
from .namelist import Namelist
from .schedulers import Scheduler


[docs] class Simulation(object): """Class for a WRF-Hydro Simulation object. The Simulation object is used to orchestrate a WRF-Hydro simulation by accessing methods of Model, Domain, and Job objects. Optionally, a scheduler can also be added. """
[docs] def __init__(self): """Instantiates a Simulation object""" # Public attributes self.model = None """Model: A Model object""" self.domain = None """Domain: A Domain object""" self.jobs = [] """list: a list containing Job objects""" self.scheduler = None """Scheduler: A scheduler object to use for each Job in self.jobs""" self.output = SimulationOutput() """CompletedSim: A CompletedSim object returned by the self.collect() method""" self.base_hydro_namelist = Namelist() """dict: base hydro namelist produced from model and domain""" self.base_hrldas_namelist = Namelist() """dict: base hrldas namelist produced from model and domain"""
# Public methods
[docs] def add(self, obj: Union[Model, Domain, Scheduler, Job]): """Add an approparite object to a Simulation, such as a Model, Domain, Job, or Scheduler""" if isinstance(obj, Model): self._addmodel(obj) elif isinstance(obj, Domain): self._adddomain(obj) elif issubclass(type(obj), Scheduler): self._addscheduler(obj) elif isinstance(obj, Job): self._addjob(obj) else: raise TypeError('obj is not of a type expected for a Simulation')
[docs] def compose( self, symlink_domain: bool = True, force: bool = False, check_nlst_warn: bool = False ): """Compose simulation directories and files Args: symlink_domain: Symlink the domain files rather than copy force: Compose into directory even if not empty. This is considered bad practice but is necessary in certain circumstances. check_nlst_warn: Allow the namelist checking/validation to only result in warnings. This is also not great practice, but necessary in certain circumstances. """ print("Composing simulation into directory:'" + os.getcwd() + "'") # Check that the current directory is empty compose_dir = pathlib.Path(os.getcwd()) self._compose_dir = compose_dir.absolute() compose_dir_files = list(compose_dir.rglob('*')) if len(compose_dir_files) > 0 and force is False: raise FileExistsError('Unable to compose, current working directory is not empty and ' 'force is False. ' 'Change working directory to an empty directory with os.chdir()') # Symlink in domain files print('Getting domain files...') self.domain.copy_files(dest_dir=os.getcwd(), symlink=symlink_domain) # Update job objects and make job directories print('Making job directories...') for job in self.jobs: job._make_job_dir() job._write_namelists() # write namelists # If the first job is a restart, set the model end time. if self.jobs[0].restart: file_model_end_time = compose_dir / '.model_end_time' with file_model_end_time.open('w') as opened_file: _ = opened_file.write(str(self.jobs[0]._model_start_time)) # Validate jobs print('Validating job input files') self._validate_jobs(check_nlst_warn=check_nlst_warn) # Compile model or copy files if self.model.compile_log is not None: if self.model.compile_log.returncode == 0: print('Model already compiled, copying files...') self.model.copy_files(os.getcwd()) else: raise ValueError('model was previously compiled but return code is not 0') else: print('Compiling model...') self.model.compile(compile_dir=os.getcwd()) # Make copies for each TBL file (never symlink) for from_file in self.model.table_files: from_file = pathlib.Path(from_file) to_file = compose_dir.joinpath(from_file.name) shutil.copy(str(from_file), str(to_file)) print('Simulation successfully composed')
[docs] def run( self, env: dict = None ): """Run the composed simulation. Returns: 0 for success. """ compose_dir = self._compose_dir # Save the object out to the compile directory before run with compose_dir.joinpath('WrfHydroSim.pkl').open(mode='wb') as f: pickle.dump(self, f, 2) if self.scheduler is None: for job in self.jobs: job._run(env=env) else: self.scheduler.schedule(jobs=self.jobs) # Overwrite the object after run if successfull path = compose_dir.joinpath('WrfHydroSim.pkl') self.pickle(str(path)) return int(not all(jj.exit_status == 0 for jj in self.jobs))
[docs] def collect(self, sim_dir=None, output=True): """Collect simulation output after a run""" if sim_dir is None and hasattr(self, '_compose_dir'): compose_dir = self._compose_dir elif sim_dir is not None: compose_dir = sim_dir else: raise ValueError( 'The simulation has not been composed and the sim_dir ragument not supplied.') # This is what we used to do, but I'm not seeing why this behavior would make senese # or that it's clear to remove it. Toremove when it's clearly unnecessary: # compose_dir = pathlib.Path('.') # Overwrite sim job objects with collected objects matched on job id # Create dict of index/ids so that globbed jobs match the original list order id_index = dict() for index, item in enumerate(self.jobs): id_index[item.job_id] = index # Insert collect jobs into sim job list job_objs = compose_dir.rglob('WrfHydroJob_postrun.pkl') for job_obj in job_objs: collect_job = pickle.load(job_obj.open(mode='rb')) original_idx = id_index[collect_job.job_id] self.jobs[original_idx] = collect_job if output: self.output.collect_output(sim_dir=os.getcwd())
[docs] def pickle(self, path: str): """Pickle sim object to specified file path Args: path: The file path for pickle """ path = pathlib.Path(path) with path.open(mode='wb') as f: pickle.dump(self, f, 2)
def _pickle_sub_obj( self, sub_obj, path ): """ Method to reduce *composed* simulation pickle sizes for performance applications. This method replaces a simulation sub-object (model, domain, or output) with it's relative pathlib.Path. The inverse, to bring that object back from its path is restore_obj(). Usage example: sim.model = sim._pickle_sub_obj(sim.model, 'WrfHydroModel.pkl') """ with path.open(mode='wb') as f: pickle.dump(sub_obj, f, 2) return path
[docs] def pickle_sub_objs(self, obj_list: list = ['model', 'domain', 'output']): if hasattr(self, '_compose_dir'): obj_path = self._compose_dir else: obj_path = pathlib.Path('.') if 'model' in obj_list: self.model = self._pickle_sub_obj(self.model, obj_path / 'WrfHydroModel.pkl') if 'domain' in obj_list: self.domain = self._pickle_sub_obj(self.domain, obj_path / 'WrfHydroDomain.pkl') if 'output' in obj_list: self.output = self._pickle_sub_obj(self.output, obj_path / 'WrfHydroOutput.pkl')
def _restore_sub_obj( self, attr_name: pathlib.Path ): """ Method to reduce *composed* simulation pickle sizes for performance applications. This method restores a simulation sub-object (model, domain, or output) from it's relative pathlib.Path, which replaces the object in the simulation. The inverse, that pickles the subobject is _pickle_sub_obj(). Usage: sim.model = sim._restore_sub_obj(pathlib.Path('WrfHydroModel.pkl')) """ return pickle.load(attr_name.open(mode="rb"))
[docs] def restore_sub_objs(self, obj_list: list = ['model', 'domain', 'output']): if hasattr(self, '_compose_dir'): obj_path = self._compose_dir else: obj_path = pathlib.Path('.') if 'model' in obj_list: self.model = self._restore_sub_obj(obj_path / 'WrfHydroModel.pkl') if 'domain' in obj_list: self.domain = self._restore_sub_obj(obj_path / 'WrfHydroDomain.pkl') if 'output' in obj_list: self.output = self._restore_sub_obj(obj_path / 'WrfHydroOutput.pkl')
def _validate_model_domain(self, model, domain): """Private method to validate that a model and a domain are compatible""" if model.model_config != domain.domain_config: raise TypeError('Model configuration ' + model.model_config + ' not compatible with domain configuration ' + domain.domain_config) if model.version[0:2] != domain.compatible_version[0:2]: raise TypeError('Model version ' + model.version + ' not compatible with domain version ' + domain.compatible_version) elif model.version != domain.compatible_version: warnings.warn('Model minor versions ' + model.version + ' do not match domain minor versions ' + domain.compatible_version) def _validate_jobs( self, check_nlst_warn: bool = False ): """Private method to check that all files are present for each job. Args: check_nlst_warn: Allow the namelist checking/validation to only result in warnings. This is also not great practice, but necessary in certain circumstances. """ counter = 0 for job in self.jobs: counter += 1 print(job.job_id) if counter == 0: ignore_restarts = False else: ignore_restarts = True check_input_files( hrldas_namelist=job.hrldas_namelist, hydro_namelist=job.hydro_namelist, sim_dir=os.getcwd(), ignore_restarts=ignore_restarts, check_nlst_warn=check_nlst_warn ) def _set_base_namelists(self): """Private method to create the base namelists which are added to each Job. The Job then modifies the namelist times""" # Create namelists hydro_namelist = self.model.hydro_namelists hrldas_namelist = self.model.hrldas_namelists self.base_hydro_namelist = hydro_namelist.patch(self.domain.hydro_namelist_patches) self.base_hrldas_namelist = hrldas_namelist.patch(self.domain.hrldas_namelist_patches) def _addmodel(self, model: Model): """Private method to add a Model to a Simulation Args: model: The Model to add """ model = copy.deepcopy(model) if self.domain is not None: # Check that model and domain are compatible self._validate_model_domain(model, self.domain) # Add in model self.model = model # Setup base namelists self._set_base_namelists() else: self.model = model def _adddomain(self, domain: Domain): """Private method to add a Domain to a Simulation Args: domain: The Domain to add """ domain = copy.deepcopy(domain) if self.model is not None: # Check that model and domain are compatible self._validate_model_domain(self.model, domain) # Add in domain self.domain = domain # Setup base namelists self._set_base_namelists() else: self.domain = domain def _addscheduler(self, scheduler: Scheduler): """Private method to add a Scheduler to a Simulation Args: scheduler: The Scheduler to add """ self.scheduler = copy.deepcopy(scheduler) def _addjob(self, job: Job): """Private method to add a job to a Simulation Args: scheduler: The Scheduler to add """ if self.domain is not None and self.model is not None: job = copy.deepcopy(job) job._add_hydro_namelist(self.base_hydro_namelist) job._add_hrldas_namelist(self.base_hrldas_namelist) self.jobs.append(job) else: raise AttributeError('Can not add a job to a simulation without a model and a domain')
[docs] class SimulationOutput(object): """Class containing output objects from a completed Simulation, retrieved using the Simulation.collect() method""" def __init__(self): self.channel_rt = None """WrfHydroTs: Timeseries dataset of CHRTOUT files""" self.channel_rt_grid = None """WrfHydroTs: Timeseries dataset of CHRTOUT gridded files""" self.chanobs = None """WrfHydroTs: Timeseries dataset of CHANOBS files""" self.lakeout = None """WrfHydroTs: Timeseries dataset of LAKEOUT files""" self.gwout = None """WrfHydroTs: Timeseries dataset of GWOUT files""" self.rtout = None """WrfHydroTs: Timeseries dataset of RTOUT files""" self.ldasout = None """WrfHydroTs: Timeseries dataset of LDASOUT files""" self.restart_hydro = None """list: List of HYDRO_RST WrfHydroStatic objects""" self.restart_lsm = None """list: List of RESTART WrfHydroStatic objects""" self.restart_nudging = None """list: List of nudgingLastObs WrfHydroStatic objects""" def __print__(self): return self.__repr__(self) def __repr__(self): all_none = all([ ff is None for ff in self.__dict__.values() ]) if all_none: return "This simulation currently has no output. Run simulation.collect()." the_repr = '' for key, val in self.__dict__.items(): # if its an open dataset, get it's repr if isinstance(val, xarray.core.dataset.Dataset): the_repr += '\n' + key + ':\n' + val.__repr__() + '\n\n' else: if val is not None: the_len = str(len(val)) else: the_len = "0" the_repr += key + ': ' + the_len + ' files \n' return the_repr
[docs] def collect_output(self, sim_dir: Union[str, pathlib.Path] = None): """Collect simulation output after a run Args: sim_dir: The simulation directory to collect """ if sim_dir is None: sim_dir = pathlib.Path(os.curdir).absolute() else: sim_dir = pathlib.Path(sim_dir).absolute() file_glob_dict = { 'channel_rt': '*CHRTOUT_DOMAIN*', 'channel_rt_grid': '*CHRTOUT_GRID*', 'chanobs': '*CHANOBS*', 'lakeout': '*LAKEOUT*', 'gwout': '*GWOUT*', 'rtout': '*.RTOUT_*', 'ldasout': '*LDASOUT*', 'restart_hydro': 'HYDRO_RST*', 'restart_lsm': 'RESTART*', 'restart_nudging': 'nudgingLastObs*'} for key, value in file_glob_dict.items(): self.__dict__[key] = sort_files_by_time(list(sim_dir.glob(value))) if key == 'ldasout': self.__dict__[key] = self.__dict__[key][1:]
[docs] def open(self, name, n_cores=None): if not hasattr(self, name): raise ValueError('Simulation output does not contain ' + name) the_files = self.__dict__[name] if isinstance(the_files, list): self.__dict__[name] = open_whp_dataset(the_files) elif isinstance(the_files, xarray.core.dataset.Dataset): print("This output appears to already be open: " + name) else: raise ValueError("Can not open: " + name) return None
[docs] def check_output_nans(self, n_cores: int = 1): """Check all outputs for NA values""" # Get all the public attributes, which are the only atts of interest data_atts = [att for att in dir(self) if not att.startswith('_')] # Create a list to hold pandas dataframes df_list = [] # Loop over attributes for att in data_atts: # Loop over files in each attribute att_obj = getattr(self, att) if isinstance(att_obj, list) or isinstance(att_obj, WrfHydroTs): if len(att_obj) == 0: continue file = att_obj[-1] na_check_result = check_file_nans(file, n_cores=n_cores) if na_check_result is not None: na_check_result['file'] = str(file) df_list.append(na_check_result) # Combine all dfs into one if len(df_list) > 0: return pd.concat(df_list) else: return None