bci_essentials.io.lsl_messenger

 1from mne_lsl.lsl import StreamInfo, StreamOutlet
 2from .messenger import Messenger
 3from ..classification.generic_classifier import Prediction
 4
 5__all__ = ["LslMessenger"]
 6
 7
 8class LslMessenger(Messenger):
 9    """A Messenger object for sending event messages to an LSL outlet."""
10
11    def __init__(self):
12        """Create an LslMessenger object.
13
14        If the LSL outlet cannot be created, an exception is raised."""
15        try:
16            info = StreamInfo(
17                name="PythonResponse",
18                stype="BCI",
19                n_channels=1,
20                sfreq=0,  # 0 means irregular rate
21                dtype="string",
22                source_id="pyp30042",
23            )
24            self.__outlet = StreamOutlet(info)
25            self.__outlet.push_sample(["This is the python response stream"])
26        except Exception:
27            raise Exception("LslMessenger: could not create outlet")
28
29    def ping(self):
30        self.__outlet.push_sample(["ping"])
31
32    def marker_received(self, marker):
33        self.__outlet.push_sample(["marker received : {}".format(marker)])
34
35    def prediction(self, prediction: Prediction):
36        # probability isn't used by Unity at this time
37        self.__outlet.push_sample(["{}".format(prediction.labels)])
class LslMessenger(bci_essentials.io.messenger.Messenger):
 9class LslMessenger(Messenger):
10    """A Messenger object for sending event messages to an LSL outlet."""
11
12    def __init__(self):
13        """Create an LslMessenger object.
14
15        If the LSL outlet cannot be created, an exception is raised."""
16        try:
17            info = StreamInfo(
18                name="PythonResponse",
19                stype="BCI",
20                n_channels=1,
21                sfreq=0,  # 0 means irregular rate
22                dtype="string",
23                source_id="pyp30042",
24            )
25            self.__outlet = StreamOutlet(info)
26            self.__outlet.push_sample(["This is the python response stream"])
27        except Exception:
28            raise Exception("LslMessenger: could not create outlet")
29
30    def ping(self):
31        self.__outlet.push_sample(["ping"])
32
33    def marker_received(self, marker):
34        self.__outlet.push_sample(["marker received : {}".format(marker)])
35
36    def prediction(self, prediction: Prediction):
37        # probability isn't used by Unity at this time
38        self.__outlet.push_sample(["{}".format(prediction.labels)])

A Messenger object for sending event messages to an LSL outlet.

LslMessenger()
12    def __init__(self):
13        """Create an LslMessenger object.
14
15        If the LSL outlet cannot be created, an exception is raised."""
16        try:
17            info = StreamInfo(
18                name="PythonResponse",
19                stype="BCI",
20                n_channels=1,
21                sfreq=0,  # 0 means irregular rate
22                dtype="string",
23                source_id="pyp30042",
24            )
25            self.__outlet = StreamOutlet(info)
26            self.__outlet.push_sample(["This is the python response stream"])
27        except Exception:
28            raise Exception("LslMessenger: could not create outlet")

Create an LslMessenger object.

If the LSL outlet cannot be created, an exception is raised.

def ping(self):
30    def ping(self):
31        self.__outlet.push_sample(["ping"])

Indicate that sender is alive

def marker_received(self, marker):
33    def marker_received(self, marker):
34        self.__outlet.push_sample(["marker received : {}".format(marker)])

Acknowledge that a marker was processed

def prediction( self, prediction: bci_essentials.classification.generic_classifier.Prediction):
36    def prediction(self, prediction: Prediction):
37        # probability isn't used by Unity at this time
38        self.__outlet.push_sample(["{}".format(prediction.labels)])

Send latest prediction