"""
This is the package dataset.
"""
import math
import warnings
from copy import copy
from math import log2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import statsmodels.api as sm
from scipy.cluster import hierarchy
from scipy.special import boxcox1p
from scipy.stats import skew, boxcox_normmax
from sklearn.model_selection import train_test_split
from sklearn.neighbors import LocalOutlierFactor
# noinspection PyUnresolvedReferences
from sklearn.preprocessing import MinMaxScaler, StandardScaler, PowerTransformer
from sklearn.preprocessing import scale
from sklearn_pandas import DataFrameMapper
from skrebate import ReliefF
from dataset.correlations import cramers_v
from dataset.split import Split
warnings.simplefilter(action='ignore')
#
# Correlation ideas taken from:
# https://towardsdatascience.com/the-search-for-categorical-correlation-a1cf7f1888c9
#
[docs]class Dataset:
"""
This class allows a simpler representation of the dataset used
to build a model in class. It allows to load a remote CSV by
providing an URL to the initialization method of the object, and
work on the most common tasks related to data preparation and
feature engineering.::
my_data = Dataset(URL)
my_data = Dataset.from_dataframe(my_dataframe)
"""
all = None
meta = None
# data = None
target = None
features = None
numerical = None
categorical = None
meta_tags = ['all', 'numerical', 'categorical', 'complete',
'numerical_na', 'categorical_na', 'features', 'target']
categorical_dtypes = ['bool', 'object', 'string', 'category']
__num_plots_per_row = 4
def __init__(self, data_location=None, data_frame=None, *args, **kwargs):
"""
Wrapper over the method read_csv from pandas, so you can user variadic
arguments, as if you were using the actual read_csv
:param data_location: path or url to the file
:param data_frame: in case this method is called from the class method
this parameter is passing the actual dataframe to read data from
:param args: variadic unnamed arguments to pass to read_csv
:param kwargs: variadic named arguments to pass to read_csv
"""
if data_location is not None:
self.features = pd.read_csv(data_location, *args, **kwargs)
else:
if data_frame is not None:
self.features = copy(data_frame)
else:
raise RuntimeError(
"No data location, nor DataFrame passed to constructor")
# When data is read with no headers, column names can be 'int', so
# I need to convert them to strings.
if isinstance(list(self.features)[0], str) is False:
colnames = ['x{}'.format(col) for col in list(self.features)]
self.features.columns = colnames
self.to_float()
self.__update()
[docs] @classmethod
def from_dataframe(cls, df):
return cls(data_location=None, data_frame=df)
[docs] def set_target(self, target_name):
"""
Set the target variable for this dataset. This will create a new
property of the object called 'target' that will contain the
target column of the dataset, and that column will be removed
from the list of features.
:param target_name: The name of the column we want to be set as the
target variable for this dataset.
Example::
my_data.set_target('SalePrice')
"""
assert target_name in list(self.features), "Target name NOT recognized"
self.target = self.features.loc[:, target_name].copy()
self.features.drop(target_name, axis=1, inplace=True)
self.__update()
return self
[docs] def unset_target(self):
"""
Undo the `set_target()` operation. The feature `target_name` returns
to the DataFrame with the rest of the features.
Example::
my_data.unset_target()
"""
assert self.target is not None, "Target feature NOT set, yet..."
self.features[self.target.name] = self.target.values
self.target = None
self.__update()
return self
def __update(self):
"""
Builds meta-information about the dataset, considering the
features that are categorical, numerical or does/doesn't contain NA's.
"""
meta = dict()
# Update META-information
if self.target is not None:
meta['all'] = list(self.features) + [self.target.name]
self.all = pd.concat([self.features, self.target], axis=1)
else:
meta['all'] = list(self.features)
self.all = self.features
# Build the subsets per data ype (list of names)
descr = pd.DataFrame({'dtype': self.features.dtypes,
'NAs': self.features.isna().sum()})
meta['description'] = descr
numerical = self.features.select_dtypes(include=['number'])
numerical_features = list(numerical)
categorical = self.features.select_dtypes(exclude=['number'])
categorical_features = list(categorical)
numerical_features_na = numerical.columns[
numerical.isna().any()].tolist()
categorical_features_na = categorical.columns[
categorical.isna().any()].tolist()
complete_features = self.all.columns[
~self.all.isna().any()].tolist()
meta['features'] = list(self.features)
meta['target'] = self.target.name if self.target is not None else None
meta['categorical'] = categorical_features
meta['categorical_na'] = categorical_features_na
meta['numerical'] = numerical_features
meta['numerical_na'] = numerical_features_na
meta['complete'] = complete_features
self.meta = meta
# Update macro access properties
self.numerical = self.select('numerical')
self.categorical = self.select('categorical')
# self.data = self.features
return self
[docs] def outliers(self, n_neighbors=20):
"""
Find outliers, using LOF criteria, from the numerical features.
Returns a list of indices where outliers are present
:param n_neighbors: Number of neighbors to use by default for
kneighbors queries. If n_neighbors is larger than the number
of samples provided, all samples will be used.
# TODO Implement a simple set of methods to select from in order to
detect outliers.
"""
X = self.select('numerical')
lof = LocalOutlierFactor(n_neighbors=n_neighbors, contamination='auto')
y_pred = lof.fit_predict(X)
outliers = np.where(y_pred == -1)
return outliers[0]
[docs] def scale(self,
features_of_type='numerical',
method='StandardScaler',
return_series=False):
"""
Scales numerical features in the dataset, unless the parameter 'what'
specifies any other subset selection primitive. The method to be used
is the sckikit learn StandardScaler.
Examples::
# scale all my numerical features
my_data.scale()
:param features_of_type: Subset selection primitive
:param method: 'StandardScaler', 'MinMaxScaler'
:return: the subset scaled.
"""
assert features_of_type in self.meta_tags, \
"No features of the type specified"
assert method == 'StandardScaler' or method == 'MinMaxScaler', \
"Method can only be \'standard\' or \'minmax\'"
subset = self.select(features_of_type)
scaler = globals()[method]
mapper = DataFrameMapper([(subset.columns, scaler())])
scaled_features = mapper.fit_transform(subset.copy())
self.features[self.names(features_of_type)] = pd.DataFrame(
scaled_features,
index=subset.index,
columns=subset.columns)
self.__update()
if return_series is True:
return self.features[self.names(features_of_type)]
else:
return self
[docs] def fix_skewness(self, feature_names=None, return_series=False):
"""
Ensures that the numerical features in the dataset,
fit into a normal distribution by applying the Yeo-Johnson transform.
If not already scaled, they're scaled as part of the process.
:param feature_names: Features to be fixed. If not specified, all
numerical features are examined.
:param return_series: Return the normalized series
:return: The subset fitted to normal distribution,
or None
"""
if feature_names is None:
if len(self.numerical_features) == 0:
raise ValueError('No numerical features to fix.')
feature_names = self.numerical_features
elif not isinstance(feature_names, list):
feature_names = [feature_names]
yj = PowerTransformer(method='yeo-johnson')
normed_features = yj.fit_transform(self.features[feature_names])
self.features[feature_names] = normed_features
self.__update()
if return_series is True:
return normed_features
[docs] def skewed_features(self, threshold=0.75, fix=False, return_series=True):
"""
Returns the list of numerical features that present skewness. This
method optionally can fix detected skewness whose ABS is greater
than the threshold passed, using BoxCox method.
:param threshold: The limit over which considering that the
``skew()`` return value is considered a skewed feature.
:param fix: (Default: False) Boolean indicating whether or not
fixing the skewed features. If True, those with values above the
threshold will be fixed using BoxCox.
:param return_series: (Default: True) Boolean indicating whether
returning the features (pandas DataFrame) that present skewness.
:return: A pandas Series with the features and their skewness
"""
df = self.numerical
feature_skew = df.apply(
lambda x: skew(x)).sort_values(ascending=False)
if fix is True:
high_skew = feature_skew[np.abs(feature_skew) > threshold]
skew_index = high_skew.index
for feature in skew_index:
self.features[feature] = boxcox1p(
df[feature], boxcox_normmax(df[feature] + 1))
if return_series is True:
return feature_skew
@staticmethod
def __top_correlations(df, correlations, threshold):
def redundant_pairs(dataf):
"""
Get diagonal and lower triangular pairs of correlation matrix
"""
pairs_to_drop = set()
cols = dataf.columns
for i in range(0, dataf.shape[1]):
for j in range(0, i + 1):
pairs_to_drop.add((cols[i], cols[j]))
return pairs_to_drop
labels_to_drop = redundant_pairs(df)
correlations = correlations.drop(labels=labels_to_drop).sort_values(
ascending=False)
tuples = [(correlations.index[i][0], correlations.index[i][1],
correlations[i]) for i in range(correlations.shape[0]) \
if correlations[i] > threshold]
return tuples
[docs] def under_represented_features(self, threshold=0.98):
"""
Returns the list of categorical features with unrepresented categories
or a clear unbalance between the values that can take.
:param threshold: The upper limit of the most represented category
of the feature.
:return: the list of features that with unrepresented categories.
"""
under_rep = []
for column in self.meta['categorical']:
counts = self.features[column].value_counts()
majority_freq = counts.iloc[0]
if (majority_freq / len(self.features)) > threshold:
under_rep.append(column)
return under_rep
def _IG(self, vble_name):
"""
Computes the Information Gain between a variable –whose name is passed,
and the target variable.
Args:
vble_name: A string with the name of the categorical vble.
Returns:
The IG
"""
assert self.target is not None, "Target must be set before calling IG"
assert vble_name in self.categorical_features, \
"Variable must be categorical to compute IG"
def entropy(distribution):
def H(p):
return 0. if p == 0. else p * log2(p)
return -sum([H(distribution[i]) for i in range(len(distribution))])
target_num_unique_values = self.target.nunique()
target_unique_values = self.target.unique()
target_value_counts = self.target.value_counts()
target_num_samples = sum([target_value_counts[target_unique_values[i]]
for i in range(target_num_unique_values)])
target_distribution = [
target_value_counts[target_unique_values[i]] / target_num_samples \
for i in range(target_num_unique_values)]
target_entropy = entropy(target_distribution)
vble = self.features[vble_name]
vble_num_unique_values = vble.nunique()
vble_unique_values = vble.unique()
vble_value_counts = vble.value_counts()
vble_num_samples = sum(
[vble_value_counts[vble_unique_values[i]] for i in
range(vble_num_unique_values)])
vble_distribution = [
vble_value_counts[vble_unique_values[i]] / vble_num_samples \
for i in range(vble_num_unique_values)]
vble_distribution = dict(zip(vble_unique_values, vble_distribution))
xt = pd.crosstab(vble, self.target)
cond_entropy = []
for i in range(xt.shape[0]):
row_distribution = [xt.iloc[i, j] / xt.iloc[i].sum() for j in
range(xt.shape[1])]
cond_entropy.append(entropy(row_distribution))
vble_entropy = dict(zip(xt.index, cond_entropy))
vble_cond_entropy = sum([vble_distribution[v] * vble_entropy[v] for v in
vble_unique_values])
IG = target_entropy - vble_cond_entropy
return IG
[docs] def stepwise_selection(self,
initial_list=None,
threshold_in=0.01,
threshold_out=0.05,
verbose=False):
"""
Perform a forward/backward feature selection based on p-value from
statsmodels.api.OLS
Your features must be all numerical, so be sure to onehot_encode them
before calling this method.
Always set threshold_in < threshold_out to avoid infinite looping.
All features involved must be numerical and types must be float.
Target variable must also be float. You can convert it back to a
categorical type after calling this method.
:parameter initial_list: list of features to start with (column names
of X)
:parameter threshold_in: include a feature if its
p-value < threshold_in
:parameter threshold_out: exclude a feature if its
p-value > threshold_out
:parameter verbose: whether to print the sequence of inclusions and
exclusions
:return: List of selected features
Example::
my_data.stepwise_selection()
See <https://en.wikipedia.org/wiki/Stepwise_regression>
for the details
Taken from: <https://datascience.stackexchange.com/a/24823>
"""
if initial_list is None:
initial_list = []
if len(self.names('categorical')) != 0:
print('Considering only numerical features')
# assert self.target.dtype.name == 'float64'
included = list(initial_list)
while True:
changed = False
# forward step
excluded = list(set(self.numerical.columns) - set(included))
new_pval = pd.Series(index=excluded)
for new_column in excluded:
model = sm.OLS(self.target, sm.add_constant(
pd.DataFrame(
self.numerical[included + [new_column]]))).fit()
new_pval[new_column] = model.pvalues[new_column]
best_pval = new_pval.min()
if best_pval < threshold_in:
best_feature = new_pval.idxmin()
included.append(best_feature)
changed = True
if verbose:
print('Add {:30} with p-value {:.6}'.format(best_feature,
best_pval))
# backward step
model = sm.OLS(self.target, sm.add_constant(
pd.DataFrame(self.numerical[included]))).fit()
# use all coefs except intercept
pvalues = model.pvalues.iloc[1:]
worst_pval = pvalues.max() # null if p-values is empty
if worst_pval > threshold_out:
changed = True
worst_feature = pvalues.argmax()
included.remove(worst_feature)
if verbose:
print('Drop {:30} with p-value {:.6}'.format(worst_feature,
worst_pval))
if not changed:
break
return included
[docs] def features_importance(self,
num_features=None,
num_neighbors=None,
abs_imp=False):
"""
Computes NUMERICAL features importance, using the ReliefF algorithm as
implemented in the `rebate` library.
Args:
num_features: The nr of features we want to display
num_neighbors: The nr of neighbors to consider when computing the
features importance
abs_imp: if True, importance is displayed taking the ABS()
Returns:
A sorted dictionary with the feature names and their importance.
"""
if num_features is None:
num_features = len(self.numerical_features)
if num_neighbors is None:
num_neighbors = 20
assert num_features <= len(self.numerical_features), \
"Larger nr of features ({}) than available ({})".format(
num_features, len(self.numerical_features))
assert self.target is not None, \
"Target feature must be specified before computing importance"
assert num_neighbors <= self.features.shape[0], \
"Larger nr of neighbours than samples ({})".format(
self.features.shape[0])
my_features = self.numerical.values # the array inside the dataframe
my_labels = self.target.values.ravel() # the target as a 1D array.
fs = ReliefF(n_features_to_select=num_features,
n_neighbors=num_neighbors)
fs.fit_transform(my_features, my_labels)
if abs_imp is True:
importances = abs(fs.feature_importances_[:num_features])
else:
importances = fs.feature_importances_[:num_features]
indices = np.argsort(importances)[:num_features]
return dict([(self.numerical_features[i], importances[i]) for
i in indices])
#
# Methods are related to data manipulation of the pandas dataframe.
#
[docs] def select(self, what):
"""
Returns a subset of the columns of the dataset.
`what` specifies what subset of features to return
If it is a list, it returns those feature names in the list,
And if it is a keywork from: 'all', 'categorical', 'categorical_na',
'numerical', 'numerical_na', 'complete', 'features', 'target',
then the list of features is extracted from the metainformation
of the dataset.
:param what: Possible values are
* all: (Default) Include very feature, including the target
* numerical: Only numerical features
* categorical: Only categorical features
* complete: Only features without NA
* numerical_na: Numerical features with NA
* categorical_na: Categorical features with NA
* features: Only features, NOT the target variable.
* target: Only the target variable.
:return: Reference to the columns specified.
"""
if isinstance(what, list):
# return self.features.loc[:, what]
return self.features[what]
else:
assert what in self.meta_tags
if what == 'all':
return self.all[self.meta[what]]
else:
return self.features[self.meta[what]]
[docs] def samples_matching(self, value=None, feature=None):
"""
Return the a list with the indexes of those samples matching a given
criteria. The match can be set on target variable, or any other
column name.
Args:
value:
feature:
Returns:
A list with the index values of those samples matching.
Examples::
my_data.samples_matching('red')
returns the indices of those samples whose `target` matches the
value `red`.
my_data.samples_matching(75, 'column_3')
returns the indices of those samples whose feature `column_3`
values 75.
"""
if feature is None:
sample_indices = self.all.index[
self.all[self.target.name] == value].to_list()
else:
assert feature in self.names(), \
"Feature ({}) is not present in dataset".format(feature)
assert feature is not None and value is not None, \
"A feature name and a value must be provided"
sample_indices = self.all.index[
self.all[feature] == value].to_list()
return sample_indices
[docs] def names(self, what='all'):
"""
Returns a the names of the columns of the dataset for which the arg
`what` is specified.
If it is a list, it returns those feature names in the list,
And if it is a keywork from: 'all', 'categorical', 'categorical_na',
'numerical', 'numerical_na', 'complete', then the list of
features is extracted from the metainformation of the dataset.
:param what: Possible values are
* all: (Default) Include very feature, including the target
* numerical: Only numerical features
* categorical: Only categorical features
* complete: Only features without NA
* numerical_na: Numerical features with NA
* categorical_na: Categorical features with NA
* features: Only features, NOT the target variable.
* target: Only the target variable.
"""
assert what in self.meta_tags
return self.meta[what]
[docs] def discretize(self, column, bins, category_names=None):
"""
Makes a feature, which is normally numerical, categorical by binning its
contents into the specified buckets.
Args:
column: The name of the feature to be binned
bins: the list of bins as an array of values of the form
[(15, 20), (20, 25), (25, 30), (30, 35), (35, 40)]
category_names: An array with names or values we want for our new
categories. If None a simple array with ordinal
number of the category is used. In the
example above, it should be an array from 1 .. 5.
Returns: The dataset modified
Example::
# Variable "x3" contains the number of sons of a person as an
# integer ranging between values 0 and 10. We want to convert
# that numerical value into a categorical one with a list
# of (say) 4 possible values, for the number of sons within
# given ranges:
my_data.discretize('x3',
[(0, 2), (2, 4), (4, 6), (6, 8)], [1, 2, 3, 4])
"""
assert column in self.numerical_features, \
'Feature {} is not numerical, in order to be discretized'.format(
column)
bins_tuples = pd.IntervalIndex.from_tuples(bins)
x = pd.cut(self.features[column].to_list(), bins_tuples)
if category_names is None:
x.categories = [i + 1 for i in range(len(bins))]
else:
assert len(category_names) == len(bins), \
"Num of categories passed does not matched number of bins."
x.categories = category_names
self.features[column] = x
self.to_categorical(column)
self.__update()
return self
[docs] def onehot_encode(self, feature_names=None):
"""
Encodes the categorical features in the dataset, with OneHotEncode
:parameter feature_names: column or list of columns to be one-hot
encoded.
The only restriction is that the target variable cannot be
specifiedin the list of columns and therefore, cannot be
onehot encoded.
Default = all categorical features in dataset.
:return: self
Example::
# Encodes a single column named 'my_column_name'
my_data.onehot_encode('my_column_name')
# Encodes 'col1' and 'col2'
my_data.onehot_encode(['col1', 'col2'])
# Encodes all categorical features in the dataset
my_data.onehot_encode(my_data.names('categorical'))
or::
my_data.onehot_encode()
"""
if feature_names is None:
to_encode = list(self.categorical)
else:
if isinstance(feature_names, list) is not True:
to_encode = [feature_names]
else:
to_encode = feature_names
new_df = self.features[
self.features.columns.difference(to_encode)].copy()
for column_to_convert in to_encode:
new_df = pd.concat(
[new_df,
pd.get_dummies(
self.features[column_to_convert],
prefix=column_to_convert,
dtype=float)
],
axis=1)
self.features = new_df.copy()
self.__update()
return self
[docs] def add_columns(self, new_features):
"""
Add a Series as a new column to the dataset.
:param new_features: A pandas Series object or a DataFrame with the
data to be added to the Dataset. It must contain
a valid name not present in the Dataset already.
Examples::
my_data.add_column(my_series)
my_data.add_column(pandas.Series().values)
my_data.add_column(my_dataframe)
"""
if isinstance(new_features, pd.Series):
if new_features.name is not None:
if new_features.name in self.names('features'):
raise ValueError(
'There is already a feature called {}'.format(
new_features.name))
self.features[new_features.name] = new_features.values
else:
self.features[
'xf{}'.format(self.num_features + 1)] = new_features.values
elif isinstance(new_features, pd.DataFrame):
self.features = pd.concat([self.features, new_features], axis=1)
else:
raise ValueError(
'Only pandas Series or DataFrames can be passed to this method')
self.__update()
return self
[docs] def drop_columns(self, columns_list):
"""
Drop one or a list of columns from the dataset.
:param columns_list: An array-type expression with the names of the
columns to be removed from the Dataset. In case a single string
is passed, it will be considered the name of a sinle columns to
be dropped.
Examples::
my_data.drop_columns('column_name')
my_data.drop_columns(['column1', 'column2', 'column3'])
"""
if isinstance(columns_list, list) is not True:
columns_list = [columns_list]
for column in columns_list:
if column in self.names('features'):
self.features.drop(column, axis=1, inplace=True)
self.__update()
return self
[docs] def keep_columns(self, to_keep):
"""
Keep only one or a list of columns from the dataset.
:param to_keep: A string or array-like expression indicating the
columns to be kept in the Dataset. The columns not in the list
of names passed are dropped.
Example::
my_data.keep_columns('column_name')
my_data.keep_columns(['column1', 'column2', 'column3'])
"""
if isinstance(to_keep, list) is not True:
to_keep = [to_keep]
to_drop = list(set(list(self.features)) - set(to_keep))
self.drop_columns(to_drop)
return self
[docs] def aggregate(self,
col_list,
new_column,
operation='sum',
drop_columns=True):
"""
Perform an arithmetic operation on the given columns, and places the
result on a new column, removing the original ones.
:param col_list: the list of columns over which the operation is done
:param new_column: the name of the new column to be generated from the
operation
:param drop_columns: whether remove the columns used to perfrom the
aggregation
:param operation: the operation to be done over the column values for
each row. Examples: 'sum', 'diff', 'max', etc. By default, the
operation is the sum of the values.
:return: The Dataset object
Example:
If we want to sum the values of column1 and column2 into a
new column called 'column3', we use::
my_data.aggregate(['column1', 'column2'], 'column3', 'sum')
As a result, ``my_data`` will remove ``column1`` and ``column2``,
and the operation will be the sum of the values, as it is the default
operation.
"""
assert operation in dir(type(self.features))
for col_name in col_list:
assert col_name in list(self.features)
self.features[new_column] = getattr(
self.features[col_list],
operation)(axis=1)
if drop_columns is True:
self.drop_columns(col_list)
else:
self.__update()
return self
[docs] def drop_samples(self, index_list):
"""
Remove the list of samples from the dataset.
:param index_list: The list of indices in the DataFrame to be removed
from the features and the target DataFrames.
:return: self
"""
self.features = self.features.drop(self.features.index[index_list])
if self.target is not None:
self.target = self.target.drop(self.target.index[index_list])
self.features.reset_index(inplace=True, drop=True)
self.target.reset_index(inplace=True, drop=True)
self.__update()
return self
[docs] def nas(self):
"""
Returns the list of features that present NA entries
:return: the list of feature names presenting NA
"""
return self.names('numerical_na') + self.names('categorical_na')
[docs] def replace_na(self, column, value):
"""
Replace any NA occurrence from the column or list of columns passed
by the value passed as second argument.
:param column: Column name or list of column names from which to
replace NAs with the value passes in the second argument
:param value: value to be used as replacement
:return: the object.
"""
if isinstance(column, list) is True:
for col in column:
self.features[col].fillna(value, inplace=True)
else:
self.features[column].fillna(value, inplace=True)
self.__update()
return self
[docs] def drop_na(self):
"""
Drop samples with NAs from the features. If any value is infinite
or -infinite, it is converted to NA, and removed also.
Examples::
my_data.drop_na()
:return: object
"""
self.features.dropna(inplace=True)
if self.target is not None:
self.target = self.target[
self.target.index.isin(self.features.index)]
self.target = self.target.reset_index(drop=True)
self.features = self.features.reset_index(drop=True)
self.__update()
return self
[docs] def split(self,
seed=1024,
test_size=0.2,
validation_split=False):
"""
From an Dataset, produce splits (with or without validation) for
training and test. The objects of type ``Split`` will only contain
properties with the names ``train`` or ``test`` to reference the
different splits.
:param seed: The seed to be used to generate the random split.
:param test_size: The test size as a percentage of the base dataset.
:param validation_split: Boolean indicating whether it is also needed
to generate a third split for validation purposes, same size
as the test_size.
:return: The X and y objects that contain the splits.
Example::
# Generate the splits (80-20)
X, y = my_data.split()
# Create an instance of the model, and use the training set to
# fit it, and the test set to score it.
model = LinearRegression()
model.fit(X.train, y.train)
model.score(X.test, y.test)
"""
assert self.target is not None, \
"The target variable must be specified before calling this method"
x = pd.DataFrame(self.features, columns=self.names('features'))
y = pd.DataFrame(self.target)
x_train, x_test, y_train, y_test = train_test_split(
x, y,
test_size=test_size, random_state=seed)
if validation_split is True:
x_train, x_val, y_train, y_val = train_test_split(
x_train, y_train,
test_size=test_size, random_state=seed)
x_splits = [x_train, x_test, x_val]
y_splits = [y_train, y_test, y_val]
else:
x_splits = [x_train, x_test]
y_splits = [y_train, y_test]
return Split(x_splits), Split(y_splits)
[docs] def to_numerical(self, to_convert):
"""
Convert the specified column or columns to numbers
:param to_convert: column name or list of column names to be converted
:return: object
TODO: It must be possible to perform label encoding if specified.
For example, I might want to convert a target variable with
strings valued "Yes" and "No" to type "category" or to type
"int" with values 1 and 0.
"""
if isinstance(to_convert, list) is not True:
to_convert = [to_convert]
for column_name in to_convert:
if column_name in list(self.features.columns):
self.features[column_name] = pd.to_numeric(
self.features[column_name])
else:
self.target = pd.to_numeric(self.target)
self.__update()
return self
[docs] def to_float(self, to_convert=None):
"""
Convert a column or list of columns to float values. The columns must
be numerical.
Args:
to_convert: the column name or list of column names that we want
to convert. If this argument is empty, then every
numerical feature in the dataset is converted.
Returns: The dataset
Example::
my_data.to_float(my_data.numerical_features)
# which is equivalent to::
my_data.to_float()
# We can also specify a single or multiple features::
my_data.to_float('feature_15')
my_data.to_float(['feature_15', 'feature_21'])
"""
to_convert = self.__assert_list_of_numericals(to_convert)
for column_name in to_convert:
self.features[column_name] = pd.to_numeric(
self.features[column_name]).astype(float)
return self.__update()
[docs] def to_int(self, to_convert=None):
"""
Convert a column or list of columns to integer values.
The columns must be numerical
Args:
to_convert: the column name or list of column names that we want
to convert. If none specified, all numerical columns
are converted to int type.
Returns: The dataset
Example::
my_data.to_int(my_data.numerical_features)
# which is equivalent to::
my_data.to_int()
# We can also specify a single or multiple features::
my_data.to_int('feature_15')
my_data.to_int(['feature_15', 'feature_21'])
"""
to_convert = self.__assert_list_of_numericals(to_convert)
# Bulk conversion..
self.features[to_convert] = self.features[to_convert].astype(int)
return self.__update()
[docs] def to_categorical(self, to_convert):
"""
Convert the specified column or columns to categories
:param to_convert: column or column list to be converted
:return: object
"""
if isinstance(to_convert, list) is not True:
to_convert = [to_convert]
for column_name in to_convert:
if column_name in list(self.features):
self.features[column_name] = self.features[column_name].apply(
str)
else:
self.target = self.target.apply(str)
self.__update()
return self
[docs] def merge_categories(self, column, old_values, new_value):
"""
Merge a subset of categories present in one of the columns into a
new single category. This is normally done when this list of categs
is not enough representative.
:param column: The column with the categories to be merged
:param old_values: The list of categories to be merged
:param new_value: The resulting new category after the merge.
:return: self.
Example::
my_data.merge_categories(column='color',
old_values=['grey', 'black'],
new_value='dark')
"""
assert column in self.categorical, "Column must be categorical"
assert isinstance(old_values, list), \
"Old values must be a list of values to be merged"
assert len(old_values) > 1, \
"List of values must contains more than 1 value"
assert new_value is not None, "New value cannot be None"
self.features[column] = self.features[column].apply(
lambda x: new_value if x in old_values else x).astype('object')
self.__update()
return self
[docs] def merge_values(self, column, old_values, new_value):
"""
Same method as 'merge_categories' but for numerical values.
Merge a subset of values present in one of the columns into a
new single category. This is normally done when this list of values
is not enough representative.
:param column: The column with the values to be merged
:param old_values: The list of values to be merged
:param new_value: The resulting new value after the merge.
:return: self.
Example::
my_data.merge_values(column='years',
old_values=['2001', '2002'],
new_value='2000')
"""
assert column in self.numerical, "Column must be numerical"
assert isinstance(old_values, list), \
"Old values must be a list of values to be merged"
assert len(old_values) > 1, \
"List of values must contains more than 1 value"
assert new_value is not None, "New value cannot be None"
self.features[column] = self.features[column].apply(
lambda x: new_value if x in old_values else x).astype('float64')
self.__update()
return self
#
# Description methods, printing out summaries for dataset or features.
#
[docs] def describe_dataset(self):
"""
Printout the metadata information collected when calling the
metainfo() method.
:return: nothing
"""
if self.meta is None:
self.__update()
print('{} Features. {} Samples'.format(
len(self.meta['features']), self.features.shape[0]))
print('Available types:', self.meta['description']['dtype'].unique())
print(' · {} categorical features'.format(
len(self.meta['categorical'])))
print(' · {} numerical features'.format(
len(self.meta['numerical'])))
print(' · {} categorical features with NAs'.format(
len(self.meta['categorical_na'])))
print(' · {} numerical features with NAs'.format(
len(self.meta['numerical_na'])))
print(' · {} Complete features'.format(
len(self.meta['complete'])))
print('--')
if self.target is not None:
print('Target: {} ({})'.format(
self.meta['target'], self.target.dtype.name))
if self.target.dtype.name == 'object':
self.__describe_categorical(self.target)
else:
self.__describe_numerical(self.target)
else:
print('Target: Not set')
return
[docs] def describe(self, feature_name=None, inline=False):
"""
Wrapper.
Calls the proper feature description method, depending on whether the
feature is numerical or categorical. If no arguments are passed, the
description of the entire dataset is provided.
:param feature_name: The feature to be described. Default value is
None, which implies that **all** features are described.
:param inline: whether the output is multiple lines or inline. This
is used when describing from ``summary()`` function or from
a console or cell.
:return: The string, only when inline=True, that contains the
description.
TODO: Implement a limit of characters for each line that is printed
out in the screen, so that when reaching that limit '...' is
printed.
"""
if feature_name is None:
return self.describe_dataset()
# It could happen that target has not yet been defined.
target_name = None if self.target is None else self.target.name
# If feature specified, ensure that it is contained somewhere
assert feature_name in (list(self.features) + [target_name])
if feature_name == target_name:
feature = self.target
else:
feature = self.features[feature_name]
if feature.dtype.name in self.categorical_dtypes:
return self.__describe_categorical(feature, inline)
else:
return self.__describe_numerical(feature, inline)
[docs] def summary(self, what='all'):
"""
Printout a summary of each feature.
:param what: Possible values are
* all: (Default) Include very feature, including the target
* numerical: Only numerical features
* categorical: Only categorical features
* complete: Only features without NA
* numerical_na: Numerical features with NA
* categorical_na: Categorical features with NA
* features: Only features, NOT the target variable.
* target: Only the target variable.
:return: N/A
"""
assert what in self.meta_tags
max_width = 25
max_len_in_list = np.max([len(s) for s in list(self.select(what))]) + 2
if max_len_in_list > max_width:
max_width = max_len_in_list
else:
max_width = max_len_in_list
formatting = '{{:<{}s}}: {{:<10s}} {{}}'.format(max_width)
print('Features Summary ({}):'.format(what))
for feature_name in list(self.select(what)):
feature_formatted = '\'' + feature_name + '\''
print(formatting.format(
feature_formatted, self.select(what)[feature_name].dtype.name,
self.describe(feature_name, inline=True)))
return
[docs] def table(self, what='all', max_width=80):
"""
Print a tabulated version of the list of elements in a list, using
a max_width display (default 80).
:param what: Possible values are
* all: (Default) Include very feature, including the target
* numerical: Only numerical features
* categorical: Only categorical features
* complete: Only features without NA
* numerical_na: Numerical features with NA
* categorical_na: Categorical features with NA
* features: Only features, NOT the target variable.
* target: Only the target variable.
:param max_width: The max_width used in the display.
:return: None
"""
assert what in self.meta_tags
f_list = self.names(what)
if len(f_list) == 0:
return
num_features = len(f_list)
max_length = max([len(feature) for feature in f_list])
max_fields = int(np.floor(max_width / (max_length + 1)))
col_width = max_length + 1
print('-' * ((max_fields * max_length) + (max_fields - 1)))
for field_idx in range(int(np.ceil(num_features / max_fields))):
from_idx = field_idx * max_fields
to_idx = (field_idx * max_fields) + max_fields
if to_idx > num_features:
to_idx = num_features
format_str = ''
for i in range(to_idx - from_idx):
format_str += '{{:<{:d}}}'.format(col_width)
print(format_str.format(*f_list[from_idx:to_idx]))
print('-' * ((max_fields * max_length) + (max_fields - 1)))
return
#
# Properties
#
@property
def feature_names(self):
return list(self.features.columns)
@property
def numerical_features(self):
return self.names('numerical')
@property
def numerical_features_na(self):
return self.names('numerical_na')
@property
def categorical_features(self):
return self.names('categorical')
@property
def categorical_features_na(self):
return self.names('categorical_na')
@property
def incomplete_features(self):
return self.categorical_features_na + self.numerical_features_na
@property
def num_features(self):
return self.features.shape[1]
@property
def num_samples(self):
return self.features.shape[0]
#
# Plot functions
#
[docs] @staticmethod
def plot_correlation_matrix(corr_matrix):
plt.subplots(figsize=(11, 9))
# Generate a mask for the upper triangle
mask = np.zeros_like(corr_matrix, dtype=np.bool)
mask[np.triu_indices_from(mask)] = True
cmap = sns.diverging_palette(220, 10, as_cmap=True)
sns.heatmap(corr_matrix, mask=mask, cmap=cmap, vmax=0.75, center=0,
square=True, linewidths=.5, cbar_kws={"shrink": .5})
plt.show()
return
[docs] def plot_density(self, feature_names=None, category=None):
"""
Double density plot(s) between feature(s) and a reference category.
:param feature_names: The name of a feature(s) in the dataset.
:param category: The name of the reference category we want to
represent the double density plot against. If None, then the
target variable is used.
:return: None
Example::
# represent multiple density plots, one per unique value of the
# target
my_data.plot_density(my_feature)
# represent double density plots, one per unique value of the
# categorical feature 'my_feature2'
my_data.plot_density(my_feature1, my_feature2)
# Plot double density plots for all numerical features.
my_data.plot_density(my_data.numerical_features)
# or
my_data.plot_density()
"""
if feature_names is None:
feature_names = self.numerical_features
if isinstance(feature_names, list):
num_plots = int(len(feature_names))
rows = int(num_plots / self.__num_plots_per_row)
if num_plots % self.__num_plots_per_row != 0:
rows += 1
if num_plots >= self.__num_plots_per_row:
cols = self.__num_plots_per_row
else:
cols = num_plots
plots_left = num_plots
for j in range(rows):
plt.figure(figsize=(14, 3))
for i in range(min(self.__num_plots_per_row, plots_left)):
plt.subplot(1, cols, i + 1)
self.__plot_double_density(
feature_names[i + (j * self.__num_plots_per_row)])
plots_left -= 1
plt.show()
else:
self.__plot_double_density(feature_names, category)
[docs] def plot_histogram(self, feature_names=None, category=None):
"""
Double histogram plot between a feature and a reference category.
:param feature_names: The name(s) of the feature(s) in the dataset.
:param category: The name of the reference category we want to
represent the double density plot against. If None, then the
target variable is used.
:return: None
Example::
# represent multiple density plots, one per unique value of the
# target
my_data.plot_double_hist(my_feature)
# represent double density plots, one per unique value of the
# categorical feature 'my_feature2'
my_data.double_hist(my_feature1, my_feature2)
# or
my_data.plot_density()
"""
if feature_names is None:
feature_names = self.numerical_features
if isinstance(feature_names, list):
num_plots = int(len(feature_names))
rows = int(num_plots / self.__num_plots_per_row)
if num_plots % self.__num_plots_per_row != 0:
rows += 1
if num_plots >= self.__num_plots_per_row:
cols = self.__num_plots_per_row
else:
cols = num_plots
plots_left = num_plots
for j in range(rows):
plt.figure(figsize=(14, 3))
for i in range(min(self.__num_plots_per_row, plots_left)):
plt.subplot(1, cols, i + 1)
self.__plot_double_hist(
feature_names[i + (j * self.__num_plots_per_row)])
plots_left -= 1
plt.show()
else:
self.__plot_double_hist(feature_names, category)
[docs] def plot_importance(self,
num_features=None,
num_neighbors=None,
abs_imp=False):
"""
Plots the NUMERICAL features importance, using the ReliefF algorithm as
implemented in the `rebate` library.
Args:
num_features: The nr of features we want to display. Default is
all features.
num_neighbors: The nr of neighbors to consider when computing the
features importance. Default is 20.
abs_imp: if True, importance is displayed taking the ABS()
Default value is False.
Returns:
None
"""
if num_features is None:
num_features = len(self.numerical_features)
if num_neighbors is None:
num_neighbors = 20
vbles_importance = self.features_importance(num_features,
num_neighbors,
abs_imp)
top_features = list(vbles_importance.keys())
importances = list(vbles_importance.values())
plt.figure(figsize=(8, 8))
plt.title("Numerical Features importance (ReliefF)")
plt.barh(range(num_features), importances,
color="#c1d9eb",
xerr=np.std(importances),
align="center")
plt.yticks(range(num_features), top_features)
plt.ylim([-1, num_features])
plt.show()
[docs] def plot_covariance(self):
"""
Plots the covariance matrix as explained by scikit contributor
Andreas Mueller in Columbia lectures, ordering and grouping
(numerical) features with higher correlation.
Returns:
None
"""
if len(self.numerical_features) == 0:
raise ValueError('No numerical features to plot.')
X = scale(self.select('numerical'))
cov = np.cov(X, rowvar=False)
order = np.array(
hierarchy.dendrogram(hierarchy.ward(cov), no_plot=True)['ivl'])
order = order.astype(np.int)
ordered_features = [self.numerical_features[i] for i in order]
plt.figure(figsize=(8, 8), dpi=100)
plt.title('Covariance Matrix for numerical features')
plt.imshow(cov[order, :][:, order])
plt.colorbar(shrink=0.8)
plt.xticks(range(X.shape[1]), ordered_features)
plt.yticks(range(X.shape[1]), ordered_features)
plt.show()
#
# Private Methods
#
def __assert_list_of_numericals(self, to_convert):
if to_convert is not None:
# The list of columns is always a list, although a single
# argument is passed.
if isinstance(to_convert, list) is not True:
to_convert = [to_convert]
# Safety check
for feature in to_convert:
assert feature in self.numerical_features, \
'Feature {} is not numerical.'.format(feature)
else:
to_convert = self.features.select_dtypes(
include=[np.number]).columns.tolist()
return to_convert
def __assert_category_values(self, category):
assert category is not None or self.target is not None, \
'Category cannot be None. Set target or categorical variable first'
if self.target is not None:
if category is None or self.target.name == category:
categories = self.target.unique()
category_series = self.target
else:
raise ValueError('Target variable not set.')
else:
assert category in list(self.categorical), \
'"{}" must be a categorical feature'.format(category)
categories = self.features[category].unique()
category_series = self.features[category]
return categories, category_series
@staticmethod
def __numerical_description(feature):
"""
Build a dictionary with the main numerical descriptors for a feature.
:param feature: The feature (column) to be analyzed
:return: A dictionary with the indicators and its values.
"""
description = dict()
description['Min.'] = np.min(feature)
description['1stQ'] = np.percentile(feature, 25)
description['Med.'] = np.median(feature)
description['Mean'] = np.mean(feature)
description['3rdQ'] = np.percentile(feature, 75)
description['Max.'] = np.max(feature)
return description
@staticmethod
def __describe_categorical(feature, inline=False):
"""
Describe a categorical column by printing num classes and proportion
metrics.
Args:
feature: The categorical feature to be described.
inline: Print out without newlines.
"""
num_categories = feature.nunique()
cat_names = feature.unique()
cat_counts = feature.value_counts().values
cat_proportion = [count / cat_counts.sum()
for count in cat_counts]
if inline is False:
print('\'', feature.name, '\' (', feature.dtype.name, ')', sep='')
print(' {} categories'.format(num_categories))
for cat in range(len(cat_proportion)):
print(' · \'{}\': {} ({:.04})'.format(
cat_names[cat], cat_counts[cat], cat_proportion[cat]))
else:
if num_categories <= 4:
max_categories = num_categories
trail = ''
else:
max_categories = 4
trail = '...'
header = '{:d} categs. '.format(num_categories)
body = '\'{}\'({:d}, {:.4f}) ' * max_categories
values = [(cat_names[cat], cat_counts[cat], cat_proportion[cat])
for cat in range(max_categories)]
values_flattened = list(sum(values, ()))
body_formatted = body.format(*values_flattened)
return header + body_formatted + trail
@staticmethod
def __describe_numerical(feature, inline=False):
"""
Describe a numerical column by printing min, max, med, mean, 1Q, 3Q
:param feature: The numerical feature to be described.
:param inline: Default False. Controls whether the description is
generated in a single line (compact) or paragraph mode.
:return: nothing
"""
description = Dataset.__numerical_description(feature)
if inline is False:
print('\'', feature.name, '\'', sep='')
for k, v in description.items():
print(' · {:<4s}: {:.04f}'.format(k, v))
return
else:
body = ('{}({:<.4}) ' * len(description))[:-1]
values = [(k, str(description[k])) for k in description]
values_flattened = list(sum(values, ()))
body_formatted = body.format(*values_flattened)
return body_formatted
def __plot_double_density(self, feature, category=None):
"""
Plots a double density plot with the feature specified
"""
# Get the list of categories
categories, category_series = self.__assert_category_values(category)
assert feature in self.numerical, '"Feature" must be numerical.'
# plot a density for each value of the category
for value in categories:
sns.distplot(self.features[feature][category_series == value],
hist=False, kde=True,
kde_kws={'shade': True},
label=str(value))
def __plot_double_hist(self, feature, category=None):
"""
Plots a double histogram for feature name passed.
"""
# Get the list of categories
categories, category_series = self.__assert_category_values(category)
assert feature in self.numerical, '"Feature" must be numerical.'
# plot a density for each value of the category
for value in categories:
sns.distplot(self.features[feature][category_series == value],
hist=True, kde=False,
kde_kws={'shade': True},
label=str(value))
plt.legend(loc='best')