bci_essentials.paradigm.paradigm

  1import numpy as np
  2from abc import ABC, abstractmethod
  3
  4from ..utils.logger import Logger
  5from ..signal_processing import bandpass, lowpass
  6
  7# Instantiate a logger for the module at the default level of logging.INFO
  8# Logs to bci_essentials.__module__, where __module__ is the name of the module
  9logger = Logger(name=__name__)
 10
 11
 12class Paradigm(ABC):
 13    def __init__(self, filters=[5, 30], channel_subset=None):
 14        """
 15        Base class for all paradigms.
 16        Please use a subclass of this class for your specific paradigm.
 17
 18        Parameters
 19        ----------
 20        filters : list of floats | [5, 30]
 21            Filter bands.
 22        channel_subset : list of str | None
 23            Channel subset to use.
 24        """
 25        self.lowcut = filters[0]
 26        self.highcut = filters[1]
 27        self.channel_subset = channel_subset
 28
 29        # When do we return classifications?
 30        self.classify_each_epoch = False
 31        self.classify_each_trial = False
 32
 33        # Do we classify labeled epochs (such as in the case of iterative training)?
 34        self.classify_labeled_epochs = False
 35
 36        self.paradigm_name = "Generic"
 37
 38    def _preprocess(self, eeg, fsample, lowcut, highcut, order=5):
 39        """
 40        Preprocess EEG data with the appropriate filter type:
 41        - If the data is continuous (i.e., shape is [channels, samples]), a
 42        bandpass filter is used.
 43
 44        - If the data is epoched (i.e., shape is [epoch, channels, samples]),
 45        the filter type depends on the signal length relative to the filter's settling time:
 46            - If signal length > settling time: use bandpass filter
 47            - If signal length ≤ settling time: use lowpass filter
 48            - The settling time is calculated by:
 49                1. Compute the time constant (tc) of the highpass filter:
 50                    tc = 1 / (2 * π * lowcut)
 51                2. Compute the settling time with the formula:
 52                    settling_time = tc * 5 * order
 53                    - In a first-order system, a rule-of-thumb is that the signal settles in approximately 5 time constants (tc * 5)
 54                    - In higher-order filters (e.g., a 5th-order filter set as the default), the settling time incrases linearly with the order of the filter (order * tc * 5)
 55                    - This is a simplification, but it provides a good approximation for the settling time of the filter.
 56                    - For more details, see the reference below:
 57                        https://www.analogictips.com/an-overview-of-filters-and-their-parameters-part-4-time-and-phase-issues/
 58
 59        Parameters
 60        ----------
 61        eeg : np.ndarray
 62            EEG data. Shape can be 2D [n_channels, n_samples]
 63            or 3D [epochs, n_channels, n_samples].
 64        fsample : float
 65            Sampling frequency.
 66        lowcut : float
 67            Lower cutoff frequency.
 68        highcut : float
 69            Upper cutoff frequency.
 70        order : int
 71            Order of the filter [n]. Default is 5.
 72
 73        Returns
 74        -------
 75        np.ndarray
 76            Preprocessed EEG. Shape is the same as `eeg`.
 77
 78        """
 79
 80        n_dims = len(eeg.shape)
 81        if n_dims == 2:
 82            logger.debug("Preprocessing continuous EEG")
 83            preprocessed_eeg = bandpass(eeg, lowcut, highcut, order, fsample)
 84        elif n_dims == 3:
 85            logger.debug("Preprocessing epoched EEG")
 86
 87            # Get the length of the signal
 88            signal_length = eeg.shape[2]
 89
 90            # Highpass filter settling time
 91            tc = 1 / (2 * np.pi * lowcut)
 92            settling_time = order * tc * 5
 93
 94            if signal_length > settling_time:
 95                logger.debug("Applied bandpass filter to epoched EEG")
 96                preprocessed_eeg = bandpass(eeg, lowcut, highcut, order, fsample)
 97            else:
 98                logger.debug("Applied lowpass filter to epoched EEG")
 99                preprocessed_eeg = lowpass(eeg, highcut, order, fsample)
100        else:
101            raise ValueError(
102                "Preprocessing failed. EEG must be 2D (continuous) or 3D (epoched)."
103            )
104
105        return preprocessed_eeg
106
107    def package_resting_state_data(
108        self, marker_data, marker_timestamps, bci_controller, eeg_timestamps, fsample
109    ):
110        """Package resting state data.
111
112        Parameters
113        ----------
114        marker_data : list of str
115            List of markers.
116        marker_timestamps : np.ndarray
117            Timestamps of markers.
118        bci_controller : np.ndarray
119            EEG data. Shape is (n_channels, n_samples).
120        eeg_timestamps : np.ndarray
121            Timestamps of EEG data. Shape is (n_samples,).
122        fsample : float
123            Sampling frequency.
124
125        Returns
126        -------
127        resting_state_data : dict
128            Dictionary containing resting state data with keys:
129            - 'eyes_open_trials': np.ndarray of EEG data during eyes open condition
130            - 'eyes_open_timestamps': np.ndarray of timestamps for eyes open trials
131            - 'eyes_closed_trials': np.ndarray of EEG data during eyes closed condition
132            - 'eyes_closed_timestamps': np.ndarray of timestamps for eyes closed trials
133            - 'rest_trials': np.ndarray of EEG data during rest condition
134            - 'rest_timestamps': np.ndarray of timestamps for rest trials
135            If an error occurs, returns `None`.
136
137        """
138        try:
139            logger.debug("Packaging resting state data")
140
141            eyes_open_start_time = []
142            eyes_open_end_time = []
143            eyes_closed_start_time = []
144            eyes_closed_end_time = []
145            rest_start_time = []
146            rest_end_time = []
147
148            # Initialize start and end locations
149            eyes_open_start_loc = []
150            eyes_open_end_loc = []
151            eyes_closed_start_loc = []
152            eyes_closed_end_loc = []
153            rest_start_loc = []
154            rest_end_loc = []
155
156            current_time = eeg_timestamps[0]
157            current_timestamp_loc = 0
158
159            self.fsample = fsample
160            self.n_channels = bci_controller.shape[0]
161
162            for i in range(len(marker_data)):
163                # Get current resting state data marker and time stamp
164                current_rs_data_marker = marker_data[i]
165                current_rs_timestamp = marker_timestamps[i]
166
167                # Increment the EEG until just past the marker timestamp
168                while current_time < current_rs_timestamp:
169                    current_timestamp_loc += 1
170                    current_time = eeg_timestamps[current_timestamp_loc]
171
172                # get eyes open start times
173                if current_rs_data_marker == "Start Eyes Open RS: 1":
174                    eyes_open_start_time.append(current_rs_timestamp)
175                    eyes_open_start_loc.append(current_timestamp_loc - 1)
176                    logger.debug("received eyes open start")
177
178                # get eyes open end times
179                if current_rs_data_marker == "End Eyes Open RS: 1":
180                    eyes_open_end_time.append(current_rs_timestamp)
181                    eyes_open_end_loc.append(current_timestamp_loc)
182                    logger.debug("received eyes open end")
183
184                # get eyes closed start times
185                if current_rs_data_marker == "Start Eyes Closed RS: 2":
186                    eyes_closed_start_time.append(current_rs_timestamp)
187                    eyes_closed_start_loc.append(current_timestamp_loc - 1)
188                    logger.debug("received eyes closed start")
189
190                # get eyes closed end times
191                if current_rs_data_marker == "End Eyes Closed RS: 2":
192                    eyes_closed_end_time.append(current_rs_timestamp)
193                    eyes_closed_end_loc.append(current_timestamp_loc)
194                    logger.debug("received eyes closed end")
195
196                # get rest start times
197                if current_rs_data_marker == "Start Rest for RS: 0":
198                    rest_start_time.append(current_rs_timestamp)
199                    rest_start_loc.append(current_timestamp_loc - 1)
200                    logger.debug("received rest start")
201                # get rest end times
202                if current_rs_data_marker == "End Rest for RS: 0":
203                    rest_end_time.append(current_rs_timestamp)
204                    rest_end_loc.append(current_timestamp_loc)
205                    logger.debug("received rest end")
206
207            # Eyes open
208            # Get duration, nsamples
209
210            if len(eyes_open_end_loc) > 0:
211                duration = np.floor(eyes_open_end_time[0] - eyes_open_start_time[0])
212                n_samples = int(duration * self.fsample)
213
214                new_eeg = np.zeros((1, self.n_channels, n_samples))
215
216                eyes_open_timestamps = np.arange(0, duration, 1 / self.fsample)
217
218                # Now copy EEG for these trials
219                for i in range(len(eyes_open_start_time)):
220                    eyes_open_time_correction = eyes_open_start_time[i]
221
222                    corrected_eeg_timestamps = (
223                        eeg_timestamps - eyes_open_time_correction
224                    )
225
226                    for c in range(self.n_channels):
227                        new_eeg[0, c, :] = np.interp(
228                            eyes_open_timestamps,
229                            corrected_eeg_timestamps,
230                            bci_controller[c, :],
231                        )
232
233                    if i == 0:
234                        eyes_open_trials = new_eeg
235                        eyes_open_timestamps = eyes_open_timestamps
236                    else:
237                        eyes_open_trials = np.concatenate(
238                            (eyes_open_trials, new_eeg), axis=0
239                        )
240
241            # Eyes closed
242
243            if len(eyes_closed_end_loc) > 0:
244                # Get duration, nsmaples
245                duration = np.floor(eyes_closed_end_time[0] - eyes_closed_start_time[0])
246                n_samples = int(duration * self.fsample)
247
248                new_eeg = np.zeros((1, self.n_channels, n_samples))
249
250                eyes_closed_timestamps = np.arange(0, duration, 1 / self.fsample)
251
252                # Now copy EEG for these trials
253                for i in range(len(eyes_closed_start_time)):
254                    eyes_closed_time_correction = eyes_closed_start_time[i]
255
256                    corrected_eeg_timestamps = (
257                        eeg_timestamps - eyes_closed_time_correction
258                    )
259
260                    for c in range(self.n_channels):
261                        new_eeg[0, c, :] = np.interp(
262                            eyes_closed_timestamps,
263                            corrected_eeg_timestamps,
264                            bci_controller[c, :],
265                        )
266
267                    if i == 0:
268                        eyes_closed_trials = new_eeg
269                        eyes_closed_timestamps = eyes_closed_timestamps
270                    else:
271                        eyes_closed_trials = np.concatenate(
272                            (eyes_closed_trials, new_eeg), axis=0
273                        )
274
275            # Rest
276            if len(rest_end_loc) > 0:
277                # Get duration, nsmaples
278                while rest_end_time[0] < rest_start_time[0]:
279                    rest_end_time.pop(0)
280                    rest_end_loc.pop(0)
281
282                duration = np.floor(rest_end_time[0] - rest_start_time[0])
283                n_samples = int(duration * self.fsample)
284
285                new_eeg = np.zeros((1, self.n_channels, n_samples))
286
287                rest_timestamps = np.arange(0, duration, 1 / self.fsample)
288
289                # Now copy EEG for these trials
290                for i in range(len(rest_start_time)):
291                    rest_time_correction = rest_start_time[i]
292
293                    corrected_eeg_timestamps = eeg_timestamps - rest_time_correction
294
295                    for c in range(self.n_channels):
296                        new_eeg[0, c, :] = np.interp(
297                            rest_timestamps,
298                            corrected_eeg_timestamps,
299                            bci_controller[c, :],
300                        )
301
302                    if i == 0:
303                        rest_trials = new_eeg
304                        rest_timestamps = rest_timestamps
305
306                    else:
307                        rest_trials = np.concatenate((rest_trials, new_eeg), axis=0)
308
309            # Put all the available data into a dictionary
310            resting_state_data = {
311                "eyes_open_trials": eyes_open_trials,
312                "eyes_open_timestamps": eyes_open_timestamps,
313                "eyes_closed_trials": eyes_closed_trials,
314                "eyes_closed_timestamps": eyes_closed_timestamps,
315                "rest_trials": rest_trials,
316                "rest_timestamps": rest_timestamps,
317            }
318
319            logger.debug("Done packaging resting state data")
320
321            return resting_state_data
322
323        except Exception as e:
324            logger.error("Error packaging resting state data: %s", e)
325
326            return None
327
328    @abstractmethod
329    def get_eeg_start_and_end_times(self, markers, timestamps):
330        """
331        Get the start and end times of the EEG data based on the markers.
332
333        Parameters
334        ----------
335        markers : list of str
336            List of markers.
337        timestamps : list of float
338            List of timestamps.
339
340        Returns
341        -------
342        float
343            Start time.
344        float
345            End time.
346        """
347
348        pass
349
350    @abstractmethod
351    def process_markers(self, markers, marker_timestamps, eeg, eeg_timestamps, fsample):
352        """
353        This takes in the markers and EEG data and processes them into epochs accordingt to the MI paradigm.
354
355        Parameters
356        ----------
357        markers : list of str
358            List of markers.
359        marker_timestamps : list of float
360            List of timestamps.
361        eeg : np.array
362            EEG data. Shape is (n_channels, n_samples).
363        eeg_timestamps : np.array
364            EEG timestamps. Shape is (n_samples).
365        fsample : float
366            Sampling frequency.
367
368        Returns
369        -------
370        np.array
371            Processed EEG data. Shape is (n_epochs, n_channels, n_samples).
372        np.array
373            Labels. Shape is (n_epochs).
374        """
375
376        pass
class Paradigm(abc.ABC):
 13class Paradigm(ABC):
 14    def __init__(self, filters=[5, 30], channel_subset=None):
 15        """
 16        Base class for all paradigms.
 17        Please use a subclass of this class for your specific paradigm.
 18
 19        Parameters
 20        ----------
 21        filters : list of floats | [5, 30]
 22            Filter bands.
 23        channel_subset : list of str | None
 24            Channel subset to use.
 25        """
 26        self.lowcut = filters[0]
 27        self.highcut = filters[1]
 28        self.channel_subset = channel_subset
 29
 30        # When do we return classifications?
 31        self.classify_each_epoch = False
 32        self.classify_each_trial = False
 33
 34        # Do we classify labeled epochs (such as in the case of iterative training)?
 35        self.classify_labeled_epochs = False
 36
 37        self.paradigm_name = "Generic"
 38
 39    def _preprocess(self, eeg, fsample, lowcut, highcut, order=5):
 40        """
 41        Preprocess EEG data with the appropriate filter type:
 42        - If the data is continuous (i.e., shape is [channels, samples]), a
 43        bandpass filter is used.
 44
 45        - If the data is epoched (i.e., shape is [epoch, channels, samples]),
 46        the filter type depends on the signal length relative to the filter's settling time:
 47            - If signal length > settling time: use bandpass filter
 48            - If signal length ≤ settling time: use lowpass filter
 49            - The settling time is calculated by:
 50                1. Compute the time constant (tc) of the highpass filter:
 51                    tc = 1 / (2 * π * lowcut)
 52                2. Compute the settling time with the formula:
 53                    settling_time = tc * 5 * order
 54                    - In a first-order system, a rule-of-thumb is that the signal settles in approximately 5 time constants (tc * 5)
 55                    - In higher-order filters (e.g., a 5th-order filter set as the default), the settling time incrases linearly with the order of the filter (order * tc * 5)
 56                    - This is a simplification, but it provides a good approximation for the settling time of the filter.
 57                    - For more details, see the reference below:
 58                        https://www.analogictips.com/an-overview-of-filters-and-their-parameters-part-4-time-and-phase-issues/
 59
 60        Parameters
 61        ----------
 62        eeg : np.ndarray
 63            EEG data. Shape can be 2D [n_channels, n_samples]
 64            or 3D [epochs, n_channels, n_samples].
 65        fsample : float
 66            Sampling frequency.
 67        lowcut : float
 68            Lower cutoff frequency.
 69        highcut : float
 70            Upper cutoff frequency.
 71        order : int
 72            Order of the filter [n]. Default is 5.
 73
 74        Returns
 75        -------
 76        np.ndarray
 77            Preprocessed EEG. Shape is the same as `eeg`.
 78
 79        """
 80
 81        n_dims = len(eeg.shape)
 82        if n_dims == 2:
 83            logger.debug("Preprocessing continuous EEG")
 84            preprocessed_eeg = bandpass(eeg, lowcut, highcut, order, fsample)
 85        elif n_dims == 3:
 86            logger.debug("Preprocessing epoched EEG")
 87
 88            # Get the length of the signal
 89            signal_length = eeg.shape[2]
 90
 91            # Highpass filter settling time
 92            tc = 1 / (2 * np.pi * lowcut)
 93            settling_time = order * tc * 5
 94
 95            if signal_length > settling_time:
 96                logger.debug("Applied bandpass filter to epoched EEG")
 97                preprocessed_eeg = bandpass(eeg, lowcut, highcut, order, fsample)
 98            else:
 99                logger.debug("Applied lowpass filter to epoched EEG")
100                preprocessed_eeg = lowpass(eeg, highcut, order, fsample)
101        else:
102            raise ValueError(
103                "Preprocessing failed. EEG must be 2D (continuous) or 3D (epoched)."
104            )
105
106        return preprocessed_eeg
107
108    def package_resting_state_data(
109        self, marker_data, marker_timestamps, bci_controller, eeg_timestamps, fsample
110    ):
111        """Package resting state data.
112
113        Parameters
114        ----------
115        marker_data : list of str
116            List of markers.
117        marker_timestamps : np.ndarray
118            Timestamps of markers.
119        bci_controller : np.ndarray
120            EEG data. Shape is (n_channels, n_samples).
121        eeg_timestamps : np.ndarray
122            Timestamps of EEG data. Shape is (n_samples,).
123        fsample : float
124            Sampling frequency.
125
126        Returns
127        -------
128        resting_state_data : dict
129            Dictionary containing resting state data with keys:
130            - 'eyes_open_trials': np.ndarray of EEG data during eyes open condition
131            - 'eyes_open_timestamps': np.ndarray of timestamps for eyes open trials
132            - 'eyes_closed_trials': np.ndarray of EEG data during eyes closed condition
133            - 'eyes_closed_timestamps': np.ndarray of timestamps for eyes closed trials
134            - 'rest_trials': np.ndarray of EEG data during rest condition
135            - 'rest_timestamps': np.ndarray of timestamps for rest trials
136            If an error occurs, returns `None`.
137
138        """
139        try:
140            logger.debug("Packaging resting state data")
141
142            eyes_open_start_time = []
143            eyes_open_end_time = []
144            eyes_closed_start_time = []
145            eyes_closed_end_time = []
146            rest_start_time = []
147            rest_end_time = []
148
149            # Initialize start and end locations
150            eyes_open_start_loc = []
151            eyes_open_end_loc = []
152            eyes_closed_start_loc = []
153            eyes_closed_end_loc = []
154            rest_start_loc = []
155            rest_end_loc = []
156
157            current_time = eeg_timestamps[0]
158            current_timestamp_loc = 0
159
160            self.fsample = fsample
161            self.n_channels = bci_controller.shape[0]
162
163            for i in range(len(marker_data)):
164                # Get current resting state data marker and time stamp
165                current_rs_data_marker = marker_data[i]
166                current_rs_timestamp = marker_timestamps[i]
167
168                # Increment the EEG until just past the marker timestamp
169                while current_time < current_rs_timestamp:
170                    current_timestamp_loc += 1
171                    current_time = eeg_timestamps[current_timestamp_loc]
172
173                # get eyes open start times
174                if current_rs_data_marker == "Start Eyes Open RS: 1":
175                    eyes_open_start_time.append(current_rs_timestamp)
176                    eyes_open_start_loc.append(current_timestamp_loc - 1)
177                    logger.debug("received eyes open start")
178
179                # get eyes open end times
180                if current_rs_data_marker == "End Eyes Open RS: 1":
181                    eyes_open_end_time.append(current_rs_timestamp)
182                    eyes_open_end_loc.append(current_timestamp_loc)
183                    logger.debug("received eyes open end")
184
185                # get eyes closed start times
186                if current_rs_data_marker == "Start Eyes Closed RS: 2":
187                    eyes_closed_start_time.append(current_rs_timestamp)
188                    eyes_closed_start_loc.append(current_timestamp_loc - 1)
189                    logger.debug("received eyes closed start")
190
191                # get eyes closed end times
192                if current_rs_data_marker == "End Eyes Closed RS: 2":
193                    eyes_closed_end_time.append(current_rs_timestamp)
194                    eyes_closed_end_loc.append(current_timestamp_loc)
195                    logger.debug("received eyes closed end")
196
197                # get rest start times
198                if current_rs_data_marker == "Start Rest for RS: 0":
199                    rest_start_time.append(current_rs_timestamp)
200                    rest_start_loc.append(current_timestamp_loc - 1)
201                    logger.debug("received rest start")
202                # get rest end times
203                if current_rs_data_marker == "End Rest for RS: 0":
204                    rest_end_time.append(current_rs_timestamp)
205                    rest_end_loc.append(current_timestamp_loc)
206                    logger.debug("received rest end")
207
208            # Eyes open
209            # Get duration, nsamples
210
211            if len(eyes_open_end_loc) > 0:
212                duration = np.floor(eyes_open_end_time[0] - eyes_open_start_time[0])
213                n_samples = int(duration * self.fsample)
214
215                new_eeg = np.zeros((1, self.n_channels, n_samples))
216
217                eyes_open_timestamps = np.arange(0, duration, 1 / self.fsample)
218
219                # Now copy EEG for these trials
220                for i in range(len(eyes_open_start_time)):
221                    eyes_open_time_correction = eyes_open_start_time[i]
222
223                    corrected_eeg_timestamps = (
224                        eeg_timestamps - eyes_open_time_correction
225                    )
226
227                    for c in range(self.n_channels):
228                        new_eeg[0, c, :] = np.interp(
229                            eyes_open_timestamps,
230                            corrected_eeg_timestamps,
231                            bci_controller[c, :],
232                        )
233
234                    if i == 0:
235                        eyes_open_trials = new_eeg
236                        eyes_open_timestamps = eyes_open_timestamps
237                    else:
238                        eyes_open_trials = np.concatenate(
239                            (eyes_open_trials, new_eeg), axis=0
240                        )
241
242            # Eyes closed
243
244            if len(eyes_closed_end_loc) > 0:
245                # Get duration, nsmaples
246                duration = np.floor(eyes_closed_end_time[0] - eyes_closed_start_time[0])
247                n_samples = int(duration * self.fsample)
248
249                new_eeg = np.zeros((1, self.n_channels, n_samples))
250
251                eyes_closed_timestamps = np.arange(0, duration, 1 / self.fsample)
252
253                # Now copy EEG for these trials
254                for i in range(len(eyes_closed_start_time)):
255                    eyes_closed_time_correction = eyes_closed_start_time[i]
256
257                    corrected_eeg_timestamps = (
258                        eeg_timestamps - eyes_closed_time_correction
259                    )
260
261                    for c in range(self.n_channels):
262                        new_eeg[0, c, :] = np.interp(
263                            eyes_closed_timestamps,
264                            corrected_eeg_timestamps,
265                            bci_controller[c, :],
266                        )
267
268                    if i == 0:
269                        eyes_closed_trials = new_eeg
270                        eyes_closed_timestamps = eyes_closed_timestamps
271                    else:
272                        eyes_closed_trials = np.concatenate(
273                            (eyes_closed_trials, new_eeg), axis=0
274                        )
275
276            # Rest
277            if len(rest_end_loc) > 0:
278                # Get duration, nsmaples
279                while rest_end_time[0] < rest_start_time[0]:
280                    rest_end_time.pop(0)
281                    rest_end_loc.pop(0)
282
283                duration = np.floor(rest_end_time[0] - rest_start_time[0])
284                n_samples = int(duration * self.fsample)
285
286                new_eeg = np.zeros((1, self.n_channels, n_samples))
287
288                rest_timestamps = np.arange(0, duration, 1 / self.fsample)
289
290                # Now copy EEG for these trials
291                for i in range(len(rest_start_time)):
292                    rest_time_correction = rest_start_time[i]
293
294                    corrected_eeg_timestamps = eeg_timestamps - rest_time_correction
295
296                    for c in range(self.n_channels):
297                        new_eeg[0, c, :] = np.interp(
298                            rest_timestamps,
299                            corrected_eeg_timestamps,
300                            bci_controller[c, :],
301                        )
302
303                    if i == 0:
304                        rest_trials = new_eeg
305                        rest_timestamps = rest_timestamps
306
307                    else:
308                        rest_trials = np.concatenate((rest_trials, new_eeg), axis=0)
309
310            # Put all the available data into a dictionary
311            resting_state_data = {
312                "eyes_open_trials": eyes_open_trials,
313                "eyes_open_timestamps": eyes_open_timestamps,
314                "eyes_closed_trials": eyes_closed_trials,
315                "eyes_closed_timestamps": eyes_closed_timestamps,
316                "rest_trials": rest_trials,
317                "rest_timestamps": rest_timestamps,
318            }
319
320            logger.debug("Done packaging resting state data")
321
322            return resting_state_data
323
324        except Exception as e:
325            logger.error("Error packaging resting state data: %s", e)
326
327            return None
328
329    @abstractmethod
330    def get_eeg_start_and_end_times(self, markers, timestamps):
331        """
332        Get the start and end times of the EEG data based on the markers.
333
334        Parameters
335        ----------
336        markers : list of str
337            List of markers.
338        timestamps : list of float
339            List of timestamps.
340
341        Returns
342        -------
343        float
344            Start time.
345        float
346            End time.
347        """
348
349        pass
350
351    @abstractmethod
352    def process_markers(self, markers, marker_timestamps, eeg, eeg_timestamps, fsample):
353        """
354        This takes in the markers and EEG data and processes them into epochs accordingt to the MI paradigm.
355
356        Parameters
357        ----------
358        markers : list of str
359            List of markers.
360        marker_timestamps : list of float
361            List of timestamps.
362        eeg : np.array
363            EEG data. Shape is (n_channels, n_samples).
364        eeg_timestamps : np.array
365            EEG timestamps. Shape is (n_samples).
366        fsample : float
367            Sampling frequency.
368
369        Returns
370        -------
371        np.array
372            Processed EEG data. Shape is (n_epochs, n_channels, n_samples).
373        np.array
374            Labels. Shape is (n_epochs).
375        """
376
377        pass

Helper class that provides a standard way to create an ABC using inheritance.

Paradigm(filters=[5, 30], channel_subset=None)
14    def __init__(self, filters=[5, 30], channel_subset=None):
15        """
16        Base class for all paradigms.
17        Please use a subclass of this class for your specific paradigm.
18
19        Parameters
20        ----------
21        filters : list of floats | [5, 30]
22            Filter bands.
23        channel_subset : list of str | None
24            Channel subset to use.
25        """
26        self.lowcut = filters[0]
27        self.highcut = filters[1]
28        self.channel_subset = channel_subset
29
30        # When do we return classifications?
31        self.classify_each_epoch = False
32        self.classify_each_trial = False
33
34        # Do we classify labeled epochs (such as in the case of iterative training)?
35        self.classify_labeled_epochs = False
36
37        self.paradigm_name = "Generic"

Base class for all paradigms. Please use a subclass of this class for your specific paradigm.

Parameters
  • filters (list of floats | [5, 30]): Filter bands.
  • channel_subset (list of str | None): Channel subset to use.
lowcut
highcut
channel_subset
classify_each_epoch
classify_each_trial
classify_labeled_epochs
paradigm_name
def package_resting_state_data( self, marker_data, marker_timestamps, bci_controller, eeg_timestamps, fsample):
108    def package_resting_state_data(
109        self, marker_data, marker_timestamps, bci_controller, eeg_timestamps, fsample
110    ):
111        """Package resting state data.
112
113        Parameters
114        ----------
115        marker_data : list of str
116            List of markers.
117        marker_timestamps : np.ndarray
118            Timestamps of markers.
119        bci_controller : np.ndarray
120            EEG data. Shape is (n_channels, n_samples).
121        eeg_timestamps : np.ndarray
122            Timestamps of EEG data. Shape is (n_samples,).
123        fsample : float
124            Sampling frequency.
125
126        Returns
127        -------
128        resting_state_data : dict
129            Dictionary containing resting state data with keys:
130            - 'eyes_open_trials': np.ndarray of EEG data during eyes open condition
131            - 'eyes_open_timestamps': np.ndarray of timestamps for eyes open trials
132            - 'eyes_closed_trials': np.ndarray of EEG data during eyes closed condition
133            - 'eyes_closed_timestamps': np.ndarray of timestamps for eyes closed trials
134            - 'rest_trials': np.ndarray of EEG data during rest condition
135            - 'rest_timestamps': np.ndarray of timestamps for rest trials
136            If an error occurs, returns `None`.
137
138        """
139        try:
140            logger.debug("Packaging resting state data")
141
142            eyes_open_start_time = []
143            eyes_open_end_time = []
144            eyes_closed_start_time = []
145            eyes_closed_end_time = []
146            rest_start_time = []
147            rest_end_time = []
148
149            # Initialize start and end locations
150            eyes_open_start_loc = []
151            eyes_open_end_loc = []
152            eyes_closed_start_loc = []
153            eyes_closed_end_loc = []
154            rest_start_loc = []
155            rest_end_loc = []
156
157            current_time = eeg_timestamps[0]
158            current_timestamp_loc = 0
159
160            self.fsample = fsample
161            self.n_channels = bci_controller.shape[0]
162
163            for i in range(len(marker_data)):
164                # Get current resting state data marker and time stamp
165                current_rs_data_marker = marker_data[i]
166                current_rs_timestamp = marker_timestamps[i]
167
168                # Increment the EEG until just past the marker timestamp
169                while current_time < current_rs_timestamp:
170                    current_timestamp_loc += 1
171                    current_time = eeg_timestamps[current_timestamp_loc]
172
173                # get eyes open start times
174                if current_rs_data_marker == "Start Eyes Open RS: 1":
175                    eyes_open_start_time.append(current_rs_timestamp)
176                    eyes_open_start_loc.append(current_timestamp_loc - 1)
177                    logger.debug("received eyes open start")
178
179                # get eyes open end times
180                if current_rs_data_marker == "End Eyes Open RS: 1":
181                    eyes_open_end_time.append(current_rs_timestamp)
182                    eyes_open_end_loc.append(current_timestamp_loc)
183                    logger.debug("received eyes open end")
184
185                # get eyes closed start times
186                if current_rs_data_marker == "Start Eyes Closed RS: 2":
187                    eyes_closed_start_time.append(current_rs_timestamp)
188                    eyes_closed_start_loc.append(current_timestamp_loc - 1)
189                    logger.debug("received eyes closed start")
190
191                # get eyes closed end times
192                if current_rs_data_marker == "End Eyes Closed RS: 2":
193                    eyes_closed_end_time.append(current_rs_timestamp)
194                    eyes_closed_end_loc.append(current_timestamp_loc)
195                    logger.debug("received eyes closed end")
196
197                # get rest start times
198                if current_rs_data_marker == "Start Rest for RS: 0":
199                    rest_start_time.append(current_rs_timestamp)
200                    rest_start_loc.append(current_timestamp_loc - 1)
201                    logger.debug("received rest start")
202                # get rest end times
203                if current_rs_data_marker == "End Rest for RS: 0":
204                    rest_end_time.append(current_rs_timestamp)
205                    rest_end_loc.append(current_timestamp_loc)
206                    logger.debug("received rest end")
207
208            # Eyes open
209            # Get duration, nsamples
210
211            if len(eyes_open_end_loc) > 0:
212                duration = np.floor(eyes_open_end_time[0] - eyes_open_start_time[0])
213                n_samples = int(duration * self.fsample)
214
215                new_eeg = np.zeros((1, self.n_channels, n_samples))
216
217                eyes_open_timestamps = np.arange(0, duration, 1 / self.fsample)
218
219                # Now copy EEG for these trials
220                for i in range(len(eyes_open_start_time)):
221                    eyes_open_time_correction = eyes_open_start_time[i]
222
223                    corrected_eeg_timestamps = (
224                        eeg_timestamps - eyes_open_time_correction
225                    )
226
227                    for c in range(self.n_channels):
228                        new_eeg[0, c, :] = np.interp(
229                            eyes_open_timestamps,
230                            corrected_eeg_timestamps,
231                            bci_controller[c, :],
232                        )
233
234                    if i == 0:
235                        eyes_open_trials = new_eeg
236                        eyes_open_timestamps = eyes_open_timestamps
237                    else:
238                        eyes_open_trials = np.concatenate(
239                            (eyes_open_trials, new_eeg), axis=0
240                        )
241
242            # Eyes closed
243
244            if len(eyes_closed_end_loc) > 0:
245                # Get duration, nsmaples
246                duration = np.floor(eyes_closed_end_time[0] - eyes_closed_start_time[0])
247                n_samples = int(duration * self.fsample)
248
249                new_eeg = np.zeros((1, self.n_channels, n_samples))
250
251                eyes_closed_timestamps = np.arange(0, duration, 1 / self.fsample)
252
253                # Now copy EEG for these trials
254                for i in range(len(eyes_closed_start_time)):
255                    eyes_closed_time_correction = eyes_closed_start_time[i]
256
257                    corrected_eeg_timestamps = (
258                        eeg_timestamps - eyes_closed_time_correction
259                    )
260
261                    for c in range(self.n_channels):
262                        new_eeg[0, c, :] = np.interp(
263                            eyes_closed_timestamps,
264                            corrected_eeg_timestamps,
265                            bci_controller[c, :],
266                        )
267
268                    if i == 0:
269                        eyes_closed_trials = new_eeg
270                        eyes_closed_timestamps = eyes_closed_timestamps
271                    else:
272                        eyes_closed_trials = np.concatenate(
273                            (eyes_closed_trials, new_eeg), axis=0
274                        )
275
276            # Rest
277            if len(rest_end_loc) > 0:
278                # Get duration, nsmaples
279                while rest_end_time[0] < rest_start_time[0]:
280                    rest_end_time.pop(0)
281                    rest_end_loc.pop(0)
282
283                duration = np.floor(rest_end_time[0] - rest_start_time[0])
284                n_samples = int(duration * self.fsample)
285
286                new_eeg = np.zeros((1, self.n_channels, n_samples))
287
288                rest_timestamps = np.arange(0, duration, 1 / self.fsample)
289
290                # Now copy EEG for these trials
291                for i in range(len(rest_start_time)):
292                    rest_time_correction = rest_start_time[i]
293
294                    corrected_eeg_timestamps = eeg_timestamps - rest_time_correction
295
296                    for c in range(self.n_channels):
297                        new_eeg[0, c, :] = np.interp(
298                            rest_timestamps,
299                            corrected_eeg_timestamps,
300                            bci_controller[c, :],
301                        )
302
303                    if i == 0:
304                        rest_trials = new_eeg
305                        rest_timestamps = rest_timestamps
306
307                    else:
308                        rest_trials = np.concatenate((rest_trials, new_eeg), axis=0)
309
310            # Put all the available data into a dictionary
311            resting_state_data = {
312                "eyes_open_trials": eyes_open_trials,
313                "eyes_open_timestamps": eyes_open_timestamps,
314                "eyes_closed_trials": eyes_closed_trials,
315                "eyes_closed_timestamps": eyes_closed_timestamps,
316                "rest_trials": rest_trials,
317                "rest_timestamps": rest_timestamps,
318            }
319
320            logger.debug("Done packaging resting state data")
321
322            return resting_state_data
323
324        except Exception as e:
325            logger.error("Error packaging resting state data: %s", e)
326
327            return None

Package resting state data.

Parameters
  • marker_data (list of str): List of markers.
  • marker_timestamps (np.ndarray): Timestamps of markers.
  • bci_controller (np.ndarray): EEG data. Shape is (n_channels, n_samples).
  • eeg_timestamps (np.ndarray): Timestamps of EEG data. Shape is (n_samples,).
  • fsample (float): Sampling frequency.
Returns
  • resting_state_data (dict): Dictionary containing resting state data with keys:
    • 'eyes_open_trials': np.ndarray of EEG data during eyes open condition
    • 'eyes_open_timestamps': np.ndarray of timestamps for eyes open trials
    • 'eyes_closed_trials': np.ndarray of EEG data during eyes closed condition
    • 'eyes_closed_timestamps': np.ndarray of timestamps for eyes closed trials
    • 'rest_trials': np.ndarray of EEG data during rest condition
    • 'rest_timestamps': np.ndarray of timestamps for rest trials
    If an error occurs, returns None.
@abstractmethod
def get_eeg_start_and_end_times(self, markers, timestamps):
329    @abstractmethod
330    def get_eeg_start_and_end_times(self, markers, timestamps):
331        """
332        Get the start and end times of the EEG data based on the markers.
333
334        Parameters
335        ----------
336        markers : list of str
337            List of markers.
338        timestamps : list of float
339            List of timestamps.
340
341        Returns
342        -------
343        float
344            Start time.
345        float
346            End time.
347        """
348
349        pass

Get the start and end times of the EEG data based on the markers.

Parameters
  • markers (list of str): List of markers.
  • timestamps (list of float): List of timestamps.
Returns
  • float: Start time.
  • float: End time.
@abstractmethod
def process_markers(self, markers, marker_timestamps, eeg, eeg_timestamps, fsample):
351    @abstractmethod
352    def process_markers(self, markers, marker_timestamps, eeg, eeg_timestamps, fsample):
353        """
354        This takes in the markers and EEG data and processes them into epochs accordingt to the MI paradigm.
355
356        Parameters
357        ----------
358        markers : list of str
359            List of markers.
360        marker_timestamps : list of float
361            List of timestamps.
362        eeg : np.array
363            EEG data. Shape is (n_channels, n_samples).
364        eeg_timestamps : np.array
365            EEG timestamps. Shape is (n_samples).
366        fsample : float
367            Sampling frequency.
368
369        Returns
370        -------
371        np.array
372            Processed EEG data. Shape is (n_epochs, n_channels, n_samples).
373        np.array
374            Labels. Shape is (n_epochs).
375        """
376
377        pass

This takes in the markers and EEG data and processes them into epochs according to the MI paradigm.

Parameters
  • markers (list of str): List of markers.
  • marker_timestamps (list of float): List of timestamps.
  • eeg (np.array): EEG data. Shape is (n_channels, n_samples).
  • eeg_timestamps (np.array): EEG timestamps. Shape is (n_samples).
  • fsample (float): Sampling frequency.
Returns
  • np.array: Processed EEG data. Shape is (n_epochs, n_channels, n_samples).
  • np.array: Labels. Shape is (n_epochs).