Source code for friendly_data.iamc

"""Interface to convert a Friendly dataset to IAMC format

Configuration can be done using two separate files, A global config file (in
YAML format) can set options like mapping an index column to the corresponding
IAMC names, and setting default values for mandatory columns.  Whereas per
dataset configuration like, identifying index columns, mapping a dataset to its
IAMC variable name, defining column aliases, and aggregations can be done in an
index file (in YAML format).

from itertools import chain
from logging import getLogger
from pathlib import Path
from typing import cast, Dict, Iterable, List, Tuple, Union

from glom import glom, Iter, Match, MatchError, Or, T
import pandas as pd

from friendly_data._types import _path_t
from friendly_data.converters import _reader, resolve_aliases, to_df
from friendly_data.dpkg import pkgindex
from friendly_data.dpkg import res_from_entry
from friendly_data.helpers import idx_lvl_values, idxslice
from friendly_data.helpers import import_from
from friendly_data.helpers import filter_dict
from friendly_data.helpers import is_fmtstr
from import dwim_file

# weak dependency on pyam; damn plotly!
pyam = import_from("pyam", "")

logger = getLogger(__name__)

[docs]class IAMconv: """Converter class for IAMC data This class resolves index columns against the "semi-hierarchical" variables used in IAMC data, and separates them into individual datasets that are part of the datapackage. It relies on the index file and index column definitions to do the disaggregation. It also supports the reverse operation of aggregating multiple datasets into an IAMC dataset. **TODO:** - describe assumptions (e.g. case insensitive match) and fallbacks (e.g. missing title) - limitations (e.g. when no index column exists) """ _IAMC_IDX = pyam.IAMC_IDX + ["year"] @classmethod def _validate(cls, conf: Dict) -> Dict: # FIXME: check if file exists for user defined idxcols conf_match = Match( { "indices": {str: Or(str, int)}, # int for year str: object, # fall through for other config keys } ) try: return glom(conf, conf_match) except MatchError as err: logger.exception( f"{err.args[1]}: must define a dictionary of files pointing to idxcol" "definitions for IAMC conversion, or set a default value for one of:" f"{', '.join(cls._IAMC_IDX)}" ) raise err from None @classmethod def _warn_empty(cls, df: pd.DataFrame, entry: Dict): if df.empty: # prefer name over path because when name is present, it is more # likely to be more meaningful path_or_name = entry["name"] if "name" in entry else entry["path"] logger.warning(f"{path_or_name}: empty dataframe, check index entry")
[docs] @classmethod def from_file(cls, confpath: _path_t, idxpath: _path_t) -> "IAMconv": """Create a mapping of IAMC indicator variables with index columns Parameters ---------- confpath : Union[str, Path] Path to config file for IAMC <-> data package config file idxpath : Union[str, Path] Path to index file **kwargs Keyword arguments passed on to the pandas reader backend. Returns ------- IAMconv """ basepath = Path(idxpath).parent conf = cls._validate(cast(Dict, dwim_file(confpath))) return cls(pkgindex.from_file(idxpath), conf["indices"], basepath=basepath)
[docs] @classmethod def read_indices(cls, path: _path_t, basepath: _path_t, **kwargs) -> pd.Series: """Read index column definitions provided in config""" _lvls: pd.Series = _reader( Path(basepath) / path, usecols=["name", "iamc"], index_col="name", **kwargs, ) # fallback when iamc name is missing; capitalized name is the most common return _lvls.squeeze("columns").fillna({i: i.capitalize() for i in _lvls.index})
@property def basepath(self): """Data package basepath, directory the index file is located""" return self._basepath @property def indices(self) -> Dict: """Index definitions - Default value of mandatory index columns in case they are missing - Different levels of user defined index columns; points to a 2-column CSV file, with the "name" and "iamc" columns """ return self._indices @indices.setter def indices(self, indices: Dict): self._indices = { col: path_or_default if col in self._IAMC_IDX else self.read_indices(path_or_default, self.basepath) for col, path_or_default in indices.items() } @property def res_idx(self) -> pkgindex: """Package index Each entry corresponds to a resource that maybe included in IAMC output. """ return self._res_idx @res_idx.setter def res_idx(self, idx: pkgindex): self._res_idx = pkgindex(glom(idx, Iter().filter(T.get("iamc")).all())) def __init__(self, idx: pkgindex, indices: Dict, basepath: _path_t): """Converter initialised with a set of IAMC variable index column defintions Parameters ---------- idx : `friendly_data.dpkg.pkgindex` Index of datasets with IAMC variable definitions indices : Dict[str, Union[int, float, str, Path]] Index column definitions; a default value for an IAMC index column, path to a 2-column CSV file defining levels for a user defined index column (see :meth:`IAMconv.indices`) basepath : Union[str, Path] Top-level directory of the data package """ self._basepath = Path(basepath) # order important, needed by @indices.setter self.indices = indices self.res_idx = idx
[docs] def index_levels(self, idxcols: Iterable) -> Dict[str, pd.Series]: """Index levels for user defined index columns Parameters ---------- idxcols : Iterable[str] Iterable of index column names Returns ------- Dict[str, pd.Series] Different values for a given set of index columns """ userdefined = set(idxcols) - set(self._IAMC_IDX) if len(userdefined) == 0: raise ValueError(f"idxcols={idxcols}: only for user defined idxcols") return filter_dict(self.indices, userdefined)
[docs] def resolve_idxcol_defaults(self, df: pd.DataFrame) -> pd.DataFrame: """Find missing IAMC indices and set them to the default value from config The IAMC format requires the following indices: `self._IAMC_IDX`; if any of them are missing, the corresponding index level is created, and the level values are set to a constant specified in the config. Parameters ---------- df : pandas.DataFrame Returns ------- pandas.DataFrame Dataframe with default index columns resolved """ defaults = filter_dict(self.indices, set(self._IAMC_IDX) - set(df.index.names)) return cast( pd.DataFrame, df.assign(**defaults).set_index(list(defaults), append=True) )
[docs] def iamcify(self, df: pd.DataFrame) -> pd.DataFrame: """Transform dataframe to match the IAMC (long) format""" useridxlvls = list(set(df.index.names) - set(self._IAMC_IDX)) # ensure all user defined index columns are removed before concatinating df = ( df.rename(columns={df.columns[0]: "value"}) .set_index("variable", append=True) .reset_index(useridxlvls, drop=True) ) df.index = df.index.reorder_levels(self._IAMC_IDX) return df
[docs] def agg_idxcol(self, df: pd.DataFrame, col: str, entry: Dict) -> List[pd.DataFrame]: """Aggregate values and generate IAMC dataframes Parameters ---------- df : pd.DataFrame Dataframe to aggregate from col : str Column to perform aggregation on entry : Dict Index entry with aggregation rules Returns ------- List[pd.DataFrame] List of IAMC dataframes """ dfs = [] for lvls, var in glom(entry["agg"][col], [(T.values(), tuple)]): rest = df.index.names.difference([col]) _df = cast( pd.DataFrame, df.query(f"{col} in @lvls").groupby(rest).sum().assign(variable=var), ) dfs.append(self.iamcify(_df)) return dfs
[docs] def agg_vals_all(self, entry: Dict) -> Tuple[str, List[str]]: """Find all values in index column that are present in an aggregate rule""" assert len(entry["agg"]) == 1, "only support aggregating one column" col, conf = entry["agg"].copy().popitem() vals = glom(conf, (Iter().map(T["values"]).flatten().all(), set, list)) return col, vals
def _match_item( self, item: Union[_path_t, Tuple[str, pd.DataFrame]] ) -> Union[None, Tuple[Dict, pd.DataFrame]]: """Match a file or dataframe to an index entry (internal method) Parameters ---------- item : Union[Union[str, Path], Tuple[str, pd.DataFrame]] The item to find in the index. A file is matched with the ``path`` key of the index entry, and the `key` of the dataframe is matched with the ``name`` key in the entry. Returns ------- Tuple[Dict, pandas.DataFrame] The dictionary is the index entry; when ``item`` is a file, the index entry is used to read the file into a dataframe, and in case of a dataframe, it is passed on transparently. """ if isinstance(item, tuple): match_key = "name" match_val = item[0] else: match_key = "path" match_val = f"{item}" # NOTE: res_from_entry requires: "path", "idxcols", "alias"; later # in the iteration, "iamc" & "agg" is required keys = [match_key, "idxcols", "alias", "iamc", "agg"] _entries = [ entry for entry in self.res_idx.records(keys) # convert to string for path comparison if f"{match_val}" == entry[match_key] ] if _entries: entry = _entries[0] if len(_entries) > 1: logger.warning(f"{entry[match_key]}: duplicate entries, picking first") else: return None if isinstance(item, tuple): df = item[1] else: df = to_df(res_from_entry(entry, self.basepath)) return entry, df
[docs] def to_df( self, files_or_dfs: Union[Iterable[_path_t], Dict[str, pd.DataFrame]] ) -> pd.DataFrame: """Convert CSV files/dataframes to IAMC format according to the index Parameters ---------- files_or_dfs : Union[Iterable[Union[str, Path]], Dict[str, pandas.DataFrame]] List of files or a dictionary of dataframes, to be collated and converted to IAMC format. Each item must have an entry in the package index the converter was initialised with, it is skipped otherwise. Files are matched by file ``path``, whereas dataframes match when the dictionary key matches the index entry ``name``. Note when the files are read, the basepath is set to whatever the converter was initialised with. If :meth:`IAMconv.from_file` was used, it is the parent directory of the index file. Returns ------- DataFrame A ``pandas.DataFrame`` in IAMC format """ dfs = [] if isinstance(files_or_dfs, dict): iterable = cast(Iterable, files_or_dfs.items()) else: iterable = files_or_dfs for item in iterable: match = self._match_item(item) if match is None: continue res = self.frames(*match) # match -> entry, dataframe dfs.append(res) df = pd.concat(chain.from_iterable(dfs), axis=0) if df.empty: logger.warning("empty data set, check config and index file") return df
[docs] def frames(self, entry: Dict, df: pd.DataFrame) -> List[pd.DataFrame]: """Convert the dataframe to IAMC format according to configuration in the entry Parameters ---------- entry : Dict Index entry df : pandas.DataFrame The dataframe that is to be converted to IAMC format Returns ------- List[pandas.DataFrame] List of ``pandas.DataFrame``s in IAMC format """ dfs = [] df = resolve_aliases(df, entry.get("alias", {})) df = self.resolve_idxcol_defaults(df) lvls = self.index_levels(df.index.names) if entry["agg"]: # None if not defined col, _agg_vals = self.agg_vals_all(entry) df_agg = cast(pd.DataFrame, df.query(f"{col} in @_agg_vals")) dfs.extend(self.agg_idxcol(df_agg, col, entry)) _vals = lvls[col].index # noqa: F841, used by query below df = cast(pd.DataFrame, df.query(f"{col} in @_vals")) # NOTE: need to remove aggregated levels, then calculate the # intersection with the levels that are in the current dataframe _lvls = { col: vals.loc[ vals.index.difference(_agg_vals).intersection( idx_lvl_values(df.index, col) ) ] for col, vals in lvls.items() } else: # NOTE: need to calculate the intersection of levels that are # in the current dataframe and the levels defined in the config _lvls = { col: vals.loc[vals.index.intersection(idx_lvl_values(df.index, col))] for col, vals in lvls.items() } if is_fmtstr(entry["iamc"]): sel = idxslice( df.index.names, {col: val.index for col, val in _lvls.items()}, ) df = df.loc[sel] df.index = df.index.remove_unused_levels() iamc_variable = pd.DataFrame( { col: df.index.get_level_values(col).map(val) for col, val in _lvls.items() }, index=df.index, ).apply(lambda r: entry["iamc"].format(**r.to_dict()), axis=1) else: iamc_variable = entry["iamc"] _df = self.iamcify(df.assign(variable=iamc_variable)) dfs.append(_df) return dfs
[docs] def to_csv( self, files: Iterable[_path_t], output: _path_t, wide: bool = False, ): """Write converted IAMC data frame to a CSV file Parameters ---------- files : Iterable[Union[str, Path]] List of files to collate and convert to IAMC output : Union[str, Path] (default: empty string) Path of the output CSV file; if empty, nothing is written to file. basepath : Union[str, Path] Data package base path wide : bool (default: False) Write the CSN in wide format (with years as columns) """ df = self.to_df(files) if wide: df = pyam.IamDataFrame(df) Path(output).parent.mkdir(parents=True, exist_ok=True) df.to_csv(output)