Skip to content

message

sleap.message

Module with classes for sending and receiving messages between processes.

These use ZMQ pub/sub sockets.

Most of the time you'll want the PairedSender and PairedReceiver. These support a "handshake" to confirm connection. Without an initial handshake there's a good chance early messages will be dropped.

Each message is either dictionary or dictionary + numpy ndarray.

Classes:

Name Description
BaseMessageParticipant

Base class for simple Sender and Receiver.

PairedReceiver
PairedSender
Receiver

Receives messages from corresponding Sender.

Sender

Publishes messages to corresponding Receiver.

BaseMessageParticipant

Base class for simple Sender and Receiver.

Source code in sleap/message.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@attr.s(auto_attribs=True)
class BaseMessageParticipant:
    """Base class for simple Sender and Receiver."""

    address: Text = "tcp://127.0.0.1:9001"
    context: Optional[zmq.Context] = None
    _socket: Optional[zmq.Socket] = None

    def __attrs_post_init__(self):
        if self.context is None:
            self._owns_context = True
            self.context = zmq.Context()
        else:
            self._owns_context = False

    def __del__(self):
        if self._owns_context and self.context is not None:
            self.context.term()

PairedReceiver

Bases: PairedMessageParticipant

Methods:

Name Description
check_messages

Checks for messages.

receive_handshake

Waits to receive and acknowledge handshake message.

Source code in sleap/message.py
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
@attr.s(auto_attribs=True)
class PairedReceiver(PairedMessageParticipant):
    connected: bool = False

    @classmethod
    def from_defaults(cls):
        return cls.from_tcp_ports(9002, 9001)

    def receive_handshake(self, timeout_sec=30):
        """Waits to receive and acknowledge handshake message."""
        wait_till = time.time() + timeout_sec
        while time.time() < wait_till and not self.connected:
            message = self._receiver.check_message(fresh=True)

            if message is None:
                continue
            if self._is_handshake(message):
                self._respond_to_handshake()
                return True
            else:
                self._receiver.push_back_message(message)
                return True
        return False

    def _respond_to_handshake(self):
        self._sender.send_dict(dict(type="handshake reply"))
        self.connected = True

    def _is_handshake(self, message: Any):
        if message:
            return message.get("type", "") == "handshake request"
        return False

    def check_messages(self, ack_handshakes: bool = True, *args, **kwargs):
        """
        Checks for messages.

        Args:
            ack_handshakes: If True, then any handshake messages are
                acknowledged and aren't included in return results

        Results:
            List of messages, possibly excluding any handshake requests.
        """
        messages = self._receiver.check_messages(*args, **kwargs)

        if ack_handshakes:
            non_handshakes = [m for m in messages if not self._is_handshake(m)]
            if len(non_handshakes) < len(messages):
                self._respond_to_handshake()
                messages = non_handshakes

        return messages

check_messages(ack_handshakes=True, *args, **kwargs)

Checks for messages.

Parameters:

Name Type Description Default
ack_handshakes bool

If True, then any handshake messages are acknowledged and aren't included in return results

True
Results

List of messages, possibly excluding any handshake requests.

Source code in sleap/message.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
def check_messages(self, ack_handshakes: bool = True, *args, **kwargs):
    """
    Checks for messages.

    Args:
        ack_handshakes: If True, then any handshake messages are
            acknowledged and aren't included in return results

    Results:
        List of messages, possibly excluding any handshake requests.
    """
    messages = self._receiver.check_messages(*args, **kwargs)

    if ack_handshakes:
        non_handshakes = [m for m in messages if not self._is_handshake(m)]
        if len(non_handshakes) < len(messages):
            self._respond_to_handshake()
            messages = non_handshakes

    return messages

receive_handshake(timeout_sec=30)

Waits to receive and acknowledge handshake message.

Source code in sleap/message.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
def receive_handshake(self, timeout_sec=30):
    """Waits to receive and acknowledge handshake message."""
    wait_till = time.time() + timeout_sec
    while time.time() < wait_till and not self.connected:
        message = self._receiver.check_message(fresh=True)

        if message is None:
            continue
        if self._is_handshake(message):
            self._respond_to_handshake()
            return True
        else:
            self._receiver.push_back_message(message)
            return True
    return False

PairedSender

Bases: PairedMessageParticipant

Methods:

Name Description
send_handshake

Send handshake until we get reply.

Source code in sleap/message.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
@attr.s(auto_attribs=True)
class PairedSender(PairedMessageParticipant):
    connected: bool = False

    @classmethod
    def from_defaults(cls):
        return cls.from_tcp_ports(9001, 9002)

    def send_handshake(self, timeout_sec=30):
        """Send handshake until we get reply."""
        wait_till = time.time() + timeout_sec
        while time.time() < wait_till:
            self._sender.send_dict(dict(type="handshake request"))
            reply = self._receiver.check_message()
            if self._is_handshake_reply(reply):
                return True
            else:
                # currently we drop replies until handshake is acknowledged
                pass
            time.sleep(0.1)
        return False

    def _is_handshake_reply(self, message: Any) -> bool:
        if message:
            return message.get("type", "") == "handshake reply"
        return False

    def send_dict(self, *args, **kwargs):
        self._sender.send_dict(*args, **kwargs)

    def send_array(self, *args, **kwargs):
        self._sender.send_array(*args, **kwargs)

send_handshake(timeout_sec=30)

Send handshake until we get reply.

Source code in sleap/message.py
185
186
187
188
189
190
191
192
193
194
195
196
197
def send_handshake(self, timeout_sec=30):
    """Send handshake until we get reply."""
    wait_till = time.time() + timeout_sec
    while time.time() < wait_till:
        self._sender.send_dict(dict(type="handshake request"))
        reply = self._receiver.check_message()
        if self._is_handshake_reply(reply):
            return True
        else:
            # currently we drop replies until handshake is acknowledged
            pass
        time.sleep(0.1)
    return False

Receiver

Bases: BaseMessageParticipant

Receives messages from corresponding Sender.

Methods:

Name Description
check_message

Attempt to receive a single message.

check_messages

Attempt to receive multiple messages.

push_back_message

Act like we didn't receive this message yet.

Source code in sleap/message.py
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
@attr.s(auto_attribs=True)
class Receiver(BaseMessageParticipant):
    """Receives messages from corresponding Sender."""

    _message_queue: List[Any] = attr.ib(factory=list)

    def setup(self):
        self._socket = self.context.socket(zmq.SUB)
        self._socket.subscribe("")
        self._socket.bind(self.address)

    def __del__(self):
        if self._socket is not None:
            self._socket.unbind(self._socket.LAST_ENDPOINT)
            self._socket.close()
            self._socket = None

    def push_back_message(self, message):
        """Act like we didn't receive this message yet."""
        self._message_queue.append(message)

    def _recv(self, flags=0, copy=True, track=False):
        json_message = self._socket.recv_json(flags=flags)

        if "dtype" in json_message and "shape" in json_message:
            msg = self._socket.recv(flags=flags, copy=copy, track=track)
            buf = memoryview(msg)
            A = np.frombuffer(buf, dtype=json_message["dtype"]).reshape(
                json_message["shape"]
            )
            json_message["ndarray"] = A

        return json_message

    def check_message(self, timeout: int = 10, fresh: bool = False) -> Any:
        """Attempt to receive a single message."""
        if self._message_queue and not fresh:
            return self._message_queue.pop(0)

        if self._socket is None:
            self.setup()

        if self._socket and self._socket.poll(timeout, zmq.POLLIN):
            return self._recv()
        else:
            return None

    def check_messages(self, timeout: int = 10, times_to_check: int = 10) -> List[dict]:
        """
        Attempt to receive multiple messages.

        This method allows us to keep up with the messages by getting
        multiple messages that have been sent since the last check.
        It keeps checking until limit is reached *or* we check without
        getting any messages back.
        """
        messages = []

        # keep looping until we don't receive a message or have checked enough times
        while True:
            this_message = self.check_message(timeout)

            # if we didn't get a message, we're done checking
            if this_message is None:
                return messages

            # we got a message so add it to list
            messages.append(this_message)

            # if we've checked enough times, we're done checking
            if times_to_check <= 0:
                return messages

            # count down the number of times to check for messages
            times_to_check -= 1

check_message(timeout=10, fresh=False)

Attempt to receive a single message.

Source code in sleap/message.py
75
76
77
78
79
80
81
82
83
84
85
86
def check_message(self, timeout: int = 10, fresh: bool = False) -> Any:
    """Attempt to receive a single message."""
    if self._message_queue and not fresh:
        return self._message_queue.pop(0)

    if self._socket is None:
        self.setup()

    if self._socket and self._socket.poll(timeout, zmq.POLLIN):
        return self._recv()
    else:
        return None

check_messages(timeout=10, times_to_check=10)

Attempt to receive multiple messages.

This method allows us to keep up with the messages by getting multiple messages that have been sent since the last check. It keeps checking until limit is reached or we check without getting any messages back.

Source code in sleap/message.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def check_messages(self, timeout: int = 10, times_to_check: int = 10) -> List[dict]:
    """
    Attempt to receive multiple messages.

    This method allows us to keep up with the messages by getting
    multiple messages that have been sent since the last check.
    It keeps checking until limit is reached *or* we check without
    getting any messages back.
    """
    messages = []

    # keep looping until we don't receive a message or have checked enough times
    while True:
        this_message = self.check_message(timeout)

        # if we didn't get a message, we're done checking
        if this_message is None:
            return messages

        # we got a message so add it to list
        messages.append(this_message)

        # if we've checked enough times, we're done checking
        if times_to_check <= 0:
            return messages

        # count down the number of times to check for messages
        times_to_check -= 1

push_back_message(message)

Act like we didn't receive this message yet.

Source code in sleap/message.py
58
59
60
def push_back_message(self, message):
    """Act like we didn't receive this message yet."""
    self._message_queue.append(message)

Sender

Bases: BaseMessageParticipant

Publishes messages to corresponding Receiver.

Methods:

Name Description
send_array

Sends dictionary + numpy ndarray.

send_dict

Sends dictionary.

Source code in sleap/message.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
@attr.s(auto_attribs=True)
class Sender(BaseMessageParticipant):
    """Publishes messages to corresponding Receiver."""

    def setup(self):
        self._socket = self.context.socket(zmq.PUB)
        self._socket.connect(self.address)

    def __del__(self):
        self._socket.setsockopt(zmq.LINGER, 0)
        self._socket.close()
        super().__del__()

    def send_dict(self, data: dict):
        """Sends dictionary."""
        if self._socket is None:
            self.setup()
        self._socket.send_json(data)

    def send_array(
        self, header_data: dict, A: np.ndarray, flags=0, copy=True, track=False
    ):
        """Sends dictionary + numpy ndarray."""
        if self._socket is None:
            self.setup()

        header_data["dtype"] = str(A.dtype)
        header_data["shape"] = A.shape

        self._socket.send_json(header_data, flags | zmq.SNDMORE)
        return self._socket.send(A, flags, copy=copy, track=track)

send_array(header_data, A, flags=0, copy=True, track=False)

Sends dictionary + numpy ndarray.

Source code in sleap/message.py
137
138
139
140
141
142
143
144
145
146
147
148
def send_array(
    self, header_data: dict, A: np.ndarray, flags=0, copy=True, track=False
):
    """Sends dictionary + numpy ndarray."""
    if self._socket is None:
        self.setup()

    header_data["dtype"] = str(A.dtype)
    header_data["shape"] = A.shape

    self._socket.send_json(header_data, flags | zmq.SNDMORE)
    return self._socket.send(A, flags, copy=copy, track=track)

send_dict(data)

Sends dictionary.

Source code in sleap/message.py
131
132
133
134
135
def send_dict(self, data: dict):
    """Sends dictionary."""
    if self._socket is None:
        self.setup()
    self._socket.send_json(data)