Source code for wrfhydropy.core.teams

import collections
import itertools
import math
import operator
import os
import pathlib
import pickle
from pprint import pprint
import wrfhydropy


def set_cycle_ens_sim_jobs(ens_obj, job):
    members = ens_obj.members
    for mem in members:
        # Currently these are always strings, never in memory.
        if isinstance(mem, str):
            pkl_file = ens_obj._compose_dir / (mem + '/WrfHydroSim.pkl')
            sim = pickle.load(pkl_file.open('rb'))
            sim.jobs[0]._entry_cmd = job._entry_cmd
            sim.jobs[0]._exe_cmd = job._exe_cmd
            sim.jobs[0]._exit_cmd = job._exit_cmd
            sim.pickle(pkl_file)


def get_cycle_ens_sim_job_exits(cycle_obj):
    members = cycle_obj.members
    statuses = {}
    for mem in members:
        pkl_file = cycle_obj._compose_dir / (mem + '/WrfHydroSim.pkl')
        sim = pickle.load(pkl_file.open('rb'))
        statuses.update({pkl_file: sim.jobs[0].exit_status})
    success = all([value == 0 for key, value in statuses.items()])
    if success:
        return 0
    else:
        return 1


[docs] def parallel_teams_run(arg_dict): """ Parallelizable function to run simulations across nodes. On the master node, python runs multiprocessing. Each separate process is a "team" of simulations to run. Multiprocessing makes MPI calls with a specific syntax to run the MPI executable on specfific (potentially other) nodes. This provides 2 levels of parallelism. This function is called (in parallel) once for each team by multiprocessing. Each team runs its set of simulations sequentially but each simulation it runs is parallel via MPI. (In the case of ensemble-cycles each team runs an ensemble but the ensemble runs its members sequentially.) Input: arg_dict: arg_dict == { 'obj_name' : string, either "member" or "cast" (or some other object), matches the object name used in the team_dict below (first argument) 'compose_dir': <pathlib.Path absolute path to the cycle top level/compse, dir where the individual cycle dirs are found>, 'team_dict' : <dict: the information needed for the team, see below> } where: team_dict == { object_name: key/name is either 'members' or 'casts', the value is a <list: groups of simulations to run either simulation objects or their run_dirs> 'nodes' : <list: the nodes previously parsed from something like$PBS_NODE_FILE>, 'exe_cmd' : <string: the MPI-specific model invokation command>, 'env' : <dict: containging the environment in which to run the cmds, may be None or 'None'> } The 'exe_cmd' is a form of invocation for the distribution of MPI to be used. For openmpi, for example for OpenMPI, this is exe_cmd: 'mpirun --host {nodelist} -np {nproc} {cmd}' The variables in brackets are expanded by internal variables. The 'exe_cmd' command substitutes the wrfhydropy of 'wrf_hydro.exe' convention for {cmd}. The {nproc} argument is the length of the list passed in the nodes argument, and the {nodellist} are the comma separated arguments in that list. The "entry_cmd" and "exit_cmd" ARE TAKEN FROM THE JOB object. 1) can be semicolon-separated commands 2) where these are run depends on MPI. OpenMPI, for example, handles these on the same processor set as the model runs. Notes: Currently this is working/tested with openmpi and intel mpi. MPT requires MPI_SHEPERD env variable and it's performance is not satisfactory so far. """ obj_name = arg_dict['obj_name'] compose_dir = arg_dict['compose_dir'] team_dict = arg_dict['team_dict'] exit_statuses = {} for obj in team_dict[obj_name]: if type(obj) is str: os.chdir(str(pathlib.Path(compose_dir) / obj)) else: os.chdir(str(pathlib.Path(compose_dir) / obj.run_dir)) # The cycle ensemble has an extra level of ensemble between the casts and the sims. # An ensemble and a non-ensemble-cycle have sim objects at this level have_cycle_ens = False object_pkl_file = pathlib.Path("WrfHydroSim.pkl") if not object_pkl_file.exists(): # But a cycle ensemble will have ensembles at this level.... have_cycle_ens = True object_pkl_file = pathlib.Path("WrfHydroEns.pkl") if not object_pkl_file.exists(): raise FileNotFoundError( "No appropriate pickle object for running " + obj_name + ".") object_pkl = pickle.load(open(object_pkl_file, "rb")) job = object_pkl.jobs[0] if job._entry_cmd is not None: entry_cmds = job._entry_cmd.split(';') new_entry_cmd = [] for cmd in entry_cmds: if 'mpirun' not in cmd: new_entry_cmd.append( # Switch out the ./wrf_hydro.exe cmd with each command. team_dict['exe_cmd'].format( **{ 'cmd': cmd, 'nodelist': team_dict['nodes'][0], # only use one task 'nproc': 1 } ) ) else: new_entry_cmd.append(cmd) job._entry_cmd = '; '.join(new_entry_cmd) if job._exit_cmd is not None: exit_cmds = job._exit_cmd.split(';') new_exit_cmd = [] for cmd in exit_cmds: if 'mpirun' not in cmd: new_exit_cmd.append( # Switch out the ./wrf_hydro.exe cmd with each command. team_dict['exe_cmd'].format( **{ 'cmd': cmd, 'nodelist': team_dict['nodes'][0], # only use one task 'nproc': 1 } ) ) else: new_exit_cmd.append(cmd) job._exit_cmd = '; '.join(new_exit_cmd) job._exe_cmd = team_dict['exe_cmd'].format( **{ 'cmd': './wrf_hydro.exe', 'nodelist': ','.join(team_dict['nodes']), 'nproc': len(team_dict['nodes']) } ) # This will write the cmd to be executed into the member dir. # with open('team_run_cmd', 'w') as opened_file: # opened_file.write(job._exe_cmd) object_pkl.pickle(object_pkl_file) if have_cycle_ens: # An ensemble-cycle neeeds the job components set on the simulations. # This object is acutally an ensemble... set_cycle_ens_sim_jobs(object_pkl, job) object_pkl.run(env=team_dict['env']) if have_cycle_ens: # An ensemble-cycle neeeds the job components set on the simulations. exit_status = get_cycle_ens_sim_job_exits(object_pkl) else: exit_status = object_pkl.jobs[0].exit_status exit_statuses.update({obj: exit_status}) return exit_statuses
def assign_teams( obj, teams_exe_cmd: str, teams_exe_cmd_nproc: int, teams_node_file: dict = None, scheduler: str = 'pbs', env: dict = None ) -> dict: """ Assign teams for parallel runs across nodes. Inputs: obj: The ensemble or cycle object, containin lists of members or casts to be run. teams_exe_cmd: str, The mpi-specific syntax needed. For example 'mpirun --host {nodelist} -np {nproc} {cmd}' teams_exe_cmd_nproc: int, The number of cores per model/wrf_hydro simulation to be run. teams_node_file: [str, pathlib.Path] = None, Optional file that acts like a node file. It is not currently implemented but the key specifies the scheduler format that the file follows. An example pbs node file is in tests/data and this argument is used here to test without a sched. env: dict = None, optional envionment to pass to the run. Outputs: dict: the teams_dict to be used by parallel_teams_run. See requirements above. """ if 'casts' in dir(obj): object_list = obj.casts object_name = 'casts' elif 'members' in dir(obj): object_list = obj.members object_name = 'members' n_runs = len(object_list) if scheduler is 'pbs': if teams_node_file is None: teams_node_file = os.environ.get('PBS_NODEFILE') pbs_nodes = [] # TODO: comment the target format here. with open(teams_node_file, 'r') as infile: for line in infile: pbs_nodes.append(line.rstrip()) n_total_processors = len(pbs_nodes) # less may be used. n_teams = min(math.floor(len(pbs_nodes) / teams_exe_cmd_nproc), n_runs) pbs_nodes_counts = dict(collections.Counter(pbs_nodes)) if n_teams == 0: raise ValueError("teams_exe_cmd_nproc > total number of cores available") if (n_teams > 1 and any([ teams_exe_cmd_nproc > val for val in pbs_nodes_counts.values()])): raise ValueError("teams_exe_cmd_nproc > number of cores/node: " 'teams does not currently function in this capacity.') # Map the objects on to the teams (this seems overly complicated, should prob # consider using pandas: teams_dict = {} # If the cast/ensemble is still in memory, this looks different. if isinstance(object_list[0], wrfhydropy.Simulation): object_dirs = [oo.run_dir for oo in object_list] else: object_dirs = object_list object_teams = [the_object % n_teams for the_object in range(n_runs)] object_team_seq = [[dir, team] for dir, team in zip(object_dirs, object_teams)] object_team_seq.sort(key=operator.itemgetter(1)) team_groups = itertools.groupby(object_team_seq, operator.itemgetter(1)) team_objects = [[item[0] for item in data] for (key, data) in team_groups] # Map the nodes on to the teams # Homogonization step here to avoid communication across nodes... # Sorting necessary for testing. unique_nodes = sorted([node for node in list(set(pbs_nodes))]) print("\n*** Team " + object_name + ' ***') print("Running on nodes: " + ', '.join(unique_nodes)) del pbs_nodes pbs_nodes = [] # This is a proposal for cross-node execution setup that seems to work # but it crashes. # if any([ teams_exe_cmd_nproc > val for val in pbs_nodes_counts.values()]): # pbs_nodes_avail = [ nn.split('.')[0] for nn in pbs_nodes_in] # # copy.deepcopy(pbs_nodes_in) # for i_team in range(n_teams): # the_team_nodes = [] # for ii in range(teams_exe_cmd_nproc): # the_team_nodes += [pbs_nodes_avail.pop(0)] # pbs_nodes += [the_team_nodes] # team_nodes = pbs_nodes # else: for i_team in range(n_teams): pbs_nodes = pbs_nodes + ( [unique_nodes[i_team % len(unique_nodes)]] * teams_exe_cmd_nproc) node_teams = [the_node // teams_exe_cmd_nproc for the_node in range(len(pbs_nodes))] node_team_seq = [[node, team] for node, team in zip(pbs_nodes, node_teams)] node_team_seq.sort(key=operator.itemgetter(1)) team_groups = itertools.groupby(node_team_seq, operator.itemgetter(1)) team_nodes = [[item[0] for item in data] for (key, data) in team_groups] # End else # Get the entry and exit commands from the job on the first cast/member # Foolery for in/out of memory if isinstance(object_list[0], str): # An ensemble and a non-ensemble-cycle have sim objects at this level pkl_file = obj._compose_dir / (object_list[0] + '/WrfHydroSim.pkl') if not pkl_file.exists(): # But a cycle ensemble will have ensembles at this level.... pkl_file = obj._compose_dir / (object_list[0] + '/WrfHydroEns.pkl') if not pkl_file.exists(): raise FileNotFoundError( "No appropriate pickle object for running " + object_name + ".") jobs = pickle.load(pkl_file.open('rb')).jobs else: jobs = object_list[0].jobs if len(jobs) > 1: raise ValueError('Teams runs only support single job simulations') entry_cmd = jobs[0]._entry_cmd exit_cmd = jobs[0]._entry_cmd # Assign teams! for team in range(n_teams): teams_dict.update({ team: { object_name: team_objects[team], 'nodes': team_nodes[team], 'entry_cmd': entry_cmd, 'exit_cmd': exit_cmd, 'exe_cmd': teams_exe_cmd, 'env': env } }) print('\nPBS_NODE_FILE present: ') print(' ' + str(len(unique_nodes)) + ' nodes with') print(' ' + str(n_total_processors) + ' TOTAL processors requested.') print('\nTeams parallelization:') print(' ' + str(n_runs) + ' total ' + object_name) print(' ' + str(n_teams) + ' concurrent teams using') print(' ' + str(teams_exe_cmd_nproc) + ' processors each.') print('\nTeams dict:') pprint(teams_dict) print('\n') return teams_dict