Source code for CompSensePack.dictionaries.KSVD

"""
KSVD.py - K-SVD algorithm for dictionary learning and sparse coding using Orthogonal Matching Pursuit (OMP).

This module implements the K-SVD algorithm, which is widely used for dictionary learning. The algorithm
iteratively updates dictionary elements to best represent a set of input signals. This module also includes
utility functions to update dictionary elements using Singular Value Decomposition (SVD) and handle redundant
dictionary elements.

Example usage:
--------------
>>> from KSVD import KSVD
>>> data = np.random.randn(100, 200)  # Random signals
>>> params = {'K': 50, 'num_iterations': 10, 'initialization_method': 'DataElements', 'L': 5, 'preserve_dc_atom': 0}
>>> dictionary, coefficients = KSVD(data, params)
"""

import numpy as np
from scipy.sparse.linalg import svds
from .OMP import OMP  # Import OMP algorithm from the OMP module

[docs] def svds_vector(v): """ Handle SVD for a vector or a 2D matrix with one dimension equal to 1. Parameters ---------- v : numpy.ndarray Input vector or 2D matrix with one dimension equal to 1. Returns ------- u : numpy.ndarray The left singular vector (normalized). s : float The singular value (the norm of the vector). vt : numpy.ndarray The right singular vector, which is always [[1]] for vectors. Raises ------ ValueError If the input is not a vector or a 2D array with one dimension equal to 1. """ v = np.asarray(v) if v.ndim == 1: v = v.reshape(-1, 1) elif v.ndim == 2 and (v.shape[0] == 1 or v.shape[1] == 1): pass else: raise ValueError("Input must be a vector or a 2D array with one dimension equal to 1.") s = np.linalg.norm(v) if s > 0: u = v / s else: u = np.zeros_like(v) vt = np.array([[1]]) return u, s, vt
[docs] def I_findBetterDictionaryElement(data, dictionary, j, coeff_matrix, numCoefUsed=1): """ Update the j-th dictionary element using the current sparse representation. Parameters ---------- data : numpy.ndarray The data matrix (n x N), where n is the signal dimension and N is the number of signals. dictionary : numpy.ndarray The current dictionary matrix (n x K), where K is the number of dictionary atoms. j : int The index of the dictionary element to be updated. coeff_matrix : numpy.ndarray The sparse coefficient matrix (K x N), representing the sparse codes for the signals. numCoefUsed : int, optional (default=1) The number of coefficients used in the sparse representation. Returns ------- betterDictionaryElement : numpy.ndarray The updated dictionary element (vector). coeff_matrix : numpy.ndarray The updated coefficient matrix with the new coefficients for the updated dictionary element. newVectAdded : int A flag indicating if a new vector was added to the dictionary. """ relevantDataIndices = np.nonzero(coeff_matrix[j, :])[0] if relevantDataIndices.size == 0: # Find the signal with the largest residual errorMat = data - dictionary @ coeff_matrix errorNormVec = np.sum(errorMat ** 2, axis=0) i = np.argmax(errorNormVec) betterDictionaryElement = data[:, i] / np.linalg.norm(data[:, i]) betterDictionaryElement *= np.sign(betterDictionaryElement[0]) coeff_matrix[j, :] = 0 newVectAdded = 1 return betterDictionaryElement, coeff_matrix, newVectAdded newVectAdded = 0 tmpCoefMatrix = coeff_matrix[:, relevantDataIndices] tmpCoefMatrix[j, :] = 0 errors = data[:, relevantDataIndices] - dictionary @ tmpCoefMatrix if np.min(errors.shape) <= 1: u, s, vt = svds_vector(errors) betterDictionaryElement = u singularValue = s betaVector = vt else: u, s, vt = svds(errors, k=1) betterDictionaryElement = u[:, 0] singularValue = s[0] betaVector = vt[0, :] coeff_matrix[j, relevantDataIndices] = singularValue * betaVector.T return betterDictionaryElement, coeff_matrix, newVectAdded
[docs] def I_clearDictionary(dictionary, coeff_matrix, data): """ Clear or replace redundant dictionary elements. Parameters ---------- dictionary : numpy.ndarray The dictionary matrix to be updated. coeff_matrix : numpy.ndarray The coefficient matrix representing the sparse codes for the data. data : numpy.ndarray The original data matrix. Returns ------- dictionary : numpy.ndarray The updated dictionary with redundant elements replaced. """ T2 = 0.99 # Coherence threshold T1 = 3 # Minimum number of non-zero coefficients K = dictionary.shape[1] Er = np.sum((data - dictionary @ coeff_matrix) ** 2, axis=0) G = dictionary.T @ dictionary G -= np.diag(np.diag(G)) for jj in range(K): if np.max(G[jj, :]) > T2 or np.count_nonzero(np.abs(coeff_matrix[jj, :]) > 1e-7) <= T1: pos = np.argmax(Er) Er[pos] = 0 dictionary[:, jj] = data[:, pos] / np.linalg.norm(data[:, pos]) G = dictionary.T @ dictionary G -= np.diag(np.diag(G)) return dictionary
[docs] def KSVD(data, param): """ K-SVD algorithm for dictionary learning. The K-SVD algorithm is an iterative method for updating the dictionary and sparse coefficients to best represent the input signals. The dictionary is updated using the singular value decomposition (SVD) of the data approximation error. Parameters ---------- data : numpy.ndarray The data matrix (n x N) containing N signals of dimension n. param : dict A dictionary containing parameters for the K-SVD algorithm: - 'K': int, the number of dictionary atoms. - 'num_iterations': int, the number of iterations to run the K-SVD algorithm. - 'initialization_method': str, how to initialize the dictionary ('DataElements' or 'GivenMatrix'). - 'initial_dictionary': numpy.ndarray, the initial dictionary (if 'GivenMatrix' is used). - 'L': int, the number of non-zero coefficients for sparse coding (used in OMP). - 'preserve_dc_atom': int, flag to preserve a DC atom (default is 0). Returns ------- dictionary : numpy.ndarray The learned dictionary of size (n x K). coef_matrix : numpy.ndarray The sparse coefficient matrix (K x N), representing the sparse representation of the data. """ # Check if the number of data samples is smaller than the dictionary size if data.shape[1] < param['K']: print('KSVD: number of training data is smaller than the dictionary size. Trivial solution...') dictionary = data[:, :data.shape[1]] coef_matrix = np.eye(data.shape[1]) return dictionary, coef_matrix # Initialize the dictionary dictionary = np.zeros((data.shape[0], param['K']), dtype=np.float64) if param['initialization_method'] == 'DataElements': dictionary[:, :param['K'] - param['preserve_dc_atom']] = \ data[:, :param['K'] - param['preserve_dc_atom']] elif param['initialization_method'] == 'GivenMatrix': dictionary[:, :param['K'] - param['preserve_dc_atom']] = \ param['initial_dictionary'][:, :param['K'] - param['preserve_dc_atom']] # Iterate to update the dictionary and coefficients for iterNum in range(param['num_iterations']): coef_matrix = OMP(dictionary, data, param['L']) rand_perm = np.random.permutation(dictionary.shape[1]) for j in rand_perm: betterDictElem, coef_matrix, newVectAdded = I_findBetterDictionaryElement( data, dictionary, j, coef_matrix, param['L'] ) dictionary[:, j] = betterDictElem.ravel() dictionary = I_clearDictionary(dictionary, coef_matrix, data) return dictionary, coef_matrix