bci_essentials.io.lsl_sources
from mne_lsl.lsl import StreamInlet, StreamInfo, resolve_streams
from .sources import MarkerSource, EegSource
from ..utils.logger import Logger  # Logger wrapper

# Instantiate a logger for the module at the default level of logging.INFO
# Logs to bci_essentials.__module__, where __module__ is the name of the module
logger = Logger(name=__name__)

__all__ = ["LslMarkerSource", "LslEegSource"]


# MarkerSource implementation backed by an LSL StreamInlet.
class LslMarkerSource(MarkerSource):
    def __init__(
        self, stream: StreamInfo = None, buffer_size: int = 5, timeout: float = 600
    ):
        """Create a MarkerSource object that obtains markers from an LSL outlet

        Parameters
        ----------
        stream : StreamInfo, *optional*
            Provide stream to use for Markers, if not provided, stream will be discovered.
        buffer_size : int, *optional*
            Size of the buffer is `buffer_size * 100`. Default is 5 (i.e., 500 samples).
        timeout : float, *optional*
            How many seconds to wait for marker outlet stream to be discovered.
            If no stream is discovered, an Exception is raised.
            By default init will wait 10 minutes.

        Raises
        ------
        Exception
            If the stream cannot be discovered or the inlet cannot be opened.
            The underlying error is attached as ``__cause__``.
        """
        try:
            if stream is None:
                stream = discover_first_stream(
                    "BCI_Essentials_Markers", timeout=timeout
                )
            self._inlet = StreamInlet(
                stream, max_buffered=buffer_size, processing_flags=["dejitter"]
            )
            self._inlet.open_stream(timeout=5)
            # Cache the stream info reported by the opened inlet.
            self.__info = self._inlet.get_sinfo()
        except Exception as err:
            # Chain the original error so the root cause stays in the traceback.
            raise Exception("LslMarkerSource: could not create inlet") from err

    @property
    def name(self) -> str:
        # Name of the stream as reported by the inlet's stream info.
        return self.__info.name

    def get_markers(self) -> tuple[list[list], list]:
        # Pull whatever is currently available on the inlet.
        return pull_from_lsl_inlet(self._inlet)

    def time_correction(self) -> float:
        # Delegates to the inlet's own time-correction estimate.
        return self._inlet.time_correction()


# EegSource implementation backed by an LSL StreamInlet.
class LslEegSource(EegSource):
    def __init__(
        self, stream: StreamInfo = None, buffer_size: int = 5, timeout: float = 600
    ):
        """Create an EegSource object that obtains EEG from an LSL outlet

        Parameters
        ----------
        stream : StreamInfo, *optional*
            Provide stream to use for EEG, if not provided, stream will be discovered.
        buffer_size : int, *optional*
            Size of the buffer to use for the inlet in seconds. Default is 5.
        timeout : float, *optional*
            How many seconds to wait for EEG outlet stream to be discovered.
            If no stream is discovered, an Exception is raised.
            By default init will wait 10 minutes.

        Raises
        ------
        Exception
            If the stream cannot be discovered or the inlet cannot be opened.
            The underlying error is attached as ``__cause__``.
        """
        try:
            if stream is None:
                stream = discover_first_stream("EEG", timeout=timeout)
            self._inlet = StreamInlet(
                stream, max_buffered=buffer_size, processing_flags=["dejitter"]
            )
            self._inlet.open_stream(timeout=5)
            # Cache the stream info reported by the opened inlet.
            self.__info = self._inlet.get_sinfo()
        except Exception as err:
            # Chain the original error so the root cause stays in the traceback.
            raise Exception("LslEegSource: could not create inlet") from err

    @property
    def name(self) -> str:
        # Name of the stream as reported by the inlet's stream info.
        return self.__info.name

    @property
    def fsample(self) -> float:
        # Sampling frequency as reported by the stream info.
        return self.__info.sfreq

    @property
    def n_channels(self) -> int:
        # Channel count as reported by the stream info.
        return self.__info.n_channels

    @property
    def channel_types(self) -> list[str]:
        # Channel types as reported by the stream info (no fallback applied).
        return self.__info.get_channel_types()

    @property
    def channel_units(self) -> list[str]:
        """Get channel units. Default to "microvolts"."""
        try:
            units = self.__info.get_channel_units()
            # If no units found or empty strings, use default
            if not units or all(unit == "" for unit in units):
                logger.warning("No channel units found, defaulting to microvolts")
                units = ["microvolts"] * self.n_channels
            return units
        except Exception:
            # Best-effort: any failure while reading units falls back to the default.
            logger.warning("Could not get channel units, defaulting to microvolts")
            return ["microvolts"] * self.n_channels

    @property
    def channel_labels(self) -> list[str]:
        """Get channel labels. Default to Ch1, Ch2, etc."""
        try:
            ch_names = self.__info.get_channel_names()
            # If no labels found or empty strings, use default
            if not ch_names or all(label == "" for label in ch_names):
                logger.warning("No channel labels found, defaulting to Ch1, Ch2, etc.")
                ch_names = [f"Ch{i+1}" for i in range(self.n_channels)]
            return ch_names
        except Exception:
            # Best-effort: any failure while reading labels falls back to the default.
            logger.warning("Could not get channel labels, defaulting to Ch1, Ch2, etc.")
            return [f"Ch{i+1}" for i in range(self.n_channels)]

    def get_samples(self) -> tuple[list[list], list]:
        # Pull whatever is currently available on the inlet.
        return pull_from_lsl_inlet(self._inlet)

    def time_correction(self) -> float:
        # Delegates to the inlet's own time-correction estimate.
        return self._inlet.time_correction()

    def get_channel_properties(self, property: str) -> list[str]:
        """Get channel properties from mne_lsl stream info.

        Parameters
        ----------
        property : str
            Property to get ('name', 'label', 'unit' or 'type').

        Returns
        -------
        list[str]
            List of property values for each channel. Unsupported properties
            yield a list of empty strings, one per channel.
        """
        # 'name' and 'label' both resolve to the per-channel names so that the
        # result is always a list with one entry per channel.
        if property in ("name", "label"):
            return self.channel_labels
        if property == "unit":
            return self.channel_units
        if property == "type":
            return self.channel_types
        logger.warning(f"Property '{property}' not supported in mne_lsl")
        return [""] * self.n_channels


def discover_first_stream(type: str, timeout: float = 600) -> StreamInfo:
    """This helper returns the first stream of the specified type.

    Parameters
    ----------
    type : str
        Stream type passed to `resolve_streams` as `stype`.
    timeout : float, *optional*
        How many seconds to wait for a matching stream. Default is 600.

    Returns
    -------
    StreamInfo
        The first stream returned by `resolve_streams`.

    Raises
    ------
    IndexError
        If no stream of the requested type was found.
    """
    streams = resolve_streams(stype=type, timeout=timeout)
    if not streams:
        # Same exception type as indexing an empty list, but with a usable message.
        raise IndexError(
            f"No LSL stream of type '{type}' found within {timeout} seconds"
        )
    return streams[0]


def pull_from_lsl_inlet(inlet: StreamInlet) -> tuple[list[list], list]:
    """StreamInlet.pull_chunk() may return None for samples.

    This helper prevents `None` from propagating by converting it into [[]].

    If None (or an empty chunk) is detected, the timestamps list is also forced to [].

    Returns
    -------
    tuple[list[list], list]
        A `(samples, timestamps)` tuple.
    """
    # read from inlet; the very short timeout keeps the call effectively non-blocking
    samples, timestamps = inlet.pull_chunk(timeout=0.001)

    # convert None or empty samples into empty lists
    if samples is None or len(samples) == 0:
        samples = [[]]
        timestamps = []

    # return a real tuple, matching the annotated return type
    return samples, timestamps
# MarkerSource implementation backed by an LSL StreamInlet.
class LslMarkerSource(MarkerSource):
    def __init__(
        self, stream: StreamInfo = None, buffer_size: int = 5, timeout: float = 600
    ):
        """Create a MarkerSource object that obtains markers from an LSL outlet

        Parameters
        ----------
        stream : StreamInfo, *optional*
            Provide stream to use for Markers, if not provided, stream will be discovered.
        buffer_size : int, *optional*
            Size of the buffer is `buffer_size * 100`. Default is 5 (i.e., 500 samples).
        timeout : float, *optional*
            How many seconds to wait for marker outlet stream to be discovered.
            If no stream is discovered, an Exception is raised.
            By default init will wait 10 minutes.

        Raises
        ------
        Exception
            If the stream cannot be discovered or the inlet cannot be opened.
            The underlying error is attached as ``__cause__``.
        """
        try:
            if stream is None:
                stream = discover_first_stream(
                    "BCI_Essentials_Markers", timeout=timeout
                )
            self._inlet = StreamInlet(
                stream, max_buffered=buffer_size, processing_flags=["dejitter"]
            )
            self._inlet.open_stream(timeout=5)
            # Cache the stream info reported by the opened inlet.
            self.__info = self._inlet.get_sinfo()
        except Exception as err:
            # Chain the original error so the root cause stays in the traceback.
            raise Exception("LslMarkerSource: could not create inlet") from err

    @property
    def name(self) -> str:
        # Name of the stream as reported by the inlet's stream info.
        return self.__info.name

    def get_markers(self) -> tuple[list[list], list]:
        # Pull whatever is currently available on the inlet.
        return pull_from_lsl_inlet(self._inlet)

    def time_correction(self) -> float:
        # Delegates to the inlet's own time-correction estimate.
        return self._inlet.time_correction()
MarkerSource objects send time synchronized markers/commands to bci_controller.
LslMarkerSource( stream: mne_lsl.lsl.stream_info.StreamInfo = None, buffer_size: int = 5, timeout: float = 600)
def __init__(
    self, stream: StreamInfo = None, buffer_size: int = 5, timeout: float = 600
):
    """Create a MarkerSource object that obtains markers from an LSL outlet

    Parameters
    ----------
    stream : StreamInfo, *optional*
        Provide stream to use for Markers, if not provided, stream will be discovered.
    buffer_size : int, *optional*
        Size of the buffer is `buffer_size * 100`. Default is 5 (i.e., 500 samples).
    timeout : float, *optional*
        How many seconds to wait for marker outlet stream to be discovered.
        If no stream is discovered, an Exception is raised.
        By default init will wait 10 minutes.

    Raises
    ------
    Exception
        If the stream cannot be discovered or the inlet cannot be opened.
        The underlying error is attached as ``__cause__``.
    """
    try:
        if stream is None:
            stream = discover_first_stream(
                "BCI_Essentials_Markers", timeout=timeout
            )
        self._inlet = StreamInlet(
            stream, max_buffered=buffer_size, processing_flags=["dejitter"]
        )
        self._inlet.open_stream(timeout=5)
        # Cache the stream info reported by the opened inlet.
        self.__info = self._inlet.get_sinfo()
    except Exception as err:
        # Chain the original error so the root cause stays in the traceback.
        raise Exception("LslMarkerSource: could not create inlet") from err
Create a MarkerSource object that obtains markers from an LSL outlet
Parameters
- stream (StreamInfo, optional): Provide stream to use for Markers, if not provided, stream will be discovered.
- buffer_size (int, optional): Size of the buffer is `buffer_size * 100`. Default is 5 (i.e., 500 samples).
- timeout (float, optional): How many seconds to wait for marker outlet stream to be discovered. If no stream is discovered, an Exception is raised. By default init will wait 10 minutes.
def
get_markers(self) -> tuple[list[list], list]:
Get marker/command data and timestamps since last call
Returns
- marker_data (tuple(markers, timestamps)):
A tuple of (markers, timestamps):
- markers : list[list]
- A list of samples, where each sample corresponds to a timestamp.
- Each sample is a list with a single string element that represents a command or a marker.
- The string is formatted as follows:
- command = an arbitrary string, ex: "Trial Started"
- marker = "paradigm, num options, label number, trial length"
- timestamps : list[float]
- A list of timestamps (float in seconds) corresponding to the markers
- markers : list[list]
def
time_correction(self) -> float:
Get the current time correction for timestamps.
Returns
- time_correction (float):
The current time correction estimate (seconds).
- This is the number that needs to be added to a timestamp that was remotely generated via local_clock() to map it into the local clock domain of the machine.
# EegSource implementation backed by an LSL StreamInlet.
class LslEegSource(EegSource):
    def __init__(
        self, stream: StreamInfo = None, buffer_size: int = 5, timeout: float = 600
    ):
        """Create an EegSource object that obtains EEG from an LSL outlet

        Parameters
        ----------
        stream : StreamInfo, *optional*
            Provide stream to use for EEG, if not provided, stream will be discovered.
        buffer_size : int, *optional*
            Size of the buffer to use for the inlet in seconds. Default is 5.
        timeout : float, *optional*
            How many seconds to wait for EEG outlet stream to be discovered.
            If no stream is discovered, an Exception is raised.
            By default init will wait 10 minutes.

        Raises
        ------
        Exception
            If the stream cannot be discovered or the inlet cannot be opened.
            The underlying error is attached as ``__cause__``.
        """
        try:
            if stream is None:
                stream = discover_first_stream("EEG", timeout=timeout)
            self._inlet = StreamInlet(
                stream, max_buffered=buffer_size, processing_flags=["dejitter"]
            )
            self._inlet.open_stream(timeout=5)
            # Cache the stream info reported by the opened inlet.
            self.__info = self._inlet.get_sinfo()
        except Exception as err:
            # Chain the original error so the root cause stays in the traceback.
            raise Exception("LslEegSource: could not create inlet") from err

    @property
    def name(self) -> str:
        # Name of the stream as reported by the inlet's stream info.
        return self.__info.name

    @property
    def fsample(self) -> float:
        # Sampling frequency as reported by the stream info.
        return self.__info.sfreq

    @property
    def n_channels(self) -> int:
        # Channel count as reported by the stream info.
        return self.__info.n_channels

    @property
    def channel_types(self) -> list[str]:
        # Channel types as reported by the stream info (no fallback applied).
        return self.__info.get_channel_types()

    @property
    def channel_units(self) -> list[str]:
        """Get channel units. Default to "microvolts"."""
        try:
            units = self.__info.get_channel_units()
            # If no units found or empty strings, use default
            if not units or all(unit == "" for unit in units):
                logger.warning("No channel units found, defaulting to microvolts")
                units = ["microvolts"] * self.n_channels
            return units
        except Exception:
            # Best-effort: any failure while reading units falls back to the default.
            logger.warning("Could not get channel units, defaulting to microvolts")
            return ["microvolts"] * self.n_channels

    @property
    def channel_labels(self) -> list[str]:
        """Get channel labels. Default to Ch1, Ch2, etc."""
        try:
            ch_names = self.__info.get_channel_names()
            # If no labels found or empty strings, use default
            if not ch_names or all(label == "" for label in ch_names):
                logger.warning("No channel labels found, defaulting to Ch1, Ch2, etc.")
                ch_names = [f"Ch{i+1}" for i in range(self.n_channels)]
            return ch_names
        except Exception:
            # Best-effort: any failure while reading labels falls back to the default.
            logger.warning("Could not get channel labels, defaulting to Ch1, Ch2, etc.")
            return [f"Ch{i+1}" for i in range(self.n_channels)]

    def get_samples(self) -> tuple[list[list], list]:
        # Pull whatever is currently available on the inlet.
        return pull_from_lsl_inlet(self._inlet)

    def time_correction(self) -> float:
        # Delegates to the inlet's own time-correction estimate.
        return self._inlet.time_correction()

    def get_channel_properties(self, property: str) -> list[str]:
        """Get channel properties from mne_lsl stream info.

        Parameters
        ----------
        property : str
            Property to get ('name', 'label', 'unit' or 'type').

        Returns
        -------
        list[str]
            List of property values for each channel. Unsupported properties
            yield a list of empty strings, one per channel.
        """
        # 'name' and 'label' both resolve to the per-channel names so that the
        # result is always a list with one entry per channel.
        if property in ("name", "label"):
            return self.channel_labels
        if property == "unit":
            return self.channel_units
        if property == "type":
            return self.channel_types
        logger.warning(f"Property '{property}' not supported in mne_lsl")
        return [""] * self.n_channels
EegSource objects produce samples of EEG for use in bci_controller.
It can be used to represent a BCI headset providing EEG data, or it could be a source of Markers to control bci_controller behaviour, etc.
LslEegSource( stream: mne_lsl.lsl.stream_info.StreamInfo = None, buffer_size: int = 5, timeout: float = 600)
def __init__(
    self, stream: StreamInfo = None, buffer_size: int = 5, timeout: float = 600
):
    """Create an EegSource object that obtains EEG from an LSL outlet

    Parameters
    ----------
    stream : StreamInfo, *optional*
        Provide stream to use for EEG, if not provided, stream will be discovered.
    buffer_size : int, *optional*
        Size of the buffer to use for the inlet in seconds. Default is 5.
    timeout : float, *optional*
        How many seconds to wait for EEG outlet stream to be discovered.
        If no stream is discovered, an Exception is raised.
        By default init will wait 10 minutes.

    Raises
    ------
    Exception
        If the stream cannot be discovered or the inlet cannot be opened.
        The underlying error is attached as ``__cause__``.
    """
    try:
        if stream is None:
            stream = discover_first_stream("EEG", timeout=timeout)
        self._inlet = StreamInlet(
            stream, max_buffered=buffer_size, processing_flags=["dejitter"]
        )
        self._inlet.open_stream(timeout=5)
        # Cache the stream info reported by the opened inlet.
        self.__info = self._inlet.get_sinfo()
    except Exception as err:
        # Chain the original error so the root cause stays in the traceback.
        raise Exception("LslEegSource: could not create inlet") from err
Create an EegSource object that obtains EEG from an LSL outlet
Parameters
- stream (StreamInfo, optional): Provide stream to use for EEG, if not provided, stream will be discovered.
- buffer_size (int, optional): Size of the buffer to use for the inlet in seconds. Default is 5.
- timeout (float, optional): How many seconds to wait for EEG outlet stream to be discovered. If no stream is discovered, an Exception is raised. By default init will wait 10 minutes.
channel_units: list[str]
@property
def channel_units(self) -> list[str]:
    """Return the unit of every channel, falling back to "microvolts".

    The fallback is used when the stream info reports no units, reports only
    empty strings, or when reading the units raises.
    """
    try:
        reported = self.__info.get_channel_units()
        # Keep the reported units as soon as at least one of them is non-empty.
        if reported and any(entry != "" for entry in reported):
            return reported
        logger.warning("No channel units found, defaulting to microvolts")
        return ["microvolts"] * self.n_channels
    except Exception:
        # Best-effort accessor: never propagate a failure from the stream info.
        logger.warning("Could not get channel units, defaulting to microvolts")
        return ["microvolts"] * self.n_channels
Get channel units. Default to "microvolts".
channel_labels: list[str]
@property
def channel_labels(self) -> list[str]:
    """Return the label of every channel, falling back to Ch1, Ch2, etc.

    The fallback is used when the stream info reports no names, reports only
    empty strings, or when reading the names raises.
    """
    try:
        reported = self.__info.get_channel_names()
        # Keep the reported names as soon as at least one of them is non-empty.
        if reported and any(entry != "" for entry in reported):
            return reported
        logger.warning("No channel labels found, defaulting to Ch1, Ch2, etc.")
        return [f"Ch{number}" for number in range(1, self.n_channels + 1)]
    except Exception:
        # Best-effort accessor: never propagate a failure from the stream info.
        logger.warning("Could not get channel labels, defaulting to Ch1, Ch2, etc.")
        return [f"Ch{number}" for number in range(1, self.n_channels + 1)]
Get channel labels. Default to Ch1, Ch2, etc.
def
get_samples(self) -> tuple[list[list], list]:
Get EEG samples and timestamps since last call
Returns
- samples_data (tuple(samples, timestamps)):
- A tuple of (samples, timestamps) where:
- samples : list[list[float]]
- A list of samples, where each sample corresponds to a timestamp. Each sample is a list of floats representing the value for each channel of EEG.
- timestamps : list[float]
- A list of timestamps (float in seconds) corresponding to the samples
- samples : list[list[float]]
- A tuple of (samples, timestamps) where:
def
time_correction(self) -> float:
Get the current time correction for timestamps.
Returns
- time_correction (float):
The current time correction estimate (seconds).
- This is the number that needs to be added to a timestamp that was remotely generated via local_clock() to map it into the local clock domain of the machine.
def
get_channel_properties(self, property: str) -> list[str]:
def get_channel_properties(self, property: str) -> list[str]:
    """Get channel properties from mne_lsl stream info.

    Parameters
    ----------
    property : str
        Property to get ('name', 'label', 'unit' or 'type').

    Returns
    -------
    list[str]
        List of property values for each channel. Unsupported properties
        yield a list of empty strings, one per channel.
    """
    # 'name' and 'label' both resolve to the per-channel names so that the
    # result is always a list with one entry per channel (previously 'name'
    # returned the stream name, a plain str).
    if property in ("name", "label"):
        return self.channel_labels
    if property == "unit":
        return self.channel_units
    if property == "type":
        return self.channel_types
    logger.warning(f"Property '{property}' not supported in mne_lsl")
    return [""] * self.n_channels
Get channel properties from mne_lsl stream info.
Parameters
- property (str): Property to get ('name', 'unit', 'type', etc)
Returns
- list[str]: List of property values for each channel