# Source code for wrfhydropy.core.schedulers

# Note: All other imports for individual schedulers should be done in the respective scheduler
# class functions so that imports can be isolated to relevant schedulers

from abc import ABC, abstractmethod


class Scheduler(ABC):
    """Abstract base class for job schedulers.

    Concrete schedulers must override :meth:`schedule`; the class cannot be
    instantiated directly because that method is abstract.
    """

    def __init__(self):
        super().__init__()

    @abstractmethod
    def schedule(self, jobs):
        """Submit ``jobs`` to the scheduler; must be implemented by subclasses."""
class PBSCheyenne(Scheduler):
    """A Scheduler object compatible with PBS on the NCAR Cheyenne system."""

    def __init__(
        self,
        account: str,
        nproc: int,
        nnodes: int,
        mem: int = None,
        ppn: int = None,
        queue: str = 'regular',
        walltime: str = "12:00:00",
        email_who: str = None,
        email_when: str = 'abe',
        custom: dict = None
    ):
        """Initialize a PBSCheyenne object.

        One of nproc, nnodes and ppn may be left as None. It is solved from the
        other two the first time the nproc, nnodes or ppn property is read.

        Args:
            account: The account string
            nproc: Number of processors to request
            nnodes: Number of nodes to request
            mem: Memory in GB usage/request on node (109 for fat nodes).
            ppn: Number of processors per node
            queue: The queue to use, options are 'regular', 'premium', and 'share'.
                A queue of 'share' (or 'shared') also adds
                ``export MPI_USE_ARRAY=false`` to the job script.
            walltime: The wall clock time in HH:MM:SS format, max time is 12:00:00
            email_who: Email address for PBS notifications
            email_when: PBS email frequency options. Options include 'a' for on
                abort, 'b' for before each job, and 'e' for after each job.
            custom: Optional extra PBS directives keyed by flag. Only the '-l'
                entry is used. It is written as an additional '#PBS -l' line, and
                if it contains 'walltime' or 'select', the corresponding default
                directive is not written. Defaults to a new empty dict per instance.
        """
        # Avoid a shared mutable default: every instance gets its own dict.
        if custom is None:
            custom = {}

        # Backing fields for the nproc/nnodes/ppn properties.
        self._sim_dir = None
        self._nproc = nproc
        self._nnodes = nnodes
        self._ppn = ppn

        # Scheduler options dict
        # TODO: Make this more elegant than hard coding for maintenance sake
        self.scheduler_opts = {
            'account': account,
            'email_when': email_when,
            'email_who': email_who,
            'queue': queue,
            'walltime': walltime,
            'mem': mem,
            'custom': custom
        }

    def schedule(self, jobs: list):
        """Schedule one or more jobs as a chain of dependent PBS submissions.

        A PBS script is written for each job. The first job is submitted on hold
        (``qsub -h``). Each later job is submitted with
        ``-W depend=afterok:<previous job>``. The first job is then released
        (``qrls``), so the jobs run in list order.

        Args:
            jobs: list of jobs to schedule. An empty list submits nothing.
        """
        import os
        import shlex
        import subprocess

        if not jobs:
            # Nothing to submit (the qrls step below needs at least one job id).
            return None

        # TODO: Find a way to protect the job order so that once someone executes schedule...
        # they can't change the order, may not be an issue except for if scheduling fails
        # somewhere
        self._write_job_pbs(jobs=jobs)

        # Bash variable names holding the PBS job ids. The previous entry is used
        # for the dependency of the next submission.
        # NOTE(review): assumes job_id only contains characters valid in a bash
        # variable name - TODO confirm.
        pbs_jids = []
        commands = []
        for job_num, job in enumerate(jobs):
            pbs_script = str(job.job_dir) + "/job_" + job.job_id + ".pbs"
            pbs_jid = "job_" + job.job_id
            # Quote the path so spaces/quotes in job_dir cannot break the shell script.
            quoted_script = shlex.quote(pbs_script)
            if job_num == 0:
                # If first job, schedule using hold
                commands.append(pbs_jid + "=`qsub -h " + quoted_script + "`;")
            else:
                # Else schedule using job dependency on previous pbs jid
                commands.append(
                    pbs_jid + "=`qsub -W depend=afterok:${" + pbs_jids[-1] + "} "
                    + quoted_script + "`;"
                )
            pbs_jids.append(pbs_jid)
        commands.append("qrls ${" + pbs_jids[0] + "};")

        script = "".join(commands)
        qsub_str = "/bin/bash -c '" + script + "'"

        # Just for debugging purposes
        print("qsub_str: ", qsub_str)

        # This stacks up dependent jobs in PBS in the same order as the job list.
        # The argv list is passed directly, so the script is not re-parsed by shlex.
        subprocess.run(['/bin/bash', '-c', script], cwd=os.curdir)

    def _write_job_pbs(self, jobs):
        """Private method to write bash PBS scripts for submitting each job.

        For every job, this writes ``job_<job_id>.pbs`` into the job's ``job_dir``.
        It then fills this scheduler's nproc into a deep copy of the job's
        ``_exe_cmd`` and calls the copy's ``_write_run_script()``. The caller's
        job objects are not modified.
        """
        import copy
        import sys

        # Get the current python executable to handle virtual environments in the scheduler
        python_path = sys.executable
        custom = self.scheduler_opts['custom']
        has_custom_l = '-l' in custom

        for job in jobs:
            # Copy the job because the exe cmd is edited below
            job = copy.deepcopy(job)

            # Write PBS script
            jobstr = ""
            jobstr += "#!/bin/sh\n"
            jobstr += "#PBS -N {0}\n".format(job.job_id)
            jobstr += "#PBS -A {0}\n".format(self.scheduler_opts['account'])
            jobstr += "#PBS -q {0}\n".format(self.scheduler_opts['queue'])

            if self.scheduler_opts['email_who'] is not None:
                jobstr += "#PBS -M {0}\n".format(self.scheduler_opts['email_who'])
                jobstr += "#PBS -m {0}\n".format(self.scheduler_opts['email_when'])
            jobstr += "\n"

            # Default resource requests are written only when the custom '-l'
            # string does not already provide them.
            if not has_custom_l or 'walltime' not in custom['-l']:
                jobstr += "#PBS -l walltime={0}\n".format(self.scheduler_opts['walltime'])
            if not has_custom_l or 'select' not in custom['-l']:
                prcstr = "select={0}:ncpus={1}:mpiprocs={1}".format(self.nnodes, self.ppn)
                if self.scheduler_opts['mem'] is not None:
                    prcstr += ":mem={0}GB".format(self.scheduler_opts['mem'])
                jobstr += "#PBS -l " + prcstr + "\n"
            jobstr += "\n"

            if has_custom_l:
                jobstr += "#PBS -l " + custom['-l'] + "\n"
                jobstr += "\n"

            jobstr += "# Not using PBS standard error and out files to capture model output\n"
            jobstr += "# but these files might catch output and errors from the scheduler.\n"
            jobstr += "#PBS -o {0}\n".format(job.job_dir)
            jobstr += "#PBS -e {0}\n".format(job.job_dir)
            jobstr += "\n"
            # End PBS Header

            jobstr += "# CISL suggests users set TMPDIR when running batch jobs on Cheyenne.\n"
            jobstr += "export TMPDIR=/glade/scratch/$USER/temp\n"
            jobstr += "mkdir -p $TMPDIR\n"
            jobstr += "\n"

            # The docstring historically advertised 'shared'; accept both spellings.
            if self.scheduler_opts['queue'] in ('share', 'shared'):
                jobstr += "export MPI_USE_ARRAY=false\n"

            jobstr += "{0} run_job.py --job_id {1}\n".format(python_path, job.job_id)
            jobstr += "exit $?\n"

            pbs_file = job.job_dir.joinpath("job_" + job.job_id + ".pbs")
            with pbs_file.open(mode='w') as f:
                f.write(jobstr)

            # Write the python run script for the job
            if '{nproc}' in job._exe_cmd:
                # If the job exe uses "nproc" then apply the schedulers value.
                job._exe_cmd = job._exe_cmd.format(**{'nproc': self.nproc})
            else:
                # regression tests use "{0}" format, try that here too
                job._exe_cmd = job._exe_cmd.format(self.nproc)
            job._write_run_script()

    def _solve_nodes_cores(self):
        """Private method to solve the number of nodes and cores if not all three specified.

        Raises:
            ValueError: if nproc, nnodes and ppn cannot all be determined.
        """
        import math

        if not self._nproc and self._nnodes and self._ppn:
            self._nproc = self._nnodes * self._ppn
        if not self._nnodes and self._nproc and self._ppn:
            self._nnodes = math.ceil(self._nproc / self._ppn)
        if not self._ppn and self._nnodes and self._nproc:
            self._ppn = math.ceil(self._nproc / self._nnodes)

        if None in [self._nproc, self._nnodes, self._ppn]:
            raise ValueError("Not enough information to solve all of nproc, nnodes, ppn.")

    @property
    def nproc(self):
        """Total number of processors (solved from nnodes * ppn if unset)."""
        self._solve_nodes_cores()
        return self._nproc

    @nproc.setter
    def nproc(self, value):
        self._nproc = value

    @property
    def nnodes(self):
        """Number of nodes (solved as ceil(nproc / ppn) if unset)."""
        self._solve_nodes_cores()
        return self._nnodes

    @nnodes.setter
    def nnodes(self, value):
        self._nnodes = value

    @property
    def ppn(self):
        """Processors per node (solved as ceil(nproc / nnodes) if unset)."""
        self._solve_nodes_cores()
        return self._ppn

    @ppn.setter
    def ppn(self, value):
        self._ppn = value