bci_essentials.io.lsl_sources
from mne_lsl.lsl import StreamInlet, StreamInfo, resolve_streams
from .sources import MarkerSource, EegSource
from ..utils.logger import Logger  # Logger wrapper

# Instantiate a logger for the module at the default level of logging.INFO
# Logs to bci_essentials.__module__, where __module__ is the name of the module
logger = Logger(name=__name__)

__all__ = ["LslMarkerSource", "LslEegSource"]


class LslMarkerSource(MarkerSource):
    """MarkerSource implementation that pulls markers from an LSL inlet."""

    def __init__(
        self, stream: StreamInfo = None, buffer_size: int = 5, timeout: float = 600
    ):
        """Create a MarkerSource object that obtains markers from an LSL outlet

        Parameters
        ----------
        stream : StreamInfo, *optional*
            Provide stream to use for Markers, if not provided, stream will be discovered.
        buffer_size : int, *optional*
            Size of the buffer is `buffer_size * 100`. Default is 5 (i.e., 500 samples).
        timeout : float, *optional*
            How many seconds to wait for marker outlet stream to be discovered.
            If no stream is discovered, an Exception is raised.
            By default init will wait 10 minutes.

        Raises
        ------
        Exception
            If the stream cannot be discovered or the inlet cannot be opened.
            The underlying error is attached as the exception's cause.
        """
        try:
            if stream is None:
                stream = discover_first_stream("LSL_Marker_Strings", timeout=timeout)
            self._inlet = StreamInlet(
                stream, max_buffered=buffer_size, processing_flags=["dejitter"]
            )
            self._inlet.open_stream(timeout=5)
            self.__info = self._inlet.get_sinfo()
        except Exception as err:
            # Chain explicitly so the root cause shows up in the traceback.
            raise Exception("LslMarkerSource: could not create inlet") from err

    @property
    def name(self) -> str:
        """Name of the connected stream, as reported by its stream info."""
        return self.__info.name

    def get_markers(self) -> tuple[list[list], list]:
        """Pull the markers and timestamps currently available on the inlet."""
        return pull_from_lsl_inlet(self._inlet)

    def time_correction(self) -> float:
        """Return the inlet's current time correction estimate."""
        return self._inlet.time_correction()


class LslEegSource(EegSource):
    """EegSource implementation that pulls EEG samples from an LSL inlet."""

    def __init__(
        self, stream: StreamInfo = None, buffer_size: int = 5, timeout: float = 600
    ):
        """Create an EegSource object that obtains EEG from an LSL outlet

        Parameters
        ----------
        stream : StreamInfo, *optional*
            Provide stream to use for EEG, if not provided, stream will be discovered.
        buffer_size : int, *optional*
            Size of the buffer to use for the inlet in seconds. Default is 5.
        timeout : float, *optional*
            How many seconds to wait for EEG outlet stream to be discovered.
            If no stream is discovered, an Exception is raised.
            By default init will wait 10 minutes.

        Raises
        ------
        Exception
            If the stream cannot be discovered or the inlet cannot be opened.
            The underlying error is attached as the exception's cause.
        """
        try:
            if stream is None:
                stream = discover_first_stream("EEG", timeout=timeout)
            self._inlet = StreamInlet(
                stream, max_buffered=buffer_size, processing_flags=["dejitter"]
            )
            self._inlet.open_stream(timeout=5)
            self.__info = self._inlet.get_sinfo()
        except Exception as err:
            # Chain explicitly so the root cause shows up in the traceback.
            raise Exception("LslEegSource: could not create inlet") from err

    @property
    def name(self) -> str:
        """Name of the connected stream, as reported by its stream info."""
        return self.__info.name

    @property
    def fsample(self) -> float:
        """Sampling frequency reported by the stream info."""
        return self.__info.sfreq

    @property
    def n_channels(self) -> int:
        """Number of channels reported by the stream info."""
        return self.__info.n_channels

    @property
    def channel_types(self) -> list[str]:
        """Channel types reported by the stream info."""
        return self.__info.get_channel_types()

    @property
    def channel_units(self) -> list[str]:
        """Get channel units. Default to "microvolts"."""
        try:
            units = self.__info.get_channel_units()
            # If no units found or empty strings, use default
            if not units or all(unit == "" for unit in units):
                logger.warning("No channel units found, defaulting to microvolts")
                units = ["microvolts"] * self.n_channels
            return units
        except Exception:
            # Best effort: a stream without usable unit metadata is still usable.
            logger.warning("Could not get channel units, defaulting to microvolts")
            return ["microvolts"] * self.n_channels

    @property
    def channel_labels(self) -> list[str]:
        """Get channel labels. Default to Ch1, Ch2, etc."""
        try:
            ch_names = self.__info.get_channel_names()
            # If no labels found or empty strings, use default
            if not ch_names or all(label == "" for label in ch_names):
                logger.warning("No channel labels found, defaulting to Ch1, Ch2, etc.")
                ch_names = [f"Ch{i+1}" for i in range(self.n_channels)]
            return ch_names
        except Exception:
            # Best effort: a stream without usable label metadata is still usable.
            logger.warning("Could not get channel labels, defaulting to Ch1, Ch2, etc.")
            return [f"Ch{i+1}" for i in range(self.n_channels)]

    def get_samples(self) -> tuple[list[list], list]:
        """Pull the samples and timestamps currently available on the inlet."""
        return pull_from_lsl_inlet(self._inlet)

    def time_correction(self) -> float:
        """Return the inlet's current time correction estimate."""
        return self._inlet.time_correction()

    def get_channel_properties(self, property: str) -> list[str]:
        """Get channel properties from mne_lsl stream info.

        Parameters
        ----------
        property : str
            Property to get: 'name' or 'label' (channel labels), 'unit' or 'type'.

        Returns
        -------
        list[str]
            List of property values for each channel. Unsupported properties
            log a warning and yield a list of empty strings.
        """
        # "name" means the per-channel name, not the stream name: the result
        # must hold one entry per channel.
        if property in ("name", "label"):
            return self.channel_labels
        elif property == "unit":
            return self.channel_units
        elif property == "type":
            return self.channel_types
        else:
            logger.warning(f"Property '{property}' not supported in mne_lsl")
            return [""] * self.n_channels


def discover_first_stream(type: str, timeout: float = 600) -> StreamInfo:
    """This helper returns the first stream of the specified type.

    Parameters
    ----------
    type : str
        Stream type passed to `resolve_streams` as `stype`.
    timeout : float, *optional*
        How many seconds to wait for a matching stream. Default is 600.

    Raises
    ------
    IndexError
        If no stream of the requested type is found.
    """
    streams = resolve_streams(stype=type, timeout=timeout)
    if not streams:
        # Same exception type as the bare `streams[0]` lookup would raise,
        # but with a message that says what actually went wrong.
        raise IndexError(
            f"No LSL stream of type '{type}' found within {timeout} seconds"
        )
    return streams[0]


def pull_from_lsl_inlet(inlet: StreamInlet) -> tuple[list[list], list]:
    """StreamInlet.pull_chunk() may return None for samples.

    This helper prevents `None` from propagating by converting it into [[]].

    If None or an empty chunk is detected, the timestamps list is also forced to [].
    """

    # read from inlet
    samples, timestamps = inlet.pull_chunk(timeout=0.001)

    # convert None or empty samples into empty lists
    # (explicit len() check: truth-testing an array-like chunk may be ambiguous)
    if samples is None or len(samples) == 0:
        samples = [[]]
        timestamps = []

    # return a real tuple, as annotated
    return samples, timestamps
class LslMarkerSource(MarkerSource):
    """MarkerSource implementation that pulls markers from an LSL inlet."""

    def __init__(
        self, stream: StreamInfo = None, buffer_size: int = 5, timeout: float = 600
    ):
        """Create a MarkerSource object that obtains markers from an LSL outlet

        Parameters
        ----------
        stream : StreamInfo, *optional*
            Provide stream to use for Markers, if not provided, stream will be discovered.
        buffer_size : int, *optional*
            Size of the buffer is `buffer_size * 100`. Default is 5 (i.e., 500 samples).
        timeout : float, *optional*
            How many seconds to wait for marker outlet stream to be discovered.
            If no stream is discovered, an Exception is raised.
            By default init will wait 10 minutes.

        Raises
        ------
        Exception
            If the stream cannot be discovered or the inlet cannot be opened.
            The underlying error is attached as the exception's cause.
        """
        try:
            if stream is None:
                stream = discover_first_stream("LSL_Marker_Strings", timeout=timeout)
            self._inlet = StreamInlet(
                stream, max_buffered=buffer_size, processing_flags=["dejitter"]
            )
            self._inlet.open_stream(timeout=5)
            self.__info = self._inlet.get_sinfo()
        except Exception as err:
            # Chain explicitly so the root cause shows up in the traceback.
            raise Exception("LslMarkerSource: could not create inlet") from err

    @property
    def name(self) -> str:
        """Name of the connected stream, as reported by its stream info."""
        return self.__info.name

    def get_markers(self) -> tuple[list[list], list]:
        """Pull the markers and timestamps currently available on the inlet."""
        return pull_from_lsl_inlet(self._inlet)

    def time_correction(self) -> float:
        """Return the inlet's current time correction estimate."""
        return self._inlet.time_correction()
MarkerSource objects send time synchronized markers/commands to bci_controller.
LslMarkerSource( stream: mne_lsl.lsl.stream_info.StreamInfo = None, buffer_size: int = 5, timeout: float = 600)
def __init__(
    self, stream: StreamInfo = None, buffer_size: int = 5, timeout: float = 600
):
    """Create a MarkerSource object that obtains markers from an LSL outlet

    Parameters
    ----------
    stream : StreamInfo, *optional*
        Provide stream to use for Markers, if not provided, stream will be discovered.
    buffer_size : int, *optional*
        Size of the buffer is `buffer_size * 100`. Default is 5 (i.e., 500 samples).
    timeout : float, *optional*
        How many seconds to wait for marker outlet stream to be discovered.
        If no stream is discovered, an Exception is raised.
        By default init will wait 10 minutes.

    Raises
    ------
    Exception
        If the stream cannot be discovered or the inlet cannot be opened.
        The underlying error is attached as the exception's cause.
    """
    try:
        if stream is None:
            stream = discover_first_stream("LSL_Marker_Strings", timeout=timeout)
        self._inlet = StreamInlet(
            stream, max_buffered=buffer_size, processing_flags=["dejitter"]
        )
        self._inlet.open_stream(timeout=5)
        self.__info = self._inlet.get_sinfo()
    except Exception as err:
        # Chain explicitly so the root cause shows up in the traceback.
        raise Exception("LslMarkerSource: could not create inlet") from err
Create a MarkerSource object that obtains markers from an LSL outlet
Parameters
- stream (StreamInfo, optional): Provide stream to use for Markers, if not provided, stream will be discovered.
- buffer_size (int, optional): Size of the buffer is `buffer_size * 100`. Default is 5 (i.e., 500 samples).
- timeout (float, optional): How many seconds to wait for marker outlet stream to be discovered. If no stream is discovered, an Exception is raised. By default init will wait 10 minutes.
def
get_markers(self) -> tuple[list[list], list]:
Get marker/command data and timestamps since last call
Returns
- marker_data (tuple(markers, timestamps)):
A tuple of (markers, timestamps):
- markers : list[list]
- A list of samples, where each sample corresponds to a timestamp.
- Each sample is a list with a single string element that represents a command or a marker.
- The string is formatted as follows:
- command = an arbitrary string, ex: "Trial Started"
- marker = "paradigm, num options, label number, trial length"
- timestamps : list[float]
- A list of timestamps (float, in seconds) corresponding to the markers
- markers : list[list]
def
time_correction(self) -> float:
Get the current time correction for timestamps.
Returns
- time_correction (float):
The current time correction estimate (seconds).
- This is the number that needs to be added to a timestamp that was remotely generated via local_clock() to map it into the local clock domain of the machine.
class LslEegSource(EegSource):
    """EegSource implementation that pulls EEG samples from an LSL inlet."""

    def __init__(
        self, stream: StreamInfo = None, buffer_size: int = 5, timeout: float = 600
    ):
        """Create an EegSource object that obtains EEG from an LSL outlet

        Parameters
        ----------
        stream : StreamInfo, *optional*
            Provide stream to use for EEG, if not provided, stream will be discovered.
        buffer_size : int, *optional*
            Size of the buffer to use for the inlet in seconds. Default is 5.
        timeout : float, *optional*
            How many seconds to wait for EEG outlet stream to be discovered.
            If no stream is discovered, an Exception is raised.
            By default init will wait 10 minutes.

        Raises
        ------
        Exception
            If the stream cannot be discovered or the inlet cannot be opened.
            The underlying error is attached as the exception's cause.
        """
        try:
            if stream is None:
                stream = discover_first_stream("EEG", timeout=timeout)
            self._inlet = StreamInlet(
                stream, max_buffered=buffer_size, processing_flags=["dejitter"]
            )
            self._inlet.open_stream(timeout=5)
            self.__info = self._inlet.get_sinfo()
        except Exception as err:
            # Chain explicitly so the root cause shows up in the traceback.
            raise Exception("LslEegSource: could not create inlet") from err

    @property
    def name(self) -> str:
        """Name of the connected stream, as reported by its stream info."""
        return self.__info.name

    @property
    def fsample(self) -> float:
        """Sampling frequency reported by the stream info."""
        return self.__info.sfreq

    @property
    def n_channels(self) -> int:
        """Number of channels reported by the stream info."""
        return self.__info.n_channels

    @property
    def channel_types(self) -> list[str]:
        """Channel types reported by the stream info."""
        return self.__info.get_channel_types()

    @property
    def channel_units(self) -> list[str]:
        """Get channel units. Default to "microvolts"."""
        try:
            units = self.__info.get_channel_units()
            # If no units found or empty strings, use default
            if not units or all(unit == "" for unit in units):
                logger.warning("No channel units found, defaulting to microvolts")
                units = ["microvolts"] * self.n_channels
            return units
        except Exception:
            # Best effort: a stream without usable unit metadata is still usable.
            logger.warning("Could not get channel units, defaulting to microvolts")
            return ["microvolts"] * self.n_channels

    @property
    def channel_labels(self) -> list[str]:
        """Get channel labels. Default to Ch1, Ch2, etc."""
        try:
            ch_names = self.__info.get_channel_names()
            # If no labels found or empty strings, use default
            if not ch_names or all(label == "" for label in ch_names):
                logger.warning("No channel labels found, defaulting to Ch1, Ch2, etc.")
                ch_names = [f"Ch{i+1}" for i in range(self.n_channels)]
            return ch_names
        except Exception:
            # Best effort: a stream without usable label metadata is still usable.
            logger.warning("Could not get channel labels, defaulting to Ch1, Ch2, etc.")
            return [f"Ch{i+1}" for i in range(self.n_channels)]

    def get_samples(self) -> tuple[list[list], list]:
        """Pull the samples and timestamps currently available on the inlet."""
        return pull_from_lsl_inlet(self._inlet)

    def time_correction(self) -> float:
        """Return the inlet's current time correction estimate."""
        return self._inlet.time_correction()

    def get_channel_properties(self, property: str) -> list[str]:
        """Get channel properties from mne_lsl stream info.

        Parameters
        ----------
        property : str
            Property to get: 'name' or 'label' (channel labels), 'unit' or 'type'.

        Returns
        -------
        list[str]
            List of property values for each channel. Unsupported properties
            log a warning and yield a list of empty strings.
        """
        # "name" means the per-channel name, not the stream name: the result
        # must hold one entry per channel.
        if property in ("name", "label"):
            return self.channel_labels
        elif property == "unit":
            return self.channel_units
        elif property == "type":
            return self.channel_types
        else:
            logger.warning(f"Property '{property}' not supported in mne_lsl")
            return [""] * self.n_channels
EegSource objects produce samples of EEG for use in bci_controller.
It can be used to represent a BCI headset providing EEG data, or it could be a source of Markers to control bci_controller behaviour, etc.
LslEegSource( stream: mne_lsl.lsl.stream_info.StreamInfo = None, buffer_size: int = 5, timeout: float = 600)
def __init__(
    self, stream: StreamInfo = None, buffer_size: int = 5, timeout: float = 600
):
    """Create an EegSource object that obtains EEG from an LSL outlet

    Parameters
    ----------
    stream : StreamInfo, *optional*
        Provide stream to use for EEG, if not provided, stream will be discovered.
    buffer_size : int, *optional*
        Size of the buffer to use for the inlet in seconds. Default is 5.
    timeout : float, *optional*
        How many seconds to wait for EEG outlet stream to be discovered.
        If no stream is discovered, an Exception is raised.
        By default init will wait 10 minutes.

    Raises
    ------
    Exception
        If the stream cannot be discovered or the inlet cannot be opened.
        The underlying error is attached as the exception's cause.
    """
    try:
        if stream is None:
            stream = discover_first_stream("EEG", timeout=timeout)
        self._inlet = StreamInlet(
            stream, max_buffered=buffer_size, processing_flags=["dejitter"]
        )
        self._inlet.open_stream(timeout=5)
        self.__info = self._inlet.get_sinfo()
    except Exception as err:
        # Chain explicitly so the root cause shows up in the traceback.
        raise Exception("LslEegSource: could not create inlet") from err
Create an EegSource object that obtains EEG from an LSL outlet
Parameters
- stream (StreamInfo, optional): Provide stream to use for EEG, if not provided, stream will be discovered.
- buffer_size (int, optional): Size of the buffer to use for the inlet in seconds. Default is 5.
- timeout (float, optional): How many seconds to wait for the EEG outlet stream to be discovered. If no stream is discovered, an Exception is raised. By default init will wait 10 minutes.
channel_units: list[str]
@property
def channel_units(self) -> list[str]:
    """Return the unit of every channel, falling back to "microvolts".

    The fallback is used when the stream info reports no units, reports
    only empty strings, or raises while being queried.
    """
    try:
        reported = self.__info.get_channel_units()
        # At least one non-empty unit means the metadata is usable as-is.
        if reported and any(unit != "" for unit in reported):
            return reported
        logger.warning("No channel units found, defaulting to microvolts")
        return ["microvolts"] * self.n_channels
    except Exception:
        logger.warning("Could not get channel units, defaulting to microvolts")
        return ["microvolts"] * self.n_channels
Get channel units. Default to "microvolts".
channel_labels: list[str]
@property
def channel_labels(self) -> list[str]:
    """Return the label of every channel, falling back to Ch1, Ch2, etc.

    The fallback is used when the stream info reports no names, reports
    only empty strings, or raises while being queried.
    """
    try:
        reported = self.__info.get_channel_names()
        # At least one non-empty label means the metadata is usable as-is.
        if reported and any(label != "" for label in reported):
            return reported
        logger.warning("No channel labels found, defaulting to Ch1, Ch2, etc.")
        return [f"Ch{i+1}" for i in range(self.n_channels)]
    except Exception:
        logger.warning("Could not get channel labels, defaulting to Ch1, Ch2, etc.")
        return [f"Ch{i+1}" for i in range(self.n_channels)]
Get channel labels. Default to Ch1, Ch2, etc.
def
get_samples(self) -> tuple[list[list], list]:
Get EEG samples and timestamps since last call
Returns
- samples_data (tuple(samples, timestamps)):
- A tuple of (samples, timestamps) where:
- samples : list[list[float]]
- A list of samples, where each sample corresponds to a timestamp. Each sample is a list of floats representing the value for each channel of EEG.
- timestamps : list[float]
- A list of timestamps (float, in seconds) corresponding to the samples
- samples : list[list[float]]
- A tuple of (samples, timestamps) where:
def
time_correction(self) -> float:
Get the current time correction for timestamps.
Returns
- time_correction (float):
The current time correction estimate (seconds).
- This is the number that needs to be added to a timestamp that was remotely generated via local_clock() to map it into the local clock domain of the machine.
def
get_channel_properties(self, property: str) -> list[str]:
def get_channel_properties(self, property: str) -> list[str]:
    """Get channel properties from mne_lsl stream info.

    Parameters
    ----------
    property : str
        Property to get: 'name' or 'label' (channel labels), 'unit' or 'type'.

    Returns
    -------
    list[str]
        List of property values for each channel. Unsupported properties
        log a warning and yield a list of empty strings.
    """
    # "name" means the per-channel name, not the stream name: the result
    # must hold one entry per channel.
    if property in ("name", "label"):
        return self.channel_labels
    elif property == "unit":
        return self.channel_units
    elif property == "type":
        return self.channel_types
    else:
        logger.warning(f"Property '{property}' not supported in mne_lsl")
        return [""] * self.n_channels
Get channel properties from mne_lsl stream info.
Parameters
- property (str): Property to get ('name', 'unit', 'type', etc)
Returns
- list[str]: List of property values for each channel