bci_essentials.io.lsl_messenger

 1from mne_lsl.lsl import StreamInfo, StreamOutlet
 2from .messenger import Messenger
 3from ..classification.generic_classifier import Prediction
 4import numpy as np
 5
 6__all__ = ["LslMessenger"]
 7
 8
 9class LslMessenger(Messenger):
10    """A Messenger object for sending event messages to an LSL outlet."""
11
12    def __init__(self):
13        """Create an LslMessenger object.
14
15        If the LSL outlet cannot be created, an exception is raised."""
16        try:
17            info = StreamInfo(
18                name="PythonResponse",
19                stype="BCI_Essentials_Predictions",
20                n_channels=1,
21                sfreq=0,  # 0 means irregular rate
22                dtype="string",
23                source_id="pyp30042",
24            )
25            self.__outlet = StreamOutlet(info)
26            self.__outlet.push_sample(["This is the python response stream"])
27        except Exception:
28            raise Exception("LslMessenger: could not create outlet")
29
30    def ping(self):
31        self.__outlet.push_sample(["ping"])
32
33    def marker_received(self, marker):
34        # ignore
35        pass
36
37    def prediction(self, prediction: Prediction):
38        prediction_message = self.format_prediction_message(prediction)
39        self.__outlet.push_sample([prediction_message])
40
41    def format_prediction_message(self, prediction: Prediction) -> str:
42        labels = prediction.labels
43        probabilities = prediction.probabilities
44
45        # One label, list of scalars
46        if np.isscalar(probabilities[0]):
47            return self.format_constituent_prediction_string(labels[0], probabilities)
48
49        # One or more label, nested list of scalars
50        constituent_prediction_strings = []
51        for label_index in range(len(labels)):
52            label = int(labels[label_index])
53            label_probabilities = probabilities[label_index]
54
55            constituent_prediction_strings.append(
56                self.format_constituent_prediction_string(label, label_probabilities)
57            )
58
59        return ",".join(constituent_prediction_strings)
60
61    def format_constituent_prediction_string(
62        self,
63        label: int,
64        probabilities: list | np.ndarray,
65        probability_precision: int = 4,
66    ) -> str:
67        probability_format = "%.{}f".format(probability_precision)
68        probabilities_string = " ".join([probability_format % p for p in probabilities])
69
70        return "%s:[%s]" % (str(label), probabilities_string)
class LslMessenger(bci_essentials.io.messenger.Messenger):
class LslMessenger(Messenger):
    """A Messenger object for sending event messages to an LSL outlet.

    Every message is pushed as a one-element string sample on the single
    outlet that is created in ``__init__``.
    """

    def __init__(self):
        """Create an LslMessenger object.

        Builds a one-channel, irregular-rate string stream named
        "PythonResponse" and pushes an introductory sample through it.

        Raises:
            Exception: If the LSL outlet cannot be created (or the first
                sample cannot be pushed). The original error is chained as
                ``__cause__`` so the root cause stays visible in tracebacks.
        """
        try:
            info = StreamInfo(
                name="PythonResponse",
                stype="BCI_Essentials_Predictions",
                n_channels=1,
                sfreq=0,  # 0 means irregular rate
                dtype="string",
                source_id="pyp30042",
            )
            self.__outlet = StreamOutlet(info)
            self.__outlet.push_sample(["This is the python response stream"])
        except Exception as err:
            # Same exception type and message as before; only the cause is added.
            raise Exception("LslMessenger: could not create outlet") from err

    def ping(self):
        """Indicate that sender is alive by pushing a "ping" sample."""
        self.__outlet.push_sample(["ping"])

    def marker_received(self, marker):
        """Acknowledge that a marker was processed (nothing is sent)."""
        # Deliberately ignored: no message is pushed for markers.
        pass

    def prediction(self, prediction: Prediction):
        """Send latest prediction as one formatted string sample."""
        prediction_message = self.format_prediction_message(prediction)
        self.__outlet.push_sample([prediction_message])

    def format_prediction_message(self, prediction: Prediction) -> str:
        """Build the message string for a prediction.

        Args:
            prediction: Object exposing ``labels`` and ``probabilities``.
                ``probabilities`` is either a flat sequence of scalars (one
                label) or a nested sequence with one entry per label.

        Returns:
            ``label:[p0 p1 ...]`` for a single label, or several such strings
            joined with "," when there is one probability list per label.
        """
        labels = prediction.labels
        probabilities = prediction.probabilities

        # One label, list of scalars
        if np.isscalar(probabilities[0]):
            # Normalise the label with int() exactly like the nested branch,
            # so a label such as 2.0 is emitted as "2" in both shapes.
            return self.format_constituent_prediction_string(
                int(labels[0]), probabilities
            )

        # One or more label, nested list of scalars
        constituent_prediction_strings = [
            self.format_constituent_prediction_string(
                int(label), probabilities[label_index]
            )
            for label_index, label in enumerate(labels)
        ]

        return ",".join(constituent_prediction_strings)

    def format_constituent_prediction_string(
        self,
        label: int,
        probabilities: list | np.ndarray,
        probability_precision: int = 4,
    ) -> str:
        """Format one label and its probabilities as ``label:[p0 p1 ...]``.

        Args:
            label: Label placed before the colon.
            probabilities: Values written space-separated inside the brackets.
            probability_precision: Number of decimal places per probability.

        Returns:
            The formatted string, e.g. ``1:[0.2500 0.7500]``.
        """
        probability_format = "%.{}f".format(probability_precision)
        probabilities_string = " ".join([probability_format % p for p in probabilities])

        return "%s:[%s]" % (str(label), probabilities_string)

A Messenger object for sending event messages to an LSL outlet.

LslMessenger()
13    def __init__(self):
14        """Create an LslMessenger object.
15
16        If the LSL outlet cannot be created, an exception is raised."""
17        try:
18            info = StreamInfo(
19                name="PythonResponse",
20                stype="BCI_Essentials_Predictions",
21                n_channels=1,
22                sfreq=0,  # 0 means irregular rate
23                dtype="string",
24                source_id="pyp30042",
25            )
26            self.__outlet = StreamOutlet(info)
27            self.__outlet.push_sample(["This is the python response stream"])
28        except Exception:
29            raise Exception("LslMessenger: could not create outlet")

Create an LslMessenger object.

If the LSL outlet cannot be created, an exception is raised.

def ping(self):
31    def ping(self):
32        self.__outlet.push_sample(["ping"])

Indicate that sender is alive

def marker_received(self, marker):
34    def marker_received(self, marker):
35        # ignore
36        pass

Acknowledge that a marker was processed

def prediction( self, prediction: bci_essentials.classification.generic_classifier.Prediction):
38    def prediction(self, prediction: Prediction):
39        prediction_message = self.format_prediction_message(prediction)
40        self.__outlet.push_sample([prediction_message])

Send latest prediction

def format_prediction_message( self, prediction: bci_essentials.classification.generic_classifier.Prediction) -> str:
42    def format_prediction_message(self, prediction: Prediction) -> str:
43        labels = prediction.labels
44        probabilities = prediction.probabilities
45
46        # One label, list of scalars
47        if np.isscalar(probabilities[0]):
48            return self.format_constituent_prediction_string(labels[0], probabilities)
49
50        # One or more label, nested list of scalars
51        constituent_prediction_strings = []
52        for label_index in range(len(labels)):
53            label = int(labels[label_index])
54            label_probabilities = probabilities[label_index]
55
56            constituent_prediction_strings.append(
57                self.format_constituent_prediction_string(label, label_probabilities)
58            )
59
60        return ",".join(constituent_prediction_strings)
def format_constituent_prediction_string( self, label: int, probabilities: list | numpy.ndarray, probability_precision: int = 4) -> str:
62    def format_constituent_prediction_string(
63        self,
64        label: int,
65        probabilities: list | np.ndarray,
66        probability_precision: int = 4,
67    ) -> str:
68        probability_format = "%.{}f".format(probability_precision)
69        probabilities_string = " ".join([probability_format % p for p in probabilities])
70
71        return "%s:[%s]" % (str(label), probabilities_string)