Source code for mesmerize.pyqtgraphCore.flowchart.library.Signal

# -*- coding: utf-8 -*-
import traceback
from PyQt5 import QtWidgets
from ....plotting.widgets.peak_editor import peak_editor
from .common import *
from ....analysis import Transmission
from scipy import signal
from scipy import fftpack
import pandas as pd
import numpy as np     # used throughout; imported explicitly in case `.common` does not re-export it
from tqdm import tqdm  # likewise; needed for tqdm.pandas() / progress_apply below
from tslearn.preprocessing import TimeSeriesScalerMeanVariance
from ....analysis.compute_peak_features import ComputePeakFeatures


class ButterWorth(CtrlNode):
    """Butterworth filter"""
    nodeName = 'ButterWorth'
    uiTemplate = [('data_column', 'combo', {}),
                  ('order', 'intSpin', {'min': 1, 'max': 100, 'step': 1, 'value': 2}),
                  ('freq_divisor', 'doubleSpin', {'min': 0.01, 'max': 100.00, 'step': 0.05, 'value': 2.00}),
                  ('Apply', 'check', {'checked': False, 'applyBox': True})
                  ]
    output_column = '_BUTTERWORTH'

    def _func(self, x: np.ndarray, meta: dict) -> np.ndarray:
        N = self.ctrls['order'].value()
        freq = 1 / meta['fps']
        self.Wn = freq / self.freq_divisor

        b, a = signal.butter(N, self.Wn)
        sig = signal.filtfilt(b, a, x)

        # return the filtered array directly so the apply() below yields a Series
        return sig

    def processData(self, transmission: Transmission):
        self.t = transmission
        self.set_data_column_combo_box()
        if self.ctrls['Apply'].isChecked() is False:
            return

        self.t = transmission.copy()

        self.order = self.ctrls['order'].value()
        self.freq_divisor = self.ctrls['freq_divisor'].value()

        tqdm.pandas()
        self.t.df[self.output_column] = self.t.df.progress_apply(
            lambda x: self._func(x[self.data_column], x['meta']), axis=1
        )

        params = {'data_column': self.data_column,
                  'order': self.order,
                  'Wn': self.Wn,
                  'freq_divisor': self.freq_divisor,
                  'units': self.t.last_unit
                  }

        self.t.history_trace.add_operation(data_block_id='all', operation='butterworth', parameters=params)
        self.t.last_output = self.output_column

        return self.t
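# A minimal standalone sketch (not part of the original module) of the scipy calls
# the ButterWorth node wraps: design a low-order Butterworth filter and apply it with
# zero-phase (forward-backward) filtering. The cutoff `Wn=0.05` is a hypothetical
# example value; in the node it is derived from the frame rate and `freq_divisor`.
def _example_butterworth():
    import numpy as np
    from scipy import signal

    rng = np.random.default_rng(0)
    x = np.sin(np.linspace(0, 10 * np.pi, 1000)) + rng.normal(scale=0.3, size=1000)

    b, a = signal.butter(N=2, Wn=0.05)   # 2nd-order filter, normalized cutoff in (0, 1)
    filtered = signal.filtfilt(b, a, x)  # filtfilt applies it twice, so no phase shift
    return filtered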
class SavitzkyGolay(CtrlNode):
    """Savitzky-Golay filter."""
    nodeName = 'Savitzky_Golay'
    uiTemplate = [('data_column', 'combo', {}),
                  ('window_length', 'intSpin', {'min': 3, 'max': 999, 'value': 3, 'step': 2}),
                  ('polyorder', 'intSpin', {'min': 1, 'max': 998, 'value': 1, 'step': 1}),
                  ('Apply', 'check', {'checked': False, 'applyBox': True})
                  ]

    def processData(self, transmission: Transmission):
        self.t = transmission
        self.set_data_column_combo_box()
        if self.ctrls['Apply'].isChecked() is False:
            return

        self.t = transmission.copy()

        w = self.ctrls['window_length'].value()
        p = self.ctrls['polyorder'].value()

        if p >= w:
            raise ValueError('Invalid value! polyorder MUST be less than window_length')

        if w % 2 == 0:
            raise ValueError('Invalid value! window_length MUST be an odd number!')

        output_column = '_SAVITZKY_GOLAY'

        tqdm.pandas()
        self.t.df[output_column] = self.t.df[self.data_column].progress_apply(
            signal.savgol_filter, window_length=w, polyorder=p
        )

        params = {'data_column': self.data_column,
                  'window_length': w,
                  'polyorder': p,
                  'units': self.t.last_unit
                  }

        self.t.history_trace.add_operation(data_block_id='all', operation='savitzky_golay', parameters=params)
        self.t.last_output = output_column

        return self.t
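# A minimal sketch (not part of the original module) of the underlying call: smooth a
# noisy trace with scipy.signal.savgol_filter. window_length must be odd and strictly
# greater than polyorder, which is exactly what the node validates above. The values
# 11 and 3 are hypothetical examples.
def _example_savgol():
    import numpy as np
    from scipy import signal

    x = np.cumsum(np.random.randn(500))  # a noisy random-walk "trace"
    smoothed = signal.savgol_filter(x, window_length=11, polyorder=3)
    return smoothed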
class PowerSpectralDensity(CtrlNode):
    """Return the Power Spectral Density of a curve."""
    nodeName = 'PowSpecDens'
    uiTemplate = [('data_column', 'combo', {}),
                  ('Apply', 'check', {'checked': False, 'applyBox': True})
                  ]

    def processData(self, transmission: Transmission):
        self.t = transmission
        self.set_data_column_combo_box()
        if self.ctrls['Apply'].isChecked() is False:
            return

        self.t = transmission.copy()

        output_column = '_POWER_SPECTRAL_DENSITY'

        tqdm.pandas()
        self.t.df[output_column] = self.t.df[self.data_column].progress_apply(self._func)

        params = {'data_column': self.data_column}

        self.t.history_trace.add_operation(data_block_id='all', operation='power_spectral_density', parameters=params)
        self.t.last_output = output_column

        return self.t

    def _func(self, curve: np.ndarray) -> np.ndarray:
        f, p = signal.periodogram(curve)
        return p
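# A minimal sketch (not part of the original module) of signal.periodogram, which the
# node's _func wraps; note that _func keeps only the power values `p` and discards the
# frequency bins `f`. The sampling rate `fs` is a hypothetical example value.
def _example_periodogram():
    import numpy as np
    from scipy import signal

    fs = 10.0                        # hypothetical sampling rate in Hz
    t = np.arange(0, 100, 1 / fs)
    x = np.sin(2 * np.pi * 1.5 * t)  # a 1.5 Hz sinusoid

    f, p = signal.periodogram(x, fs=fs)
    return f[np.argmax(p)]           # ~1.5, the dominant frequency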
class Resample(CtrlNode):
    """Resample 1D data, uses scipy.signal.resample.

    "Rs": new sampling rate in "Tu" units of time.
    If "Tu" = 1, then Rs is the new sampling rate in Hertz.

    Output Column -> Input Column"""
    nodeName = 'Resample'
    uiTemplate = [('data_column', 'combo', {}),
                  ('Rs', 'intSpin', {'min': 1, 'max': 9999, 'value': 10, 'step': 5}),
                  ('Tu', 'intSpin', {'min': 1, 'max': 9999, 'value': 1, 'step': 1}),
                  ('Apply', 'check', {'checked': False, 'applyBox': True})
                  ]

    def processData(self, transmission: Transmission):
        self.t = transmission
        self.set_data_column_combo_box()
        if self.ctrls['Apply'].isChecked() is False:
            return

        self.t = transmission.copy()

        self.Rs = self.ctrls['Rs'].value()
        self.Tu = self.ctrls['Tu'].value()

        self.new_rate = self.Rs / self.Tu

        output_column = '_RESAMPLE'

        tqdm.pandas()
        self.t.df[output_column] = self.t.df.progress_apply(self._func, axis=1)

        params = {'data_column': self.data_column,
                  'output_rate': self.new_rate,
                  'units': self.t.last_unit
                  }

        self.t.history_trace.add_operation(data_block_id='all', operation='resample', parameters=params)
        self.t.last_output = output_column

        return self.t

    def _func(self, row: pd.Series) -> np.ndarray:
        Nf = row['meta']['fps']  # original sampling rate
        if Nf == 0:
            raise ValueError('Framerate not set for SampleID: ' + row['SampleID'] +
                             '. You must set a framerate for this SampleID to continue')
        Ns = row[self.data_column].shape[0]   # number of input samples
        Rn = int((Ns / Nf) * self.new_rate)   # number of samples at the new rate

        return signal.resample(row[self.data_column], Rn)
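# A minimal sketch (not part of the original module) of the resampling arithmetic in
# _func: the new sample count is (duration in seconds) * (new rate), i.e.
# (Ns / Nf) * new_rate. The rates below are hypothetical example values.
def _example_resample():
    import numpy as np
    from scipy import signal

    Nf = 30.0                               # hypothetical original framerate (Hz)
    new_rate = 10.0                         # target rate (Hz)
    x = np.random.randn(300)                # 10 s of data at 30 Hz

    Rn = int((x.shape[0] / Nf) * new_rate)  # 100 samples = 10 s at 10 Hz
    return signal.resample(x, Rn)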
class ScalerMeanVariance(CtrlNode):
    """Scaler for time series. Scales time series so that their mean (resp. standard deviation)
    in each dimension is mu (resp. std).

    See https://tslearn.readthedocs.io/en/latest/gen_modules/preprocessing/tslearn.preprocessing.TimeSeriesScalerMeanVariance.html#tslearn.preprocessing.TimeSeriesScalerMeanVariance"""
    nodeName = 'ScalerMeanVar'
    uiTemplate = [('data_column', 'combo', {}),
                  ('mu', 'doubleSpin', {'value': 0.0, 'step': 0.1, 'toolTip': 'Mean of the output time series'}),
                  ('std', 'doubleSpin', {'value': 1.0, 'step': 1.0, 'toolTip': 'Standard deviation of the output time series'}),
                  ('Apply', 'check', {'checked': False, 'applyBox': True})]

    def processData(self, transmission: Transmission):
        self.t = transmission
        self.set_data_column_combo_box()
        if not self.ctrls['Apply'].isChecked():
            return

        self.t = transmission.copy()

        mu = self.ctrls['mu'].value()
        std = self.ctrls['std'].value()

        params = {'data_column': self.data_column,
                  'mu': mu,
                  'std': std,
                  'units': self.t.last_unit
                  }

        output_column = '_SCALER_MEAN_VARIANCE'

        self.t.df[output_column] = self.t.df[self.data_column].apply(
            lambda a: TimeSeriesScalerMeanVariance(mu=mu, std=std).fit_transform(a)[:, :, 0]
        )

        self.t.history_trace.add_operation(data_block_id='all', operation='scaler_mean_variance', parameters=params)
        self.t.last_output = output_column

        return self.t
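# A minimal sketch (not part of the original module) of the tslearn scaler the node
# applies per row. fit_transform returns a 3D array of shape (n_ts, sz, d); the node
# indexes [:, :, 0] to drop the trailing singleton dimension. The input array is a
# hypothetical example.
def _example_scaler_mean_variance():
    import numpy as np
    from tslearn.preprocessing import TimeSeriesScalerMeanVariance

    a = np.random.randn(1000) * 5 + 20  # mean ~20, std ~5
    scaled = TimeSeriesScalerMeanVariance(mu=0.0, std=1.0).fit_transform(a)[:, :, 0]
    return scaled.mean(), scaled.std()  # ~0.0 and ~1.0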
class Normalize(CtrlNode):
    """Normalize a column containing 1-D arrays such that the values in each array
    are scaled between 0 and 1.

    Output Column -> Input Column"""
    nodeName = 'Normalize'
    uiTemplate = [('data_column', 'combo', {}),
                  ('Apply', 'check', {'checked': False, 'applyBox': True})]

    def processData(self, transmission: Transmission):
        self.t = transmission
        self.set_data_column_combo_box()
        if self.ctrls['Apply'].isChecked() is False:
            return

        self.t = transmission.copy()

        output_column = '_NORMALIZE'

        self.t.df[output_column] = self.t.df[self.data_column].apply(
            lambda a: (a - np.min(a)) / (np.max(a) - np.min(a))
        )

        params = {'data_column': self.data_column,
                  'units': self.t.last_unit
                  }

        self.t.history_trace.add_operation(data_block_id='all', operation='normalize', parameters=params)
        self.t.last_output = output_column

        return self.t
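# A minimal sketch (not part of the original module) of the min-max scaling applied
# per array: after this, each array spans exactly [0, 1] (assuming it is not constant).
def _example_normalize():
    import numpy as np

    a = np.array([3.0, 7.0, 5.0, 11.0])
    normed = (a - np.min(a)) / (np.max(a) - np.min(a))
    return normed  # array([0.  , 0.5 , 0.25, 1.  ])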
class RFFT(CtrlNode):
    """
    Uses fftpack.rfft, 'Discrete Fourier transform of a real sequence.'

    https://docs.scipy.org/doc/scipy/reference/generated/scipy.fftpack.rfft.html#scipy.fftpack.rfft
    """
    nodeName = 'RFFT'
    uiTemplate = [('data_column', 'combo', {}),
                  ('Apply', 'check', {'checked': False, 'applyBox': True})
                  ]

    def processData(self, transmission: Transmission):
        self.t = transmission
        self.set_data_column_combo_box()
        if self.ctrls['Apply'].isChecked() is False:
            return

        # TODO: Replace this code block with the DataTypes.get_framerate() method
        ##############################################################################
        sampling_rates = []

        for db in transmission.history_trace.data_blocks:
            if transmission.history_trace.check_operation_exists(db, 'resample'):
                sampling_rates.append(
                    transmission.history_trace.get_operation_params(db, 'resample')['output_rate'])
            else:
                r = pd.DataFrame(transmission.get_data_block_dataframe(db).meta.to_list())['fps'].unique()
                sampling_rates.append(r)

        rates = np.hstack(sampling_rates)

        if np.ptp(rates) > 0.1:
            raise ValueError("Sampling rates of the data differ by greater than the tolerance of 0.1 Hz")

        framerate = int(np.mean(rates))
        ##############################################################################

        array_size = transmission.df[self.data_column].apply(lambda a: a.size).unique()
        if array_size.size > 1:
            raise ValueError("Size of all arrays in data column must match exactly.")
        array_size = array_size[0]

        freqs = fftpack.rfftfreq(array_size) * framerate

        self.t = transmission.copy()

        output_column = '_RFFT'

        self.t.df[output_column] = self.t.df[self.data_column].apply(fftpack.rfft)

        self.t.last_unit = 'frequency'

        params = {'data_column': self.data_column,
                  'frequencies': freqs.tolist(),
                  'sampling_rate': framerate,
                  'nyquist_frequency': float(freqs.max()),
                  'units': 'frequency'
                  }

        self.t.history_trace.add_operation(data_block_id='all', operation='rfft', parameters=params)
        self.t.last_output = output_column

        return self.t
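# A minimal sketch (not part of the original module) of fftpack.rfft / rfftfreq as
# used above: rfftfreq gives normalized bins, so multiplying by the sampling rate
# yields bins in Hz, with the maximum at the Nyquist frequency. The framerate is a
# hypothetical example value.
def _example_rfft():
    import numpy as np
    from scipy import fftpack

    framerate = 10.0  # hypothetical sampling rate (Hz)
    x = np.sin(2 * np.pi * 2.0 * np.arange(100) / framerate)

    coeffs = fftpack.rfft(x)
    freqs = fftpack.rfftfreq(x.size) * framerate
    return coeffs, freqs.max()  # freqs.max() ~5.0, the Nyquist frequency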
class iRFFT(CtrlNode):
    """
    Uses fftpack.irfft, 'Return inverse discrete Fourier transform of real sequence.'

    https://docs.scipy.org/doc/scipy/reference/generated/scipy.fftpack.irfft.html#scipy.fftpack.irfft

    Input must have an _RFFT column from the RFFT node.
    """
    nodeName = 'iRFFT'
    uiTemplate = [('Apply', 'check', {'checked': False, 'applyBox': True})]

    def processData(self, transmission: Transmission):
        self.t = transmission
        self.set_data_column_combo_box()
        if self.ctrls['Apply'].isChecked() is False:
            return

        self.t = transmission.copy()

        output_column = '_IRFFT'

        self.t.df[output_column] = self.t.df['_RFFT'].apply(fftpack.irfft)

        self.t.last_unit = 'time'

        params = {'data_column': '_RFFT',
                  'units': 'time'
                  }

        self.t.history_trace.add_operation(data_block_id='all', operation='irfft', parameters=params)
        self.t.last_output = output_column

        return self.t
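# A minimal sketch (not part of the original module) showing that fftpack.irfft
# inverts fftpack.rfft, which is why this node requires the _RFFT column produced
# upstream by the RFFT node.
def _example_irfft_roundtrip():
    import numpy as np
    from scipy import fftpack

    x = np.random.randn(128)
    recovered = fftpack.irfft(fftpack.rfft(x))
    return np.allclose(x, recovered)  # True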
class PeakDetect(CtrlNode):
    """Detect peaks & bases by finding local maxima & minima. Use this after the Derivative Filter."""
    nodeName = 'PeakDetect'
    uiTemplate = [('data_column', 'combo', {}),
                  ('Fictional_Bases', 'check', {'checked': True}),
                  ('Edit', 'button', {'text': 'Open GUI'}),
                  ('SlopeThr', 'doubleSpin', {'min': -100.00, 'max': 1.0, 'step': 0.010}),
                  ('AmplThrAbs', 'doubleSpin', {'min': 0.00, 'max': 1.00, 'step': 0.05}),
                  ('AmplThrRel', 'doubleSpin', {'min': 0.00, 'max': 1.00, 'step': 0.05}),
                  ('Apply', 'check', {'checked': False, 'applyBox': True})
                  ]

    def __init__(self, name, **kwargs):
        CtrlNode.__init__(self, name,
                          terminals={'Derivative': {'io': 'in'},
                                     'Normalized': {'io': 'in'},
                                     'Curve': {'io': 'in'},
                                     'PB_Input': {'io': 'in'},
                                     'Out': {'io': 'out', 'bypass': 'Curve'}},
                          **kwargs)
        self.data_modified = False
        self.editor_output = False
        self.pbw = None
        self.ctrls['Edit'].clicked.connect(self._peak_editor)
        self.t = None
        self.params = {}

    def _get_zero_crossings(self, d1: np.ndarray, sig: np.ndarray, norm_sig: np.ndarray,
                            fictional_bases: bool = False) -> pd.DataFrame:
        """
        Find the peaks and bases of the signal by finding zero crossings in the
        first derivative of the filtered signal.

        :param d1:       first derivative of the signal
        :param sig:      the raw signal
        :param norm_sig: the normalized signal, used for the absolute amplitude threshold
        :param fictional_bases: add bases at the start/end if the first/last peak is lonely
        :return: DataFrame with all zero-crossing events in one column; a second column
                 labels each event as a peak or a base
        """
        # Get array of all sign switches in the first derivative
        sc = np.diff(np.sign(d1))

        peaks_raw = np.where(sc < 0)[0]
        bases = np.where(sc > 0)[0]

        # Remove all peaks where the amplitude is below the absolute threshold
        peak_yvals = np.take(norm_sig, peaks_raw)
        ix_below_ampl_thr = np.where(peak_yvals < self.ctrls['AmplThrAbs'].value())
        peaks_ampl_thr = np.delete(peaks_raw, ix_below_ampl_thr)

        s2 = np.gradient(d1)

        # Remove all peaks where the 2nd derivative is below the slope threshold
        peak_d2 = np.take(s2, peaks_ampl_thr)
        ix_below_slope_thr = np.where(peak_d2 > self.ctrls['SlopeThr'].value())
        peaks = np.delete(peaks_ampl_thr, ix_below_slope_thr)

        # TODO: Decide how to properly deal with traces that have no peaks
        if peaks.size == 0:
            return pd.DataFrame()

        # Add bases to the beginning and end of the sequence if the first or last peak is lonely.
        # Insertion position is irrelevant since the dataframe is sorted by 'event' below.
        if fictional_bases:
            if bases.size == 0:
                bases = np.array([0, sc.size])
            else:
                if bases[0] > peaks[0]:
                    bases = np.insert(bases, 0, 0)
                if bases[-1] < peaks[-1]:
                    bases = np.insert(bases, -1, sc.size)

        # Construct peak & base columns of the dataframe
        peaks_df = pd.DataFrame()
        peaks_df['event'] = peaks
        peaks_df['label'] = 'peak'

        bases_df = pd.DataFrame()
        bases_df['event'] = bases
        bases_df['label'] = 'base'

        peaks_bases_df = pd.concat([peaks_df, bases_df])
        peaks_bases_df = peaks_bases_df.sort_values('event')
        peaks_bases_df.reset_index(drop=True, inplace=True)

        # Set the peaks at the index of the local maxima of the raw curve instead of
        # the maxima inferred from the derivative.
        # Also remove peaks which are lower than the relative amplitude threshold.
        rows_drop = []
        for ix, r in peaks_bases_df.iterrows():
            # guard both neighbors; a trailing peak without a following base is left as-is
            if r['label'] == 'peak' and 0 < ix < (peaks_bases_df.index.size - 1):
                if peaks_bases_df.iloc[ix - 1]['label'] == 'base' \
                        and peaks_bases_df.iloc[ix + 1]['label'] == 'base':
                    ix_left_base = peaks_bases_df.iloc[ix - 1]['event']
                    ix_right_base = peaks_bases_df.iloc[ix + 1]['event']

                    # Adjust the x-val of the curve by finding the absolute maxima of this
                    # section of the raw curve, flanked by the bases of the peak
                    peak_revised = np.where(
                        sig == np.max(np.take(sig, np.arange(ix_left_base, ix_right_base))))[0][0]

                    # Get rising and falling amplitudes
                    rise_ampl = sig[peak_revised] - sig[ix_left_base]
                    fall_ampl = sig[peak_revised] - sig[ix_right_base]

                    # Check if above the relative amplitude threshold
                    if (rise_ampl + fall_ampl) > self.ctrls['AmplThrRel'].value():
                        peaks_bases_df.at[ix, 'event'] = peak_revised
                    else:
                        rows_drop.append(ix)

        peaks_bases_df = peaks_bases_df.drop(peaks_bases_df.index[rows_drop])
        peaks_bases_df = peaks_bases_df.reset_index(drop=True)

        # Remove bases that aren't around any peak; reset rows_drop so the stale
        # indices from the previous pass aren't dropped again
        rows_drop = []
        for ix, r in peaks_bases_df.iterrows():
            if r['label'] == 'base' and 1 < ix < (peaks_bases_df.index.size - 1):
                if peaks_bases_df.iloc[ix - 1]['label'] != 'peak' \
                        and peaks_bases_df.iloc[ix + 1]['label'] != 'peak':
                    rows_drop.append(ix)

        # Weird behavior dealing with bases at the end of a curve
        try:
            peaks_bases_df = peaks_bases_df.drop(peaks_bases_df.index[rows_drop])
            peaks_bases_df = peaks_bases_df.reset_index(drop=True)
        except IndexError:
            pass

        self.row_ix += 1

        return peaks_bases_df

    def process(self, display=True, **kwargs):
        out = self.processData(**kwargs)
        return {'Out': out}

    def processData(self, **inputs):
        if self.editor_output:
            return self.t

        pb_input = inputs['PB_Input']
        if isinstance(pb_input, Transmission):
            self.t = pb_input
            self.t.df.reset_index(drop=True, inplace=True)
            if self.pbw is None:
                self._peak_editor()
            else:
                self.pbw.update_transmission(self.t, self.t)
            self.pbw.btnDisconnectFromFlowchart.setDisabled(True)
            return self.t

        columns = inputs['Curve'].df.columns.to_list()
        self.ctrls['data_column'].setItems(organize_dataframe_columns(columns)[0])

        if self.data_modified is True:
            if QtWidgets.QMessageBox.question(None, 'Discard peak edits?',
                                              'You have made modifications to peak data passing '
                                              'through this node! Would you like to discard all '
                                              'changes and load the newly transmitted data?',
                                              QtWidgets.QMessageBox.Yes,
                                              QtWidgets.QMessageBox.No) == QtWidgets.QMessageBox.No:
                return self.t

        self.data_modified = False

        if not self.ctrls['Apply'].isChecked():
            return

        if inputs['Derivative'] is None:
            raise KeyError('No incoming Derivative transmission. '
                           'You must input both a curve and its derivative.')

        if inputs['Curve'] is None:
            raise KeyError('No incoming Curve transmission. '
                           'You must input both a curve and its derivative.')

        if inputs['Derivative'].df.index.size != inputs['Curve'].df.index.size:
            raise ValueError('Input dimensions of Derivative and Curve transmissions MUST match!')

        t_d1 = inputs['Derivative'].copy()
        assert isinstance(t_d1, Transmission)
        d1 = t_d1.df['_DERIVATIVE']

        t_norm = inputs['Normalized'].copy()
        assert isinstance(t_norm, Transmission)
        norm_series = t_norm.df[t_norm.last_output]

        self.t = inputs['Curve'].copy()
        self.t.df['_DERIVATIVE'] = d1
        self.t.df['_NORM_PD'] = norm_series

        data_column = self.ctrls['data_column'].currentText()

        assert isinstance(self.t, Transmission)

        fb = self.ctrls['Fictional_Bases'].isChecked()

        self.row_ix = 0

        tqdm.pandas()
        self.t.df['peaks_bases'] = self.t.df.progress_apply(
            lambda r: self._get_zero_crossings(r['_DERIVATIVE'], r[data_column], r['_NORM_PD'], fb),
            axis=1)
        self.t.df['curve'] = self.t.df[data_column]
        self.t.df.drop(columns=['_NORM_PD'], inplace=True)
        self.t.df.reset_index(inplace=True, drop=True)

        if self.pbw is not None:
            self.pbw.update_transmission(self.t, self.t)

        params = {'data_column': data_column,
                  'SlopeThr': self.ctrls['SlopeThr'].value(),
                  'AmplThrRel': self.ctrls['AmplThrRel'].value(),
                  'AmplThrAbs': self.ctrls['AmplThrAbs'].value(),
                  'derivative_input_history_trace': t_d1.history_trace.get_all_data_blocks_history(),
                  'normalized_input_history_trace': t_norm.history_trace.get_all_data_blocks_history(),
                  'units': self.t.last_unit
                  }
        self.params = params

        self.t.history_trace.add_operation('all', operation='peak_detect', parameters=params)

        return self.t

    def _set_editor_output(self, edited_transmission: Transmission):
        self.t = edited_transmission
        self.t.history_trace.add_operation('all', operation='peak_detect', parameters=self.params)
        self.editor_output = True
        self.data_modified = True
        self.changed()

    def _peak_editor(self):
        if self.pbw is None:
            self.pbw = peak_editor.PeakEditorWindow(self.t, self.t)
            self.pbw.sig_send_data.connect(self._set_editor_output)
            self.pbw.sig_reconnect_flowchart.connect(self.changed)
        self.pbw.setWindowTitle(self.name())
        self.pbw.show()
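# A minimal sketch (not part of the original module) of the core idea in
# _get_zero_crossings: sign changes in the first derivative mark local extrema, with
# negative-going crossings as peaks and positive-going crossings as bases. The
# amplitude/slope thresholding and base bookkeeping done by the node are omitted here.
def _example_zero_crossings():
    import numpy as np

    sig = np.sin(np.linspace(0, 4 * np.pi, 400))
    d1 = np.gradient(sig)

    sc = np.diff(np.sign(d1))
    peaks = np.where(sc < 0)[0]  # maxima: derivative goes + -> -
    bases = np.where(sc > 0)[0]  # minima: derivative goes - -> +
    return peaks, bases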
class PeakFeatures(CtrlNode):
    """Extract peak features after peak detection"""
    nodeName = 'PeakFeatures'
    uiTemplate = [('data_column', 'combo', {}),
                  ('Apply', 'check', {'applyBox': True, 'checked': False})
                  ]

    def __init__(self, *args, **kwargs):
        super(PeakFeatures, self).__init__(*args, **kwargs)
        self.peak_results = None

    def processData(self, transmission: Transmission):
        dcols = organize_dataframe_columns(transmission.df.columns)[0]
        self.ctrls['data_column'].setItems(dcols)

        if not self.apply_checked():
            return

        self.computer = ComputePeakFeatures()
        self.t = self.computer.compute(
            transmission=transmission.copy(),
            data_column=self.data_column
        )

        params = {'data_column': self.data_column}
        self.t.history_trace.add_operation('all', 'peak-features', params)

        # Data units can no longer be relied on, since any user-specified data
        # column can be used for computing peak features
        self.t.last_output = None

        return self.t
class SigmaMAD(CtrlNode):
    """Get the median absolute deviation (MAD) of each array in the data column."""
    nodeName = 'SigmaMAD'
    uiTemplate = [('data_column', 'combo', {}),
                  ('Apply', 'check', {'applyBox': True, 'checked': True})
                  ]

    def processData(self, transmission: Transmission):
        dcols = organize_dataframe_columns(transmission.df.columns)[0]
        self.ctrls['data_column'].setItems(dcols)

        if not self.apply_checked():
            return

        self.t = transmission.copy()

        output_column = '_' + self.__class__.__name__.upper()

        params = {'data_column': self.data_column}

        self.t.df[output_column] = self.t.df[self.data_column].apply(
            lambda x: np.median(np.abs(x - np.median(x)))
        )

        self.t.history_trace.add_operation('all', self.__class__.__name__.lower(), params)

        return self.t
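# A minimal sketch (not part of the original module) of the statistic SigmaMAD
# computes per array: the median absolute deviation, a robust spread estimate that,
# unlike the standard deviation, is barely affected by the single outlier below.
def _example_mad():
    import numpy as np

    x = np.array([1.0, 2.0, 3.0, 4.0, 100.0])
    mad = np.median(np.abs(x - np.median(x)))
    return mad  # 1.0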