bci_essentials.paradigm.paradigm
1import numpy as np 2from abc import ABC, abstractmethod 3 4from ..utils.logger import Logger 5from ..signal_processing import bandpass, lowpass 6 7# Instantiate a logger for the module at the default level of logging.INFO 8# Logs to bci_essentials.__module__) where __module__ is the name of the module 9logger = Logger(name=__name__) 10 11 12class Paradigm(ABC): 13 def __init__(self, filters=[5, 30], channel_subset=None): 14 """ 15 Base class for all paradigms. 16 Please use a subclass of this class for your specific paradigm. 17 18 Parameters 19 ---------- 20 filters : list of floats | [5, 30] 21 Filter bands. 22 channel_subset : list of str | None 23 Channel subset to use. 24 """ 25 self.lowcut = filters[0] 26 self.highcut = filters[1] 27 self.channel_subset = channel_subset 28 29 # When do we return classifications? 30 self.classify_each_epoch = False 31 self.classify_each_trial = False 32 33 # Do we classify labeled epochs (such as in the case of iterative training)? 34 self.classify_labeled_epochs = False 35 36 self.paradigm_name = "Generic" 37 38 def _preprocess(self, eeg, fsample, lowcut, highcut, order=5): 39 """ 40 Preprocess EEG data with the appropriate filter type: 41 - If the data is continuous (i.e., shape is [channels, samples]), a 42 bandpass filter is used. 43 44 - If the data is epoched (i.e., shape is [epoch, channels, samples]), 45 the filter type depends on the signal length relative to the filter's settling time: 46 - If signal length > settling time: use bandpass filter 47 - If signal length ≤ settling time: use lowpass filter 48 - The settling time is calculated by: 49 1. Compute the time constant (tc) of the highpass filter: 50 tc = 1 / (2 * π * lowcut) 51 2. Compute the settling time with the formula: 52 settling_time = tc * 5 * order 53 - In a first-order system, a rule-of-thumb is that the signal settles in approximately 5 time constants (tc * 5) 54 - In higher-order filters (e.g., a 5th-order filter set as the default), the settling time incrases linearly with the order of the filter (order * tc * 5) 55 - This is a simplification, but it provides a good approximation for the settling time of the filter. 56 - For more details, see the reference below: 57 https://www.analogictips.com/an-overview-of-filters-and-their-parameters-part-4-time-and-phase-issues/ 58 59 Parameters 60 ---------- 61 eeg : np.ndarray 62 EEG data. Shape can be 2D [n_channels, n_samples] 63 or 3D [epochs, n_channels, n_samples]. 64 fsample : float 65 Sampling frequency. 66 lowcut : float 67 Lower cutoff frequency. 68 highcut : float 69 Upper cutoff frequency. 70 order : int 71 Order of the filter [n]. Default is 5. 72 73 Returns 74 ------- 75 np.ndarray 76 Preprocessed EEG. Shape is the same as `eeg`. 77 78 """ 79 80 n_dims = len(eeg.shape) 81 if n_dims == 2: 82 logger.debug("Preprocessing continuous EEG") 83 preprocessed_eeg = bandpass(eeg, lowcut, highcut, order, fsample) 84 elif n_dims == 3: 85 logger.debug("Preprocessing epoched EEG") 86 87 # Get the length of the signal 88 signal_length = eeg.shape[2] 89 90 # Highpass filter settling time 91 tc = 1 / (2 * np.pi * lowcut) 92 settling_time = order * tc * 5 93 94 if signal_length > settling_time: 95 logger.debug("Applied bandpass filter to epoched EEG") 96 preprocessed_eeg = bandpass(eeg, lowcut, highcut, order, fsample) 97 else: 98 logger.debug("Applied lowpass filter to epoched EEG") 99 preprocessed_eeg = lowpass(eeg, highcut, order, fsample) 100 else: 101 raise ValueError( 102 "Preprocessing failed. EEG must be 2D (continuous) or 3D (epoched)." 103 ) 104 105 return preprocessed_eeg 106 107 def package_resting_state_data( 108 self, marker_data, marker_timestamps, bci_controller, eeg_timestamps, fsample 109 ): 110 """Package resting state data. 111 112 Parameters 113 ---------- 114 marker_data : list of str 115 List of markers. 116 marker_timestamps : np.ndarray 117 Timestamps of markers. 118 bci_controller : np.ndarray 119 EEG data. Shape is (n_channels, n_samples). 120 eeg_timestamps : np.ndarray 121 Timestamps of EEG data. Shape is (n_samples,). 122 fsample : float 123 Sampling frequency. 124 125 Returns 126 ------- 127 resting_state_data : dict 128 Dictionary containing resting state data with keys: 129 - 'eyes_open_trials': np.ndarray of EEG data during eyes open condition 130 - 'eyes_open_timestamps': np.ndarray of timestamps for eyes open trials 131 - 'eyes_closed_trials': np.ndarray of EEG data during eyes closed condition 132 - 'eyes_closed_timestamps': np.ndarray of timestamps for eyes closed trials 133 - 'rest_trials': np.ndarray of EEG data during rest condition 134 - 'rest_timestamps': np.ndarray of timestamps for rest trials 135 If an error occurs, returns `None`. 136 137 """ 138 try: 139 logger.debug("Packaging resting state data") 140 141 eyes_open_start_time = [] 142 eyes_open_end_time = [] 143 eyes_closed_start_time = [] 144 eyes_closed_end_time = [] 145 rest_start_time = [] 146 rest_end_time = [] 147 148 # Initialize start and end locations 149 eyes_open_start_loc = [] 150 eyes_open_end_loc = [] 151 eyes_closed_start_loc = [] 152 eyes_closed_end_loc = [] 153 rest_start_loc = [] 154 rest_end_loc = [] 155 156 current_time = eeg_timestamps[0] 157 current_timestamp_loc = 0 158 159 self.fsample = fsample 160 self.n_channels = bci_controller.shape[0] 161 162 for i in range(len(marker_data)): 163 # Get current resting state data marker and time stamp 164 current_rs_data_marker = marker_data[i] 165 current_rs_timestamp = marker_timestamps[i] 166 167 # Increment the EEG until just past the marker timestamp 168 while current_time < current_rs_timestamp: 169 current_timestamp_loc += 1 170 current_time = eeg_timestamps[current_timestamp_loc] 171 172 # get eyes open start times 173 if current_rs_data_marker == "Start Eyes Open RS: 1": 174 eyes_open_start_time.append(current_rs_timestamp) 175 eyes_open_start_loc.append(current_timestamp_loc - 1) 176 logger.debug("received eyes open start") 177 178 # get eyes open end times 179 if current_rs_data_marker == "End Eyes Open RS: 1": 180 eyes_open_end_time.append(current_rs_timestamp) 181 eyes_open_end_loc.append(current_timestamp_loc) 182 logger.debug("received eyes open end") 183 184 # get eyes closed start times 185 if current_rs_data_marker == "Start Eyes Closed RS: 2": 186 eyes_closed_start_time.append(current_rs_timestamp) 187 eyes_closed_start_loc.append(current_timestamp_loc - 1) 188 logger.debug("received eyes closed start") 189 190 # get eyes closed end times 191 if current_rs_data_marker == "End Eyes Closed RS: 2": 192 eyes_closed_end_time.append(current_rs_timestamp) 193 eyes_closed_end_loc.append(current_timestamp_loc) 194 logger.debug("received eyes closed end") 195 196 # get rest start times 197 if current_rs_data_marker == "Start Rest for RS: 0": 198 rest_start_time.append(current_rs_timestamp) 199 rest_start_loc.append(current_timestamp_loc - 1) 200 logger.debug("received rest start") 201 # get rest end times 202 if current_rs_data_marker == "End Rest for RS: 0": 203 rest_end_time.append(current_rs_timestamp) 204 rest_end_loc.append(current_timestamp_loc) 205 logger.debug("received rest end") 206 207 # Eyes open 208 # Get duration, nsamples 209 210 if len(eyes_open_end_loc) > 0: 211 duration = np.floor(eyes_open_end_time[0] - eyes_open_start_time[0]) 212 n_samples = int(duration * self.fsample) 213 214 new_eeg = np.zeros((1, self.n_channels, n_samples)) 215 216 eyes_open_timestamps = np.arange(0, duration, 1 / self.fsample) 217 218 # Now copy EEG for these trials 219 for i in range(len(eyes_open_start_time)): 220 eyes_open_time_correction = eyes_open_start_time[i] 221 222 corrected_eeg_timestamps = ( 223 eeg_timestamps - eyes_open_time_correction 224 ) 225 226 for c in range(self.n_channels): 227 new_eeg[0, c, :] = np.interp( 228 eyes_open_timestamps, 229 corrected_eeg_timestamps, 230 bci_controller[c, :], 231 ) 232 233 if i == 0: 234 eyes_open_trials = new_eeg 235 eyes_open_timestamps = eyes_open_timestamps 236 else: 237 eyes_open_trials = np.concatenate( 238 (eyes_open_trials, new_eeg), axis=0 239 ) 240 241 # Eyes closed 242 243 if len(eyes_closed_end_loc) > 0: 244 # Get duration, nsmaples 245 duration = np.floor(eyes_closed_end_time[0] - eyes_closed_start_time[0]) 246 n_samples = int(duration * self.fsample) 247 248 new_eeg = np.zeros((1, self.n_channels, n_samples)) 249 250 eyes_closed_timestamps = np.arange(0, duration, 1 / self.fsample) 251 252 # Now copy EEG for these trials 253 for i in range(len(eyes_closed_start_time)): 254 eyes_closed_time_correction = eyes_closed_start_time[i] 255 256 corrected_eeg_timestamps = ( 257 eeg_timestamps - eyes_closed_time_correction 258 ) 259 260 for c in range(self.n_channels): 261 new_eeg[0, c, :] = np.interp( 262 eyes_closed_timestamps, 263 corrected_eeg_timestamps, 264 bci_controller[c, :], 265 ) 266 267 if i == 0: 268 eyes_closed_trials = new_eeg 269 eyes_closed_timestamps = eyes_closed_timestamps 270 else: 271 eyes_closed_trials = np.concatenate( 272 (eyes_closed_trials, new_eeg), axis=0 273 ) 274 275 # Rest 276 if len(rest_end_loc) > 0: 277 # Get duration, nsmaples 278 while rest_end_time[0] < rest_start_time[0]: 279 rest_end_time.pop(0) 280 rest_end_loc.pop(0) 281 282 duration = np.floor(rest_end_time[0] - rest_start_time[0]) 283 n_samples = int(duration * self.fsample) 284 285 new_eeg = np.zeros((1, self.n_channels, n_samples)) 286 287 rest_timestamps = np.arange(0, duration, 1 / self.fsample) 288 289 # Now copy EEG for these trials 290 for i in range(len(rest_start_time)): 291 rest_time_correction = rest_start_time[i] 292 293 corrected_eeg_timestamps = eeg_timestamps - rest_time_correction 294 295 for c in range(self.n_channels): 296 new_eeg[0, c, :] = np.interp( 297 rest_timestamps, 298 corrected_eeg_timestamps, 299 bci_controller[c, :], 300 ) 301 302 if i == 0: 303 rest_trials = new_eeg 304 rest_timestamps = rest_timestamps 305 306 else: 307 rest_trials = np.concatenate((rest_trials, new_eeg), axis=0) 308 309 # Put all the available data into a dictionary 310 resting_state_data = { 311 "eyes_open_trials": eyes_open_trials, 312 "eyes_open_timestamps": eyes_open_timestamps, 313 "eyes_closed_trials": eyes_closed_trials, 314 "eyes_closed_timestamps": eyes_closed_timestamps, 315 "rest_trials": rest_trials, 316 "rest_timestamps": rest_timestamps, 317 } 318 319 logger.debug("Done packaging resting state data") 320 321 return resting_state_data 322 323 except Exception as e: 324 logger.error("Error packaging resting state data: %s", e) 325 326 return None 327 328 @abstractmethod 329 def get_eeg_start_and_end_times(self, markers, timestamps): 330 """ 331 Get the start and end times of the EEG data based on the markers. 332 333 Parameters 334 ---------- 335 markers : list of str 336 List of markers. 337 timestamps : list of float 338 List of timestamps. 339 340 Returns 341 ------- 342 float 343 Start time. 344 float 345 End time. 346 """ 347 348 pass 349 350 @abstractmethod 351 def process_markers(self, markers, marker_timestamps, eeg, eeg_timestamps, fsample): 352 """ 353 This takes in the markers and EEG data and processes them into epochs accordingt to the MI paradigm. 354 355 Parameters 356 ---------- 357 markers : list of str 358 List of markers. 359 marker_timestamps : list of float 360 List of timestamps. 361 eeg : np.array 362 EEG data. Shape is (n_channels, n_samples). 363 eeg_timestamps : np.array 364 EEG timestamps. Shape is (n_samples). 365 fsample : float 366 Sampling frequency. 367 368 Returns 369 ------- 370 np.array 371 Processed EEG data. Shape is (n_epochs, n_channels, n_samples). 372 np.array 373 Labels. Shape is (n_epochs). 374 """ 375 376 pass
logger =
<bci_essentials.utils.logger.Logger object>
class
Paradigm(abc.ABC):
13class Paradigm(ABC): 14 def __init__(self, filters=[5, 30], channel_subset=None): 15 """ 16 Base class for all paradigms. 17 Please use a subclass of this class for your specific paradigm. 18 19 Parameters 20 ---------- 21 filters : list of floats | [5, 30] 22 Filter bands. 23 channel_subset : list of str | None 24 Channel subset to use. 25 """ 26 self.lowcut = filters[0] 27 self.highcut = filters[1] 28 self.channel_subset = channel_subset 29 30 # When do we return classifications? 31 self.classify_each_epoch = False 32 self.classify_each_trial = False 33 34 # Do we classify labeled epochs (such as in the case of iterative training)? 35 self.classify_labeled_epochs = False 36 37 self.paradigm_name = "Generic" 38 39 def _preprocess(self, eeg, fsample, lowcut, highcut, order=5): 40 """ 41 Preprocess EEG data with the appropriate filter type: 42 - If the data is continuous (i.e., shape is [channels, samples]), a 43 bandpass filter is used. 44 45 - If the data is epoched (i.e., shape is [epoch, channels, samples]), 46 the filter type depends on the signal length relative to the filter's settling time: 47 - If signal length > settling time: use bandpass filter 48 - If signal length ≤ settling time: use lowpass filter 49 - The settling time is calculated by: 50 1. Compute the time constant (tc) of the highpass filter: 51 tc = 1 / (2 * π * lowcut) 52 2. Compute the settling time with the formula: 53 settling_time = tc * 5 * order 54 - In a first-order system, a rule-of-thumb is that the signal settles in approximately 5 time constants (tc * 5) 55 - In higher-order filters (e.g., a 5th-order filter set as the default), the settling time incrases linearly with the order of the filter (order * tc * 5) 56 - This is a simplification, but it provides a good approximation for the settling time of the filter. 57 - For more details, see the reference below: 58 https://www.analogictips.com/an-overview-of-filters-and-their-parameters-part-4-time-and-phase-issues/ 59 60 Parameters 61 ---------- 62 eeg : np.ndarray 63 EEG data. Shape can be 2D [n_channels, n_samples] 64 or 3D [epochs, n_channels, n_samples]. 65 fsample : float 66 Sampling frequency. 67 lowcut : float 68 Lower cutoff frequency. 69 highcut : float 70 Upper cutoff frequency. 71 order : int 72 Order of the filter [n]. Default is 5. 73 74 Returns 75 ------- 76 np.ndarray 77 Preprocessed EEG. Shape is the same as `eeg`. 78 79 """ 80 81 n_dims = len(eeg.shape) 82 if n_dims == 2: 83 logger.debug("Preprocessing continuous EEG") 84 preprocessed_eeg = bandpass(eeg, lowcut, highcut, order, fsample) 85 elif n_dims == 3: 86 logger.debug("Preprocessing epoched EEG") 87 88 # Get the length of the signal 89 signal_length = eeg.shape[2] 90 91 # Highpass filter settling time 92 tc = 1 / (2 * np.pi * lowcut) 93 settling_time = order * tc * 5 94 95 if signal_length > settling_time: 96 logger.debug("Applied bandpass filter to epoched EEG") 97 preprocessed_eeg = bandpass(eeg, lowcut, highcut, order, fsample) 98 else: 99 logger.debug("Applied lowpass filter to epoched EEG") 100 preprocessed_eeg = lowpass(eeg, highcut, order, fsample) 101 else: 102 raise ValueError( 103 "Preprocessing failed. EEG must be 2D (continuous) or 3D (epoched)." 104 ) 105 106 return preprocessed_eeg 107 108 def package_resting_state_data( 109 self, marker_data, marker_timestamps, bci_controller, eeg_timestamps, fsample 110 ): 111 """Package resting state data. 112 113 Parameters 114 ---------- 115 marker_data : list of str 116 List of markers. 117 marker_timestamps : np.ndarray 118 Timestamps of markers. 119 bci_controller : np.ndarray 120 EEG data. Shape is (n_channels, n_samples). 121 eeg_timestamps : np.ndarray 122 Timestamps of EEG data. Shape is (n_samples,). 123 fsample : float 124 Sampling frequency. 125 126 Returns 127 ------- 128 resting_state_data : dict 129 Dictionary containing resting state data with keys: 130 - 'eyes_open_trials': np.ndarray of EEG data during eyes open condition 131 - 'eyes_open_timestamps': np.ndarray of timestamps for eyes open trials 132 - 'eyes_closed_trials': np.ndarray of EEG data during eyes closed condition 133 - 'eyes_closed_timestamps': np.ndarray of timestamps for eyes closed trials 134 - 'rest_trials': np.ndarray of EEG data during rest condition 135 - 'rest_timestamps': np.ndarray of timestamps for rest trials 136 If an error occurs, returns `None`. 137 138 """ 139 try: 140 logger.debug("Packaging resting state data") 141 142 eyes_open_start_time = [] 143 eyes_open_end_time = [] 144 eyes_closed_start_time = [] 145 eyes_closed_end_time = [] 146 rest_start_time = [] 147 rest_end_time = [] 148 149 # Initialize start and end locations 150 eyes_open_start_loc = [] 151 eyes_open_end_loc = [] 152 eyes_closed_start_loc = [] 153 eyes_closed_end_loc = [] 154 rest_start_loc = [] 155 rest_end_loc = [] 156 157 current_time = eeg_timestamps[0] 158 current_timestamp_loc = 0 159 160 self.fsample = fsample 161 self.n_channels = bci_controller.shape[0] 162 163 for i in range(len(marker_data)): 164 # Get current resting state data marker and time stamp 165 current_rs_data_marker = marker_data[i] 166 current_rs_timestamp = marker_timestamps[i] 167 168 # Increment the EEG until just past the marker timestamp 169 while current_time < current_rs_timestamp: 170 current_timestamp_loc += 1 171 current_time = eeg_timestamps[current_timestamp_loc] 172 173 # get eyes open start times 174 if current_rs_data_marker == "Start Eyes Open RS: 1": 175 eyes_open_start_time.append(current_rs_timestamp) 176 eyes_open_start_loc.append(current_timestamp_loc - 1) 177 logger.debug("received eyes open start") 178 179 # get eyes open end times 180 if current_rs_data_marker == "End Eyes Open RS: 1": 181 eyes_open_end_time.append(current_rs_timestamp) 182 eyes_open_end_loc.append(current_timestamp_loc) 183 logger.debug("received eyes open end") 184 185 # get eyes closed start times 186 if current_rs_data_marker == "Start Eyes Closed RS: 2": 187 eyes_closed_start_time.append(current_rs_timestamp) 188 eyes_closed_start_loc.append(current_timestamp_loc - 1) 189 logger.debug("received eyes closed start") 190 191 # get eyes closed end times 192 if current_rs_data_marker == "End Eyes Closed RS: 2": 193 eyes_closed_end_time.append(current_rs_timestamp) 194 eyes_closed_end_loc.append(current_timestamp_loc) 195 logger.debug("received eyes closed end") 196 197 # get rest start times 198 if current_rs_data_marker == "Start Rest for RS: 0": 199 rest_start_time.append(current_rs_timestamp) 200 rest_start_loc.append(current_timestamp_loc - 1) 201 logger.debug("received rest start") 202 # get rest end times 203 if current_rs_data_marker == "End Rest for RS: 0": 204 rest_end_time.append(current_rs_timestamp) 205 rest_end_loc.append(current_timestamp_loc) 206 logger.debug("received rest end") 207 208 # Eyes open 209 # Get duration, nsamples 210 211 if len(eyes_open_end_loc) > 0: 212 duration = np.floor(eyes_open_end_time[0] - eyes_open_start_time[0]) 213 n_samples = int(duration * self.fsample) 214 215 new_eeg = np.zeros((1, self.n_channels, n_samples)) 216 217 eyes_open_timestamps = np.arange(0, duration, 1 / self.fsample) 218 219 # Now copy EEG for these trials 220 for i in range(len(eyes_open_start_time)): 221 eyes_open_time_correction = eyes_open_start_time[i] 222 223 corrected_eeg_timestamps = ( 224 eeg_timestamps - eyes_open_time_correction 225 ) 226 227 for c in range(self.n_channels): 228 new_eeg[0, c, :] = np.interp( 229 eyes_open_timestamps, 230 corrected_eeg_timestamps, 231 bci_controller[c, :], 232 ) 233 234 if i == 0: 235 eyes_open_trials = new_eeg 236 eyes_open_timestamps = eyes_open_timestamps 237 else: 238 eyes_open_trials = np.concatenate( 239 (eyes_open_trials, new_eeg), axis=0 240 ) 241 242 # Eyes closed 243 244 if len(eyes_closed_end_loc) > 0: 245 # Get duration, nsmaples 246 duration = np.floor(eyes_closed_end_time[0] - eyes_closed_start_time[0]) 247 n_samples = int(duration * self.fsample) 248 249 new_eeg = np.zeros((1, self.n_channels, n_samples)) 250 251 eyes_closed_timestamps = np.arange(0, duration, 1 / self.fsample) 252 253 # Now copy EEG for these trials 254 for i in range(len(eyes_closed_start_time)): 255 eyes_closed_time_correction = eyes_closed_start_time[i] 256 257 corrected_eeg_timestamps = ( 258 eeg_timestamps - eyes_closed_time_correction 259 ) 260 261 for c in range(self.n_channels): 262 new_eeg[0, c, :] = np.interp( 263 eyes_closed_timestamps, 264 corrected_eeg_timestamps, 265 bci_controller[c, :], 266 ) 267 268 if i == 0: 269 eyes_closed_trials = new_eeg 270 eyes_closed_timestamps = eyes_closed_timestamps 271 else: 272 eyes_closed_trials = np.concatenate( 273 (eyes_closed_trials, new_eeg), axis=0 274 ) 275 276 # Rest 277 if len(rest_end_loc) > 0: 278 # Get duration, nsmaples 279 while rest_end_time[0] < rest_start_time[0]: 280 rest_end_time.pop(0) 281 rest_end_loc.pop(0) 282 283 duration = np.floor(rest_end_time[0] - rest_start_time[0]) 284 n_samples = int(duration * self.fsample) 285 286 new_eeg = np.zeros((1, self.n_channels, n_samples)) 287 288 rest_timestamps = np.arange(0, duration, 1 / self.fsample) 289 290 # Now copy EEG for these trials 291 for i in range(len(rest_start_time)): 292 rest_time_correction = rest_start_time[i] 293 294 corrected_eeg_timestamps = eeg_timestamps - rest_time_correction 295 296 for c in range(self.n_channels): 297 new_eeg[0, c, :] = np.interp( 298 rest_timestamps, 299 corrected_eeg_timestamps, 300 bci_controller[c, :], 301 ) 302 303 if i == 0: 304 rest_trials = new_eeg 305 rest_timestamps = rest_timestamps 306 307 else: 308 rest_trials = np.concatenate((rest_trials, new_eeg), axis=0) 309 310 # Put all the available data into a dictionary 311 resting_state_data = { 312 "eyes_open_trials": eyes_open_trials, 313 "eyes_open_timestamps": eyes_open_timestamps, 314 "eyes_closed_trials": eyes_closed_trials, 315 "eyes_closed_timestamps": eyes_closed_timestamps, 316 "rest_trials": rest_trials, 317 "rest_timestamps": rest_timestamps, 318 } 319 320 logger.debug("Done packaging resting state data") 321 322 return resting_state_data 323 324 except Exception as e: 325 logger.error("Error packaging resting state data: %s", e) 326 327 return None 328 329 @abstractmethod 330 def get_eeg_start_and_end_times(self, markers, timestamps): 331 """ 332 Get the start and end times of the EEG data based on the markers. 333 334 Parameters 335 ---------- 336 markers : list of str 337 List of markers. 338 timestamps : list of float 339 List of timestamps. 340 341 Returns 342 ------- 343 float 344 Start time. 345 float 346 End time. 347 """ 348 349 pass 350 351 @abstractmethod 352 def process_markers(self, markers, marker_timestamps, eeg, eeg_timestamps, fsample): 353 """ 354 This takes in the markers and EEG data and processes them into epochs accordingt to the MI paradigm. 355 356 Parameters 357 ---------- 358 markers : list of str 359 List of markers. 360 marker_timestamps : list of float 361 List of timestamps. 362 eeg : np.array 363 EEG data. Shape is (n_channels, n_samples). 364 eeg_timestamps : np.array 365 EEG timestamps. Shape is (n_samples). 366 fsample : float 367 Sampling frequency. 368 369 Returns 370 ------- 371 np.array 372 Processed EEG data. Shape is (n_epochs, n_channels, n_samples). 373 np.array 374 Labels. Shape is (n_epochs). 375 """ 376 377 pass
Helper class that provides a standard way to create an ABC using inheritance.
Paradigm(filters=[5, 30], channel_subset=None)
14 def __init__(self, filters=[5, 30], channel_subset=None): 15 """ 16 Base class for all paradigms. 17 Please use a subclass of this class for your specific paradigm. 18 19 Parameters 20 ---------- 21 filters : list of floats | [5, 30] 22 Filter bands. 23 channel_subset : list of str | None 24 Channel subset to use. 25 """ 26 self.lowcut = filters[0] 27 self.highcut = filters[1] 28 self.channel_subset = channel_subset 29 30 # When do we return classifications? 31 self.classify_each_epoch = False 32 self.classify_each_trial = False 33 34 # Do we classify labeled epochs (such as in the case of iterative training)? 35 self.classify_labeled_epochs = False 36 37 self.paradigm_name = "Generic"
Base class for all paradigms. Please use a subclass of this class for your specific paradigm.
Parameters
- filters (list of floats | [5, 30]): Filter bands.
- channel_subset (list of str | None): Channel subset to use.
def
package_resting_state_data( self, marker_data, marker_timestamps, bci_controller, eeg_timestamps, fsample):
108 def package_resting_state_data( 109 self, marker_data, marker_timestamps, bci_controller, eeg_timestamps, fsample 110 ): 111 """Package resting state data. 112 113 Parameters 114 ---------- 115 marker_data : list of str 116 List of markers. 117 marker_timestamps : np.ndarray 118 Timestamps of markers. 119 bci_controller : np.ndarray 120 EEG data. Shape is (n_channels, n_samples). 121 eeg_timestamps : np.ndarray 122 Timestamps of EEG data. Shape is (n_samples,). 123 fsample : float 124 Sampling frequency. 125 126 Returns 127 ------- 128 resting_state_data : dict 129 Dictionary containing resting state data with keys: 130 - 'eyes_open_trials': np.ndarray of EEG data during eyes open condition 131 - 'eyes_open_timestamps': np.ndarray of timestamps for eyes open trials 132 - 'eyes_closed_trials': np.ndarray of EEG data during eyes closed condition 133 - 'eyes_closed_timestamps': np.ndarray of timestamps for eyes closed trials 134 - 'rest_trials': np.ndarray of EEG data during rest condition 135 - 'rest_timestamps': np.ndarray of timestamps for rest trials 136 If an error occurs, returns `None`. 137 138 """ 139 try: 140 logger.debug("Packaging resting state data") 141 142 eyes_open_start_time = [] 143 eyes_open_end_time = [] 144 eyes_closed_start_time = [] 145 eyes_closed_end_time = [] 146 rest_start_time = [] 147 rest_end_time = [] 148 149 # Initialize start and end locations 150 eyes_open_start_loc = [] 151 eyes_open_end_loc = [] 152 eyes_closed_start_loc = [] 153 eyes_closed_end_loc = [] 154 rest_start_loc = [] 155 rest_end_loc = [] 156 157 current_time = eeg_timestamps[0] 158 current_timestamp_loc = 0 159 160 self.fsample = fsample 161 self.n_channels = bci_controller.shape[0] 162 163 for i in range(len(marker_data)): 164 # Get current resting state data marker and time stamp 165 current_rs_data_marker = marker_data[i] 166 current_rs_timestamp = marker_timestamps[i] 167 168 # Increment the EEG until just past the marker timestamp 169 while current_time < current_rs_timestamp: 170 current_timestamp_loc += 1 171 current_time = eeg_timestamps[current_timestamp_loc] 172 173 # get eyes open start times 174 if current_rs_data_marker == "Start Eyes Open RS: 1": 175 eyes_open_start_time.append(current_rs_timestamp) 176 eyes_open_start_loc.append(current_timestamp_loc - 1) 177 logger.debug("received eyes open start") 178 179 # get eyes open end times 180 if current_rs_data_marker == "End Eyes Open RS: 1": 181 eyes_open_end_time.append(current_rs_timestamp) 182 eyes_open_end_loc.append(current_timestamp_loc) 183 logger.debug("received eyes open end") 184 185 # get eyes closed start times 186 if current_rs_data_marker == "Start Eyes Closed RS: 2": 187 eyes_closed_start_time.append(current_rs_timestamp) 188 eyes_closed_start_loc.append(current_timestamp_loc - 1) 189 logger.debug("received eyes closed start") 190 191 # get eyes closed end times 192 if current_rs_data_marker == "End Eyes Closed RS: 2": 193 eyes_closed_end_time.append(current_rs_timestamp) 194 eyes_closed_end_loc.append(current_timestamp_loc) 195 logger.debug("received eyes closed end") 196 197 # get rest start times 198 if current_rs_data_marker == "Start Rest for RS: 0": 199 rest_start_time.append(current_rs_timestamp) 200 rest_start_loc.append(current_timestamp_loc - 1) 201 logger.debug("received rest start") 202 # get rest end times 203 if current_rs_data_marker == "End Rest for RS: 0": 204 rest_end_time.append(current_rs_timestamp) 205 rest_end_loc.append(current_timestamp_loc) 206 logger.debug("received rest end") 207 208 # Eyes open 209 # Get duration, nsamples 210 211 if len(eyes_open_end_loc) > 0: 212 duration = np.floor(eyes_open_end_time[0] - eyes_open_start_time[0]) 213 n_samples = int(duration * self.fsample) 214 215 new_eeg = np.zeros((1, self.n_channels, n_samples)) 216 217 eyes_open_timestamps = np.arange(0, duration, 1 / self.fsample) 218 219 # Now copy EEG for these trials 220 for i in range(len(eyes_open_start_time)): 221 eyes_open_time_correction = eyes_open_start_time[i] 222 223 corrected_eeg_timestamps = ( 224 eeg_timestamps - eyes_open_time_correction 225 ) 226 227 for c in range(self.n_channels): 228 new_eeg[0, c, :] = np.interp( 229 eyes_open_timestamps, 230 corrected_eeg_timestamps, 231 bci_controller[c, :], 232 ) 233 234 if i == 0: 235 eyes_open_trials = new_eeg 236 eyes_open_timestamps = eyes_open_timestamps 237 else: 238 eyes_open_trials = np.concatenate( 239 (eyes_open_trials, new_eeg), axis=0 240 ) 241 242 # Eyes closed 243 244 if len(eyes_closed_end_loc) > 0: 245 # Get duration, nsmaples 246 duration = np.floor(eyes_closed_end_time[0] - eyes_closed_start_time[0]) 247 n_samples = int(duration * self.fsample) 248 249 new_eeg = np.zeros((1, self.n_channels, n_samples)) 250 251 eyes_closed_timestamps = np.arange(0, duration, 1 / self.fsample) 252 253 # Now copy EEG for these trials 254 for i in range(len(eyes_closed_start_time)): 255 eyes_closed_time_correction = eyes_closed_start_time[i] 256 257 corrected_eeg_timestamps = ( 258 eeg_timestamps - eyes_closed_time_correction 259 ) 260 261 for c in range(self.n_channels): 262 new_eeg[0, c, :] = np.interp( 263 eyes_closed_timestamps, 264 corrected_eeg_timestamps, 265 bci_controller[c, :], 266 ) 267 268 if i == 0: 269 eyes_closed_trials = new_eeg 270 eyes_closed_timestamps = eyes_closed_timestamps 271 else: 272 eyes_closed_trials = np.concatenate( 273 (eyes_closed_trials, new_eeg), axis=0 274 ) 275 276 # Rest 277 if len(rest_end_loc) > 0: 278 # Get duration, nsmaples 279 while rest_end_time[0] < rest_start_time[0]: 280 rest_end_time.pop(0) 281 rest_end_loc.pop(0) 282 283 duration = np.floor(rest_end_time[0] - rest_start_time[0]) 284 n_samples = int(duration * self.fsample) 285 286 new_eeg = np.zeros((1, self.n_channels, n_samples)) 287 288 rest_timestamps = np.arange(0, duration, 1 / self.fsample) 289 290 # Now copy EEG for these trials 291 for i in range(len(rest_start_time)): 292 rest_time_correction = rest_start_time[i] 293 294 corrected_eeg_timestamps = eeg_timestamps - rest_time_correction 295 296 for c in range(self.n_channels): 297 new_eeg[0, c, :] = np.interp( 298 rest_timestamps, 299 corrected_eeg_timestamps, 300 bci_controller[c, :], 301 ) 302 303 if i == 0: 304 rest_trials = new_eeg 305 rest_timestamps = rest_timestamps 306 307 else: 308 rest_trials = np.concatenate((rest_trials, new_eeg), axis=0) 309 310 # Put all the available data into a dictionary 311 resting_state_data = { 312 "eyes_open_trials": eyes_open_trials, 313 "eyes_open_timestamps": eyes_open_timestamps, 314 "eyes_closed_trials": eyes_closed_trials, 315 "eyes_closed_timestamps": eyes_closed_timestamps, 316 "rest_trials": rest_trials, 317 "rest_timestamps": rest_timestamps, 318 } 319 320 logger.debug("Done packaging resting state data") 321 322 return resting_state_data 323 324 except Exception as e: 325 logger.error("Error packaging resting state data: %s", e) 326 327 return None
Package resting state data.
Parameters
- marker_data (list of str): List of markers.
- marker_timestamps (np.ndarray): Timestamps of markers.
- bci_controller (np.ndarray): EEG data. Shape is (n_channels, n_samples).
- eeg_timestamps (np.ndarray): Timestamps of EEG data. Shape is (n_samples,).
- fsample (float): Sampling frequency.
Returns
- resting_state_data (dict):
Dictionary containing resting state data with keys:
- 'eyes_open_trials': np.ndarray of EEG data during eyes open condition
- 'eyes_open_timestamps': np.ndarray of timestamps for eyes open trials
- 'eyes_closed_trials': np.ndarray of EEG data during eyes closed condition
- 'eyes_closed_timestamps': np.ndarray of timestamps for eyes closed trials
- 'rest_trials': np.ndarray of EEG data during rest condition
- 'rest_timestamps': np.ndarray of timestamps for rest trials
If an error occurs, returns
None.
@abstractmethod
def
get_eeg_start_and_end_times(self, markers, timestamps):
329 @abstractmethod 330 def get_eeg_start_and_end_times(self, markers, timestamps): 331 """ 332 Get the start and end times of the EEG data based on the markers. 333 334 Parameters 335 ---------- 336 markers : list of str 337 List of markers. 338 timestamps : list of float 339 List of timestamps. 340 341 Returns 342 ------- 343 float 344 Start time. 345 float 346 End time. 347 """ 348 349 pass
Get the start and end times of the EEG data based on the markers.
Parameters
- markers (list of str): List of markers.
- timestamps (list of float): List of timestamps.
Returns
- float: Start time.
- float: End time.
@abstractmethod
def
process_markers(self, markers, marker_timestamps, eeg, eeg_timestamps, fsample):
351 @abstractmethod 352 def process_markers(self, markers, marker_timestamps, eeg, eeg_timestamps, fsample): 353 """ 354 This takes in the markers and EEG data and processes them into epochs accordingt to the MI paradigm. 355 356 Parameters 357 ---------- 358 markers : list of str 359 List of markers. 360 marker_timestamps : list of float 361 List of timestamps. 362 eeg : np.array 363 EEG data. Shape is (n_channels, n_samples). 364 eeg_timestamps : np.array 365 EEG timestamps. Shape is (n_samples). 366 fsample : float 367 Sampling frequency. 368 369 Returns 370 ------- 371 np.array 372 Processed EEG data. Shape is (n_epochs, n_channels, n_samples). 373 np.array 374 Labels. Shape is (n_epochs). 375 """ 376 377 pass
This takes in the markers and EEG data and processes them into epochs accordingt to the MI paradigm.
Parameters
- markers (list of str): List of markers.
- marker_timestamps (list of float): List of timestamps.
- eeg (np.array): EEG data. Shape is (n_channels, n_samples).
- eeg_timestamps (np.array): EEG timestamps. Shape is (n_samples).
- fsample (float): Sampling frequency.
Returns
- np.array: Processed EEG data. Shape is (n_epochs, n_channels, n_samples).
- np.array: Labels. Shape is (n_epochs).