bci_essentials.io.lsl_sources

  1from mne_lsl.lsl import StreamInlet, StreamInfo, resolve_streams
  2from .sources import MarkerSource, EegSource
  3from ..utils.logger import Logger  # Logger wrapper
  4
  5# Instantiate a logger for the module at the default level of logging.INFO
  6# Logs to bci_essentials.__module__) where __module__ is the name of the module
  7logger = Logger(name=__name__)
  8
  9__all__ = ["LslMarkerSource", "LslEegSource"]
 10
 11
 12class LslMarkerSource(MarkerSource):
 13    def __init__(
 14        self, stream: StreamInfo = None, buffer_size: int = 5, timeout: float = 600
 15    ):
 16        """Create a MarkerSource object that obtains markers from an LSL outlet
 17
 18        Parameters
 19        ----------
 20        stream : StreamInfo, *optional*
 21            Provide stream to use for Markers, if not provided, stream will be discovered.
 22        buffer_size : int, *optional*
 23            Size of the buffer is `buffer_size * 100`. Default is 5 (i.e., 500 samples).
 24        timeout : float, *optional*
 25            How many seconds to wait for marker outlet stream to be discovered.
 26            If no stream is discovered, an Exception is raised.
 27            By default init will wait 10 minutes.
 28        """
 29        try:
 30            if stream is None:
 31                stream = discover_first_stream(
 32                    "BCI_Essentials_Markers", timeout=timeout
 33                )
 34            self._inlet = StreamInlet(
 35                stream, max_buffered=buffer_size, processing_flags=["dejitter"]
 36            )
 37            self._inlet.open_stream(timeout=5)
 38            self.__info = self._inlet.get_sinfo()
 39        except Exception:
 40            raise Exception("LslMarkerSource: could not create inlet")
 41
 42    @property
 43    def name(self) -> str:
 44        return self.__info.name
 45
 46    def get_markers(self) -> tuple[list[list], list]:
 47        return pull_from_lsl_inlet(self._inlet)
 48
 49    def time_correction(self) -> float:
 50        return self._inlet.time_correction()
 51
 52
 53class LslEegSource(EegSource):
 54    def __init__(
 55        self, stream: StreamInfo = None, buffer_size: int = 5, timeout: float = 600
 56    ):
 57        """Create a MarkerSource object that obtains EEG from an LSL outlet
 58
 59        Parameters
 60        ----------
 61        stream : StreamInfo, *optional*
 62            Provide stream to use for EEG, if not provided, stream will be discovered.
 63        buffer_size : int, *optional*
 64            Size of the buffer to use for the inlet in seconds. Default is 5.
 65        timeout : float, *optional*
 66            How many seconds to wait for marker outlet stream to be discovered.
 67            If no stream is discovered, an Exception is raised.
 68            By default init will wait 10 minutes.
 69        """
 70        try:
 71            if stream is None:
 72                stream = discover_first_stream("EEG", timeout=timeout)
 73            self._inlet = StreamInlet(
 74                stream, max_buffered=buffer_size, processing_flags=["dejitter"]
 75            )
 76            self._inlet.open_stream(timeout=5)
 77            self.__info = self._inlet.get_sinfo()
 78        except Exception:
 79            raise Exception("LslEegSource: could not create inlet")
 80
 81    @property
 82    def name(self) -> str:
 83        return self.__info.name
 84
 85    @property
 86    def fsample(self) -> float:
 87        return self.__info.sfreq
 88
 89    @property
 90    def n_channels(self) -> int:
 91        return self.__info.n_channels
 92
 93    @property
 94    def channel_types(self) -> list[str]:
 95        return self.__info.get_channel_types()
 96
 97    @property
 98    def channel_units(self) -> list[str]:
 99        """Get channel units. Default to "microvolts"."""
100        try:
101            units = self.__info.get_channel_units()
102            # If no units found or empty strings, use default
103            if not units or all(unit == "" for unit in units):
104                logger.warning("No channel units found, defaulting to microvolts")
105                units = ["microvolts"] * self.n_channels
106            return units
107        except Exception:
108            logger.warning("Could not get channel units, defaulting to microvolts")
109            return ["microvolts"] * self.n_channels
110
111    @property
112    def channel_labels(self) -> list[str]:
113        """Get channel labels.  Default to Ch1, Ch2, etc."""
114
115        try:
116            ch_names = self.__info.get_channel_names()
117            # If no labels found or empty strings, use default
118            if not ch_names or all(label == "" for label in ch_names):
119                logger.warning("No channel labels found, defaulting to Ch1, Ch2, etc.")
120                ch_names = [f"Ch{i+1}" for i in range(self.n_channels)]
121            return ch_names
122        except Exception:
123            logger.warning("Could not get channel labels, defaulting to Ch1, Ch2, etc.")
124            return [f"Ch{i+1}" for i in range(self.n_channels)]
125
126        # if hasattr(self.__info, 'ch_names') and self.__info.ch_names:
127        #     return list(self.__info.ch_names)
128        # return [f"Ch{i+1}" for i in range(self.n_channels)]
129
130    def get_samples(self) -> tuple[list[list], list]:
131        return pull_from_lsl_inlet(self._inlet)
132
133    def time_correction(self) -> float:
134        return self._inlet.time_correction()
135
136    def get_channel_properties(self, property: str) -> list[str]:
137        """Get channel properties from mne_lsl stream info.
138
139        Parameters
140        ----------
141        property : str
142            Property to get ('name', 'unit', 'type', etc)
143
144        Returns
145        -------
146        list[str]
147            List of property values for each channel
148        """
149        if property == "name":
150            return self.name
151        elif property == "unit":
152            return self.channel_units
153        elif property == "type":
154            return self.channel_types
155        elif property == "label":
156            return self.channel_labels
157        else:
158            logger.warning(f"Property '{property}' not supported in mne_lsl")
159            return [""] * self.n_channels
160
161
162def discover_first_stream(type: str, timeout: float = 600) -> StreamInfo:
163    """This helper returns the first stream of the specified type.
164
165    If no stream is found, an exception is raised."""
166    streams = resolve_streams(stype=type, timeout=timeout)
167    return streams[0]
168
169
170def pull_from_lsl_inlet(inlet: StreamInlet) -> tuple[list[list], list]:
171    """StreamInlet.pull_chunk() may return None for samples.
172
173    This helper prevents `None` from propagating by converting it into [[]].
174
175    If None is detected, the timestamps list is also forced to [].
176    """
177
178    # read from inlet
179    samples, timestamps = inlet.pull_chunk(timeout=0.001)
180
181    # convert None or empty samples into empty lists
182    if samples is None or len(samples) == 0:
183        samples = [[]]
184        timestamps = []
185
186    # return tuple[list[list], list]
187    return [samples, timestamps]
class LslMarkerSource(bci_essentials.io.sources.MarkerSource):
13class LslMarkerSource(MarkerSource):
14    def __init__(
15        self, stream: StreamInfo = None, buffer_size: int = 5, timeout: float = 600
16    ):
17        """Create a MarkerSource object that obtains markers from an LSL outlet
18
19        Parameters
20        ----------
21        stream : StreamInfo, *optional*
22            Provide stream to use for Markers, if not provided, stream will be discovered.
23        buffer_size : int, *optional*
24            Size of the buffer is `buffer_size * 100`. Default is 5 (i.e., 500 samples).
25        timeout : float, *optional*
26            How many seconds to wait for marker outlet stream to be discovered.
27            If no stream is discovered, an Exception is raised.
28            By default init will wait 10 minutes.
29        """
30        try:
31            if stream is None:
32                stream = discover_first_stream(
33                    "BCI_Essentials_Markers", timeout=timeout
34                )
35            self._inlet = StreamInlet(
36                stream, max_buffered=buffer_size, processing_flags=["dejitter"]
37            )
38            self._inlet.open_stream(timeout=5)
39            self.__info = self._inlet.get_sinfo()
40        except Exception:
41            raise Exception("LslMarkerSource: could not create inlet")
42
43    @property
44    def name(self) -> str:
45        return self.__info.name
46
47    def get_markers(self) -> tuple[list[list], list]:
48        return pull_from_lsl_inlet(self._inlet)
49
50    def time_correction(self) -> float:
51        return self._inlet.time_correction()

MarkerSource objects send time synchronized markers/commands to bci_controller.

LslMarkerSource( stream: mne_lsl.lsl.stream_info.StreamInfo = None, buffer_size: int = 5, timeout: float = 600)
14    def __init__(
15        self, stream: StreamInfo = None, buffer_size: int = 5, timeout: float = 600
16    ):
17        """Create a MarkerSource object that obtains markers from an LSL outlet
18
19        Parameters
20        ----------
21        stream : StreamInfo, *optional*
22            Provide stream to use for Markers, if not provided, stream will be discovered.
23        buffer_size : int, *optional*
24            Size of the buffer is `buffer_size * 100`. Default is 5 (i.e., 500 samples).
25        timeout : float, *optional*
26            How many seconds to wait for marker outlet stream to be discovered.
27            If no stream is discovered, an Exception is raised.
28            By default init will wait 10 minutes.
29        """
30        try:
31            if stream is None:
32                stream = discover_first_stream(
33                    "BCI_Essentials_Markers", timeout=timeout
34                )
35            self._inlet = StreamInlet(
36                stream, max_buffered=buffer_size, processing_flags=["dejitter"]
37            )
38            self._inlet.open_stream(timeout=5)
39            self.__info = self._inlet.get_sinfo()
40        except Exception:
41            raise Exception("LslMarkerSource: could not create inlet")

Create a MarkerSource object that obtains markers from an LSL outlet

Parameters
  • stream (StreamInfo, optional): Provide stream to use for Markers, if not provided, stream will be discovered.
  • buffer_size (int, optional): Size of the buffer is buffer_size * 100. Default is 5 (i.e., 500 samples).
  • timeout (float, optional): How many seconds to wait for marker outlet stream to be discovered. If no stream is discovered, an Exception is raised. By default init will wait 10 minutes.
name: str
43    @property
44    def name(self) -> str:
45        return self.__info.name

Name of the marker source

def get_markers(self) -> tuple[list[list], list]:
47    def get_markers(self) -> tuple[list[list], list]:
48        return pull_from_lsl_inlet(self._inlet)

Get marker/command data and timestamps since last call

Returns
  • marker_data (tuple(marker, timestemps)): A tuple of (markers, timestamps):
    • markers : list[list]
      • A list of samples, where each sample corresponds to a timestamp.
      • Each sample is a list with a single string element that represents a command or a marker.
      • The string is formatted as follows:
        • command = an arbitrary string, ex: "Trial Started"
        • marker = "paradigm, num options, label number, trial length"
    • timestamps : list[float]
      • A list timestamps (float in seconds) corresponding to the markers
def time_correction(self) -> float:
50    def time_correction(self) -> float:
51        return self._inlet.time_correction()

Get the current time correction for timestamps.

Returns
  • time_correction (float): The current time correction estimate (seconds).
    • This is the number that needs to be added to a time tamp that was remotely generated via local_clock() to map it into the local clock domain of the machine.
class LslEegSource(bci_essentials.io.sources.EegSource):
 54class LslEegSource(EegSource):
 55    def __init__(
 56        self, stream: StreamInfo = None, buffer_size: int = 5, timeout: float = 600
 57    ):
 58        """Create a MarkerSource object that obtains EEG from an LSL outlet
 59
 60        Parameters
 61        ----------
 62        stream : StreamInfo, *optional*
 63            Provide stream to use for EEG, if not provided, stream will be discovered.
 64        buffer_size : int, *optional*
 65            Size of the buffer to use for the inlet in seconds. Default is 5.
 66        timeout : float, *optional*
 67            How many seconds to wait for marker outlet stream to be discovered.
 68            If no stream is discovered, an Exception is raised.
 69            By default init will wait 10 minutes.
 70        """
 71        try:
 72            if stream is None:
 73                stream = discover_first_stream("EEG", timeout=timeout)
 74            self._inlet = StreamInlet(
 75                stream, max_buffered=buffer_size, processing_flags=["dejitter"]
 76            )
 77            self._inlet.open_stream(timeout=5)
 78            self.__info = self._inlet.get_sinfo()
 79        except Exception:
 80            raise Exception("LslEegSource: could not create inlet")
 81
 82    @property
 83    def name(self) -> str:
 84        return self.__info.name
 85
 86    @property
 87    def fsample(self) -> float:
 88        return self.__info.sfreq
 89
 90    @property
 91    def n_channels(self) -> int:
 92        return self.__info.n_channels
 93
 94    @property
 95    def channel_types(self) -> list[str]:
 96        return self.__info.get_channel_types()
 97
 98    @property
 99    def channel_units(self) -> list[str]:
100        """Get channel units. Default to "microvolts"."""
101        try:
102            units = self.__info.get_channel_units()
103            # If no units found or empty strings, use default
104            if not units or all(unit == "" for unit in units):
105                logger.warning("No channel units found, defaulting to microvolts")
106                units = ["microvolts"] * self.n_channels
107            return units
108        except Exception:
109            logger.warning("Could not get channel units, defaulting to microvolts")
110            return ["microvolts"] * self.n_channels
111
112    @property
113    def channel_labels(self) -> list[str]:
114        """Get channel labels.  Default to Ch1, Ch2, etc."""
115
116        try:
117            ch_names = self.__info.get_channel_names()
118            # If no labels found or empty strings, use default
119            if not ch_names or all(label == "" for label in ch_names):
120                logger.warning("No channel labels found, defaulting to Ch1, Ch2, etc.")
121                ch_names = [f"Ch{i+1}" for i in range(self.n_channels)]
122            return ch_names
123        except Exception:
124            logger.warning("Could not get channel labels, defaulting to Ch1, Ch2, etc.")
125            return [f"Ch{i+1}" for i in range(self.n_channels)]
126
127        # if hasattr(self.__info, 'ch_names') and self.__info.ch_names:
128        #     return list(self.__info.ch_names)
129        # return [f"Ch{i+1}" for i in range(self.n_channels)]
130
131    def get_samples(self) -> tuple[list[list], list]:
132        return pull_from_lsl_inlet(self._inlet)
133
134    def time_correction(self) -> float:
135        return self._inlet.time_correction()
136
137    def get_channel_properties(self, property: str) -> list[str]:
138        """Get channel properties from mne_lsl stream info.
139
140        Parameters
141        ----------
142        property : str
143            Property to get ('name', 'unit', 'type', etc)
144
145        Returns
146        -------
147        list[str]
148            List of property values for each channel
149        """
150        if property == "name":
151            return self.name
152        elif property == "unit":
153            return self.channel_units
154        elif property == "type":
155            return self.channel_types
156        elif property == "label":
157            return self.channel_labels
158        else:
159            logger.warning(f"Property '{property}' not supported in mne_lsl")
160            return [""] * self.n_channels

EegSource objects produce samples of EEG for use in bci_controller.

It can be used to represent an BCI headset providing EEG data, or it could be a source of Markers to control bci_controller behaviour, etc.

LslEegSource( stream: mne_lsl.lsl.stream_info.StreamInfo = None, buffer_size: int = 5, timeout: float = 600)
55    def __init__(
56        self, stream: StreamInfo = None, buffer_size: int = 5, timeout: float = 600
57    ):
58        """Create a MarkerSource object that obtains EEG from an LSL outlet
59
60        Parameters
61        ----------
62        stream : StreamInfo, *optional*
63            Provide stream to use for EEG, if not provided, stream will be discovered.
64        buffer_size : int, *optional*
65            Size of the buffer to use for the inlet in seconds. Default is 5.
66        timeout : float, *optional*
67            How many seconds to wait for marker outlet stream to be discovered.
68            If no stream is discovered, an Exception is raised.
69            By default init will wait 10 minutes.
70        """
71        try:
72            if stream is None:
73                stream = discover_first_stream("EEG", timeout=timeout)
74            self._inlet = StreamInlet(
75                stream, max_buffered=buffer_size, processing_flags=["dejitter"]
76            )
77            self._inlet.open_stream(timeout=5)
78            self.__info = self._inlet.get_sinfo()
79        except Exception:
80            raise Exception("LslEegSource: could not create inlet")

Create a MarkerSource object that obtains EEG from an LSL outlet

Parameters
  • stream (StreamInfo, optional): Provide stream to use for EEG, if not provided, stream will be discovered.
  • buffer_size (int, optional): Size of the buffer to use for the inlet in seconds. Default is 5.
  • timeout (float, optional): How many seconds to wait for marker outlet stream to be discovered. If no stream is discovered, an Exception is raised. By default init will wait 10 minutes.
name: str
82    @property
83    def name(self) -> str:
84        return self.__info.name

Name of the EEG source

fsample: float
86    @property
87    def fsample(self) -> float:
88        return self.__info.sfreq

Sample rate of EEG source

n_channels: int
90    @property
91    def n_channels(self) -> int:
92        return self.__info.n_channels

Number of EEG channels per sample

channel_types: list[str]
94    @property
95    def channel_types(self) -> list[str]:
96        return self.__info.get_channel_types()

The type of each channel, ex: eeg, or stim

channel_units: list[str]
 98    @property
 99    def channel_units(self) -> list[str]:
100        """Get channel units. Default to "microvolts"."""
101        try:
102            units = self.__info.get_channel_units()
103            # If no units found or empty strings, use default
104            if not units or all(unit == "" for unit in units):
105                logger.warning("No channel units found, defaulting to microvolts")
106                units = ["microvolts"] * self.n_channels
107            return units
108        except Exception:
109            logger.warning("Could not get channel units, defaulting to microvolts")
110            return ["microvolts"] * self.n_channels

Get channel units. Default to "microvolts".

channel_labels: list[str]
112    @property
113    def channel_labels(self) -> list[str]:
114        """Get channel labels.  Default to Ch1, Ch2, etc."""
115
116        try:
117            ch_names = self.__info.get_channel_names()
118            # If no labels found or empty strings, use default
119            if not ch_names or all(label == "" for label in ch_names):
120                logger.warning("No channel labels found, defaulting to Ch1, Ch2, etc.")
121                ch_names = [f"Ch{i+1}" for i in range(self.n_channels)]
122            return ch_names
123        except Exception:
124            logger.warning("Could not get channel labels, defaulting to Ch1, Ch2, etc.")
125            return [f"Ch{i+1}" for i in range(self.n_channels)]
126
127        # if hasattr(self.__info, 'ch_names') and self.__info.ch_names:
128        #     return list(self.__info.ch_names)
129        # return [f"Ch{i+1}" for i in range(self.n_channels)]

Get channel labels. Default to Ch1, Ch2, etc.

def get_samples(self) -> tuple[list[list], list]:
131    def get_samples(self) -> tuple[list[list], list]:
132        return pull_from_lsl_inlet(self._inlet)

Get EEG samples and timestamps since last call

Returns
  • samples_data (tuple(samples, timestamps)):
    • A tuple of (samples, timestamps) where:
      • samples : list[float]
        • A list of samples, where each sample corresponds to a timestamp. Each sample is a list of floats representing the value for each channel of EEG.
      • timestamps : list[float]
        • A list timestamps (float in seconds) corresponding to the samples
def time_correction(self) -> float:
134    def time_correction(self) -> float:
135        return self._inlet.time_correction()

Get the current time correction for timestamps.

Returns
  • time_correction (float): The current time correction estimate (seconds).
    • This is the number that needs to be added to a time tamp that was remotely generated via local_clock() to map it into the local clock domain of the machine.
def get_channel_properties(self, property: str) -> list[str]:
137    def get_channel_properties(self, property: str) -> list[str]:
138        """Get channel properties from mne_lsl stream info.
139
140        Parameters
141        ----------
142        property : str
143            Property to get ('name', 'unit', 'type', etc)
144
145        Returns
146        -------
147        list[str]
148            List of property values for each channel
149        """
150        if property == "name":
151            return self.name
152        elif property == "unit":
153            return self.channel_units
154        elif property == "type":
155            return self.channel_types
156        elif property == "label":
157            return self.channel_labels
158        else:
159            logger.warning(f"Property '{property}' not supported in mne_lsl")
160            return [""] * self.n_channels

Get channel properties from mne_lsl stream info.

Parameters
  • property (str): Property to get ('name', 'unit', 'type', etc)
Returns
  • list[str]: List of property values for each channel