bci_essentials.paradigm.p300_paradigm
import numpy as np

from .paradigm import Paradigm


class P300Paradigm(Paradigm):
    """
    P300 paradigm.

    Markers are comma-separated strings. This class reads field 2 as the
    number of objects, field 3 as the 1-indexed training target (``-1`` when
    there is no target) and fields 4 onward as the 1-indexed objects that
    flashed at that marker. Fields 0 and 1 are not used here.
    """

    def __init__(
        self,
        filters=[1, 15],
        iterative_training=False,
        epoch_start=0,
        epoch_end=0.6,
        buffer_time=0.01,
    ):
        """
        Parameters
        ----------
        filters : list of floats, *optional*
            Filter bands. Forwarded (as a copy) to the base class constructor.
            - Default is `[1, 15]`.
        iterative_training : bool, *optional*
            Flag to indicate if the classifier will be updated iteratively.
            - Default is `False`.
        epoch_start : float, *optional*
            The start of the epoch relative to flash onset in seconds.
            - Default is `0`.
        epoch_end : float, *optional*
            The end of the epoch relative to flash onset in seconds.
            - Default is `0.6`.
        buffer_time : float, *optional*
            Extra time in seconds required around the epochs so that all EEG
            belonging to an epoch is present. It is subtracted from the start
            and added to the end of the requested EEG window.
            - Default is `0.01`.
        """

        # Hand the base class its own copy so that the shared default list
        # (or a caller-owned list) can never be mutated through this instance.
        super().__init__(list(filters))

        self.iterative_training = iterative_training

        # The P300 paradigm needs epochs from each object in order to decide
        # which object was selected, therefore we classify each trial.
        self.classify_each_trial = True
        self.classify_each_epoch = False

        # This paradigm uses ensemble averaging to increase the signal to
        # noise ratio and averages over all epochs for each object by default.
        self.ensemble_average = True

        self.epoch_start = epoch_start
        self.epoch_end = epoch_end

        self.buffer_time = buffer_time

        self.paradigm_name = "P300"

    def get_eeg_start_and_end_times(self, markers, timestamps):
        """
        Get the start and end times of the EEG data based on the markers.

        Parameters
        ----------
        markers : list of str
            List of markers. Not used by this implementation.
        timestamps : list of float
            List of marker timestamps; only the first and last are read.

        Returns
        -------
        float
            Start time: first timestamp plus `epoch_start`, minus `buffer_time`.
        float
            End time: last timestamp plus `epoch_end`, plus `buffer_time`.
        """
        start_time = timestamps[0] + self.epoch_start - self.buffer_time

        end_time = timestamps[-1] + self.epoch_end + self.buffer_time

        return start_time, end_time

    def process_markers(self, markers, marker_timestamps, eeg, eeg_timestamps, fsample):
        """
        Turn the markers and EEG of one trial into one ensemble-averaged epoch
        per object, according to the P300 paradigm.

        Parameters
        ----------
        markers : list of str
            List of markers. The number of objects and the training target are
            read from the first marker only; the flashed objects are read from
            every marker.
        marker_timestamps : list of float
            Timestamp of each marker, in the same order as `markers`.
        eeg : np.array
            EEG data. Shape is (n_channels, n_samples).
        eeg_timestamps : np.array
            EEG timestamps. Shape is (n_samples).
        fsample : float
            Sampling frequency.

        Returns
        -------
        np.array
            Averaged EEG epochs, one per object. Shape is
            (n_objects, n_channels, n_epoch_samples). Objects that never
            flashed are left as zeros.
        np.array
            Labels. Shape is (n_objects). All `-1` when the target is `-1`,
            otherwise `1` for the target object and `0` for the others.
        """

        n_channels, _ = eeg.shape

        # Trial-level settings come from the first marker only.
        first_fields = markers[0].split(",")
        num_objects = int(first_fields[2])
        train_target = int(first_fields[3])

        if train_target == -1:
            # No target for this trial: set all values of y to -1
            y = np.full(num_objects, -1)
        else:
            y = np.zeros(num_objects, dtype=int)
            # Convert 1-indexed target to array index
            y[train_target - 1] = 1

        # The epoch time vector is the same for every marker, so build it once.
        epoch_time = np.arange(self.epoch_start, self.epoch_end, 1 / fsample)
        n_epoch_samples = len(epoch_time)

        # One independent list of (n_channels, n_epoch_samples) epochs per object
        epochs_per_object = [[] for _ in range(num_objects)]

        for i, marker in enumerate(markers):
            flash_indices = [int(x) for x in marker.split(",")[4:]]

            # Shift the EEG timestamps so that 0 becomes the marker onset
            marker_eeg_timestamps = eeg_timestamps - marker_timestamps[i]

            # Interpolate the EEG data onto the epoch time vector, per channel
            epoch = np.zeros((n_channels, n_epoch_samples))
            for c in range(n_channels):
                epoch[c, :] = np.interp(epoch_time, marker_eeg_timestamps, eeg[c, :])

            epoch[:, :] = super()._preprocess(
                epoch, fsample, self.lowcut, self.highcut
            )

            # The same epoch counts towards every object that flashed
            for flash_index in flash_indices:
                # Convert 1-indexed value to array index
                epochs_per_object[flash_index - 1].append(epoch)

        # Average all epochs for each object
        X = np.zeros((num_objects, n_channels, n_epoch_samples))
        for obj, obj_epochs in enumerate(epochs_per_object):
            if obj_epochs:
                X[obj, :, :] = np.mean(obj_epochs, axis=0)

        return X, y

    # TODO: Implement this
    def check_compatibility(self):
        """Placeholder; currently performs no checks."""
        pass
class P300Paradigm(Paradigm):
    """
    P300 paradigm.

    Markers are comma-separated strings. This class reads field 2 as the
    number of objects, field 3 as the 1-indexed training target (``-1`` when
    there is no target) and fields 4 onward as the 1-indexed objects that
    flashed at that marker. Fields 0 and 1 are not used here.
    """

    def __init__(
        self,
        filters=[1, 15],
        iterative_training=False,
        epoch_start=0,
        epoch_end=0.6,
        buffer_time=0.01,
    ):
        """
        Parameters
        ----------
        filters : list of floats, *optional*
            Filter bands. Forwarded (as a copy) to the base class constructor.
            - Default is `[1, 15]`.
        iterative_training : bool, *optional*
            Flag to indicate if the classifier will be updated iteratively.
            - Default is `False`.
        epoch_start : float, *optional*
            The start of the epoch relative to flash onset in seconds.
            - Default is `0`.
        epoch_end : float, *optional*
            The end of the epoch relative to flash onset in seconds.
            - Default is `0.6`.
        buffer_time : float, *optional*
            Extra time in seconds required around the epochs so that all EEG
            belonging to an epoch is present. It is subtracted from the start
            and added to the end of the requested EEG window.
            - Default is `0.01`.
        """

        # Hand the base class its own copy so that the shared default list
        # (or a caller-owned list) can never be mutated through this instance.
        super().__init__(list(filters))

        self.iterative_training = iterative_training

        # The P300 paradigm needs epochs from each object in order to decide
        # which object was selected, therefore we classify each trial.
        self.classify_each_trial = True
        self.classify_each_epoch = False

        # This paradigm uses ensemble averaging to increase the signal to
        # noise ratio and averages over all epochs for each object by default.
        self.ensemble_average = True

        self.epoch_start = epoch_start
        self.epoch_end = epoch_end

        self.buffer_time = buffer_time

        self.paradigm_name = "P300"

    def get_eeg_start_and_end_times(self, markers, timestamps):
        """
        Get the start and end times of the EEG data based on the markers.

        Parameters
        ----------
        markers : list of str
            List of markers. Not used by this implementation.
        timestamps : list of float
            List of marker timestamps; only the first and last are read.

        Returns
        -------
        float
            Start time: first timestamp plus `epoch_start`, minus `buffer_time`.
        float
            End time: last timestamp plus `epoch_end`, plus `buffer_time`.
        """
        start_time = timestamps[0] + self.epoch_start - self.buffer_time

        end_time = timestamps[-1] + self.epoch_end + self.buffer_time

        return start_time, end_time

    def process_markers(self, markers, marker_timestamps, eeg, eeg_timestamps, fsample):
        """
        Turn the markers and EEG of one trial into one ensemble-averaged epoch
        per object, according to the P300 paradigm.

        Parameters
        ----------
        markers : list of str
            List of markers. The number of objects and the training target are
            read from the first marker only; the flashed objects are read from
            every marker.
        marker_timestamps : list of float
            Timestamp of each marker, in the same order as `markers`.
        eeg : np.array
            EEG data. Shape is (n_channels, n_samples).
        eeg_timestamps : np.array
            EEG timestamps. Shape is (n_samples).
        fsample : float
            Sampling frequency.

        Returns
        -------
        np.array
            Averaged EEG epochs, one per object. Shape is
            (n_objects, n_channels, n_epoch_samples). Objects that never
            flashed are left as zeros.
        np.array
            Labels. Shape is (n_objects). All `-1` when the target is `-1`,
            otherwise `1` for the target object and `0` for the others.
        """

        n_channels, _ = eeg.shape

        # Trial-level settings come from the first marker only.
        first_fields = markers[0].split(",")
        num_objects = int(first_fields[2])
        train_target = int(first_fields[3])

        if train_target == -1:
            # No target for this trial: set all values of y to -1
            y = np.full(num_objects, -1)
        else:
            y = np.zeros(num_objects, dtype=int)
            # Convert 1-indexed target to array index
            y[train_target - 1] = 1

        # The epoch time vector is the same for every marker, so build it once.
        epoch_time = np.arange(self.epoch_start, self.epoch_end, 1 / fsample)
        n_epoch_samples = len(epoch_time)

        # One independent list of (n_channels, n_epoch_samples) epochs per object
        epochs_per_object = [[] for _ in range(num_objects)]

        for i, marker in enumerate(markers):
            flash_indices = [int(x) for x in marker.split(",")[4:]]

            # Shift the EEG timestamps so that 0 becomes the marker onset
            marker_eeg_timestamps = eeg_timestamps - marker_timestamps[i]

            # Interpolate the EEG data onto the epoch time vector, per channel
            epoch = np.zeros((n_channels, n_epoch_samples))
            for c in range(n_channels):
                epoch[c, :] = np.interp(epoch_time, marker_eeg_timestamps, eeg[c, :])

            epoch[:, :] = super()._preprocess(
                epoch, fsample, self.lowcut, self.highcut
            )

            # The same epoch counts towards every object that flashed
            for flash_index in flash_indices:
                # Convert 1-indexed value to array index
                epochs_per_object[flash_index - 1].append(epoch)

        # Average all epochs for each object
        X = np.zeros((num_objects, n_channels, n_epoch_samples))
        for obj, obj_epochs in enumerate(epochs_per_object):
            if obj_epochs:
                X[obj, :, :] = np.mean(obj_epochs, axis=0)

        return X, y

    # TODO: Implement this
    def check_compatibility(self):
        """Placeholder; currently performs no checks."""
        pass
P300 paradigm.
P300Paradigm( filters=[1, 15], iterative_training=False, epoch_start=0, epoch_end=0.6, buffer_time=0.01)
def __init__(
    self,
    filters=[1, 15],
    iterative_training=False,
    epoch_start=0,
    epoch_end=0.6,
    buffer_time=0.01,
):
    """
    Parameters
    ----------
    filters : list of floats, *optional*
        Filter bands. Forwarded (as a copy) to the base class constructor.
        - Default is `[1, 15]`.
    iterative_training : bool, *optional*
        Flag to indicate if the classifier will be updated iteratively.
        - Default is `False`.
    epoch_start : float, *optional*
        The start of the epoch relative to flash onset in seconds.
        - Default is `0`.
    epoch_end : float, *optional*
        The end of the epoch relative to flash onset in seconds.
        - Default is `0.6`.
    buffer_time : float, *optional*
        Defines the time in seconds after an epoch for which we require EEG
        data to ensure that all EEG is present in that epoch.
        - Default is `0.01`.
    """

    # Hand the base class its own copy so that the shared default list
    # (or a caller-owned list) can never be mutated through this instance.
    super().__init__(list(filters))

    self.iterative_training = iterative_training

    # The P300 paradigm needs epochs from each object in order to decide
    # which object was selected, therefore we classify each trial.
    self.classify_each_trial = True
    self.classify_each_epoch = False

    # This paradigm uses ensemble averaging to increase the signal to noise
    # ratio and averages over all epochs for each object by default.
    self.ensemble_average = True

    self.epoch_start = epoch_start
    self.epoch_end = epoch_end

    self.buffer_time = buffer_time

    self.paradigm_name = "P300"
Parameters
- filters (list of floats, optional): Filter bands. Default is `[1, 15]`.
- iterative_training (bool, optional): Flag to indicate if the classifier will be updated iteratively. Default is `False`.
- epoch_start (float, optional): The start of the epoch relative to flash onset in seconds. Default is `0`.
- epoch_end (float, optional): The end of the epoch relative to flash onset in seconds. Default is `0.6`.
- buffer_time (float, optional): Defines the time in seconds after an epoch for which we require EEG data to ensure that all EEG is present in that epoch. Default is `0.01`.
def
get_eeg_start_and_end_times(self, markers, timestamps):
def get_eeg_start_and_end_times(self, markers, timestamps):
    """
    Compute the EEG time window needed to epoch every marker.

    Parameters
    ----------
    markers : list of str
        List of markers. Not used by this implementation.
    timestamps : list of float
        List of marker timestamps; only the first and last are read.

    Returns
    -------
    float
        Start time: first timestamp plus `epoch_start`, minus `buffer_time`.
    float
        End time: last timestamp plus `epoch_end`, plus `buffer_time`.
    """
    first_marker_time, last_marker_time = timestamps[0], timestamps[-1]

    # Pad the window outwards by the buffer on both sides
    window_start = first_marker_time + self.epoch_start - self.buffer_time
    window_end = last_marker_time + self.epoch_end + self.buffer_time

    return window_start, window_end
Get the start and end times of the EEG data based on the markers.
Parameters
- markers (list of str): List of markers.
- timestamps (list of float): List of timestamps.
Returns
- float: Start time.
- float: End time.
def
process_markers(self, markers, marker_timestamps, eeg, eeg_timestamps, fsample):
def process_markers(self, markers, marker_timestamps, eeg, eeg_timestamps, fsample):
    """
    Turn the markers and EEG of one trial into one ensemble-averaged epoch
    per object, according to the P300 paradigm.

    Each marker is a comma-separated string. Field 2 of the first marker is
    read as the number of objects and field 3 as the 1-indexed training
    target (`-1` when there is no target). Fields 4 onward of every marker
    are the 1-indexed objects that flashed at that marker.

    Parameters
    ----------
    markers : list of str
        List of markers.
    marker_timestamps : list of float
        Timestamp of each marker, in the same order as `markers`.
    eeg : np.array
        EEG data. Shape is (n_channels, n_samples).
    eeg_timestamps : np.array
        EEG timestamps. Shape is (n_samples).
    fsample : float
        Sampling frequency.

    Returns
    -------
    np.array
        Averaged EEG epochs, one per object. Shape is
        (n_objects, n_channels, n_epoch_samples). Objects that never
        flashed are left as zeros.
    np.array
        Labels. Shape is (n_objects). All `-1` when the target is `-1`,
        otherwise `1` for the target object and `0` for the others.
    """

    n_channels, _ = eeg.shape

    # Trial-level settings come from the first marker only.
    first_fields = markers[0].split(",")
    num_objects = int(first_fields[2])
    train_target = int(first_fields[3])

    if train_target == -1:
        # No target for this trial: set all values of y to -1
        y = np.full(num_objects, -1)
    else:
        y = np.zeros(num_objects, dtype=int)
        # Convert 1-indexed target to array index
        y[train_target - 1] = 1

    # The epoch time vector is the same for every marker, so build it once.
    epoch_time = np.arange(self.epoch_start, self.epoch_end, 1 / fsample)
    n_epoch_samples = len(epoch_time)

    # One independent list of (n_channels, n_epoch_samples) epochs per object
    epochs_per_object = [[] for _ in range(num_objects)]

    for i, marker in enumerate(markers):
        flash_indices = [int(x) for x in marker.split(",")[4:]]

        # Shift the EEG timestamps so that 0 becomes the marker onset
        marker_eeg_timestamps = eeg_timestamps - marker_timestamps[i]

        # Interpolate the EEG data onto the epoch time vector, per channel
        epoch = np.zeros((n_channels, n_epoch_samples))
        for c in range(n_channels):
            epoch[c, :] = np.interp(epoch_time, marker_eeg_timestamps, eeg[c, :])

        epoch[:, :] = super()._preprocess(
            epoch, fsample, self.lowcut, self.highcut
        )

        # The same epoch counts towards every object that flashed
        for flash_index in flash_indices:
            # Convert 1-indexed value to array index
            epochs_per_object[flash_index - 1].append(epoch)

    # Average all epochs for each object
    X = np.zeros((num_objects, n_channels, n_epoch_samples))
    for obj, obj_epochs in enumerate(epochs_per_object):
        if obj_epochs:
            X[obj, :, :] = np.mean(obj_epochs, axis=0)

    return X, y
This takes in the markers and EEG data and processes them into epochs according to the P300 paradigm.
Parameters
- markers (list of str): List of markers.
- marker_timestamps (list of float): Timestamp of each marker, in the same order as the markers.
- eeg (np.array): EEG data. Shape is (n_channels, n_samples).
- eeg_timestamps (np.array): EEG timestamps. Shape is (n_samples).
- fsample (float): Sampling frequency.
Returns
- np.array: Processed EEG data, one ensemble-averaged epoch per object. Shape is (n_objects, n_channels, n_samples).
- np.array: Labels, one per object. Shape is (n_objects).