from abc import ABC, abstractmethod
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.utils import check_random_state
from sklearn.utils.multiclass import check_classification_targets
from sklearn.utils.validation import check_is_fitted, check_array
from .. import distances
from .. import solvers
from ..solvers import SolverBaseClass
from .._utils import init_class
from ..distances import DistanceBaseClass
from ..objectives import GeneralizedLearningObjective
from typing import Tuple, Union, List
ModelParamsType = np.ndarray
[docs]class LVQBaseClass(
ABC, BaseEstimator, ClassifierMixin
): # lgtm [py/conflicting-attributes]
"""Learning Vector Quantization base class
Abstract class for implementing LVQ models. It provides abstract methods with
expected call signatures.
Provides a common interface to the solver and other function that require access to the
models. Additionally, it implements a number of functions shared by the currently implemented
LVQ variations.
See also
--------
GLVQ, GMLVQ, LGMLVQ
"""
# Public attributes
prototypes_: np.ndarray
# "Private" attributes
_distance: Union[DistanceBaseClass, object]
_objective: GeneralizedLearningObjective
_solver: Union[SolverBaseClass, object]
# Related to model parameters
_prototypes_size: int
_prototypes_shape: Tuple
# _variables stores the model parameters (e.g., prototypes) in 1D format. The
# get_prototypes() and other model parameter functions should return a view into this 1D
# array. Likewise set_prototypes() overwrites the _variables array.
_variables: np.ndarray
[docs] def __init__(
self,
distance_type: Union[str, type] = "squared-euclidean",
distance_params: dict = None,
valid_distances: List[str] = None,
solver_type: Union[str, type] = "steepest-gradient-descent",
solver_params: dict = None,
valid_solvers: List[str] = None,
prototype_init: Union[str, np.ndarray] = "class-conditional-mean",
prototype_n_per_class: int = 1,
random_state: Union[int, np.random.RandomState] = None,
force_all_finite: Union[str, bool] = True,
):
self.distance_type = distance_type
self.distance_params = distance_params
self.valid_distances = valid_distances
self.solver_type = solver_type
self.solver_params = solver_params
self.valid_solvers = valid_solvers
self.prototype_init = prototype_init
self.prototype_n_per_class = prototype_n_per_class
self.random_state = random_state
self.force_all_finite = force_all_finite
###########################################################################################
# The "Getter" and "Setter" that are used by the solvers to set and get model params.
###########################################################################################
[docs] def get_variables(self) -> np.ndarray:
r"""
Returns the ``self._variables`` array that owns the memory allocated for the model
parameters.
Returns
-------
_variables : ndarray
returns the model's _variables array.
"""
return self._variables
[docs] def set_variables(self, new_variables: np.ndarray) -> None:
r"""
Modifies the ``self._variables`` by copying the values of ``new_variables`` into the
memory of ``self._variables``.
Parameters
----------
new_variables : ndarray
1d numpy array that contains all the model parameters in continuous memory
"""
np.copyto(self._variables, new_variables)
[docs] @abstractmethod
def get_model_params(self) -> Union[tuple, np.ndarray]:
r"""
Should return a view or tuple of views (in correct shape) of the model's parameters.
Implementation depends on specific model as model parameters may differ per model.
Returns
-------
ndarray or tuple
View or tuple of views of the model's parameters.
"""
raise NotImplementedError("You should implement this!")
[docs] @abstractmethod
def set_model_params(self, new_model_params: Union[tuple, np.ndarray]):
r"""
Should modify the ``self._variables`` array. Accepts the new_model_params in the shape of
the model's parameters, e.g., prototypes or (prototypes, relevance_matrix).
Always needs to be all the model parameters, can not be used for partial updates.
Parameters
----------
new_model_params : ndarray or tuple
Array or tuple of arrays of the new model's parameters.
"""
raise NotImplementedError("You should implement this!")
###########################################################################################
# Specific "getters" and "setters" for the prototypes shared by every LVQ model.
###########################################################################################
[docs] def get_prototypes(self) -> np.ndarray:
r"""
Return a view into ``self._variables`` of the the shape of the prototypes (n_prototypes,
n_features). At the moment only consistency function, does not actually create the shape
and only works after ``self.prototypes_`` has been set.
Returns
-------
prototypes : ndarray of shape (n_prototypes, n_features)
View into ``self._variables`` with shape specified above.
"""
return self.prototypes_
[docs] def set_prototypes(self, new_prototypes: np.ndarray) -> None:
r"""
Accepts a new_prototypes array with the same shape as ``self.prototypes_`` and overwrites
the ``self._variables`` array by copying the values of the new_prototypes.
Parameters
----------
new_prototypes : ndarray of shape (n_prototypes, n_features)
The new prototypes the model should store.
"""
np.copyto(self.prototypes_, new_prototypes)
###########################################################################################
# Functions to transform the 1D variables array to model parameters and back
###########################################################################################
[docs] @abstractmethod
def to_model_params_view(self, var_buffer: np.ndarray) -> Union[tuple, np.ndarray]:
r"""
Should return a single view into the var_buffer or a tuple of views. This depends on the
model and its parameters.
Parameters
----------
var_buffer : ndarray
Array with the same size as the model's variables array as returned
by ``get_variables()``.
Returns
-------
ndarray or tuple
Should return a view or tuple of views of the model parameters in appropriate shapes.
"""
raise NotImplementedError("You should implement this!")
[docs] @abstractmethod
def to_prototypes_view(self, var_buffer: np.ndarray) -> np.ndarray:
r"""
Should return the prototypes from the provided var_buffer. I.e., it selects/views the
appropriate part of memory and reshapes it.
Parameters
----------
var_buffer : ndarray
Array with the same size as the model's variables array as returned
by ``get_variables()``.
Returns
-------
ndarray of shape (n_prototypes, n_features)
View into the var_buffer.
"""
raise NotImplementedError("You should implement this!")
###########################################################################################
# Solver Normalization functions
###########################################################################################
[docs] @abstractmethod
def normalize_variables(self, var_buffer: np.ndarray) -> None:
r"""
Should modify the var_buffer as if it was the variables array provided
by ``get_variables()``.
Parameters
----------
var_buffer : ndarray
Array with the same size as the model's variables array as returned
by ``get_variables()``.
Returns
-------
ndarray or tuple
Same shape and size as input, but normalized. How to normalize depends on model
implementation.
"""
raise NotImplementedError("You should implement this!")
@staticmethod
def _normalize_prototypes(prototypes: np.ndarray) -> None:
r"""
Normalizes the provided prototypes array, i.e., it writes to the same memory. Performs
the following normalization step for each prototype :math:`\mathbf{w_i}`:
..math::
\mathbf{w}_i / || \mathbf{w}_i ||
Parameters
----------
prototypes : ndarray of shape (n_prototypes, n_features)
To be normalized prototypes.
Returns
-------
ndarray of same shape as input
Normalized prototypes.
"""
np.divide(
prototypes,
np.linalg.norm(prototypes, axis=1, keepdims=True),
out=prototypes,
)
###########################################################################################
# Solver helper functions
###########################################################################################
[docs] @abstractmethod
def add_partial_gradient(self, gradient, partial_gradient, i_prototype) -> None:
r"""
To increase performance, the distance gradient methods (should) return only the relevant values.
I.e., the gradient of the prototype i_prototype and potentially other parameters linked to
this prototype. This partial gradient needs to added (overwrite) to the correct parts of
the actual gradient and this is what this function should do.
Parameters
----------
gradient : ndarray
Same shape as the ``get_variables()`` would return.
partial_gradient : ndarray
1d array containing the partial gradient.
i_prototype : int
The index of the prototype to which the partial gradient was computed.
"""
raise NotImplementedError("You should implement this!")
[docs] @abstractmethod
def mul_step_size(
self, step_size: Union[float, np.ndarray], gradient: np.ndarray
) -> None:
r"""
Should multiply the provided gradient with the provided step size and overwrite the
values in ``gradient``. Depending on the ``step_size`` being a float or array different
step sizes are used for different model parameters (which also depends on the model if
there are more then only prototypes)
Parameters
----------
step_size : float or ndarray
The scalar or list of values containing the step sizes.
gradient : ndarray
Same shape as the ``get_variables()`` would return.
"""
raise NotImplementedError("You should implement this!")
###########################################################################################
# Initialization function
###########################################################################################
@abstractmethod
def _init_variables(self):
r"""
Should initialize the variables, 1d numpy array to hold all model parameters. Should
store these in self._variables.
"""
raise NotImplementedError("You should implement this!")
@abstractmethod
def _check_model_params(self):
r"""
Should check the model parameters. I.e., call check_prototype_params with parameters and
other model parameters that there might be.
"""
@abstractmethod
def _init_model_params(self, X, y):
r"""
Depending on the model, things such as self.prototypes_ should be initialized and set
using the set methods or one should make sure the parameters are views into the variables
array, such that variables array is changed as well.
Parameters
----------
X : ndarray, with shape (n_samples, n_features)
The X
y : ndarray, with shape (n_samples)
The labels
"""
raise NotImplementedError("You should implement this!")
def _check_prototype_params(self):
"""
Check prototype params, i.e., if the prototypes_per_class is set correctly.
Additionally, it sets the size and shape of the prototypes such that these can be used
for the creation of the ``self._variables`` and view ``self.prototypes_``.
"""
prototype_n_per_class = self.prototype_n_per_class
if isinstance(prototype_n_per_class, int):
self._prototypes_shape = (
prototype_n_per_class * self.classes_.size,
self.n_features_in_,
)
elif isinstance(prototype_n_per_class, np.ndarray):
if prototype_n_per_class.size != self.classes_.size:
raise ValueError(
"Expected the number protoypes_per_class (size = {}) to have a number of elements "
"equal to the number of classes (size = {}).",
prototype_n_per_class.size,
self.classes_.size,
)
if np.any(prototype_n_per_class <= 0.0):
raise ValueError(
"Prototypes_per_class ({}) cannot contain any values less than or equal to zero.",
prototype_n_per_class,
)
self._prototypes_shape = (
np.sum(prototype_n_per_class),
self.n_features_in_,
)
else:
raise ValueError(
"Expected prototypes_per_class to be either of type int or np.ndarray, but got type: {}",
type(prototype_n_per_class),
)
self._prototypes_size = np.prod(self._prototypes_shape)
def _init_prototypes(
self,
X: np.ndarray,
y: np.ndarray,
) -> None:
"""
Initialized the prototypes, with a small random offset, to the class conditional mean.
To be used in the _initialize_parameters function.
Parameters
----------
X : ndarray with shape (number of observations, number of dimensions)
y : ndarray with size equal to the number of observations
"""
self.prototypes_labels_ = np.repeat(
np.arange(0, self.classes_.size), self.prototype_n_per_class
)
# Sets initial value for prototypes....
self.prototypes_ = self.to_prototypes_view(self._variables)
if self.prototype_init == "class-conditional-mean":
conditional_mean = _conditional_mean(self.prototypes_labels_, X, y)
self.set_prototypes(
conditional_mean
+ (1e-4 * self.random_state_.uniform(-1, 1, conditional_mean.shape))
)
else:
raise ValueError(
"The provided value for the parameter 'prototype_init' is invalid."
)
@abstractmethod
def _init_objective(self) -> None:
"""
Should initialize the ``self._objective``. Depends on the algorithm.
"""
raise NotImplementedError("You should implement this!")
def _init_distance(self) -> None:
"""
Initializes the ``self.distance``.
"""
distance_class = init_class(
distances,
self.distance_type,
valid_class_types=self.valid_distances,
)
if self.distance_params is not None:
self._distance = distance_class(**self.distance_params)
else:
self._distance = distance_class()
def _init_solver(self) -> None:
"""
Should initialize the ``self._solver``. Depends on the algorithm.
"""
solver_class = init_class(solvers, self.solver_type, self.valid_solvers)
if self.solver_params is not None:
self._solver = solver_class(self._objective, **self.solver_params)
else:
self._solver = solver_class(self._objective)
###########################################################################################
# Data and label validation
###########################################################################################
def _check_data_and_labels(
self, X: np.ndarray, labels: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
"""
Functions performs a series of check mostly by using sklearn util functions.
Additionally, it transform the labels into indexes of unique labels, which are stored in
self.classes_.
Checks X and labels for consistent length, enforces X to be 2D and labels
1D. By default, X is checked to be non-empty and containing only finite values. Standard
input checks are also applied to labels, such as checking that labels does not have
np.nan or np.inf targets.
- sklearn check_X_y()
Ensure that target labels are of a non-regression type.
Only the following target types (as defined in sklearn.utils.multiclass.type_of_target)
are allowed:
'binary', 'multiclass', 'multiclass-multioutput',
'multilabel-indicator', 'multilabel-sequences'.
- sklearn check_classification_targets()
Parameters
----------
X : ndarray of shape (number of observations, number of dimensions)
labels : ndarray of size (number of observations)
Returns
-------
X : ndarray with same shape (and values) as input
labels : ndarray of indexes to self.classes_
"""
# Check X
X, labels = self._validate_data(
X, labels, force_all_finite=self.force_all_finite
)
# Check classification targets
check_classification_targets(labels)
# Store unique provided labels in self.classes_ and transform the labels into a index
# that can be used to reconstruct the original labels.
self.classes_, labels = np.unique(labels, return_inverse=True)
# Raise an error when the targets only contain a single unique class.
if self.classes_.size <= 1:
raise ValueError("Classifier can't train when only one class is present.")
return X, labels
###########################################################################################
# Before and after fit/solve function (initialization of things)
###########################################################################################
def _before_fit(self, X: np.ndarray, y: np.ndarray):
"""
Should initialize:
1. self._variables and algorithm specific parameters which should be
views into self._variables (share memory).
2. The distance function in self.distance
3. The objective function in self._objective
4. The solver function in self._solver
Parameters
----------
X : ndarray, with shape (n_samples, n_features)
The X
y : ndarray, with shape (n_samples)
The labels
"""
self._check_model_params()
# Initializes the 1D block of continuous memory to hold the model params.
self._init_variables()
# Initialize algorithm specific parameters.
self._init_model_params(X, y)
self._init_distance()
self._init_objective()
self._init_solver()
def _after_fit(self, X: np.ndarray, y: np.ndarray):
"""
Method that by default does nothing but can be used by methods that need to compute
transformation matrices, such that this does not need to be checked or done everytime
transform() is called (e.g., for GMLVQ and LGMLVQ)
Parameters
----------
X : ndarray with shape (number of observations, number of dimensions)
y : ndarray with size equal to the number of observations
"""
###########################################################################################
# Public API functions
###########################################################################################
[docs] def fit(self, X: np.ndarray, y: np.ndarray):
"""Fit function that provides the general implementation of the LVQ algorithms. It checks the data, calls
before_fit method, calls the solve method of the solver, and the after_fit method.
Parameters
----------
X : ndarray of shape (number of observations, number of dimensions)
y : ndarray of size (number of observations)
Returns
-------
self
The trained model
"""
# Check X and check and transform labels.
X, y_index = self._check_data_and_labels(X, y)
# Initialize random_state_ that should be used to perform any rng.
self.random_state_ = check_random_state(self.random_state)
# Before solve (handles initialization of things)
self._before_fit(X, y_index)
self._solver.solve(X, y_index, self)
# After solve (handles initialization of things that can only be done after fit)
self._after_fit(X, y_index)
return self
def _multiclass_decision_function(self, X: np.ndarray):
"""
Computes the decision values and returns shape (n_observations, n_classes). The values
are constructed by computing: the distance between an observation and the prototype with
a different label minus the distance between that observation and the closest prototype
with the same label, where "the same" is defined to be the index of the column the value
is being computed for.
Parameters
----------
X : ndarray
The data.
Returns
-------
ndarray of shape (n_observations, n_classes)
"""
# Of shape n_observations , n_prototypes
distances = self._distance(X, self)
# Allocation n_observations, n_classes
decision_values = np.zeros((X.shape[0], self.classes_.size))
# return n_observations, n_classes
for i, _ in enumerate(self.classes_):
decision_values[:, i] = distances[:, self.prototypes_labels_ != i].min(
axis=1
) - distances[:, self.prototypes_labels_ == i].min(axis=1)
return decision_values
[docs] def decision_function(self, X: np.ndarray):
"""
Evaluates the decision function for the samples in X.
Shape for binary class is (n_observations,) with the decision values for the "greater"
class. In the multiclass case it returns decision values for each class and therefore has
the shape (n_observations, n_classes).
Parameters
----------
X : ndarray
The data.
Returns
-------
decision_values : ndarray
Binary case shape is (n_observations,) and the multiclass case (n_observations,
n_classes)
"""
# SciKit-learn list of checked params before predict
check_is_fitted(self)
# Input validation
X = check_array(X, force_all_finite=self.force_all_finite)
decision_values = self._multiclass_decision_function(X)
if self.classes_.size == 2:
# The decision function needs to return (n_samples,) for the "greater" class.
return decision_values[:, 1]
# else it should return (n_samples, n_classes)
return decision_values
[docs] def predict_proba(self, X: np.ndarray):
"""
Parameters
----------
X : ndarray
The data.
Returns
-------
confidence_scores : ndarray of shape (n_observations, n_classes)
"""
# SciKit-learn list of checked params before predict
check_is_fitted(self)
# Input validation
X = check_array(X, force_all_finite=self.force_all_finite)
# Between -1 and 1
decision_values = self._multiclass_decision_function(X)
# Softmax function (keeps the same scipy.stats.rankdata)
# Very arbitrary 0.01, which also might not always work?
exp_decision_values = np.exp(0.01 * decision_values)
return exp_decision_values / np.sum(exp_decision_values, axis=1)[:, np.newaxis]
[docs] def predict(self, X: np.ndarray):
"""Predict function
The decision is made for the label of the prototype with the minimum decision value,
as provided by the ``decision_function()``.
Parameters
----------
X : ndarray
The data.
Returns
-------
ndarray of shape (n_observations)
Returns the predicted labels.
"""
# SciKit-learn list of checked params before predict
check_is_fitted(self)
# Input validation
# X = self._validate_data(X, force_all_finite=self.force_all_finite)
decision_values = self.decision_function(X)
if self.classes_.size == 2:
return self.classes_[(decision_values > 0).astype(np.int)]
# Lower value is the closest prototype.
return self.classes_[decision_values.argmax(axis=1)]
def _conditional_mean(p_labels: np.ndarray, data: np.ndarray, d_labels: np.ndarray):
""" Implements the conditional mean (ignoring nan values), i.e., mean per class"""
return np.array(
[np.nanmean(data[p_label == d_labels, :], axis=0) for p_label in p_labels]
)