Source code for clarans.fast_clarans

"""FastCLARANS implementation.

Provides a faster variant of CLARANS by using FastPAM1 delta-cost updates.
This implementation computes distances on-the-fly as recommended in the
original paper, making it memory-efficient for large datasets while still
benefiting from the O(k) speedup per swap evaluation.
"""

from __future__ import annotations

import warnings
from typing import TYPE_CHECKING, Any, Sequence, Tuple

import numpy as np
from numpy.typing import ArrayLike
from sklearn.metrics import pairwise_distances, pairwise_distances_argmin_min
from sklearn.utils.validation import check_array, check_random_state

from clarans.clarans import CLARANS
from clarans.initialization import (
    initialize_build,
    initialize_heuristic,
    initialize_k_medoids_plus_plus,
)

if TYPE_CHECKING:
    from scipy.sparse import spmatrix


[docs] class FastCLARANS(CLARANS): """ FastCLARANS: Fast variant of the CLARANS clustering algorithm. Parameters ---------- n_clusters : int, default=8 The number of clusters to form (also the number of medoids). numlocal : int, default=2 The number of local searches to perform. More local searches increase the chance of finding a better minimum at the cost of additional runtime. maxneighbor : int or None, default=None The maximum number of non-medoid candidates to sample per local search. If ``None``, defaults to 2.5% of non-medoid points (i.e., ``0.025 * (n - k)``) as recommended in the paper. max_iter : int or None, default=300 Maximum number of successful swaps (improvements) allowed per local search. Use ``None`` to disable this safeguard. init : {'random', 'heuristic', 'k-medoids++', 'build', array-like}, default='random' Method for initialization. If an array-like is provided it should be of shape (n_clusters, n_features) and will be snapped to the nearest points in X. metric : str or callable, default='euclidean' The distance metric passed to scikit-learn pairwise utilities. random_state : int, RandomState instance or None, default=None Controls random number generation for reproducibility. Attributes ---------- medoid_indices_ : ndarray of shape (n_clusters,) Indices of the selected medoids in the training set. cluster_centers_ : ndarray of shape (n_clusters, n_features) Coordinates of the medoids (rows from the training data). labels_ : ndarray of shape (n_samples,) Labels of each point indicating the nearest medoid. Notes ----- This implementation follows the original FastCLARANS paper by computing distances on-the-fly rather than precomputing a full distance matrix. This keeps memory usage at O(n) instead of O(n^2), making it suitable for larger datasets. The key improvement from FastCLARANS is the sampling strategy: instead of sampling random (medoid, non-medoid) pairs like CLARANS, it samples only non-medoid candidates and evaluates swaps with all k medoids at once using FastPAM1 delta formulas. This explores k edges of the search graph in the time CLARANS explores one. References ---------- Schubert, E., & Rousseeuw, P. J. (2021). Fast and eager k-medoids clustering: O(k) runtime improvement of the PAM, CLARA, and CLARANS algorithms. Information Systems, 101, 101804. """
[docs] def fit(self, X: ArrayLike | "spmatrix", y: Any = None) -> "FastCLARANS": """ Fit the FastCLARANS model to X. Parameters ---------- X : array-like or sparse matrix of shape (n_samples, n_features) Training instances to cluster. Accepts CSR/CSC sparse matrices. y : Ignored Not used, present for API consistency. Returns ------- self : FastCLARANS The fitted estimator. Attributes set on the estimator include ``medoid_indices_``, ``cluster_centers_`` and ``labels_``. Raises ------ ValueError If ``n_clusters >= n_samples`` or if an explicit ``init`` array is provided with an incompatible shape or there are not enough unique points to form the requested number of medoids. Notes ----- Unlike implementations that precompute the full distance matrix, this version computes distances on-the-fly to save memory. This is efficient for low-dimensional data with cheap distance metrics (e.g., Euclidean distance). """ X = check_array(X, accept_sparse=["csr", "csc"]) self.n_features_in_ = X.shape[1] random_state = check_random_state(self.random_state) n_samples, _ = X.shape if self.n_clusters >= n_samples: raise ValueError("n_clusters must be less than n_samples") if self.maxneighbor is None: # FastCLARANS samples 2.5% of non-medoid points per local search # (Schubert & Rousseeuw, 2021) instead of 1.25% * k * (n-k) edges self.maxneighbor_ = max( 250, int(0.025 * (n_samples - self.n_clusters)) ) else: self.maxneighbor_ = self.maxneighbor best_cost = np.inf best_medoids = None self.n_iter_ = 0 for loc_idx in range(self.numlocal): if isinstance(self.init, str) and self.init == "random": current_medoids_indices = random_state.choice( n_samples, self.n_clusters, replace=False ) elif isinstance(self.init, str) and self.init == "k-medoids++": current_medoids_indices = initialize_k_medoids_plus_plus( X, self.n_clusters, random_state, self.metric ) elif isinstance(self.init, str) and self.init == "heuristic": current_medoids_indices = initialize_heuristic( X, self.n_clusters, self.metric ) elif isinstance(self.init, str) and self.init == "build": current_medoids_indices = initialize_build( X, self.n_clusters, self.metric ) elif hasattr(self.init, "__array__") or isinstance(self.init, list): init_centers = check_array(self.init) if init_centers.shape != (self.n_clusters, self.n_features_in_): raise ValueError( f"init array must be of shape ({self.n_clusters}, {self.n_features_in_})" ) current_medoids_indices = pairwise_distances_argmin_min( init_centers, X, metric=self.metric )[0] if len(set(current_medoids_indices)) < self.n_clusters: warnings.warn( "Provided init centers map to duplicate points in X. " "Filling duplicates with random points." ) current_medoids_indices = list(set(current_medoids_indices)) remaining = self.n_clusters - len(current_medoids_indices) available = list( set(range(n_samples)) - set(current_medoids_indices) ) if len(available) < remaining: raise ValueError( "Not enough unique points to fill up to n_clusters." ) current_medoids_indices.extend( random_state.choice(available, remaining, replace=False) ) current_medoids_indices = np.array(current_medoids_indices) else: raise ValueError(f"Unknown init method: {self.init}") current_medoids_indices.sort() # Compute nearest/second-nearest on-the-fly (no precomputed matrix) near_idx_map, near_dist, second_dist = self._update_cache_onthefly( X, current_medoids_indices ) current_cost = np.sum(near_dist) i = 0 iter_count = 0 while i < self.maxneighbor_: if self.max_iter is not None and iter_count >= self.max_iter: break # Choose a random non-medoid candidate while True: candidate_idx = random_state.randint(0, n_samples) if candidate_idx not in current_medoids_indices: break # Compute distances from candidate to all points on-the-fly d_xc = pairwise_distances( X[candidate_idx].reshape(1, -1), X, metric=self.metric ).ravel() removal_loss = np.zeros(self.n_clusters) diff = second_dist - near_dist removal_loss += np.bincount( near_idx_map, weights=diff, minlength=self.n_clusters ) mask_better_than_nearest = d_xc < near_dist delta_td_plus_xc = np.sum( d_xc[mask_better_than_nearest] - near_dist[mask_better_than_nearest] ) total_delta = removal_loss + delta_td_plus_xc mask_better_than_second = d_xc < second_dist term1 = ( near_dist[mask_better_than_nearest] - second_dist[mask_better_than_nearest] ) idx1 = near_idx_map[mask_better_than_nearest] total_delta += np.bincount( idx1, weights=term1, minlength=self.n_clusters ) mask_case2 = (~mask_better_than_nearest) & mask_better_than_second term2 = d_xc[mask_case2] - second_dist[mask_case2] idx2 = near_idx_map[mask_case2] total_delta += np.bincount( idx2, weights=term2, minlength=self.n_clusters ) min_delta_idx = np.argmin(total_delta) min_delta = total_delta[min_delta_idx] if min_delta < 0: current_medoids_indices[min_delta_idx] = candidate_idx current_medoids_indices.sort() current_cost += min_delta # Update nearest/second caches after an accepted swap near_idx_map, near_dist, second_dist = self._update_cache_onthefly( X, current_medoids_indices ) i = 0 iter_count += 1 else: i += 1 self.n_iter_ += max(1, iter_count) if current_cost < best_cost: best_cost = current_cost best_medoids = current_medoids_indices.copy() self.medoid_indices_ = best_medoids self.cluster_centers_ = X[self.medoid_indices_] self.labels_, _ = pairwise_distances_argmin_min( X, self.cluster_centers_, metric=self.metric ) return self
def _update_cache_onthefly( self, X: ArrayLike, medoids_indices: Sequence[int] ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Compute nearest and second-nearest medoid information on-the-fly. Parameters ---------- X : array-like of shape (n_samples, n_features) The data matrix. medoids_indices : array-like of shape (n_clusters,) Indices of the current medoids. Returns ------- near_idx_map : ndarray of shape (n_samples,) For each sample, the index (0..k-1) of the nearest medoid in ``medoids_indices``. near_dist : ndarray of shape (n_samples,) Distance from each sample to its nearest medoid. second_dist : ndarray of shape (n_samples,) Distance from each sample to its second nearest medoid. If ``n_clusters == 1`` this will be an array filled with ``np.inf``. """ n_samples = X.shape[0] medoids = X[medoids_indices] # Compute distances from all points to all medoids on-the-fly subD = pairwise_distances(X, medoids, metric=self.metric) if self.n_clusters >= 2: partitioned_idx = np.argpartition(subD, 1, axis=1) smallest_idx = partitioned_idx[:, 0] second_smallest_idx = partitioned_idx[:, 1] near_dist = subD[np.arange(n_samples), smallest_idx] second_dist = subD[np.arange(n_samples), second_smallest_idx] near_idx_map = smallest_idx else: near_dist = subD[:, 0] second_dist = np.full(n_samples, np.inf) near_idx_map = np.zeros(n_samples, dtype=int) return near_idx_map, near_dist, second_dist