bci_essentials.paradigm.p300_paradigm

  1import numpy as np
  2
  3from .paradigm import Paradigm
  4
  5
  6class P300Paradigm(Paradigm):
  7    """
  8    P300 paradigm.
  9    """
 10
 11    def __init__(
 12        self,
 13        filters=[1, 15],
 14        iterative_training=False,
 15        epoch_start=0,
 16        epoch_end=0.6,
 17        buffer_time=0.01,
 18    ):
 19        """
 20        Parameters
 21        ----------
 22        filters : list of floats, *optional*
 23            Filter bands.
 24            - Default is `[1, 15]`.
 25        iterative_training : bool, *optional*
 26            Flag to indicate if the classifier will be updated iteratively.
 27            - Default is `False`.
 28        epoch_start : float, *optional*
 29            The start of the epoch relative to flash onset in seconds.
 30            - Default is `0`.
 31        epoch_end : float, *optional*
 32            The end of the epoch relative to flash onset in seconds.
 33            - Default is `0.6`.
 34        buffer_time : float, *optional*
 35            Defines the time in seconds after an epoch for which we require EEG data to ensure that all EEG is present in that epoch.
 36            - Default is `0.01`.
 37        """
 38
 39        super().__init__(filters)
 40
 41        self.iterative_training = iterative_training
 42
 43        # The P300 paradigm needs epochs from each object in order to decide which object was selected
 44        # therefore we need to classify each trial
 45        self.classify_each_trial = True
 46        self.classify_each_epoch = False
 47
 48        # This paradigm uses ensemble averaging to increase the signal to nosie ratio and will average
 49        # over all epochs for each object by default
 50        self.ensemble_average = True
 51
 52        self.epoch_start = epoch_start
 53        self.epoch_end = epoch_end
 54
 55        self.buffer_time = buffer_time
 56
 57        self.paradigm_name = "P300"
 58
 59    def get_eeg_start_and_end_times(self, markers, timestamps):
 60        """
 61        Get the start and end times of the EEG data based on the markers.
 62
 63        Parameters
 64        ----------
 65        markers : list of str
 66            List of markers.
 67        timestamps : list of float
 68            List of timestamps.
 69
 70        Returns
 71        -------
 72        float
 73            Start time.
 74        float
 75            End time.
 76        """
 77        start_time = timestamps[0] + self.epoch_start - self.buffer_time
 78
 79        end_time = timestamps[-1] + self.epoch_end + self.buffer_time
 80
 81        return start_time, end_time
 82
 83    def process_markers(self, markers, marker_timestamps, eeg, eeg_timestamps, fsample):
 84        """
 85        This takes in the markers and EEG data and processes them into epochs according to the P300 paradigm.
 86
 87        Parameters
 88        ----------
 89        markers : list of str
 90            List of markers.
 91        marker_timestamps : list of float
 92            List of timestamps.
 93        eeg : np.array
 94            EEG data. Shape is (n_channels, n_samples).
 95        eeg_timestamps : np.array
 96            EEG timestamps. Shape is (n_samples).
 97        fsample : float
 98            Sampling frequency.
 99
100        Returns
101        -------
102        np.array
103            Processed EEG data. Shape is (n_epochs, n_channels, n_samples).
104        np.array
105            Labels. Shape is (n_epochs).
106        """
107
108        n_channels, _ = eeg.shape
109        num_objects = int(markers[0].split(",")[2])
110        train_target = int(markers[0].split(",")[3])
111        y = np.zeros(num_objects, dtype=int)
112        if train_target != -1:
113            # Convert 1-indexed target to array index
114            train_target_array_index = train_target - 1
115            y[train_target_array_index] = 1
116        if train_target == -1:  # Set all values of y to -1
117            y = np.full(num_objects, -1)
118
119        flash_counts = np.zeros(num_objects)
120
121        # X = np.zeros((num_objects, n_channels, len(epoch_time)))
122
123        # Do ensemble averaging so that we return a single epoch for each object
124
125        for i, marker in enumerate(markers):
126            marker = marker.split(",")
127            flash_indices = [int(x) for x in marker[4:]]
128
129            n_channels, _ = eeg.shape
130
131            marker_timestamp = marker_timestamps[i]
132
133            # Subtract the marker timestamp from the EEG timestamps so that 0 becomes the marker onset
134            marker_eeg_timestamps = eeg_timestamps - marker_timestamp
135
136            # Create the epoch time vector
137            epoch_time = np.arange(self.epoch_start, self.epoch_end, 1 / fsample)
138
139            epoch_X = np.zeros((1, n_channels, len(epoch_time)))
140
141            # Initialize object_epochs if this is the first epoch
142            if i == 0:
143                object_epochs = [
144                    np.zeros((num_objects, n_channels, len(epoch_time)))
145                ] * num_objects
146
147            # Interpolate the EEG data to the epoch time vector for each channel
148            for c in range(n_channels):
149                epoch_X[0, c, :] = np.interp(
150                    epoch_time, marker_eeg_timestamps, eeg[c, :]
151                )
152
153            epoch_X[0, :, :] = super()._preprocess(
154                epoch_X[0, :, :], fsample, self.lowcut, self.highcut
155            )
156
157            # For each flash index in the marker
158            for flash_index in flash_indices:
159                # Convert 1-indexed value to array index
160                flash_array_index = flash_index - 1
161
162                if flash_counts[flash_array_index] == 0:
163                    object_epochs[flash_array_index] = epoch_X
164                    flash_counts[flash_array_index] += 1
165                else:
166                    object_epochs[flash_array_index] = np.concatenate(
167                        (object_epochs[flash_array_index], epoch_X), axis=0
168                    )
169                    flash_counts[flash_array_index] += 1
170
171        # Average all epochs for each object
172        object_epochs_mean = [np.zeros((n_channels, len(epoch_time)))] * num_objects
173        for i in range(num_objects):
174            object_epochs_mean[i] = np.mean(object_epochs[i], axis=0)
175
176        X = np.zeros((num_objects, n_channels, len(epoch_time)))
177        for i in range(num_objects):
178            X[i, :, :] = object_epochs_mean[i]
179        # # object_epochs_mean = np.mean(object_epochs, axis=1)
180        # X = object_epochs_mean
181
182        return X, y
183
184    # TODO: Implement this
185    def check_compatibility(self):
186        pass
class P300Paradigm(bci_essentials.paradigm.paradigm.Paradigm):
  7class P300Paradigm(Paradigm):
  8    """
  9    P300 paradigm.
 10    """
 11
 12    def __init__(
 13        self,
 14        filters=[1, 15],
 15        iterative_training=False,
 16        epoch_start=0,
 17        epoch_end=0.6,
 18        buffer_time=0.01,
 19    ):
 20        """
 21        Parameters
 22        ----------
 23        filters : list of floats, *optional*
 24            Filter bands.
 25            - Default is `[1, 15]`.
 26        iterative_training : bool, *optional*
 27            Flag to indicate if the classifier will be updated iteratively.
 28            - Default is `False`.
 29        epoch_start : float, *optional*
 30            The start of the epoch relative to flash onset in seconds.
 31            - Default is `0`.
 32        epoch_end : float, *optional*
 33            The end of the epoch relative to flash onset in seconds.
 34            - Default is `0.6`.
 35        buffer_time : float, *optional*
 36            Defines the time in seconds after an epoch for which we require EEG data to ensure that all EEG is present in that epoch.
 37            - Default is `0.01`.
 38        """
 39
 40        super().__init__(filters)
 41
 42        self.iterative_training = iterative_training
 43
 44        # The P300 paradigm needs epochs from each object in order to decide which object was selected
 45        # therefore we need to classify each trial
 46        self.classify_each_trial = True
 47        self.classify_each_epoch = False
 48
 49        # This paradigm uses ensemble averaging to increase the signal to nosie ratio and will average
 50        # over all epochs for each object by default
 51        self.ensemble_average = True
 52
 53        self.epoch_start = epoch_start
 54        self.epoch_end = epoch_end
 55
 56        self.buffer_time = buffer_time
 57
 58        self.paradigm_name = "P300"
 59
 60    def get_eeg_start_and_end_times(self, markers, timestamps):
 61        """
 62        Get the start and end times of the EEG data based on the markers.
 63
 64        Parameters
 65        ----------
 66        markers : list of str
 67            List of markers.
 68        timestamps : list of float
 69            List of timestamps.
 70
 71        Returns
 72        -------
 73        float
 74            Start time.
 75        float
 76            End time.
 77        """
 78        start_time = timestamps[0] + self.epoch_start - self.buffer_time
 79
 80        end_time = timestamps[-1] + self.epoch_end + self.buffer_time
 81
 82        return start_time, end_time
 83
 84    def process_markers(self, markers, marker_timestamps, eeg, eeg_timestamps, fsample):
 85        """
 86        This takes in the markers and EEG data and processes them into epochs according to the P300 paradigm.
 87
 88        Parameters
 89        ----------
 90        markers : list of str
 91            List of markers.
 92        marker_timestamps : list of float
 93            List of timestamps.
 94        eeg : np.array
 95            EEG data. Shape is (n_channels, n_samples).
 96        eeg_timestamps : np.array
 97            EEG timestamps. Shape is (n_samples).
 98        fsample : float
 99            Sampling frequency.
100
101        Returns
102        -------
103        np.array
104            Processed EEG data. Shape is (n_epochs, n_channels, n_samples).
105        np.array
106            Labels. Shape is (n_epochs).
107        """
108
109        n_channels, _ = eeg.shape
110        num_objects = int(markers[0].split(",")[2])
111        train_target = int(markers[0].split(",")[3])
112        y = np.zeros(num_objects, dtype=int)
113        if train_target != -1:
114            # Convert 1-indexed target to array index
115            train_target_array_index = train_target - 1
116            y[train_target_array_index] = 1
117        if train_target == -1:  # Set all values of y to -1
118            y = np.full(num_objects, -1)
119
120        flash_counts = np.zeros(num_objects)
121
122        # X = np.zeros((num_objects, n_channels, len(epoch_time)))
123
124        # Do ensemble averaging so that we return a single epoch for each object
125
126        for i, marker in enumerate(markers):
127            marker = marker.split(",")
128            flash_indices = [int(x) for x in marker[4:]]
129
130            n_channels, _ = eeg.shape
131
132            marker_timestamp = marker_timestamps[i]
133
134            # Subtract the marker timestamp from the EEG timestamps so that 0 becomes the marker onset
135            marker_eeg_timestamps = eeg_timestamps - marker_timestamp
136
137            # Create the epoch time vector
138            epoch_time = np.arange(self.epoch_start, self.epoch_end, 1 / fsample)
139
140            epoch_X = np.zeros((1, n_channels, len(epoch_time)))
141
142            # Initialize object_epochs if this is the first epoch
143            if i == 0:
144                object_epochs = [
145                    np.zeros((num_objects, n_channels, len(epoch_time)))
146                ] * num_objects
147
148            # Interpolate the EEG data to the epoch time vector for each channel
149            for c in range(n_channels):
150                epoch_X[0, c, :] = np.interp(
151                    epoch_time, marker_eeg_timestamps, eeg[c, :]
152                )
153
154            epoch_X[0, :, :] = super()._preprocess(
155                epoch_X[0, :, :], fsample, self.lowcut, self.highcut
156            )
157
158            # For each flash index in the marker
159            for flash_index in flash_indices:
160                # Convert 1-indexed value to array index
161                flash_array_index = flash_index - 1
162
163                if flash_counts[flash_array_index] == 0:
164                    object_epochs[flash_array_index] = epoch_X
165                    flash_counts[flash_array_index] += 1
166                else:
167                    object_epochs[flash_array_index] = np.concatenate(
168                        (object_epochs[flash_array_index], epoch_X), axis=0
169                    )
170                    flash_counts[flash_array_index] += 1
171
172        # Average all epochs for each object
173        object_epochs_mean = [np.zeros((n_channels, len(epoch_time)))] * num_objects
174        for i in range(num_objects):
175            object_epochs_mean[i] = np.mean(object_epochs[i], axis=0)
176
177        X = np.zeros((num_objects, n_channels, len(epoch_time)))
178        for i in range(num_objects):
179            X[i, :, :] = object_epochs_mean[i]
180        # # object_epochs_mean = np.mean(object_epochs, axis=1)
181        # X = object_epochs_mean
182
183        return X, y
184
185    # TODO: Implement this
186    def check_compatibility(self):
187        pass

P300 paradigm.

P300Paradigm( filters=[1, 15], iterative_training=False, epoch_start=0, epoch_end=0.6, buffer_time=0.01)
12    def __init__(
13        self,
14        filters=[1, 15],
15        iterative_training=False,
16        epoch_start=0,
17        epoch_end=0.6,
18        buffer_time=0.01,
19    ):
20        """
21        Parameters
22        ----------
23        filters : list of floats, *optional*
24            Filter bands.
25            - Default is `[1, 15]`.
26        iterative_training : bool, *optional*
27            Flag to indicate if the classifier will be updated iteratively.
28            - Default is `False`.
29        epoch_start : float, *optional*
30            The start of the epoch relative to flash onset in seconds.
31            - Default is `0`.
32        epoch_end : float, *optional*
33            The end of the epoch relative to flash onset in seconds.
34            - Default is `0.6`.
35        buffer_time : float, *optional*
36            Defines the time in seconds after an epoch for which we require EEG data to ensure that all EEG is present in that epoch.
37            - Default is `0.01`.
38        """
39
40        super().__init__(filters)
41
42        self.iterative_training = iterative_training
43
44        # The P300 paradigm needs epochs from each object in order to decide which object was selected
45        # therefore we need to classify each trial
46        self.classify_each_trial = True
47        self.classify_each_epoch = False
48
49        # This paradigm uses ensemble averaging to increase the signal to nosie ratio and will average
50        # over all epochs for each object by default
51        self.ensemble_average = True
52
53        self.epoch_start = epoch_start
54        self.epoch_end = epoch_end
55
56        self.buffer_time = buffer_time
57
58        self.paradigm_name = "P300"
Parameters
  • filters (list of floats, optional): Filter bands.
    • Default is [1, 15].
  • iterative_training (bool, optional): Flag to indicate if the classifier will be updated iteratively.
    • Default is False.
  • epoch_start (float, optional): The start of the epoch relative to flash onset in seconds.
    • Default is 0.
  • epoch_end (float, optional): The end of the epoch relative to flash onset in seconds.
    • Default is 0.6.
  • buffer_time (float, optional): Defines the time in seconds after an epoch for which we require EEG data to ensure that all EEG is present in that epoch.
    • Default is 0.01.
iterative_training
classify_each_trial
classify_each_epoch
ensemble_average
epoch_start
epoch_end
buffer_time
paradigm_name
def get_eeg_start_and_end_times(self, markers, timestamps):
60    def get_eeg_start_and_end_times(self, markers, timestamps):
61        """
62        Get the start and end times of the EEG data based on the markers.
63
64        Parameters
65        ----------
66        markers : list of str
67            List of markers.
68        timestamps : list of float
69            List of timestamps.
70
71        Returns
72        -------
73        float
74            Start time.
75        float
76            End time.
77        """
78        start_time = timestamps[0] + self.epoch_start - self.buffer_time
79
80        end_time = timestamps[-1] + self.epoch_end + self.buffer_time
81
82        return start_time, end_time

Get the start and end times of the EEG data based on the markers.

Parameters
  • markers (list of str): List of markers.
  • timestamps (list of float): List of timestamps; the first and last entries bound the returned window.
Returns
  • float: Start time.
  • float: End time.
def process_markers(self, markers, marker_timestamps, eeg, eeg_timestamps, fsample):
 84    def process_markers(self, markers, marker_timestamps, eeg, eeg_timestamps, fsample):
 85        """
 86        This takes in the markers and EEG data and processes them into epochs according to the P300 paradigm.
 87
 88        Parameters
 89        ----------
 90        markers : list of str
 91            List of markers.
 92        marker_timestamps : list of float
 93            List of timestamps.
 94        eeg : np.array
 95            EEG data. Shape is (n_channels, n_samples).
 96        eeg_timestamps : np.array
 97            EEG timestamps. Shape is (n_samples).
 98        fsample : float
 99            Sampling frequency.
100
101        Returns
102        -------
103        np.array
104            Processed EEG data. Shape is (n_epochs, n_channels, n_samples).
105        np.array
106            Labels. Shape is (n_epochs).
107        """
108
109        n_channels, _ = eeg.shape
110        num_objects = int(markers[0].split(",")[2])
111        train_target = int(markers[0].split(",")[3])
112        y = np.zeros(num_objects, dtype=int)
113        if train_target != -1:
114            # Convert 1-indexed target to array index
115            train_target_array_index = train_target - 1
116            y[train_target_array_index] = 1
117        if train_target == -1:  # Set all values of y to -1
118            y = np.full(num_objects, -1)
119
120        flash_counts = np.zeros(num_objects)
121
122        # X = np.zeros((num_objects, n_channels, len(epoch_time)))
123
124        # Do ensemble averaging so that we return a single epoch for each object
125
126        for i, marker in enumerate(markers):
127            marker = marker.split(",")
128            flash_indices = [int(x) for x in marker[4:]]
129
130            n_channels, _ = eeg.shape
131
132            marker_timestamp = marker_timestamps[i]
133
134            # Subtract the marker timestamp from the EEG timestamps so that 0 becomes the marker onset
135            marker_eeg_timestamps = eeg_timestamps - marker_timestamp
136
137            # Create the epoch time vector
138            epoch_time = np.arange(self.epoch_start, self.epoch_end, 1 / fsample)
139
140            epoch_X = np.zeros((1, n_channels, len(epoch_time)))
141
142            # Initialize object_epochs if this is the first epoch
143            if i == 0:
144                object_epochs = [
145                    np.zeros((num_objects, n_channels, len(epoch_time)))
146                ] * num_objects
147
148            # Interpolate the EEG data to the epoch time vector for each channel
149            for c in range(n_channels):
150                epoch_X[0, c, :] = np.interp(
151                    epoch_time, marker_eeg_timestamps, eeg[c, :]
152                )
153
154            epoch_X[0, :, :] = super()._preprocess(
155                epoch_X[0, :, :], fsample, self.lowcut, self.highcut
156            )
157
158            # For each flash index in the marker
159            for flash_index in flash_indices:
160                # Convert 1-indexed value to array index
161                flash_array_index = flash_index - 1
162
163                if flash_counts[flash_array_index] == 0:
164                    object_epochs[flash_array_index] = epoch_X
165                    flash_counts[flash_array_index] += 1
166                else:
167                    object_epochs[flash_array_index] = np.concatenate(
168                        (object_epochs[flash_array_index], epoch_X), axis=0
169                    )
170                    flash_counts[flash_array_index] += 1
171
172        # Average all epochs for each object
173        object_epochs_mean = [np.zeros((n_channels, len(epoch_time)))] * num_objects
174        for i in range(num_objects):
175            object_epochs_mean[i] = np.mean(object_epochs[i], axis=0)
176
177        X = np.zeros((num_objects, n_channels, len(epoch_time)))
178        for i in range(num_objects):
179            X[i, :, :] = object_epochs_mean[i]
180        # # object_epochs_mean = np.mean(object_epochs, axis=1)
181        # X = object_epochs_mean
182
183        return X, y

This takes in the markers and EEG data and processes them into epochs according to the P300 paradigm.

Parameters
  • markers (list of str): List of markers.
  • marker_timestamps (list of float): Timestamp of each marker, in the same order as markers.
  • eeg (np.array): EEG data. Shape is (n_channels, n_samples).
  • eeg_timestamps (np.array): EEG timestamps. Shape is (n_samples).
  • fsample (float): Sampling frequency.
Returns
  • np.array: Processed EEG data, ensemble-averaged to one epoch per object. Shape is (n_objects, n_channels, n_samples).
  • np.array: Labels, one per object. Shape is (n_objects).
def check_compatibility(self):
186    def check_compatibility(self):
187        pass