"""
Class managing the Data Assimilation (DA) process, i.e.:
1. Parameters and data perturbation
This step consists in the generation of the ensemble.
Steps 2 and 3 are controlled via the DA class below.
2. Open Loop
Run the hydrological model for an ensemble of realisations, without data,
using the `_run_hydro_DA_openLoop` module.
3. Loop of DA:
- Running the hydro model
Run the hydrological model iteratively (between two observations)
using the `_run_ensemble_hydrological_model` module.
- Map states to observations
The module `map_states2Observations()` takes care of the mapping.
Complex mappings such as ERT are handled via a separate class;
for example, for ERT we have a `class mappingERT()` which uses
Archie's petrophysical relationship.
- Running the analysis
The analysis is controlled by `run_analysis()`, which takes as argument
the type of analysis to run, i.e. EnKF or PF.
- Update ensemble
The update of the ensemble files is controlled by `update_ENS_files()`.
- Evaluate performance
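Example
-------
A minimal, hypothetical workflow (the project name, constructor arguments and
dictionary names are illustrative only; `dict_obs` and `dict_parm_pert` are
assumed to be built beforehand with the dedicated observation/perturbation
helpers)::

    simu = DA(dirName='./', prj_name='my_DA_project')
    simu.run_DA_sequential(
        DA_type='enkf_Evensen2009_Sakov',
        dict_obs=dict_obs,
        list_parm2update=['St. var.', 'ks'],
        dict_parm_pert=dict_parm_pert,
        list_assimilated_obs='all',
        parallel=True,
    )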
"""
import glob
import multiprocessing
import os
import re
import shutil
import subprocess
import warnings
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from rich.progress import track
from pyCATHY.cathy_tools import CATHY, subprocess_run_multi
from pyCATHY.DA import enkf, pf, mapper
from pyCATHY.importers import cathy_inputs as in_CT
from pyCATHY.importers import cathy_outputs as out_CT
# from pyCATHY.importers import sensors_measures as in_meas
# import pyCATHY.meshtools as mt
from pyCATHY.plotters import cathy_plots as plt_CT
warnings.filterwarnings("ignore",
category=DeprecationWarning
)
# from functools import partial
REMOVE_CPU = 1
def run_analysis(
DA_type,
data,
data_cov,
param,
list_update_parm,
ensembleX,
prediction,
default_state="psi",
**kwargs,
):
"""
Perform the DA analysis step
Parameters
----------
DA_type : str
type of analysis, i.e. EnKF or particle filter.
data : np.array([])
measured data.
data_cov : np.array([]) or int
measured data covariance matrix.
param : np.array([])
model parameters to update.
list_update_parm : list
list of names of parameters to update.
ensembleX : list of np.array([])
ensemble state [pressure head, water saturation] at each node.
prediction : np.array([])
predicted observation (after mapping).
default_state : str, optional
state variable used for the analysis ('psi' or 'sw'). The default is 'psi'.
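Example
-------
A minimal, hypothetical call with synthetic arrays (names and shapes are
illustrative only; the EnKF backend is assumed to accept an empty parameter
array when no parameter is updated)::

    import numpy as np
    n_obs, n_ens, n_nodes = 5, 8, 100
    analysis = run_analysis(
        'enkf_Evensen2009_Sakov',
        data=np.random.rand(n_obs),
        data_cov=np.eye(n_obs) * 0.01,
        param=[],                                    # no parameter update
        list_update_parm=['St. var.'],
        ensembleX=[np.random.rand(n_nodes, n_ens),   # pressure heads
                   np.random.rand(n_nodes, n_ens)],  # water saturations
        prediction=np.random.rand(n_obs, n_ens),
    )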
"""
id_state = 0
if default_state == "sw":
id_state = 1
if DA_type == "enkf_Evensen2009":
Analysis = enkf.enkf_analysis(data,
data_cov,
param,
ensembleX[id_state],
prediction,
)
return Analysis
if DA_type == "enkf_Evensen2009_Sakov":
Analysis = enkf.enkf_analysis(data,
data_cov,
param,
ensembleX[id_state],
prediction,
Sakov=True,
)
return Analysis
if DA_type == "enkf_analysis_inflation":
Analysis = enkf.enkf_analysis_inflation(
data,
data_cov,
param,
ensembleX[id_state],
prediction,
**kwargs
)
return Analysis
if DA_type == "enkf_analysis_inflation_multiparm":
Analysis = enkf.enkf_analysis_inflation_multiparm(
data,
data_cov,
param,
ensembleX[id_state],
prediction,
**kwargs
)
return Analysis
if DA_type == "enkf_dual_analysis":
Analysis = enkf.enkf_dual_analysis(data,
data_cov,
param,
ensembleX[id_state],
prediction,
Sakov=False,
)
return Analysis
if DA_type == "enkf_dual_analysis_Sakov":
Analysis = enkf.enkf_dual_analysis(data,
data_cov,
param,
ensembleX[id_state],
prediction,
Sakov=True,
)
return Analysis
if DA_type == "pf":
print("not yet implemented")
[Analysis, AnalysisParam] = pf.pf_analysis(
data,
data_cov,
param,
ensembleX[id_state],
prediction
)
return Analysis, AnalysisParam
# utils observations
# ------------------
def dictObs_2pd(dict_obs):
"""dict of observation to dataframe of observation"""
df_obs = pd.DataFrame.from_dict(dict_obs).stack().to_frame()
df_obs = pd.DataFrame(df_obs[0].values.T.tolist(), index=df_obs.index)
df_obs.index.names = ["sensorNameidx", "assimilation time"]
return df_obs
def resynchronise_times(data_measure, atmbc_df, time_offset):
"""resynchronise dict as old key is elapsed time in second from the first observation,
while new key is from the first atmbc time
"""
data_measure_sync = dict(data_measure)
try:
new_keys = []
for d in range(len(data_measure_sync.keys())):
items_dict = list(data_measure_sync.items())
# new_key = (
# atmbc_df["diff"]
# .dt.total_seconds()
# .unique()[d]
# )
old_key = list(data_measure_sync.keys())[d]
new_key = list(data_measure_sync.keys())[d] - time_offset
for sensor in list(items_dict[d][1].keys()):
data_measure_sync[old_key][sensor]["assimilation_times"] = new_key
new_keys.append(new_key)
data_measure_sync = dict(zip(new_keys, data_measure_sync.values()))
except Exception:
print("datetime of the first atmbc point")
print(atmbc_df["datetime"][0])
print("datetime of the first measurement")
print(data_measure[0][sensor]["datetime"])
print("cannot synchronise times - continuing without")
return data_measure_sync
#%%
# DA class
# ---------
class DA(CATHY):
# def __init__(self,**kwargs):
# self.damping = 1
# -------------------------------------------------------------------#
# %% DATA ASSIMILATION FCTS
def _DA_init(self, dict_obs, dict_parm_pert, list_parm2update="all"):
"""
Initial preparation for DA
.. note::
0. check the validity of the dictionaries
1. update the cathyH file given NENS and ENS_times + flag self.parm['DAFLAG']==True
2. create the processor cathy_origin.exe
3. create the directory tree for the ensemble
4. create a pandas dataframe for each perturbed variable
5. update the input files
Parameters
----------
dict_obs : dict
observations dictionary.
dict_parm_pert : dict
perturbed parameters dictionary.
list_parm2update : list or str, optional
list of names of parameters to update. The default is "all".
Returns
-------
ENS_times, newlist_parm2update, update_key
Note: for now only implemented for EnKF DA
"""
# Initiate
# -------------------------------------------------------------------
update_key = "ini_perturbation"
# check if dict_obs is ordered in time
# ------------------------------------
if (
all(i < j for i, j in zip(list(dict_obs.keys()), list(dict_obs.keys())[1:]))
) is False:
raise ValueError("Observation List is not sorted.")
# if hasattr(self,'dict_obs') is False:
self.dict_obs = dict_obs # self.dict_obs is already assigned (in read observations); consider renaming it to self.obs
self.dict_parm_pert = dict_parm_pert
self.df_DA = pd.DataFrame()
self.df_Archie = pd.DataFrame()
# backup all spatial ET iterations (self.backupfort777)
# ET files are large!
self.backupfort777 = False
# Infer ensemble size NENS from the perturbed parameter dictionary
# -------------------------------------------------------------------
for name in self.dict_parm_pert:
self.NENS = len(self.dict_parm_pert[name]["ini_perturbation"])
# Infer ensemble update times ENS_times from the observation dictionary
# -------------------------------------------------------------------
ENS_times = []
for ti in self.dict_obs:
ENS_times.append(float(ti))
# start DA cycle counter
# -------------------------------------------------------------------
self.count_DA_cycle = 0
self.count_atmbc_cycle = 0
# (the counter is incremented during the update analysis)
if hasattr(self, "ens_valid") is False:
self.ens_valid = list(np.arange(0, self.NENS))
# create sub directories for each ensemble
# ---------------------------------------------------------------------
self._create_subfolders_ensemble()
# update list of updated parameters based on problem heterogeneity
# ---------------------------------------------------------------------
newlist_parm2update = ["St. var."]
for d in self.dict_parm_pert.keys():
for l in list_parm2update:
if l in d:
newlist_parm2update.append(d)
return ENS_times, newlist_parm2update, update_key
def run_DA_sequential(
self,
DA_type,
dict_obs,
list_parm2update,
dict_parm_pert,
list_assimilated_obs,
open_loop_run=False,
parallel=True,
**kwargs,
):
"""
Run Data Assimilation
.. note::
Steps are:
1. DA init (create subfolders)
2a. run CATHY hydrological model (open loop)
2b. run CATHY hydrological model recursively using DA times
<- Loop -->
3. check before analysis
4. analysis
5. check after analysis
6. update input files
<- Loop -->
Parameters
----------
DA_type : str
type of Data Assimilation.
dict_obs : dict
observations dictionary.
list_parm2update : list
list of names of parameters to update.
dict_parm_pert : dict
perturbed parameters dictionary.
list_assimilated_obs : list or str
list of names of observations to assimilate.
open_loop_run : bool, optional
run an open-loop simulation of the ensemble first. The default is False.
parallel : bool, optional
True to run the ensemble folder simulations in parallel. The default is True.
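Example
-------
A minimal, hypothetical call (the observation and perturbation dictionaries
are illustrative only and are assumed to be built beforehand)::

    simu.run_DA_sequential(
        DA_type='enkf_Evensen2009_Sakov',
        dict_obs=dict_obs,
        list_parm2update=['St. var.', 'ks'],
        dict_parm_pert=dict_parm_pert,
        list_assimilated_obs='all',
        parallel=True,
        damping=1,               # optional kwarg read by this method
        threshold_rejected=10,   # optional kwarg read by this method
        verbose=False,           # optional kwarg read by this method
    )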
"""
self.run_processor(DAFLAG=1,
runProcess=False,
**kwargs) # to create the processor exe
callexe = "./" + self.processor_name
self.damping = 1
if "damping" in kwargs:
self.damping = kwargs["damping"]
threshold_rejected = 10
if "threshold_rejected" in kwargs:
threshold_rejected = kwargs["threshold_rejected"]
self.verbose = False
if "verbose" in kwargs:
self.verbose = kwargs["verbose"]
# initiate DA
# -------------------------------------------------------------------
ENS_times, list_parm2update, update_key = self._DA_init(
dict_obs,
dict_parm_pert,
list_parm2update,
)
# initiate mapping petro
# -------------------------------------------------------------------
self._mapping_petro_init()
# update the ensemble files with the perturbed parameters
# -------------------------------------------------------------------
self.update_ENS_files(
dict_parm_pert,
list_parm2update="all",
cycle_nb=self.count_DA_cycle,
)
all_atmbc_times = np.copy(self.atmbc["time"])
# Open loop
# -------------------------------------------------------------------
if open_loop_run:
self._DA_openLoop(ENS_times, list_assimilated_obs, parallel)
# end of Open loop
# -------------------------------------------------------------------
# -------------------------------------------------------------------
# update input files ensemble again (time-windowed)
# -------------------------------------------------------------------
self._update_input_ensemble(
list(self.atmbc["time"]),
list_parm2update=list_parm2update,
)
# -----------------------------------
# Run hydrological model sequentially
# = Loop over atmbc times (including assimilation observation times)
# -----------------------------------
for t_atmbc in all_atmbc_times: # atmbc times MUST include assimilation observation times
self._run_ensemble_hydrological_model(parallel, callexe)
os.chdir(os.path.join(self.workdir))
# self.plot_ini_state_cov()
# def plot_ini_state_cov():
# ensemble_psi, ensemble_sw, ens_size, sim_size = self._read_state_ensemble()
# check scenario (result of the hydro simulation)
# ----------------------------------------------------------------
rejected_ens = self._check_before_analysis(
self.ens_valid,
threshold_rejected,
)
id_valid = ~np.array(rejected_ens)
self.ens_valid = list(np.arange(0, self.NENS)[id_valid])
ensemble_psi_valid_bef_update = []
#!! define subloop here to optimize the inflation!!
# if 'optimize_inflation' in DA_type:
# threshold_convergence = 10
# while self.df_performance < threshold_convergence:
# check if there is an observation at the given atmbc time
# --------------------------------------------------------
if t_atmbc in ENS_times:
#%%
# map states to observation = apply H operator to state variable
# ----------------------------------------------------------------
prediction = self.map_states2Observations(
list_assimilated_obs,
default_state="psi",
parallel=parallel,
)
# print(['Im done with mapping']*10)
#%%
# DA analysis
# ----------------------------------------------------------------
self.console.print(
r""":chart_with_upwards_trend: [b]Analysis[/b]:
- DA type: {}
- Damping: {}
""".format(
DA_type, self.damping
)
)
(
ensemble_psi_valid_bef_update,
ensemble_sw_valid_bef_update,
ens_size,
sim_size,
) = self._read_state_ensemble()
(
ensemble_psi,
ensemble_sw,
data,
analysis,
analysis_param,
) = self._DA_analysis(
prediction,
DA_type,
list_parm2update,
list_assimilated_obs,
ens_valid=self.ens_valid,
)
# print(['Im done with _DA_analysis']*10)
# DA mark_invalid_ensemble
# ----------------------------------------------------------------
(
prediction_valid,
ensemble_psi_valid,
ensemble_sw_valid,
analysis_valid,
analysis_param_valid,
) = self._mark_invalid_ensemble(
self.ens_valid,
prediction,
ensemble_psi,
ensemble_sw,
analysis,
analysis_param,
)
# print(['Im done with _invalid_ensemble']*10)
#%%
# check analysis quality
# ----------------------------------------------------------------
self.console.print(
":face_with_monocle: [b]check analysis performance[/b]"
)
self._performance_assessement(
list_assimilated_obs,
data,
prediction_valid,
t_obs=self.count_DA_cycle,
)
# print(['Im done with _analysis performance']*10)
#%%
# the counter is incremented here
# ----------------------------------------------------------------
self.count_DA_cycle = self.count_DA_cycle + 1
# update the parameter dictionary
# ----------------------------------------------------------------
def check_ensemble(ensemble_psi_valid, ensemble_sw_valid):
if np.any(ensemble_psi_valid > 0):
print("!positive pressure heads observed!")
check_ensemble(ensemble_psi_valid, ensemble_sw_valid)
self.update_pert_parm_dict(
update_key,
list_parm2update,
analysis_param_valid
)
else:
self.console.print(
":confused: No observation for this time - run hydrological model only"
)
(
ensemble_psi_valid,
ensemble_sw_valid,
ens_size,
sim_size,
) = self._read_state_ensemble()
print("!shortcut here ensemble are not validated!")
analysis_valid = ensemble_psi_valid
# DA mark_invalid_ensemble
# ----------------------------------------------------------------
# (
# prediction_valid,
# ensemble_psi_valid,
# ensemble_sw_valid,
# analysis_valid,
# analysis_param_valid,
# ) = self._mark_invalid_ensemble(
# self.ens_valid,
# prediction,
# ensemble_psi,
# ensemble_sw,
# analysis,
# analysis_param,
# )
print("!shortcut here ensemble are not validated!")
# print(f'''max, min, mean psi:
# {np.max(analysis_valid)},
# {np.min(analysis_valid)},
# {np.mean(analysis_valid)}
# ''')
self.count_atmbc_cycle = self.count_atmbc_cycle + 1
self.console.rule(
":round_pushpin: end of time step (s) "
+ str(int(t_atmbc))
+ "/"
+ str(int(all_atmbc_times[-1]))
+ " :round_pushpin:",
style="yellow",
)
self.console.rule(
":round_pushpin: end of atmbc update # "
+ str(self.count_atmbc_cycle)
+ "/"
+ str(len(all_atmbc_times))
+ " :round_pushpin:",
style="yellow",
)
# create dataframe _DA_var_pert_df holding the results of the DA update
# ---------------------------------------------------------------------
# self.DA_df
if len(ensemble_psi_valid_bef_update)==0:
(
ensemble_psi_valid_bef_update,
ensemble_sw_valid_bef_update,
ens_size,
sim_size,
) = self._read_state_ensemble()
else:
self._DA_df(
state=[ensemble_psi_valid_bef_update,
ensemble_sw_valid_bef_update
],
state_analysis=analysis_valid,
rejected_ens=rejected_ens,
)
# self.df_DA
# print(['Im done with dataframe _DA_var_pert_df']*10)
# export summary results of DA
# ----------------------------------------------------------------
meta_DA = {
"listAssimilatedObs": list_assimilated_obs,
"listUpdatedparm": list_parm2update,
# '':,
# '':,
# '':,
}
# backup ET map for each iteration of ensemble numbers
if self.backupfort777:
self.backup_fort777()
self.backup_results_DA(meta_DA)
self.backup_simu()
# overwrite input files ensemble (perturbated variables)
# ---------------------------------------------------------------------
if (
self.count_atmbc_cycle < len(all_atmbc_times) - 1
): # -1 because all_atmbc_times includes TMAX
# len(all_atmbc_times)
self._update_input_ensemble(
all_atmbc_times,
list_parm2update,
analysis=analysis_valid
)
else:
self.console.rule(
":red_circle: end of DA" ":red_circle:", style="yellow"
)
pass
self.console.rule(
":red_circle: end of DA update "
+ str(self.count_DA_cycle)
+ "/"
+ str(len(ENS_times))
+ " :red_circle:",
style="yellow",
)
self.console.rule(
":dart: % of valid ensemble: "
+ str((len(self.ens_valid) * 100) / (self.NENS))
+ ":dart:",
style="yellow",
)
plt.close("all")
# clean all vtk file produced
# --------------------------
directory = "./"
pathname = directory + "/**/*converted*.vtk"
files = glob.glob(pathname, recursive=True)
remov_convert = False
if remov_convert:
[os.remove(f) for f in files]
hard_remove = False
if hard_remove:
directory = "./"
pathname = directory + "/**/*.vtk"
files = glob.glob(pathname, recursive=True)
[os.remove(f) for f in files]
pass
# backup ET map file at each DA iteration
def backup_fort777(self):
for ensi in range(self.NENS):
dst_dir = os.path.join(
self.workdir,
self.project_name,
'DA_Ensemble/cathy_'+ str(ensi+1),
'DAnb' + str(self.count_DA_cycle) + 'fort.777'
)
shutil.copy(os.path.join(self.workdir,
self.project_name,
'DA_Ensemble/cathy_'+ str(ensi+1),
'fort.777'),
dst_dir)
def _run_hydro_DA_openLoop(
self, time_of_interest, nodes_of_interest, simu_time_max, ens_nb
):
"""
Run open loop and save the results into the DA dataframe (ensemble nb = 999)
Parameters
----------
time_of_interest : list
Output times (TIMPRTi) requested for the simulation.
nodes_of_interest : list
Mesh nodes (NODVP) where outputs are requested.
simu_time_max : float
Maximum simulation time (TMAX).
ens_nb : int
Ensemble member number.
Returns
-------
None.
"""
cwd = os.getcwd()
self.console.print(":unlock: [b]open loop call[/b]")
self.run_processor(
recompile=True,
TIMPRTi=time_of_interest,
NODVP=nodes_of_interest,
TMAX=simu_time_max,
DAFLAG=0,
path_CATHY_folder=cwd,
)
def _DA_openLoop(self, ENS_times, list_assimilated_obs, parallel, verbose=False):
"""
Run open Loop (no update/assimilation) hydro simulation for an ensemble of realisation
Evaluate the performance by comparison with measured data after mapping
Parameters
----------
ENS_times : list
List of the time steps (in sec) where observations are assimilated.
list_assimilated_obs : list
List of assimilation observations.
parallel : Bool, optional
Parallelize. The default is True.
verbose : Bool, optional
Display prints. The default is False.
Returns
-------
None. (The open-loop predictions and the performance assessment are handled internally.)
"""
# multi-run the CATHY hydrological model from the independent folders
# composing the ensemble
# ----------------------------------------------------------------
if parallel == True:
self.console.print(
":athletic_shoe: [b]Run hydrological model // :unlock: [/b]"
)
pathexe_list = []
for ens_i in range(self.NENS):
path_exe = os.path.join(
self.workdir,
self.project_name,
"DA_Ensemble/cathy_" + str(ens_i + 1),
)
pathexe_list.append(path_exe)
with multiprocessing.Pool(processes=multiprocessing.cpu_count()-REMOVE_CPU) as pool:
result = pool.map(subprocess_run_multi, pathexe_list)
if verbose == True:
self.console.print(result)
# Loop over ensemble realisations one by one
# ------------------------------------------------------------
else:
for ens_i in range(self.NENS):
os.chdir(
os.path.join(
self.workdir,
self.project_name,
"DA_Ensemble/cathy_" + str(ens_i + 1),
)
)
# len(list(self.atmbc['time']))
self._run_hydro_DA_openLoop(
time_of_interest=list(self.atmbc["time"]),
nodes_of_interest=[],
simu_time_max=max(list(self.atmbc["time"])),
ens_nb=ens_i + 1,
)
# save into the DA_df dataframe
# -----------------------------------------------------------------
path_fwd_CATHY_list = []
for ens_i in range(self.NENS):
path_fwd_CATHY_list.append(
os.path.join(
self.workdir,
self.project_name,
"DA_Ensemble/cathy_" + str(ens_i + 1),
)
)
os.chdir(
os.path.join(
self.workdir,
self.project_name,
"DA_Ensemble/cathy_" + str(ens_i + 1),
)
)
df_psi = self.read_outputs(
filename="psi", path=os.path.join(os.getcwd(), "output")
)
df_sw, _ = self.read_outputs(
filename="sw", path=os.path.join(os.getcwd(), "output")
)
shift = len(df_psi) - self.parm["NPRT"]
if shift < 0 or shift > 2:
print("Error for the ensemble nb:" + str(ens_i))
raise ValueError(
"Error on the simulation:"
"nb of times contained in the outputs files is too small;"
"Check "
)
for t in range(np.shape(df_psi)[0] - 2):
self._DA_df(
state=[df_psi[t + shift, :], df_sw.iloc[t + shift, :]],
t_ass=t,
openLoop=True,
ens_nb=ens_i + 1,
)
os.chdir(
os.path.join(
self.workdir,
)
)
prediction_OL = self._evaluate_perf_OL(
parallel, list_assimilated_obs, path_fwd_CATHY_list, ENS_times
)
# ------------------------------------------------------
# END of Open Loop simulation and performance evaluation
# ------------------------------------------------------
pass
def _DA_analysis(
self,
prediction,
DA_type="enkf_Evensen2009_Sakov",
list_update_parm=["St. var."],
list_assimilated_obs="all",
ens_valid=[],
):
"""
Analysis of the ensemble using DA
1. map state variables to observations
2. apply the filter
3. checks
Parameters
----------
prediction : np.array([])
predicted observations (after mapping) for the valid ensemble members.
DA_type : str
type of Data Assimilation. The default is "enkf_Evensen2009_Sakov".
list_update_parm : list
list of names of parameters to update. The default is ["St. var."].
list_assimilated_obs : list or str
list of names of observations to assimilate. The default is "all".
ens_valid : list
indices of the valid ensemble members.
Returns
-------
(ensemble_psi, ensemble_sw, data, analysis, analysis_param)
Note: for now only implemented for EnKF DA
"""
update_key = "ini_perturbation"
if self.count_DA_cycle > 0:
update_key = "update_nb" + str(self.count_DA_cycle)
# find the method to map for this time step
# --------------------------------------------------------
#%%
obskey2map, obs2map = self._obs_key_select(list_assimilated_obs)
#%%
# prepare states
# ---------------------------------------------------------------------
ensemble_psi, ensemble_sw, ens_size, sim_size = self._read_state_ensemble()
# ---------------------------------------------------------------------
# prepare data
# ---------------------------------------------------------------------
# select ONLY data to assimilate
# ---------------------------------------------------------------------
data, _ = self._get_data2assimilate(list_assimilated_obs)
self.console.print(
r""":
- Data size: {}
--> Observations --> {}
""".format(
str(np.shape(data)), list_assimilated_obs
)
)
# check size of data_cov
# ---------------------------------------------------------------------
if np.shape(self.stacked_data_cov[self.count_DA_cycle])[0] != len(data):
raise ValueError("Wrong stacked_data_cov shape -- need to (re)compute data covariance")
data_cov = self.stacked_data_cov[self.count_DA_cycle]
# # filter non-valid ensemble before Analysis
# # ---------------------------------------------------------------------
prediction_valid = prediction
ensemble_psi_valid = ensemble_psi[:, ens_valid]
ensemble_sw_valid = ensemble_sw[:, ens_valid]
# prepare parameters
# ---------------------------------------------------------------------
# When updating the states only, the elements of X are the
# pressure heads at each node of the finite element grid, while
# the state augmentation technique is used when the parameters
# are also updated
param = self.transform_parameters(list_update_parm, update_key)
print("parm size: " + str(len(param)))
param_valid = []
if len(param) > 0:
param_valid = param[:, ens_valid]
# run Analysis
# ---------------------------------------------------------------------
result_analysis = run_analysis(
DA_type,
data,
data_cov,
param_valid,
list_update_parm,
[ensemble_psi_valid, ensemble_sw_valid],
prediction_valid,
alpha=self.damping,
)
_ , obs2eval_key = self._get_data2assimilate(list_assimilated_obs)
# fig, ax = plt.subplots(figsize=(4,4))
# # for i in range(len(prediction_valid.T)):
# ax.scatter(data,np.mean(prediction_valid,axis=1),label='mean')
# ax.scatter(data,np.median(prediction_valid,axis=1),label='median')
# ax.scatter(data,np.min(prediction_valid,axis=1),color='k',alpha=0.1)
# ax.scatter(data,np.max(prediction_valid,axis=1),color='k',alpha=0.1)
# plt.legend()
# # Add 1:1 line
# lims = [min(min(data), min(np.mean(prediction_valid, axis=1))),
# max(max(data), max(np.mean(prediction_valid, axis=1)))]
# ax.plot(lims, lims, 'k--', alpha=0.75, zorder=0)
# ax.set_xlabel(f' {obs2eval_key[0]} Observations')
# ax.set_ylabel(f' {obs2eval_key[0]} Predictions')
# ax.set_xlim([min(data), max(data)])
# ax.set_ylim([min(data), max(data)])
# ax.set_aspect('equal')
# fig.savefig(os.path.join(self.workdir,
# self.project_name,
# 'T' + str(self.count_DA_cycle) +
# '_11line.png'
# )
# )
# plot ensemble covariance matrices and changes (only for ENKF)
# ---------------------------------------------------------------------
if len(result_analysis) > 2:
[
A,
Amean,
dA,
dD,
MeasAvg,
S,
COV,
B,
dAS,
analysis,
analysis_param,
] = result_analysis
try:
plt_CT.show_DA_process_ens(
ensemble_psi_valid,
data,
COV,
dD,
dAS,
B,
analysis,
savefig=True,
savename=os.path.join(
self.workdir,
self.project_name,
"DA_Ensemble",
"DA_Matrices_t" + str(self.count_DA_cycle),
),
# label_sensor=str([*self.dict_obs[0]])
label_sensor=str([*obskey2map]),
)
except Exception:
self.console.print("[b]Impossible to plot matrices[/b]")
# particule filter
# ----------------
else:
[analysis, analysis_param] = result_analysis
# Back-transformation of the parameters
# ---------------------------------------------------------------------
# for pp in enumerate(list_update_parm[:]):
# if 'St. var.' in pp[1]:
# pass
# else:
# analysis_param_valid = []
# if len(param)>0:
# analysis_param_valid = analysis_param[ens_valid]
self.transform_parameters(
list_update_parm, param=np.hstack(analysis_param), back=True
)
# if self.dict_parm_pert[pp[1]]['transf_type'] == 'log':
# print('back log transformation')
# analysis_param = np.exp(analysis_param)
# return ensemble_psi, Analysis, AnalysisParam, Data, data_cov
# ---------------------------------------------------------------------
return (ensemble_psi, ensemble_sw, data, analysis, analysis_param)
def _mark_invalid_ensemble(
self, ens_valid, prediction, ensemble_psi, ensemble_sw, analysis, analysis_param
):
"""mark invalid ensemble - invalid ensemble are filled with NaN values"""
ensemble_psi_valid = np.empty(ensemble_psi.shape)
ensemble_psi_valid[:] = np.nan
ensemble_psi_valid[:, ens_valid] = ensemble_psi[:, ens_valid]
ensemble_sw_valid = np.empty(ensemble_psi.shape)
ensemble_sw_valid[:] = np.nan
ensemble_sw_valid[:, ens_valid] = ensemble_sw[:, ens_valid]
analysis_valid = np.empty(ensemble_psi.shape)
analysis_valid[:] = np.nan
analysis_valid[:, ens_valid] = analysis
prediction_valid = np.empty([prediction.shape[0], ensemble_psi.shape[1]])
prediction_valid[:] = np.nan
prediction_valid[:, ens_valid] = prediction
analysis_param_valid = []
if len(analysis_param[0]) > 0:
analysis_param_valid = np.empty(
[analysis_param.shape[1], ensemble_psi.shape[1]]
)
analysis_param_valid[:] = np.nan
analysis_param_valid[:, ens_valid] = analysis_param.T
return (
prediction_valid,
ensemble_psi_valid,
ensemble_sw_valid,
analysis_valid,
analysis_param_valid,
)
def _run_ensemble_hydrological_model(self, parallel, callexe):
"""multi run CATHY hydrological model from the independant folders composing the ensemble"""
self.console.print(":athletic_shoe: [b]Run hydrological model[/b]")
# ----------------------------------------------------------------
if parallel == True:
pathexe_list = []
# for ens_i in range(self.NENS):
for ens_i in self.ens_valid:
path_exe = os.path.join(
self.workdir,
self.project_name,
"DA_Ensemble/cathy_" + str(ens_i + 1),
)
pathexe_list.append(path_exe)
with multiprocessing.Pool(processes=multiprocessing.cpu_count()-REMOVE_CPU) as pool:
result = pool.map(subprocess_run_multi, pathexe_list)
if self.verbose:
self.console.print(result)
# process each ensemble folder one by one
# ----------------------------------------------------------------
else:
# Loop over ensemble realisations
# ------------------------------------------------------------
for ens_i in track(
range(self.NENS), description="Run hydrological fwd model..."
):
self.console.print(
":keycap_number_sign: [b]ensemble nb:[/b]"
+ str(ens_i + 1)
+ "/"
+ str(self.NENS)
)
os.chdir(
os.path.join(
self.workdir,
self.project_name,
"DA_Ensemble/cathy_" + str(ens_i + 1),
)
)
p = subprocess.run([callexe], text=True, capture_output=True)
if self.verbose:
self.console.print(p.stdout)
self.console.print(p.stderr)
pass
def _check_before_analysis(self, ens_valid=[], threshold_rejected=10):
"""
Check the hydrological simulation of each valid ensemble member and flag the
realisations to reject before the analysis (the filter is applied only on the
selected ensemble members).
Parameters
----------
ens_valid : list
indices of the currently valid ensemble members.
threshold_rejected : float
maximum percentage of rejected ensemble members tolerated before execution stops.
Returns
-------
rejected_ens : list of bool
True for each ensemble member to reject.
"""
self.console.print(":white_check_mark: [b]check scenarii before analysis[/b]")
### CHECK which scenarios are OK and which are to be rejected, then create and apply the filter
### only on the selected scenarios to be considered.
rejected_ens_new = []
for n in range(self.NENS):
if n in ens_valid:
# print('ensemble_nb ' + str(n) + ' before update analysis')
try:
df_mbeconv = out_CT.read_mbeconv(
os.path.join(
self.workdir,
self.project_name,
"DA_Ensemble/cathy_" + str(n + 1),
"output/mbeconv",
)
)
if df_mbeconv["CUM."].isnull().values.any():
rejected_ens_new.append(True)
self.console.print(
":x: [b]df_mbeconv['CUM.'][/b]"
+ str(df_mbeconv["CUM."].isnull().values.any())
+ ", ens_nb:"
+ str(n + 1)
)
elif (np.round(df_mbeconv["TIME"].iloc[-1])) < np.round(
self.parm["(TIMPRT(I),I=1,NPRT)"][-1]
) - self.parm["DELTAT"]:
rejected_ens_new.append(True)
self.console.print(
":x: [b]df_mbeconv['time'][/b]"
+ str(df_mbeconv["TIME"].iloc[-1])
+ ", ens_nb:"
+ str(n + 1)
)
elif len(df_mbeconv) == 0:
rejected_ens_new.append(True)
self.console.print(
":x: [b]len(df_mbeconv)['TIME'][/b]"
+ str(len(df_mbeconv))
+ ", ens_nb:"
+ str(n + 1)
)
else:
rejected_ens_new.append(False)
except Exception:
rejected_ens_new.append(True)
print("cannot read mbeconv")
pass
# UnboundLocalError: local variable 'df_mbeconv' referenced before assignment
else:
rejected_ens_new.append(True)
# check whether the number of discarded scenarios exceeds the threshold percentage of the total number [N];
# if it does, the execution stops
# --------------------------------------------------------------------
if sum(rejected_ens_new) >= (threshold_rejected / 100) * self.NENS:
print("new parameters (if updated)")
print(self.dict_parm_pert)
raise ValueError(
"% number of rejected ensemble is too high:"
+ str((sum(rejected_ens_new) * 100) / (self.NENS))
)
return rejected_ens_new
def _check_after_analysis(self, update_key, list_update_parm):
"""
Check which scenario parameters are OK and which ones to discard after the analysis.
Test the range of each physical property.
Parameters
----------
update_key : str
Update cycle key identifier.
list_update_parm : list
list of updated model parameters (after analysis).
"""
self.console.print(":white_check_mark: [b]check scenarii post update[/b]")
id_valid = [list(np.arange(0, self.NENS))]
id_nonvalid = []
def test_negative_values(update_key, pp):
id_nonvalid.append(
list(np.where(self.dict_parm_pert[pp[1]][update_key] < 0)[0])
)
id_valid.append(
list(np.where(self.dict_parm_pert[pp[1]][update_key] > 0)[0])
)
for i in id_nonvalid:
self.console.print(
":x: [b]negative"
+ str(pp)
+ ":[/b]"
+ str(self.dict_parm_pert[pp[1]][update_key][i])
+ ", ens_nb:"
+ str(i)
)
return id_nonvalid, id_valid
def test_range_values(update_key, pp, min_r, max_r):
id_nonvalid.append(
list(np.where(self.dict_parm_pert[pp[1]][update_key] < min_r)[0])
)
id_nonvalid.append(
list(np.where(self.dict_parm_pert[pp[1]][update_key] > max_r)[0])
)
id_valid.append(
list(np.where(self.dict_parm_pert[pp[1]][update_key] > min_r)[0])
)
id_valid.append(
list(np.where(self.dict_parm_pert[pp[1]][update_key] < max_r)[0])
)
for i in id_nonvalid:
self.console.print(
":x: [b]out of range"
+ str(pp)
+ ":[/b]"
+ str(self.dict_parm_pert[pp[1]][update_key][i])
+ ", ens_nb:"
+ str(i)
)
return id_nonvalid, id_valid
for pp in enumerate(list_update_parm[:]):
if "St. var." in pp[1]:
pass
else:
if "rFluid".casefold() in pp[1].casefold():
id_nonvalid, id_valid = test_negative_values(update_key, pp)
elif "porosity".casefold() in pp[1].casefold():
id_nonvalid, id_valid = test_negative_values(update_key, pp)
elif "a_Archie".casefold() in pp[1].casefold():
id_nonvalid, id_valid = test_range_values(update_key, pp, 0, 3)
elif "Ks".casefold() in pp[1].casefold():
id_nonvalid.append(
list(np.where(self.dict_parm_pert[pp[1]][update_key] < 0)[0])
)
id_valid.append(
list(np.where(self.dict_parm_pert[pp[1]][update_key] > 0)[0])
)
for i in id_nonvalid:
self.console.print(
":x: [b]negative Ks:[/b]"
+ str(self.dict_parm_pert[pp[1]][update_key][i])
+ ", ens_nb:"
+ str(i)
)
elif "PCREF".casefold() in pp[1].casefold():
id_nonvalid.append(
list(np.where(self.dict_parm_pert[pp[1]][update_key] > 0)[0])
)
id_valid.append(
list(np.where(self.dict_parm_pert[pp[1]][update_key] < 0)[0])
)
for i in id_nonvalid:
self.console.print(
"Positive values encountered in PCREF:"
+ str(self.dict_parm_pert[pp[1]][update_key][i])
+ ", ens_nb:"
+ str(i)
)
# check if the new value of Zroot is feasible
# ------------------------------------------------------------
elif "Zroot".casefold() in pp[1].casefold():
print('*'*26)
print(self.dict_parm_pert[pp[1]][update_key])
print(abs(min(self.grid3d["mesh3d_nodes"][:, -1])))
id_nonvalid.append(
list(
np.where(
self.dict_parm_pert[pp[1]][update_key]
> abs(min(self.grid3d["mesh3d_nodes"][:, -1]))
)[0]
)
)
id_valid.append(
list(
np.where(
self.dict_parm_pert[pp[1]][update_key]
< abs(min(self.grid3d["mesh3d_nodes"][:, -1]))
)[0]
)
)
for i in id_nonvalid:
self.console.print(
":x: [b]unfeasible root depth:[/b]"
+ str(self.dict_parm_pert[pp[1]][update_key][i])
+ ", ens_nb:"
+ str(i)
)
id_nonvalid_flat = [item for sublist in id_nonvalid for item in sublist]
id_valid = set.difference(
set(list(np.arange(0, self.NENS))), set(list(np.unique(id_nonvalid_flat)))
)
return id_valid
def spatialize_parameters(self, list_update_parm, parm):
"""
Extend the number of parameters according to the number of zones
(useful for the heterogeneous case)
"""
i = 0
parm_extended = []
for l in list_update_parm:
print(l)
if "St. var." in l:
pass
else:
nb_of_zones = 1
if "surf_zones_param" in self.dict_parm_pert[l]:
nb_of_zones = self.dict_parm_pert[l]["surf_zones_param"]
print(nb_of_zones)
parm_extended.append(np.tile(parm[i, :], (nb_of_zones, 1)))
i = i + 1
parm_extended = np.vstack(parm_extended)
return parm_extended
def transform_parameters(
self, list_update_parm, update_key=None, back=False, param=None
):
"""
Parameter (back-)transformation before/after the analysis (log, bounded log, ...)
.. note:: parameters are log-transformed so that they become Gaussian distributed
and can be updated by the Ensemble Kalman Filter
"""
param_new = []
for pp in enumerate(list_update_parm[:]):
param_trans = []
if "St. var." in pp[1]:
pass
else:
if update_key:
param = self.dict_parm_pert[pp[1]][update_key]
if self.dict_parm_pert[pp[1]]["transf_type"] == "log":
print("log transformation")
if back:
param_trans = np.exp(param)
else:
param_trans = np.log(param)
elif self.dict_parm_pert[pp[1]]["transf_type"] == "log-ratio":
print("bounded log transformation")
range_tr = self.dict_parm_pert[pp[1]]["transf_bounds"]
A = range_tr["A"] # min range
B = range_tr["B"] # max range
if back:
param_trans = (A + B * np.exp(param)) / (1 + np.exp(param))
else:
param_trans = np.log((param - A) / (B - param))
elif self.dict_parm_pert[pp[1]]["transf_type"] == "hyperbolic":
print("hyperbolic transformation")
# Y = sinh^-1(U)
param_trans = np.log(self.dict_parm_pert[pp[1]][update_key])
else:
param_trans = param
param_new.append(param_trans)
if len(param_new) > 0:
np.vstack(param_new)
return np.array(param_new)
def _evaluate_perf_OL(
self, parallel, list_assimilated_obs, path_fwd_CATHY_list, ENS_times
):
prediction_OL = []
if parallel:
if "ERT" in list_assimilated_obs:
prediction_OL_ERT = mapper._map_ERT_parallel(
self.dict_obs,
self.count_DA_cycle,
self.project_name,
self.Archie_parms,
path_fwd_CATHY_list,
list_assimilated_obs="all",
default_state="psi",
verbose=False,
savefig=True,
DA_cnb=self.count_DA_cycle,
ENS_times=self.ENS_times,
)
else:
write2shell_map = True
for t in range(len(ENS_times)):
prediction_stacked = self.map_states2Observations(
list_assimilated_obs,
ENS_times=ENS_times,
savefig=False,
parallel=parallel,
write2shell_map=write2shell_map,
)
prediction_OL.append(prediction_stacked)
write2shell_map = False
else:
for t in range(len(ENS_times)):
prediction_stacked = self.map_states2Observations(
list_assimilated_obs,
ENS_times=ENS_times,
savefig=False,
parallel=parallel,
)
# prediction_stacked is meas_size * ens_size
prediction_OL.append(prediction_stacked)
# prediction_OL is meas_size * ens_size * ENS_times size
if parallel:
if "ERT" in list_assimilated_obs:
prediction_OL.append(prediction_OL_ERT)
if np.shape(prediction_OL)[0] > 1:
prediction_OL = np.hstack(prediction_OL)
prediction_OL = np.reshape(
prediction_OL,
[
np.shape(prediction_OL)[1],
self.NENS,
len(ENS_times),
],
)
for t in range(len(ENS_times)):
# print(str(t) + 't perf OL')
data_t, _ = self._get_data2assimilate(
list_assimilated_obs,
time_ass=t,
)
self._performance_assessement(
list_assimilated_obs,
data_t,
prediction_OL[:, :, t],
t_obs=t,
openLoop=True,
)
return prediction_OL
def _mapping_petro_init(self):
"""
Initiate Archie and VGP petro/pedophysical parameters
If the Archie and VGP dictionaries do not exist, fill them with default values
"""
# print('Initiate Archie and VGP')
warnings_petro = []
# check that porosity is a list
# -------------------------------------
# if hasattr(self, 'soil_SPP') is False:
# self.read_inputs('soil')
porosity = self.soil_SPP["SPP"][:, 4][0]
if not isinstance(porosity, list):
porosity = [porosity]
# check if Archie parameters exists
# ---------------------------------
if hasattr(self, "Archie_parms") == False:
warnings_petro.append(
"Archie parameters not defined - Falling back to defaults"
)
print("Archie parameters not defined set defaults")
self.Archie_parms = {
"porosity": porosity,
"rFluid_Archie": [1.0],
"a_Archie": [1.0],
"m_Archie": [2.0],
"n_Archie": [2.0],
"pert_sigma_Archie": [0],
}
# check if VGN parameters exists
# ---------------------------------
if hasattr(self, "VGN_parms") == False:
warnings_petro.append(
"VGN parameters not defined - Falling back to defaults"
)
self.VGN_parms = {}
self.console.rule(":warning: warning messages above :warning:", style="yellow")
for ww in warnings_petro:
self.console.print(ww, style="yellow")
self.console.rule("", style="yellow")
pass
def _read_dict_pert_update_ens_SPP(self,
dict_parm_pert,
df_SPP_2fill,
key_root,
key,
update_key,
ens_nb,
shellprint_update
):
'''
Use the dictionary of perturbed parameters and parse it progressively into
df_SPP_2fill to build a matrix and update the soil file
'''
# initiate SPP_2_fill if not existing
# -----------------------------------------------------------
if len(df_SPP_2fill) == 0:
for i in range(self.NENS):
df_SPP, _ = in_CT.read_soil(os.path.join(
self.workdir,
self.project_name,
'DA_Ensemble/cathy_'+ str(i+1),
'input/soil'
),
self.dem_parameters,
self.cathyH["MAXVEG"]
)
# df_SPP_2fill = [df_SPP.copy() for _ in range(self.NENS)]
df_SPP_2fill.append(df_SPP.copy())
if key_root[0].casefold() == 'ks':
df_SPP_2fill[i]['PERMX'] = np.nan
df_SPP_2fill[i]['PERMY'] = np.nan
df_SPP_2fill[i]['PERMZ'] = np.nan
if key_root[0].casefold() == 'porosity':
df_SPP_2fill[i]['POROS'] = np.nan
print('SPP ensemble update per zone is not yet implemented')
print('''
Assuming that the dictionary of perturbed parameters
is built per layer, i.e. ks0 = layer 0, ks1 = layer 1, etc.
'''
)
if key_root[0].casefold() == 'ks':
df_SPP_2fill[ens_nb].loc[(slice(None), int(key_root[1])), 'PERMX'] = dict_parm_pert[key][update_key][ens_nb]
df_SPP_2fill[ens_nb].loc[(slice(None), int(key_root[1])), 'PERMY'] = dict_parm_pert[key][update_key][ens_nb]
df_SPP_2fill[ens_nb].loc[(slice(None), int(key_root[1])), 'PERMZ'] = dict_parm_pert[key][update_key][ens_nb]
elif key_root[0].casefold() in 'porosity':
df_SPP_2fill[ens_nb].loc[(slice(None), int(key_root[1])), 'POROS'] = dict_parm_pert[key][update_key][ens_nb]
#%% check if contains nan (if not update soil file)
contains_nan = df_SPP_2fill[ens_nb].isna().any().any()
if hasattr(self, 'zone3d') is False:
nstr = self.dem_parameters["nstr"]
self.zone3d= [np.ones(np.shape(self.DEM))]*nstr
if contains_nan==False:
self.update_soil(
SPP_map=df_SPP_2fill[ens_nb],
FP_map=self.soil_FP["FP_map"],
zone3d=self.zone3d,
verbose=self.verbose,
filename=os.path.join(os.getcwd(), "input/soil"),
shellprint_update=shellprint_update,
backup=True,
)
return df_SPP_2fill
def update_ENS_files(self, dict_parm_pert, list_parm2update, **kwargs):
"""
Update by overwriting ensemble files (usually after analysis step or initially to build the ensemble)
- update state
- update model parameters:
- initial conditions
- Archie
- Hydraulic conductivity ks
- Feddes parameters
- Atmbc boundary conditions
Returns
-------
None. The ensemble input files are overwritten in place.
"""
self.console.print(":arrows_counterclockwise: [b]Update ensemble[/b]")
# replace the dictionary key with the cycle nb
# -------------------------------------
update_key = "ini_perturbation"
if "cycle_nb" in kwargs:
if kwargs["cycle_nb"] > 0:
update_key = "update_nb" + str(kwargs["cycle_nb"])
if "analysis" in kwargs:
analysis = kwargs["analysis"]
if list_parm2update == "all":
list_parm2update = ["St. var."]
for pp in dict_parm_pert:
list_parm2update.append(pp)
# loop over ensemble files to update ic -->
# this is always done since psi is the state variable to update
# ------------------------------------------------------------------
shellprint_update = True
for ens_nb in range(self.NENS):
# change directory according to the ensemble folder nb
os.chdir(
os.path.join(
self.workdir,
self.project_name,
"./DA_Ensemble/cathy_" + str(ens_nb + 1),
)
)
# state variable update use ANALYSIS update or not
# --------------------------------------------------------------
if "St. var." in list_parm2update:
if kwargs["cycle_nb"] > 0:
df_psi = self.read_outputs(
filename="psi",
path=os.path.join(os.getcwd(), self.output_dirname),
)
self.update_ic(
INDP=1,
IPOND=0,
pressure_head_ini=analysis[:, ens_nb],
filename=os.path.join(os.getcwd(), "input/ic"),
backup=True,
shellprint_update=shellprint_update,
)
else:
if kwargs["cycle_nb"] > 0:
raise ValueError(
"no state variable update - use last iteration as initial conditions ?"
)
df_psi = self.read_outputs(
filename="psi",
path=os.path.join(os.getcwd(), self.output_dirname),
)
if kwargs["cycle_nb"] > 0:
self.update_ic(
INDP=1,
IPOND=0,
pressure_head_ini=df_psi[-1, :],
filename=os.path.join(os.getcwd(), "input/ic"),
backup=False,
shellprint_update=shellprint_update,
)
shellprint_update = False
# Initialise the ensemble matrices
# --------------------------------
FeddesParam_mat_ens = [] # ensemble matrix for Feddes parameters
VG_parms_mat_ens = [] # ensemble matrix for VG parameters
VG_p_possible_names = ["n_VG", "thetar_VG", "alpha_VG", "VGPSATCELL_VG"]
VG_p_possible_names_positions_in_soil_table = [5, 6, 7, 7]
# SPP_possible_names = ["porosity", "ks"]
ks_het_ens = [] # ensemble matrix for heterogeneous SPP parameters (ks)
POROS_het_ens = [] # ensemble matrix for heterogeneous SPP parameters (porosity)
# Archie
# -------
Archie_parms_mat_ens = [] # ensemble matrix for Archie parameters
Archie_p_names = [
"porosity",
"rFluid_Archie",
"a_Archie",
"m_Archie",
"n_Archie",
"pert_sigma_Archie",
]
# Soil 3d if existing
# -------------------
zone3d = []
if hasattr(self, 'zone3d'):
zone3d = self.zone3d
# loop over the dict of perturbed variables
# ----------------------------------------------------------------------
for parm_i, key in enumerate(
list_parm2update
): # loop over the perturbed variables dict
# print('update parm' + key)
key_root = re.split(r"(\d+)", key)
if len(key_root) == 1:
key_root.append("0")
shellprint_update = True
# ------------------------------------------------------------------
for ens_nb in range(self.NENS):
# print('update ensemble' + str(ens_nb))
# change directory according to the ensemble folder nb
os.chdir(
os.path.join(
self.workdir,
self.project_name,
"./DA_Ensemble/cathy_" + str(ens_nb + 1),
)
)
# atmbc update
# --------------------------------------------------------------
# if key == "atmbc":
# if non-homogeneous atmbc, take only the first node's perturbed parameters
# to avoid looping over all the perturbed atmbc nodes, as the update is done
# only once over the stacked perturbed atmbc parameters
if key_root[0].casefold() in "atmbc".casefold():
if key.casefold() in "atmbc0".casefold():
tper_stacked = []
for kk in dict_parm_pert:
if 'atmbc' in kk:
tper_stacked.append(dict_parm_pert[kk]['time_variable_perturbation'])
tper_stacked = np.array(tper_stacked)
VALUE = tper_stacked[:,:,ens_nb]
times = dict_parm_pert[key]['data2perturbate']['time']
self.update_atmbc(
HSPATM=self.atmbc['HSPATM'],
IETO=self.atmbc['IETO'],
time=times,
netValue=VALUE.T,
filename=os.path.join(os.getcwd(), "input/atmbc"),
verbose=self.verbose,
)
else:
pass
# ic update (i.e. water table position update)
# --------------------------------------------------------------
elif key_root[0].casefold() in "WTPOSITION".casefold():
if kwargs["cycle_nb"] == 0:
self.update_ic(
INDP=4,
IPOND=0,
WTPOSITION= dict_parm_pert[key]["ini_perturbation"][
ens_nb
],
filename=os.path.join(os.getcwd(), "input/ic"),
backup=False,
shellprint_update=shellprint_update,
)
# ic update (i.e. initial pressure head update)
# --------------------------------------------------------------
elif key_root[0].casefold() in "ic".casefold():
match_ic_withLayers = re.search(r'ic\d+', key)
if match_ic_withLayers:
DApath = os.getcwd()
saveMeshPath = os.path.join(DApath, "vtk",
self.project_name + '.vtk'
)
ic_values_by_layers = dict_parm_pert[key]['ini_ic_withLayers'][:,ens_nb]
ic_nodes = self.map_prop_2mesh_markers('ic',
ic_values_by_layers,
to_nodes=True,
saveMeshPath=saveMeshPath,
)
self.update_ic(
INDP=1,
IPOND=0,
pressure_head_ini=ic_nodes,
filename=os.path.join(DApath, "input/ic"),
backup=False,
shellprint_update=shellprint_update,
)
else:
if kwargs["cycle_nb"] == 0:
self.update_ic(
INDP=0,
IPOND=0,
pressure_head_ini=dict_parm_pert[key]["ini_perturbation"][ens_nb],
filename=os.path.join(os.getcwd(), "input/ic"),
backup=False,
shellprint_update=shellprint_update,
)
# ks update
# --------------------------------------------------------------
elif key_root[0].casefold() in 'ks':
ks_het_ens = self._read_dict_pert_update_ens_SPP(
dict_parm_pert,
ks_het_ens,
key_root,
key,
update_key,
ens_nb,
shellprint_update
)
elif key_root[0].casefold() in 'porosity':
if hasattr(self, 'zone3d'):
POROS_het_ens = self._read_dict_pert_update_ens_SPP(
dict_parm_pert,
POROS_het_ens,
key_root,
key,
update_key,
ens_nb,
shellprint_update
)
else:
df_SPP, _ = in_CT.read_soil(os.path.join(
self.workdir,
self.project_name,
'DA_Ensemble/cathy_'+ str(ens_nb+1),
'input/soil'
),
self.dem_parameters,
self.cathyH["MAXVEG"]
)
df_SPP_2fill = df_SPP.copy()
df_SPP_2fill['POROS'] = dict_parm_pert[key][update_key][ens_nb]
df_SPP_2fill['POROS']
self.update_soil(
SPP_map=df_SPP_2fill,
FP_map=self.soil_FP["FP_map"],
verbose=self.verbose,
filename=os.path.join(os.getcwd(), "input/soil"),
shellprint_update=shellprint_update,
backup=True,
)
# VG parameters update
# --------------------------------------------------------------
elif key_root[0] in VG_p_possible_names:
# equivalence_CATHY = {
# 'thetar_VG':'VGRMCCELL',
# 'alpha_VG':'-1/VGPSATCELL',
# 'n_VG':'VGNCELL',
# }
# equivalence_CATHY
# retention curves parameters VGN, VGRMC, and VGPSAT
# - 'VGNCELL' (NSTR, NZONE): van Genuchten curve exponent = n
# - 'VGRMCCELL' (NSTR, NZONE): residual moisture content = \thetaR
# - 'VGPSATCELL' (NSTR, NZONE): van Genuchten curve exponent -->
# VGPSAT == -1/alpha (with alpha expressed in [L-1]);
SPP_map = self.soil_SPP["SPP_map"]
if len(VG_parms_mat_ens) == 0:
VG_parms_mat = self.soil_SPP["SPP"]
np.shape(VG_parms_mat)
if len(SPP_map["PERMX"]) < 2:
VG_parms_mat_ens = np.matlib.repmat(
VG_parms_mat[0], self.NENS, 1
)
else:
# VG_parms_mat_ens = np.repeat(VG_parms_mat, self.NENS, axis=0)
print("Non homogeneous soil VG properties not implemented")
if len(SPP_map["PERMX"]) < 2:
# for k in parm_pert.keys():
for k in VG_p_possible_names:
match = [
parm_incr
for parm_incr in dict_parm_pert.keys()
if k in parm_incr
]
if len(match) > 0:
idVG = VG_p_possible_names_positions_in_soil_table[
VG_p_possible_names.index(k)
]
VG_parms_mat_ens[:, idVG] = dict_parm_pert[match[0]][
update_key
]
else:
pass
else:
# VG_parms_mat_ens[:,:,idVG] = np.matlib.repmat(parm_pert[key][update_key],len(VG_parms_mat),1).T
print("Non homogeneous soil VG properties not implemented")
VG_parms_ensi = []
for es in range(self.NENS):
fed = dict()
for i, f in enumerate(list(SPP_map.keys())):
if len(SPP_map["PERMX"]) < 2:
if (
f == "alpha_VG"
): # convert to CATHY VGPSATCELL == -1/alpha
fed[f] = -1 / VG_parms_mat_ens[es, i]
else:
fed[f] = [VG_parms_mat_ens[es, i]]
else:
print(
"Non homogeneous soil VG properties not implemented"
)
# fed[f] = list(VG_parms_mat_ens[es,:,i])
VG_parms_ensi.append(fed)
self.update_soil(
SPP_map=VG_parms_ensi[ens_nb],
FP_map=self.soil_FP["FP_map"],
# zone3d=self.zone3d,
verbose=self.verbose,
filename=os.path.join(os.getcwd(), "input/soil"),
shellprint_update=shellprint_update,
backup=True,
)
# FeddesParam update
# --------------------------------------------------------------
elif key_root[0] in ["PCANA", "PCREF", "PCWLT", "ZROOT", "PZ", "OMGC"]:
SPP_map = self.soil_SPP["SPP_map"]
if len(FeddesParam_mat_ens) == 0:
FeddesParam_mat = self.soil_FP["FP"]
FeddesParam_mat_ens = np.repeat(
FeddesParam_mat, self.NENS, axis=0
)
FeddesParam_mat_ens = np.reshape(
FeddesParam_mat_ens, (self.NENS, self.cathyH["MAXVEG"], 6)
)
idFeddes = ["PCANA", "PCREF", "PCWLT", "ZROOT", "PZ", "OMGC"].index(
key_root[0]
)
# if self.cathyH["MAXVEG"]<2: # Case where only 1 vegetation type
FeddesParam_mat_ens[:, int(key_root[1]), idFeddes] = dict_parm_pert[
key
][update_key]
# FeddesParam_mat_ens[:,:,idFeddes] = parm_pert[key][update_key]
# else: # Case where only 1 vegetation type
FeddesParam_map_ensi = []
for es in range(self.NENS):
fed = dict()
for i, f in enumerate(
["PCANA", "PCREF", "PCWLT", "ZROOT", "PZ", "OMGC"]
):
fed[f] = list(FeddesParam_mat_ens[es, :, i])
FeddesParam_map_ensi.append(fed)
self.update_soil(
FP_map=FeddesParam_map_ensi[ens_nb],
SPP_map=SPP_map,
zone3d=zone3d,
verbose=self.verbose,
filename=os.path.join(os.getcwd(), "input/soil"),
shellprint_update=shellprint_update,
backup=True,
)
# Archie_p update
# --------------------------------------------------------------
elif key_root[0] in Archie_p_names:
self.console.print(
":arrows_counterclockwise: [b]Update Archie parameters[/b]"
)
# self.Archie_parms = {'rFluid':rFluid, 'a':a, 'm':m, 'n':n, 'pert_sigma':pert_sigma}
idArchie = Archie_p_names.index(key_root[0])
if len(Archie_parms_mat_ens) == 0:
for p in Archie_p_names:
if len(self.Archie_parms[p]) != self.NENS:
self.Archie_parms[p] = list(
self.Archie_parms[p] * np.ones(self.NENS)
)
Archie_parms_mat_ens = np.zeros(
[len(Archie_p_names), self.NENS]
)
Archie_parms_mat_ens[:] = np.nan
Archie_parms_mat_ens[idArchie, ens_nb] = dict_parm_pert[key][
update_key
][ens_nb]
if np.isnan(Archie_parms_mat_ens[idArchie, :]).any() == False:
self.Archie_parms[key_root[0]] = list(
Archie_parms_mat_ens[idArchie, :]
)
# print(self.Archie_parms)
else:
# perturbed variables to omit: state, Archie param
var_pert_to_ommit = ["St. var."]
if not key in var_pert_to_ommit:
raise ValueError(
"key:" + str(key) + " to update is not existing!"
)
shellprint_update = False
pass
def _get_data2assimilate(self, list_assimilated_obs, time_ass=None, match=False):
"""
Loop over the observation dictionary and select the corresponding data for a given assimilation time
Parameters
----------
list_assimilated_obs : list
list of observation to assimilate.
Returns
-------
data : np.array([])
stacked data values.
obskey2map : list
list of observation keys to map.
"""
# find the method to map for this time step
# --------------------------------------------------------
obskey2map, obs2map = self._obs_key_select(list_assimilated_obs)
def extract_data(sensor, time_ass, data):
data2add = []
if "ERT" in sensor:
if "pygimli" in items_dict[time_ass][1][sensor]["data_format"]:
data2add.append(
np.array(items_dict[time_ass][1][sensor]["data"]["rhoa"])
)
else:
data2add.append(
items_dict[time_ass][1][sensor]["data"]["resist"].to_numpy()
)
data2add = np.hstack(data2add)
else:
data2add.append(items_dict[time_ass][1][sensor]["data"])
return data2add
if time_ass is None:
time_ass = self.count_DA_cycle
data = []
# Loop through the observation dictionary for a given assimilation time (count_DA_cycle)
# -----------------------------------------------------------------
items_dict = list(self.dict_obs.items())
for sensor in items_dict[time_ass][1].keys():
if "all" in list_assimilated_obs:
data2add = extract_data(sensor, time_ass, data)
data.append(data2add)
else:
if match:
if sensor in list_assimilated_obs:
data2add = extract_data(sensor, time_ass, data)
data.append(data2add)
else:
str_sensor_rootname = re.sub(r'\d', '', sensor)
if str_sensor_rootname in list_assimilated_obs:
data2add = extract_data(sensor, time_ass, data)
data.append(data2add)
return np.hstack(data), obskey2map
def _obs_key_select(self, list_assimilated_obs):
"""find data to map from dictionnary of observations"""
# Loop through the observation dictionary for a given assimilation time (count_DA_cycle)
# -----------------------------------------------------------------
obskey2map = [] # data type 2 map
obs2map = [] # data dict 2 map
items_dict = list(self.dict_obs.items())
for sensor in items_dict[self.count_DA_cycle][1].keys():
if "all" in list_assimilated_obs:
obskey2map.append(sensor)
obs2map.append(items_dict[self.count_DA_cycle][1][sensor])
else:
str_sensor_rootname = re.sub(r'\d', '', sensor)
if str_sensor_rootname in list_assimilated_obs:
obskey2map.append(sensor)
obs2map.append(items_dict[self.count_DA_cycle][1][sensor])
return obskey2map, obs2map
def map_states2Observations(
self,
list_assimilated_obs="all",
parallel=False,
default_state="psi",
**kwargs
):
"""
Translate (map) the state values (pressure head or saturation) to observation (predicted) values
In other words: apply the H mapping operator to convert the model state to the predicted value Hx
In practice, for each assimilation time, loop over the observation
dictionary and infer the observation type. Stack Hx.
Hx = [Hx0, Hx1, ..., Hx_nb_of_observation]
.. note::
- For ERT data, H is Archie's law, SWC --> ER0 --> ERapp
- For SWC data, H is a multiplier (porosity)
- For tensiometer data, no mapping is needed (or the VGP model if the model state used is saturation)
- For discharge, no mapping is needed
- For scale data, no mapping is needed
.. note::
- For ERT data, each ERT mesh node is an observation
Parameters
----------
list_assimilated_obs : list or str, optional
list of names of observations to assimilate. The default is 'all'.
parallel : bool, optional
parallelize the mapping. The default is False.
default_state : str, optional
state variable used for the mapping ('psi' or 'sw'). The default is 'psi'.
**kwargs : dict
optional arguments (e.g. write2shell_map).
Returns
-------
Hx_ens : np.array (size: nb of observations x nb of ensembles)
Ensemble of the simulated (predicted) observations.
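Example
-------
A minimal, hypothetical call after a forward run of the ensemble (names are
illustrative only)::

    Hx_ens = simu.map_states2Observations(
        list_assimilated_obs=['swc', 'tensio'],
        parallel=False,
    )
    # Hx_ens has shape (nb of observations, nb of valid ensemble members)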
"""
#%%
if parallel:
path_fwd_CATHY_list = [] # list of ensemble path of cathy output
for ens_nb in self.ens_valid: # loop over ensemble files
path_fwd_CATHY_list.append(
os.path.join(
self.workdir,
self.project_name,
"DA_Ensemble/cathy_" + str(ens_nb + 1),
)
)
write2shell_map = True
if "write2shell_map" in kwargs:
write2shell_map = kwargs["write2shell_map"]
Hx_ens = [] # matrix of predicted observations for each ensemble realisation
for ens_nb in self.ens_valid: # loop over ensemble files
# print('ens nb:' + str(ens_nb))
path_fwd_CATHY = os.path.join(
self.workdir, self.project_name, "DA_Ensemble/cathy_" + str(ens_nb + 1)
)
df_psi = self.read_outputs(
filename="psi", path=os.path.join(path_fwd_CATHY, self.output_dirname)
)
df_sw, _ = self.read_outputs(
filename="sw", path=os.path.join(path_fwd_CATHY, self.output_dirname)
)
# infer soil parameters properties
# ---------------------------------
# porosity = self.soil_SPP["SPP"][:, 4][0]
SPP_ensi, _ = in_CT.read_soil(os.path.join(path_fwd_CATHY,
self.input_dirname,'soil'
),
self.dem_parameters,
self.cathyH["MAXVEG"]
)
# find data to map with dictionnary of observations
# --------------------------------------------
obskey2map, obs2map = self._obs_key_select(list_assimilated_obs)
state = [df_psi.iloc[-1], df_sw.iloc[-1].values]
Hx_stacked = [] # stacked predicted observation
# Loop over observations to map
# ---------------------------------------------------------------------
for i, obs_key in enumerate(obskey2map):
# print('obs_key nb:' + obs_key)
if "tensio" in obs_key:
obs2map_node = obs2map[i]["mesh_nodes"]
Hx_PH = mapper.tensio_mapper(state,obs2map_node)
Hx_stacked.append(Hx_PH)
if "swc" in obs_key:
# case 2: sw assimilation (Hx_SW)
# --------------------------------------------------------------------
obs2map_node = obs2map[i]["mesh_nodes"]
Hx_SW = mapper.swc_mapper(state,
obs2map_node,
self.grid3d['mesh3d_nodes'],
self.dem_parameters,
SPP_ensi,
self.console
)
Hx_stacked.append(Hx_SW)
if 'ET' in obs_key:
obs2map_node = obs2map[i]["mesh_nodes"]
Hx_ET = mapper.ET_mapper(obs2map_node,
self.console,
path_fwd_CATHY,
)
Hx_stacked.append(Hx_ET)
if "stemflow" in obs_key:
# Atmact-vf(13) : Actual infiltration (+ve) or exfiltration (-ve) at atmospheric BC nodes as a volumetric flux [L^3/T]
# Atmact-v (14) : Actual infiltration (+ve) or exfiltration (-ve) volume [L^3]
# Atmact-r (15) : Actual infiltration (+ve) or exfiltration (-ve) rate [L/T]
# Atmact-d (16) : Actual infiltration (+ve) or exfiltration (-ve) depth [L]
print("Not yet implemented")
if "scale" in obs_key:
# Atmact-vf(13) : Actual infiltration (+ve) or exfiltration (-ve) at atmospheric BC nodes as a volumetric flux [L^3/T]
# Atmact-v (14) : Actual infiltration (+ve) or exfiltration (-ve) volume [L^3]
# Atmact-r (15) : Actual infiltration (+ve) or exfiltration (-ve) rate [L/T]
# Atmact-d (16) : Actual infiltration (+ve) or exfiltration (-ve) depth [L]
print("Not yet implemented")
df_dtcoupling = self.read_outputs(
filename="dtcoupling",
path=os.path.join(path_fwd_CATHY, self.output_dirname),
)
# Hx_scale = state[1][obs2map[i]["mesh_nodes"]] * porosity
# Hx_stacked.append(Hx_scale)
if "discharge" in obs_key:
# case 3: discharge
# need to read the hgsfdet file (Hx_Q)
# --------------------------------------------------------------------
# if key[0] in 'discharge':
# derivation of the discharge Q from file 'hgsfdet'
# Hx_Q = []
# Hx.vstack(Hx_Q)
print("Not yet implemented")
if "ERT" in obs_key:
if not parallel:
Hx_ERT, df_Archie, mesh2test = mapper._map_ERT(
state,
path_fwd_CATHY,
ens_nb,
**kwargs
)
df_Archie_new = mapper._add_2_ensemble_Archie(self.df_Archie,
df_Archie
)
self.df_Archie = df_Archie_new
if "pygimli" in obs2map[i]["data_format"]:
Hx_stacked.append(Hx_ERT["rhoa"])
else:
Hx_stacked.append(Hx_ERT["resist"])
Hx_stacked = np.hstack(Hx_stacked)
if len(Hx_stacked) > 0:
Hx_ens.append(Hx_stacked)
if isinstance(Hx_ens, float):
pass
else:
if len(Hx_ens)>0:
if np.shape(Hx_ens)!=(len(self.ens_valid),len(obskey2map)):
Hx_ens = np.hstack(Hx_ens)
else:
Hx_ens = np.array(Hx_ens).T
#%%
# ---------------------------------------------------------------------
# special case of ERT // during sequential assimilation
# ---------------------------------------------------------------------
for i, obs_key in enumerate(obskey2map):
if "ERT" in obs_key:
if write2shell_map:
self.console.print(":zap: Mapping to ERT prediction")
self.console.print(
r"""
Fwd modelling ERT
Noise level: {??}
""",
style="green",
)
self.console.rule(
":octopus: Parameter perturbation :octopus:", style="green"
)
self.console.print(
r"""
Archie perturbation for DA analysis
Archie rFluid: {}
Pert. Sigma Archie: {}
Nb of zones (?): {}
""".format(
self.Archie_parms["rFluid_Archie"],
self.Archie_parms["pert_sigma_Archie"],
len(self.Archie_parms["rFluid_Archie"]),
),
style="green",
)
self.console.rule("", style="green")
if parallel:
Hx_ens_ERT, df_Archie, mesh2test = mapper._map_ERT_parallel(
self.dict_obs,
self.count_DA_cycle,
self.project_name,
self.Archie_parms,
path_fwd_CATHY_list,
list_assimilated_obs="all",
default_state="psi",
DA_cnb=self.count_DA_cycle,
savefig=False,
verbose=False,
)
# cell_array_names = mesh2test.array_names
# mesh2test['ER_converted1'].min()
# mesh2test['ER_converted1'].max()
# mesh2test.set_active_scalars('ER_converted1')
df_Archie_new = mapper._add_2_ensemble_Archie(self.df_Archie,
df_Archie
)
self.df_Archie = df_Archie_new
if len(Hx_ens) > 0:
Hx_ens = np.vstack([Hx_ens, Hx_ens_ERT])
else:
Hx_ens = Hx_ens_ERT
return Hx_ens # meas_size * ens_size
def _read_state_ensemble(self):
# read grid to infer dimension of the ensemble X
# --------------------------------------------------------------------
# self.grid3d = {}
if len(self.grid3d) == 0:
self.grid3d = in_CT.read_grid3d(
os.path.join(self.workdir, self.project_name)
)
M_rows = np.shape(self.grid3d["mesh3d_nodes"])[0]
N_col = self.NENS
# N_col = len(ens_valid)
# Ensemble matrix X of M rows and N columns; pre-fill with 1e99 so that
# missing ensemble members can be detected below
# --------------------------------------------------------------------
psi = np.full([M_rows, N_col], 1e99)
sw = np.full([M_rows, N_col], 1e99)
for j in range(self.NENS):
# for j in range(len(ens_valid)):
try:
df_psi = out_CT.read_psi(
os.path.join(
self.workdir,
self.project_name,
"DA_Ensemble/cathy_" + str(j + 1),
"output/psi",
)
)
psi[:, j] = df_psi.iloc[-1, :]
except Exception:
pass
try:
df_sw, _ = out_CT.read_sw(
os.path.join(
self.workdir,
self.project_name,
"DA_Ensemble/cathy_" + str(j + 1),
"output/sw",
)
)
sw[:, j] = df_sw.iloc[-1, :]
except Exception:
pass
# check whether some entries were never filled
if np.count_nonzero(psi == 1e99) != 0:
print("!!! inconsistently filled X, missing values !!!")
# sys.exit()
return psi, sw, N_col, M_rows
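# Minimal sketch of the ensemble state matrix built above (assuming a mesh of
# M nodes and an ensemble of N members): each column holds the last
# pressure-head (or saturation) profile of one ensemble member, e.g.
#
#   import numpy as np
#   M_rows, N_col = 4, 3
#   psi = np.full([M_rows, N_col], 1e99)   # 1e99 marks members not read yet
#   psi[:, 0] = [-0.1, -0.2, -0.3, -0.4]   # member 1 read from output/psi
#   np.count_nonzero(psi == 1e99)          # -> 8, members 2 and 3 still missing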
def update_pert_parm_dict(self, update_key, list_update_parm, analysis_param_valid):
"""update dict of perturbated parameters i.e. add a new entry with new params"""
index_update = 0
if self.count_DA_cycle > 0:
for pp in list_update_parm:
if "St. var." in pp:
index_update = index_update + 1
pass
else:
update_key = "update_nb" + str(self.count_DA_cycle)
self.dict_parm_pert[pp][update_key] = analysis_param_valid[
index_update - 1
]
index_update = index_update + 1
# check after analysis
# ----------------------------------------------------------------
id_valid_after = self._check_after_analysis(update_key, list_update_parm)
intersection_set = set.intersection(
set(id_valid_after), set(self.ens_valid)
)
self.ens_valid = list(intersection_set)
pass
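# Sketch of the bookkeeping done above (the parameter name 'ZROOT0' is
# hypothetical, for illustration only): after assimilation cycle k, each
# perturbed parameter gains an 'update_nb<k>' entry holding the analysed
# values of the valid ensemble members, e.g.
#
#   self.dict_parm_pert['ZROOT0']['update_nb1']  # analysed values after cycle 1
#   self.dict_parm_pert['ZROOT0']['update_nb2']  # analysed values after cycle 2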
def _update_input_ensemble(
self, ENS_times, list_parm2update=["St. var."], analysis=[], **kwargs
):
"""
Select the time window of the hyetograph and write the new
perturbed/updated variable values into the corresponding input files.
Parameters
----------
ENS_times : list
Assimilation times (in sec) bounding the DA window.
list_parm2update : list, optional
Names of the parameters to update. The default is ["St. var."].
analysis : np.array([]), optional
Analysed parameter values from the DA step.
Returns
-------
New files written/overwritten into the input dir of each ensemble member.
"""
self.console.print(":sponge: [b]update input ensemble[/b]")
# resample atmbc file for the given DA window
# ---------------------------------------------------------------------
self.selec_atmbc_window(self.NENS, ENS_times)
# ---------------------------------------------------------------------
self.update_ENS_files(
self.dict_parm_pert,
list_parm2update,
cycle_nb=self.count_DA_cycle,
analysis=analysis,
)
pass
def selec_atmbc_window(self, NENS, ENS_times):
"""
Select the time window of the hyetograph,
i.e. the time between two assimilation observations.
Parameters
----------
NENS : int
Number of ensemble members.
ENS_times : list
Assimilation times (in sec).
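Examples
--------
Sketch of the window selection applied below (toy dataframe; in practice
df_atmbc is read from the input/atmbc file):
>>> import pandas as pd
>>> df_atmbc = pd.DataFrame({'time': [0, 1800, 3600, 7200],
...                          'value': [1e-7, 2e-7, 0.0, 1e-7]})
>>> t0, t1 = 0, 3600
>>> window = df_atmbc[(df_atmbc['time'] >= t0) & (df_atmbc['time'] <= t1)]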
"""
if len(self.grid3d) == 0:
self.run_processor(IPRT1=3, DAFLAG=0)
self.grid3d = out_CT.read_grid3d(os.path.join(self.workdir,
self.project_name,
'output', 'grid3d')
)
# read full simulation atmbc and filter time window
# ----------------------------------------------------------------------
df_atmbc, HSPATM, IETO = in_CT.read_atmbc(
os.path.join(self.workdir, self.project_name, "input", "atmbc"),
grid=self.grid3d,
)
# Loop over the ensemble
# ------------------
for ens_nb in range(self.NENS):
cwd = os.path.join(self.workdir,
self.project_name,
"./DA_Ensemble/cathy_" + str(ens_nb + 1)
)
# Case where atmbc are perturbed
# -------------------------------
if 'atmbc0' in self.dict_parm_pert.keys(): # case where atmbc are homogeneous
if HSPATM!=0:
VALUE = np.array(self.dict_parm_pert['atmbc0']['time_variable_perturbation'])[:,ens_nb]
times = self.dict_parm_pert['atmbc0']['data2perturbate']['time']
df_atmbc = pd.DataFrame(np.c_[times,VALUE],
columns=['time','value']
)
if self.count_atmbc_cycle is not None:
if HSPATM==0:
time_window_atmbc = [
df_atmbc.time.unique()[self.count_atmbc_cycle],
df_atmbc.time.unique()[self.count_atmbc_cycle+1]
]
else:
time_window_atmbc = [
df_atmbc.iloc[self.count_atmbc_cycle]["time"],
df_atmbc.iloc[self.count_atmbc_cycle + 1]["time"],
]
else:
time_window_atmbc = [ENS_times[0], ENS_times[1]]
df_atmbc_window = df_atmbc[
(df_atmbc["time"] >= time_window_atmbc[0])
& (df_atmbc["time"] <= time_window_atmbc[1])
]
diff_time = time_window_atmbc[1] - time_window_atmbc[0]
self.update_parm(
TIMPRTi=[0, diff_time],
TMAX=diff_time,
filename=os.path.join(cwd, "input/parm"),
IPRT1=2,
backup=True,
)
VALUE = []
for t in df_atmbc_window["time"].unique():
VALUE.append(df_atmbc_window[df_atmbc_window["time"] == t]["value"].values)
self.update_atmbc(
HSPATM=HSPATM,
IETO=IETO,
time=[0, diff_time],
netValue=VALUE,
filename=os.path.join(cwd, "input/atmbc"),
)
pass
def _DA_df(
self, state=[None, None], state_analysis=None, rejected_ens=[], **kwargs
):
"""
Build a dataframe container to keep track of the state changes, before and
after update, for each ensemble member.
This dataframe is the support for the DA results visualisation.
Parameters
----------
state : list of np.array
[pressure head, saturation] mesh node values before the update.
state_analysis : np.array([])
State variable mesh node values after the DA analysis.
rejected_ens : list
Flags of rejected ensemble members.
Returns
-------
None.
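Examples
--------
One row per mesh node and ensemble member (toy values):
>>> import pandas as pd
>>> cols = ["time", "Ensemble_nb", "psi_bef_update", "sw_bef_update_",
...         "analysis", "OL", "rejected"]
>>> df_DA = pd.DataFrame([[0, 0, -1.2, 0.3, -1.1, False, 0]], columns=cols)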
"""
df_DA_ti_ni = pd.DataFrame() # nested df for a given ensemble/given time
df_DA_ti = pd.DataFrame() # nested df for a given time
cols_root = [
"time",
"Ensemble_nb",
"psi_bef_update",
"sw_bef_update_",
"analysis",
"OL", # Open loop boolean
"rejected",
]
# Open loop kwargs
# --------------------------
t_ass = []
if "t_ass" in kwargs:
t_ass = kwargs["t_ass"]
NENS = []
if "ens_nb" in kwargs:
ens_nb = kwargs["ens_nb"]
# --------------------------
if "openLoop" in kwargs:
data_df_root = [
t_ass * np.ones(len(state[0])),
ens_nb * np.ones(len(state[0])),
state[0],
state[1],
state[0],
True * np.ones(len(state[0])),
False * np.ones(len(state[0])),
]
df_DA_ti = pd.DataFrame(np.transpose(data_df_root), columns=cols_root)
else:
for n in range(self.NENS):
data_df_root = [
self.count_atmbc_cycle * np.ones(len(state[0])),
n * np.ones(len(state[0])),
state[0][:, n],
state[1][:, n],
state_analysis[:, n],
False * np.ones(len(state[0])),
int(rejected_ens[n]) * np.ones(len(state[0])),
]
df_DA_ti_ni = pd.DataFrame(
np.transpose(data_df_root), columns=cols_root
)
df_DA_ti = pd.concat([df_DA_ti, df_DA_ti_ni], axis=0)
self.df_DA = pd.concat([self.df_DA, df_DA_ti], axis=0, ignore_index=True)
pass
def _add_to_perturbated_dict(self, var_per_2add):
"""
Update the dict of perturbed variables.
"""
self.var_per_dict = self.var_per_dict | var_per_2add
return self.var_per_dict
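# Note: the `|` operator above merges the two dicts (Python >= 3.9); keys
# already present in var_per_dict are overwritten by var_per_2add, e.g.
#   {'a': 1, 'b': 2} | {'b': 3}  ->  {'a': 1, 'b': 3}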
def _create_subfolders_ensemble(self):
"""
Create ensemble subfolders
"""
if not os.path.exists(
os.path.join(self.workdir, self.project_name, "DA_Ensemble/cathy_origin")
):
os.makedirs(
os.path.join(
self.workdir, self.project_name, "DA_Ensemble/cathy_origin"
)
)
# copy input, output and vtk dir
for dir2copy in ["input", "output", "vtk"]:
shutil.copytree(
os.path.join(self.workdir, self.project_name, dir2copy),
os.path.join(
self.workdir,
self.project_name,
"DA_Ensemble/cathy_origin",
dir2copy,
),
)
try:
# for ff in [self.processor_name,'cathy.fnames','prepro']
# copy exe into cathy_origin folder
shutil.copy(
os.path.join(self.workdir, self.project_name, self.processor_name),
os.path.join(
self.workdir,
self.project_name,
"DA_Ensemble/cathy_origin",
self.processor_name,
),
)
# copy cathy.fnames into cathy_origin folder
shutil.copy(
os.path.join(self.workdir, self.project_name, "cathy.fnames"),
os.path.join(
self.workdir,
self.project_name,
"DA_Ensemble/cathy_origin/cathy.fnames",
),
)
# copy prepro into cathy_origin folder
shutil.copytree(
os.path.join(self.workdir, self.project_name, "prepro"),
os.path.join(
self.workdir, self.project_name, "DA_Ensemble/cathy_origin/prepro"
),
)
except Exception:
self.console.print(":worried_face: [b]processor exe not found[/b]")
path_origin = os.path.join(
self.workdir, self.project_name, "DA_Ensemble/cathy_origin"
)
# copy origin folder to each ensemble subfolders
for i in range(self.NENS):
path_nudn_i = os.path.join(
self.workdir, self.project_name, "DA_Ensemble/cathy_" + str(i + 1)
)
if not os.path.exists(path_nudn_i):
shutil.copytree(path_origin, path_nudn_i)
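# Resulting directory layout (sketch):
#
#   <workdir>/<project_name>/DA_Ensemble/
#       cathy_origin/    input, output, vtk, prepro, cathy.fnames, processor exe
#       cathy_1/         copy of cathy_origin for ensemble member 1
#       ...
#       cathy_<NENS>/    copy of cathy_origin for ensemble member NENS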
def set_Archie_parm(
self,
porosity=[],
rFluid_Archie=[1.0],
a_Archie=[1.0],
m_Archie=[2.0],
n_Archie=[2.0],
pert_sigma_Archie=[0],
):
"""
Dict of Archie parameters. Each soil type is described within a list.
Note that if pert_sigma is not None, Gaussian noise is
added during the translation of water saturation to electrical resistivity.
Parameters
----------
rFluid_Archie : list, optional
Resistivity of the pore fluid. The default is [1.0].
a_Archie : list, optional
Tortuosity factor. The default is [1.0].
m_Archie : list, optional
Cementation exponent. The default is [2.0]
(usually in the range 1.3 -- 2.5 for sandstones).
n_Archie : list, optional
Saturation exponent. The default is [2.0].
pert_sigma_Archie : list, optional
Standard deviation of the Gaussian noise to add. The default is [0].
.. note::
The field procedure to obtain the covariance structure of the model
estimates is described in Tso et al. () - 10.1029/2019WR024964:
"Fit a straight line for log10(S) and log10(ρ_S) using the least-squares
criterion. The fitting routine returns the covariance structure of the model
estimates, which can be used to determine the 68% confidence interval
(1 standard deviation) of the model estimates."
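Examples
--------
Sketch of the underlying petrophysical relationship (bulk electrical
resistivity from water saturation via Archie's law); the actual conversion
is handled by the ERT mapper, this is only illustrative:
>>> import numpy as np
>>> rFluid, a, m, n, porosity = 1.0, 1.0, 2.0, 2.0, 0.4
>>> sw = np.array([0.3, 0.6, 1.0])
>>> ER = rFluid * a * porosity**(-m) * sw**(-n)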
"""
if len(porosity) == 0:
porosity = self.soil_SPP["SPP"][:, 4][0]
if not isinstance(porosity, list):
porosity = [porosity]
self.Archie_parms = {
"porosity": porosity,
"rFluid_Archie": rFluid_Archie,
"a_Archie": a_Archie,
"m_Archie": m_Archie,
"n_Archie": n_Archie,
"pert_sigma_Archie": pert_sigma_Archie,
}
def _performance_assessement(
self,
list_assimilated_obs,
data,
prediction,
t_obs,
**kwargs
):
"""
(Normalized) root mean square errors (NRMSEs).
The RMSE is computed separately for each assimilated observation type.
Parameters
----------
list_assimilated_obs : str or list
Observation keys to evaluate.
data : np.array([])
Measured (observed) data.
prediction : np.array([])
Simulated (predicted) data, size nb of observations x nb of ensembles.
t_obs : int
Index of the current assimilation time.
Returns
-------
pd.DataFrame
Performance dataframe (self.df_performance) updated with the metrics
of the current assimilation time.
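Examples
--------
Sketch of the mismatch metric used below (absolute observation-prediction
difference averaged over the ensemble, then over the observations; toy
arrays, not CATHY outputs):
>>> import numpy as np
>>> data = np.array([1.0, 2.0])                       # 2 observations
>>> prediction = np.array([[0.9, 1.1], [2.2, 1.8]])   # 2 obs x 2 ensemble members
>>> diff = np.abs(data[:, None] - prediction)
>>> RMSE_avg_ti = diff.mean(axis=1).mean()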
"""
OL_bool = False
if "openLoop" in kwargs:
OL_bool = True
if hasattr(self, "df_performance") is False:
# initiate if simulation just started
# -------------------------------------------------------------
self.df_performance = pd.DataFrame()
self.RMSE_avg_stacked = []
# NAs default
# -------------------------------------------------------------
# RMSE_avg = np.nan # root mean square error (RMSE)
# NMRMSE = np.nan # time-averaged normalized root mean square error (NRMSE)
# average differences over the number of ensemble
# ALL OBSERVATIONS
# ------------------------------
all_Obs_diff_mat = np.zeros(np.shape(prediction))
for i in range(len(data)):
for j in range(np.shape(prediction)[1]): # Loop over ensemble columns
all_Obs_diff_mat[i, j] = abs(data[i] - prediction[i, j])
# all_Obs_diff_mat[:,j] = abs(data[i]-prediction[i,j])
all_Obs_diff_avg = np.nansum(all_Obs_diff_mat, axis=1) * (1 / len(prediction))
# average differences over all the observations
all_Obs_RMSE_avg_ti = np.sum(all_Obs_diff_avg, axis=0) * (1 / len(data))
# # compute metrics for each observation variable
# # ------------------------------------------
# Loop through the observation dictionary for a given assimilation time (count_DA_cycle)
# -----------------------------------------------------------------
obs2eval_key = [] # data type 2 eval
obs2eval = [] # data dict 2 eval
_, obs2eval_key = self._get_data2assimilate(list_assimilated_obs)
start_line_obs = 0
for s, name_sensor in enumerate(obs2eval_key): # case where there are observations other than ERT
obs2eval, _ = self._get_data2assimilate([name_sensor], match=True)
# number of observations at a given time
# (for ERT, the number of observations is the number of grid cells)
n_obs = len(obs2eval)
prediction2eval = prediction[start_line_obs : start_line_obs + n_obs]
obs2eval_diff_mat = np.zeros(np.shape(prediction2eval))
for i in range(len(obs2eval)):
for j in range(
np.shape(prediction2eval)[1]
): # Loop over ensemble columns
obs2eval_diff_mat[i, j] = abs(obs2eval[i] - prediction2eval[i, j])
obs2eval_diff_avg = np.nansum(obs2eval_diff_mat, axis=1) * (
1 / len(prediction2eval)
)
start_line_obs = start_line_obs + n_obs
if "ERT" in name_sensor:
RMSE_sensor_ti = np.sum(obs2eval_diff_avg, axis=0) * (1 / len(data))
# Possible check: scatter plot of observed vs simulated transfer resistances
# plt.scatter(obs2eval[i],prediction2eval[i,j])
else:
RMSE_sensor_ti = obs2eval_diff_avg
# compute normalised RMSE
# ---------------------------------------------------------
self.RMSE_avg_stacked.append(all_Obs_RMSE_avg_ti)
if 'ObsType' in self.df_performance:
if name_sensor in self.df_performance['ObsType'].unique():
bool_sensor = self.df_performance['ObsType']==name_sensor
RMSE_sensor_stacked = (self.df_performance[bool_sensor]["RMSE" + name_sensor].sum()
+ RMSE_sensor_ti
)
else:
RMSE_sensor_stacked = RMSE_sensor_ti
# if hasattr(self, 'RMSE') is False:
if t_obs == 0:
NMRMSE_sensor_ti = RMSE_sensor_ti
NMRMSE_avg_ti = all_Obs_RMSE_avg_ti
else:
NMRMSE_sensor_ti = (1 / (t_obs + 1)) * RMSE_sensor_stacked
NMRMSE_avg_ti = (1 / (t_obs + 1)) * np.sum(self.RMSE_avg_stacked)
# root names for the columns
# -------------------------------------------------------------
cols_root = [
"time",
"ObsType",
"RMSE" + name_sensor,
"RMSE_avg",
"NMRMSE" + name_sensor,
"NMRMSE_avg",
"OL",
]
# root data
# -------------------------------------------------------------
data_df_root = [
[t_obs],
[name_sensor],
[RMSE_sensor_ti],
[all_Obs_RMSE_avg_ti],
[NMRMSE_sensor_ti],
[NMRMSE_avg_ti],
[OL_bool],
]
df_performance_ti = pd.DataFrame(
np.array(data_df_root, dtype=object).T, columns=cols_root
)
# concatenate with main RMSE dataframe
# -------------------------------------------------------------
self.df_performance = pd.concat(
[self.df_performance, df_performance_ti], axis=0, ignore_index=True
)
return self.df_performance