Source code for mesmerize.analysis.math.cross_correlation

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# @author: kushal
#
# Chatzigeorgiou Group
# Sars International Centre for Marine Molecular Biology
#
# GNU GENERAL PUBLIC LICENSE Version 3, 29 June 2007


import numpy as np
from tslearn.cycc import normalized_cc
from itertools import product
from ...common.utils import HdfTools
# from tslearn.preprocessing import TimeSeriesScalerMeanVariance, TimeSeriesScalerMinMax
# from sklearn.metrics import pairwise_distances


[docs]def ncc_c(x: np.ndarray, y: np.ndarray) -> np.ndarray: """ Must pass 1D array to both x and y :param x: Input array [x1, x2, x3, ... xn] :param y: Input array [y2, y2, x3, ... yn] :return: Returns the normalized cross correlation function (as an array) of the two input vector arguments "x" and "y" :rtype: np.ndarray """ cc = normalized_cc(x.reshape(-1, 1), y.reshape(-1, 1)) return cc
[docs]def get_omega(x: np.ndarray = None, y: np.ndarray = None, cc: np.ndarray = None) -> int: """ Must pass a 1D array to either both "x" and "y" or a cross-correlation function (as an array) to "cc" :param x: Input array [x1, x2, x3, ... xn] :param y: Input array [y2, y2, x3, ... yn] :param cc: cross-correlation function represented as an array [c1, c2, c3, ... cn] :return: index (x-axis position) of the global maxima of the cross-correlation function :rtype: np.ndarray """ if cc is None: cc = ncc_c(x, y) w = np.argmax(cc) return w
[docs]def get_lag(x: np.ndarray = None, y: np.ndarray = None, cc: np.ndarray = None) -> float: """ Must pass a 1D array to either both "x" and "y" or a cross-correlation function (as an array) to "cc" :param x: Input array [x1, x2, x3, ... xn] :param y: Input array [y2, y2, x3, ... yn] :param cc: cross-correlation function represented as a array [c1, c2, c3, ... cn] :return: Position of the maxima of the cross-correlation function with respect to middle point of the cross-correlation function :rtype: np.ndarray """ if cc is None: w = get_omega(x, y) s = w - x.shape[0] else: w = get_omega(cc=cc) s = w - int(cc.shape[0] / 2) return float(-s)
[docs]def get_epsilon(x: np.ndarray = None, y: np.ndarray = None, cc: np.ndarray = None) -> float: """ Must pass a 1D vector to either both "x" and "y" or a cross-correlation function to "cc" :param x: Input array [x1, x2, x3, ... xn] :param y: Input array [y2, y2, x3, ... yn] :param cc: cross-correlation function represented as an array [c1, c2, c3, ... cn] :return: Magnitude of the global maxima of the cross-correlationn function :rtype: np.ndarray """ if cc is None: cc = ncc_c(x, y) e = np.max(cc) return e
[docs]def get_lag_matrix(curves: np.ndarray = None, ccs: np.ndarray = None) -> np.ndarray: """ Get a 2D matrix of lags. Can pass either a 2D array of 1D curves or cross-correlations :param curves: 2D array of 1D curves :param ccs: 2D array of 1D cross-correlation functions represented by arrays :return: 2D matrix of lag values, shape is [n_curves, n_curves] :rtype: np.ndarray """ if ccs is None: m = curves.shape[0] a = np.zeros((m, m)) for i, j in product(*(range(m),) * 2): a[i, j] = get_lag(curves[i], curves[j]) return a return _compute_from_ccs(ccs, get_lag)
[docs]def get_epsilon_matrix(curves: np.ndarray = None, ccs: np.ndarray = None) -> np.ndarray: """ Get a 2D matrix of maximas. Can pass either a 2D array of 1D curves or cross-correlations :param curves: 2D array of 1D curves :param ccs: 2D array of 1D cross-correlation functions represented by arrays :return: 2D matrix of maxima values, shape is [n_curves, n_curves] :rtype: np.ndarray """ if ccs is None: m = curves.shape[0] a = np.zeros((m, m)) for i, j in product(*(range(m),) * 2): a[i, j] = get_epsilon(curves[i], curves[j]) return a return _compute_from_ccs(ccs, get_epsilon)
def _compute_from_ccs(ccs: np.ndarray, func: callable) -> np.ndarray: m = ccs.shape[0] a = np.zeros(shape=(m, m)) for i, j in product(*(range(m),) * 2): a[i, j] = func(cc=ccs[i, j, :]) return a
[docs]class CC_Data:
[docs] def __init__(self, ccs: np.ndarray = None, lag_matrix: np.ndarray = None, epsilon_matrix: np.ndarray = None, curve_uuids: np.ndarray = None, labels: np.ndarray = None): """ Object for organizing cross-correlation data types must be numpy.ndarray to be compatible with hdf5 :param ccs: array of cross-correlation functions, shape: [n_curves, n_curves, func_length] :type ccs: np.ndarray :param lag_matrix: the lag matrix, shape: [n_curves, n_curves] :type lag_matrix: np.ndarray :param epsilon_matrix: the maxima matrix, shape: [n_curves, n_curves] :type epsilon_matrix: np.ndarray :param curve_uuids: uuids (str representation) for each of the curves, length: n_curves :type curve_uuids: np.ndarray :param labels: labels for each curve, length: n_curves :type labels: np.ndarray """ self.ccs = ccs #: array of cross-correlation functions self.lag_matrix = lag_matrix #: lag matrix self.epsilon_matrix = epsilon_matrix #: maxima matrix self.curve_uuids = curve_uuids #: uuids for each curve self.labels = labels #: labels for each curve
[docs] def get_threshold_matrix(self, matrix_type: str, lag_thr: float, max_thr: float, lag_thr_abs: bool = True) -> np.ndarray: """ Get lag or maxima matrix with thresholds applied. Values outside the threshold are set to NaN :param matrix_type: one of 'lag' or 'maxima' :param lag_thr: lag threshold :param max_thr: maxima threshold :param lag_thr_abs: threshold with the absolute value of lag :return: the requested matrix with the thresholds applied to it. :rtype: np.ndarray """ # Get lag and maxima matrices l = self.lag_matrix m = self.epsilon_matrix if (l is None) or (m is None): raise ValueError('No lag or maxima data available') # The type of matrix to return if matrix_type == 'lag': a = np.copy(l) elif matrix_type == 'maxima': a = np.copy(m) else: raise ValueError('Argument "matrix_type" must be "lag" or "maxima"') # Set all values below both thresholds to nan if lag_thr_abs: a[np.abs(l) > lag_thr] = np.nan else: a[l > lag_thr] = np.nan a[m < max_thr] = np.nan return a
[docs] @classmethod def from_dict(cls, d: dict): """Load data from a dict""" return cls(**d)
[docs] def to_hdf5(self, path: str): """ Save as an HDF5 file :param path: path to save the hdf5 file to, file must not exist. """ HdfTools.save_dict(self.__dict__, path, 'cross_corr_data')
[docs] @classmethod def from_hdf5(cls, path: str): """ Load cross-correlation data from an hdf5 file :param path: path to the hdf5 file """ d = HdfTools.load_dict(path, 'cross_corr_data') return cls(**d)
[docs]def compute_cc_data(curves: np.ndarray) -> CC_Data: """ Compute cross-correlation data (cc functions, lag and maxima matrices) :param curves: input curves as a 2D array, shape is [n_samples, curve_size] :return: cross correlation data for the input curves as a CC_Data instance :rtype: CC_Data """ ccs = compute_ccs(curves) l = get_lag_matrix(ccs=ccs) e = get_epsilon_matrix(ccs=ccs) return CC_Data(ccs, l, e)
[docs]def compute_ccs(a: np.ndarray) -> np.ndarray: """ Compute cross-correlations between all 1D curves in a 2D input array :param a: 2D input array of 1D curves, shape is [n_samples, curve_size] :rtype: np.ndarray """ n = a.shape[0] m = a.shape[1] out = np.zeros(shape=(n, n, (2 * m) - 1)) for i, j in product(*(range(n),) * 2): out[i, j, :] = ncc_c(a[i], a[j]) return out