Module pairwiseprediction.classifier
Expand source code
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.utils.validation import check_is_fitted
from pairwiseprediction.combination import pairwise_diff, pairwise_hstack
from pairwiseprediction.interpolation import interpolate_for_classification
class PairwiseClassifier(BaseEstimator, ClassifierMixin):
r"""
:param algorithm: Class of the algorithm used to predict pairs internally.
:param pairwise: Type of combination: "difference" or "concatenation".
:param threshold: Minimum difference between target values for a pair to be considered relevant.
:param proportion: Whether the threshold is a relative proportion (True) or an absolute difference (False).
:param center: Reference target value; defaults to the mean of the training sample.
:param only_relevant_pairs_on_prediction: Whether to keep only relevant differences during interpolation.
:param n_jobs: Number of processes in parallel.
:param kwargs: Arguments for the user-provided `algorithm`.
>>> import numpy as np
>>> from sklearn.datasets import load_diabetes
>>> from sklearn.ensemble import RandomForestClassifier
>>> a, b = load_diabetes(return_X_y=True)
>>> me = np.mean(b)
>>> # noinspection PyUnresolvedReferences
>>> y = (b > me).astype(int)
>>> alg = RandomForestClassifier(n_estimators=3, random_state=0, n_jobs=-1)
>>> np.mean(cross_val_score(alg, a, y, cv=StratifiedKFold(n_splits=2))) # doctest:+ELLIPSIS +NORMALIZE_WHITESPACE
0.6...
>>> c = b.reshape(len(b), 1)
>>> X = np.hstack([a, c])
>>> alg = PairwiseClassifier(n_estimators=3, threshold=20, only_relevant_pairs_on_prediction=False, random_state=0, n_jobs=-1)
>>> np.mean(cross_val_score(alg, X, y, cv=StratifiedKFold(n_splits=2))) # doctest:+ELLIPSIS +NORMALIZE_WHITESPACE
0.7...
>>> alg = alg.fit(X[:80])
>>> alg.predict(X[:2])
array([1, 0])
>>> alg.predict(X[:6], paired_rows=True)
array([1, 0, 0, 1, 1, 0])
>>> p1 = alg.predict(X[80::2])
>>> p2 = alg.predict(X[80:], paired_rows=True)[::2]
>>> a = X[80::2,-1]
>>> b = X[81::2,-1]
>>> np.sum(p1 == y[80::2]) / len(p1) # doctest:+ELLIPSIS +NORMALIZE_WHITESPACE
0.7...
>>> np.sum(p2 == (a>b)) / len(p2) # doctest:+ELLIPSIS +NORMALIZE_WHITESPACE
0.6...
"""
def __init__(self, algorithm=RandomForestClassifier, pairwise="concatenation", threshold=0, proportion=False, center=None, only_relevant_pairs_on_prediction=False, **kwargs):
self.algorithm = algorithm
self.kwargs = kwargs
self.pairwise = pairwise
self.threshold = threshold
self.proportion = proportion
self.center = center
self.only_relevant_pairs_on_prediction = only_relevant_pairs_on_prediction
self._estimator = None
# def get_params(self, deep=False):
# return {} # "n_estimators": self.n_estimators, "n_jobs": self.n_jobs, "random_state": self.random_state, "diff": self.diff}
def fit(self, X, y=None):
"""
WARNING: y is ignored; the permutation test won't work.
TODO: check whether the permutation test accepts pandas; use the index to fix the warning above.
:param X: Last column is the continuous target.
:param y: Ignored.
:return:
"""
return self.fit_(X)
def fit_(self, X, y=None, extra_kwargs=None):
if extra_kwargs is None:
extra_kwargs = {}
self.Xw = X if isinstance(X, np.ndarray) else np.array(X)
X = y = None
if self.center is None:
self.center = np.mean(self.Xw[:, -1])
handle_last_as_y = "%" if self.proportion else True
filter = lambda tmp: (tmp[:, -1] < -self.threshold) | (tmp[:, -1] >= self.threshold)
pairwise = True
if self.pairwise == "difference":
pairs = lambda a, b: pairwise_diff(a, b, pct=handle_last_as_y == "%")
elif self.pairwise == "concatenation":
pairs = lambda a, b: pairwise_hstack(a, b, handle_last_as_y=handle_last_as_y)
elif self.pairwise == "none":
if self.proportion:
raise Exception("For pairwise='none', use an absolute threshold instead of a proportion (e.g., threshold=9 rather than proportion=True with threshold=0.1).")
boo = (self.Xw[:, -1] < self.center - self.threshold) | (self.Xw[:, -1] >= self.center + self.threshold)
self.Xw = self.Xw[boo]
pairwise = False
pairs = None
else:
raise Exception(f"Not implemented for {self.pairwise=}")
# self.classes_ = unique_labels(y)
# sort
self.idxs = np.argsort(self.Xw[:, -1].flatten(), kind="stable").flatten()
self.Xw = self.Xw[self.idxs]
if pairwise: # pairwise transformation
tmp = pairs(self.Xw, self.Xw)
pairs_Xy_tr = tmp[filter(tmp)]
Xtr = pairs_Xy_tr[:, :-1]
ytr = (pairs_Xy_tr[:, -1] >= 0).astype(int)
else:
Xtr = self.Xw[:, :-1]
w = self.Xw[:, -1]
# noinspection PyUnresolvedReferences
ytr = (w >= self.center).astype(int)
self._estimator = self.algorithm(**self.kwargs, **extra_kwargs).fit(Xtr, ytr)
self.Xtr, self.ytr = Xtr, ytr
self.Xw_tr = self.Xw[abs(self.Xw[:, -1] - self.center) >= self.threshold] if self.only_relevant_pairs_on_prediction else self.Xw
return self
def predict_proba(self, X, paired_rows=False):
"""
:param X: Last column is discarded.
:param paired_rows: Whether consecutive rows form the pairs to compare.
:return:
>>> import numpy as np
>>> from sklearn.datasets import load_diabetes
>>> from sklearn.ensemble import RandomForestClassifier
>>> a, b = load_diabetes(return_X_y=True)
>>> me = np.mean(b)
>>> # noinspection PyUnresolvedReferences
>>> y = (b > me).astype(int)
>>> alg = RandomForestClassifier(n_estimators=10, random_state=0, n_jobs=-1)
>>> alg = alg.fit(a, y)
>>> y[:5], b[:5], me # doctest:+ELLIPSIS +NORMALIZE_WHITESPACE
(array([0, 0, 0, 1, 0]), array([151., 75., 141., 206., 135.]), 152...)
>>> np.round(alg.predict_proba(a[:5]), 2) # doctest:+ELLIPSIS +NORMALIZE_WHITESPACE
array([[0.7, 0.3],
[1. , 0. ],
[0.9, 0.1],
[0. , 1. ],
[1. , 0. ]])
>>> c = b.reshape(len(b), 1)
>>> X = np.hstack([a, c])
>>> alg = PairwiseClassifier(n_estimators=30, threshold=20, only_relevant_pairs_on_prediction=False, random_state=0, n_jobs=-1)
>>> alg = alg.fit(X)
>>> np.round(alg.predict_proba(X[:5]), 2) # doctest:+ELLIPSIS +NORMALIZE_WHITESPACE
array([[0.9 , 0.1 ],
[0.41, 0.59],
[0.96, 0.04],
[0.76, 0.24],
[0.74, 0.26]])
>>> b[:6]
array([151., 75., 141., 206., 135., 97.])
>>> # 151 > 75 → label=1
>>> alg.predict(X[:6], paired_rows=True)[::2]
array([1, 0, 1])
>>> alg.predict_proba(X[:6], paired_rows=True)[::2]
array([[0. , 1. ],
[1. , 0. ],
[0.1, 0.9]])
"""
return self.predict(X, paired_rows, predict_proba=True)
def predict(self, X, paired_rows=False, predict_proba=False):
"""
:param X: Last column is discarded.
:param paired_rows: Whether consecutive rows form the pairs to compare.
:param predict_proba: Whether to return class probabilities instead of labels.
:return:
"""
check_is_fitted(self._estimator)
Xw_ts = X if isinstance(X, np.ndarray) else np.array(X)
X = Xw_ts[:, :-1] # discard label to avoid data leakage
pairwise = True
if self.pairwise == "difference":
pairs = lambda a, b: pairwise_diff(a, b)
elif self.pairwise == "concatenation":
pairs = lambda a, b: pairwise_hstack(a, b)
elif self.pairwise == "none":
if self.proportion:
raise Exception("For pairwise='none', use an absolute threshold instead of a proportion (e.g., threshold=9 rather than proportion=True with threshold=0.1).")
pairwise = False
pairs = None
else:
raise Exception(f"Not implemented for {self.pairwise=}")
if pairwise:
targets = self.Xw_tr[:, -1]
pos_ampl, neg_ampl = (np.max(targets) - self.center, self.center - np.min(targets)) if predict_proba else (None, None)
l = []
loop = range(0, X.shape[0], 2) if paired_rows else range(X.shape[0])
for i in loop:
x = X[i : i + 1, :]
if paired_rows:
Xts = pairs(x, X[i + 1 : i + 2, :])
if predict_proba:
predicted = self._estimator.predict_proba(Xts)[0]
else:
predicted = int(self._estimator.predict(Xts)[0])
l.append(predicted)
l.append(1 - predicted)
else:
Xts = pairs(x, self.Xw_tr[:, :-1])
zts = self._estimator.predict(Xts)
# interpolation
conditions = 2 * zts - 1
z = interpolate_for_classification(targets, conditions)
if predict_proba:
d = float(z - self.center)
p = (d / pos_ampl) if d > 0 else (-d / neg_ampl)
predicted = [1 - p, p]
else:
predicted = int(z >= self.center)
l.append(predicted)
return np.array(l)
return self._estimator.predict(X)
def shap(self, xa, xb, columns, seed=0, **kwargs):
"""
Return an indexed dict {variable: (value, SHAP contribution)} for the given pair of instances
A and B; the last column of each instance is discarded.
:param xa: Test instance A.
:param xb: Test instance B.
:param columns: Column names for a single instance.
:param seed: Random state for the SHAP computation.
:return:
>>> import numpy as np
>>> from pandas import DataFrame
>>> from sklearn.datasets import load_diabetes
>>> from sklearn.ensemble import RandomForestClassifier
>>> a, b = load_diabetes(return_X_y=True)
>>> a = a[:, :3]
>>> c = b.reshape(len(b), 1)
>>> X = np.hstack([a, c])
>>> alg = PairwiseClassifier(n_estimators=3, threshold=20, only_relevant_pairs_on_prediction=False, random_state=0, n_jobs=-1)
>>> alg = alg.fit(X[:80])
>>> d = alg.shap(X[0], X[1], columns=list("abc"))
>>> d.sort()
>>> d # doctest:+ELLIPSIS +NORMALIZE_WHITESPACE
IndexedOrderedDict([('a_a', (0.038..., 0.078...)), ('a_b', (0.050..., -0.007...)), ('a_c', (0.061..., 0.079...)), ('b_a', (-0.001..., 0.085...)), ('b_b', (-0.044..., 0.025...)), ('b_c', (-0.051..., 0.237...))])
"""
check_is_fitted(self._estimator)
import dalex as dx
from indexed import Dict
from pandas import DataFrame
# TODO: difference is working ok with this concatenation of column names?
f = lambda i: [f"{i}_{col}" for col in columns]
columns = f("a") + f("b")
self._estimator.feature_names_in_ = columns
if self.pairwise == "difference":
x = pairwise_diff(xa[:-1].reshape(1, -1), xb[:-1].reshape(1, -1))
elif self.pairwise == "concatenation":
x = pairwise_hstack(xa[:-1].reshape(1, -1), xb[:-1].reshape(1, -1))
else:
raise Exception(f"Not implemented for {self.pairwise=}")
x = DataFrame(x, columns=columns)
Xtr = DataFrame(self.Xtr, columns=columns)
explainer = dx.Explainer(model=self._estimator, data=Xtr, y=self.ytr, verbose=False)
predictparts = dx.Explainer.predict_parts(explainer, new_observation=x, type="shap", random_state=seed, **kwargs)
zz = zip(predictparts.result["variable"], predictparts.result["contribution"])
var__val_shap = Dict((name_val.split(" = ")[0], (float(name_val.split(" = ")[1:][0]), co)) for name_val, co in zz)
return var__val_shap
def __sklearn_is_fitted__(self):
return self._estimator is not None
def __repr__(self, **kwargs):
return "PW" + repr(self.algorithm)
def __sklearn_clone__(self):
return PairwiseClassifier(self.algorithm, self.pairwise, self.threshold, self.proportion, self.center, self.only_relevant_pairs_on_prediction, **self.kwargs)
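The pairwise transformation used during training can be pictured with a minimal sketch. This is not the implementation of `pairwise_diff` or `pairwise_hstack` from `pairwiseprediction.combination`; it only illustrates, as an assumption based on how `fit_` derives `ytr` from the last column of the combined pairs, what one combined pair and its binary label look like.
import numpy as np
# Two rows whose last column is the continuous target.
xa = np.array([1.0, 5.0, 151.0])
xb = np.array([3.0, 2.0, 75.0])
# "difference": feature-wise difference; the last column becomes the target delta.
diff_pair = xa - xb                                              # [-2.0, 3.0, 76.0]
# "concatenation": features of both rows side by side, plus the target delta.
concat_pair = np.concatenate([xa[:-1], xb[:-1], [xa[-1] - xb[-1]]])
# The internal classifier learns whether the delta is >= 0 (row A has the larger target).
label = int((xa[-1] - xb[-1]) >= 0)
print(diff_pair, concat_pair, label)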
Classes
class PairwiseClassifier (algorithm=sklearn.ensemble._forest.RandomForestClassifier, pairwise='concatenation', threshold=0, proportion=False, center=None, only_relevant_pairs_on_prediction=False, **kwargs)
-
:param algorithm: Class of the algorithm used to predict pairs internally.
:param pairwise: Type of combination: "difference" or "concatenation".
:param threshold: Minimum difference between target values for a pair to be considered relevant.
:param proportion: Whether the threshold is a relative proportion (True) or an absolute difference (False).
:param center: Reference target value; defaults to the mean of the training sample.
:param only_relevant_pairs_on_prediction: Whether to keep only relevant differences during interpolation.
:param n_jobs: Number of processes in parallel.
:param kwargs: Arguments for the user-provided `algorithm`.
>>> import numpy as np
>>> from sklearn.datasets import load_diabetes
>>> from sklearn.ensemble import RandomForestClassifier
>>> a, b = load_diabetes(return_X_y=True)
>>> me = np.mean(b)
>>> # noinspection PyUnresolvedReferences
>>> y = (b > me).astype(int)
>>> alg = RandomForestClassifier(n_estimators=3, random_state=0, n_jobs=-1)
>>> np.mean(cross_val_score(alg, a, y, cv=StratifiedKFold(n_splits=2))) # doctest:+ELLIPSIS +NORMALIZE_WHITESPACE
0.6...
>>> c = b.reshape(len(b), 1)
>>> X = np.hstack([a, c])
>>> alg = PairwiseClassifier(n_estimators=3, threshold=20, only_relevant_pairs_on_prediction=False, random_state=0, n_jobs=-1)
>>> np.mean(cross_val_score(alg, X, y, cv=StratifiedKFold(n_splits=2))) # doctest:+ELLIPSIS +NORMALIZE_WHITESPACE
0.7...
>>> alg = alg.fit(X[:80])
>>> alg.predict(X[:2])
array([1, 0])
>>> alg.predict(X[:6], paired_rows=True)
array([1, 0, 0, 1, 1, 0])
>>> p1 = alg.predict(X[80::2])
>>> p2 = alg.predict(X[80:], paired_rows=True)[::2]
>>> a = X[80::2,-1]
>>> b = X[81::2,-1]
>>> np.sum(p1 == y[80::2]) / len(p1) # doctest:+ELLIPSIS +NORMALIZE_WHITESPACE
0.7...
>>> np.sum(p2 == (a>b)) / len(p2) # doctest:+ELLIPSIS +NORMALIZE_WHITESPACE
0.6...
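A minimal sketch of the relevance filter applied to pairs during fit, assuming the threshold is an absolute target difference (proportion=False); it mirrors the filter lambda in fit_ above, with made-up per-pair deltas.
import numpy as np
target_delta = np.array([5.0, -30.0, 25.0, 0.0])   # hypothetical per-pair target deltas
threshold = 20
relevant = (target_delta < -threshold) | (target_delta >= threshold)
print(relevant)   # [False  True  True False]: only clearly different pairs are kept for training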
Ancestors
- sklearn.base.BaseEstimator
- sklearn.utils._estimator_html_repr._HTMLDocumentationLinkMixin
- sklearn.utils._metadata_requests._MetadataRequester
- sklearn.base.ClassifierMixin
Subclasses
Methods
def fit(self, X, y=None)
-
WARNING: y is ignored; the permutation test won't work.
TODO: check whether the permutation test accepts pandas; use the index to fix the warning above.
:param X: Last column is the continuous target.
:param y: Ignored.
:return:
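A minimal sketch of preparing the input for fit(): the continuous target is appended as the last column of X, and the y argument is not needed. It mirrors the class doctest above.
import numpy as np
from sklearn.datasets import load_diabetes
from pairwiseprediction.classifier import PairwiseClassifier
a, b = load_diabetes(return_X_y=True)
X = np.hstack([a, b.reshape(-1, 1)])          # last column = continuous target
model = PairwiseClassifier(n_estimators=10, threshold=20, random_state=0)
model = model.fit(X[:80])                     # y is ignored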
def fit_(self, X, y=None, extra_kwargs=None)
-
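A short sketch, assuming the same X as in the fit example above, of how extra_kwargs is merged with the constructor kwargs and forwarded to the internal `algorithm`.
import numpy as np
from sklearn.datasets import load_diabetes
from pairwiseprediction.classifier import PairwiseClassifier
a, b = load_diabetes(return_X_y=True)
X = np.hstack([a, b.reshape(-1, 1)])
model = PairwiseClassifier(threshold=20, random_state=0)
model = model.fit_(X[:80], extra_kwargs={"n_estimators": 5})   # reaches RandomForestClassifier(random_state=0, n_estimators=5)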
def predict(self, X, paired_rows=False, predict_proba=False)
-
:param X: Last column is discarded.
:param paired_rows: Whether consecutive rows form the pairs to compare.
:param predict_proba: Whether to return class probabilities instead of labels.
:return:
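A minimal sketch of the two prediction modes, assuming X is prepared as in the fit example (features plus the continuous target in the last column, which predict() discards).
import numpy as np
from sklearn.datasets import load_diabetes
from pairwiseprediction.classifier import PairwiseClassifier
a, b = load_diabetes(return_X_y=True)
X = np.hstack([a, b.reshape(-1, 1)])
model = PairwiseClassifier(n_estimators=10, threshold=20, random_state=0).fit(X[:80])
labels = model.predict(X[80:90])                     # one label per row, via comparison with the training sample
paired = model.predict(X[80:90], paired_rows=True)   # rows (0,1), (2,3), ... compared directly
per_pair = paired[::2]                               # 1 means the first row of the pair has the larger target
print(labels, per_pair)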
def predict_proba(self, X, paired_rows=False)
-
:param X: Last column is discarded.
:param paired_rows: Whether consecutive rows form the pairs to compare.
:return:
>>> import numpy as np
>>> from sklearn.datasets import load_diabetes
>>> from sklearn.ensemble import RandomForestClassifier
>>> a, b = load_diabetes(return_X_y=True)
>>> me = np.mean(b)
>>> # noinspection PyUnresolvedReferences
>>> y = (b > me).astype(int)
>>> alg = RandomForestClassifier(n_estimators=10, random_state=0, n_jobs=-1)
>>> alg = alg.fit(a, y)
>>> y[:5], b[:5], me # doctest:+ELLIPSIS +NORMALIZE_WHITESPACE
(array([0, 0, 0, 1, 0]), array([151., 75., 141., 206., 135.]), 152...)
>>> np.round(alg.predict_proba(a[:5]), 2) # doctest:+ELLIPSIS +NORMALIZE_WHITESPACE
array([[0.7, 0.3],
[1. , 0. ],
[0.9, 0.1],
[0. , 1. ],
[1. , 0. ]])
>>> c = b.reshape(len(b), 1)
>>> X = np.hstack([a, c])
>>> alg = PairwiseClassifier(n_estimators=30, threshold=20, only_relevant_pairs_on_prediction=False, random_state=0, n_jobs=-1)
>>> alg = alg.fit(X)
>>> np.round(alg.predict_proba(X[:5]), 2) # doctest:+ELLIPSIS +NORMALIZE_WHITESPACE
array([[0.9 , 0.1 ],
[0.41, 0.59],
[0.96, 0.04],
[0.76, 0.24],
[0.74, 0.26]])
>>> b[:6]
array([151., 75., 141., 206., 135., 97.])
>>> # 151 > 75 → label=1
>>> alg.predict(X[:6], paired_rows=True)[::2]
array([1, 0, 1])
>>> alg.predict_proba(X[:6], paired_rows=True)[::2]
array([[0. , 1. ],
[1. , 0. ],
[0.1, 0.9]])
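For the non-paired mode, the probability comes from the interpolated target estimate: a simplified sketch of the mapping used in predict() above, with hypothetical numbers.
import numpy as np
targets = np.array([60.0, 100.0, 150.0, 210.0, 300.0])   # hypothetical training targets
center = 152.0                                            # reference value (mean by default)
z = 190.0                                                 # hypothetical interpolated target estimate
pos_ampl, neg_ampl = targets.max() - center, center - targets.min()
d = z - center
p = d / pos_ampl if d > 0 else -d / neg_ampl              # distance from center, scaled by the amplitude on that side
print([1 - p, p])                                         # [P(class 0), P(class 1)]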
def set_predict_proba_request(self: PairwiseClassifier, *, paired_rows: Union[bool, None, str] = '$UNCHANGED$') -> PairwiseClassifier
-
Request metadata passed to the predict_proba method.
Note that this method is only relevant if enable_metadata_routing=True (see sklearn.set_config). Please see the User Guide on metadata routing for how the routing mechanism works.
The options for each parameter are:
- True: metadata is requested, and passed to predict_proba if provided. The request is ignored if metadata is not provided.
- False: metadata is not requested and the meta-estimator will not pass it to predict_proba.
- None: metadata is not requested, and the meta-estimator will raise an error if the user provides it.
- str: metadata should be passed to the meta-estimator with this given alias instead of the original name.
The default (sklearn.utils.metadata_routing.UNCHANGED) retains the existing request. This allows you to change the request for some parameters and not others.
Added in version 1.3.
Note: this method is only relevant if this estimator is used as a sub-estimator of a meta-estimator, e.g. inside a sklearn.pipeline.Pipeline. Otherwise it has no effect.
Parameters
paired_rows : str, True, False, or None, default=sklearn.utils.metadata_routing.UNCHANGED
- Metadata routing for the paired_rows parameter in predict_proba.
Returns
self : object
- The updated object.
-
def set_predict_request(self: PairwiseClassifier, *, paired_rows: Union[bool, None, str] = '$UNCHANGED$', predict_proba: Union[bool, None, str] = '$UNCHANGED$') -> PairwiseClassifier
-
Request metadata passed to the predict method.
Note that this method is only relevant if enable_metadata_routing=True (see sklearn.set_config). Please see the User Guide on metadata routing for how the routing mechanism works.
The options for each parameter are:
- True: metadata is requested, and passed to predict if provided. The request is ignored if metadata is not provided.
- False: metadata is not requested and the meta-estimator will not pass it to predict.
- None: metadata is not requested, and the meta-estimator will raise an error if the user provides it.
- str: metadata should be passed to the meta-estimator with this given alias instead of the original name.
The default (sklearn.utils.metadata_routing.UNCHANGED) retains the existing request. This allows you to change the request for some parameters and not others.
Added in version 1.3.
Note: this method is only relevant if this estimator is used as a sub-estimator of a meta-estimator, e.g. inside a sklearn.pipeline.Pipeline. Otherwise it has no effect.
Parameters
paired_rows : str, True, False, or None, default=sklearn.utils.metadata_routing.UNCHANGED
- Metadata routing for the paired_rows parameter in predict.
predict_proba : str, True, False, or None, default=sklearn.utils.metadata_routing.UNCHANGED
- Metadata routing for the predict_proba parameter in predict.
Returns
self : object
- The updated object.
-
def set_score_request(self: PairwiseClassifier, *, sample_weight: Union[bool, None, str] = '$UNCHANGED$') -> PairwiseClassifier
-
Request metadata passed to the score method.
Note that this method is only relevant if enable_metadata_routing=True (see sklearn.set_config). Please see the User Guide on metadata routing for how the routing mechanism works.
The options for each parameter are:
- True: metadata is requested, and passed to score if provided. The request is ignored if metadata is not provided.
- False: metadata is not requested and the meta-estimator will not pass it to score.
- None: metadata is not requested, and the meta-estimator will raise an error if the user provides it.
- str: metadata should be passed to the meta-estimator with this given alias instead of the original name.
The default (sklearn.utils.metadata_routing.UNCHANGED) retains the existing request. This allows you to change the request for some parameters and not others.
Added in version 1.3.
Note: this method is only relevant if this estimator is used as a sub-estimator of a meta-estimator, e.g. inside a sklearn.pipeline.Pipeline. Otherwise it has no effect.
Parameters
sample_weight : str, True, False, or None, default=sklearn.utils.metadata_routing.UNCHANGED
- Metadata routing for the sample_weight parameter in score.
Returns
self : object
- The updated object.
-
def shap(self, xa, xb, columns, seed=0, **kwargs)
-
Return an indexed dict {variable: (value, SHAP contribution)} for the given pair of instances
A and B; the last column of each instance is discarded.
:param xa: Test instance A.
:param xb: Test instance B.
:param columns: Column names for a single instance.
:param seed: Random state for the SHAP computation.
:return:
>>> import numpy as np
>>> from pandas import DataFrame
>>> from sklearn.datasets import load_diabetes
>>> from sklearn.ensemble import RandomForestClassifier
>>> a, b = load_diabetes(return_X_y=True)
>>> a = a[:, :3]
>>> c = b.reshape(len(b), 1)
>>> X = np.hstack([a, c])
>>> alg = PairwiseClassifier(n_estimators=3, threshold=20, only_relevant_pairs_on_prediction=False, random_state=0, n_jobs=-1)
>>> alg = alg.fit(X[:80])
>>> d = alg.shap(X[0], X[1], columns=list("abc"))
>>> d.sort()
>>> d # doctest:+ELLIPSIS +NORMALIZE_WHITESPACE
IndexedOrderedDict([('a_a', (0.038..., 0.078...)), ('a_b', (0.050..., -0.007...)), ('a_c', (0.061..., 0.079...)), ('b_a', (-0.001..., 0.085...)), ('b_b', (-0.044..., 0.025...)), ('b_c', (-0.051..., 0.237...))])
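A short sketch of post-processing the returned indexed dict, assuming the {variable: (value, SHAP contribution)} shape shown in the doctest; the values below are illustrative, not real output.
from pandas import DataFrame
d = {"a_a": (0.038, 0.078), "a_b": (0.050, -0.007), "b_c": (-0.051, 0.237)}   # illustrative values
df = DataFrame([(k, v, s) for k, (v, s) in d.items()], columns=["variable", "value", "shap"])
df = df.sort_values("shap", key=abs, ascending=False)   # largest absolute contributions first
print(df)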