Source code for cathy_utils

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Utilities for pyCATHY
- Unit conversion and labelling
- Others utilities
"""

import pandas as pd
import rioxarray  # noqa: F401 -- registers the .rio accessor used in clip_ET_withLandCover
from shapely.geometry import mapping

from datetime import datetime, timedelta
# from rosetta import rosetta, SoilData

#%% ---------------------------------------------------------------------------
#############    Unit conversion and label ####################################
## ---------------------------------------------------------------------------

def MPa2m(mpa):
    # 1 MPa of water pressure corresponds to ~101.971621 m of head
    # (consistent with the kPa conversions below)
    return mpa * 101.971621

def kPa2m(kpa):
    return kpa * 0.101971621


def m2kPa(m):
    return m / 0.101971621


def kPa2cm(kpa):
    return kpa * 0.101971621 * 1e2
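
# Usage sketch (illustrative values, not from the pyCATHY test suite):
# a matric potential of -150 kPa expressed as a water-column head.
# >>> kPa2m(-150)           # approx -15.30 m
# >>> kPa2cm(-150)          # approx -1529.6 cm
# >>> m2kPa(kPa2m(-150))    # round-trips back to -150 kPa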

def transform2_time_delta(t, x_units):
    """Time to time delta"""
    delta_t = pd.to_timedelta(t, unit=x_units)
    return delta_t
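
# Usage sketch: pd.to_timedelta accepts any pandas time-unit string
# ("s", "h", "days", ...); the values below are illustrative.
# >>> transform2_time_delta([0, 3600, 7200], "s")
# TimedeltaIndex(['0 days 00:00:00', '0 days 01:00:00', '0 days 02:00:00'],
#                dtype='timedelta64[ns]', freq=None)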

def convert_time_units(t, x_units):
    """Convert time units"""
    xlabel = " (s)"
    t_new = t  # default: keep seconds if no known unit is requested
    if x_units == "days":
        xlabel = " (days)"
        t_new = [x / (24 * 60 * 60) for x in t]
        t_new = round(t_new[0], 2)
    elif x_units == "hours":
        xlabel = " (h)"
        t_new = [x / (60 * 60) for x in t]
        t_new = round(t_new[0], 1)
    return xlabel, t_new
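
# Usage sketch: two days of elapsed time in seconds; note that only the
# first element is converted and returned (rounded), per the code above.
# >>> convert_time_units([86400, 172800], "days")
# (' (days)', 1.0)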

# Atmpot-vf (9) : Potential atmospheric forcing (rain +ve / evap -ve) as a volumetric flux [L^3/T]
# Atmpot-v (10) : Potential atmospheric forcing volume [L^3] (See parm input file for units)
# Atmpot-r (11) : Potential atmospheric forcing rate [L/T]
# Atmpot-d (12) : Potential atmospheric forcing depth [L]
# Atmact-vf(13) : Actual infiltration (+ve) or exfiltration (-ve) at atmospheric BC nodes as a volumetric flux [L^3/T]
# Atmact-v (14) : Actual infiltration (+ve) or exfiltration (-ve) volume [L^3]
# Atmact-r (15) : Actual infiltration (+ve) or exfiltration (-ve) rate [L/T]
# Atmact-d (16) : Actual infiltration (+ve) or exfiltration (-ve) depth [L]

def label_units(units, **kwargs):
    """Label units"""
    if units == "SW":
        label = "Soil Water Content \n ($m^{3}/m^{3}$)"
    elif units == "PH":
        label = "Pressure head (m)"
    elif units == "CKRW":
        label = "Relative hydraulic conductivity"
    elif units == "QTRANIE":
        label = "Root-zone water uptake"
    elif units == "Atmpot-vf":
        label = "ETp (rain +ve / evap -ve) \n ($[L^{3}/T]$)"
    elif units == "Atmpot-v":
        label = "ETp volume \n ($[L^{3}]$)"
    elif units == "Atmpot-r":
        label = "ETp rate \n ($[L/T]$)"
    elif units == "Atmpot-d":
        label = "ETp depth \n ($[L]$)"
    elif units == "Atmact-vf":
        label = ("Actual infiltration (+ve) or exfiltration (-ve) \n"
                 " at atmospheric BC nodes \n as a volumetric flux \n ($[L^3/T]$)")
    elif units == "Atmact-v":
        label = "Actual infiltration (+ve) or exfiltration (-ve) volume \n ($[L^3]$)"
    elif units == "Atmact-r":
        label = "Actual infiltration (+ve) or exfiltration (-ve) rate \n ($[L/T]$)"
    elif units == "Atmact-d":
        label = "Actual infiltration (+ve) or exfiltration (-ve) depth \n ($[L]$)"
    else:
        label = units  # fall back to the raw key so the function never raises
    return label
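
# Usage sketch: keys follow the CATHY output names listed above.
# >>> label_units("SW")
# 'Soil Water Content \n ($m^{3}/m^{3}$)'
# >>> label_units("Atmpot-r")
# 'ETp rate \n ($[L/T]$)'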

def change_x2date(time_in_sec, start_date,
                  formatIn="%Y%m%d",
                  formatOut="%Y-%m-%d %H:%M:%S",
                  ):
    """Change x axis in seconds to datetime"""
    date0 = pd.to_datetime(start_date, format=formatIn)
    date_label_str = [date0.strftime(formatOut)]
    for d in time_in_sec[1:]:
        date_label_str.append(
            (date0 + timedelta(seconds=int(d))).strftime(formatOut)
        )
    dates = pd.to_datetime(date_label_str, format=formatOut)
    return dates
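
# Usage sketch: hourly outputs starting on 1 May 2023 (illustrative date).
# >>> change_x2date([0, 3600, 7200], "20230501")
# DatetimeIndex(['2023-05-01 00:00:00', '2023-05-01 01:00:00',
#                '2023-05-01 02:00:00'], dtype='datetime64[ns]', freq=None)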

#%% ---------------------------------------------------------------------------
#############    Other utils functions    ######################################
## ---------------------------------------------------------------------------

def backup_simulog_DA(args, filename='DAlog.csv'):
    '''
    Get the simulation arguments and save them to a csv file to keep track
    of the Data Assimilation runs.
    Returns the index of the simulation if it already exists.
    '''
    results_df = pd.read_csv(filename, index_col=0)
    now = datetime.now()
    results_df_new = pd.DataFrame([vars(args)])
    cols2check = list(vars(args).keys())
    values = results_df_new[cols2check].values
    matching_index = results_df.index[
        (results_df[cols2check] == values).all(axis=1)
    ].tolist()
    if matching_index:
        results_df.loc[matching_index, 'datetime'] = now
        matching_index = matching_index[0]
    else:
        results_df_new['datetime'] = now
        results_df = pd.concat([results_df, results_df_new], ignore_index=True)
        matching_index = len(results_df) - 1
    results_df.to_csv(filename)
    return results_df, matching_index


def clip_ET_withLandCover(
    LCnames,
    gdf_AOI,
    ETxr,
    ETname='ACT. ETRA',
    crs_ET=None,
):
    """
    Clip an ET xarray dataset using land cover masks and return the modified
    dataset.

    Parameters
    ----------
    LCnames : list of str
        Names of the land cover classes to use as masks, corresponding to the
        'POI/AOI' column in `gdf_AOI`.
    gdf_AOI : geopandas.GeoDataFrame
        GeoDataFrame containing the area of interest (AOI) polygons with a
        'POI/AOI' column for land cover types.
    ETxr : xarray.Dataset
        The xarray dataset containing the evapotranspiration (ET) data.
    ETname : str, optional
        Name of the ET variable in `ETxr`. Default is 'ACT. ETRA'.
    crs_ET : str, optional
        Coordinate reference system of the ET dataset. If not provided, it is
        inferred from the dataset.

    Returns
    -------
    xarray.Dataset
        The modified `ETxr` dataset with added layers for each land cover mask
        under the names "<LCname>_CLCmask".

    Notes
    -----
    - Each land cover mask is created using the geometry in `gdf_AOI` for the
      specified `LCnames`.
    - The `clip` method uses the geometries to create masks applied to the ET
      dataset.
    - The function returns the modified dataset without generating any plots.

    Examples
    --------
    >>> ETxr_updated = clip_ET_withLandCover(
    ...     LCnames=['Lake', 'Intensive Irrigation'],
    ...     gdf_AOI=gdf,
    ...     ETxr=et_xr,
    ...     ETname='ET',
    ...     crs_ET='EPSG:4326'
    ... )
    """
    for lcn in LCnames:
        CLC_mask = gdf_AOI.set_index('POI/AOI').loc[lcn].geometry
        ETxr = ETxr.rio.write_crs(crs_ET)
        mask_ETA = ETxr[ETname].rio.clip(
            CLC_mask.apply(mapping),
            crs_ET,
            drop=False,
        )
        ETxr[lcn + '_CLCmask'] = mask_ETA
    return ETxr
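
# Usage sketch for backup_simulog_DA (hypothetical argparse Namespace; the
# attribute names below are illustrative, and DAlog.csv must already exist
# since the function starts by reading it). clip_ET_withLandCover carries
# its own example in its docstring above.
# >>> import argparse
# >>> args = argparse.Namespace(DA_type="enkf", NENS=32)
# >>> log_df, run_idx = backup_simulog_DA(args, filename="DAlog.csv")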

# utils observations
# ------------------

def dictObs_2pd(dict_obs):
    """Dict of observations to a dataframe of observations"""
    df_obs = pd.DataFrame.from_dict(dict_obs).stack().to_frame()
    df_obs = pd.DataFrame(df_obs[0].values.T.tolist(), index=df_obs.index)
    df_obs.index.names = ["sensorNameidx", "assimilation time"]
    return df_obs
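
# Usage sketch (hypothetical sensor records; the expected nesting is
# {assimilation time in s: {sensor name: record dict}}):
# >>> dict_obs = {
# ...     0:    {"TDR1": {"data": 0.31, "data_err": 0.01}},
# ...     3600: {"TDR1": {"data": 0.29, "data_err": 0.01}},
# ... }
# >>> dictObs_2pd(dict_obs)  # DataFrame with MultiIndex (sensorNameidx, assimilation time)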

def resynchronise_times(data_measure, atmbc_df, time_offset):
    """Resynchronise the observation dict: the old key is the elapsed time
    in seconds from the first observation, while the new key is the elapsed
    time from the first atmbc time."""
    data_measure_sync = dict(data_measure)
    try:
        new_keys = []
        for d in range(len(data_measure_sync.keys())):
            items_dict = list(data_measure_sync.items())
            old_key = list(data_measure_sync.keys())[d]
            new_key = old_key - time_offset
            for sensor in list(items_dict[d][1].keys()):
                data_measure_sync[old_key][sensor]["assimilation_times"] = new_key
            new_keys.append(new_key)
        data_measure_sync = dict(zip(new_keys, data_measure_sync.values()))
    except Exception:
        print("datetime first atmbc point")
        print(atmbc_df["datetime"][0])
        print("datetime first measurement")
        print(data_measure[0][sensor]["datetime"])
        print("can't synchronise times - continue without")
    return data_measure_sync
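
# Usage sketch (hypothetical inputs built elsewhere in a pyCATHY workflow);
# each elapsed-time key becomes old_key - time_offset:
# >>> synced = resynchronise_times(data_measure, atmbc_df, time_offset=3600)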