"""
Module to load input data files.
@author: Mengqi Zhao (mengqi.zhao@pnnl.gov)
@Project: GLORY v1.0
License: BSD 3-Clause, see LICENSE and DISCLAIMER files
Copyright (c) 2023, Battelle Memorial Institute
"""
import logging
import os
import pandas as pd
class DataLoader:
    """
    Load the input data for one basin and one period.

    On construction the loader reads the basin-filtered input tables named in ``config``, extracts the
    values for ``period``, and derives the monthly demand profile and the storage capacity values.

    NOTE: :meth:`get_current_capacity` reads a module-level variable named ``capacity_gcam_pre`` for
    periods other than ``base_period + 5`` when ``capacity_gcam`` is supplied. That variable is not
    defined in this module; it has to be assigned into this module's namespace before such a call.
    """

    def __init__(self, config, basin_id, period, base_period=2020, demand_gcam=None, capacity_gcam=None):
        """
        Initialization

        :param config: object exposing ``input_files`` and ``reference_files`` mappings of file paths
        :param basin_id: integer for basin id to select
        :param period: integer for period (year)
        :param base_period: integer for base year
        :param demand_gcam: dataframe for GCAM solved demand
        :param capacity_gcam: dataframe for storage capacity based on GCAM solved runoff demand
        """
        logging.info(f'Starting function read_data for basin {basin_id}.')

        self.basin_id = basin_id
        self.period = period
        self.base_period = base_period
        self.demand_gcam = demand_gcam
        self.capacity_gcam = capacity_gcam

        # basin-filtered input tables (load_data filters on self.basin_id, so it must be set first)
        self.climate = self.load_data(config.input_files['climate'])
        self.profile = self.load_data(config.input_files['monthly_profile'])
        self.demand_hist = self.load_data(config.input_files['sectoral_demand'])
        self.reservoir = self.load_data(config.input_files['reservoir'])
        self.slope = self.load_data(config.input_files['slope'])['slope'].iloc[0]
        self.basin_name_std = self.load_basin_mapping(
            f_basin_country=config.reference_files['basin_to_country_mapping'],
            f_basin_region=config.reference_files['basin_to_region_mapping'],
            header_num=7)

        # values for the selected period (first matching row)
        climate_now = self.climate.loc[self.climate['period'] == self.period]
        self.inflow = climate_now['runoff_km3'].iloc[0]
        self.evap_depth = climate_now['evaporation_km'].iloc[0]
        self.res_area = self.reservoir['nonhydro_area_km2'].iloc[0]

        # month -> fraction dictionaries for the selected period
        profile_now = self.profile.loc[self.profile['period'] == self.period]
        self.inflow_profile = dict(zip(profile_now['month'], profile_now['inflow']))
        self.evap_profile = dict(zip(profile_now['month'], profile_now['evaporation']))
        self.demand_profile = self.get_demand_profile()

        # capacity values
        self.no_expansion = False
        self.max_capacity = self.get_max_capacity()
        self.current_capacity = self.get_current_capacity()

        # expected incremental size for reservoir storage capacity expansion
        self.expan_incr = self.reservoir['mean_cap_km3'].iloc[0]

        # define constant
        self.storage_min = 0
        # NOTE(review): the meaning of ``m`` is not evident from this module - confirm with the model code
        self.m = 0.1

        logging.info('Function read_data completed successfully.')

    def load_data(self, fn, header_num=0):
        """
        Load data from a CSV file to pandas dataframe and keep only the rows of the selected basin.

        :param fn: string for name of file to load
        :param header_num: integer for number of lines in file to skip before the header
        :return: pandas dataframe filtered to ``basin_id == self.basin_id``
        :raises FileNotFoundError: if ``fn`` is not an existing file (subclass of IOError/OSError)
        :raises RuntimeError: if the file exists but does not have a ``.csv`` extension
        """
        if not os.path.isfile(fn):
            # FileNotFoundError is an OSError/IOError, so existing ``except IOError`` handlers still work
            raise FileNotFoundError(f"File does not exist: {fn}")

        if not fn.endswith('.csv'):
            raise RuntimeError(f"File {fn} has unrecognized extension")

        df = pd.read_csv(fn, skiprows=header_num)
        return df.loc[df['basin_id'] == self.basin_id]

    def load_gcam_demand(self):
        """
        Load and format data extracted from GCAM using gcamwrapper.

        Reads ``self.demand_gcam`` (columns ``sector``, ``subsector``, ``year``, ``physical-demand``),
        maps the sector codes to sector names, attaches the standard basin ids from ``self.basin_name_std``
        and sums the demand per sector for the selected basin.

        :return: dataframe with columns ``basin_id``, ``gcam_basin_name``, ``sector``, ``year`` and
                 ``demand_km3`` for the selected basin, or None when no GCAM demand was supplied
        """
        if self.demand_gcam is None:
            return None

        # select and rename; this produces a new dataframe, self.demand_gcam itself is not modified
        df = self.demand_gcam[['sector', 'subsector', 'year', 'physical-demand']]. \
            rename(columns={'subsector': 'gcam_basin_name', 'physical-demand': 'value'})

        # convert objects columns to string
        df[['sector', 'gcam_basin_name']] = df[['sector', 'gcam_basin_name']].astype('string', errors='raise')

        # the sector code is the third '_'-separated token of the GCAM sector name
        df['sector'] = df['sector'].str.split(pat='_', expand=True).astype('string', errors='raise').iloc[:, 2]

        # replace demand sector names
        rep = {'an': 'livestock', 'elec': 'electric', 'ind': 'industry',
               'irr': 'irrigation', 'muni': 'domestic', 'pri': 'mining'}
        df = df.replace({'sector': rep})

        # add standard basin id and basin name
        df = df.merge(self.basin_name_std, how='left', on='gcam_basin_name')

        # filter to selected basin
        df = df.loc[df['basin_id'] == self.basin_id]

        # aggregate demand for each sector within each basin; only 'value' is summed so the result
        # does not depend on how the installed pandas version treats the non-numeric mapping columns
        keys = ['basin_id', 'gcam_basin_name', 'sector', 'year']
        grp = df.groupby(keys, as_index=False)['value'].sum()

        return grp.rename(columns={'value': 'demand_km3'})

    @staticmethod
    def load_basin_mapping(f_basin_country, f_basin_region, header_num=7):
        """
        Mapping different formats of basin names.

        :param f_basin_country: string for full file path to basin-country mapping file
        :param f_basin_region: string for full file path to basin-region mapping file
        :param header_num: integer for numbers of rows to skip until the header of the basin-country file
        :return: dataframe with ``basin_id``, ``basin_name``, ``gcam_basin_name`` and the columns of the
                 basin-region file joined on ``gcam_basin_name``
        """
        # load basin mapping data (only the basin-country file has leading rows to skip)
        df_basin = pd.read_csv(f_basin_country, skiprows=header_num)
        df_region = pd.read_csv(f_basin_region)

        # select relevant columns and rename
        df_basin = df_basin.loc[:, ['GCAM_basin_ID', 'Basin_long_name', 'GLU_name']]
        df_basin = df_basin.rename(columns={'GCAM_basin_ID': 'basin_id',
                                            'Basin_long_name': 'basin_name',
                                            'GLU_name': 'gcam_basin_name'})

        # convert from object to string or int
        name_cols = ['basin_name', 'gcam_basin_name']
        df_basin[name_cols] = df_basin[name_cols].astype('string', errors='raise')
        df_basin['basin_id'] = df_basin['basin_id'].astype('int32', errors='raise')

        # join region name
        return pd.merge(df_basin, df_region, how='left', on=['gcam_basin_name'])

    def get_demand_profile(self):
        """
        Calculate total demand profile with historical sectoral profile and sectoral demand.

        Historical sectoral demand is used for periods up to the base period, when no GCAM demand was
        supplied, or when the GCAM demand of the basin sums to zero; otherwise GCAM demand is used.

        :return: dictionary mapping month to the fraction of annual total demand in that month
        """
        # GCAM demand is only relevant for future periods; compute it once (merge + groupby)
        gcam_demand = self.load_gcam_demand() if self.period > self.base_period else None

        if gcam_demand is not None and gcam_demand['demand_km3'].sum() == 0:
            logging.warning('Basin:  %s  has a sum of 0 demand from all sectors. '
                            'Replace demand profile with historical profile.', self.basin_id)
            gcam_demand = None

        if gcam_demand is None:
            # get historical demand
            df_demand = self.demand_hist[['sector', 'demand_km3']]
        else:
            # reformat gcam withdrawal
            df_demand = gcam_demand[['sector', 'demand_km3']]

        # only keep sectoral demand profiles and melt to one row per (month, sector)
        df = self.profile.loc[self.profile['period'] == self.period].copy()
        df = df.drop(['basin_id', 'basin_name', 'period', 'evaporation', 'inflow'], axis=1). \
            melt(id_vars=['month']).rename(columns={'variable': 'sector'})

        # merge annual demand and sectoral demand profiles
        df = pd.merge(df, df_demand, how='left', on=['sector'])

        # calculate demand amount for each demand sector
        df['demand_sector'] = df['value'] * df['demand_km3']

        # calculate total monthly demand by aggregating all demand sectors (numeric columns only)
        monthly = df.groupby('month', as_index=False)[['demand_sector', 'demand_km3']].sum()

        # monthly demand divided by the annual demand summed over the sectors
        monthly['profile'] = monthly['demand_sector'] / monthly['demand_km3']

        # construct dictionary for month and demand profile
        return dict(zip(monthly['month'], monthly['profile']))

    def _basin_capacity(self, table):
        """Return the first ``capacity_gcam`` value of ``table`` for the selected basin."""
        return table.loc[table['basin_id'] == self.basin_id, 'capacity_gcam'].values[0]

    def get_current_capacity(self):
        """
        Determine the storage capacity for the period, expanding it when GCAM requires more.

        * Without ``capacity_gcam`` the existing reservoir capacity is returned.
        * For ``base_period + 5`` the larger of the existing capacity and the GCAM capacity is returned.
        * For any other period the larger of the GCAM capacity and the previous GCAM capacity is
          returned; the previous table is read from the module-level variable ``capacity_gcam_pre``.

        NOTE(review): a period at or before ``base_period`` combined with a non-None ``capacity_gcam``
        also takes the GCAM path (as in the original control flow) - confirm that callers never do this.

        :return: float value for storage capacity
        :raises NameError: if ``capacity_gcam_pre`` is needed but has not been assigned in this module
        """
        if self.capacity_gcam is None:
            # existing storage capacity
            return self.reservoir['nonhydro_cap_km3'].iloc[0]

        if self.period == self.base_period + 5:
            return max(self.reservoir['nonhydro_cap_km3'].iloc[0],
                       self._basin_capacity(self.capacity_gcam))

        # the previous period's table is not defined in this module; it must have been assigned
        # to the module namespace beforehand
        previous = globals().get('capacity_gcam_pre')
        if previous is None:
            raise NameError("name 'capacity_gcam_pre' is not defined: assign the previous period's GCAM "
                            "capacity dataframe to this module's global 'capacity_gcam_pre' before "
                            "requesting the capacity for this period")

        # if gcam solved capacity is larger than previous capacity, then expand
        capacity_gcam_pre_basin = self._basin_capacity(previous)
        capacity_gcam_basin = self._basin_capacity(self.capacity_gcam)
        if capacity_gcam_basin > capacity_gcam_pre_basin:
            return capacity_gcam_basin
        return capacity_gcam_pre_basin

    def get_max_capacity(self):
        """
        Adjust maximum storage capacity value only if the basin have no expandable capacity.
        Max storage capacity input data for some basins is already adjusted based on the historical storage capacity.

        :return: float64, ``expan_cap_km3`` of the reservoir table, or 1% of the period's runoff when that is 0
        """
        val = self.reservoir['expan_cap_km3'].iloc[0]

        # a zero maximum is replaced by 1% of the runoff of the selected period
        if val == 0:
            val = 0.01 * self.climate.loc[self.climate['period'] == self.period, 'runoff_km3'].iloc[0]

        return val