Source code for niklib.models.preprocessors.core

"""Contains core functionality that is shared by all preprocessors.

"""

# core
from sklearn import model_selection
from sklearn.compose import ColumnTransformer
from sklearn.compose import make_column_selector
import pandas as pd
import numpy as np
# ours
from niklib.models.preprocessors import EXAMPLE_COLUMN_TRANSFORMER_CONFIG_X
from niklib.models.preprocessors import EXAMPLE_PANDAS_TRAIN_TEST_SPLIT
from niklib.models.preprocessors import EXAMPLE_TRAIN_TEST_EVAL_SPLIT
from niklib.models.preprocessors import TRANSFORMS
# helpers
from typing import Callable, Tuple, Optional, Any, List, Union
import inspect
import logging
import pathlib
import json


# configure logging
logger = logging.getLogger(__name__)


class TrainTestEvalSplit:
    """Convert a pandas dataframe to numpy arrays with train, test, and eval splits

    For conversion from :class:`pandas.DataFrame` to :class:`numpy.ndarray`, we use
    the same functionality as :meth:`pandas.DataFrame.to_numpy`, but it separates
    dependent and independent variables given the target column ``target_column``.

    Note:

        * To obtain the eval set, we use the train set as the original data to be
          split, i.e. the eval set is a subset of the train set. This makes sure
          the model by no means sees the test set.
        * ``args`` cannot be set directly and need to be provided using a json file.
          See :meth:`set_configs` for more information.
        * You can explicitly override following ``args`` by passing it as an
          argument to :meth:`__init__`:

            * :attr:`random_state`
            * :attr:`stratify`

    Returns:
        Tuple[:class:`numpy.ndarray`, ...]: Order is
        ``(x_train, x_test, x_eval, y_train, y_test, y_eval)``. When the configured
        ``eval_ratio`` is ``0``, no eval split is made and the order is
        ``(x_train, x_test, y_train, y_test)``.
    """

    def __init__(
        self,
        stratify: Any = None,
        random_state: Optional[Union[np.random.Generator, int]] = None
    ) -> None:
        """Load the default configs and store the explicit overrides

        Args:
            stratify: If not None, overrides ``stratify`` of the config file
            random_state: If not None, overrides ``random_state`` of the config file
        """
        # NOTE: the class name is appended without a '.' separator
        self.logger = logging.getLogger(logger.name + self.__class__.__name__)
        self.CONF = self.set_configs()

        # override configs if explicitly set
        self.random_state = random_state
        self.stratify = stratify

    def set_configs(self, path: Optional[Union[str, pathlib.Path]] = None) -> dict:
        """Defines and sets the config to be parsed

        The keys of the configs are the attributes of this class which are:

            * ``test_ratio`` (float): Ratio of test data
            * ``eval_ratio`` (float): Ratio of eval data
            * ``shuffle`` (bool): Whether to shuffle the data
            * ``stratify`` (:class:`numpy.ndarray`, optional): If not None, this is
              used to stratify the data
            * ``random_state`` (int, optional): Random state to use for shuffling

        Note:
            You can explicitly override following attributes by passing it as an
            argument to :meth:`__init__`:

                * :attr:`random_state`
                * :attr:`stratify`

        The values of the configs are parameters and can be set manually
        or extracted from JSON config files by providing the path to the JSON file.

        Args:
            path: path to the JSON file containing the configs. If None, the
                packaged default ``EXAMPLE_TRAIN_TEST_EVAL_SPLIT`` is used.

        Returns:
            dict: A dictionary of ``str``: ``Any`` pairs of configs as class attributes
        """
        # always keep a ``Path`` so ``as_mlflow_artifact`` can rely on ``.name``
        path = pathlib.Path(
            EXAMPLE_TRAIN_TEST_EVAL_SPLIT if path is None else path
        )
        self.conf_path = path

        # log the path used
        self.logger.warning(f'Config file "{self.conf_path}" is being used')

        # read the json file
        with open(path, 'r', encoding='utf-8') as f:
            configs = json.load(f)

        # set the configs if explicit calls made to this method
        self.CONF = configs
        # return the parsed configs
        return configs

    def as_mlflow_artifact(self, target_path: Union[str, pathlib.Path]) -> None:
        """Saves the configs to the MLFlow artifact directory

        Args:
            target_path: Path to the MLFlow artifact directory. The name of the file
                will be same as original config file, hence, only provide path to dir.

        Raises:
            ValueError: If no config file has been set yet
        """
        target_path = pathlib.Path(target_path)

        # ``conf_path`` only exists once ``set_configs`` has run
        conf_path = getattr(self, 'conf_path', None)
        if conf_path is None:
            raise ValueError(
                'Configs have not been set yet. Use set_configs to set them.')
        conf_path = pathlib.Path(conf_path)

        # read the json file
        with open(conf_path, 'r', encoding='utf-8') as f:
            configs = json.load(f)

        # save the configs to the artifact directory
        target_path = target_path / conf_path.name
        with open(target_path, 'w', encoding='utf-8') as f:
            json.dump(configs, f)
        self.logger.info(f'"{target_path}" saved as a artifact.')

    def _resolve_option(self, key: str) -> Any:
        """Returns the ``__init__`` override of ``key`` if given, else the config value"""
        override = getattr(self, key, None)
        return self.CONF[key] if override is None else override

    def __call__(
        self,
        df: pd.DataFrame,
        target_column: str,
        *args: Any,
        **kwds: Any
    ) -> Tuple[np.ndarray, ...]:
        """Convert a pandas dataframe to numpy arrays with train, test, and eval splits

        Args:
            df (:class:`pandas.DataFrame`): Dataframe to convert
            target_column (str): Name of the target column

        Returns:
            Tuple[:class:`numpy.ndarray`, ...]: Order is
            ``(x_train, x_test, x_eval, y_train, y_test, y_eval)``, or
            ``(x_train, x_test, y_train, y_test)`` when ``eval_ratio`` is ``0``.
        """
        # get values from config; explicit ``__init__`` arguments win
        test_ratio = self.CONF['test_ratio']
        eval_ratio = self.CONF['eval_ratio']
        shuffle = self.CONF['shuffle']
        stratify = self._resolve_option('stratify')
        random_state = self._resolve_option('random_state')

        # separate dependent and independent variables
        y = df[target_column].to_numpy()
        x = df.drop(columns=[target_column]).to_numpy()

        # create train and test data
        if stratify is None:
            stratify_train = None
            x_train, x_test, y_train, y_test = model_selection.train_test_split(
                x, y,
                train_size=None, test_size=test_ratio,
                shuffle=shuffle, stratify=None, random_state=random_state
            )
        else:
            # split the stratification labels alongside the data so that the
            # eval split below gets labels aligned with (and as long as) x_train
            stratify = np.asarray(stratify)
            (x_train, x_test,
             y_train, y_test,
             stratify_train, _) = model_selection.train_test_split(
                x, y, stratify,
                train_size=None, test_size=test_ratio,
                shuffle=shuffle, stratify=stratify, random_state=random_state
            )

        if eval_ratio == 0.:
            return (x_train, x_test, y_train, y_test)

        # create eval data from train data
        x_train, x_eval, y_train, y_eval = model_selection.train_test_split(
            x_train, y_train,
            train_size=None, test_size=eval_ratio,
            shuffle=shuffle, stratify=stratify_train, random_state=random_state
        )
        return (x_train, x_test, x_eval, y_train, y_test, y_eval)
class PandasTrainTestSplit:
    """Split a pandas dataframe into train and test

    Note:
        This is a class very similar to :class:`TrainTestEvalSplit` with this
        difference that this class is specialized for Pandas Dataframe and
        since we are going to use augmentation on Pandas Dataframe rather than
        Numpy, this class enables us to do augmentation only on the train split
        and let the test part stay as it is.

    Note:

        * ``args`` cannot be set directly and need to be provided using a json file.
          See :meth:`set_configs` for more information.
        * You can explicitly override following ``args`` by passing it as an
          argument to :meth:`__init__`:

            * :attr:`random_state`
            * :attr:`stratify`

    Returns:
        Tuple[:class:`pandas.DataFrame`, :class:`pandas.DataFrame`]: A tuple of
        ``(data_train, data_test)`` which contains both dependent and
        independent variables
    """

    def __init__(
        self,
        stratify: Any = None,
        random_state: Optional[Union[np.random.Generator, int]] = None
    ) -> None:
        """Load the default configs and store the explicit overrides

        Args:
            stratify: If not None, overrides ``stratify`` of the config file
            random_state: If not None, overrides ``random_state`` of the config file
        """
        # NOTE: the class name is appended without a '.' separator
        self.logger = logging.getLogger(logger.name + self.__class__.__name__)
        self.CONF = self.set_configs()

        # override configs if explicitly set
        self.random_state = random_state
        self.stratify = stratify

    def set_configs(self, path: Optional[Union[str, pathlib.Path]] = None) -> dict:
        """Defines and sets the config to be parsed

        The keys of the configs are the attributes of this class which are:

            * ``train_ratio`` (float): Ratio of train data
            * ``shuffle`` (bool): Whether to shuffle the data
            * ``stratify`` (:class:`numpy.ndarray`, optional): If not None, this is
              used to stratify the data
            * ``random_state`` (int, optional): Random state to use for shuffling

        Note:
            You can explicitly override following attributes by passing it as an
            argument to :meth:`__init__`:

                * :attr:`random_state`
                * :attr:`stratify`

        The values of the configs are parameters and can be set manually
        or extracted from JSON config files by providing the path to the JSON file.

        Args:
            path: path to the JSON file containing the configs. If None, the
                packaged default ``EXAMPLE_PANDAS_TRAIN_TEST_SPLIT`` is used.

        Returns:
            dict: A dictionary of ``str``: ``Any`` pairs of configs as class attributes
        """
        # always keep a ``Path`` so ``as_mlflow_artifact`` can rely on ``.name``
        path = pathlib.Path(
            EXAMPLE_PANDAS_TRAIN_TEST_SPLIT if path is None else path
        )
        self.conf_path = path

        # log the path used
        self.logger.warning(f'Config file "{self.conf_path}" is being used')

        # read the json file
        with open(path, 'r', encoding='utf-8') as f:
            configs = json.load(f)

        # set the configs if explicit calls made to this method
        self.CONF = configs
        # return the parsed configs
        return configs

    def as_mlflow_artifact(self, target_path: Union[str, pathlib.Path]) -> None:
        """Saves the configs to the MLFlow artifact directory

        Args:
            target_path: Path to the MLFlow artifact directory. The name of the file
                will be same as original config file, hence, only provide path to dir.

        Raises:
            ValueError: If no config file has been set yet
        """
        target_path = pathlib.Path(target_path)

        # ``conf_path`` only exists once ``set_configs`` has run
        conf_path = getattr(self, 'conf_path', None)
        if conf_path is None:
            raise ValueError(
                'Configs have not been set yet. Use set_configs to set them.')
        conf_path = pathlib.Path(conf_path)

        # read the json file
        with open(conf_path, 'r', encoding='utf-8') as f:
            configs = json.load(f)

        # save the configs to the artifact directory
        target_path = target_path / conf_path.name
        with open(target_path, 'w', encoding='utf-8') as f:
            json.dump(configs, f)
        self.logger.info(f'"{target_path}" saved as a artifact.')

    def __call__(
        self,
        df: pd.DataFrame,
        target_column: str,
        *args: Any,
        **kwds: Any
    ) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Split a pandas dataframe into train and test splits

        Args:
            df (:class:`pandas.DataFrame`): Dataframe to split
            target_column (str): Name of the target column. Currently unused by
                the split itself since rows are kept whole.

        Raises:
            NotImplementedError: If stratification is requested, either through
                the config file or through :meth:`__init__`

        Returns:
            Tuple[:class:`pandas.DataFrame`, :class:`pandas.DataFrame`]: Order is
            ``(data_train, data_test)``
        """
        # get values from config; explicit ``__init__`` arguments win
        train_ratio = self.CONF['train_ratio']
        shuffle = self.CONF['shuffle']
        stratify = (
            self.CONF['stratify'] if self.stratify is None else self.stratify
        )
        random_state = (
            self.CONF['random_state'] if self.random_state is None
            else self.random_state
        )

        # stratify dataframe: fail loudly instead of silently returning
        # a non-stratified split
        if stratify is not None:
            raise NotImplementedError(
                'Stratify is not implemented yet. Sadge :(')

        # shuffle dataframe
        if shuffle:
            df = df.sample(frac=1, random_state=random_state)

        # split dataframe into train and test by rows
        idx: int = int(len(df) * train_ratio)
        data_train: pd.DataFrame = df.iloc[:idx, :]
        data_test: pd.DataFrame = df.iloc[idx:, :]
        return (data_train, data_test)
class ColumnSelector:
    """Selects columns based on regex pattern and dtype

    User can specify the dtype of columns to select, and the dtype of columns to
    ignore. Also, user can specify the regex pattern for including and excluding
    columns, separately.

    This is particularly useful when combined with
    :class:`sklearn.compose.ColumnTransformer` to apply different sort of
    ``transformers`` to different subsets of columns. E.g::

        # select columns that contain 'Country' in their name and are of type `np.float32`
        columns = preprocessors.ColumnSelector(columns_type='numeric',
                                               dtype_include=np.float32,
                                               pattern_include='.*Country.*',
                                               pattern_exclude=None,
                                               dtype_exclude=None)(df=data)
        # use a transformer for selected columns
        ct = preprocessors.ColumnTransformer(
            [('some_name',                   # just a name
            preprocessors.StandardScaler(),  # the transformer
            columns),                        # the columns to apply the transformer to
            ],
        )
        ct.fit_transform(...)

    Note:
        If the data that is passed to the :class:`ColumnSelector` is a
        :class:`pandas.DataFrame`, then you can ignore calling the instance
        of this class and directly use it in the pipeline. E.g::

            # select columns that contain 'Country' in their name and are of type `np.float32`
            columns = preprocessors.ColumnSelector(columns_type='numeric',
                                                   dtype_include=np.float32,
                                                   pattern_include='.*Country.*',
                                                   pattern_exclude=None,
                                                   dtype_exclude=None)  # THIS LINE
            # use a transformer for selected columns
            ct = preprocessors.ColumnTransformer(
                [('some_name',                   # just a name
                preprocessors.StandardScaler(),  # the transformer
                columns),                        # the columns to apply the transformer to
                ],
            )
            ct.fit_transform(...)

    See Also:
        :class:`sklearn.compose.make_column_selector` as ``ColumnSelector`` follows
        the same semantics.
    """

    def __init__(
        self,
        columns_type: str,
        dtype_include: Any,
        pattern_include: Optional[str] = None,
        dtype_exclude: Any = None,
        pattern_exclude: Optional[str] = None
    ) -> None:
        """Selects columns based on regex pattern and dtype

        Args:
            columns_type (str): Type of columns:

                1. ``'string'``: returns the name of the columns. Useful for
                   :class:`pandas.DataFrame`
                2. ``'numeric'``: returns the index of the columns. Useful for
                   :class:`numpy.ndarray`

            dtype_include (type): Type of the columns to select. For more info
                see :meth:`pandas.DataFrame.select_dtypes`.
            pattern_include (str): Regex pattern to match columns to **include**
            dtype_exclude (type): Type of the columns to ignore. For more info
                see :meth:`pandas.DataFrame.select_dtypes`. Defaults to None.
            pattern_exclude (str): Regex pattern to match columns to **exclude**
        """
        self.columns_type = columns_type
        self.pattern_include = pattern_include
        self.pattern_exclude = pattern_exclude
        self.dtype_include = dtype_include
        self.dtype_exclude = dtype_exclude

    def __call__(
        self,
        df: pd.DataFrame,
        *args: Any,
        **kwds: Any
    ) -> Union[List[str], List[int]]:
        """Returns the columns of ``df`` that pass the dtype and pattern filters

        Args:
            df (:class:`pandas.DataFrame`): Dataframe to extract columns from

        Returns:
            Union[List[str], List[int]]: List of names (or indices when
            ``columns_type`` is ``'numeric'``) of filtered columns

        Raises:
            TypeError: If the ``df`` is not instance of :class:`pandas.DataFrame`
        """
        if not isinstance(df, pd.DataFrame):
            raise TypeError(f'`df` must be a `DataFrame` not {type(df)}')

        # first select desired columns
        columns_to_include = make_column_selector(
            dtype_include=self.dtype_include,
            dtype_exclude=self.dtype_exclude,
            pattern=self.pattern_include
        )(df)

        # then select undesired ones. `make_column_selector` ignores a `None`
        # pattern (i.e. would match every column), so without an exclude
        # pattern there is simply nothing to exclude
        if self.pattern_exclude is None:
            columns_to_exclude: set = set()
        else:
            columns_to_exclude = set(
                make_column_selector(
                    dtype_include=self.dtype_include,
                    dtype_exclude=self.dtype_exclude,
                    pattern=self.pattern_exclude
                )(df)
            )

        # remove columns_to_exclude from columns_to_include (order preserved)
        columns = [
            column for column in columns_to_include
            if column not in columns_to_exclude
        ]

        # return columns based on columns_type (`columns` is already `string`)
        if self.columns_type == 'numeric':
            return [df.columns.get_loc(column) for column in columns]
        return columns
class ColumnTransformerConfig:
    """A helper class that parses configs for using the :class:`sklearn.compose.ColumnTransformer`

    The purpose of this class is to create the list of ``transformers`` to be used
    by the :class:`sklearn.compose.ColumnTransformer`. Hence, one needs to define
    the configs by using the :meth:`set_configs` method. Then use the
    :meth:`generate_pipeline` method to create the list of transformers.

    This class at the end, will return a list of tuples, where each tuple is in
    the form of ``(name, transformer, columns)``.
    """

    def __init__(self) -> None:
        # NOTE: the class name is appended without a '.' separator
        self.logger = logging.getLogger(logger.name+self.__class__.__name__)
        self.CONF = self.set_configs()

    def set_configs(self, path: Optional[Union[str, pathlib.Path]] = None) -> dict:
        """Defines and sets the config to be parsed

        The keys of the configs are the names of the transformers. They must
        include the API name of one of the available transforms at the end:

            * sklearn transformers: Any class that could be used for transformation
              that is importable as ``sklearn.preprocessing.API_NAME``
            * custom transformers: Any class that is not a ``sklearn`` transformer
              and is importable as ``niklib.models.preprocessors.API_NAME``

        This naming convention is used to create proper transformers for each
        type of data. e.g in json format::

            "age_StandardScaler": {
                "columns_type": "'numeric'",
                "dtype_include": "np.float32",
                "pattern_include": "'age'",
                "pattern_exclude": "None",
                "dtype_exclude": "None",
                "group": "False",
                "use_global": "False"
            }
            "sex_OneHotEncoder": {
                "columns_type": "'numeric'",
                "dtype_include": "'category'",
                "pattern_include": "'VisaResult'",
                "pattern_exclude": "None",
                "dtype_exclude": "None",
                "group": "True",
                "use_global": "True"
            }

        The values of the configs are the columns to be transformed. The columns
        can be obtained by using :class:`niklib.models.preprocessors.core.ColumnSelector`
        which requires user to pass certain parameters. These parameters can be set
        manually or extracted from JSON config files by providing the path to the
        JSON file.

        The ``group`` key is used to determine if the transformer should be applied
        considering a group of columns or not. If ``group`` is ``True``, then
        required values for transformation are obtained from all columns rather
        than handling each group separately. For instance, one can use
        ``OneHotEncoding`` on a set of columns where if ``group`` is ``True``, then
        all unique categories of all of those columns are extracted, then
        transformed. If ``group`` is ``False``, then each column will be
        transformed based on its unique categories independently.
        (``group`` cannot be passed to :class:`ColumnSelector`)

        The ``use_global`` key is used to determine if the transformer should be
        applied considering all the data or train data (since fitting
        transformation for normalization needs to be only done on *train* data).
        If ``use_global`` is ``True``, then the transformer will be applied on all
        data. This is particularly useful for one hot encoding categorical
        features where some categories are rare and might only exist in test and
        eval data.

        Warning:
            Every config value is a string that is passed to :func:`eval`. Only
            load config files that you trust.

        Args:
            path: path to the JSON file containing the configs. If None, the
                packaged default ``EXAMPLE_COLUMN_TRANSFORMER_CONFIG_X`` is used.

        Returns:
            dict: A dictionary where keys are string names, values are tuple of
            :class:`niklib.models.preprocessors.core.ColumnSelector` instance and
            the two boolean control variables ``group`` and ``use_global`` which
            will be used by :meth:`generate_pipeline`.
        """
        # always keep a ``Path`` so ``as_mlflow_artifact`` can rely on ``.name``
        path = pathlib.Path(
            EXAMPLE_COLUMN_TRANSFORMER_CONFIG_X if path is None else path
        )
        self.conf_path = path

        # log the path used
        self.logger.warning(f'Config file "{self.conf_path}" is being used')

        # read the json file
        with open(path, 'r', encoding='utf-8') as f:
            configs = json.load(f)

        # parse the configs (json files)
        parsed_configs = {}
        for key, value in configs.items():
            # SECURITY: `eval` executes arbitrary code found in the config file
            # (needed for values such as "np.float32"); never point this at
            # untrusted configs
            parsed_values = {k: eval(v) for k, v in value.items()}

            # 'group' and 'use_global' are control flags, not `ColumnSelector`
            # arguments: take them out (defaulting to False when absent)
            group = parsed_values.pop('group', False)
            use_global = parsed_values.pop('use_global', False)

            parsed_configs[key] = (
                ColumnSelector(**parsed_values), group, use_global
            )

        # set the configs when explicit call to this method is made
        self.CONF = parsed_configs
        # return the parsed configs
        return parsed_configs

    def as_mlflow_artifact(self, target_path: Union[str, pathlib.Path]) -> None:
        """Saves the configs to the MLFlow artifact directory

        Args:
            target_path: Path to the MLFlow artifact directory. The name of the file
                will be same as original config file, hence, only provide path to dir.

        Raises:
            ValueError: If no config file has been set yet
        """
        target_path = pathlib.Path(target_path)

        # ``conf_path`` only exists once ``set_configs`` has run
        conf_path = getattr(self, 'conf_path', None)
        if conf_path is None:
            raise ValueError(
                'Configs have not been set yet. Use set_configs to set them.')
        conf_path = pathlib.Path(conf_path)

        # read the json file
        with open(conf_path, 'r', encoding='utf-8') as f:
            configs = json.load(f)

        # save the configs to the artifact directory
        target_path = target_path / conf_path.name
        with open(target_path, 'w', encoding='utf-8') as f:
            json.dump(configs, f)
        self.logger.info(f'"{target_path}" saved as a artifact.')

    @staticmethod
    def extract_selected_columns(
        selector: 'ColumnSelector',
        df: pd.DataFrame
    ) -> Union[List[str], List[int]]:
        """Extracts the columns from the dataframe based on the selector

        Note:
            This method is simply a wrapper around
            :class:`niklib.models.preprocessors.core.ColumnSelector` that makes
            the call given a dataframe. I.e.::

                # assuming same configs
                selector = preprocessors.ColumnSelector(...)
                A = ColumnTransformerConfig.extract_selected_columns(selector=selector, df=df)
                B = selector(df)
                A == B  # True

            Also, this is a static method.

        Args:
            selector (:class:`niklib.models.preprocessors.core.ColumnSelector`):
                Initialized selector object
            df (:class:`pandas.DataFrame`): Dataframe to extract columns from

        Returns:
            Union[List[str], List[int]]: List of columns to be transformed
        """
        return selector(df=df)

    @staticmethod
    def __check_arg_exists(func: Callable, arg: str) -> None:
        """Checks if the argument exists in the callable signature

        Args:
            func (Callable): Callable to check the argument in
            arg (str): Argument to check if exists in the callable signature

        Raises:
            ValueError: If the argument does not exist in the callable signature
        """
        # get the signature of the callable
        sig = inspect.signature(func)
        # check if the argument exists in the signature
        if arg not in sig.parameters:
            raise ValueError(
                f'Argument "{arg}" does not exist in the "{func}" signature')

    @staticmethod
    def __get_df_column_unique(df: pd.DataFrame, loc: Union[int, str]) -> list:
        """Gets uniques of a column in a dataframe

        Args:
            df (:class:`pandas.DataFrame`): Dataframe to get uniques from
            loc (Union[int, str]): Column to locate on the dataframe. Integers
                (including numpy integers) are treated as positions, anything
                else as a column label.

        Returns:
            list: List of unique values in the column, in order of appearance.
            Values of the returned list can be anything that is supported
            by :class:`pandas.DataFrame`
        """
        # positional lookup for ints (python or numpy), label lookup otherwise
        if isinstance(loc, (int, np.integer)):
            return list(df.iloc[:, loc].unique())
        return list(df.loc[:, loc].unique())

    def calculate_params(
        self,
        df: pd.DataFrame,
        columns: List,
        group: bool,
        transformer_name: str
    ) -> dict:
        """Calculates the parameters for the group transformation w.r.t. the transformer name

        Args:
            df (:class:`pandas.DataFrame`): Dataframe to extract columns from
            columns (List): List of columns to be transformed
            group (bool): If True, then the columns will be grouped together and
                the parameters will be calculated over all columns passed in
            transformer_name (str): Name of the transformer. It is used to
                determine the type of params to be passed to the transformer.
                E.g. if ``transformer_name`` corresponds to ``OneHotEncoding``,
                then params would be unique categories.

        Raises:
            NotImplementedError: If ``group`` is requested for a transformer
                whose group parameters are not implemented
            ValueError: If the transformer does not accept the computed argument

        Returns:
            dict: Parameters for the group transformation. Empty when there is
            nothing to precompute.
        """
        # only OneHotEncoder has precomputed params so far
        if transformer_name != 'OneHotEncoder':
            if group:
                raise NotImplementedError(
                    f'Group transformation param calculation '
                    f'is not implemented for {transformer_name}')
            return {}

        # extract the unique categories
        if group:
            # get all uniques from all columns if 'group' is True
            collected: list = []
            for col in columns:
                collected.extend(self.__get_df_column_unique(df=df, loc=col))
            # dedupe with a deterministic order: plain `set` iteration order of
            # strings changes between interpreter runs, which would reorder
            # the encoded features
            try:
                unique_values = sorted(set(collected))
            except TypeError:
                # unorderable mix of types: keep order of first appearance
                unique_values = list(dict.fromkeys(collected))
        else:
            # zero or several columns without grouping: use the default
            # behavior of OneHotEncoder and leave finding categories to it
            if len(columns) != 1:
                return {}  # return empty params
            unique_values = self.__get_df_column_unique(df=df, loc=columns[0])

        # check correct arg to set given the transformer object signature
        transformer_arg = 'categories'
        self.__check_arg_exists(TRANSFORMS[transformer_name], transformer_arg)

        # for OneHotEncoder, the arg is 'categories': one entry per column
        return {transformer_arg: [unique_values for _ in range(len(columns))]}

    def _check_overlap_in_transformation_columns(
        self,
        transformers: List[Tuple]
    ) -> None:
        """Checks if there are multiple transformers on the same columns and reports them

        Throw info if columns of different transformers overlap. I.e. at least
        another transform is happening on a column that has already been
        transformed.

        Note:
            This is not a bug or misbehavior since we should be able to pipe
            multiple transformers sequentially on the same column (e.g.
            ``add`` -> ``divide``). The message is thrown for the case the user
            didn't mean to do so, since the output might be acceptable but hold
            wrong values and there is no way to find out except manual
            inspection. Hence, this method will make the user aware that
            something might be wrong.

        Args:
            transformers (List[Tuple]): A list of tuples, where each tuple is in
                the form of ``(name, transformer, columns)`` where ``name`` is
                the name of the transformer, ``transformer`` is the transformer
                object and ``columns`` is the list of columns names to be
                transformed.

        Todo:
            Should I also check if each list of column is a set? (no duplicate
            in same list) see https://stackoverflow.com/a/3697450/18971263
        """
        # compare every unordered pair of transformers exactly once
        for i, first in enumerate(transformers):
            for second in transformers[i + 1:]:
                # tuples are (name, _, columns)
                overlap: List[Union[int, str]] = list(
                    set(first[-1]).intersection(second[-1]))
                if overlap:
                    self.logger.info(
                        f'transformer "{first[0]}" is overlapping with\n'
                        f' transformer "{second[0]}" on columns {overlap}'
                    )

    def generate_pipeline(
        self,
        df: pd.DataFrame,
        df_all: Optional[pd.DataFrame] = None
    ) -> list:
        """Generates the list of transformers to be used by the :class:`sklearn.compose.ColumnTransformer`

        Note:
            For more info about how the transformers are created, see methods
            :meth:`set_configs`, :meth:`extract_selected_columns` and
            :meth:`calculate_params`.

        Args:
            df (:class:`pandas.DataFrame`): Dataframe to extract columns from.
                If ``df_all`` is given, this is interpreted as train data.
            df_all (Optional[:class:`pandas.DataFrame`]): The entire data, used
                to calculate params of transformers configured with
                ``use_global``. If None, ``df`` is used for those as well (a
                warning is logged). For more info see :meth:`set_configs`.

        Raises:
            ValueError: If the naming convention used for the keys in the configs
                (see :meth:`set_configs`) is not followed.

        Returns:
            list: A list of tuples, where each tuple is in the form of
            ``(name, transformer, columns)`` where ``name`` is the name of the
            transformer, ``transformer`` is the transformer object and
            ``columns`` is the list of columns names to be transformed.
        """
        # list of (name, transformer, columns) tuples to return
        transformers: List[Tuple] = []

        # iterate through the configs to build transformer instances appropriately
        for key, (selector, group, use_global) in self.CONF.items():
            # the API name of the transformer is the last part of the key
            transformer_name = key.split('_')[-1]
            if transformer_name not in TRANSFORMS:
                raise ValueError(f'Unknown transformer {key} in config.')

            # extract list of columns names
            columns = self.extract_selected_columns(selector=selector, df=df)

            # pick the data the transformation params are calculated on
            params_df = df
            if use_global:
                if df_all is None:
                    self.logger.warning(
                        f'"{key}" is configured with `use_global` but `df_all` '
                        f'is not provided; using `df` instead.'
                    )
                else:
                    params_df = df_all

            # extract (group level) transformation params
            group_params: dict = self.calculate_params(
                df=params_df,
                columns=columns,
                group=group,
                transformer_name=transformer_name
            )
            # build transformer object
            transformer = TRANSFORMS[transformer_name](**group_params)

            # add to the list of transformers
            transformers.append((key, transformer, columns))
            self.logger.info(
                f'Transformer with name "{key}" has been constructed.'
            )

        # throw logs if columns of different self.CONF overlap
        self._check_overlap_in_transformation_columns(transformers)
        return transformers
def get_transformed_feature_names(
    column_transformer: 'ColumnTransformer',
    original_columns_names: List[str],
) -> List[str]:
    """Gives feature names for transformed data via original feature names

    This is super useful as the default
    :meth:`sklearn.compose.ColumnTransformer.get_feature_names_out` uses
    meaningless names for features after transformation, which makes tracking
    the transformed features almost impossible as it uses
    ``x0[_category], x1[_category], ... xn[_category]`` as feature names.

    This method for example, extracts the name of original column ``A`` (with
    categories ``[a, b]``) before transformation and finds new columns after
    transforming that column and names them ``A_a`` and ``A_b``, meanwhile the
    ``sklearn`` method gives ``x[num]_a`` and ``x[num]_b``.

    Args:
        column_transformer (:class:`sklearn.compose.ColumnTransformer`): A
            **fitted** column transformer that has ``.transformers_`` where each
            is a tuple as ``(name, transformer, in_columns)``. ``in_columns`` is
            used to detect the original index of transformed columns.
        original_columns_names (List[str]): List of original columns names
            before transformation

    Returns:
        List[str]: One name per transformed feature (same length and order as
        ``get_feature_names_out()``), where the ``x[num]`` token is replaced by
        the original column name. Names without a known ``x[num]`` token are
        returned unchanged.
    """
    import re  # local import: only needed by this function

    # build a mapping between original index of columns and their names
    index_to_name: dict = {}
    for _, _, in_columns in column_transformer.transformers_:
        for col in in_columns:
            # only positional indices can be mapped back to a name
            if isinstance(col, (int, np.integer)):
                index_to_name[int(col)] = original_columns_names[col]

    # `x<index>` as a whole token: not glued to a preceding letter/digit (so
    # it is not part of another word) and with all of its digits (so `x10` is
    # never mistaken for `x1`)
    token = re.compile(r'(?<![A-Za-z0-9])x(\d+)')

    # replace `x[num]` with `original feature name` in `transformed feature names`
    new_feature_names: List[str] = []
    for fn in column_transformer.get_feature_names_out():
        fn = str(fn)
        for match in token.finditer(fn):
            name = index_to_name.get(int(match.group(1)))
            if name is not None:
                # we can have only one index per feature, so stop at the first
                fn = fn[:match.start()] + name + fn[match.end():]
                break
        # always keep the feature, renamed or not, so that the output stays
        # aligned with the transformed columns
        new_feature_names.append(fn)

    return new_feature_names
def move_dependent_variable_to_end(
    df: pd.DataFrame,
    target_column: str
) -> pd.DataFrame:
    """Reorder the dataframe so that the dependent variable is the last column

    Some frameworks require the dependent variable to be the last column, and
    in general it is way easier to work with :class:`numpy.ndarray` s when the
    dependent variable is the last one.

    Note:
        This is particularly useful for us since we have multiple columns of the
        same type in our dataframe, and when we want to apply the same
        preprocessing to all members of a group of features, we can directly use
        the index of those features from our pandas dataframe in the converted
        numpy array. E.g::

            df = pd.DataFrame(...)
            x = df.to_numpy()
            index = df.columns.get_loc(a_group_of_columns_with_the_same_logic)
            x[:, index] = transform(x[:, index])

    Args:
        df (:class:`pandas.DataFrame`): Dataframe to reorder
        target_column (str): Name of the target column

    Raises:
        ValueError: If ``target_column`` is not a column of ``df``

    Returns:
        :class:`pandas.DataFrame`: Dataframe with the dependent variable at the end
    """
    reordered = df.columns.tolist()
    # drop the first occurrence of the target, then put it back at the end
    reordered.remove(target_column)
    reordered.append(target_column)
    return df[reordered]