
"""
Data Utilities
--------------

Utility functions for querying data.

Contents
    _get_dir_fxns_dict,
    _check_data_assertions,
    _get_max_workers,
    incl_dir_idxs,
    gen_base_df,
    assign_to_column,
    gen_base_and_assign_to_column,
    assign_to_cols,
    gen_base_and_assign_to_cols,
    query_wd_prop,
    query_repo_dir,
    interp_by_subset,
    sum_df_prop_vals,
    split_col_val_dates,
    count_df_prop_vals
"""

import importlib
import inspect
import os

import numpy as np

# import modin.pandas as pd
import pandas as pd
from tqdm.auto import tqdm
from wikirepo import utils
from wikirepo.data import lctn_utils, time_utils, wd_utils


def _get_dir_fxns_dict(dir_name=None):
    """
    Generates a jump table dictionary of all modules in the given wikirepo.data directory and the query_ functions within.

    Notes
    -----
        Indexes all data querying functions within wikirepo directories.

    Parameters
    ----------
        dir_name : str (default=None)
            The name of the directory within wikirepo.data.

    Returns
    -------
        fxns_dict : dict
            A dictionary with keys being module names and contents being dictionaries of standardized indexes and functions.
    """
    data_directory = os.path.dirname(os.path.abspath(__file__))
    target_directory = data_directory + "/" + dir_name
    modules = os.listdir(target_directory)
    target_modules = [m[:-3] for m in modules if m[: len("__")] != "__"]

    # Build the import path from the portion of the path following
    # 'wikirepo/', trying the deepest occurrence first for nested layouts.
    try:
        import_path = (
            "wikirepo."
            + target_directory.split("wikirepo/")[3].replace("/", ".")
            + "."
        )
    except IndexError:
        try:
            import_path = (
                "wikirepo."
                + target_directory.split("wikirepo/")[2].replace("/", ".")
                + "."
            )
        except IndexError:
            import_path = (
                "wikirepo."
                + target_directory.split("wikirepo/")[1].replace("/", ".")
                + "."
            )

    fxns_dict = {}
    for mod in target_modules:
        script = importlib.import_module(import_path + mod)
        name_fxn_list = [
            [fxn[0], getattr(script, fxn[0])]
            for fxn in inspect.getmembers(script, inspect.isfunction)
        ]
        indexed_fxn_dict = {
            n_f[0]: n_f[1]
            for n_f in name_fxn_list
            if n_f[0][: len("query_")] == "query_"
        }

        fxns_dict[mod] = indexed_fxn_dict

    return fxns_dict

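# The returned jump table is keyed by module name, then query function name.
# An illustrative shape (the module and function names are assumptions):
#
#     _get_dir_fxns_dict(dir_name="demographic")
#     # {'population': {'query_population': <function query_population>}, ...}
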
def _check_data_assertions(timespan=None, interval=None, **kwargs):
    """
    Checks standardized data assertions across functions given local function arguments.

    Parameters
    ----------
        timespan : two element tuple or list : contains datetime.date or tuple (default=None: (date.today(), date.today()))
            A tuple or list that defines the start and end dates to be queried.
            Note 1: if True, then the full timespan from 1-1-1 to the current day will be queried.
            Note 2: passing a single entry will query for that date only.

        interval : str
            The time interval over which queries will be made.
            Note 1: see data.time_utils for options.
            Note 2: if None, then only the most recent data will be queried.

    Returns
    -------
        The results of a series of standardized assertions.
    """
    assert (interval is None) or (interval in time_utils.incl_intervals()), (
        "Please provide None for no time interval or a value for 'interval' from the following list of possible arguments: "
        + ", ".join(time_utils.incl_intervals())
        + "."
    )

    if timespan is not None:
        assert (
            interval is not None
        ), "A 'timespan' has been provided, but no value for the 'interval' by which it should be segmented."

def _get_max_workers(multicore):
    """
    Derives the maximum number of workers for multiprocessing from a bool or int argument.
    """
    # 'is' comparisons so that an int argument like 1 isn't mistaken for True.
    if multicore is True:
        return None  # the number of processors on the machine

    elif multicore is False:
        return 1

    else:
        return multicore

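# A minimal sketch of how the returned worker count could feed a process
# pool; the ProcessPoolExecutor usage and 'some_query_fxn' are illustrative
# assumptions, not part of wikirepo:
#
#     from concurrent.futures import ProcessPoolExecutor
#
#     with ProcessPoolExecutor(max_workers=_get_max_workers(True)) as ex:
#         results = list(ex.map(some_query_fxn, qids))
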
def incl_dir_idxs(dir_name=None, descriptions=False):
    """
    Returns the included indexes in the given directory - the file names of its scripts.

    Parameters
    ----------
        dir_name : str (default=None)
            The name of the directory within wikirepo.data.

        descriptions : bool (default=False)
            Whether to also return the descriptions of the indexes.

    Returns
    -------
        included_indexes : list
            A list of included indexes as derived by module names.
    """
    # Note: 'descriptions' is currently unused; module names alone are returned.
    return list(_get_dir_fxns_dict(dir_name).keys())

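# Illustrative call; the directory and resulting module names are assumptions:
#
#     incl_dir_idxs(dir_name="demographic")
#     # ['life_expectancy', 'population', ...]
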
def gen_base_df(
    locations=None, depth=None, timespan=None, interval=None, col_name="data"
):
    """
    Generates a baseline dataframe to be filled with queried data.

    Parameters
    ----------
        locations : str, list, or lctn_utils.LocationsDict (contains strs) : optional (default=None)
            The locations to query.

        depth : int (default=None)
            The depth from the given lbls or qids that data should go.
            Note: this uses 'P150' (contains administrative territorial entity).

        timespan : two element tuple or list : contains datetime.date or tuple (default=None: (date.today(), date.today()))
            A tuple or list that defines the start and end dates to be queried.
            Note 1: if True, then the full timespan from 1-1-1 to the current day will be queried.
            Note 2: passing a single entry will query for that date only.

        interval : str
            The time interval over which queries will be made.
            Note 1: see data.time_utils for options.
            Note 2: if None, then only the most recent data will be queried.

        col_name : str (default=data)
            The name of the column into which queried data should be merged.

    Returns
    -------
        base_df : pd.DataFrame
            A df that is ready to have queried data added to it.
    """
    if isinstance(locations, str):
        locations = [locations]

    if isinstance(locations, list):
        assert (
            depth == 0
        ), """The user has provided a list of locations, which implies a depth of 0, but the 'depth' argument is not 0. If a greater depth is required, use lctn_utils.gen_lctns_dict."""

    elif isinstance(locations, lctn_utils.LocationsDict):
        if depth is not None:
            depth_check = lctn_utils.derive_depth(locations, depth=0)
            assert (
                depth_check == depth
            ), "The given depth and the derived depth of the LocationsDict do not match. Please check the geographic depth you want to analyze."
        else:
            depth = lctn_utils.derive_depth(locations, depth=0)

    df_cols = lctn_utils.depth_to_cols(depth=depth)
    # qids that will have values assigned through a LocationsDict lookup.
    qid_cols = lctn_utils.depth_to_qid_cols(depth=depth)
    df_cols += qid_cols
    if interval:
        df_cols += [time_utils.interval_to_col_name(interval)]

    base_df = pd.DataFrame(columns=df_cols)
    for col in qid_cols:
        base_df[col] = base_df[col].astype(object)

    if depth == 0:
        base_df[lctn_utils.depth_to_col_name(depth=depth)] = locations

    current_depth = 0
    current_qid_col = lctn_utils.depth_to_qid_col_name(depth=current_depth)

    if isinstance(locations, (lctn_utils.LocationsDict, dict)):
        current_depth_qids = [q for q in locations.keys()]
    elif isinstance(locations, list):
        current_depth_qids = [lctn_utils.lctn_lbl_to_qid(lctn) for lctn in locations]

    base_df[current_qid_col] = current_depth_qids

    # Assign labels for the above QIDs.
    if isinstance(locations, (lctn_utils.LocationsDict, dict)):
        base_df[lctn_utils.depth_to_col_name(depth=current_depth)] = [
            list(lctn_utils.iter_key_items(node=locations, kv=q))[0]["lbl"]
            for q in current_depth_qids
        ]
    elif isinstance(locations, list):
        base_df[lctn_utils.depth_to_col_name(depth=current_depth)] = locations

    while current_depth < depth:
        assign_qid_col = lctn_utils.depth_to_qid_col_name(depth=current_depth + 1)
        assign_lbl_col = lctn_utils.depth_to_col_name(depth=current_depth + 1)

        # Skip missing QIDs explicitly: a 'qid != np.nan' check always passes
        # (NaN compares unequal to everything), and astype(str) below turns
        # NaNs into 'nan' strings.
        for q in [
            qid for qid in current_depth_qids if not pd.isnull(qid) and qid != "nan"
        ]:
            # Assign a list that will directly be exploded.
            key_items = list(lctn_utils.iter_key_items(node=locations, kv=q))[0]
            key_subs = list(lctn_utils.iter_key_items(node=key_items, kv="sub_lctns"))[
                0
            ]
            key_sub_qids = list(key_subs.keys())
            if key_sub_qids == []:
                # Assign to locations that don't have sub_lctns for pd.explode.
                key_sub_qids = np.nan

            base_df.at[
                base_df[base_df[current_qid_col] == q].index[0], assign_qid_col
            ] = key_sub_qids
            base_df = base_df.explode(assign_qid_col)
            base_df.reset_index(drop=True, inplace=True)
            base_df[assign_qid_col] = base_df[assign_qid_col].astype(str)

            if isinstance(key_sub_qids, list):
                for sub_q in base_df.loc[base_df[current_qid_col] == q, assign_qid_col]:
                    # For each sub_qid, assign the lbl.
                    base_df.loc[
                        base_df[base_df[assign_qid_col] == sub_q].index[0],
                        assign_lbl_col,
                    ] = list(lctn_utils.iter_key_items(node=locations, kv=sub_q))[0][
                        "lbl"
                    ]

        current_depth_qids = list(base_df[assign_qid_col])
        current_qid_col = assign_qid_col
        current_depth += 1

    if interval:
        time_col = time_utils.interval_to_col_name(interval=interval)
        if isinstance(locations, (lctn_utils.LocationsDict, dict)):
            # Find the valid times for the sub_lctn and assign them.
            final_sub_lctn_qid_col = lctn_utils.depth_to_qid_col_name(depth=depth)
            for q in base_df[final_sub_lctn_qid_col]:
                if q != "nan":  # is str because of astype(str)
                    key_items = list(lctn_utils.iter_key_items(node=locations, kv=q))[0]
                    key_vts = list(
                        lctn_utils.iter_key_items(node=key_items, kv="valid_timespan")
                    )[0]

                    base_df.at[
                        base_df[base_df[final_sub_lctn_qid_col] == q].index[0], time_col
                    ] = key_vts

            base_df = base_df.explode(time_col)

        elif isinstance(locations, list):
            base_df[time_col] = [
                time_utils.make_timespan(interval=interval, timespan=timespan)
            ] * len(base_df)
            base_df = base_df.explode(time_col)
            base_df = time_utils.truncate_date_col(
                df=base_df, col=time_col, interval=interval
            )

    if col_name is not None:
        base_df[col_name] = [np.nan] * len(base_df)

    # Drop all QID columns except for the last to allow for assignment.
    for col in qid_cols[:-1]:
        base_df.drop(col, axis=1, inplace=True)

    base_df = base_df.replace("nan", np.nan)
    base_df.reset_index(drop=True, inplace=True)

    return base_df

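# Illustrative usage producing one row per location and yearly time step with
# a NaN 'data' column awaiting assignment ('yearly' is assumed to be among
# the data.time_utils intervals):
#
#     from datetime import date
#
#     base_df = gen_base_df(
#         locations=["Germany"],
#         depth=0,
#         timespan=(date(2015, 1, 1), date(2020, 1, 1)),
#         interval="yearly",
#     )
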
def assign_to_column(
    df=None,
    locations=None,
    depth=None,
    interval=None,
    col_name="data",
    props=None,
    assign="all",
    span=False,
):
    """
    Assigns Wikidata property values to a designated column of a given df.

    Parameters
    ----------
        df : pd.DataFrame
            A df (likely base_df) to which values should be assigned.

        locations : str, list, or lctn_utils.LocationsDict (contains strs) : optional (default=None)
            The locations to query.

        depth : int (default=None)
            The depth from the given lbls or qids that data should go.
            Note: this uses 'P150' (contains administrative territorial entity).

        interval : str
            The time interval over which queries will be made.
            Note 1: see data.time_utils for options.
            Note 2: if None, then only the most recent data will be queried.

        col_name : str
            A column in df to which properties should be assigned.

        props : list or dict
            The properties to be assigned.

        assign : str (default=all)
            The type of assignment: 'all', 'most_recent', or 'repeat'.

        span : bool (default=False)
            Whether to check for P580 'start time' and P582 'end time' to create spans.

    Returns
    -------
        df : pd.DataFrame
            The df after assignment.
    """
    if isinstance(locations, str):
        locations = [locations]

    if isinstance(locations, list):
        assert (
            depth == 0
        ), """The user has provided a list of locations, which implies a depth of 0, but the 'depth' argument is not 0. If a greater depth is required, use lctn_utils.gen_lctns_dict."""

    elif isinstance(locations, lctn_utils.LocationsDict):
        if depth is not None:
            depth_check = lctn_utils.derive_depth(locations, depth=0)
            assert (
                depth_check == depth
            ), "The given depth and the derived depth of the LocationsDict do not match. Please check the geographic depth you want to analyze."

    # Column made up of QIDs for assignment.
    assignment_col = lctn_utils.depth_to_qid_col_name(depth=depth)

    if assign == "all":
        # Assign a value over rows by matching times.
        for q in df[assignment_col].unique():
            if isinstance(q, str):  # is a valid location
                for t in props[q].keys():
                    if isinstance(props[q][t], list):
                        # Multiple values to assign.
                        df.loc[
                            df[
                                (df[assignment_col] == q)
                                & (df[time_utils.interval_to_col_name(interval)] == t)
                            ].index[0],
                            col_name,
                        ] = ", ".join(str(i) for i in props[q][t])
                    else:
                        df.loc[
                            df[
                                (df[assignment_col] == q)
                                & (df[time_utils.interval_to_col_name(interval)] == t)
                            ].index[0],
                            col_name,
                        ] = props[q][t]

    elif assign == "most_recent":  # interval and timespan are None
        # Assign the most recent value formatted with the date it's coming from.
        for q in df[assignment_col].unique():
            if isinstance(q, str):  # is a valid location
                if len(props[q].keys()) == 1:
                    # Select the singular value even if it is 'no date'.
                    assignment_times = list(props[q].keys())
                else:
                    # Select the documented times.
                    assignment_times = sorted(
                        [k for k in props[q].keys() if k != "no date"]
                    )[::-1]
                    if assignment_times == []:
                        # There were only 'no date' values, so select the first.
                        assignment_times = [list(props[q].keys())[0]]

                most_recent_t = assignment_times[0]

                if isinstance(props[q][most_recent_t], list):
                    # Multiple values to assign.
                    if span:
                        # We don't want the time for most recent span values.
                        df.loc[
                            df.loc[df[assignment_col] == q].index, col_name
                        ] = ", ".join(str(i) for i in props[q][most_recent_t])
                    else:
                        df.loc[df.loc[df[assignment_col] == q].index, col_name] = (
                            ", ".join(str(i) for i in props[q][most_recent_t])
                            + f" ({most_recent_t})"
                        )
                else:
                    if span:
                        # We don't want the time for most recent span values.
                        df.loc[df.loc[df[assignment_col] == q].index, col_name] = props[
                            q
                        ][most_recent_t]
                    else:
                        df.loc[
                            df.loc[df[assignment_col] == q].index, col_name
                        ] = f"{props[q][most_recent_t]} ({most_recent_t})"

    elif assign == "repeat":
        # Assign one value over multiple rows.
        for q in df[assignment_col].unique():
            if isinstance(q, str):  # is a valid location
                indexes_to_assign = df.loc[df[assignment_col] == q].index
                if isinstance(props[q], list):
                    # Multiple values to assign.
                    df.loc[indexes_to_assign, col_name] = [
                        ", ".join(str(i) for i in props[q])
                    ] * len(indexes_to_assign)
                else:
                    df.loc[indexes_to_assign, col_name] = [props[q]] * len(
                        indexes_to_assign
                    )

    else:
        valid_assigns = ["all", "most_recent", "repeat"]
        raise ValueError(
            "An invalid argument was passed to the 'assign' argument - please choose one from "
            + ", ".join(valid_assigns)
            + "."
        )

    df.replace(to_replace="nan", value=np.nan, inplace=True)

    # QID columns will be transferred for all properties, but all except one will be dropped.
    df.rename(columns={assignment_col: "qid"}, inplace=True)

    return df

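# A sketch of the 'props' structure consumed when assign='all': QIDs mapping
# to {time: value} dicts (the values and time keys here are invented):
#
#     props = {"Q183": {"2019": 83092962, "2020": 83160871}}
#     df = assign_to_column(
#         df=base_df, locations=["Germany"], depth=0, interval="yearly",
#         col_name="population", props=props, assign="all",
#     )
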
def gen_base_and_assign_to_column(
    locations=None,
    depth=None,
    timespan=None,
    interval=None,
    col_name="data",
    props=None,
    assign=None,
    span=False,
):
    """
    Combines data_utils.gen_base_df and data_utils.assign_to_column.
    """
    df = gen_base_df(
        locations=locations,
        depth=depth,
        timespan=timespan,
        interval=interval,
        col_name=col_name,
    )

    df = assign_to_column(
        df=df,
        locations=locations,
        depth=depth,
        interval=interval,
        col_name=col_name,
        props=props,
        assign=assign,
        span=span,
    )

    return df

def assign_to_cols(
    df=None,
    locations=None,
    depth=None,
    sub_pid=None,
    interval=None,
    col_prefix="d",
    props=None,
    assign="all",
    span=False,
):
    """
    Assigns Wikidata property values from a qualifier to designated columns of a given df.

    Parameters
    ----------
        df : pd.DataFrame
            A df (likely base_df) to which values should be assigned.

        locations : str, list, or lctn_utils.LocationsDict (contains strs) : optional (default=None)
            The locations to query.

        depth : int (default=None)
            The depth from the given lbls or qids that data should go.
            Note: this uses 'P150' (contains administrative territorial entity).

        sub_pid : str (default=None)
            The Wikidata property that subsets time values.

        interval : str
            The time interval over which queries will be made.
            Note 1: see data.time_utils for options.
            Note 2: if None, then only the most recent data will be queried.

        col_prefix : str (default=d)
            The prefix for columns that are created from sub_pid values.

        props : list or dict
            The properties to be assigned.

        assign : str (default=all)
            The type of assignment: 'all' or 'most_recent'.

        span : bool (default=False)
            Whether to check for P580 'start time' and P582 'end time' to create spans.

    Returns
    -------
        df : pd.DataFrame
            The df after assignment.
    """
    if isinstance(locations, str):
        locations = [locations]

    if isinstance(locations, list):
        assert (
            depth == 0
        ), """The user has provided a list of locations, which implies a depth of 0, but the 'depth' argument is not 0. If a greater depth is required, use lctn_utils.gen_lctns_dict."""

    elif isinstance(locations, lctn_utils.LocationsDict):
        if depth is not None:
            depth_check = lctn_utils.derive_depth(locations, depth=0)
            assert (
                depth_check == depth
            ), "The given depth and the derived depth of the LocationsDict do not match. Please check the geographic depth you want to analyze."

    # Column made up of QIDs for assignment.
    assignment_col = lctn_utils.depth_to_qid_col_name(depth=depth)

    if assign == "all":
        # Assign a value over rows by matching times.
        for q in df[assignment_col]:
            if isinstance(q, str):  # is a valid location
                for t in props[q].keys():
                    # props[q][t] is a dictionary of qualified values.
                    for k in props[q][t].keys():
                        sub_col = col_prefix + "_" + k.replace(" ", "_").lower()
                        if sub_col not in df.columns:
                            df[sub_col] = [np.nan] * len(df)

                        df.loc[
                            df[
                                (df[assignment_col] == q)
                                & (df[time_utils.interval_to_col_name(interval)] == t)
                            ].index[0],
                            sub_col,
                        ] = props[q][t][k]

    elif assign == "most_recent":  # interval and timespan are None
        # Assign the most recent value formatted with the date it's coming from.
        for q in df[assignment_col]:
            if isinstance(q, str):  # is a valid location
                if len(props[q].keys()) == 1:
                    # Select the singular value even if it is 'no date'.
                    assignment_times = list(props[q].keys())
                else:
                    # Select the documented times.
                    assignment_times = sorted(
                        [k for k in props[q].keys() if k != "no date"]
                    )[::-1]
                    if assignment_times == []:
                        # There were only 'no date' values, so select the first.
                        assignment_times = [list(props[q].keys())[0]]

                most_recent_t = assignment_times[0]

                # props[q][most_recent_t] is a dictionary of qualified values.
                for k in props[q][most_recent_t].keys():
                    sub_col = col_prefix + "_" + k.replace(" ", "_").lower()
                    if sub_col not in df.columns:
                        df[sub_col] = [np.nan] * len(df)

                    # sub_pid may literally be the bool type for spanned
                    # boolean values, in which case we don't want the date.
                    if span and sub_pid is bool:
                        df.loc[df[(df[assignment_col] == q)].index[0], sub_col] = props[
                            q
                        ][most_recent_t][k]
                    else:
                        df.loc[
                            df[(df[assignment_col] == q)].index[0], sub_col
                        ] = f"{props[q][most_recent_t][k]} ({most_recent_t})"

    else:
        valid_assigns = ["all", "most_recent"]
        raise ValueError(
            "An invalid argument was passed to the 'assign' argument - please choose one from "
            + ", ".join(valid_assigns)
            + "."
        )

    df.replace(to_replace="nan", value=np.nan, inplace=True)

    # QID columns will be transferred for all properties, but all except one will be dropped.
    df.rename(columns={assignment_col: "qid"}, inplace=True)

    return df

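# A sketch of the qualifier-keyed 'props' structure that assign_to_cols
# expands into prefixed columns (keys and values invented for illustration):
#
#     props = {"Q183": {"2019": {"male population": 41, "female population": 42}}}
#     # expands into columns 'd_male_population' and 'd_female_population'
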
def gen_base_and_assign_to_cols(
    locations=None,
    depth=None,
    sub_pid=None,
    timespan=None,
    interval=None,
    col_name=None,
    col_prefix="d",
    props=None,
    assign=None,
    span=False,
):
    """
    Combines data_utils.gen_base_df and data_utils.assign_to_cols.
    """
    df = gen_base_df(
        locations=locations,
        depth=depth,
        timespan=timespan,
        interval=interval,
        col_name=None,
    )  # col_name is None to prevent a data col

    df = assign_to_cols(
        df=df,
        locations=locations,
        depth=depth,
        sub_pid=sub_pid,
        interval=interval,
        col_prefix=col_prefix,  # prefixed columns are instead assigned
        props=props,
        assign=assign,
        span=span,
    )

    return df

def query_wd_prop(
    dir_name=None,
    ents_dict=None,
    locations=None,
    depth=None,
    timespan=None,
    interval=None,
    pid=None,
    sub_pid=None,
    col_name=None,
    col_prefix=None,
    ignore_char="",
    span=False,
):
    """
    Queries a Wikidata property for the given location(s).

    Parameters
    ----------
        dir_name : str (default=None)
            The name of the directory within wikirepo.data.

        ents_dict : wd_utils.EntitiesDict : optional (default=None)
            A dictionary with keys being Wikidata QIDs and values being their entities.

        locations : str, list, or lctn_utils.LocationsDict (contains strs) : optional (default=None)
            The locations to query.

        depth : int (default=None)
            The depth from the given lbls or qids that data should go.
            Note: this uses 'P150' (contains administrative territorial entity).

        timespan : two element tuple or list : contains datetime.date or tuple (default=None: (date.today(), date.today()))
            A tuple or list that defines the start and end dates to be queried.
            Note 1: if True, then the full timespan from 1-1-1 to the current day will be queried.
            Note 2: passing a single entry will query for that date only.

        interval : str
            The time interval over which queries will be made.
            Note 1: see data.time_utils for options.
            Note 2: if None, then only the most recent data will be queried.

        pid : str (default=None)
            The Wikidata property that is being queried.

        sub_pid : str (default=None)
            The Wikidata property that subsets time values.

        col_name : str (default=None)
            The name of the column into which queried data should be merged.

        col_prefix : str (default=None)
            The prefix for columns that are created from sub_pid values.
            Note: only use col_name or col_prefix.

        ignore_char : str
            Characters in the output that should be ignored.

        span : bool (default=False)
            Whether to check for P580 'start time' and P582 'end time' to create spans.

    Returns
    -------
        df, ents_dict : pd.DataFrame, wd_utils.EntitiesDict
            A df of location names and the given property for the given timespan with an updated EntitiesDict.
    """
    if ents_dict is None:
        ents_dict = wd_utils.EntitiesDict()

    if isinstance(locations, str):
        locations = [locations]

    if isinstance(locations, list):
        qids = [
            lctn_utils.lctn_lbl_to_qid(lctn) if not wd_utils.is_wd_id(lctn) else lctn
            for lctn in locations
        ]
    elif isinstance(locations, lctn_utils.LocationsDict):
        qids = locations.get_keys_at_depth(depth)
    elif isinstance(locations, dict):
        qids = lctn_utils.get_qids_at_depth(lctns_dict=locations, depth=depth)

    qids = utils._make_var_list(qids)[0]

    if col_prefix is None:
        t_to_p_dict = wd_utils.t_to_prop_val_dict(
            dir_name=dir_name,
            ents_dict=ents_dict,
            qids=qids,
            pid=pid,
            sub_pid=sub_pid,
            timespan=timespan,
            interval=interval,
            ignore_char=ignore_char,
            span=span,
        )

        # Assignment via a single column col_name.
        if interval is not None:
            df = gen_base_and_assign_to_column(
                locations=locations,
                depth=depth,
                timespan=timespan,
                interval=interval,
                col_name=col_name,
                props=t_to_p_dict,
                assign="all",
                span=span,
            )
        else:
            df = gen_base_and_assign_to_column(
                locations=locations,
                depth=depth,
                timespan=timespan,
                interval=interval,
                col_name=col_name,
                props=t_to_p_dict,
                assign="most_recent",
                span=span,
            )  # to remove the time from span props

    else:
        t_to_p_dict = wd_utils.t_to_prop_val_dict_dict(
            dir_name=dir_name,
            ents_dict=ents_dict,
            qids=qids,
            pid=pid,
            sub_pid=sub_pid,
            timespan=timespan,
            interval=interval,
            ignore_char=ignore_char,
            span=span,
        )

        # Assignment via generated columns prefixed as col_prefix.
        if interval is not None:
            df = gen_base_and_assign_to_cols(
                locations=locations,
                depth=depth,
                sub_pid=sub_pid,
                timespan=timespan,
                interval=interval,
                col_name=col_name,  # col_name is None to disable a single column
                col_prefix=col_prefix,
                props=t_to_p_dict,
                assign="all",
                span=span,
            )
        else:
            df = gen_base_and_assign_to_cols(
                locations=locations,
                depth=depth,
                sub_pid=sub_pid,
                timespan=timespan,
                interval=interval,
                col_name=col_name,  # col_name is None to disable a single column
                col_prefix=col_prefix,
                props=t_to_p_dict,
                assign="most_recent",
                span=span,
            )  # to remove the time from boolean span props

    return df, ents_dict

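# An illustrative query of total population (Wikidata property P1082) per
# year; the directory, interval, and column names are assumptions:
#
#     from datetime import date
#
#     df, ents_dict = query_wd_prop(
#         dir_name="demographic",
#         locations=["Germany"],
#         depth=0,
#         timespan=(date(2015, 1, 1), date(2020, 1, 1)),
#         interval="yearly",
#         pid="P1082",
#         col_name="population",
#     )
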
def query_repo_dir(
    dir_name=None,
    ents_dict=None,
    locations=None,
    depth=None,
    timespan=None,
    interval=None,
    verbose=True,
    **kwargs,
):
    """
    Generates a df of statistics for a given wikirepo.data directory and the given geographic and time intervals.

    Parameters
    ----------
        dir_name : str (default=None)
            The name of the directory within wikirepo.data.

        ents_dict : wd_utils.EntitiesDict : optional (default=None)
            A dictionary with keys being Wikidata QIDs and values being their entities.

        locations : str, list, or lctn_utils.LocationsDict (contains strs) : optional (default=None)
            The locations to query.

        depth : int (default=None)
            The depth from the given lbls or qids that data should go.
            Note: this uses 'P150' (contains administrative territorial entity).

        timespan : two element tuple or list : contains datetime.date or tuple (default=None: (date.today(), date.today()))
            A tuple or list that defines the start and end dates to be queried.
            Note 1: if True, then the full timespan from 1-1-1 to the current day will be queried.
            Note 2: passing a single entry will query for that date only.

        interval : str
            The time interval over which queries will be made.
            Note 1: see data.time_utils for options.
            Note 2: if None, then only the most recent data will be queried.

        verbose : bool (default=True)
            Whether to show a tqdm progress bar for the query.

    Returns
    -------
        df_data : pd.DataFrame
            A df of locations and data given the timespan and data index arguments.
    """
    local_args = locals()
    # Run the standardized timespan/interval assertions over the arguments
    # directly; note that a literal_eval(str(...)) round trip would fail on
    # non-literal values such as datetime.date objects.
    _check_data_assertions(**local_args)

    # Modules are queried when their name is passed as a True boolean kwarg.
    modules_to_query = [
        arg
        for arg in list(local_args["kwargs"].keys())
        if arg in incl_dir_idxs(dir_name)
        and local_args["kwargs"].get(arg, False) is True
    ]

    df_data = None
    for mod in tqdm(modules_to_query, desc=dir_name.capitalize(), disable=not verbose):
        module_fxns = _get_dir_fxns_dict(dir_name)[mod]
        query_fxn = [
            f for f in list(module_fxns.keys()) if f[: len("query_")] == "query_"
        ][0]  # there can only be one per module

        if df_data is None:
            df_data, ents_dict = module_fxns[query_fxn](
                dir_name=dir_name,
                ents_dict=ents_dict,
                locations=locations,
                depth=depth,
                timespan=timespan,
                interval=interval,
            )
        else:
            if interval:
                merge_on = lctn_utils.depth_to_cols(depth) + [
                    time_utils.interval_to_col_name(interval)
                ]
            else:
                merge_on = lctn_utils.depth_to_cols(depth)

            df_props, ents_dict = module_fxns[query_fxn](
                dir_name=dir_name,
                ents_dict=ents_dict,
                locations=locations,
                depth=depth,
                timespan=timespan,
                interval=interval,
            )
            df_data = pd.merge(df_data, df_props, on=merge_on)

    return df_data, ents_dict

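# Illustrative call: boolean kwargs select which of the directory's modules
# to query ('population' as a module/kwarg name is an assumption):
#
#     df_data, ents_dict = query_repo_dir(
#         dir_name="demographic", locations=["Germany"], depth=0,
#         timespan=(date(2015, 1, 1), date(2020, 1, 1)), interval="yearly",
#         population=True,
#     )
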
def interp_by_subset(df=None, depth=None, col_name="data", **kwargs):
    """
    Subsets a df by location at a given depth and interpolates the given column.

    Notes
    -----
        pd.DataFrame.interpolate and scipy.interpolate **kwargs are passed.

    Parameters
    ----------
        df : pd.DataFrame (default=None)
            A dataframe to have a column interpolated.

        depth : int (default=None)
            The depth from the given lbls or qids that data should go.
            Note: this uses 'P150' (contains administrative territorial entity).

        col_name : str
            A column in df that is to be interpolated.

    Returns
    -------
        df_interpolated : pd.DataFrame
            The original df with the given column interpolated based on **kwargs.
    """
    df_interpolated = pd.DataFrame()
    unique_lctns = list(df[lctn_utils.depth_to_col_name(depth)].unique())

    if (
        "method" in kwargs.keys()
        and "limit_direction" in kwargs.keys()
        and kwargs["method"] == "pad"
        and kwargs["limit_direction"] == "both"
    ):
        for lctn in unique_lctns:
            df_subset = df[df[lctn_utils.depth_to_col_name(depth)] == lctn].copy()
            # Broadcast the location's single non-NaN value across its rows.
            unique_val = [
                val
                for val in df_subset[col_name].unique()
                if isinstance(val, str) or not np.isnan(val)
            ][0]
            df_subset[col_name] = [unique_val] * len(df_subset)
            df_interpolated = pd.concat([df_interpolated, df_subset])

    else:
        for lctn in unique_lctns:
            df_subset = df[df[lctn_utils.depth_to_col_name(depth)] == lctn].copy()
            # Assign the interpolated series back; calling
            # interpolate(inplace=True) on a .loc slice would not
            # propagate to df_subset.
            df_subset[col_name] = df_subset[col_name].interpolate(**kwargs)
            df_interpolated = pd.concat([df_interpolated, df_subset])

    return df_interpolated

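# Illustrative usage padding each location's single known value across all of
# its rows (kwargs pass through to pandas/scipy interpolation):
#
#     df_filled = interp_by_subset(
#         df=df_data, depth=0, col_name="population",
#         method="pad", limit_direction="both",
#     )
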
def sum_df_prop_vals(
    df=None,
    target_lctn=None,
    vals_lctn=None,
    lctn_col=None,
    time_col=None,
    prop_col=None,
    subtract=False,
    drop_vals_lctn=False,
):
    """
    Adds or subtracts values in a dataframe for given locations.

    Parameters
    ----------
        df : pd.DataFrame (default=None)
            A dataframe with property rows to be summed.

        target_lctn : str (default=None)
            The name of the location to which values should be added.
            Note: subtract=True subtracts from this location.

        vals_lctn : str (default=None)
            The name of the location that has values to be added or subtracted.

        lctn_col : str (default=None)
            The name of the column in which the locations are defined.

        time_col : str (default=None)
            The name of the column in which times are defined.

        prop_col : str (default=None)
            The name of the column in which property values are found.

        subtract : bool (default=False)
            Whether values from vals_lctn should be subtracted from target_lctn.

        drop_vals_lctn : bool (default=False)
            Whether rows with vals_lctn should be dropped from df.

    Returns
    -------
        df_new : pd.DataFrame
            The dataframe post arithmetic operations.
    """
    df_new = df.copy()
    # Assumes numeric property values; subtraction negates the vals_lctn term.
    sign = -1 if subtract else 1

    if time_col:
        for t in df_new[time_col].unique():
            target_idx = df_new[
                (df_new[lctn_col] == target_lctn) & (df_new[time_col] == t)
            ].index
            vals_idx = df_new[
                (df_new[lctn_col] == vals_lctn) & (df_new[time_col] == t)
            ].index

            df_new.loc[target_idx, prop_col] = (
                df_new.loc[target_idx, prop_col].values[0]
                + sign * df_new.loc[vals_idx, prop_col].values[0]
            )
    else:
        target_idx = df_new[df_new[lctn_col] == target_lctn].index
        vals_idx = df_new[df_new[lctn_col] == vals_lctn].index

        df_new.loc[target_idx, prop_col] = (
            df_new.loc[target_idx, prop_col].values[0]
            + sign * df_new.loc[vals_idx, prop_col].values[0]
        )

    if drop_vals_lctn:
        df_new = df_new[df_new[lctn_col] != vals_lctn]

    return df_new

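# A toy example with invented locations and column names: fold one region's
# counts into another and drop the merged rows:
#
#     df_new = sum_df_prop_vals(
#         df=df_data, target_lctn="Germany", vals_lctn="Saarland",
#         lctn_col="location", time_col="year", prop_col="population",
#         subtract=False, drop_vals_lctn=True,
#     )
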
def split_col_val_dates(df=None, col=None):
    """
    Splits the dates from a column of '<value> (<date>)' strings into a separate column.

    Parameters
    ----------
        df : pd.DataFrame (default=None)
            A dataframe with a column of values and dates.

        col : str (default=None)
            The name of the column which should have its dates split to another column.

    Returns
    -------
        df_new : pd.DataFrame
            The dataframe post splitting the date from the values.
    """
    df_new = df.copy()
    col_index = df_new.columns.get_loc(col)

    # Check for strings explicitly to skip NaNs; a 'val != np.nan' check
    # would always pass, as NaN compares unequal to everything.
    df_new[f"{col}_date"] = [
        val.split(" (")[1] if isinstance(val, str) else np.nan for val in df_new[col]
    ]
    df_new[col] = [
        val.split(" (")[0] if isinstance(val, str) else np.nan for val in df_new[col]
    ]
    df_new[f"{col}_date"] = [
        d.replace(")", "") if isinstance(d, str) else np.nan
        for d in df_new[f"{col}_date"]
    ]
    df_new[col] = [utils.round_if_int(utils.try_float(val)) for val in df_new[col]]

    # Move the new date column to directly follow the value column.
    cols = list(df_new.columns)
    cols.pop(df_new.columns.get_loc(f"{col}_date"))
    cols.insert(col_index + 1, f"{col}_date")
    df_new = df_new[cols]

    return df_new

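# For 'most_recent' queries a column holds strings like '83092962 (2019)';
# splitting yields (values invented for illustration):
#
#     df_new = split_col_val_dates(df=df_data, col="population")
#     # df_new['population'] -> 83092962, df_new['population_date'] -> '2019'
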
def count_df_prop_vals(df=None, col=None, percent=False):
    """
    Returns value counts of a df column sorted alphabetically.

    Parameters
    ----------
        df : pd.DataFrame (default=None)
            A dataframe with a column whose values should be counted.

        col : str (default=None)
            The column in df in which counts should be made.

        percent : bool (default=False)
            Whether to return percentage values.

    Returns
    -------
        val_counts : pd.Series
            Aggregate or percentage value counts of the column's values.
    """
    assert col in df.columns, f"{col} is not a column in the data."

    if percent:
        return df[col].value_counts().sort_index() / len(df)
    else:
        return df[col].value_counts().sort_index()
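
# Illustrative usage (the column name is an assumption):
#
#     count_df_prop_vals(df=df_data, col="executive", percent=True)
#     # a pd.Series of per-value shares, sorted alphabetically by value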