bci_essentials.io.lsl_messenger
1from mne_lsl.lsl import StreamInfo, StreamOutlet 2from .messenger import Messenger 3from ..classification.generic_classifier import Prediction 4import numpy as np 5 6__all__ = ["LslMessenger"] 7 8 9class LslMessenger(Messenger): 10 """A Messenger object for sending event messages to an LSL outlet.""" 11 12 def __init__(self): 13 """Create an LslMessenger object. 14 15 If the LSL outlet cannot be created, an exception is raised.""" 16 try: 17 info = StreamInfo( 18 name="PythonResponse", 19 stype="BCI_Essentials_Predictions", 20 n_channels=1, 21 sfreq=0, # 0 means irregular rate 22 dtype="string", 23 source_id="pyp30042", 24 ) 25 self.__outlet = StreamOutlet(info) 26 self.__outlet.push_sample(["This is the python response stream"]) 27 except Exception: 28 raise Exception("LslMessenger: could not create outlet") 29 30 def ping(self): 31 self.__outlet.push_sample(["ping"]) 32 33 def marker_received(self, marker): 34 # ignore 35 pass 36 37 def prediction(self, prediction: Prediction): 38 prediction_message = self.format_prediction_message(prediction) 39 self.__outlet.push_sample([prediction_message]) 40 41 def format_prediction_message(self, prediction: Prediction) -> str: 42 labels = prediction.labels 43 probabilities = prediction.probabilities 44 45 # One label, list of scalars 46 if np.isscalar(probabilities[0]): 47 return self.format_constituent_prediction_string(labels[0], probabilities) 48 49 # One or more label, nested list of scalars 50 constituent_prediction_strings = [] 51 for label_index in range(len(labels)): 52 label = int(labels[label_index]) 53 label_probabilities = probabilities[label_index] 54 55 constituent_prediction_strings.append( 56 self.format_constituent_prediction_string(label, label_probabilities) 57 ) 58 59 return ",".join(constituent_prediction_strings) 60 61 def format_constituent_prediction_string( 62 self, 63 label: int, 64 probabilities: list | np.ndarray, 65 probability_precision: int = 4, 66 ) -> str: 67 probability_format = "%.{}f".format(probability_precision) 68 probabilities_string = " ".join([probability_format % p for p in probabilities]) 69 70 return "%s:[%s]" % (str(label), probabilities_string)
10class LslMessenger(Messenger): 11 """A Messenger object for sending event messages to an LSL outlet.""" 12 13 def __init__(self): 14 """Create an LslMessenger object. 15 16 If the LSL outlet cannot be created, an exception is raised.""" 17 try: 18 info = StreamInfo( 19 name="PythonResponse", 20 stype="BCI_Essentials_Predictions", 21 n_channels=1, 22 sfreq=0, # 0 means irregular rate 23 dtype="string", 24 source_id="pyp30042", 25 ) 26 self.__outlet = StreamOutlet(info) 27 self.__outlet.push_sample(["This is the python response stream"]) 28 except Exception: 29 raise Exception("LslMessenger: could not create outlet") 30 31 def ping(self): 32 self.__outlet.push_sample(["ping"]) 33 34 def marker_received(self, marker): 35 # ignore 36 pass 37 38 def prediction(self, prediction: Prediction): 39 prediction_message = self.format_prediction_message(prediction) 40 self.__outlet.push_sample([prediction_message]) 41 42 def format_prediction_message(self, prediction: Prediction) -> str: 43 labels = prediction.labels 44 probabilities = prediction.probabilities 45 46 # One label, list of scalars 47 if np.isscalar(probabilities[0]): 48 return self.format_constituent_prediction_string(labels[0], probabilities) 49 50 # One or more label, nested list of scalars 51 constituent_prediction_strings = [] 52 for label_index in range(len(labels)): 53 label = int(labels[label_index]) 54 label_probabilities = probabilities[label_index] 55 56 constituent_prediction_strings.append( 57 self.format_constituent_prediction_string(label, label_probabilities) 58 ) 59 60 return ",".join(constituent_prediction_strings) 61 62 def format_constituent_prediction_string( 63 self, 64 label: int, 65 probabilities: list | np.ndarray, 66 probability_precision: int = 4, 67 ) -> str: 68 probability_format = "%.{}f".format(probability_precision) 69 probabilities_string = " ".join([probability_format % p for p in probabilities]) 70 71 return "%s:[%s]" % (str(label), probabilities_string)
A Messenger object for sending event messages to an LSL outlet.
LslMessenger()
13 def __init__(self): 14 """Create an LslMessenger object. 15 16 If the LSL outlet cannot be created, an exception is raised.""" 17 try: 18 info = StreamInfo( 19 name="PythonResponse", 20 stype="BCI_Essentials_Predictions", 21 n_channels=1, 22 sfreq=0, # 0 means irregular rate 23 dtype="string", 24 source_id="pyp30042", 25 ) 26 self.__outlet = StreamOutlet(info) 27 self.__outlet.push_sample(["This is the python response stream"]) 28 except Exception: 29 raise Exception("LslMessenger: could not create outlet")
Create an LslMessenger object.
If the LSL outlet cannot be created, an exception is raised.
38 def prediction(self, prediction: Prediction): 39 prediction_message = self.format_prediction_message(prediction) 40 self.__outlet.push_sample([prediction_message])
Send latest prediction
def
format_prediction_message( self, prediction: bci_essentials.classification.generic_classifier.Prediction) -> str:
42 def format_prediction_message(self, prediction: Prediction) -> str: 43 labels = prediction.labels 44 probabilities = prediction.probabilities 45 46 # One label, list of scalars 47 if np.isscalar(probabilities[0]): 48 return self.format_constituent_prediction_string(labels[0], probabilities) 49 50 # One or more label, nested list of scalars 51 constituent_prediction_strings = [] 52 for label_index in range(len(labels)): 53 label = int(labels[label_index]) 54 label_probabilities = probabilities[label_index] 55 56 constituent_prediction_strings.append( 57 self.format_constituent_prediction_string(label, label_probabilities) 58 ) 59 60 return ",".join(constituent_prediction_strings)
def
format_constituent_prediction_string( self, label: int, probabilities: list | numpy.ndarray, probability_precision: int = 4) -> str:
62 def format_constituent_prediction_string( 63 self, 64 label: int, 65 probabilities: list | np.ndarray, 66 probability_precision: int = 4, 67 ) -> str: 68 probability_format = "%.{}f".format(probability_precision) 69 probabilities_string = " ".join([probability_format % p for p in probabilities]) 70 71 return "%s:[%s]" % (str(label), probabilities_string)