# Source code for importers.cathy_inputs

# -*- coding: utf-8 -*-
"""Reader for CATHY input files"""

import os
import re

import numpy as np
import pandas as pd

from pyCATHY.plotters import cathy_plots as pltCT


def read_atmbc(filename, grid=None, show=False, **kwargs):
    """Read a CATHY ``atmbc`` (atmospheric boundary conditions) input file.

    The first line carries the two integer flags ``HSPATM`` and ``IETO``.
    When ``HSPATM != 0`` the forcing is homogeneous over the surface mesh
    and the file alternates TIME / VALUE entries; otherwise one value per
    surface node follows each TIME line (``grid['nnod']`` nodes per step).

    Parameters
    ----------
    filename : str
        Path to the atmbc file.
    grid : dict, optional
        CATHY grid dict; only ``grid['nnod']`` is read, and only in the
        heterogeneous (``HSPATM == 0``) case. Defaults to an empty dict
        (was a mutable ``[]`` default, which is both a Python anti-pattern
        and the wrong type — it is indexed with a string key).
    show : bool, optional
        If True, plot the forcing with ``pltCT`` (best-effort for the 3D view).

    Returns
    -------
    (pandas.DataFrame, int, int)
        Dataframe with ``time``/``value`` columns, plus the HSPATM and
        IETO header flags.

    Raises
    ------
    ValueError
        If the homogeneous file yields unequal numbers of times and values.
    """
    grid = {} if grid is None else grid

    with open(filename, "r") as atmbc_file:
        lines = atmbc_file.readlines()

    # Header flags: keep only the integer tokens of line 0 (trailing
    # "HSPATM IETO" labels are ignored by isdigit()).
    header_flags = [int(word) for word in lines[0].split() if word.isdigit()]
    HSPATM = header_flags[0]
    IETO = header_flags[1]

    t = []
    value = []
    tstep_idx = []

    if HSPATM != 0:
        # Homogeneous forcing on all surface mesh nodes.
        for i, ll in enumerate(lines):
            if i == 0:
                continue  # header already parsed
            if "TIME".casefold() in ll.casefold():
                tstep_idx.append(i)
                t.append(float(ll.split()[0]))
            elif (
                "VALUE".casefold() in ll.casefold()
                or "ATMINP".casefold() in ll.casefold()
            ):
                # Case 1: numeric value followed by a 'VALUE'/'ATMINP' flag.
                value.append(float(ll.split()[0]))
            elif "\n" in ll:
                # Unflagged numeric lines: even line index -> value,
                # odd -> time; blank lines (bare newline) are skipped.
                if len(ll.split()) > 0:
                    if (i % 2) == 0:
                        value.append(float(ll.split()[0]))
                    else:
                        t.append(float(ll.split()[0]))
            else:
                # Case 2: last line without a trailing newline, numeric only.
                value.append(float(ll.split()[0]))

        if len(value) != len(t):
            raise ValueError(
                "Number of values does not match number of times (check flags TIME, VALUE)"
            )

        d_atmbc = np.vstack([t, value])
        df_atmbc = pd.DataFrame(d_atmbc.T, columns=["time", "value"])
    else:
        # Heterogeneous forcing: one value per surface node after each TIME.
        for i, ll in enumerate(lines):
            if i > 0 and "TIME".casefold() in ll.casefold():
                tstep_idx.append(i)
                t.append(float(ll.split()[0]))

        nnod = int(grid["nnod"])  # number of surface nodes per time step
        data_dicts = []
        for step, j in enumerate(range(1, len(lines), nnod + 1)):
            # Bug fix: the per-node loop variable used to be named `value`,
            # shadowing the `value` list read by pltCT.show_atmbc below.
            for node_line in lines[j + 1 : j + nnod + 1]:
                data_dicts.append(
                    {"time": t[step], "value": float(node_line.split()[0])}
                )
        df_atmbc = pd.DataFrame(data_dicts)

    if show:
        pltCT.show_atmbc(t, value, IETO=IETO)
        try:
            pltCT.show_atmbc_3d(df_atmbc)
        except Exception:
            # Best-effort 3D preview; was a bare `except:` which also
            # swallowed KeyboardInterrupt/SystemExit.
            print("no vtk file found")

    return df_atmbc, HSPATM, IETO
def read_parm(filename, **kwargs):
    """Read a CATHY ``parm`` (simulation control parameters) input file.

    The file mixes numeric values and name tokens on the same lines; the
    parser flattens all whitespace-separated tokens, splits them into
    numbers and names with ``_search_num_values_in_list``, and then pairs
    each name with the number(s) at the running numeric cursor ``ii``.
    List-valued entries (TIMPRT, NODVP, ID_QOUT) consume as many numbers
    as their count parameter (NPRT, NUMVP, NUM_QOUT), which therefore must
    appear earlier in the file.

    Parameters
    ----------
    filename : str
        Path to the parm file.

    Returns
    -------
    dict
        Mapping of name tokens (exactly as spelled in the file) to a
        number, or to a list of numbers for the array-valued entries.
    """
    # Reference list of the parameters expected in a parm file, in file
    # order. Kept for documentation only: the parser keys on the name
    # tokens actually present in the file.
    parm_header = [
        "IPRT1",  # Flag for output of input and coordinate data
        "NCOUT",
        "TRAFLAG",  # Flag for transport
        "ISIMGR",  # Flag for type of simulation and type of surface grid
        "PONDH_MIN",  # Minimum ponding head
        "VELREC",
        "KSLOPE",
        "TOLKSL",
        "PKRL",
        "PKRR",
        "PSEL",
        "PSER",
        "PDSE1L",
        "PDSE1R",
        "PDSE2L",
        "PDSE2R",
        "ISFONE",
        "ISFCVG",
        "DUPUIT",
        "TETAF",
        "LUMP",
        "IOPT",
        "NLRELX",
        "OMEGA",
        "L2NORM",
        "TOLUNS",
        "TOLSWI",
        "ERNLMX",
        "ITUNS",
        "ITUNS1",
        "ITUNS2",
        "ISOLV",
        "ITMXCG",
        "TOLCG",
        "DELTAT",
        "DTMIN",  # Minimum FLOW3D time step size allowed
        "DTMAX",  # Maximum FLOW3D time step size allowed
        "TMAX",  # Time at end of simulation (0.0 for steady state problem)
        "DTMAGA",
        "DTMAGM",
        "DTREDS",
        "DTREDM",
        "IPRT",
        "VTKF",
        "NPRT",
        "(TIMPRT(I),I=1,NPRT)",
        "NUMVP",
        "(NODVP(I),I=1,NUMVP)",  # should be positive (first node is 1)
        "NR",
        "CONTR(I),I=1,NR",
        "NUM_QOUT",
        "(ID_QOUT(I),I=1,NUM_QOUT)",
    ]

    with open(filename, "r") as parm_file:
        lines = parm_file.readlines()

    # Strip trailing newlines and drop empty lines before tokenising.
    lines = [line.strip() for line in lines]
    lines = [ll for ll in lines if len(ll) > 0]
    flat_tokens = [tok for ll in lines for tok in ll.split()]

    lines_s_num, lines_s_names = _search_num_values_in_list(flat_tokens)

    dict_parm = {}
    ii = 0  # cursor into the flat list of numeric tokens
    for lname in lines_s_names:
        # The array-name tokens appear with several punctuation variants
        # depending on how the file was written; match all known spellings.
        if lname in (
            "(TIMPRT(I),I=1,NPRT)",
            "(TIMPRT(I)I=1NPRT)",
            "(TIMPRT(I).I=1.NPRT)",
        ):
            nprt = dict_parm["NPRT"]
            dict_parm[lname] = [lines_s_num[ii + k] for k in range(nprt)]
            ii += nprt
        elif lname in ("(NODVP(I),I=1,NUMVP)", "(NODVP(I).I=1.NUMVP)"):
            numvp = dict_parm["NUMVP"]
            dict_parm[lname] = [lines_s_num[ii + k] for k in range(numvp)]
            ii += numvp
        elif lname in ("(ID_QOUT(I).I=1.NUM_QOUT)", "(ID_QOUT(I),I=1,NUM_QOUT)"):
            if dict_parm["NUM_QOUT"] == 0:
                # Legacy workaround: the file still carries one placeholder
                # value when NUM_QOUT is 0; store the historical default and
                # skip that single numeric token.
                dict_parm[lname] = 441
                ii += 1
            else:
                num_qout = dict_parm["NUM_QOUT"]
                dict_parm[lname] = [lines_s_num[ii + k] for k in range(num_qout)]
                ii += num_qout
        else:
            dict_parm[lname] = lines_s_num[ii]
            ii += 1

    return dict_parm
def read_dem_parameters(dem_parametersfile):
    """Read a CATHY ``dem_parameters`` file into a dict.

    Tokens are flattened across all non-empty lines and split into numbers
    and names via ``_search_num_values_in_list``; each name is paired with
    the number(s) at the running numeric cursor. The ``zratio`` entry
    consumes ``nstr`` values, so ``nstr`` must appear earlier in the file.

    Parameters
    ----------
    dem_parametersfile : str
        Path to the dem_parameters file.

    Returns
    -------
    dict
        Mapping of parameter-name tokens to numbers (or to a list of
        ``nstr`` numbers for the zratio entry).
    """
    # Reference list of the expected parameter names, in file order.
    # Kept for documentation only: the parser keys on the tokens actually
    # found in the file.
    dem_parameters = [
        "delta_x",
        "delta_y",
        "factor",
        "dostep",
        "nzone",
        "nstr",
        "n1",
        "ivert",
        "isp",
        "base",
        "zratio(i),i=1,nstr",
    ]

    with open(dem_parametersfile, "r") as parm_file:
        lines = parm_file.readlines()

    lines = [line.strip() for line in lines]
    lines = [ll for ll in lines if len(ll) > 0]
    flat_tokens = [tok for ll in lines for tok in ll.split()]

    lines_s_num, lines_s_names = _search_num_values_in_list(flat_tokens)

    dict_dem_parm = {}
    ii = 0  # cursor into the flat list of numeric tokens
    for lname in lines_s_names:
        # Match both punctuation variants of the zratio array token.
        if lname in ("zratio(i),i=1,nstr", "zratio(i).i=1.nstr"):
            nstr = dict_dem_parm["nstr"]
            dict_dem_parm[lname] = [lines_s_num[ii + k] for k in range(nstr)]
            ii += nstr
        else:
            dict_dem_parm[lname] = lines_s_num[ii]
            ii += 1

    return dict_dem_parm
def read_soil(soilfile, dem_parm, MAXVEG):
    """
    Read soil file

    This is an example of 3 layers and 2 zones;
    The inner reading cycle is by zone, and the outer one by layers, i.e.:

    PERMX_z1_str1 PERMY_z1_str1 PERMZ_z1_str1 ELSTOR_z1_str1 POROS_z1_str1 VGNCELL_z1_str1 VGRMCCELL__z1_str1 VGPSATCELL__z1_str1
    PERMX_z2_str1 PERMY_z2_str1 PERMZ_z2_str1 ELSTOR_z2_str1 POROS_z2_str1 VGNCELL_z2_str1 VGRMCCELL__z2_str1 VGPSATCELL__z2_str1
    PERMX_z1_str2 PERMY_z1_str2 PERMZ_z1_str2 ELSTOR_z1_str2 POROS_z1_str2 VGNCELL_z1_str2 VGRMCCELL__z1_str2 VGPSATCELL__z1_str2
    PERMX_z2_str2 PERMY_z2_str2 PERMZ_z2_str2 ELSTOR_z2_str2 POROS_z2_str2 VGNCELL_z2_str2 VGRMCCELL__z2_str2 VGPSATCELL__z2_str2
    PERMX_z1_str3 PERMY_z1_str3 PERMZ_z1_str3 ELSTOR_z1_str3 POROS_z1_str3 VGNCELL_z1_str3 VGRMCCELL__z1_str3 VGPSATCELL__z1_str3
    PERMX_z2_str3 PERMY_z2_str3 PERMZ_z2_str3 ELSTOR_z2_str3 POROS_z2_str3 VGNCELL_z2_str3 VGRMCCELL__z2_str3 VGPSATCELL__z2_str3

    Zone indexing starts from 1 while layers indexing starts from 0!

    Parameters
    ----------
    soilfile : str
        filename (including abs path).
    dem_parm : dict
        dict of dem parameters (from read_dem_parameters); only 'nstr'
        (number of layers) and 'nzone' (number of zones) are read here.
    MAXVEG : int
        number of vegetation types; controls how many feddes-parameter
        (FP) rows are read from the file header.

    Returns
    -------
    dataframe
        df containing soil properties, indexed by (zone, layer)
    dataframe
        df of feddes (plant) parameters, one row per vegetation index
        (indices start at 1)
    """
    # Read FP parameters
    # ---------------------
    FP_header = ['PCANA','PCREF','PCWLT','ZROOT','PZ','OMGC']
    # Read SPP parameters
    # ---------------------
    soil_header = [
        "PERMX",
        "PERMY",
        "PERMZ",
        "ELSTOR",
        "POROS",
        "VGNCELL",
        "VGRMCCELL",
        "VGPSATCELL",
    ]
    FP_soil = []
    str_hd_soil = {}
    # NOTE(review): header layout assumption — the soil-property table is
    # expected to start at line 9 + MAXVEG - 1; confirm against the CATHY
    # soil file writer if the format changes.
    nb_of_header_lines = 9 + MAXVEG - 1
    with open(os.path.join(soilfile), "r") as f:  # open the file for reading
        count = 0
        for line in f:  # iterate over each line
            # Lines 3 .. MAXVEG+2 hold the feddes (vegetation) parameters;
            # anything after a literal 'PCANA' label on the line is a
            # trailing header comment and is discarded.
            if count > 2 and count < MAXVEG + 3:
                FP_soil.append(line.split('PCANA')[0].split(' '))
            # if count < 9:
            #     str_hd, value_hd = line.split()  # split it by whitespace
            #     soilfile[str_hd.replace(":", "")] = value_hd
            count += 1
    # `count` is now the total number of lines in the file; max_rows reads
    # exactly the soil-property rows between the header and the last line.
    soil = np.loadtxt(
        soilfile,
        skiprows=nb_of_header_lines,
        max_rows=count - nb_of_header_lines - 1,
    )
    np.shape(soil)  # no-op; kept for parity with the original code
    FP_soil_mat = []
    for fp in FP_soil:
        # Drop the empty strings produced by splitting on single spaces.
        FP_soil_mat.append([float(fpi) for fpi in fp if len(fpi) > 0])
    FP_soil_mat = np.array(FP_soil_mat)
    df_FP = pd.DataFrame(FP_soil_mat)
    df_FP.columns = FP_header
    df_FP.index.name = 'Veg. Indice'
    df_FP.index = df_FP.index + 1  # vegetation indexing starts from 1
    layer_id = []
    zone_id = []
    # Build the (zone, layer) multi-index in the same order the file is
    # written: outer cycle over layers (nstr), inner cycle over zones.
    for ds in range(int(dem_parm["nstr"])):
        for dz in range(int(dem_parm["nzone"])):
            layer_id.append(ds)
            zone_id.append(dz+1)  # zone numbers starts from 1
    if len(soil) != len(layer_id):  #*dem_parm['nzone']:
        # Mismatch is reported but not raised: the DataFrame construction
        # below will fail anyway if the shapes are incompatible.
        print(
            "Inconsistent number of zones/layers with respect to the number of soil lines: "
            + str(len(soil))
            + '/'
            + str(len(layer_id))
            + "!Revise dem_parm file!"
        )
    # name_str = np.sort(name_str)
    df_soil = pd.DataFrame(soil, [zone_id, layer_id], soil_header)
    # df_soil.xs(1)
    df_soil.index.set_names("zone", level=0, inplace=True)
    df_soil.index.set_names("layer", level=1, inplace=True)
    return df_soil,df_FP
def read_raster(filename):
    """Read a CATHY DEM-style raster output file.

    Applicable for all DEM type of CATHY outputs such as dtm_Ws1_sf_1,
    dtm_Ws1_sf_2, ... The file starts with a six-line ``key value``
    header followed by the raster grid itself.

    Parameters
    ----------
    filename : str
        filename of the raster type file to read (including abs path).

    Returns
    -------
    np.array([])
        2d array containing the raster info.
    dict
        Header of the raster file (values kept as strings).
    """
    header = {}
    with open(os.path.join(filename), "r") as fh:
        for line_no, raw in enumerate(fh):
            if line_no == 6:
                break  # header is exactly the first six lines
            key, val = raw.split()
            header[key.replace(":", "")] = val
    grid = np.loadtxt(filename, skiprows=6)
    return grid, header
def read_zone(zonefile):
    """Read zone file and infer soil zones in the DEM.

    The file carries a six-line ``key value`` header followed by the
    2D grid of zone flags.

    Parameters
    ----------
    zonefile : str
        filename (including abs path).

    Returns
    -------
    np.array([])
        2d array containing the zones.
    dict
        Header of the zone file (values kept as strings).
    """
    str_hd_zones = {}
    with open(os.path.join(zonefile), "r") as handle:
        for idx, raw_line in enumerate(handle):
            if idx >= 6:
                break  # only the first six lines belong to the header
            name, content = raw_line.split()
            str_hd_zones[name.replace(":", "")] = content
    zones_mat = np.loadtxt(zonefile, skiprows=6)
    return zones_mat, str_hd_zones
def read_dem(demfile, dtm_13file):
    """Read dem file and infer number of altitudes in the DEM.

    ``demfile`` supplies the six-line ``key value`` header; the elevation
    matrix itself is read from ``dtm_13file``.

    Parameters
    ----------
    demfile : str
        filename (including abs path) of the header-bearing dem file.
    dtm_13file : str
        filename (including abs path) of the elevation-matrix file.

    Returns
    -------
    np.array([])
        2d array containing the elevation.
    dict
        Header of the dem file (values kept as strings).
    """
    str_hd_dem = {}
    with open(os.path.join(demfile), "r") as f:  # open the file for reading
        count = 0
        for line in f:  # iterate over each line
            if count < 6:
                str_hd, value_hd = line.split()  # split it by whitespace
                str_hd_dem[str_hd.replace(":", "")] = value_hd
            count += 1

    # Fix: the matrix file was opened/closed manually; use a context
    # manager so the handle is released even if loadtxt raises.
    with open(dtm_13file, "r") as dem_file:
        dem_mat = np.loadtxt(dem_file, skiprows=0)

    return dem_mat, str_hd_dem
def read_root_map(rootmapfile):
    """Read root map file and infer number of vegetations in the DEM.

    The file carries a six-line ``key value`` header followed by the
    2D grid of vegetation-type flags.

    Parameters
    ----------
    rootmapfile : str
        filename (including abs path).

    Returns
    -------
    np.array([])
        2d array containing the flag for the vegetation type.
    dict
        Header of the root map file (values kept as strings).
    """
    str_hd_rootmap = {}
    with open(os.path.join(rootmapfile), "r") as src:
        for position, text in enumerate(src):
            if position > 5:
                break  # past the six header lines
            label, entry = text.split()
            str_hd_rootmap[label.replace(":", "")] = entry
    rootmap_mat = np.loadtxt(rootmapfile, skiprows=6)
    return rootmap_mat, str_hd_rootmap
def _search_num_values_in_list(flat_lines_s): lines_s_num = [] lines_s_names = [] for lli in flat_lines_s: if lli.isdigit(): lines_s_num.append(int(lli)) elif lli.replace('-', '', 1).isdigit(): lines_s_num.append(int(lli)) elif lli.replace('-', '', 1).replace('.', '', 1).isdigit(): lines_s_num.append(float(lli)) elif lli.replace('.', '', 1).isdigit(): lines_s_num.append(float(lli)) elif lli.replace('.', '', 1).replace('+', '', 1).isdigit(): lines_s_num.append(float(lli.replace('+', ''))) elif lli.replace('.', '', 1).replace('e+', '', 1).isdigit(): lines_s_num.append(float(lli)) elif lli.replace('.', '', 1).replace('e-', '', 1).isdigit(): lines_s_num.append(float(lli)) else: lines_s_names.append(lli) return lines_s_num, lines_s_names