Source code for wrfhydropy.util.xrcmp

#!/usr/bin/env python3

# Example Usage
# ipython --pdb  xrcmp.py -- \
#     --candidate conus_test/201806012300.RTOUT_DOMAIN1 \
#     --reference conus_test/201806020000.RTOUT_DOMAIN1 \
#     --n_cores 8 \
#     --log_file log.txt

import math
from multiprocessing import Pool
import pathlib
import sys
# import time
import xarray as xr


# A dictionary of chunks for various variables for CONUS testing
# These are for the larger fields which need some control
conus_chunks_dict = {
    # RTOUT variables to control
    'SOIL_M': {},  # with {} maxes out at < 18% memory when files do NOT match
    # HYDRO_RST variables: None currently
}


# # A decorator/closure to check timings.
# def stopwatch(the_func):
#     def the_closure(*args, **kw):
#         ts = time.time()
#         result = the_func(*args, **kw)
#         te = time.time()
#         print('Timing: ' + the_func.__name__ + ' took ', round(te - ts, 2),' seconds.')
#         return result
#     return the_closure


def calc_stats(arg_tuple):
    key = arg_tuple[0]
    can_file = arg_tuple[1]
    ref_file = arg_tuple[2]
    chunks = arg_tuple[3]
    exclude_vars = arg_tuple[4]

    # ignore excluded vars
    if key in exclude_vars:
        return None

    if chunks is None:
        chunks = {}  # default is no chunks
        if key in conus_chunks_dict:
            chunks = conus_chunks_dict[key]

    can_ds = xr.open_dataset(can_file, chunks=chunks, mask_and_scale=False)
    ref_ds = xr.open_dataset(ref_file, chunks=chunks, mask_and_scale=False)

    # Check for variables in reference and not in candidate?
    # Check for variables in candidate and not in reference?

    if can_ds[key].equals(ref_ds[key]):
        return None

    else:
        cc = can_ds[key]
        rr = ref_ds[key]

        if '|S' in str(cc.dtype):

            # Deal with strings
            nz_xr = cc.where(cc != rr, drop=True)
            if len(nz_xr) == 0:
                return None
            else:
                the_count = nz_xr.count().load().item(0)
                inf = float('inf')
                result = {
                    'Variable': key,
                    'Count': the_count,
                    'Sum': inf,
                    'Min': inf,
                    'Max': inf,
                    'Range': inf,
                    'Mean':  inf,
                    'StdDev': inf
                }
                return result

        else:
            # All non-string types
            cc = cc.astype(float)
            rr = rr.astype(float)

            # THIS NEEDS REMOVED AFTER TESTING IS COMPLETE
            # FOR convenience of comparing two files at different times.
            # if 'time' in rr.coords:
            #    rr['time'] = cc.time
            # if key == 'time':
            #    rr.values = cc.values

            diff_da = cc - rr
            diff_xr = xr.DataArray(diff_da.compute())
            # TODO: This threshold should be type dependent
            nz_xr = diff_xr.where(abs(diff_xr) > 0.000000, drop=True)
            if len(nz_xr) == 0:
                return None

            the_count = nz_xr.count().load().item(0)
            the_sum = nz_xr.sum().load().item(0)
            the_min = nz_xr.min().load().item(0)
            the_max = nz_xr.max().load().item(0)
            the_range = the_max - the_min
            the_mean = the_sum / the_count
            the_z = (nz_xr - the_mean)
            the_std = math.sqrt((the_z * the_z).sum() / the_count)
            del the_z

            result = {
                'Variable': key,
                'Count': the_count,
                'Sum': the_sum,
                'Min': the_min,
                'Max': the_max,
                'Range': the_range,
                'Mean':  the_mean,
                'StdDev': the_std
            }
            return result


# @stopwatch
[docs] def xrcmp( can_file: str, ref_file: str, log_file: str, n_cores: int = 1, chunks={}, exclude_vars: list = [], ) -> int: if exclude_vars is None: exclude_vars = [] # Delete log file first # Should write a log file that says nothing yet determined? log_file = pathlib.Path(log_file) if log_file.exists(): log_file.unlink() # Dont chunk, this is just a meta-data read. can_ds = xr.open_dataset(can_file) ref_ds = xr.open_dataset(ref_file) # May need to check that they have the same vars. can_vars = set([kk for kk in can_ds.variables.keys()]) ref_vars = set([kk for kk in ref_ds.variables.keys()]) have_same_variables = can_vars.difference(ref_vars) == set([]) can_ds.close() # These are likely critical to the success ref_ds.close() # of multiprocessing # TODO: Check that the meta data matches # This is quick if not true # ds_equal = can_ds.equals(re_ds) # if not ds_equal: if n_cores == 1: all_stats_list = [] for key, val in can_ds.items(): result = calc_stats( (key, can_file, ref_file, chunks, exclude_vars)) all_stats_list.append(result) else: the_args = [ (key, can_file, ref_file, chunks, exclude_vars) for key in can_ds.keys()] with Pool(n_cores) as pool: all_stats_list = pool.map(calc_stats, the_args) all_stats = {item['Variable']: item for item in all_stats_list if item is not None} diff_var_names = sorted(all_stats.keys()) if not diff_var_names: with open(log_file, 'w') as opened_file: opened_file.write("Files are identical\n") return 0 # Formatting: # The goal is to print something like this which is what nccmp outputs. # channel_rt # Variable Group Count Sum ... Max Range Mean StdDev # 0 streamflow / 162 0.003022 ... 0.003832 0.004315 0.000019 0.000361 # 1 nudge / 4 -0.001094 ... 0.000093 0.001272 -0.000274 0.000605 # 2 q_lateral / 170 0.000345 ... 0.000700 0.001145 0.000002 0.000086 # 3 velocity / 165 0.010788 ... 0.005488 0.006231 0.000065 0.000503 # 4 Head / 177 0.002717 ... 0.002662 0.003292 0.000015 0.000258 stat_names = sorted(all_stats[diff_var_names[0]].keys()) stat_lens = {} # the length/width of each column/stat n_dec = 3 # number of decimals for floats n_dec_p = n_dec + 1 # plus the decimal point # The format for each type, where full_len sepcifices the width of the field. type_fmt = { str: '{{:{full_len}}}', int: '{{:{full_len}}}', float: '{{:{full_len}.' + str(n_dec) + 'f}}' } # Now solve the full_len field widths for all stats. Do this by # just formatting each as it's type and finding the max (best way # to handle negatives). For floats, take the integer part to find # its length to the left of the decimal. for stat_name in stat_names: all_lens = [] for key, val in all_stats.items(): the_val = val[stat_name] the_type = type(the_val) the_fmt0 = type_fmt[the_type] if the_type is str: full_len = len(the_val) elif not math.isfinite(the_val): full_len = len(str(the_val)) else: full_len = len(str(int(the_val))) if the_type is float: full_len = full_len + n_dec_p the_fmt = the_fmt0.format(**{'full_len': full_len}) the_string = the_fmt.format(*[the_val]) all_lens.append(len(the_string)) stat_lens[stat_name] = max(all_lens) header_string = ( '{Variable:>' + str(stat_lens['Variable']) + '} ' '{Count:>' + str(stat_lens['Count']) + '} ' '{Sum:>' + str(stat_lens['Sum']) + '} ' '{Min:>' + str(stat_lens['Min']) + '} ' '{Max:>' + str(stat_lens['Max']) + '} ' '{Range:>' + str(stat_lens['Range']) + '} ' '{Mean:>' + str(stat_lens['Mean']) + '} ' '{StdDev:>' + str(stat_lens['StdDev']) + '} \n' ) var_string = ( '{Variable:>' + str(stat_lens['Variable']) + '} ' '{Count:>' + str(stat_lens['Count']) + '} ' '{Sum:>' + str(stat_lens['Sum']) + '.' + str(n_dec) + 'f} ' '{Min:>' + str(stat_lens['Min']) + '.' + str(n_dec) + 'f} ' '{Max:>' + str(stat_lens['Max']) + '.' + str(n_dec) + 'f} ' '{Range:>' + str(stat_lens['Range']) + '.' + str(n_dec) + 'f} ' '{Mean:>' + str(stat_lens['Mean']) + '.' + str(n_dec) + 'f} ' '{StdDev:>' + str(stat_lens['StdDev']) + '.' + str(n_dec) + 'f} \n' ) header_dict = {name: name for name in stat_names} the_header = header_string.format(**header_dict) with open(log_file, 'w') as opened_file: opened_file.write(the_header) for key in all_stats.keys(): opened_file.write(var_string.format(**all_stats[key])) return 1
def parse_arguments(): import argparse parser = argparse.ArgumentParser() parser.add_argument( "--candidate", metavar="FILE", type=str, required=True, help="Candidate file to compare." ) parser.add_argument( "--reference", metavar="FILE", type=str, required=True, help="Reference file to compare." ) parser.add_argument( "--log_file", metavar="FILE", type=str, required=True, help="File to log potential differences to. " "Existing file is clobbered." ) parser.add_argument( "--n_cores", metavar="n_cores", type=int, required=False, default=1, help="The number of processors to use." ) parser.add_argument( "--chunks", metavar="chunks", type=int, required=False, default=1, help="Chunks as integer." ) args = parser.parse_args() can_file = args.candidate ref_file = args.reference log_file = args.log_file chunks = args.chunks n_cores = args.n_cores if chunks == 1: chunks = {} # No chunking elif chunks == 0: chunks = None # This will use the conus_chunks_dict return can_file, ref_file, log_file, chunks, n_cores if __name__ == "__main__": can_file, ref_file, log_file, chunks, n_cores = parse_arguments() ret = xrcmp( can_file=can_file, ref_file=ref_file, log_file=log_file, n_cores=n_cores, chunks=chunks ) sys.exit(ret)