bci_essentials.io.lsl_sources

  1from mne_lsl.lsl import StreamInlet, StreamInfo, resolve_streams
  2from .sources import MarkerSource, EegSource
  3from ..utils.logger import Logger  # Logger wrapper
  4
  5# Instantiate a logger for the module at the default level of logging.INFO
  6# Logs to bci_essentials.__module__) where __module__ is the name of the module
  7logger = Logger(name=__name__)
  8
  9__all__ = ["LslMarkerSource", "LslEegSource"]
 10
 11
 12class LslMarkerSource(MarkerSource):
 13    def __init__(
 14        self, stream: StreamInfo = None, buffer_size: int = 5, timeout: float = 600
 15    ):
 16        """Create a MarkerSource object that obtains markers from an LSL outlet
 17
 18        Parameters
 19        ----------
 20        stream : StreamInfo, *optional*
 21            Provide stream to use for Markers, if not provided, stream will be discovered.
 22        buffer_size : int, *optional*
 23            Size of the buffer is `buffer_size * 100`. Default is 5 (i.e., 500 samples).
 24        timeout : float, *optional*
 25            How many seconds to wait for marker outlet stream to be discovered.
 26            If no stream is discovered, an Exception is raised.
 27            By default init will wait 10 minutes.
 28        """
 29        try:
 30            if stream is None:
 31                stream = discover_first_stream("LSL_Marker_Strings", timeout=timeout)
 32            self._inlet = StreamInlet(
 33                stream, max_buffered=buffer_size, processing_flags=["dejitter"]
 34            )
 35            self._inlet.open_stream(timeout=5)
 36            self.__info = self._inlet.get_sinfo()
 37        except Exception:
 38            raise Exception("LslMarkerSource: could not create inlet")
 39
 40    @property
 41    def name(self) -> str:
 42        return self.__info.name
 43
 44    def get_markers(self) -> tuple[list[list], list]:
 45        return pull_from_lsl_inlet(self._inlet)
 46
 47    def time_correction(self) -> float:
 48        return self._inlet.time_correction()
 49
 50
 51class LslEegSource(EegSource):
 52    def __init__(
 53        self, stream: StreamInfo = None, buffer_size: int = 5, timeout: float = 600
 54    ):
 55        """Create a MarkerSource object that obtains EEG from an LSL outlet
 56
 57        Parameters
 58        ----------
 59        stream : StreamInfo, *optional*
 60            Provide stream to use for EEG, if not provided, stream will be discovered.
 61        buffer_size : int, *optional*
 62            Size of the buffer to use for the inlet in seconds. Default is 5.
 63        timeout : float, *optional*
 64            How many seconds to wait for marker outlet stream to be discovered.
 65            If no stream is discovered, an Exception is raised.
 66            By default init will wait 10 minutes.
 67        """
 68        try:
 69            if stream is None:
 70                stream = discover_first_stream("EEG", timeout=timeout)
 71            self._inlet = StreamInlet(
 72                stream, max_buffered=buffer_size, processing_flags=["dejitter"]
 73            )
 74            self._inlet.open_stream(timeout=5)
 75            self.__info = self._inlet.get_sinfo()
 76        except Exception:
 77            raise Exception("LslEegSource: could not create inlet")
 78
 79    @property
 80    def name(self) -> str:
 81        return self.__info.name
 82
 83    @property
 84    def fsample(self) -> float:
 85        return self.__info.sfreq
 86
 87    @property
 88    def n_channels(self) -> int:
 89        return self.__info.n_channels
 90
 91    @property
 92    def channel_types(self) -> list[str]:
 93        return self.__info.get_channel_types()
 94
 95    @property
 96    def channel_units(self) -> list[str]:
 97        """Get channel units. Default to "microvolts"."""
 98        try:
 99            units = self.__info.get_channel_units()
100            # If no units found or empty strings, use default
101            if not units or all(unit == "" for unit in units):
102                logger.warning("No channel units found, defaulting to microvolts")
103                units = ["microvolts"] * self.n_channels
104            return units
105        except Exception:
106            logger.warning("Could not get channel units, defaulting to microvolts")
107            return ["microvolts"] * self.n_channels
108
109    @property
110    def channel_labels(self) -> list[str]:
111        """Get channel labels.  Default to Ch1, Ch2, etc."""
112
113        try:
114            ch_names = self.__info.get_channel_names()
115            # If no labels found or empty strings, use default
116            if not ch_names or all(label == "" for label in ch_names):
117                logger.warning("No channel labels found, defaulting to Ch1, Ch2, etc.")
118                ch_names = [f"Ch{i+1}" for i in range(self.n_channels)]
119            return ch_names
120        except Exception:
121            logger.warning("Could not get channel labels, defaulting to Ch1, Ch2, etc.")
122            return [f"Ch{i+1}" for i in range(self.n_channels)]
123
124        # if hasattr(self.__info, 'ch_names') and self.__info.ch_names:
125        #     return list(self.__info.ch_names)
126        # return [f"Ch{i+1}" for i in range(self.n_channels)]
127
128    def get_samples(self) -> tuple[list[list], list]:
129        return pull_from_lsl_inlet(self._inlet)
130
131    def time_correction(self) -> float:
132        return self._inlet.time_correction()
133
134    def get_channel_properties(self, property: str) -> list[str]:
135        """Get channel properties from mne_lsl stream info.
136
137        Parameters
138        ----------
139        property : str
140            Property to get ('name', 'unit', 'type', etc)
141
142        Returns
143        -------
144        list[str]
145            List of property values for each channel
146        """
147        if property == "name":
148            return self.name
149        elif property == "unit":
150            return self.channel_units
151        elif property == "type":
152            return self.channel_types
153        elif property == "label":
154            return self.channel_labels
155        else:
156            logger.warning(f"Property '{property}' not supported in mne_lsl")
157            return [""] * self.n_channels
158
159
160def discover_first_stream(type: str, timeout: float = 600) -> StreamInfo:
161    """This helper returns the first stream of the specified type.
162
163    If no stream is found, an exception is raised."""
164    streams = resolve_streams(stype=type, timeout=timeout)
165    return streams[0]
166
167
168def pull_from_lsl_inlet(inlet: StreamInlet) -> tuple[list[list], list]:
169    """StreamInlet.pull_chunk() may return None for samples.
170
171    This helper prevents `None` from propagating by converting it into [[]].
172
173    If None is detected, the timestamps list is also forced to [].
174    """
175
176    # read from inlet
177    samples, timestamps = inlet.pull_chunk(timeout=0.001)
178
179    # convert None or empty samples into empty lists
180    if samples is None or len(samples) == 0:
181        samples = [[]]
182        timestamps = []
183
184    # return tuple[list[list], list]
185    return [samples, timestamps]
class LslMarkerSource(bci_essentials.io.sources.MarkerSource):
13class LslMarkerSource(MarkerSource):
14    def __init__(
15        self, stream: StreamInfo = None, buffer_size: int = 5, timeout: float = 600
16    ):
17        """Create a MarkerSource object that obtains markers from an LSL outlet
18
19        Parameters
20        ----------
21        stream : StreamInfo, *optional*
22            Provide stream to use for Markers, if not provided, stream will be discovered.
23        buffer_size : int, *optional*
24            Size of the buffer is `buffer_size * 100`. Default is 5 (i.e., 500 samples).
25        timeout : float, *optional*
26            How many seconds to wait for marker outlet stream to be discovered.
27            If no stream is discovered, an Exception is raised.
28            By default init will wait 10 minutes.
29        """
30        try:
31            if stream is None:
32                stream = discover_first_stream("LSL_Marker_Strings", timeout=timeout)
33            self._inlet = StreamInlet(
34                stream, max_buffered=buffer_size, processing_flags=["dejitter"]
35            )
36            self._inlet.open_stream(timeout=5)
37            self.__info = self._inlet.get_sinfo()
38        except Exception:
39            raise Exception("LslMarkerSource: could not create inlet")
40
41    @property
42    def name(self) -> str:
43        return self.__info.name
44
45    def get_markers(self) -> tuple[list[list], list]:
46        return pull_from_lsl_inlet(self._inlet)
47
48    def time_correction(self) -> float:
49        return self._inlet.time_correction()

MarkerSource objects send time synchronized markers/commands to bci_controller.

LslMarkerSource( stream: mne_lsl.lsl.stream_info.StreamInfo = None, buffer_size: int = 5, timeout: float = 600)
14    def __init__(
15        self, stream: StreamInfo = None, buffer_size: int = 5, timeout: float = 600
16    ):
17        """Create a MarkerSource object that obtains markers from an LSL outlet
18
19        Parameters
20        ----------
21        stream : StreamInfo, *optional*
22            Provide stream to use for Markers, if not provided, stream will be discovered.
23        buffer_size : int, *optional*
24            Size of the buffer is `buffer_size * 100`. Default is 5 (i.e., 500 samples).
25        timeout : float, *optional*
26            How many seconds to wait for marker outlet stream to be discovered.
27            If no stream is discovered, an Exception is raised.
28            By default init will wait 10 minutes.
29        """
30        try:
31            if stream is None:
32                stream = discover_first_stream("LSL_Marker_Strings", timeout=timeout)
33            self._inlet = StreamInlet(
34                stream, max_buffered=buffer_size, processing_flags=["dejitter"]
35            )
36            self._inlet.open_stream(timeout=5)
37            self.__info = self._inlet.get_sinfo()
38        except Exception:
39            raise Exception("LslMarkerSource: could not create inlet")

Create a MarkerSource object that obtains markers from an LSL outlet

Parameters
  • stream (StreamInfo, optional): Provide stream to use for Markers, if not provided, stream will be discovered.
  • buffer_size (int, optional): Size of the buffer is buffer_size * 100. Default is 5 (i.e., 500 samples).
  • timeout (float, optional): How many seconds to wait for marker outlet stream to be discovered. If no stream is discovered, an Exception is raised. By default init will wait 10 minutes.
name: str
41    @property
42    def name(self) -> str:
43        return self.__info.name

Name of the marker source

def get_markers(self) -> tuple[list[list], list]:
45    def get_markers(self) -> tuple[list[list], list]:
46        return pull_from_lsl_inlet(self._inlet)

Get marker/command data and timestamps since last call

Returns
  • marker_data (tuple(marker, timestemps)): A tuple of (markers, timestamps):
    • markers : list[list]
      • A list of samples, where each sample corresponds to a timestamp.
      • Each sample is a list with a single string element that represents a command or a marker.
      • The string is formatted as follows:
        • command = an arbitrary string, ex: "Trial Started"
        • marker = "paradigm, num options, label number, trial length"
    • timestamps : list[float]
      • A list timestamps (float in seconds) corresponding to the markers
def time_correction(self) -> float:
48    def time_correction(self) -> float:
49        return self._inlet.time_correction()

Get the current time correction for timestamps.

Returns
  • time_correction (float): The current time correction estimate (seconds).
    • This is the number that needs to be added to a time tamp that was remotely generated via local_clock() to map it into the local clock domain of the machine.
class LslEegSource(bci_essentials.io.sources.EegSource):
 52class LslEegSource(EegSource):
 53    def __init__(
 54        self, stream: StreamInfo = None, buffer_size: int = 5, timeout: float = 600
 55    ):
 56        """Create a MarkerSource object that obtains EEG from an LSL outlet
 57
 58        Parameters
 59        ----------
 60        stream : StreamInfo, *optional*
 61            Provide stream to use for EEG, if not provided, stream will be discovered.
 62        buffer_size : int, *optional*
 63            Size of the buffer to use for the inlet in seconds. Default is 5.
 64        timeout : float, *optional*
 65            How many seconds to wait for marker outlet stream to be discovered.
 66            If no stream is discovered, an Exception is raised.
 67            By default init will wait 10 minutes.
 68        """
 69        try:
 70            if stream is None:
 71                stream = discover_first_stream("EEG", timeout=timeout)
 72            self._inlet = StreamInlet(
 73                stream, max_buffered=buffer_size, processing_flags=["dejitter"]
 74            )
 75            self._inlet.open_stream(timeout=5)
 76            self.__info = self._inlet.get_sinfo()
 77        except Exception:
 78            raise Exception("LslEegSource: could not create inlet")
 79
 80    @property
 81    def name(self) -> str:
 82        return self.__info.name
 83
 84    @property
 85    def fsample(self) -> float:
 86        return self.__info.sfreq
 87
 88    @property
 89    def n_channels(self) -> int:
 90        return self.__info.n_channels
 91
 92    @property
 93    def channel_types(self) -> list[str]:
 94        return self.__info.get_channel_types()
 95
 96    @property
 97    def channel_units(self) -> list[str]:
 98        """Get channel units. Default to "microvolts"."""
 99        try:
100            units = self.__info.get_channel_units()
101            # If no units found or empty strings, use default
102            if not units or all(unit == "" for unit in units):
103                logger.warning("No channel units found, defaulting to microvolts")
104                units = ["microvolts"] * self.n_channels
105            return units
106        except Exception:
107            logger.warning("Could not get channel units, defaulting to microvolts")
108            return ["microvolts"] * self.n_channels
109
110    @property
111    def channel_labels(self) -> list[str]:
112        """Get channel labels.  Default to Ch1, Ch2, etc."""
113
114        try:
115            ch_names = self.__info.get_channel_names()
116            # If no labels found or empty strings, use default
117            if not ch_names or all(label == "" for label in ch_names):
118                logger.warning("No channel labels found, defaulting to Ch1, Ch2, etc.")
119                ch_names = [f"Ch{i+1}" for i in range(self.n_channels)]
120            return ch_names
121        except Exception:
122            logger.warning("Could not get channel labels, defaulting to Ch1, Ch2, etc.")
123            return [f"Ch{i+1}" for i in range(self.n_channels)]
124
125        # if hasattr(self.__info, 'ch_names') and self.__info.ch_names:
126        #     return list(self.__info.ch_names)
127        # return [f"Ch{i+1}" for i in range(self.n_channels)]
128
129    def get_samples(self) -> tuple[list[list], list]:
130        return pull_from_lsl_inlet(self._inlet)
131
132    def time_correction(self) -> float:
133        return self._inlet.time_correction()
134
135    def get_channel_properties(self, property: str) -> list[str]:
136        """Get channel properties from mne_lsl stream info.
137
138        Parameters
139        ----------
140        property : str
141            Property to get ('name', 'unit', 'type', etc)
142
143        Returns
144        -------
145        list[str]
146            List of property values for each channel
147        """
148        if property == "name":
149            return self.name
150        elif property == "unit":
151            return self.channel_units
152        elif property == "type":
153            return self.channel_types
154        elif property == "label":
155            return self.channel_labels
156        else:
157            logger.warning(f"Property '{property}' not supported in mne_lsl")
158            return [""] * self.n_channels

EegSource objects produce samples of EEG for use in bci_controller.

It can be used to represent an BCI headset providing EEG data, or it could be a source of Markers to control bci_controller behaviour, etc.

LslEegSource( stream: mne_lsl.lsl.stream_info.StreamInfo = None, buffer_size: int = 5, timeout: float = 600)
53    def __init__(
54        self, stream: StreamInfo = None, buffer_size: int = 5, timeout: float = 600
55    ):
56        """Create a MarkerSource object that obtains EEG from an LSL outlet
57
58        Parameters
59        ----------
60        stream : StreamInfo, *optional*
61            Provide stream to use for EEG, if not provided, stream will be discovered.
62        buffer_size : int, *optional*
63            Size of the buffer to use for the inlet in seconds. Default is 5.
64        timeout : float, *optional*
65            How many seconds to wait for marker outlet stream to be discovered.
66            If no stream is discovered, an Exception is raised.
67            By default init will wait 10 minutes.
68        """
69        try:
70            if stream is None:
71                stream = discover_first_stream("EEG", timeout=timeout)
72            self._inlet = StreamInlet(
73                stream, max_buffered=buffer_size, processing_flags=["dejitter"]
74            )
75            self._inlet.open_stream(timeout=5)
76            self.__info = self._inlet.get_sinfo()
77        except Exception:
78            raise Exception("LslEegSource: could not create inlet")

Create a MarkerSource object that obtains EEG from an LSL outlet

Parameters
  • stream (StreamInfo, optional): Provide stream to use for EEG, if not provided, stream will be discovered.
  • buffer_size (int, optional): Size of the buffer to use for the inlet in seconds. Default is 5.
  • timeout (float, optional): How many seconds to wait for marker outlet stream to be discovered. If no stream is discovered, an Exception is raised. By default init will wait 10 minutes.
name: str
80    @property
81    def name(self) -> str:
82        return self.__info.name

Name of the EEG source

fsample: float
84    @property
85    def fsample(self) -> float:
86        return self.__info.sfreq

Sample rate of EEG source

n_channels: int
88    @property
89    def n_channels(self) -> int:
90        return self.__info.n_channels

Number of EEG channels per sample

channel_types: list[str]
92    @property
93    def channel_types(self) -> list[str]:
94        return self.__info.get_channel_types()

The type of each channel, ex: eeg, or stim

channel_units: list[str]
 96    @property
 97    def channel_units(self) -> list[str]:
 98        """Get channel units. Default to "microvolts"."""
 99        try:
100            units = self.__info.get_channel_units()
101            # If no units found or empty strings, use default
102            if not units or all(unit == "" for unit in units):
103                logger.warning("No channel units found, defaulting to microvolts")
104                units = ["microvolts"] * self.n_channels
105            return units
106        except Exception:
107            logger.warning("Could not get channel units, defaulting to microvolts")
108            return ["microvolts"] * self.n_channels

Get channel units. Default to "microvolts".

channel_labels: list[str]
110    @property
111    def channel_labels(self) -> list[str]:
112        """Get channel labels.  Default to Ch1, Ch2, etc."""
113
114        try:
115            ch_names = self.__info.get_channel_names()
116            # If no labels found or empty strings, use default
117            if not ch_names or all(label == "" for label in ch_names):
118                logger.warning("No channel labels found, defaulting to Ch1, Ch2, etc.")
119                ch_names = [f"Ch{i+1}" for i in range(self.n_channels)]
120            return ch_names
121        except Exception:
122            logger.warning("Could not get channel labels, defaulting to Ch1, Ch2, etc.")
123            return [f"Ch{i+1}" for i in range(self.n_channels)]
124
125        # if hasattr(self.__info, 'ch_names') and self.__info.ch_names:
126        #     return list(self.__info.ch_names)
127        # return [f"Ch{i+1}" for i in range(self.n_channels)]

Get channel labels. Default to Ch1, Ch2, etc.

def get_samples(self) -> tuple[list[list], list]:
129    def get_samples(self) -> tuple[list[list], list]:
130        return pull_from_lsl_inlet(self._inlet)

Get EEG samples and timestamps since last call

Returns
  • samples_data (tuple(samples, timestamps)):
    • A tuple of (samples, timestamps) where:
      • samples : list[float]
        • A list of samples, where each sample corresponds to a timestamp. Each sample is a list of floats representing the value for each channel of EEG.
      • timestamps : list[float]
        • A list timestamps (float in seconds) corresponding to the samples
def time_correction(self) -> float:
132    def time_correction(self) -> float:
133        return self._inlet.time_correction()

Get the current time correction for timestamps.

Returns
  • time_correction (float): The current time correction estimate (seconds).
    • This is the number that needs to be added to a time tamp that was remotely generated via local_clock() to map it into the local clock domain of the machine.
def get_channel_properties(self, property: str) -> list[str]:
135    def get_channel_properties(self, property: str) -> list[str]:
136        """Get channel properties from mne_lsl stream info.
137
138        Parameters
139        ----------
140        property : str
141            Property to get ('name', 'unit', 'type', etc)
142
143        Returns
144        -------
145        list[str]
146            List of property values for each channel
147        """
148        if property == "name":
149            return self.name
150        elif property == "unit":
151            return self.channel_units
152        elif property == "type":
153            return self.channel_types
154        elif property == "label":
155            return self.channel_labels
156        else:
157            logger.warning(f"Property '{property}' not supported in mne_lsl")
158            return [""] * self.n_channels

Get channel properties from mne_lsl stream info.

Parameters
  • property (str): Property to get ('name', 'unit', 'type', etc)
Returns
  • list[str]: List of property values for each channel