Source code for cathy_tools

"""
Main class controlling the wrapper.

This class is managing three main components: 
    
1. Reading/writing inputs and output files 

We use a generic naming scheme for the functions, based on the names of the CATHY files. 

Example:
-------
    In order to update the soil file:
        `update_soil(arguments)`
    In order to update the atmbc file:
        `update_atmbc(arguments)`
        
    Note that the argument names match the names of the variables to update.
    In order to update the parm file with a new minimum time step DTMIN:
        `update_parm(DTMIN=1e3)`
        
The update function is composed of three main actions:
    - Set the default parameters (it does not actually read the input file to set the default parameters)
    - Replace values with the new ones passed as function arguments
    - Write the new file (it overwrites the existing file)
    
Note also that updating an input file may affect the CATHY.H control file. 
When needed, the code takes care of updating the values of the CATHY.H file retroactively.

Remember to update all your prepro files before calling `run_preprocessor()`, and
all your input files before calling `run_processor()` 

2. Compiling the fortran files
3. Running executable

Once all the files have been updated, `run_preprocessor()` or `run_processor()` 
takes care of recompiling the source files via a bash command.
   
"""

import os
import sys
import warnings
from pathlib import Path

import numpy as np
from git import Repo  # In order to fetch the file directly from the repo

import pyCATHY.meshtools as mt
from pyCATHY.importers import cathy_inputs as in_CT
from pyCATHY.importers import cathy_outputs as out_CT
from pyCATHY.plotters import cathy_plots as plt_CT

warnings.filterwarnings("ignore", category=DeprecationWarning)

import glob
import pickle
import re
import shutil
import subprocess
import time  # measure time of computation
from collections import OrderedDict
from os import listdir
from os.path import isfile, join

import pandas as pd
import pyvista as pv  # read .vtk files
import rich.console
from rich import print

import matplotlib.pyplot as plt
from scipy.interpolate import Rbf

# -----------------------------------------------------------------------------
def subprocess_run_multi(pathexe_list):
    """
    Run the ``cathy`` executable found in a given folder.

    Multiprocessor function kept outside the main CATHY object, see
    https://stackoverflow.com/questions/44144584/typeerror-cant-pickle-thread-lock-objects

    Parameters
    ----------
    pathexe_list : str or path-like
        Folder containing the ``cathy`` executable. Despite the name, the
        value is used as a single directory: the current process changes its
        working directory to it before launching ``./cathy``.

    Returns
    -------
    subprocess.CompletedProcess
        Result of the run, with ``stdout`` and ``stderr`` captured as text.
    """
    print(f"x= {pathexe_list}, PID = {os.getpid()}")

    # The executable is addressed relative to its folder, so move there first.
    # The working directory is NOT restored afterwards.
    os.chdir(pathexe_list)
    command = ["./cathy"]
    run_options = {
        "text": True,
        "stdout": subprocess.PIPE,
        "stderr": subprocess.PIPE,
    }

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        completed = subprocess.run(command, **run_options)
    return completed
def make_console(verbose):
    """
    Build the :class:`rich.console.Console` instance used for status output.

    Parameters
    ----------
    verbose : bool
        When False the console is created in quiet mode, so status messages
        are suppressed; when True they are printed to stderr.

    Returns
    -------
    rich.console.Console
        Console writing to stderr.
    """
    silence_output = not verbose
    console = rich.console.Console(stderr=True, quiet=silence_output)
    return console
# -----------------------------------------------------------------------------
[docs] class CATHY: """ Main CATHY object. When instantiated it creates the tree project directories with 'prj_name' as root folder. The src files are fetched from the online repository if not existing (note that it is possible to call a specific version). """
def __init__(
    self,
    dirName=None,
    prj_name="my_cathy_prj",
    notebook=False,
    version="1.0.0",
    verbose=True,
    **kwargs,
):
    """
    Create CATHY object.

    Builds the working/project directories, initialises the dictionaries
    holding the CATHY input files, and fetches the CATHY source, prepro and
    input files when the project has no ``src`` folder yet.

    ..note:
        All variables in CAPITAL LETTERS use the semantic from the CATHY
        legacy fortran codes, while the others are created exclusively for
        the wrapper.

    ..note:
        All file variable are self dictionnary objects; example: soil file
        is contained in self.soil

    Parameters
    ----------
    dirName : str, optional
        Working directory, joined to the current working directory.
        The default is None (use the current working directory).
    prj_name : str, optional
        Name of the project folder created inside the working directory.
        The default is "my_cathy_prj".
    notebook : bool, optional
        Flag if the script is run in a notebook. The default is False.
    version : str, optional
        Source version to fetch: "1.0.0" clones the online repository,
        "G. Manoli" copies a local folder. The default is "1.0.0".
    verbose : bool, optional
        Whether the rich console prints status messages. The default is True.
    **kwargs :
        clear_src : bool
            Remove the existing ``src``, ``input`` and ``prepro`` folders
            (only when ``src`` exists) so that they are fetched again.
        clear_outputs : bool
            Remove and recreate the ``output`` and ``vtk`` folders.

    Raises
    ------
    SystemExit
        If fetching the source files for version "1.0.0" fails.
    """
    self.t0 = time.time()  # reference time for elapsed-time messages
    self.console = make_console(verbose)
    self.console.print(":checkered_flag: [b]Initiate CATHY object[/b]")
    # NOTE(review): `verbose` only drives the console above; this attribute
    # is always initialised to False, exactly as before - confirm intent.
    self.verbose = False
    self.notebook = notebook  # flag if the script is run in a notebook

    # create working and project dir
    # ---------------------------------------------------------------------
    if dirName is None:
        self.workdir = os.path.join(os.getcwd())
    else:
        self.workdir = os.path.join(os.getcwd(), dirName)
    os.makedirs(self.workdir, exist_ok=True)

    self.project_name = prj_name
    project_dir = os.path.join(self.workdir, self.project_name)
    os.makedirs(project_dir, exist_ok=True)

    # convention for directory tree
    # ---------------------------------------------------------------------
    self.processor_name = "cathy"
    self.input_dirname = "input"
    self.output_dirname = "output"

    # dict related to CATHY inputs files
    # ---------------------------------------------------------------------
    self.parm = {}  # dict of parm input parameters
    self.soil = {}  # dict of soil input parameters
    self.ic = {}  # dict of ic input parameters
    self.cathyH = {}  # dict of cathyH C header parameters

    # dict related to Data assimilation
    # ---------------------------------------------------------------------
    self.DAFLAG = False  # Flag to trigger data assimilation process
    self.dict_obs = OrderedDict()  # dictionnary containing all the observation data
    self.count_DA_cycle = None  # counter of the Assimilation Cycle

    # dict related to the mesh
    # ---------------------------------------------------------------------
    self.grid3d = {}

    src_dir = os.path.join(project_dir, "src")

    # clear src files if required
    # ---------------------------------------------------------------------
    if kwargs.get("clear_src") and os.path.exists(src_dir):
        print("clear src files")
        for folder in ("src", "input", "prepro"):
            target = os.path.join(project_dir, folder)
            # input/prepro may already be missing: only remove what exists
            if os.path.exists(target):
                shutil.rmtree(target)

    # fetch src files if not existing from the online repo
    # ---------------------------------------------------------------------
    if not os.path.exists(src_dir):
        self.console.print(":worried_face: [b]src files not found[/b]")
        self.console.print("working directory is:" + str((self.workdir)))
        tmp_src = os.path.join(project_dir, "tmp_src")
        run_template = os.path.join(tmp_src, "runs", "weilletal")

        if version == "1.0.0":
            try:
                Repo.clone_from(
                    "https://github.com/CATHY-Org/CATHY_src_org.git",
                    tmp_src,
                    branch="main",
                )
                self.console.print(":inbox_tray: [b]Fetch cathy src files[/b]")
                shutil.move(os.path.join(tmp_src, "src"), src_dir)

                # move the loose files of the template run to the project root
                for name in os.listdir(run_template):
                    if os.path.isfile(os.path.join(run_template, name)):
                        shutil.move(
                            os.path.join(run_template, name),
                            os.path.join(project_dir, name),
                        )
            except Exception as err:
                # Not only network failures end up here (e.g. a failed file
                # move), so report the real cause next to the legacy message.
                print("no internet connection to fetch the files")
                print(f"{type(err).__name__}: {err}")
                sys.exit()

        if version == "G. Manoli":
            print("fetch cathy G. Manoli src files")
            # NOTE(review): hard-coded path on a developer machine; this
            # branch cannot work elsewhere and does not create tmp_src.
            path_manoli = "/home/ben/Documents/CATHY/CathyGitbucket/Test_Gabriele/1_Gabriele_Piante_NON_modificato/CATHY_RWU_ABL_1D/"
            shutil.copytree(path_manoli, src_dir)

        if not os.path.exists(os.path.join(project_dir, "prepro")):
            self.console.print(
                ":inbox_tray: [b]Fetch cathy prepro src files[/b]"
            )
            shutil.move(
                os.path.join(run_template, "prepro"),
                os.path.join(project_dir, "prepro"),
            )

        if not os.path.exists(os.path.join(project_dir, "input")):
            self.console.print(":inbox_tray: [b]Fetch cathy input files[/b]")
            shutil.move(
                os.path.join(run_template, "input"),
                os.path.join(project_dir, "input"),
            )

        if not os.path.exists(os.path.join(project_dir, "output")):
            self.create_output(output_dirname="output")

        # clean up the temporary clone; tolerate it (or the readme) being absent
        shutil.rmtree(tmp_src, ignore_errors=True)
        readme = os.path.join(project_dir, "readme.txt")
        if os.path.exists(readme):
            os.remove(readme)

    # clear output files if required
    # ---------------------------------------------------------------------
    if kwargs.get("clear_outputs"):
        for folder in ("output", "vtk"):
            target = os.path.join(project_dir, folder)
            if os.path.exists(target):
                shutil.rmtree(target)
        self.create_output(output_dirname="output")
# --------------------------------------------------------------------------- # # replace console cmd by a subprocess call # --------------------------------------------------------------------------- # def run_preprocessor(self, KeepOutlet=True, verbose=False, **kwargs): """ Run cppp.exe 1. updates cathy parameters prepo 2. recompile the source files prepo 3. run the preprocessor using bash cmd Running the executable file has allowed to generate a complete set of files describing physiographic features of the drainage system, as shown in Table 2. """ self.console.print(":cooking: [b]gfortran compilation[/b]") for loopi in range(2): # run it twice (to avoid the first error) os.chdir(os.path.join(self.workdir, self.project_name, "prepro/src/")) # clean all files compiled for file in glob.glob("*.o"): os.remove(file) # if self.notebook==False: bashCommand = "gfortran -O -o pycppp mpar.f90 mbbio.f90 wbb_sr.f90 csort.f90 qsort.f90 depit.f90 cca.f90 smean.f90 dsf.f90 facet.f90 hg.f90 mrbb_sr.f90 bb2shp_sr.f90 shape.f90 dbase.f90 streamer.f90 cppp.f90" try: p = os.system(bashCommand + "> /dev/null 2>&1") # run it twice (to avoid the first error) p = os.system(bashCommand + "> /dev/null 2>&1") except: print("bash cmd not recognized") os.chdir(self.workdir) # move to the directory where the source FORTRAN files are contained (cathy_main.f) shutil.move( os.path.join(self.workdir, self.project_name, "prepro/src/pycppp"), os.path.join(self.workdir, self.project_name, "prepro/pycppp"), ) self.console.print(":athletic_shoe: [b]Run preprocessor[/b]") os.chdir(os.path.join(self.workdir, self.project_name, "prepro")) bashcmd = "./pycppp" my_data = "2\n0\n1\n" # user input data with warnings.catch_warnings(): warnings.simplefilter("ignore") p = subprocess.run( [bashcmd], text=True, input=my_data, # capture_output=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) if verbose: print(p.stdout) print(p.stderr) if "DEM resolution is not a multiple of the rivulet spacing!" 
in p.stdout: print("DEM resolution is not a multiple of the rivulet spacing!") sys.exit() if "catchment with more than one outlet cell!" in p.stdout: print("catchment with more than one outlet cell!") sys.exit() if KeepOutlet == False: print("remove outlet") idoutlet = np.where(self.DEM == min(np.unique(self.DEM))) self.DEM[idoutlet[0], idoutlet[1]] = max(np.unique(self.DEM)) if hasattr(self, 'DEM') is False: DEM_mat, DEM_header = in_CT.read_dem( os.path.join(self.workdir, self.project_name, "prepro/dem"), os.path.join(self.workdir, self.project_name, "prepro/dtm_13.val"), ) self.DEM = DEM_mat # if hasattr(self, 'hapin') is False: # self.update_cathyH( # ROWMAX=self.hapin['N'], # COLMAX=self.hapin['M'] # ) # if np.shape(self.DEM)[1]==self.hapin['N']: # self.console.rule(''':warning: !transposing DEM dimension! # This should be avoided in a future version # :warning:''', # style="yellow") # self.update_prepo_inputs(N=self.hapin['M'], # M=self.hapin['N']) # self.update_zone() os.chdir(self.workdir) pass def recompileSrc(self, verbose=False): """ Other option is self.run_processor(runProcess=False) """ ti = time.time() # executation time estimate self.console.print( ":hammer_and_wrench: [b] Recompile src files[/b] [" + str(int(abs(ti - self.t0))) + "s]" ) # gfortran_executable = 'gfortran.exe' # # Execute the where command to find the path to gfortran # where_command = ['where', gfortran_executable] # result = subprocess.run(where_command, stdout=subprocess.PIPE, shell=True, check=True) # # Get the path to gfortran from the output of the where command # gfortran_path = result.stdout.decode('utf-8').strip().split('\r\n')[0] # clean all files previously compiled for file in glob.glob("*.o"): os.remove(file) # list all the fortran files to compile and compile for file in glob.glob("*.f"): # filepath = Path(self.workdir) / self.project_name / 'src' / str(file) filepath = str(file) bashCommand = "gfortran -c " + str(filepath) # gfortran -c *.f # gfortran *.o -L\MinGW\lib 
-llapack -lblas -o cathy # print(bashCommand) with warnings.catch_warnings(): warnings.simplefilter("ignore") process = subprocess.Popen( bashCommand.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE ) # if verbose==True: # output, error = process.communicate() output, error = process.communicate() # list all the fortran compiled files to compile and run files = "" for file in glob.glob("*.o"): # files += " " + os.path.join(self.workdir, self.project_name, 'prepro/src/' + str(file)) files += " " + str(file) bashCommand = "gfortran" + files + " -llapack -lblas -o " + self.processor_name ti = time.time() # executation time estimate self.console.print( ":cooking: [b]gfortran compilation[/b] [" + str(int(abs(ti - self.t0))) + "s]" ) with warnings.catch_warnings(): warnings.simplefilter("ignore") process = subprocess.Popen( bashCommand.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE ) output, error = process.communicate() print(error) try: shutil.move( os.path.join( self.workdir, self.project_name, "src", self.processor_name ), os.path.join(self.workdir, self.project_name, self.processor_name), ) except: self.console.print(":pensive_face: [b]Cannot find the new processsor[/b]") pass def run_processor(self, recompile=True, runProcess=True, verbose=False, **kwargs): """ Run cathy.exe 1. updates cathy parameters and cathyH based on **kwargs 2. recompile the source files (set False for notebook) and create the executable 3. run the processor using bash cmd if runProcess is True Parameters ---------- recompile : Bool, optional recomplile CATHY src files. The default is True. runProcess : Bool, optional exectute compile CATHY exe. The default is True. verbose : Bool, optional Output CATHY log. The default is False. 
""" # set parallel flag for mutirun (used for DA ensemble) # -------------------------------------------------------------------- parallel = False if "parallel" in kwargs: parallel = kwargs["parallel"] # update parm and cathyH # -------------------------------------------------------------------- # VERY VERY IMPORTANT NEVER COMMENT ! self.check_DEM_versus_inputs() # to uncomment # if len(kwargs)>0: self.update_parm(**kwargs) # to uncomment self.update_cathyH(**kwargs) ### to uncomment # self.cathyH if recompile: # recompile # -------------------------------------------------------------------- os.chdir(os.path.join(self.workdir, self.project_name, "src")) self.recompileSrc() if "path_CATHY_folder" in kwargs: os.chdir(kwargs["path_CATHY_folder"]) else: # back to root dir os.chdir(os.path.join(self.workdir, self.project_name)) # some checkings # -------------------------------------------------------------------- try: os.path.exists(os.path.join("inputs")) except OSError: print("input folder missing") sys.exit() # check if output folder if not os.path.exists("output"): os.makedirs("output", exist_ok=True) # check if vtk folder if not os.path.exists("vtk"): os.makedirs("vtk", exist_ok=True) # run the processor # -------------------------------------------------------------------- if runProcess: # t0 = time.time() # executation time estimate self.console.print(":athletic_shoe: [b]Run processor[/b]") callexe = "./" + self.processor_name # case of simple simulation # ---------------------------------------------------------------- with warnings.catch_warnings(): warnings.simplefilter("ignore") p = subprocess.run( [callexe], stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) # process = subprocess.Popen( # [callexe], stdout=subprocess.PIPE, stderr=subprocess.PIPE # ) # # if verbose==True: # # output, error = process.communicate() # output, error = process.communicate() # Redirecting output to /dev/null or NUL # output_redirection = "> /dev/null 2>&1" if os.name != 'nt' 
else "> NUL 2>&1" # # Execute the command without displaying the shell # os.system(f"{callexe} {output_redirection}") # result = os.popen(f"{callexe} {output_redirection}").read() # p = subprocess.run( # [callexe], # stdout=subprocess.DEVNULL, # stderr=subprocess.DEVNULL, # ) # p = subprocess.Popen([callexe], # ) # p = subprocess.Popen([callexe], # stdout=subprocess.PIPE, # stderr=subprocess.PIPE # ) # warnings.simplefilter("ignore") # p = subprocess.Popen([callexe], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # p = subprocess.run( # [callexe], # text=True, # capture_output=True, # check=True # ) # print('verbose='+ str(verbose)) if verbose: # stdout, stderr = p.communicate() # This will block until the process finishes # print(result) print(p.stdout) # print(p.stderr) os.chdir(os.path.join(self.workdir)) try: self.grid3d = out_CT.read_grid3d( os.path.join(self.workdir, self.project_name, 'output', 'grid3d') ) except: self.run_processor(IPRT1=3) # if not "nnod3" in self.grid3d.keys(): # self.run_processor(IPRT1=3) # computation time # ---------------------------------------------------------------- # t1 = time.time() # self.total_computation_time = t1 - t0 return def create_output(self, output_dirname="output"): """ Create output directories Parameters ---------- output_dirname : str, optional Name of the dir. The default is "output". Returns ------- None. 
""" # create project_name/output folders if not os.path.exists(os.path.join(self.project_name, output_dirname)): os.mkdir(os.path.join(self.workdir, self.project_name, output_dirname)) if not os.path.exists(os.path.join(self.project_name, "vtk")): os.mkdir(os.path.join(self.workdir, self.project_name, "vtk")) # compute the processor (make clean make) using the Makefile pass def display_time_run(self): return self.total #%% Checkings def check_DEM_versus_inputs(self): """ Check shape consistency between attributes and DEM """ try: if hasattr(self, "veg_map"): if np.shape(self.veg_map) != np.shape(self.DEM): raise ValueError( "Inconsistent shapes between vegetation map and DEM - need to update veg first" ) except: self.update_veg_map(indice_veg=np.ones(np.shape(self.DEM))) pass #%% # --------------------------------------------------------------------------- # # update files # --------------------------------------------------------------------------- # def update_cathyH(self, verbose=False, **kwargs): """ Update CathyH file input based on **kwargs parameters. Parameters ---------- **kwargs : all variable parameters included into cathyH. For instance ROWMAX # maximum NROW, with NROW = number of rows in the DEM. 
Returns ------- type Write a modified cathyH file based on new parameters parsed """ indent = "" for kk, value in kwargs.items(): if kk == "indent": indent = value if verbose: self.console.print( indent + ":arrows_counterclockwise: [b]Update cathyH files[/b]" ) CATHYH_file = open( os.path.join(self.workdir, self.project_name, "src", "CATHY.H"), "r" ) Lines0_109 = CATHYH_file.readlines() CATHYH_file.close() # check if hapin and parm exist # create them if not existing if hasattr(self, "hapin") is False: self.update_prepo_inputs() if hasattr(self, "dem_parameters") is False: self.update_dem_parameters() if hasattr(self, "veg_map") is False: self.update_veg_map() if "NR" not in self.parm: self.console.rule( ":warning: warning messages above :warning:", style="yellow" ) self.console.print( r""" The parm dictionnary is empty Falling back to defaults to update CATHYH This can have consequences !! """, style="yellow", ) self.console.rule("", style="yellow") self.update_parm() DEMRES = 1 ROWMAX = self.hapin["M"] COLMAX = self.hapin["N"] if len(self.cathyH) == 0: self.cathyH = { # "ROWMAX": self.hapin["N"], # maximum NROW, with NROW = number of rows in the DEM # "COLMAX": self.hapin["M"], # maximum NCOL, with NCOL = number of columns in the DEM "ROWMAX": ROWMAX, # maximum NROW, with NROW = number of rows in the DEM "COLMAX": COLMAX, # maximum NCOL, with NCOL = number of columns in the DEM # 'COARSE': , "MAXCEL": int(self.hapin["N"]) * int(self.hapin["M"]), "MAXRES": 1, "DEMRES": DEMRES, "NODMAX": (int(self.hapin["N"]) / DEMRES + 1) * (int(self.hapin["M"]) / DEMRES + 1), "NTRMAX": 2* (int(self.hapin["N"]) * int(self.hapin["M"]))/ (DEMRES * DEMRES), "NP2MAX": 1, "MAXSTR": self.dem_parameters["nstr"], "NPMAX": 1, "NQMAX": 1, "NSFMAX": 1, "NNSFMX": 1, "MAXNUDN": 1, # self.nudging["NUDN"], "MAXNUDT": 1, "MAXNUDC": 1, # 'MAXNENS': , "MAXZON": self.dem_parameters["nzone"], # maximum NZONE, with NZONE = number of material types in the porous medium "MAXTRM": 
self.hapin["N"]*self.hapin["M"]*self.dem_parameters["nstr"]*30, "MAXIT": 30, "NRMAX": self.parm["NR"]+1, # maximum NPRT (ref. parm file), with NPRT = number of time values for detailed output "MAXPRT": self.parm["NPRT"], # maximum NUMVP (ref. parm input file), NUMVP = number of surface nodes for vertical profile output "MAXVP": int(self.parm["NUMVP"]), "N1MAX": self.dem_parameters[ "n1" ], # maximum N1 (it is good to have N1 ≤ 20), N1 = maximum number of element connections to a node "MAXBOT": 1, "INTBOT": 1, "NTAB": 100, "MAXQOUT": 1, "MAXVTKPRT": self.parm["NPRT"], # Is related to data assimilation (DA) (to do not considered if you do not have DA) "MAXENNOUT": 52560, "MAXVEG": self.MAXVEG, # # "MAXVEG": 1, # } for kk, value in kwargs.items(): if kk in self.cathyH.keys(): self.cathyH[kk] = value self.cathyH['MAXCEL'] = int(self.cathyH["ROWMAX"]) * int(self.cathyH["COLMAX"]) self.cathyH['NODMAX'] = int((int(self.cathyH["ROWMAX"]) / DEMRES + 1) * (int(self.cathyH["COLMAX"]) / DEMRES + 1)) self.cathyH['NTRMAX'] = int((2*self.cathyH['MAXCEL'])/ (DEMRES * DEMRES)) self.cathyH['MAXTRM'] = self.cathyH["ROWMAX"]*self.cathyH["COLMAX"]*self.dem_parameters["nstr"]*30 self.cathyH['NFACEMAX'] = self.cathyH['NODMAX']*3 # self.cathyH['MAXTRM'] = 1599000 # MAXTRM=1599000 # cathyH_laC = { # # "ROWMAX": 247, # maximum NROW, with NROW = number of rows in the DEM # # "COLMAX": 221, # maximum NCOL, with NCOL = number of columns in the DEM # # # 'COARSE': , # # "MAXRES": 1, # # "DEMRES": 1, # "NODMAX": 28440, # "NTRMAX": 2*27848, # "MAXTRM": 2364075, # "NRMAX": 1, # } # for k in cathyH_laC: # self.cathyH[k] = cathyH_laC[k] # --------------------------------------------------------------------- # write new cathy H with open( os.path.join(self.workdir, self.project_name, "src", "CATHY.H"), "w+" ) as CATHYH_file: for i, l in enumerate(Lines0_109): if i < 109: CATHYH_file.write(l) CATHYH_file.write( " PARAMETER (ROWMAX={},COLMAX={},DEMRES={})\n".format( self.cathyH["ROWMAX"], 
self.cathyH["COLMAX"], self.cathyH["DEMRES"] ) ) CATHYH_file.write(" PARAMETER (MAXCEL=ROWMAX*COLMAX,MAXRES=1)\n") # CATHYH_file.write( # " PARAMETER (NODMAX=(ROWMAX/DEMRES+1)*(COLMAX/DEMRES+1))\n" # ) CATHYH_file.write( " PARAMETER (NODMAX={})\n".format(self.cathyH["NODMAX"]) ) # CATHYH_file.write( # " PARAMETER (NTRMAX=2*MAXCEL/(DEMRES*DEMRES))\n".format() # ) CATHYH_file.write( " PARAMETER (NTRMAX={})\n".format(self.cathyH["NTRMAX"]) ) CATHYH_file.write( " PARAMETER (NP2MAX=1,MAXSTR={})\n".format(self.cathyH["MAXSTR"]) ) CATHYH_file.write( # " PARAMETER (NFACEMAX={})\n".format(self.cathyH['NFACEMAX'])) " PARAMETER (NFACEMAX={})\n".format(self.cathyH['NFACEMAX'])) CATHYH_file.write( " PARAMETER (NMAX=NODMAX*(MAXSTR + 1),NTEMAX=3*NTRMAX*MAXSTR)\n".format() ) CATHYH_file.write( " PARAMETER (NPMAX={},NPMAX_TRA=1,NQMAX={},NSFMAX={})\n".format( self.cathyH["NPMAX"], self.cathyH["NQMAX"], self.cathyH["NSFMAX"] ) ) CATHYH_file.write( " PARAMETER (NNSFMX={},MAXDIR=NODMAX+NPMAX+NSFMAX*NNSFMX)\n".format( self.cathyH["NNSFMX"] ) ) CATHYH_file.write( " PARAMETER (MAXNUDN={},MAXNUDT={},MAXNUDC={})\n".format( self.cathyH["MAXNUDN"], self.cathyH["MAXNUDT"], self.cathyH["MAXNUDC"], ) ) CATHYH_file.write( " PARAMETER (MAXZON={},MAXTRM={},MAXIT={},MAXVEG={})\n".format( self.cathyH["MAXZON"], self.cathyH["MAXTRM"], self.cathyH["MAXIT"], self.cathyH["MAXVEG"], ) ) CATHYH_file.write( " PARAMETER (NRMAX={},MAXPRT={},MAXVP={})\n".format( self.cathyH["NRMAX"], self.cathyH["MAXPRT"], self.cathyH["MAXVP"] ) ) CATHYH_file.write( " PARAMETER (N1MAX={},NTPMAX=N1MAX*NMAX)\n".format( self.cathyH["N1MAX"] ) ) CATHYH_file.write( " PARAMETER (MAXBOT={},INTBOT={},MAXQOUT={})\n".format( self.cathyH["MAXBOT"], self.cathyH["INTBOT"], self.cathyH["MAXQOUT"] ) ) CATHYH_file.write( "cxcx PARAMETER (NIAUXMAX=NFACEMAX + MAXTRM + 1)\n".format() ) CATHYH_file.write( "cxcx PARAMETER (NRAUXMAX=5*NFACEMAX + MAXTRM,NQMAX_TRA=NODMAX)\n".format() ) CATHYH_file.write(" PARAMETER (NIAUXMAX=NMAX + MAXTRM + 
1)\n".format()) CATHYH_file.write( " PARAMETER (NRAUXMAX=5*NMAX + MAXTRM,NQMAX_TRA=NODMAX)\n".format() ) CATHYH_file.write( " PARAMETER (MAXVTKPRT={})\n".format(self.cathyH["MAXVTKPRT"]) ) # CATHYH_file.write(" PARAMETER (MAXVTKPRT=9)\n".format()) CATHYH_file.write(" PARAMETER (MAXFCONTONODE=100,MAXLKP=3)\n") CATHYH_file.close() pass def update_hapin(self, Lines, hapin, tmp_lnb, tmp_param_value): L = [] tmp_param_value_new = [] count = 0 # Loop over hap.in file lines # -------------------------------------------------------------------- for i, line in enumerate(Lines): xnew = line.strip().split("=") if len(xnew) > 1: # skip lines with no = # loop over hapin dictionnary # ----------------------------------- for key, value in self.hapin.items(): if count < len(hapin): if key == hapin[tmp_lnb[count]]: if value != tmp_param_value[count]: # print(key) # print(value) if isinstance(value, list): value_str = "/t".join(value) xnew[1] = value_str else: xnew[1] = value line = xnew[0] + "= " + str(xnew[1]) + "\n" tmp_param_value_new.append(value) count += 1 # count line nb L.append(line) # # write the new hap.in file hap_file = open( os.path.join(self.workdir, self.project_name, "prepro/hap.in"), "w+" ) hap_file.writelines(L) hap_file.close() def update_prepo_inputs(self, DEM=None, verbose=False, show=False, **kwargs): """ Update default prepro inputs i.e. hap.in and dtm_13.val files based on kwargs Parameters ---------- DEM : TYPE, optional DESCRIPTION. The default is None. verbose : TYPE, optional DESCRIPTION. The default is False. show : TYPE, optional DESCRIPTION. The default is False. **kwargs : TYPE DESCRIPTION. 
""" # print("update hap.in") self.console.print(":arrows_counterclockwise: [b]Update hap.in file[/b]") structural_parameter = [ "delta_x", "delta_y", "N", "M", "N_celle", "xllcorner", "yllcorner", ] terrain_parameter = [ "pt", "imethod", "lambda", "CC_threshold", "ndcf", "nchc", "A_threshold", "ASk_threshold", "kas", "DN_threshold", "local_slope_t", "p_outflow_vo", "bcc", "cqm", "cqg", ] # 'RIVULET NETWORK PARAMETERS (HYDRAULIC GEOMETRY OF THE SINGLE RIVULET)' rivulet_parameter = [ "dr", "As_rf", "(Qsf_rf,w_rf)", "(Wsf_rf,b1_rf,b2_rf)", "(kSsf_rf,y1_rf,y2_rf)", "Qsi_rf", ] channel_parameter = [ "As_cf", "(Qsf_cf,w_cf)", "(Wsf_cf,b1_cf,b2_cf)", "(kSsf_cf,y1_cf,y2_cf)", "Qsi_cf", ] # idhapin = np.ones(22) + [1,1,2,3,3,1] #+ [1,2,3,3,1] hapin = ( structural_parameter + terrain_parameter + rivulet_parameter + channel_parameter ) hap_file = open( os.path.join(self.workdir, self.project_name, "prepro/hap.in"), "r" ) Lines = hap_file.readlines() hap_file.close() # read current values in hap.in tmp_param_value = [] tmp_lnb = [] # save line nb where values are existing count = 0 for line in Lines: x = line.strip().split("=") if len(x) > 1: # skip lines with no = xs = " ".join(x[1].split()) xsr = xs.replace(" ", ",") l = xsr.split(",") if isinstance(l, list): l = "/t".join(l) tmp_param_value.append(l) tmp_lnb.append(count) count += 1 self.hapin = {} for i in range(len(tmp_param_value)): if re.search("/t", tmp_param_value[i]) is None: if tmp_param_value[i].isdigit(): tmp_param_value[i] = int(tmp_param_value[i]) else: tmp_param_value[i] = float(tmp_param_value[i]) else: list_tmp = tmp_param_value[i].split("/t") tmp_param_value[i] = [float(x) for x in list_tmp] self.hapin[hapin[i]] = tmp_param_value[i] # if self.hapin["dr"] != self.hapin["delta_x"]: # if self.hapin["dr"] % self.hapin["delta_x"] != 0: # print("adapt rivulet param to dem resolution") # self.hapin["dr"] = self.hapin["delta_x"] # sys.exit() for key, value in kwargs.items(): self.hapin[key] = value 
self.update_hapin(Lines, hapin, tmp_lnb, tmp_param_value) self.update_dem_parameters(**kwargs) # dtm_13.val # If we start with a DEM file ("dtm_13.val") for an already delineated # catchment (i.e., a "catchment" DEM file instead of a "full" DEM file), then # only the last step in the above procedure (i.e., run "cppp" just once) is # needed (make sure that "Boundary channel construction" is set to 0 in # "hap.in"). if DEM is not None: self.DEM = DEM self.hapin["N"] = np.shape(DEM)[1] self.hapin["M"] = np.shape(DEM)[0] self.update_hapin(Lines, hapin, tmp_lnb, tmp_param_value ) # check presence of the outlet if len(np.unique(DEM)) == 1: print("Error: outlet not defined") DEM[0, 0] = 0 self.console.print(":arrows_counterclockwise: [b]Update dtm_13 file[/b]") self.update_dem(DEM) self.update_zone() with open( os.path.join(self.workdir, self.project_name, "prepro/dtm_13.val"), "w+" ) as f: # use exponential notation np.savetxt(f, DEM, fmt="%1.4e") # np.shape(DEM) self.update_cathyH( ROWMAX=self.hapin["M"], COLMAX=self.hapin["N"], MAXSTR=self.dem_parameters['nstr'], DEMRES=min([self.hapin["delta_x"], self.hapin["delta_y"]]) ) self.update_dem_parameters(**kwargs) if show == True: plt_CT.show_dem(DEM, self.hapin) pass def update_dem_parameters(self, **kwargs): """ Update DEM parameters - if no args reset to default values ivert =0 each layer will be parallel to the surface, including the base of the 3‐d grid. ZRATIO is applied to each vertical cross section. =1 base of the 3‐d grid will be flat. ZRATIO is applied to each vertical cross section. =2 base of the 3‐d grid will be flat, as will the NSTR‐1 horizontal cross sections above it. ZRATIO is applied only to the vertical cross section having the lowest elevation. =3 for each cell of the dem a single depth value is read in file input IIN60 (basement). ZRATIO is applied to each vertical cross section. =4 the first NSTR‐1 layers from the surface will be parallel to the surface and the base of the 3‐d grid will be flat. 
ZRATIO is applied only to the vertical cross section having the lowest elevation. isp =0 for flat surface layer (only one Z value is read in, and is replicated to all surface nodes); otherwise surface layer is not flat (Z values read in for each surface node); (for ISP=0, IVERT=0, 1, and 2 yield the same 3‐d mesh, given the same values of BASE and ZRATIO). base Value which defines the thickness or base of the 3‐d mesh. For IVERT=0, BASE is subtracted from each surface elevation value, so that each vertical cross section will be of thickness BASE, and the base of the 3‐d mesh will be parallel to the surface. For IVERT=1 or 2, BASE is subtracted from the lowest surface elevation value, say ZMIN, so that each vertical cross section will be of thickness (Z ‐ ZMIN) + BASE, where Z is the surface elevation for that cross section. The base of the 3‐d mesh will thus be flat. """ self.console.print( ":arrows_counterclockwise: [b]Update dem_parameters file [/b]" ) if hasattr(self, "dem_parameters") == False: dem_parameters_dict = in_CT.read_dem_parameters( os.path.join( self.workdir, self.project_name, "input", "dem_parameters", ) ) self.dem_parameters = dem_parameters_dict if 'zratio(i).i=1.nstr' in self.dem_parameters: self.dem_parameters['zratio(i),i=1,nstr'] = self.dem_parameters.pop('zratio(i).i=1.nstr') # create dictionnary from kwargs for keykwargs, value in kwargs.items(): if keykwargs == "zratio": key = "zratio(i),i=1,nstr" value = list(value) if (abs(float(sum(value))-float(1)))>1e-3: self.console.rule( ":warning: warning messages above :warning:", style="yellow" ) self.console.print( r"The sum of all the layers is not equal to 1 but to {}".format( np.round(sum(value), 2) ), style="yellow", ) self.console.rule("", style="yellow") else: key = keykwargs try: self.dem_parameters[key] self.dem_parameters[key] = value except: pass dem_parameters_tmp = {} for key, value in self.dem_parameters.items(): if isinstance(value, list): strlst = "\t".join(str(e) for e in value) 
self.dem_parameters[key] = strlst # if (key +'_list') in self.dem_parameters.keys() is False: # dem_parameters_tmp[key +'_list'] = value if 'zratio' in kwargs: self.dem_parameters['nstr'] = len(kwargs['zratio']) # self.dem_parameters = self.dem_parameters | dem_parameters_tmp # write file header_fmt = [1, 1, 1, 1, 3, 3, 1] with open( os.path.join( self.workdir, self.project_name, self.input_dirname, "dem_parameters" ), "w+", ) as dem_parametersfile: # Write values first # ----------------------- counth = 0 for h in header_fmt: if h == 3: dem_parametersfile.write( str(list(self.dem_parameters.values())[counth]) + "\t" + str(list(self.dem_parameters.values())[counth + 1]) + "\t" + str(list(self.dem_parameters.values())[counth + 2]) + "\t" + "\n" ) counth += 3 if h == 1: # print(str(list(self.dem_parameters.values())[counth])) dem_parametersfile.write( str(list(self.dem_parameters.values())[counth]) + "\t" + "\n" ) counth += 1 # Write keys # ----------------------- counth = 0 for h in header_fmt: if h == 3: dem_parametersfile.write( str(list(self.dem_parameters.keys())[counth]) + "\t" + str(list(self.dem_parameters.keys())[counth + 1]) + "\t" + str(list(self.dem_parameters.keys())[counth + 2]) + "\t" + "\n" ) counth += 3 if h == 1: dem_parametersfile.write( str(list(self.dem_parameters.keys())[counth]) + "\t" + "\n" ) counth += 1 dem_parametersfile.close() pass def update_lakes_map(self, lakes_map=[]): """ Update lakes_map file Parameters ---------- dem : np.array([]), optional 2d numpy array describing the zone for x and y direction. The array dim must be equal to DEM dim. 
def update_lakes_map(self, lakes_map=[]):
    """
    Write the ``prepro/lakes_map`` file of the project.

    The file holds six header lines (north, south, east, west, rows, cols)
    built from ``self.hapin`` followed by ``lakes_map`` saved as integers.

    Parameters
    ----------
    lakes_map : array-like, optional
        Values written below the header with ``np.savetxt(..., fmt="%i")``.
        The default is [].
    """
    target = os.path.join(self.workdir, self.project_name, "prepro/lakes_map")
    # The context manager closes the file; no explicit close() is required.
    with open(target, "w+") as stream:
        emit = stream.write
        emit("north: 0" + "\n")
        emit("south: " + str(self.hapin["yllcorner"]) + "\n")
        emit("east: 0" + "\n")
        emit("west: " + str(self.hapin["xllcorner"]) + "\n")
        # NOTE(review): rows/cols use N/M here, whereas the zone file uses
        # M/N - confirm which orientation the preprocessor expects.
        emit("rows: " + str(self.hapin["N"]) + "\n")
        emit("cols: " + str(self.hapin["M"]) + "\n")
        np.savetxt(stream, lakes_map, fmt="%i")


def update_dem(self, DEM=[]):
    """
    Write the DEM to ``prepro/dem`` and ``prepro/dtm_13.val``.

    ``prepro/dem`` receives the six header lines built from ``self.hapin``
    followed by the elevations; ``prepro/dtm_13.val`` receives the elevations
    only. Both use the ``%1.4e`` number format. The array is finally stored
    on the instance as ``self.DEM``.

    Parameters
    ----------
    DEM : array-like, optional
        Elevation values to save. The default is [].
    """
    prepro_dir = os.path.join(self.workdir, self.project_name)

    with open(os.path.join(prepro_dir, "prepro/dem"), "w+") as stream:
        emit = stream.write
        emit("north: 0" + "\n")
        emit("south: " + str(self.hapin["yllcorner"]) + "\n")
        emit("east: 0" + "\n")
        emit("west: " + str(self.hapin["xllcorner"]) + "\n")
        emit("rows: " + str(self.hapin["N"]) + "\n")
        emit("cols: " + str(self.hapin["M"]) + "\n")
        np.savetxt(stream, DEM, fmt="%1.4e")

    # Header-less copy of the same values.
    with open(os.path.join(prepro_dir, "prepro/dtm_13.val"), "w+") as stream:
        np.savetxt(stream, DEM, fmt="%1.4e")

    self.DEM = DEM
def update_zone(self, zone=[]):
    """
    Write the ``prepro/zone`` file and propagate the number of zones.

    Parameters
    ----------
    zone : np.array([]), optional
        2d array of zone numbers. When empty, a homogeneous map of ones with
        shape ``(hapin["M"], hapin["N"])`` is used instead.
        The default is [].

    Notes
    -----
    After the file is written the method

    - stores the map as ``self.zone``,
    - calls ``update_dem_parameters(nzone=<number of distinct values>)``,
    - calls ``update_parm()``,
    - calls ``update_cathyH(MAXZON=<number of distinct values>)``.
    """
    self.console.print(":arrows_counterclockwise: [b]update zone file [/b]")

    n_rows = self.hapin["M"]
    n_cols = self.hapin["N"]
    zone_path = os.path.join(self.workdir, self.project_name, "prepro/zone")

    with open(zone_path, "w+") as stream:
        stream.write("north: 0" + "\n")
        stream.write("south: " + str(self.hapin["yllcorner"]) + "\n")
        stream.write("east: 0" + "\n")
        stream.write("west: " + str(self.hapin["xllcorner"]) + "\n")
        stream.write("rows: " + str(n_rows) + "\n")
        stream.write("cols: " + str(n_cols) + "\n")
        # An empty map means "homogeneous": a single zone numbered 1.
        if len(zone) == 0:
            zone = np.ones((n_rows, n_cols))
        np.savetxt(stream, zone, fmt="%i")

    self.zone = zone

    # Keep the files that store the number of zones consistent with the map.
    zone_count = len(np.unique(zone))
    self.update_dem_parameters(nzone=zone_count)
    self.update_parm()
    self.update_cathyH(MAXZON=zone_count)


# %% INPUT FILES
def update_parm(self, **kwargs):
    """
    Update the parm dictionary, synchronise CATHY.H and write the parm file.

    If ``self.parm`` is empty it is first read from ``<project>/input/parm``.
    Keyword arguments are then merged into it, a set of consistency rules is
    applied (each adjustment is reported on the console), ``update_cathyH``
    is called with the resulting ``NPRT``/``NUMVP`` and the file is written
    with ``_write_parm_file``.

    Parameters
    ----------
    **kwargs
        - ``TIMPRTi``: output times, stored under ``(TIMPRT(I),I=1,NPRT)``.
        - ``NODVP``: nodes stored under ``(NODVP(I),I=1,NUMVP)``; ``NUMVP`` is
          adjusted to their number.
        - ``ID_NR``: nodes stored under ``CONTR(I),I=1,NR``; ``NR`` is
          adjusted to their number.
        - ``ID_QOUT``: ids stored under ``(ID_QOUT(I),I=1,NUM_QOUT)``;
          ``NUM_QOUT`` is adjusted to their number.
        - ``filename``: path of the file to write (default
          ``<workdir>/<project>/<input_dirname>/parm``).
        - ``backup``: when true and ``self.count_DA_cycle`` is not None, the
          existing file is copied to ``<file><count_DA_cycle>`` first.
        - any other name that is already a key of ``self.parm`` replaces the
          stored value; unknown names are ignored here.

        All keyword arguments are also forwarded to ``update_cathyH``.

    Raises
    ------
    ValueError
        If the output times are not strictly increasing.
    """
    self.console.print(":arrows_counterclockwise: [b]Update parm file [/b]")
    warnings_parm = []
    times_key = "(TIMPRT(I),I=1,NPRT)"

    if len(self.parm) == 0:
        self.parm = in_CT.read_parm(
            os.path.join(self.workdir, self.project_name, "input", "parm")
        )
    # Some readers drop the commas of this header; restore the canonical key.
    if "(TIMPRT(I)I=1NPRT)" in self.parm:
        self.parm[times_key] = self.parm.pop("(TIMPRT(I)I=1NPRT)")

    # kwarg name -> (list key, counter key, warning emitted on adjustment)
    counted_lists = {
        "NODVP": (
            "(NODVP(I),I=1,NUMVP)",
            "NUMVP",
            "Adjusting NUMVP with respect to NODVP requested",
        ),
        "ID_NR": (
            "CONTR(I),I=1,NR",
            "NR",
            "Adjusting NR with respect to CONTR requested",
        ),
        "ID_QOUT": (
            "(ID_QOUT(I),I=1,NUM_QOUT)",
            "NUM_QOUT",
            "Adjusting NUM_QOUT with respect to ID_QOUT requested",
        ),
    }

    # merge kwargs into self.parm
    # --------------------------------------------------------------------
    for kk, value in kwargs.items():
        if kk in self.parm and self.verbose:
            print(f"key: {kk} | value: {value}")

        if kk == "TIMPRTi":
            self.parm[times_key] = list(value)
        elif kk in counted_lists:
            list_key, count_key, message = counted_lists[kk]
            self.parm[list_key] = value
            # the counter must match the number of requested entries
            if len(value) != self.parm[count_key]:
                warnings_parm.append(message + "\n")
                self.parm[count_key] = len(value)
        else:
            if kk in self.parm:
                self.parm[kk] = value
            if times_key in kk:
                if self.parm["TMAX"] < max(self.parm[times_key]):
                    self.parm["TMAX"] = max(self.parm[times_key])

    # consistency between the output times and NPRT
    # --------------------------------------------------------------------
    print_times = self.parm[times_key]
    if len(print_times) != self.parm["NPRT"]:
        warnings_parm.append(
            "Adjusting NPRT with respect to time of interests requested" + "\n"
        )
        self.parm["NPRT"] = len(print_times)

    # consistency between DELTAT, DTMIN and DTMAX
    # --------------------------------------------------------------------
    steps = np.diff(print_times)
    # A single output time has no interval to check against.
    if steps.size > 0:
        # Plain float so that no numpy scalar repr ends up in the parm file.
        smallest_step = float(min(steps))
        if smallest_step <= 0:
            raise ValueError("Vtk time steps should be monotonically increasing")
        if self.parm["DTMIN"] > smallest_step:
            warnings_parm.append(
                "Adjusting DTMIN with respect to time of interests requested" + "\n"
            )
            self.parm["DTMIN"] = smallest_step / 1e2

    if self.parm["DELTAT"] < self.parm["DTMIN"]:
        warnings_parm.append(
            "adjusting 2*DTMIN == DELTAT: " + str(self.parm["DTMIN"]) + "\n"
        )
        self.parm["DTMIN"] = self.parm["DELTAT"] / 2

    if self.parm["DELTAT"] >= self.parm["DTMAX"]:
        # NOTE(review): the message announces 2*DELTAT but the factor applied
        # is 1e2 - confirm which one is intended.
        warnings_parm.append("Adjusting DTMAX == 2*DELTAT" + "\n")
        self.parm["DTMAX"] = 1e2 * self.parm["DELTAT"]

    if self.parm["TMAX"] < max(print_times):
        warnings_parm.append(
            "Adjusting TMAX with respect to time of interests requested" + "\n"
        )
        self.parm["TMAX"] = max(print_times)

    if warnings_parm:
        self.console.rule(
            ":warning: warning messages above :warning:", style="yellow"
        )
        for ww in warnings_parm:
            # one message per line (not the whole list on every iteration)
            self.console.print(ww, style="yellow")
        self.console.rule("", style="yellow")

    # transform array values to lists so that they are written as plain numbers
    # --------------------------------------------------------------------
    for kk, value in self.parm.items():
        if isinstance(value, np.ndarray):
            self.parm[kk] = value.tolist()

    # update CATHYH
    # --------------------------------------------------------------------
    self.update_cathyH(
        MAXPRT=self.parm["NPRT"],
        MAXVP=self.parm["NUMVP"],
        indent=" :arrow_right_hook:",
        **kwargs,
    )

    # write parm file
    # --------------------------------------------------------------------
    if "filename" in kwargs:
        file2write = kwargs["filename"]
    else:
        file2write = os.path.join(
            self.workdir, self.project_name, self.input_dirname, "parm"
        )

    # backup file during DA scheme cycle
    # --------------------------------------------------------------------
    if kwargs.get("backup", False) and self.count_DA_cycle is not None:
        shutil.copy(file2write, file2write + str(self.count_DA_cycle))

    self._write_parm_file(file2write)
def _write_parm_file(self, file2write):
    """
    Overwrite the parm file with the content of ``self.parm``.

    The dictionary is written in insertion order, several entries per row:
    each row holds the values, a tab, then the matching keys. The number of
    entries per row follows a fixed layout. When ``NR`` is 0 the
    ``CONTR(I),I=1,NR`` entry is removed from ``self.parm`` and the layout has
    one row less.

    Parameters
    ----------
    file2write : str
        Path of the file to (over)write.
    """
    row_widths = [3, 3, 2, 4, 4, 3, 3, 2, 4, 3, 3, 4, 4, 4, 2, 1, 1, 1]
    if int(self.parm['NR']) == 0:
        # No CONTR row to write; a missing key is fine.
        self.parm.pop("CONTR(I),I=1,NR", None)
    else:
        row_widths = row_widths + [1]

    names = list(self.parm.keys())
    entries = list(self.parm.values())

    start = 0
    with open(file2write, "w+") as parmfile:
        for width in row_widths:
            stop = start + width

            # values: list repr without brackets and commas
            value_text = str(entries[start:stop])
            value_text = value_text.strip("[]").replace(",", "")
            value_text = value_text.strip("[]").replace("[", "")

            # keys: list repr without brackets and quotes
            name_text = str(names[start:stop])
            name_text = name_text.strip("[]").replace("',", "")
            name_text = name_text.replace("'", "")

            parmfile.write(str(value_text + "\t" + name_text + "\n"))
            start = stop
def update_ic(self, INDP=2, IPOND=0, WTPOSITION=0, verbose=False, **kwargs):
    """
    Write the initial conditions (ic) file.

    The initial conditions file contains the pressure heads distribution for
    the study area (INDP). For example, to simulate a uniform water table
    depth of 0.5 m or 1.0 m from the ground surface, INDP=3 and a water table
    height of 4.5 are selected.

    Parameters
    ----------
    INDP : int, optional
        Flag for pressure head initial conditions (all nodes).
        The default is 2.

        - =0 for input of uniform initial conditions (one value read in)
        - =1 for input of non-uniform IC's (one value read in for each node)
        - =2 for calculation of fully saturated vertical hydrostatic
          equilibrium IC's (calculated in subroutine ICVHE). In the case of
          IPOND>0, the fully saturated hydrostatic IC is calculated (in
          subroutine ICVHEPOND) starting from the ponding head values at the
          surface nodes, rather than surface pressure heads of 0.
        - =3 for calculation of partially saturated vertical hydrostatic
          equilibrium IC's (calculated in subroutine ICVHWT) with the water
          table height (relative to the base of the 3-d grid) given by
          WTPOSITION (named WTHEIGHT in the original notes)
        - =4 for calculation of partially saturated vertical hydrostatic
          equilibrium IC's (calculated in subroutine ICVDWT) with the water
          table depth (relative to the surface of the 3-d grid) given by
          WTPOSITION (named WTHEIGHT in the original notes)
    IPOND : int, optional
        Flag for ponding head initial conditions (surface nodes).
        The default is 0.

        - =0 no input of ponding head initial conditions; otherwise
          (IPOND = 1 or 2) ponding head initial conditions are read into
          PONDNOD, and, where PONDNOD > 0, these values are used to update
          the surface node values in PtimeP read in according to the
          previous INDP flag
        - =1 uniform ponding head initial conditions (one value read in)
        - =2 non-uniform ponding head initial conditions (one value read in
          for each node)
    WTPOSITION : float, optional
        Water table position written for INDP=3 or 4. The default is 0.
    verbose : bool, optional
        Unused by this method. The default is False.
    **kwargs
        - ``pressure_head_ini``: value (INDP=0) or per-node values (INDP=1)
          of pressure head to write; required for these two modes.
        - ``filename``: path of the file to write (default
          ``<workdir>/<project>/<input_dirname>/ic``).
        - ``backup``: when true and ``self.count_DA_cycle`` is not None, the
          existing file is copied to ``<file><count_DA_cycle - 1>`` first.
        - ``shellprint_update``: when present, the console banner is skipped.

    Raises
    ------
    ValueError
        If INDP=0 and ``pressure_head_ini`` is missing.
    KeyError
        If INDP=1 and ``pressure_head_ini`` is missing.

    Notes
    -----
    For INDP values other than 0, 1, 2, 3 or 4 the file is left empty.
    """
    if "shellprint_update" not in kwargs:
        self.console.print(":arrows_counterclockwise: [b]Update ic[/b]")

    # Validate the inputs before anything is modified: the file is opened in
    # truncating mode, so failing later would leave a corrupted ic file.
    pressure_head_ini = None
    if INDP == 0:
        if "pressure_head_ini" not in kwargs:
            raise ValueError("Missing initial pressure value")
        pressure_head_ini = kwargs["pressure_head_ini"]
    elif INDP == 1:
        pressure_head_ini = kwargs["pressure_head_ini"]

    self.ic = {
        "INDP": INDP,
        "IPOND": IPOND,
        "WTPOSITION": WTPOSITION,
    }

    if "filename" in kwargs:
        filename = kwargs["filename"]
    else:
        filename = os.path.join(
            self.workdir, self.project_name, self.input_dirname, "ic"
        )

    # backup file during DA scheme cycle
    if kwargs.get("backup", False) and self.count_DA_cycle is not None:
        shutil.copy(filename, filename + str(self.count_DA_cycle - 1))

    flags_line = (
        str(INDP) + "\t" + str(IPOND) + "\t" + "INDP" + "\t" + "IPOND" + "\n"
    )
    with open(filename, "w+") as icfile:
        if INDP == 0:
            # This mode labels only INDP on the flag line.
            icfile.write(str(INDP) + "\t" + str(IPOND) + "\t" + "INDP \n")
            np.savetxt(icfile, [pressure_head_ini], fmt="%1.3e")
        elif INDP == 1:
            icfile.write(flags_line)
            np.savetxt(icfile, pressure_head_ini, fmt="%1.3e")
        elif INDP == 2:
            icfile.write(flags_line)
        elif INDP in [3, 4]:
            icfile.write(flags_line)
            icfile.write(str(WTPOSITION) + "\t" + "WTPOSITION" + "\n")

    # Explicit pressure heads are also mapped onto the mesh.
    if INDP in (0, 1):
        self.map_prop2mesh({"ic": pressure_head_ini})
def update_atmbc(
    self,
    HSPATM=1,
    IETO=0,
    time=None,
    VALUE=[None, None],
    netValue=[],
    show=False,
    verbose=False,
    **kwargs,
):
    """
    Atmospheric forcing term (atmbc - IIN6)

    ..note:

        1 1 HSPATM,IETO
        0.0000000e+00 time
        5.5e-06 VALUE
        12.000000e+03 time
        0.00 VALUE
        18.000000e+03 time
        0.00 VALUE

        The values are those of a 200-min rainfall event at a uniform
        intensity of 3.3·10-4 m/min, followed by 100 min of drainage.

    ..note:

        In case of simultaneous precipitation and evaporation, we impose at
        the surface the net flux, i.e., precipitation minus evaporation.

    Parameters
    ----------
    HSPATM : int, optional
        - =0 for spatially variable atmospheric boundary condition inputs;
          blank or =9999 if unit IIN6 input is to be ignored; otherwise
          atmospheric BC's are homogeneous in space.

        The default is 1.
    IETO : int, optional
        - =0 for linear interpolation of the atmospheric boundary condition
          inputs between different ATMTIM
        - otherwise the inputs are assigned as a piecewise constant function
          (ietograph).

        The default is 0.
    time : list
        ATMBC times in seconds, written without decimals.
        The default is None.
    VALUE : list
        ``[Precipitation, EvapoTranspiration]``; the net flux is computed as
        ``VALUE[0] - abs(VALUE[1])``. A sequence whose length is not 2 is
        taken as the net flux already. Only used when ``netValue`` is empty
        and ``HSPATM == 1``.
    netValue : list, optional
        Net flux, one entry per time (one array per time when HSPATM is not
        1). Takes precedence over ``VALUE``. The default is [].
    show : bool, optional
        Plot atmbc (homogeneous case only). The default is False.
    verbose : bool, optional
        Print each (time, value) pair written. The default is False.
    **kwargs
        - ``filename``: path of the file to write (default
          ``<workdir>/<project>/<input_dirname>/atmbc``).
        - ``backup``: defaults to True; when true and
          ``self.count_DA_cycle`` is not None the existing file is copied to
          ``<file><count_DA_cycle - 1>`` first.
        - ``omit_cathyH``: when present, CATHY.H is not updated here.
        - ``shellprint_update``: when present, the console banner is skipped.

        All keyword arguments are forwarded to the plotting function when
        ``show`` is True.

    Raises
    ------
    ValueError
        If ``HSPATM`` is not 1 and ``netValue`` is empty.

    Notes
    -----
    - Update parm file (first and last time as output times, NPRT=2, TMAX).
    - Update CATHYH file (MAXPRT) unless ``omit_cathyH`` is given.
    """
    if "shellprint_update" not in kwargs:
        self.console.print(":arrows_counterclockwise: [b]Update atmbc[/b]")

    if len(netValue) > 0:
        v_atmbc = netValue
    elif HSPATM == 1:
        # atmbc are homogeneous
        if len(VALUE) == 2:
            # difference between Precipitation and EvapoTranspiration
            v_atmbc = VALUE[0] - abs(VALUE[1])
        else:
            v_atmbc = VALUE
            print('Assuming it is already the net')
    else:
        # Without a net value there is nothing to write; fail explicitly
        # instead of hitting an unbound variable further down.
        raise ValueError(
            'spatial v_atmbc must be provided as net between Precipitation and EvapoTranspiration'
        )

    self.atmbc = {"HSPATM": HSPATM, "IETO": IETO, "time": time, "VALUE": v_atmbc}

    # overwrite existing input atmbc file
    # --------------------------------------------------------------------
    if "filename" in kwargs:
        filename = kwargs["filename"]
    else:
        filename = os.path.join(
            self.workdir, self.project_name, self.input_dirname, "atmbc"
        )

    if kwargs.get("backup", True) and self.count_DA_cycle is not None:
        shutil.copy(filename, filename + str(self.count_DA_cycle - 1))

    with open(filename, "w+") as atmbcfile:
        atmbcfile.write(
            str(HSPATM) + "\t" + str(IETO) + "\t" + "HSPATM" + "\t" + "IETO" + "\n"
        )
        if HSPATM == 1:
            # homogeneous: one scalar per time
            for t, v in zip(time, v_atmbc):
                if verbose:
                    print(t, v)
                atmbcfile.write("{:.0f}".format(t) + "\t" + "time" + "\n")
                atmbcfile.write("{:.3e}".format(float(v)) + "\t" + "VALUE" + "\n")
        else:
            # heterogeneous: one array per time
            for t, v in zip(time, v_atmbc):
                atmbcfile.write("{:.0f}".format(t) + "\t" + "time" + "\n")
                np.savetxt(atmbcfile, v, fmt="%.3e")

    self.update_parm(TIMPRTi=[time[0], time[-1]], NPRT=2, TMAX=max(time))

    # don't need to update if sequential DA as the cathy.exe is already created
    # ---------------------------------------------------------------------
    if "omit_cathyH" not in kwargs:
        self.update_cathyH(MAXPRT=len(time))

    if show:
        if HSPATM == 0:
            print('impossible to plot for non homogeneous atmbc')
        elif len(netValue) > 0:
            plt_CT.show_atmbc(time, v_atmbc, **kwargs)
        else:
            plt_CT.show_atmbc(time, VALUE, **kwargs)
def update_nansfdirbc(
    self,
    time=[],
    NDIR=0,
    NDIRC=0,
    NQ3=None,
    no_flow=False,
    pressure_head=[],
    mesh_bc_df=None,
):
    """
    Dirichlet Boundary conditions (or specified pressure) at time t

    - To simulate the no-flow boundaries conditions for the bottom and
      vertical sides of the domain it is necessary to set NDIR and NDIRC
      equal to zero.
    - To simulate different boundary conditions, it is necessary to
      indicate the number of selected nodes through NDIR or NDIRC,
      then to specify the node ID's that you want to consider and
      eventually the value of pressure head or flux that you want to assign.

    ..note::
        update_nansfdirbc uses the grid3d to refer to mesh nodes

    Parameters
    ----------
    time : np.array([]), optional
        Absolute simulation time in sec. When empty, the times stored in
        ``self.atmbc["time"]`` are used. The default is [].
    NDIR : int, optional
        Number of non-atmospheric, non-seepage face Dirichlet
        nodes in 2-d mesh. The BC's assigned to these surface nodes are
        replicated vertically. The default is 0.
    NDIRC : int, optional
        Number of 'fixed' non-atmospheric, non-seepage face Dirichlet
        nodes in 3-d mesh ('fixed' in the sense that these BC's are not
        replicated to other nodes - compare NDIR). The default is 0.
    NQ3 : int, optional
        Number of non-atmospheric, non-seepage face Neumann nodes in 3-d
        mesh. Currently unused. The default is None.
    no_flow : bool, optional
        Write the no-flow boundary conditions for the bottom and vertical
        sides of the domain. This is the only mode implemented.
        The default is False.
    pressure_head : list, optional
        Node pressure head to impose as Dirichlet boundary condition.
        Currently unused. The default is [].
    mesh_bc_df : optional
        Currently unused. The default is None.

    Raises
    ------
    ValueError
        If ``no_flow`` is False (non homogeneous conditions are not
        implemented); the existing file is left untouched in that case.

    Returns
    -------
    None.
    """
    # check that the mesh exist
    # --------------------------------------------------------------------
    try:
        self.grid3d = out_CT.read_grid3d(
            os.path.join(self.workdir, self.project_name, 'output', 'grid3d')
        )
    except OSError:
        print("grid3d missing - need to run the processor with IPRT1=3 first")

    # check that mesh_bound_cond_df exist
    # --------------------------------------------------------------------
    if not hasattr(self, "mesh_bound_cond_df"):
        if len(time) > 25:
            print('Nb of times is too big to handle bc condition in the df')
    else:
        self.assign_mesh_bc_df("nansfdirbc", time, no_flow=no_flow)

    self.update_parm()
    self.update_cathyH()

    # Fail before the file is opened in truncating mode, so that an
    # unsupported request does not wipe an existing nansfdirbc file.
    if not no_flow:
        raise ValueError(
            "Non homogeneous Dirichlet Boundary conditions Not yet implemented"
        )

    if len(time) == 0:
        time = self.atmbc["time"]

    # To simulate the no-flow boundaries conditions for the bottom and
    # vertical sides of the domain --> NDIR and NDIRC equal to zero
    # -------------------------------------------------------------
    with open(
        os.path.join(
            self.workdir, self.project_name, self.input_dirname, "nansfdirbc"
        ),
        "w+",
    ) as nansfdirbcfile:
        for tt in time:
            nansfdirbcfile.write("{:.0f}".format(tt) + "\t" + "time" + "\n")
            nansfdirbcfile.write(
                str(NDIR) + "\t" + str(NDIRC) + "\t" + "NDIR" + "\t" + "NDIRC" + "\n"
            )

    self.update_parm()
    self.update_cathyH()

    # Example provided by Laura B. (Fortran) of a non homogeneous file:
    # ----------------------------
    # C     Write dirbc
    #       write(33,*) 0.0, 'time'
    #       write(33,*) '0', a
    #       do i=1,nnod3
    #          if ((x(i).eq.0).or.(x(i).eq.5).or.(y(i).eq.0).or.
    #      1       (y(i).eq.5))then
    #             write(33,*) i
    #          endif
    #       enddo
    #       do i=1,nnod3
    #          if ((x(i).eq.0).or.(x(i).eq.5).or.(y(i).eq.0).or.
    #      1       (y(i).eq.5))then
    #             write(33,*) -z(i)-WTdepth
    #          endif
    #       enddo
    #       write(33,*) 2e+20, 'time'
    #       (followed by the same '0', a line and the same two loops)
    #
    # Modify the value of NPMAX in file 27 (CATHY.H) when NDIRC nodes are
    # inserted, and the value of NP2MAX when NDIR nodes are inserted.
    # NPMAX and NP2MAX are the maximum numbers of NDIRC and NDIR nodes
    # that can be inserted.
def update_nansfneubc(
    self, time=[], NQ=0, ZERO=0, fixed_pressure=False, no_flow=False
):
    """
    Neumann boundary conditions (or specified flux) at time t

    Parameters
    ----------
    time : np.array([]), optional
        Absolute simulation time in sec. When empty, the times stored in
        ``self.atmbc["time"]`` are used. The default is [].
    NQ : int, optional
        Number of non-atmospheric, non seepage face Neumann nodes in 3-D
        mesh. The default is 0.
    ZERO : int, optional
        Value written in front of NQ on each record. The default is 0.
    fixed_pressure : bool, optional
        Unused by this method. The default is False.
    no_flow : bool, optional
        Must be True; it is the only mode implemented.
        The default is False.

    Raises
    ------
    ValueError
        If ``no_flow`` is False.

    Returns
    -------
    None.
    """
    # The mesh must exist before boundary conditions are assigned.
    grid_path = os.path.join(self.workdir, self.project_name, 'output', 'grid3d')
    try:
        self.grid3d = out_CT.read_grid3d(grid_path)
    except OSError:
        print("grid3d missing - need to run the processor with IPRT1=3 first")

    # Boundary conditions are only assigned when the dataframe already exists.
    if hasattr(self, "mesh_bound_cond_df"):
        self.assign_mesh_bc_df("nansfneubc", time, no_flow=no_flow)
    elif len(time) > 25:
        print('Nb of times is too big to handle bc condition in the df')

    if not no_flow:
        raise ValueError(
            "Non homogeneous Neumanm Boundary conditions Not yet implemented"
        )
    print("shortcut set_BC_laterals mesh dataframe")

    # overwrite the nansfneubc input file: one (time, flags) record per time
    # --------------------------------------------------------------------
    bc_path = os.path.join(
        self.workdir, self.project_name, self.input_dirname, "nansfneubc"
    )
    with open(bc_path, "w+") as stream:
        if len(time) == 0:
            time = self.atmbc["time"]
        flags_line = str(ZERO) + "\t" + str(NQ) + "\t" + "ZERO" + "\t" + "NQ" + "\n"
        for stamp in time:
            stream.write("{:.0f}".format(stamp) + "\t" + "time" + "\n")
            stream.write(flags_line)
def update_sfbc(self, time=[], sfbc=False, no_flow=False):
    """
    Seepage face boundary conditions at time t

    For every time one line with the time (no decimals) and one line with
    ``0`` are written to the ``sfbc`` input file.

    Parameters
    ----------
    time : np.array([]), optional
        Absolute simulation time in sec. When empty, the times stored in
        ``self.atmbc["time"]`` are used. The default is [].
    sfbc : bool, optional
        Unused by this method. The default is False.
    no_flow : bool, optional
        Forwarded to ``assign_mesh_bc_df``. The default is False.

    Returns
    -------
    None.
    """
    # The mesh must exist before boundary conditions are assigned.
    grid_path = os.path.join(self.workdir, self.project_name, 'output', 'grid3d')
    try:
        self.grid3d = out_CT.read_grid3d(grid_path)
    except OSError:
        print("grid3d missing - need to run the processor with IPRT1=3 first")

    # Boundary conditions are only assigned when the dataframe already exists.
    if hasattr(self, "mesh_bound_cond_df"):
        self.assign_mesh_bc_df('sfbc', time, no_flow=no_flow)
    elif len(time) > 25:
        print('Nb of times is too big to handle bc condition in the df')

    sfbc_path = os.path.join(
        self.workdir, self.project_name, self.input_dirname, "sfbc"
    )
    with open(sfbc_path, "w+") as stream:
        if len(time) == 0:
            time = self.atmbc["time"]
        for stamp in time:
            stream.write("{:.0f}".format(stamp) + "\n")
            stream.write("0" + "\n")
parameters (soil - IIN4). The porous media properties. ..note:: The first thing that must be decides is the type of relationship to describe the hydraulic characteristics of the unsaturated soil (i.e. retention curves). This can be done through the choice of the parameter **IVGHU** amongst the several options. Parameters ---------- IVGHU : int, optional = -1 table look up for moisture curves = 0 for van Genuchten moisture curves = 1 for extended van Genuchten moisture curves = 2 for moisture curves from Huyakorn et al (WRR 20(8) 1984, WRR 22(13) 1986) with Kr=Se**n conductivity relationship = 3 for moisture curves from Huyakorn et al (WRR 20(8) 1984, WRR 22(13) 1986)with conductivity relationship from Table 3 of 1984 paper (log_10 Kr(Se) curve) = 4 for Brooks‐Corey moisture curves. The default is []. FP : list, optional Feddes Parameters. The default is []. [PCANA PCREF PCWLT ZROOT PZ OMGC] - 'PCANA': float, anaerobiosis point --> h2 (≈ -0.5m) - 'PCREF': float, field capacity --> h3 (≈ -4m) - 'PCWLT': float, wilting point --> h4 (≈ -150m) - 'ZROOT': float, root depth - 'PZ': float, pz is an empirical parameter - 'OMGC': float, 0<OMGC<1 Compensatory mechanisms for root water uptake .. note: Calculate actual transpiration Ta (m d−1). 
A multiplicative reduction factor is defined by four pressure heads (0 > h1 > h2 > h3 > h4) delimiting five phases of uptake For details, see http://dx.doi.org/10.1002/2015WR017139 BETA = (1-DEPTH/ZROOT(VEG_TYPE(I)))*DEXP(-1.0d0*PZ(VEG_TYPE(I))*DEPTH/ZROOT(VEG_TYPE(I))) BTRANI(K) = MAX(ZERO,BETA*DZ*GX) BTRAN(I) = BTRAN(I) + BETA*DZ OMG(I) = OMG(I) + GX*BETA*DZ QTRANIE(K)=ETP(I)*BTRANI(K)/BTRAN(I)/MAX(OMG(I),OMGC(VEG_TYPE(I))) FP : Dict, optional Dictionnary containing the SPP properies per zones (root map, indice of vegetation) SPP : np.array, optional Array of size (nb of layers*nb of zones)*8 (8: Physical properties) SPP_map : Dict or Pd.DataFrame, optional Dictionnary containing the SPP properies per zones Example for 2 zones: { 'PERMX': [0.000188, 9.4e-05], 'PERMY': [0.000188, 0.000188], 'PERMZ': [0.000188, 0.000188], 'ELSTOR': [1e-05, 1e-05], 'POROS': [0.55, 0.55], 'VGNCELL': [1.46, 1.46], 'VGRMCCELL': [0.15, 0.15], 'VGPSATCELL': [0.03125, 0.03125] } Soil Physical Properties. The default is []. - 'PERMX' (NSTR, NZONE): saturated hydraulic conductivity - xx - 'PERMY' (NSTR, NZONE): saturated hydraulic conductivity - yy - 'PERMZ' (NSTR, NZONE): saturated hydraulic conductivity - zz - 'ELSTOR' (NSTR, NZONE): specific storage - 'POROS' (NSTR, NZONE): porosity (moisture content at saturation) = \thetaS retention curves parameters VGN, VGRMC, and VGPSAT - 'VGNCELL' (NSTR, NZONE): van Genuchten curve exponent = n - 'VGRMCCELL' (NSTR, NZONE): residual moisture content = \thetaR - 'VGPSATCELL' (NSTR, NZONE): van Genuchten curve exponent --> VGPSAT == -1/alpha (with alpha expressed in [L-1]); Example for 1 zone: PERMX PERMY PERMZ ... VGNCELL VGRMCCELL VGPSATCELL str zone ... 0 0 0.000188 0.000188 0.000188 ... 1.46 0.15 0.03125 1 0 0.000188 0.000188 0.000188 ... 1.46 0.15 0.03125 2 0 0.000188 0.000188 0.000188 ... 1.46 0.15 0.03125 3 0 0.000188 0.000188 0.000188 ... 1.46 0.15 0.03125 4 0 0.000188 0.000188 0.000188 ... 
1.46 0.15 0.03125 ..note:: At the file level, this is an example of 3 layers and 2 zones; The inner reading cycle is by zone, and the outer one by layers, i.e.: PERMX_z1_str1 PERMY_z1_str1 PERMZ_z1_str1 ELSTOR_z1_str1 POROS_z1_str1 VGNCELL_z1_str1 VGRMCCELL__z1_str1 VGPSATCELL__z1_str1 PERMX_z2_str1 PERMY_z2_str1 PERMZ_z2_str1 ELSTOR_z2_str1 POROS_z2_str1 VGNCELL_z2_str1 VGRMCCELL__z2_str1 VGPSATCELL__z2_str1 PERMX_z1_str2 PERMY_z1_str2 PERMZ_z1_str2 ELSTOR_z1_str2 POROS_z1_str2 VGNCELL_z1_str2 VGRMCCELL__z1_str2 VGPSATCELL__z1_str2 PERMX_z2_str2 PERMY_z2_str2 PERMZ_z2_str2 ELSTOR_z2_str2 POROS_z2_str2 VGNCELL_z2_str2 VGRMCCELL__z2_str2 VGPSATCELL__z2_str2 PERMX_z1_str3 PERMY_z1_str3 PERMZ_z1_str3 ELSTOR_z1_str3 POROS_z1_str3 VGNCELL_z1_str3 VGRMCCELL__z1_str3 VGPSATCELL__z1_str3 PERMX_z2_str3 PERMY_z2_str3 PERMZ_z2_str3 ELSTOR_z2_str3 POROS_z2_str3 VGNCELL_z2_str3 VGRMCCELL__z2_str3 VGPSATCELL__z2_str3 Make sure all 8 parameters for each layer/zone are on the same line. Also, make sure NZONE = 2 in dem_parameters and MAXZON in CATHY.H is larger than or equal to NZONE. 
Returns ------- - update parm file - update CATHY.H file - update mesh vtk file """ if "shellprint_update" in kwargs: if kwargs["shellprint_update"] is False: pass else: self.console.print(":arrows_counterclockwise: [b]Update soil[/b]") # set default parameters if SPP and/or FP args are not existing yet # -------------------------------------------------------------------- if len(self.soil) == 0: self.set_SOIL_defaults() try: df = self.read_inputs('soil', MAXVEG=self.MAXVEG ) if len(FP_map)==0: FP_map = df[1].reset_index(drop=True).to_dict(orient='list') except: pass if len(SPP_map) == 0: SPP_map = self.set_SOIL_defaults(SPP_map_default=True) if (not hasattr(self, "soil_FP")) and (len(FP_map) == 0): FP_map = self.set_SOIL_defaults(FP_map_default=True) elif len(FP_map) == 0: FP_map = self.soil_FP["FP_map"] # check size of the heteregeneity # ----------------------------------- if len(zone3d) == 0: print("homogeneous soil") # check size of soil properties map versus nb of zones/ nb of layers # -------------------------------------------------------------------- if isinstance(SPP_map["PERMX"], float): if self.dem_parameters["nzone"] != 1: raise ValueError("Wrong number of zones") else: self.zone3d=zone3d self.dem_parameters['nzone'] = np.size(self.zone3d[0]) self.update_dem_parameters() self.update_cathyH(MAXZON=self.dem_parameters['nzone']) # read function arguments kwargs and udpate soil and parm files # -------------------------------------------------------------------- for kk, value in kwargs.items(): self.soil[kk] = value self.parm[kk] = value # loop over Feddes parameters # -------------------------------------------------------------------- for fp in FP_map: # loop over fedded parameterssoil_het_dim self.soil[fp] = FP_map[fp] # check consistency between parameters # -------------------------------------------------------------------- if isinstance(SPP_map["PERMX"], float): for k in SPP_map: SPP_map[k] = [SPP_map[k]] if isinstance(SPP_map, dict): SPP_map_dict = 
SPP_map if hasattr(self, 'zone3d'): num_rows, num_cols = np.shape(self.zone3d)[0],(np.shape(self.zone3d)[1] * np.shape(self.zone3d)[2]) df_SPP_map = self.init_soil_df(num_cols, num_rows) for i, layersi_zones in enumerate(self.zone3d): layersi_zones = np.hstack(layersi_zones) unique_zones = np.unique(layersi_zones) layersi_zones_prop = np.zeros(np.shape(layersi_zones)) for c in SPP_map_dict.keys(): for zi in unique_zones: layersi_zones_prop[layersi_zones == zi] = SPP_map_dict[c][int(zi-1)] df_SPP_map.loc[(slice(None), i + 1), c] = layersi_zones_prop else: nzones, nstr = self.dem_parameters["nzone"], self.dem_parameters["nstr"] df_SPP_map = self.init_soil_df(nzones, nstr) for key, values in SPP_map.items(): if len(values) == 1: values_all_layers = values*nstr self.console.rule( ":warning: Assuming that soil is homogeneous with depth :warning:", style="yellow" ) df_SPP_map[key] = values_all_layers SPP_map = df_SPP_map if (SPP_map["VGRMCCELL"] >= SPP_map["POROS"]).any(): self.console.rule( ":warning: residual water content is > porosity :warning:", style="yellow" ) # if (SPP_map["VGRMCCELL"] >= SPP_map["POROS"]).all(): # raise ValueError( # "ALL residual water content are" # + str(SPP_map["VGRMCCELL"]) # + "> porosity " # + str(SPP_map["POROS"]) # ) # create prepro inputs if not existing (containing info about the DEM) # -------------------------------------------------------------------- if hasattr(self, "dem_parameters") is False: self.update_prepo_inputs() # Soil Physical Properties strat by strat # -------------------------------------------------------------------- self.soil_SPP = {} self.soil_SPP["SPP_map"] = SPP_map # mapping with respect to zones if len(SPP) > 0: self.soil_SPP["SPP"] = SPP # matrice with respect to zones else: SoilPhysProp = SPP_map.values self.soil_SPP["SPP"] = SoilPhysProp # matrice with respect to zones/layers # Vegetation properties (PCANA,PCREF,PCWLT,ZROOT,PZ,OMGC) # -------------------------------------------------------------------- 
def _get_soil_SPP_columnsNames(self):
    """
    Return the names of the soil physical property (SPP) columns.

    Returns
    -------
    list of str
        The eight SPP column names, in the order used to build the soil
        DataFrame columns.
    """
    permeabilities = ["PERMX", "PERMY", "PERMZ"]
    storage = ["ELSTOR", "POROS"]
    van_genuchten = ["VGNCELL", "VGRMCCELL", "VGPSATCELL"]
    return permeabilities + storage + van_genuchten
def set_SOIL_defaults(self, FP_map_default=False, SPP_map_default=False):
    """
    Reset ``self.soil`` to default values and optionally return a default map.

    ``self.soil`` is always overwritten. Its key order is significant: the
    soil file writer slices ``self.soil`` positionally, so keys must not be
    reordered.

    Parameters
    ----------
    FP_map_default : bool, optional
        If True, return the default Feddes parameters map (one vegetation
        type). Takes precedence over ``SPP_map_default``. The default is False.
    SPP_map_default : bool, optional
        If True, return a DataFrame built by ``self.init_soil_df`` for
        ``self.dem_parameters["nzone"]`` zones and
        ``self.dem_parameters["nstr"]`` layers, with every column filled with
        its homogeneous default value. The default is False.

    Returns
    -------
    dict, DataFrame or None
        The Feddes map, the soil physical properties map, or None when
        neither flag is set.
    """
    self.soil = {
        "PMIN": -5.0,
        "IPEAT": 0,
        "SCF": 1.0,  # here we assume that all soil is covered by the vegetation
        "CBETA0": 0.4,
        "CANG": 0.225,
        # Feddes parameters default values
        "PCANA": [0.0],
        "PCREF": [-4.0],
        "PCWLT": [-150],
        "ZROOT": [0.1],
        "PZ": [1.0],
        "OMGC": [1.0],
        "IVGHU": 0,
        # The first thing that must be decided!
        "HUALFA": 0.02,
        "HUBETA": 2,
        "HUGAMA": 2,
        "HUPSIA": 0,
        "HUSWR": 0.333,
        "HUN": 1,
        "HUA": -5,
        "HUB": 1,
        "BCBETA": 1.2,
        "BCRMC": 0,
        "BCPSAT": -0.345,
    }

    if FP_map_default:
        # NOTE(review): ZROOT is 1.0 here but 0.1 in self.soil above; kept
        # as-is because callers may rely on either value.
        return {
            "PCANA": [0.0],
            "PCREF": [-4.0],
            "PCWLT": [-150],
            "ZROOT": [1.0],
            "PZ": [1.0],
            "OMGC": [1.0],
        }

    if SPP_map_default:
        # Explicit name -> value table instead of eval() on local names.
        spp_defaults = {
            "PERMX": 1.88e-04,
            "PERMY": 1.88e-04,
            "PERMZ": 1.88e-04,
            "ELSTOR": 1.00e-05,
            "POROS": 0.55,
            "VGNCELL": 1.46,
            "VGRMCCELL": 0.15,
            "VGPSATCELL": 0.03125,
        }
        nzones = self.dem_parameters["nzone"]
        nstr = self.dem_parameters["nstr"]
        SPP_map = self.init_soil_df(nzones, nstr)
        for column in SPP_map.columns:
            SPP_map[column] = spp_defaults[column]
        return SPP_map

    return None
""" # check number of zones if self.dem_parameters["nzone"] > 1 or len(zone3d) > 0: if len(SPP["PERMX"]) <= 1: for i, spp in enumerate(SPP): SPP[spp] = SPP[spp] * np.ones(self.dem_parameters["nzone"]) else: pass # check size of the heteregeneity of SPP # ---------------------------------------- # 1d --> uniform # 2d --> lateral variations due to zones defined in surface # 3d --> lateral + vertical variations due to zones and strates SoilPhysProp = [] if len(zone3d) == 0: # loop over strates # ----------------------------------------------------------------- for istr in range(self.dem_parameters["nstr"]): # loop over zones (defined in the zone file) # -------------------------------------------------------------- LayeriZonei = np.zeros([self.dem_parameters["nzone"], 8]) for izone in range(self.dem_parameters["nzone"]): for i, spp in enumerate(SPP): LayeriZonei[izone, i] = SPP[spp][izone] SoilPhysProp.append(LayeriZonei) else: zones3d_def = np.arange(1, self.hapin["M"] * self.hapin["N"] + 1) zones3d_def = np.reshape(zones3d_def, [self.hapin["M"], self.hapin["N"]]) self.update_zone(zones3d_def) # loop over strates # ----------------------------------------------------------------- for istr in range(self.dem_parameters["nstr"]): # loop over zones (defined in the zone file) # -------------------------------------------------------------- LayeriZonei = np.zeros([self.dem_parameters["nzone"], 8]) for izone in range(self.hapin["N"] * self.hapin["M"]): for i, spp in enumerate(SPP): flag_zone = int(np.ravel(zone3d[istr])[izone]) LayeriZonei[izone, i] = SPP[spp][flag_zone - 1] SoilPhysProp.append(LayeriZonei) SoilPhysProp = np.vstack(SoilPhysProp) # np.shape(SoilPhysProp) # case if there is only one zone in the mesh # ------------------------------------------------------------- else: if len(SPP["PERMX"]) <= 1: izoneSoil = [] for spp in SPP: izoneSoil.append(SPP[spp]) izoneSoil = np.hstack(izoneSoil) SoilPhysProp = np.tile(izoneSoil, (self.dem_parameters["nstr"], 1)) # case 
def _prepare_SOIL_vegetation_tb(self, FP_map):
    """
    Build the Feddes parameters table written to the soil file.

    Parameters
    ----------
    FP_map : dict
        Feddes parameters, one list per parameter with one entry per
        vegetation type:

        - 'PCANA': anaerobiosis point
        - 'PCREF': field capacity
        - 'PCWLT': wilting point
        - 'ZROOT': root depth
        - 'PZ'
        - 'OMGC'

        For details, see http://dx.doi.org/10.1002/2015WR017139

    Returns
    -------
    FeddesParam : numpy array
        Table with one row per vegetation type and one column per Feddes
        parameter.

    Raises
    ------
    ValueError
        If several vegetation types are given but no vegetation map exists,
        or if the number of entries differs from ``self.cathyH["MAXVEG"]``.
    """
    n_feddes = len(FP_map[list(FP_map)[0]])

    # Without a vegetation map, only a single vegetation type can be handled:
    # the map is then (re)built by update_veg_map().
    if not hasattr(self, "veg_map"):
        if n_feddes == 1:
            self.update_veg_map()
        else:
            raise ValueError(
                "Found multiple values of Feddes zones "
                + "but vegetation map is not defined"
            )

    # Read MAXVEG only after the optional update_veg_map() call above.
    max_veg = self.cathyH["MAXVEG"]
    if max_veg != n_feddes:
        raise ValueError(
            "Wrong number of vegetations: PCANA size is "
            + str(n_feddes)
            + " while MAXVEG is "
            + str(max_veg)
        )

    if max_veg > 1:
        # One row per vegetation type, columns in FP_map key order.
        FeddesParam = np.zeros([max_veg, 6])
        for iveg in range(max_veg):
            FeddesParam[iveg, :] = np.hstack(
                [FP_map[name][iveg] for name in FP_map]
            )
    else:
        # Single vegetation type: values are taken from self.soil.
        FeddesParam = np.c_[
            self.soil["PCANA"],
            self.soil["PCREF"],
            self.soil["PCWLT"],
            self.soil["ZROOT"],
            self.soil["PZ"],
            self.soil["OMGC"],
        ]
    return FeddesParam
def update_veg_map(self, indice_veg=None, show=False, **kwargs):
    """
    Write the raster describing which vegetation type every cell belongs to.

    The raster is stored in ``self.veg_map``, written to the ``root_map``
    input file, and the number of vegetation types is pushed to CATHY.H
    through ``self.update_cathyH(MAXVEG=...)``.

    Parameters
    ----------
    indice_veg : raster or scalar, optional
        Vegetation index per cell. A scalar is expanded to a homogeneous
        raster of shape ``(hapin["M"], hapin["N"])``. If None, the existing
        ``root_map`` file is read and rewritten. The default is None.
    show : bool, optional
        Plot the vegetation raster. The default is False.
    **kwargs
        Forwarded to the plotting function when ``show`` is True.

    Returns
    -------
    indice_veg : raster
        New vegetation distribution raster. When ``show`` is True, a tuple
        ``(indice_veg, ax)`` is returned instead.
    """
    root_map_path = os.path.join(
        self.workdir, self.project_name, self.input_dirname, "root_map"
    )

    if indice_veg is None:
        indice_veg, _ = in_CT.read_root_map(root_map_path)
    self.veg_map = indice_veg

    if hasattr(self, "hapin") is False:
        self.update_prepo_inputs()

    # A scalar means "homogeneous vegetation": expand it so that the stored
    # map, the written file and the returned value are all the same raster.
    # np.ndim also catches NumPy scalars, which isinstance(..., int) missed.
    if np.ndim(indice_veg) == 0:
        indice_veg = np.full(
            (int(self.hapin["M"]), int(self.hapin["N"])),
            indice_veg,
            dtype=float,
        )
        self.veg_map = indice_veg

    with open(root_map_path, "w+") as rootmapfile:
        rootmapfile.write("north: 0" + "\n")
        rootmapfile.write("south: " + str(self.hapin["yllcorner"]) + "\n")
        rootmapfile.write("east: 0" + "\n")
        rootmapfile.write("west: " + str(self.hapin["xllcorner"]) + "\n")
        rootmapfile.write("rows: " + str(self.hapin["M"]) + "\n")
        rootmapfile.write("cols: " + str(self.hapin["N"]) + "\n")
        np.savetxt(rootmapfile, indice_veg, fmt="%1.2e")

    # Exclude one vegetation label from the count if part of the raster lies
    # outside the DEM domain (i.e. if DEM values are negative).
    n_labels = len(np.unique(indice_veg))
    if n_labels > 1:
        exclude_veg = self._check_outside_DEM(indice_veg)
        self.MAXVEG = n_labels - exclude_veg
        if exclude_veg > 0:
            print('excluding outside DEM')
            print('MAXVEG=' + str(self.MAXVEG))
    else:
        self.MAXVEG = n_labels

    self.update_cathyH(MAXVEG=self.MAXVEG)

    if show:
        ax = plt_CT.show_indice_veg(self.veg_map, **kwargs)
        return indice_veg, ax

    return indice_veg
def get_outer_nodes(self, x, y, z):
    """
    Select the nodes lying on the bounding box of the coordinates.

    Parameters
    ----------
    x, y, z : np.ndarray
        Node coordinates, all of the same shape.

    Returns
    -------
    outer_nodes : np.ndarray
        (n_outer, 3) array with the x, y, z coordinates of the selected nodes.
    outer_mask : np.ndarray of bool
        Mask, same shape as the inputs, True where a node has at least one
        coordinate equal to the minimum or maximum of that coordinate.
    """
    face_tests = []
    for axis_coords in (x, y, z):
        lowest = np.min(axis_coords)
        highest = np.max(axis_coords)
        face_tests.append(axis_coords == lowest)
        face_tests.append(axis_coords == highest)

    outer_mask = np.logical_or.reduce(tuple(face_tests))
    outer_nodes = np.column_stack(
        [axis_coords[outer_mask] for axis_coords in (x, y, z)]
    )
    return outer_nodes, outer_mask
create_mesh_bounds_df(self, BCtypName, grid3d, times, **kwargs): """ Create a dataframe with flag for different boundary condtions assigned to each nodes Parameters ---------- grid3d : TYPE DESCRIPTION. Returns ------- None. """ # Step 1: Extract mesh coordinates and build a dataframe # --------------------------------------------------------------------- grid3d = np.c_[np.arange(0,len(grid3d)),grid3d] self.mesh_bound_cond_df = pd.DataFrame(grid3d) self.mesh_bound_cond_df = self.mesh_bound_cond_df.rename( columns={ 0: "id_node", 1: "x", 2: "y", 3: "z", } ) self.mesh_bound_cond_df["id_node"] = self.mesh_bound_cond_df["id_node"].astype( int ) self.mesh_bound_cond_df["time"] = 0 coords = self.mesh_bound_cond_df[['x','y','z']].to_numpy() len(coords) # Step 2: build a mask of the size of the mesh # ------------------------------------------------------------------- x = np.unique(coords[:,0]) y = np.unique(coords[:,1]) layers_top, layers_bottom = mt.get_layer_depths(self.dem_parameters) layers_top.append(layers_bottom[-1]) z = layers_top grid_x, grid_y = np.meshgrid(x, y) X, Y, Z = np.meshgrid(x, np.flipud(y), np.flipud(z)) # dem_mat, header_dem = simu.read_inputs('dem') xdem, ydem = plt_CT.get_dem_coords( hapin=self.hapin, ) grid_xdem, grid_ydem = np.meshgrid(xdem, ydem) # Step 3: interpolate DEM (defined on mesh cell center) on mesh nodes # ------------------------------------------------------------------- rbf3 = Rbf(grid_xdem.flatten(), grid_ydem.flatten(), self.DEM.flatten(), function="multiquadric", smooth=5) grid_dem = rbf3(grid_x, grid_y) np.shape(grid_dem) Ztopo = np.zeros(np.shape(Z)) for i in range(np.shape(Z)[2]): Ztopo[:,:,i]=(Z[:,:,i]+grid_dem) outer_nodes, outer_mask = self.get_outer_nodes(X, Y, Z) # maskFalse = np.zeros(np.shape(outer_mask), dtype=bool) z_min, z_max = np.min(Z), np.max(Z) x_min, x_max = np.min(X), np.max(X) y_min, y_max = np.min(Y), np.max(Y) noflow_mask = np.logical_or.reduce((X == x_min, X == x_max, Y == y_min, Y == y_max, Z == z_min)) 
def assign_mesh_bc_df(self, BCtypName, times=0, **kwargs):
    """
    Flag the no-flow nodes for one boundary condition type.

    When ``BCtypName`` contains one of the boundary condition file names
    ('nansfdirbc': specified pressure, 'nansfneubc': specified flux,
    'sfbc': seepage face) and a ``no_flow`` keyword is passed, the column
    ``BCtypName`` of ``self.mesh_bound_cond_df`` is set to -99 everywhere
    and to 0 on the nodes whose 'noflow_bound' flag is True.

    Parameters
    ----------
    BCtypName : str
        Name of the boundary condition, used as column name.
    times : optional
        Not used by this method. The default is 0.
    **kwargs
        Only the presence of the ``no_flow`` key is checked.

    Returns
    -------
    None.
    """
    bc_file_names = ("nansfdirbc", "nansfneubc", "sfbc")
    for bc_file_name in bc_file_names:
        if bc_file_name in BCtypName and "no_flow" in kwargs:
            bc_table = self.mesh_bound_cond_df
            bc_table[BCtypName] = -99
            on_noflow_bound = bc_table["noflow_bound"] == True
            bc_table.loc[on_noflow_bound, BCtypName] = 0
    return None
def create_mesh_vtkris3d_vtk2(self, verbose):
    """
    Write the 3D tetrahedral mesh to ``vtk/mesh_tmp.vtk`` (legacy VTK 2.0).

    The file is an ASCII ``DATASET UNSTRUCTURED_GRID`` holding the nodes from
    ``self.grid3d["mesh3d_nodes"]`` and the tetrahedra from
    ``self.grid3d["mesh_tetra"]``.

    TETRA (5,NT) - element connectivities in 3-d mesh (TETRA(5,I)
    indicates material type for 3-d element I). Only the first four columns
    (node numbers) are written, shifted by -1 to the 0-based numbering.

    Parameters
    ----------
    verbose : bool
        Not used by this method; kept for interface compatibility.

    Returns
    -------
    None.
    """
    nodes = self.grid3d["mesh3d_nodes"]
    tetra = self.grid3d["mesh_tetra"]
    ntetra = len(tetra)

    # Each CELLS row is "4 n0 n1 n2 n3": the node count, then the nodes.
    connectivity = tetra.T[0:4] - 1
    cells = np.vstack([4 * np.ones(ntetra), connectivity]).T
    n_cell_values = ntetra * 5

    vtk_path = os.path.join(self.workdir, self.project_name, "vtk/mesh_tmp.vtk")
    with open(vtk_path, "w+") as vtkmesh:
        vtkmesh.write("# vtk DataFile Version 2.0\n")
        vtkmesh.write("3D Unstructured Grid of Linear Triangles\n")
        vtkmesh.write("ASCII\n")
        vtkmesh.write("DATASET UNSTRUCTURED_GRID\n")
        vtkmesh.write("FIELD FieldData 1\n")
        vtkmesh.write("TIME 1 1 double\n")
        vtkmesh.write(" 0.00000\n")
        vtkmesh.write(
            "POINTS " + "{:3.0f}".format(self.grid3d["nnod3"]) + " float\n"
        )
        np.savetxt(vtkmesh, nodes, fmt="%1.6e")
        vtkmesh.write(
            "CELLS "
            + "{:d}".format(ntetra)
            + "\t"
            + "{:d}".format(n_cell_values)
            + "\n"
        )
        np.savetxt(vtkmesh, cells, fmt="%d", delimiter="\t")
        vtkmesh.write("CELL_TYPES " + "{:d}".format(ntetra) + "\n")
        # 10 is the VTK cell type written for every tetrahedron.
        np.savetxt(vtkmesh, 10 * np.ones(ntetra).T, fmt="%d")
    return None
""" # if not 'nnod3' in self.grid3d.keys(): self.run_preprocessor(verbose=verbose) self.run_processor(IPRT1=3, verbose=verbose) self.create_mesh_vtkris3d_vtk2(verbose) # self.create_mesh_vtkris3d_vtk9() self.mesh_pv_attributes = pv.read( os.path.join(self.workdir, self.project_name, "vtk/mesh_tmp.vtk") ) self.mesh_pv_attributes.save( os.path.join( self.workdir, self.project_name, "vtk/", self.project_name + ".vtk" ), binary=False, ) # THIS IS A TEMPORARY IMPLEMENTATION OF THE PYVISTA MESH # https://docs.pyvista.org/examples/00-load/create-structured-surface.html pass def update_mesh_vtk(self, prop="", prop_value=[], replaceVTK=True, **kwargs): """ https://docs.pyvista.org/api/core/_autosummary/pyvista.ExplicitStructuredGrid.add_field_data.html#pyvista.ExplicitStructuredGrid.add_field_data Parameters ---------- prop : TYPE, optional DESCRIPTION. The default is ''. prop_value : TYPE, optional DESCRIPTION. The default is []. Returns ------- None. """ self.mesh_pv_attributes.add_field_data(prop_value, prop) if replaceVTK: self.mesh_pv_attributes.save( os.path.join( self.workdir, self.project_name, "vtk/", self.project_name + ".vtk" ), binary=False, ) pass def map_prop_2mesh_markers(self, prop_name, prop_map, zones_markers_3d=None, to_nodes=False, **kwargs): """ Map a physical property to the CATHY mesh nodes/cells. The mapping length should be equal to the mesh node markers unique value length. If no markers (i.e zones_markers_3d = None) are defined for the mesh nodes then each layers is associated with a unique marker Parameters ---------- prop_name : str property name i.e. ic, POROS, ... . prop_map : list Values of the property. to_nodes : bool, optional Map to the mesh nodes. The default is False. Returns ------- pv.Mesh Updated pyvista mesh with new property. 
def map_prop2mesh(self, dict_props):
    """
    Add physical properties to the CATHY mesh.

    Each entry of ``dict_props`` is passed to ``self.update_mesh_vtk``. A
    scalar value is treated as homogeneous and expanded to one value per mesh
    point; a list or array is passed through unchanged.

    Parameters
    ----------
    dict_props : dict
        Property name -> scalar, or sequence of values.

    Returns
    -------
    None.
    """
    if not hasattr(self, "mesh_pv_attributes"):
        self.create_mesh_vtk()

    for prop_name, prop_value in dict_props.items():
        # The original test was `~isinstance(value, list)`: `~` is bitwise
        # NOT, which yields -1 or -2 for a bool, so it was always true.
        if np.ndim(prop_value) == 0:
            print(
                "Single value detected for "
                + str(prop_name)
                + " ==> assumming it homogeneous"
            )
            n_points = len(self.mesh_pv_attributes.points)
            self.update_mesh_vtk(
                prop=prop_name,
                prop_value=np.ones(n_points) * prop_value,
            )
        else:
            self.update_mesh_vtk(prop=prop_name, prop_value=prop_value)
    return None
def show(self, prop="hgsfdet", **kwargs):
    """
    Plot a CATHY output by dispatching to the plotting module.

    Parameters
    ----------
    prop : str
        Property to plot. 'hgsfdet', 'hgraph', 'cumflowvol', 'dtcoupling' and
        'wtdepth' are read with ``self.read_outputs`` and plotted;
        'spatialET' is read from the project ``fort.777`` file; 'WTD' (water
        table depth) is inferred from the 'psi', 'xyz' and 'grid3d' outputs.
        Any other value prints a message and plots nothing.
    **kwargs : kwargs
        Forwarded to the plotting function.

    Returns
    -------
    cmap or None
        The object returned by the plotting function for 'spatialET' and
        'WTD'; None otherwise.

    Raises
    ------
    Exception
        Any error raised by ``self.read_outputs`` for a supported output is
        propagated, instead of being swallowed and surfacing later as an
        unbound ``df`` variable.
    """
    # Outputs plotted directly from the table returned by read_outputs,
    # mapped to the name of their plotting function.
    table_plotters = {
        "hgsfdet": "show_hgsfdet",
        "hgraph": "show_hgraph",
        "cumflowvol": "show_COCumflowvol",
        "dtcoupling": "show_dtcoupling",
        "wtdepth": "show_wtdepth",
    }

    if prop in table_plotters:
        df = self.read_outputs(filename=prop)
        getattr(plt_CT, table_plotters[prop])(df, **kwargs)
        return None

    if prop == "spatialET":
        df_fort777 = out_CT.read_fort777(
            os.path.join(self.workdir, self.project_name, 'fort.777'),
        )
        return plt_CT.show_spatialET(df_fort777, **kwargs)

    if prop == "WTD":  # water table depth
        xyz = self.read_outputs('xyz')
        df_psi = self.read_outputs('psi')
        grid3d = self.read_outputs('grid3d')
        nstr = self.dem_parameters['nstr'] + 1
        nnod = int(grid3d['nnod'])
        NPRT = np.shape(df_psi.values)[0]
        # The first nnod rows of xyz are taken as the surface nodes.
        XYZsurface = xyz[['x', 'y', 'z']].iloc[0:nnod].to_numpy()
        WT, _flag = self.infer_WTD_from_psi(
            df_psi.values, nnod, nstr, NPRT, xyz, XYZsurface
        )
        return plt_CT.plot_WTD(XYZsurface, WT, **kwargs)

    print("no proxy to plot")
    return None
""" df = self.read_inputs(filename=prop, **kwargs) if prop == "atmbc": plt_CT.show_atmbc(df["time"], df["value"], ax=ax, **kwargs) elif prop == "root_map": plt_CT.show_indice_veg(df[0], ax=ax, **kwargs) elif prop == "dem": if hasattr(self, "hapin") is False: self.update_prepo_inputs() hapin = self.hapin plt_CT.show_dem(df[0], hapin, ax=ax, **kwargs) elif prop == "zone": plt_CT.show_zone(df[0], ax=ax) elif prop == "soil": # in 2 dimensions # ------------- zone_mat = in_CT.read_zone( os.path.join(self.workdir, self.project_name, "prepro/zone") ) layer_nb = 0 if "layer_nb" in kwargs: layer_nb = kwargs["layer_nb"] yprop = "PERMX" if "yprop" in kwargs: yprop = kwargs["yprop"] soil_map_prop = zone_mat[0] # self.DEM # NZONES = exclude_zone = self._check_outside_DEM(zone_mat[0]) NZONES = len(np.unique(zone_mat[0])) - exclude_zone if NZONES-1>1: for z in range(NZONES-1): soil_map_prop[zone_mat[0] == z+1] = df[0][yprop].xs( (z+1,layer_nb) ) else: soil_map_prop[zone_mat[0] == 1] = df[0][yprop].xs((1, layer_nb) ) cmap = plt_CT.show_soil(soil_map_prop, ax=ax, **kwargs) return cmap # in 3 dimensions # -------------- # SPP = df[0].to_dict() # SoilPhysProp = self._prepare_SPP_tb(SPP) # soil_SPP_plot = {} # soil_SPP_plot['SPP'] = SoilPhysProp # matrice with respect to zones # soil_SPP_plot['SPP_map'] = SPP # mapping with respect to zones # self.map_prop2mesh(SPP) elif ("dtm_" in prop) | ("lakes_map" in prop): raster_mat, header_raster = in_CT.read_raster( os.path.join(self.workdir, self.project_name, "prepro/" + prop) ) if hasattr(self, "hapin") is False: self.update_prepo_inputs() hapin = self.hapin plt_CT.show_raster( raster_mat, header_raster, prop=prop, hapin=hapin, ax=ax, **kwargs ) else: print("no proxy to plot") pass #%% Read outputs/inputs # ------------------------------------------------------------------------ def read_outputs(self, filename, **kwargs): """ Read CATHY format output file Parameters ---------- filename : str name of the output file to read. 
Returns ------- A dataframe or a dict describing file data/entries. """ path = os.path.join(self.workdir, self.project_name, "output", filename) if "path" in kwargs: path = kwargs["path"] + "/" + filename if filename == "vp": df = out_CT.read_vp(path) return df elif filename == "hgraph": df = out_CT.read_hgraph(path) return df elif filename == "dtcoupling": df = out_CT.read_dtcoupling(path) return df elif filename == "hgsfdet": df = out_CT.read_hgsfdet(path) return df elif filename == "psi": df = out_CT.read_psi(path) return df elif filename == "sw": df = out_CT.read_sw(path) return df elif filename == "mbeconv": df = out_CT.read_mbeconv(path) return df elif filename == "cumflowvol": df = out_CT.read_cumflowvol(path) return df elif filename == "wtdepth": df = out_CT.read_wtdepth(path) elif filename == "xyz": df = out_CT.read_xyz(path) return df elif filename == "grid3d": df = out_CT.read_grid3d(path) return df else: print("no file specified") pass def read_inputs(self, filename, **kwargs): """ Read CATHY format input file Parameters ---------- filename : str name of the input CATHY file to read. Returns ------- A dataframe or a dict describing file data/entries. 
""" if filename == "atmbc": if len(self.grid3d) == 0: # self.run_processor(IPRT1=3, DAFLAG=0) self.grid3d = out_CT.read_grid3d(os.path.join(self.workdir, self.project_name, 'output', 'grid3d') ) df, HSPATM, IETO, = in_CT.read_atmbc( os.path.join(self.workdir, self.project_name, "input", filename), grid=self.grid3d ) self.atmbc = {"HSPATM": HSPATM, "IETO": IETO, "time": df['time'].unique(), "VALUE": df['value'], "atmbc_df": df } return df elif filename == "dem": df = in_CT.read_dem( os.path.join(self.workdir, self.project_name, "prepro/dem"), os.path.join(self.workdir, self.project_name, "prepro/dtm_13.val"), ) return df elif filename == "root_map": df = in_CT.read_root_map( os.path.join(self.workdir, self.project_name, "input", filename) ) return df elif filename == "soil": dem_parm = in_CT.read_dem_parameters( os.path.join(self.workdir, self.project_name, "input", "dem_parameters") ) # self.dem_parameters # self.update_dem_parameters() if 'MAXVEG' in kwargs: MAXVEG = kwargs['MAXVEG'] else: MAXVEG = self.MAXVEG df = in_CT.read_soil( os.path.join(self.workdir, self.project_name, "input", filename), dem_parm, MAXVEG=MAXVEG, ) # df[0] return df elif filename == "zone": df = in_CT.read_zone( os.path.join(self.workdir, self.project_name, "prepro/zone") ) return df elif ("dtm_" in filename) | ("lakes_map" in filename): raster_mat, header_raster = in_CT.read_raster( os.path.join(self.workdir, self.project_name, "prepro/" + filename) ) return raster_mat, header_raster else: print("unknown file requested") pass # -------------------------------------------------------------------# # %% utils # -------------------------------------------------------------------# def infer_WTD_from_psi(self,psi,nnod,nstr,NPRT,xyz,XYZsurface): # Vertical profiles in rows and layers in columns, # define topography Z Z = np.zeros([nnod,nstr]) for l in range(nstr): for nn in range(nnod): Z[nn,l] = xyz['z'].iloc[l*nnod+nn] Z = np.fliplr(Z) FLAG = np.zeros([nnod,NPRT]); WT=[] for nprti in 
range(NPRT): # loop over NPRT vpPSI = np.zeros([nnod,nstr]) for l in range(nstr): for nn in range(nnod): vpPSI[nn,l] = psi[nprti][l*nnod+nn] vpPSI = np.fliplr(vpPSI) # % Z0 contains for every vertical profile the height of the watertable, Z0 = XYZsurface[:,2].copy() for ni in range(nnod): ## loop over all mesh nodes for nstri in range(nstr-1): ## loop over mesh layers # print(nstri) if vpPSI[ni, nstri] > 0 and vpPSI[ni, nstri+1] < 0 and FLAG[ni, nprti] == 0: # Watertable, interpolate linearly (Z=rc*VPPSI+z0) rc = (Z[ni, nstri] - Z[ni, nstri+1]) / (vpPSI[ni, nstri] - vpPSI[ni, nstri+1]) Z0[ni] = Z[ni, nstri] - rc * vpPSI[ni, nstri] FLAG[ni, nprti] = 1 elif vpPSI[ni, nstri] > 0 and vpPSI[ni, nstri+1] < 0 and FLAG[ni, nprti] == 1: # Second watertable FLAG[ni, nprti] = 2 elif nstri == nstr - 2 and vpPSI[ni, nstri+1] >= 0 and FLAG[ni, nprti] == 0: # Watertable not encountered and nodes still saturated FLAG[ni, nprti] = 3 elif nstri == nstr -2 and FLAG[ni, nprti] == 0: # Watertable not encountered, and not fully saturated profile. Set watertable to lowest node Z0[ni] = Z[ni, nstri - nstr + 2] FLAG[ni, nprti] = 4 Z0 = XYZsurface[:,2]-Z0; WT.append(Z0) WT = np.vstack(WT) return WT, FLAG def find_nearest_node(self, node_coords, grid3d=[]): """ Find nearest mesh node Parameters ---------- node_coords : list List of coordinates [[x,y,z],[x2,y2,z2]]. Returns ------- closest_idx : list Node indice in the grid3d. closest : list Node coordinate in the grid3d. 
""" if np.array(node_coords).ndim <= 1: node_coords = [node_coords] if len(grid3d) == 0: grid3d = out_CT.read_grid3d(os.path.join(self.workdir, self.project_name, 'output', 'grid3d') ) closest_idx = [] closest = [] for i, nc in enumerate(node_coords): # euclidean distance if len(nc)==3: d = ( (grid3d["mesh3d_nodes"][:, 0] - nc[0]) ** 2 + (grid3d["mesh3d_nodes"][:, 1] - nc[1]) ** 2 # + (abs(grid3d["mesh3d_nodes"][:, 2]) - abs(nc[2])) ** 2 + (grid3d["mesh3d_nodes"][:, 2] - nc[2]) ** 2 ) ** 0.5 else: d = ( (grid3d["mesh3d_nodes"][:, 0] - nc[0]) ** 2 + (grid3d["mesh3d_nodes"][:, 1] - nc[1]) ** 2 ) ** 0.5 closest_idx.append(np.argmin(d)) closest.append(grid3d["mesh3d_nodes"][closest_idx[i],:]) threshold = 5e-1 if d[np.argmin(d)] > threshold: self.console.print( ":warning: [b]No node close to the required points![/b]" ) return closest_idx, closest def rich_display(self, title="Star Wars Movies", **kwargs): """ Describe the variable state and fate during the simulation with a rich table Returns ------- None. """ self.console.print(eval("self." 
+ str(title))) pass def backup_simu(self): """ Save a copy of the simulation for reuse within python Returns ------- project_filename.pkl """ with open( os.path.join(self.workdir, self.project_name, self.project_name + ".pkl"), "wb", ) as f: pickle.dump(CATHY, f) f.close() pass def backup_results_DA(self, meta_DA=[]): """ Save minimal dataframes of the simulation for result visualisation within python Returns ------- project_filename_df.pkl """ with open( os.path.join( self.workdir, self.project_name, self.project_name + "_df.pkl" ), "wb", ) as f: pickle.dump(meta_DA, f) pickle.dump(self.dict_parm_pert, f) pickle.dump(self.df_DA, f) pickle.dump(self.dict_obs, f) if hasattr(self, "df_performance"): pickle.dump(self.df_performance, f) if hasattr(self, "df_Archie"): pickle.dump(self.df_Archie, f) f.close() def load_pickle_backup(self,filename=""): if len(filename) == 0: filename = os.path.join( self.workdir, self.project_name, self.project_name + "_df.pkl" ) # filename = os.path.join( # self.workdir, self.project_name, str(idsimu) + "_df.pkl" # ) backup_list = [] all_names = [ "meta_DA", "dict_parm_pert", "df_DA", "dict_obs", "df_performance", "df_Archie", ] names = [] i = 0 # pickle_off = open(filename, "rb") # df = pd.read_pickle(pickle_off) with open(filename, "rb") as f: for i in range(len(all_names)): try: df = pd.read_pickle(f) backup_list.append(df) # backup_list.append(pickle.load(f)) names.append(all_names[i]) # i += 1 except: pass f.close() dict_backup = {} for i, n in enumerate(names): dict_backup[n] = backup_list[i] return dict_backup