# -*- coding: utf-8 -*-
"""
A data-processing package for data preprocessing and feature engineering.
This library contains preprocessing and feature-engineering methods
used during data analysis and in machine learning workflows.
"""
import itertools
import os
import warnings
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.ensemble import IsolationForest, RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import cross_val_score, StratifiedKFold
from sklearn.pipeline import make_pipeline
from sklearn.utils.validation import check_is_fitted
__all__ = [
'DropColumns',
'DropNoVariance',
'DropHighCardinality',
'DropLowAUC',
'DropHighCorrelation',
'ImputeNaN',
'OneHotEncoding',
'BinarizeNaN',
'CountRowNaN',
'ClipData',
'GroupRareCategory',
'TargetMeanEncoding',
'StandardScaling',
'MinMaxScaling',
'CountEncoding',
'RankedCountEncoding',
'FrequencyEncoding',
'RankedTargetMeanEncoding',
'AppendAnomalyScore',
'AppendCluster',
'AppendClusterDistance',
'AppendPrincipalComponent',
'AppendArithmeticFeatures', # from 1.2
'RankedEvaluationMetricEncoding', # from 1.2
'AppendClassificationModel', # from 1.2
'AppendEncoder', # from 1.2
'AppendClusterTargetMean', # from 1.2
'PermutationImportanceTest', # from 1.2
'UnionAppend', # from 1.3
'load_titanic', # from 1.3
]
def _check_X(X):
    if not isinstance(X, pd.DataFrame):
        raise TypeError("Input X is not a pandas DataFrame.")
def _check_y(y):
    if not isinstance(y, pd.Series):
        raise TypeError("Input y is not a pandas Series.")
def _check_X_y(X, y):
    _check_X(X)
    _check_y(y)
    if X.shape[0] != y.shape[0]:
        raise ValueError("The number of rows differs between X and y.")
def _check_duplicate(list_):
return len(list_) != len(set(list_))
def _check_fit(list1, list2):
    if set(list1) == set(list2):
        pass
    elif _check_duplicate(list1) or _check_duplicate(list2):
        raise ValueError("There are features with duplicate names.")
    else:
        raise ValueError("Columns are different from when fitted. "
                         "Preprocessors that transform with a fitted model, "
                         "such as IsolationForest or KMeans, require the "
                         "columns to be the same.")
def _check_binary(y):
    if len(y.unique()) != 2:
        raise Exception("This class can only be used for Binary Classification")
def _check_method_implemented(model, method_str):
    if method_str not in dir(model):
        raise Exception(method_str + ' is not implemented in the specified model')
def load_titanic():
    """
    Load train and test data for the titanic dataset.
    :return: train_features, test_features, train_target
    :rtype: pandas.DataFrame, pandas.DataFrame, pandas.Series
    """
    path = os.path.dirname(__file__)
    df = pd.read_csv(os.path.join(path, 'datasets', 'titanic_train.csv'))
    X_test = pd.read_csv(os.path.join(path, 'datasets', 'titanic_test.csv'))
X = df.drop('Survived', axis=1)
y = df['Survived']
return X, X_test, y
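# Usage sketch for load_titanic (illustrative; assumes the bundled
# datasets/titanic_*.csv files are present in the installed package):
#
#     X_train, X_test, y_train = load_titanic()
#     print(X_train.shape, X_test.shape, y_train.shape)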
class DropColumns(BaseEstimator, TransformerMixin):
    """
    Simply delete columns specified from the input dataframe.
    :param list drop_columns: List of feature names which will be dropped \
        from the input dataframe. For a single column, a string can also \
        be used. (default=None)
"""
def __init__(self, drop_columns=None):
self.drop_columns = drop_columns
    def fit(self, X, y=None):
"""
Fit transformer by checking X is a pandas DataFrame.
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Ignored. (default=None)
:return: fitted object (self)
:rtype: object
"""
        _check_X(X)
        if isinstance(self.drop_columns, list):
            for col in self.drop_columns:
                if col not in X.columns:
                    raise Exception("Specified columns are not in the input.")
        else:
            if self.drop_columns not in X.columns:
                raise Exception("Specified column is not in the input.")
self.is_fitted_ = X.columns.shape
return self
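    # A minimal transform sketch (assumption: the released library pairs this
    # fit with a transform that drops the stored columns; reconstructed here
    # from the class docstring, not copied from the original source):
    def transform(self, X):
        """Drop the specified columns from X."""
        check_is_fitted(self, 'is_fitted_')
        _check_X(X)
        return X.drop(self.drop_columns, axis=1)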
class DropNoVariance(BaseEstimator, TransformerMixin):
"""
Delete columns which only have single unique value.
"""
def __init__(self):
pass
    def fit(self, X, y=None):
"""
Fit transformer by deleting column with single unique value.
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Ignored. (default=None)
:return: fitted object (self)
:rtype: object
"""
_check_X(X)
self.drop_columns_ = None
for feature in X.columns:
if X[feature].unique().shape[0] == 1:
if self.drop_columns_ is None:
self.drop_columns_ = [feature]
else:
self.drop_columns_ = np.append(self.drop_columns_, feature)
return self
class DropHighCardinality(BaseEstimator, TransformerMixin):
    """
    Delete columns with high cardinality.
    Basically means dropping columns with too many categories.
    :param int max_categories: Maximum number of categories to be permitted\
        in a column. If the number of categories in a certain column is equal\
        to or exceeds this value, that column will be deleted. (default=50)
"""
def __init__(self, max_categories=50):
self.max_categories = max_categories
    def fit(self, X, y=None):
"""
Fit transformer by deleting column with high cardinality.
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Ignored. (default=None)
:return: fitted object (self)
:rtype: object
"""
_check_X(X)
self.drop_columns_ = None
cat_columns = X.select_dtypes(exclude='number').columns
for feature in cat_columns:
if X[feature].unique().shape[0] >= self.max_categories:
if self.drop_columns_ is None:
self.drop_columns_ = [feature]
else:
self.drop_columns_ = np.append(self.drop_columns_, feature)
return self
class DropLowAUC(BaseEstimator, TransformerMixin):
    """
    Delete columns that carry little information for predicting the target\
    variable. This class calculates roc_auc by fitting each feature in the\
    input dataframe one by one against the target feature using Logistic\
    Regression, and drops features with roc_auc below the specified\
    threshold. Missing values are replaced by the mean, and categorical\
    features are converted to dummy variables by one-hot encoding with\
    missing values filled with the mode.
    :param float threshold: Threshold value for roc_auc. Features with\
        roc_auc below this value will be deleted. (default=0.51)
"""
def __init__(self, threshold=0.51):
self.threshold = threshold
    def fit(self, X, y=None):
"""
Fit transformer by fitting each feature with Logistic \
Regression and storing features with roc_auc less than threshold
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Input Series for target variable
:return: fitted object (self)
:rtype: object
"""
_check_X_y(X, y)
_check_binary(y)
self.drop_columns_ = None
cv = StratifiedKFold(n_splits=5)
lr = LogisticRegression(penalty='l2', solver='lbfgs')
        for feature in X.columns:
            X_lr = X[[feature]]
            if X_lr.dtypes[0] == 'object':
                X_lr = X_lr.fillna(X_lr.mode().iloc[0])
                X_lr = pd.get_dummies(X_lr)
            else:
                X_lr = X_lr.fillna(X_lr.mean())
            roc_auc = cross_val_score(lr, X_lr, y, cv=cv,
                                      scoring='roc_auc').mean()
if roc_auc < self.threshold:
if self.drop_columns_ is None:
self.drop_columns_ = [feature]
else:
self.drop_columns_ = np.append(self.drop_columns_, feature)
return self
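    # Usage sketch (illustrative; X and y as returned by load_titanic):
    #
    #     trans = DropLowAUC(threshold=0.55)
    #     trans.fit(X, y)
    #     trans.drop_columns_  # features whose single-feature roc_auc < 0.55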
class DropHighCorrelation(BaseEstimator, TransformerMixin):
    """
    Delete features that are highly correlated to each other.\
    From each highly correlated pair within X, the feature that is\
    best correlated with the target variable is kept.
    :param float threshold: Threshold value for Pearson's correlation\
        coefficient. (default=0.95)
"""
def __init__(self, threshold=0.95):
self.threshold = threshold
    def fit(self, X, y=None):
        """
        Fit transformer by identifying highly correlated variable pairs\
        and dropping the one that is less correlated to the target variable.\
        Missing values will be imputed by the mean.
        :param pandas.DataFrame X: Input dataframe
        :param pandas.Series y: Input Series for target variable
:return: fitted object (self)
:rtype: object
"""
_check_X_y(X, y)
self.drop_columns_ = None
Xm = X.corr()
pairs = []
for feature in Xm.columns:
pair = Xm[feature][abs(Xm[feature]) >= self.threshold].index.tolist()
if len(pair) > 1:
pairs.append(pair)
unique_pairs = pd.DataFrame(pairs).drop_duplicates().to_numpy()
for pair in unique_pairs:
pearsons = []
for col in pair:
if col is not None:
pearson = np.corrcoef(X[col].fillna(X[col].mean()), y)[0][1]
pearsons.append(abs(pearson))
best_col = pair[pearsons.index(max(pearsons))]
for col in pair:
if col != best_col and col is not None:
                    if self.drop_columns_ is None:
                        self.drop_columns_ = [col]
                    else:
                        self.drop_columns_ = np.append(self.drop_columns_, col)
self.drop_columns_ = np.unique(self.drop_columns_)
return self
class ImputeNaN(BaseEstimator, TransformerMixin):
    """
    Look for NaN values in the dataframe and impute them by a\
    strategy such as mean, median or mode.
    :param string cat_strategy: Strategy for imputing NaNs in categorical\
        columns. If any string other than 'mode' is specified, NaNs will be\
        imputed with the fixed string 'ImputedNaN'. (default='mode')
    :param string num_strategy: Strategy for imputing NaNs in numerical\
        columns. Either mean, median or mode can be specified; if any other\
        string is specified, mean imputation will be employed. (default='mean')
"""
def __init__(self, cat_strategy='mode', num_strategy='mean'):
self.cat_strategy = cat_strategy
self.num_strategy = num_strategy
    def fit(self, X, y=None):
        """
        Fit transformer by identifying numerical and categorical\
        columns, then storing, based on the strategy, the values\
        used to impute NaNs in each column.
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Ignored. (default=None)
:return: fitted object (self)
:rtype: object
"""
_check_X(X)
self.num_columns_ = X.select_dtypes('number').columns
self.cat_columns_ = X.select_dtypes(exclude='number').columns
self.num_imputes_ = {}
self.cat_imputes_ = {}
for col in self.num_columns_:
if self.num_strategy == 'mean':
self.num_imputes_[col] = X[col].mean()
elif self.num_strategy == 'median':
self.num_imputes_[col] = X[col].median()
elif self.num_strategy == 'mode':
self.num_imputes_[col] = X[col].mode()[0]
else:
self.num_imputes_[col] = X[col].mean()
for col in self.cat_columns_:
if self.cat_strategy == 'mode':
self.cat_imputes_[col] = X[col].mode()[0]
else:
self.cat_imputes_[col] = 'ImputedNaN'
return self
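    # A minimal transform sketch (assumption: the released library applies
    # the stored per-column values with fillna; reconstructed from the
    # docstring, not copied from the original source):
    def transform(self, X):
        """Impute the stored values into the NaNs of X."""
        check_is_fitted(self, 'num_imputes_')
        _check_X(X)
        Xt = X.copy()
        for col, value in self.num_imputes_.items():
            if col in Xt.columns:
                Xt[col] = Xt[col].fillna(value)
        for col, value in self.cat_imputes_.items():
            if col in Xt.columns:
                Xt[col] = Xt[col].fillna(value)
        return Xt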
class OneHotEncoding(BaseEstimator, TransformerMixin):
"""
One Hot Encoding of categorical variables.
:param bool drop_first: Whether to drop first column after one \
hot encoding in order to avoid multi-collinearity. (default=True)
"""
def __init__(self, drop_first=True):
self.drop_first = drop_first
    def fit(self, X, y=None):
"""
Fit transformer by getting column names after\
one hot encoding.
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Ignored. (default=None)
:return: fitted object (self)
:rtype: object
"""
_check_X(X)
self.dummy_cols_ = pd.get_dummies(X, drop_first=self.drop_first).columns
self.cat_columns_ = X.select_dtypes(exclude='number').columns
return self
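    # A minimal transform sketch (assumption: dummy_cols_ stored at fit time
    # is used to align the transformed frame, so categories unseen during fit
    # are dropped and missing dummies are filled with 0; hedged
    # reconstruction, not the original source):
    def transform(self, X):
        """One-hot encode X and align columns with those seen at fit."""
        check_is_fitted(self, 'dummy_cols_')
        _check_X(X)
        Xt = pd.get_dummies(X, drop_first=self.drop_first)
        return Xt.reindex(columns=self.dummy_cols_, fill_value=0)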
class BinarizeNaN(BaseEstimator, TransformerMixin):
"""
Find a column with missing values, and create a new\
column indicating whether a value was missing (0) or\
not (1).
"""
def __init__(self):
pass
    def fit(self, X, y=None):
        """
        Fit transformer by getting the names of columns that\
        contain NaN.
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Ignored. (default=None)
:return: fitted object (self)
:rtype: object
"""
_check_X(X)
nan_info = X.isna().sum()
self.nan_columns_ = nan_info[nan_info != 0].index
return self
class CountRowNaN(BaseEstimator, TransformerMixin):
    """
    Calculates the total number of NaNs in each row and creates
    a new column to store the total.
"""
def __init__(self):
pass
    def fit(self, X, y=None):
"""
Fit transformer by getting column names during fit.
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Ignored. (default=None)
:return: fitted object (self)
:rtype: object
"""
_check_X(X)
self.cols_ = X.columns
return self
class ClipData(BaseEstimator, TransformerMixin):
    """
    Clip datasets by replacing values larger than\
    the upper bound with the upper bound and values lower than\
    the lower bound with the lower bound. Missing values will\
    be ignored.
    :param float threshold: Threshold value to define the upper and\
        lower bounds. For example, 0.99 will imply an upper bound at the 99th\
        percentile and a lower bound at the 1st percentile. (default=0.99)
"""
def __init__(self, threshold=0.99):
self.threshold = threshold
    def fit(self, X, y=None):
"""
Fit transformer to get upper bound and lower bound for\
numerical columns.
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Ignored. (default=None)
:return: fitted object (self)
:rtype: object
"""
_check_X(X)
self.num_columns_ = X.select_dtypes('number').columns
self.upperbounds_ = {}
self.lowerbounds_ = {}
for col in self.num_columns_:
            self.lowerbounds_[col], self.upperbounds_[col] = np.percentile(
                X[col].dropna(), [100 - self.threshold * 100, self.threshold * 100])
return self
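    # A minimal transform sketch (assumption: pandas' clip applies the stored
    # bounds per column and leaves NaN untouched, matching the class
    # docstring; hedged reconstruction, not the original source):
    def transform(self, X):
        """Clip numerical columns of X to the bounds learned at fit."""
        check_is_fitted(self, 'upperbounds_')
        _check_X(X)
        Xt = X.copy()
        for col in self.num_columns_:
            if col in Xt.columns:
                Xt[col] = Xt[col].clip(self.lowerbounds_[col],
                                       self.upperbounds_[col])
        return Xt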
class GroupRareCategory(BaseEstimator, TransformerMixin):
    """
    Replace rare categories that appear in categorical columns\
    with a dummy string.
    :param float threshold: Threshold value for defining a "rare"\
        category. For example, 0.01 implies that categories accounting for\
        at most 1% of the rows are treated as "rare". (default=0.01)
"""
def __init__(self, threshold=0.01):
self.threshold = threshold
    def fit(self, X, y=None):
"""
Fit transformer to define and store rare categories\
to be replaced.
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Ignored. (default=None)
:return: fitted object (self)
:rtype: object
"""
_check_X(X)
self.cat_columns_ = X.select_dtypes(exclude='number').columns
self.rare_categories_ = {}
for col in self.cat_columns_:
catcounts = X[col].value_counts(ascending=False)
rare_categories = catcounts[catcounts <=
catcounts.sum() * self.threshold].index.tolist()
self.rare_categories_[col] = rare_categories
return self
class TargetMeanEncoding(BaseEstimator, TransformerMixin):
    """
    Target Mean Encoding of categorical variables. Missing\
    values will be treated as one of the categories.
    :param float k: hyperparameter for the sigmoid function (default=0.0)
    :param float f: hyperparameter for the sigmoid function (default=1.0)
    :param bool smoothing: Whether to smooth the target mean with the global\
        mean using a sigmoid function. smoothing=False is not recommended.\
        (default=True)
"""
def __init__(self, k=0, f=1, smoothing=True):
self.k = k
self.f = f
self.smoothing = smoothing
def _sigmoid(self, count, k, f):
return 1 / (1 + np.exp(- (count - k) / f))
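    # Worked example of the smoothing blend used in fit below: with k=0 and
    # f=1, a category seen count=3 times gets lambda = 1 / (1 + exp(-3))
    # ~= 0.953, so its encoding is 0.953 * local_target_mean + 0.047 *
    # global_mean; rarer categories are pulled toward the global mean.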
    def fit(self, X, y=None):
        """
        Fit transformer to compute and store the smoothed\
        target mean for each categorical variable.
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Input Series for target variable
:return: fitted object (self)
:rtype: object
"""
_check_X_y(X, y)
target = y.name
global_mean = y.mean()
self.global_mean_ = global_mean
sigmoid = np.vectorize(self._sigmoid)
self.cat_columns_ = X.select_dtypes(exclude='number').columns
self.dic_target_mean_ = {}
for col in self.cat_columns_:
df = pd.concat([X[col], y], axis=1).fillna('_Missing'
).groupby(col, as_index=False)
local_means = df.mean().rename(columns={target:'target_mean'})
counts = df.count().rename(columns={target:'count'})
df_summary = pd.merge(counts, local_means, on=col)
lambda_ = sigmoid(df_summary['count'], self.k, self.f)
df_summary['smoothed_target_mean'] = lambda_ * df_summary[
'target_mean'] + (1 - lambda_) * global_mean
df_summary.loc[df_summary['count'] == 1,
'smoothed_target_mean'] = global_mean
self.dic_target_mean_[col] = df_summary
return self
class StandardScaling(BaseEstimator, TransformerMixin):
"""
Standardize datasets to have mean = 0 and std = 1.\
Note this will only standardize numerical data\
and ignore missing values during computation.\
"""
def __init__(self):
pass
    def fit(self, X, y=None):
        """
        Fit transformer to get the mean and std of each\
        numerical feature.
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Ignored. (default=None)
:return: fitted object (self)
:rtype: object
"""
_check_X(X)
self.num_columns_ = X.select_dtypes('number').columns
self.dic_mean_ = {}
self.dic_std_ = {}
for col in self.num_columns_:
self.dic_mean_[col] = X[col].mean()
self.dic_std_[col] = X[col].std()
return self
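    # A minimal transform sketch (assumption: the standard z-score formula
    # (x - mean) / std using the statistics stored at fit time; hedged
    # reconstruction, not the original source):
    def transform(self, X):
        """Standardize numerical columns of X with the fitted mean and std."""
        check_is_fitted(self, 'dic_mean_')
        _check_X(X)
        Xt = X.copy()
        for col in self.num_columns_:
            # skip constant columns to avoid dividing by a zero std
            if col in Xt.columns and self.dic_std_[col] != 0:
                Xt[col] = (Xt[col] - self.dic_mean_[col]) / self.dic_std_[col]
        return Xt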
class MinMaxScaling(BaseEstimator, TransformerMixin):
    """
    Rescale the fit data into the range between 0 and 1.\
    Note this will only rescale numerical data\
    and ignore missing values during computation.\
    If the transform data contain values larger/smaller than\
    those in the fit data, the scaled value will be larger than 1\
    or less than 0.
"""
def __init__(self):
pass
    def fit(self, X, y=None):
        """
        Fit transformer to get the min and max values of each\
        numerical feature.
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Ignored. (default=None)
:return: fitted object (self)
:rtype: object
"""
_check_X(X)
self.num_columns_ = X.select_dtypes('number').columns
self.dic_min_ = {}
self.dic_max_ = {}
for col in self.num_columns_:
self.dic_min_[col] = X[col].min()
self.dic_max_[col] = X[col].max()
return self
class CountEncoding(BaseEstimator, TransformerMixin):
"""
Encode categorical variables by the count of category\
within the categorical column.
"""
def __init__(self):
pass
    def fit(self, X, y=None):
        """
        Fit transformer to define categorical variables and\
        obtain the occurrence count of each category.
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Ignored. (default=None)
:return: fitted object (self)
:rtype: object
"""
_check_X(X)
self.cat_columns_ = X.select_dtypes(exclude='number').columns
self.dic_counts_ = {}
for col in self.cat_columns_:
df = pd.concat([X[col], pd.DataFrame(np.zeros(X.shape[0]))],
axis=1).fillna('_Missing').groupby(col, as_index=False)
counts = df.count().rename(columns={0:'count'})
self.dic_counts_[col] = counts
return self
class RankedCountEncoding(BaseEstimator, TransformerMixin):
    """
    First encode categorical variables by the count of each category\
    within the categorical column. Counts are then ranked in\
    descending order and the ranks are used to encode the category\
    columns. Even when categories have the same count, they are\
    distinguished, since the ranking falls back to the index.\
    RankedFrequencyEncoding is not provided, as the result would be\
    identical to this class.
"""
def __init__(self):
pass
    def fit(self, X, y=None):
"""
Fit transformer to define categorical variables and\
obtain ranking of category counts.
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Ignored. (default=None)
:return: fitted object (self)
:rtype: object
"""
_check_X(X)
self.cat_columns_ = X.select_dtypes(exclude='number').columns
self.dic_ranks_ = {}
for col in self.cat_columns_:
df_rank = pd.DataFrame(X[col].fillna('_Missing'
).value_counts(ascending=False)).reset_index().reset_index()
df_rank.columns = ['Rank', col, 'Counts']
df_rank['Rank'] += 1
df_rank = df_rank.set_index(col)
self.dic_ranks_[col] = df_rank
return self
class FrequencyEncoding(BaseEstimator, TransformerMixin):
"""
Encode categorical variables by the frequency of category\
within the categorical column.
"""
def __init__(self):
pass
    def fit(self, X, y=None):
        """
        Fit transformer to define categorical variables and\
        obtain the frequency of each category.
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Ignored. (default=None)
:return: fitted object (self)
:rtype: object
"""
_check_X(X)
self.cat_columns_ = X.select_dtypes(exclude='number').columns
self.dic_freq_ = {}
for col in self.cat_columns_:
df = pd.concat([X[col], pd.DataFrame(np.zeros(X.shape[0]))],
axis=1).fillna('_Missing').groupby(col, as_index=False)
df_count = df.count()
df_count.columns = [col, 'Frequency']
df_count['Frequency'] = df_count[['Frequency']].apply(
lambda x: x / x.sum())
self.dic_freq_[col] = df_count
return self
class RankedTargetMeanEncoding(BaseEstimator, TransformerMixin):
    """
    Ranking with Target Mean Encoding of categorical variables. Missing\
    values will be treated as one of the categories. Categories with the\
    same target mean are treated separately, as the rank is obtained\
    from the index once sorted by target mean.
    :param float k: hyperparameter for the sigmoid function (default=0.0)
    :param float f: hyperparameter for the sigmoid function (default=1.0)
    :param bool smoothing: Whether to smooth the target mean with the global\
        mean using a sigmoid function. smoothing=False is not recommended.\
        (default=True)
"""
def __init__(self, k=0, f=1, smoothing=True):
self.k = k
self.f = f
self.smoothing = smoothing
def _sigmoid(self, count, k, f):
return 1 / (1 + np.exp(- (count - k) / f))
    def fit(self, X, y=None):
        """
        Fit transformer to compute and store the smoothed target\
        mean for each categorical variable, then create a ranking\
        based on the smoothed target mean.
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Input Series for target variable
:return: fitted object (self)
:rtype: object
"""
_check_X_y(X, y)
target = y.name
global_mean = y.mean()
self.global_mean_ = global_mean
sigmoid = np.vectorize(self._sigmoid)
self.cat_columns_ = X.select_dtypes(exclude='number').columns
self.dic_target_mean_ = {}
for col in self.cat_columns_:
df = pd.concat([X[col], y], axis=1).fillna('_Missing'
).groupby(col, as_index=False)
local_means = df.mean().rename(columns={target:'target_mean'})
counts = df.count().rename(columns={target:'count'})
df_summary = pd.merge(counts, local_means, on=col)
lambda_ = sigmoid(df_summary['count'], self.k, self.f)
df_summary['smoothed_target_mean'] = lambda_ * df_summary[
'target_mean'] + (1 - lambda_) * global_mean
df_summary.loc[df_summary['count'] == 1,
'smoothed_target_mean'] = global_mean
df_summary = df_summary.sort_values('smoothed_target_mean'
, ascending=False).reset_index(drop=True).reset_index()
df_summary = df_summary.rename(columns={'index':'Rank'})
df_summary['Rank'] += 1
self.dic_target_mean_[col] = df_summary
return self
class AppendAnomalyScore(BaseEstimator, TransformerMixin):
"""
Append anomaly score calculated from isolation forest.\
Since IsolationForest needs to be fitted, category columns must\
first be encoded to numerical values.
:param int n_estimators: Number of base estimators in the \
Isolation Forest ensemble. (default=100)
:param int random_state: random_state for Isolation Forest \
(default=1234)
"""
def __init__(self, n_estimators=100, random_state=1234):
self.n_estimators = n_estimators
self.random_state = random_state
    def fit(self, X, y=None):
"""
Fit Isolation Forest
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Ignored. (default=None)
:return: fitted object (self)
:rtype: object
"""
_check_X(X)
self.model_ = IsolationForest(n_estimators=self.n_estimators,
random_state=self.random_state)
self.model_.fit(X)
self.fit_columns_ = X.columns
return self
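    # A minimal transform sketch (assumption: the appended score is
    # IsolationForest's decision_function, and the stored fit_columns_ are
    # used to verify the columns match; the column name 'Anomaly_Score' is
    # hypothetical; hedged reconstruction, not the original source):
    def transform(self, X):
        """Append an anomaly-score column computed by the fitted forest."""
        check_is_fitted(self, 'model_')
        _check_X(X)
        _check_fit(self.fit_columns_, X.columns)
        Xt = X.copy()
        Xt['Anomaly_Score'] = self.model_.decision_function(Xt[self.fit_columns_])
        return Xt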
class AppendCluster(BaseEstimator, TransformerMixin):
    """
    Append the cluster number obtained from k-means++ clustering.\
    For clustering, categorical variables need to be converted\
    to numerical data.
:param int n_clusters: Number of clusters (default=8)
:param int random_state: random_state for KMeans \
(default=1234)
"""
def __init__(self, n_clusters=8, random_state=1234):
self.n_clusters = n_clusters
self.random_state = random_state
    def fit(self, X, y=None):
"""
Fit KMeans Clustering
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Ignored. (default=None)
:return: fitted object (self)
:rtype: object
"""
_check_X(X)
self.model_ = KMeans(n_clusters=self.n_clusters,
random_state=self.random_state)
self.model_.fit(X)
self.fit_columns_ = X.columns
return self
class AppendClusterDistance(BaseEstimator, TransformerMixin):
    """
    Append cluster distances obtained from k-means++ clustering.\
    For clustering, categorical variables need to be converted\
    to numerical data.
:param int n_clusters: Number of clusters (default=8)
:param int random_state: random_state for KMeans \
(default=1234)
"""
def __init__(self, n_clusters=8, random_state=1234):
self.n_clusters = n_clusters
self.random_state = random_state
    def fit(self, X, y=None):
"""
Fit KMeans Clustering
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Ignored. (default=None)
:return: fitted object (self)
:rtype: object
"""
_check_X(X)
self.model_ = KMeans(n_clusters=self.n_clusters,
random_state=self.random_state)
self.model_.fit(X)
self.fit_columns_ = X.columns
return self
class AppendPrincipalComponent(BaseEstimator, TransformerMixin):
    """
    Append principal components obtained from PCA.\
    For PCA, categorical variables need to be converted\
    to numerical data. Also, data should be standardized beforehand.
:param int n_components: Number of principal components (default=5)
:param int random_state: random_state for PCA \
(default=1234)
"""
def __init__(self, n_components=5, random_state=1234):
self.n_components = n_components
self.random_state = random_state
    def fit(self, X, y=None):
"""
Fit PCA
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Ignored. (default=None)
:return: fitted object (self)
:rtype: object
"""
_check_X(X)
self.model_ = PCA(n_components=self.n_components,
random_state=self.random_state)
self.model_.fit(X)
self.fit_columns_ = X.columns
return self
class AppendArithmeticFeatures(BaseEstimator, TransformerMixin):
    """
    A transformer which recognizes all numerical features and creates\
    new features by arithmetic operations. Newly created features\
    are evaluated individually by fitting Logistic Regression against\
    the target variable, and only new features with a higher evaluation\
    metric than both features in the pair are added to the data.\
    Missing values need to be imputed beforehand.
    :param int max_features: Number of numerical features to test\
        combinations of. If the number of numerical features in the data\
        exceeds this value, the transformer will raise an exception.\
        (default=50)
    :param string metric: Metric used to evaluate features. Sklearn's\
        default metrics can be used. (default='roc_auc')
    :param string operation: Type of arithmetic operation. 'add',\
        'subtract', 'multiply' or 'divide' can be used. (default='multiply')
    :param float replace_zero: Value used to replace 0 when\
        operation='divide'. Do not use 0 as it may cause a division error.\
        (default=0.001)
"""
def __init__(self,
max_features=50,
metric='roc_auc',
operation='multiply',
replace_zero=0.001):
self.max_features = max_features
self.metric = metric
self.operation = operation
self.replace_zero = replace_zero
def _arithmetic_operation(self, series1, series2, operation):
if operation == 'add':
return pd.DataFrame(series1 + series2)
elif operation == 'subtract':
return pd.DataFrame(series1 - series2)
elif operation == 'multiply':
return pd.DataFrame(series1 * series2)
elif operation == 'divide':
return pd.DataFrame(np.divide(series1, series2.replace(0, self.replace_zero)))
else:
raise Exception('Unknown arithmetic operation was specified : ' + operation)
def _check_missing(self, df):
if df.isna().sum().sum() != 0:
raise Exception('Please impute missing values before using this transformer.')
    def fit(self, X, y=None):
"""
Fit transformer by fitting each feature with Logistic\
Regression and storing features with eval metrics higher\
than the max of existing features.
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Input Series for target variable
:return: fitted object (self)
:rtype: object
"""
_check_X_y(X, y)
_check_binary(y)
self.x_features_ = X.select_dtypes('number').columns
self._check_missing(X[self.x_features_])
if len(self.x_features_) > self.max_features:
raise Exception('Number of numerical features is larger than max_features.')
cv = StratifiedKFold(n_splits=5)
lr = LogisticRegression(penalty='l2', solver='lbfgs')
        # First, compute the cross-validated evaluation metric for each
        # existing feature
roc_auc_existing = {}
for feature in self.x_features_:
X_lr = X[[feature]].fillna(X[[feature]].mean())
roc_auc = cross_val_score(lr, X_lr, y, cv=cv, scoring=self.metric).mean()
roc_auc_existing[feature] = roc_auc
        # Create features by the chosen arithmetic operation and keep a pair
        # only if its evaluation metric exceeds the better of its two parents
combinations = list(itertools.combinations(self.x_features_, 2))
self.new_pair_ = []
for pair in combinations:
X_lr = self._arithmetic_operation(X[pair[0]], X[pair[1]], self.operation)
roc_auc = cross_val_score(lr, X_lr, y, cv=cv, scoring=self.metric).mean()
max_auc_pair = max(roc_auc_existing[pair[0]],
roc_auc_existing[pair[1]])
if roc_auc > max_auc_pair:
self.new_pair_.append(pair)
return self
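    # Usage sketch (illustrative; the pair shown is hypothetical): with
    # operation='multiply', a surviving pair ('Age', 'Fare') means the
    # product Age * Fare scored a higher cross-validated metric than either
    # parent feature alone.
    #
    #     trans = AppendArithmeticFeatures(operation='multiply')
    #     trans.fit(X_numeric, y)
    #     trans.new_pair_  # e.g. [('Age', 'Fare')]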
class RankedEvaluationMetricEncoding(BaseEstimator, TransformerMixin):
    """
    Encode categorical columns by first creating dummy variables, then\
    fitting LogisticRegression against the target variable for each of\
    the dummy variables. An evaluation metric such as accuracy or roc_auc\
    is calculated and ranked, and categories are encoded with their rank.\
    It is strongly recommended to apply DropHighCardinality or\
    GroupRareCategory before this encoding, as this encoder fits\
    Logistic Regression for ALL categories with 5-fold cross-validation.
    :param string metric: Metric used to evaluate features. Sklearn's\
        default metrics can be used. (default='roc_auc')
"""
def __init__(self, metric='roc_auc'):
self.metric = metric
    def fit(self, X, y=None):
"""
Fit transformer by creating dummy variable and fitting \
LogisticRegression.
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Input Series for target variable
:return: fitted object (self)
:rtype: object
"""
_check_X_y(X, y)
_check_binary(y)
self.cat_columns_ = X.select_dtypes(exclude='number').columns
cv = StratifiedKFold(n_splits=5)
lr = LogisticRegression(penalty='l2', solver='lbfgs')
self.dic_corr_ = {}
for feature in self.cat_columns_:
X_lr = pd.get_dummies(X[[feature]].fillna('_Missing'))
df_map = pd.DataFrame([])
for col in X_lr.columns:
eval_metric = cross_val_score(lr, X_lr[[col]], y, cv=cv, scoring=self.metric).mean()
df_map = pd.concat([df_map, pd.DataFrame([col.replace(feature + '_', ''),
eval_metric]).T], axis=0)
df_map.columns = ['Category', 'Evaluation_Metric']
df_map = df_map.sort_values('Evaluation_Metric'
, ascending=False).reset_index(drop=True).reset_index()
df_map = df_map.rename(columns={'index':'Rank'})
df_map['Rank'] += 1
df_map = df_map.set_index('Category')
self.dic_corr_[feature] = df_map
return self
class AppendClassificationModel(BaseEstimator, TransformerMixin):
    """
    Append the prediction of a model as a new feature. The model must\
    implement fit and predict methods, and it should predict a single\
    label. If the model has a predict_proba method, the probability\
    option can be used to append the class probability instead of\
    class labels. predict_proba must return the probability of class 0\
    in the first column and class 1 in the second column.
    :param object model: Any model that is in line with sklearn's\
        classification models, meaning it implements fit and predict.\
        (default=None)
    :param bool probability: Whether to append the class probability\
        instead of class labels. If True, the model must have a\
        predict_proba method implemented. (default=False)
"""
def __init__(self, model=None, probability=False):
self.model = model
self.probability = probability
    def fit(self, X, y=None):
"""
Fit transformer by fitting model specified.
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Input Series for target variable
:return: fitted object (self)
:rtype: object
"""
_check_X_y(X, y)
_check_binary(y)
self.model_ = self.model
_check_method_implemented(self.model_, 'fit')
_check_method_implemented(self.model_, 'predict')
if self.probability:
_check_method_implemented(self.model_, 'predict_proba')
self.model_.fit(X, y)
self.fit_columns_ = X.columns
return self
class AppendEncoder(BaseEstimator, TransformerMixin):
    """
    Append the outputs of DataLiner encoders. Encoders in DataLiner\
    normally replace the categorical values in place, but by wrapping\
    them with this class, the encoded results are appended\
    as new features and the original categorical columns\
    remain. Regardless of whether the wrapped encoder requires the\
    target column, this class requires the target column.
    :param object encoder: DataLiner encoder. (default=None)
"""
def __init__(self, encoder=None):
self.encoder = encoder
    def fit(self, X, y=None):
"""
Fit transformer by fitting encoder specified
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Input Series for target variable
:return: fitted object (self)
:rtype: object
"""
_check_X(X)
if y is not None:
_check_X_y(X, y)
self.encoder_ = self.encoder
self.encoder_.fit(X, y)
self.fit_columns_ = X.columns
return self
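    # Usage sketch (illustrative): wrap a DataLiner encoder so its encoded
    # values are appended as a new feature instead of replacing the original
    # categorical column.
    #
    #     trans = AppendEncoder(encoder=TargetMeanEncoding())
    #     trans.fit(X, y)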
class AppendClusterTargetMean(BaseEstimator, TransformerMixin):
    """
    Append the cluster number obtained from k-means++ clustering,\
    with each cluster number replaced by the cluster's target mean.\
    For clustering, categorical variables need to be converted\
    to numerical data.
:param int n_clusters: Number of clusters (default=8)
:param int random_state: random_state for KMeans \
(default=1234)
"""
def __init__(self, n_clusters=8, random_state=1234):
self.n_clusters = n_clusters
self.random_state = random_state
def _sigmoid(self, count, k=0, f=1):
return 1 / (1 + np.exp(- (count - k) / f))
    def fit(self, X, y=None):
        """
        Fit KMeans Clustering and obtain the per-cluster target mean
        :param pandas.DataFrame X: Input dataframe
        :param pandas.Series y: Input Series for target variable
:return: fitted object (self)
:rtype: object
"""
        _check_X_y(X, y)
global_mean = y.mean()
self.global_mean_ = global_mean
self.model_ = KMeans(n_clusters=self.n_clusters,
random_state=self.random_state)
self.model_.fit(X)
self.fit_columns_ = X.columns
cluster = self.model_.predict(X)
df_cluster = pd.concat([pd.DataFrame(cluster),
pd.DataFrame(y)], axis=1)
df_cluster.columns = ['Cluster_Number', 'Target']
mean = df_cluster.groupby('Cluster_Number').mean().rename(
columns={'Target':'target_mean'})
count = df_cluster.groupby('Cluster_Number').count().rename(
columns={'Target':'count'})
df_map = pd.concat([mean, count], axis=1)
lambda_ = self._sigmoid(df_map['count'])
df_map['smoothed_target_mean'] = lambda_ * df_map[
'target_mean'] + (1 - lambda_) * global_mean
df_map.loc[df_map['count'] == 1,
'smoothed_target_mean'] = global_mean
self.cluster_target_mean_ = df_map
return self
class PermutationImportanceTest(BaseEstimator, TransformerMixin):
    """
    Conduct a permutation importance test on features and drop features\
    that are not effective. It first fits on the entire data, then\
    randomly shuffles each feature's values and evaluates the metric\
    for both cases. If the shuffled case shows no difference in the\
    evaluation, the feature is not effective for prediction.
    :param float threshold: Average difference in roc_auc between the\
        original and shuffled datasets. The higher the value, the more\
        features will be dropped. (default=0.0001)
"""
def __init__(self, threshold=0.0001):
self.threshold = threshold
    def fit(self, X, y=None):
        """
        Conduct the permutation importance test and store features to drop.
        :param pandas.DataFrame X: Input dataframe
        :param pandas.Series y: Input Series for target variable
:return: fitted object (self)
:rtype: object
"""
_check_X_y(X, y)
_check_binary(y)
process = make_pipeline(ImputeNaN(), TargetMeanEncoding())
cv = StratifiedKFold(n_splits=5)
clf = RandomForestClassifier(n_estimators=300, max_depth=5, random_state=1234)
Xt = process.fit_transform(X, y)
metrics = np.array([])
feature_metrics_dic = {}
for feature in X.columns:
feature_metrics_dic[feature] = np.array([])
for train_idx, valid_idx in cv.split(Xt, y):
X_train, X_valid, y_train, y_valid = \
Xt.iloc[train_idx], Xt.iloc[valid_idx], y.iloc[train_idx], y.iloc[valid_idx]
clf.fit(X_train, y_train)
y_pred = clf.predict_proba(X_valid)[:, 1]
metrics = np.append(metrics, roc_auc_score(y_valid, y_pred))
for feature in X.columns:
X_valid2 = X_valid.copy()
X_shuffled = X_valid2[feature].sample(frac=1, random_state=1234)
X_shuffled.index = X_valid2.index
X_valid2[feature] = X_shuffled.copy()
y_pred_shuffled = clf.predict_proba(X_valid2)[:, 1]
feature_metrics_dic[feature] = np.append(feature_metrics_dic[feature],
roc_auc_score(y_valid, y_pred_shuffled))
base_metric = metrics.mean()
for feature in X.columns:
feature_metrics_dic[feature] = base_metric - feature_metrics_dic[feature].mean()
self.drop_columns_ = []
for key, value in feature_metrics_dic.items():
if value <= self.threshold:
self.drop_columns_.append(key)
return self
class UnionAppend(BaseEstimator, TransformerMixin):
    """
    Concatenate features extracted from the original input data by the\
    AppendXXX transformers in the DataLiner package. Normally, applying\
    AppendXXX transformers in a pipeline processes the data in series,\
    so a feature appended earlier would be used by the next AppendXXX.\
    By wrapping a list of AppendXXX transformers with this class, they\
    are processed in parallel, so each AppendXXX uses only the original\
    input features.
    :param list append_list: List of AppendXXX transformers from the\
        DataLiner package. (default=None)
"""
def __init__(self, append_list=None):
self.append_list = append_list
    def fit(self, X, y=None):
"""
Fit transformer by verifying AppendXXX specified.
:param pandas.DataFrame X: Input dataframe
:param pandas.Series y: Ignored. (default=None)
:return: fitted object (self)
:rtype: object
"""
_check_X(X)
self.append_list_ = self.append_list
if not self.append_list_:
raise Exception("Please specify list of AppendXXX from DataLiner class.")
for trans in self.append_list_:
trans.fit(X, y)
return self