Skip to content

visuals

sleap.io.visuals

Module for generating videos with visual annotation overlays.

Classes:

Name Description
VideoMarkerThread

Annotate frame images (draw instances).

Functions:

Name Description
img_to_cv

Prepares frame image as needed for opencv.

save_labeled_video

Function to generate and save video with annotations.

VideoMarkerThread

Bases: Thread

Annotate frame images (draw instances).

Parameters:

Name Type Description Default
in_q Queue

Queue with (list of frame indexes, ndarray of frame images).

required
out_q Queue

Queue to send annotated images as (images, h, w, channels) ndarray.

required
labels Labels

the Labels object from which to get data for annotating.

required
video_idx int

index of Video in labels.videos list.

required
scale float

scale of image (so we can scale point locations to match)

required
show_edges bool

whether to draw lines between nodes

True
color_manager Optional[ColorManager]

ColorManager object which determines what colors to use for what instance/node/edge

None
Source code in sleap/io/visuals.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
class VideoMarkerThread(Thread):
    """Annotate frame images (draw instances).

    Args:
        in_q: Queue with (list of frame indexes, ndarray of frame images).
        out_q: Queue to send annotated images as
            (images, h, w, channels) ndarray.
        labels: the `Labels` object from which to get data for annotating.
        video_idx: index of `Video` in `labels.videos` list.
        scale: scale of image (so we can scale point locations to match)
        show_edges: whether to draw lines between nodes
        edge_is_wedge: whether to draw edges as wedges (as lines if False)
        marker_size: size of node marker in pixels, before scaling by `scale`
        crop_size_xy: size of (width, height) crop around instances, or None
            for full frames
        color_manager: ColorManager object which determines what colors to use
            for what instance/node/edge
        palette: SLEAP color palette name; only used if `color_manager` is None.
        distinctly_color: how to distinctly color items ("instances", "edges",
            or "nodes"); only used if `color_manager` is None.
    """

    def __init__(
        self,
        in_q: Queue,
        out_q: Queue,
        labels: Labels,
        video_idx: int,
        scale: float,
        show_edges: bool = True,
        edge_is_wedge: bool = False,
        marker_size: int = 4,
        crop_size_xy: Optional[Tuple[int, int]] = None,
        color_manager: Optional[ColorManager] = None,
        palette: str = "standard",
        distinctly_color: str = "instances",
    ):
        super(VideoMarkerThread, self).__init__()
        self.in_q = in_q
        self.out_q = out_q
        self.labels = labels
        self.video_idx = video_idx
        self.scale = scale
        self.show_edges = show_edges
        self.edge_is_wedge = edge_is_wedge

        if color_manager is None:
            color_manager = ColorManager(labels=labels, palette=palette)
            color_manager.color_predicted = True
            color_manager.distinctly_color = distinctly_color

        self.color_manager = color_manager

        self.node_line_width = self.color_manager.get_item_type_pen_width("node")
        self.edge_line_width = self.color_manager.get_item_type_pen_width("edge")

        # These widths are based on *screen* pixels, so halve them since we
        # want *video* pixels (keeping at least 1 px).
        self.node_line_width = max(1, self.node_line_width // 2)
        # BUGFIX: previously this was derived from node_line_width (copy-paste
        # error); the edge width should be halved from its own pen width.
        self.edge_line_width = max(1, self.edge_line_width // 2)

        # Scale the marker to match the (possibly resized) output frames.
        self.marker_radius = max(1, int(marker_size * scale))

        self.edge_line_width *= 2
        self.marker_radius *= 2
        # Blend factor used when compositing the overlay onto the frame.
        self.alpha = 0.6

        self.crop = False
        if crop_size_xy:
            self.crop = True
            self.crop_w, self.crop_h = crop_size_xy
            self._crop_centers = deque(maxlen=5)  # use running avg for smoother crops
        else:
            self.crop_h = 0
            self.crop_w = 0
            self._crop_centers = []

    def run(self):
        # when thread starts, start loop to receive images (from reader),
        # draw things on the images, and pass them along (to writer)
        self.marker()

    def marker(self):
        """Main work loop: consume frame chunks, annotate, forward to writer.

        Receives `(frame_indices, frame_images)` tuples from `in_q` until the
        sentinel arrives, and puts lists of annotated images on `out_q`. The
        sentinel is always forwarded to `out_q` (even on error) so downstream
        consumers can shut down cleanly.
        """
        cv2.setNumThreads(usable_cpu_count())

        try:
            chunk_i = 0
            while True:
                data = self.in_q.get()

                if data is _sentinel:
                    # No more data; re-queue the sentinel so any other
                    # consumer of in_q also stops.
                    self.in_q.put(_sentinel)
                    break

                frames_idx_chunk, video_frame_images = data

                t0 = perf_counter()

                imgs = self._mark_images(
                    frame_indices=frames_idx_chunk,
                    frame_images=video_frame_images,
                )

                elapsed = perf_counter() - t0
                fps = len(imgs) / elapsed
                logger.debug(f"drawing chunk {chunk_i} in {elapsed} s = {fps} fps")
                chunk_i += 1
                self.out_q.put(imgs)
        except Exception as e:
            # Stop receiving data
            self.in_q.put(_sentinel)
            raise e

        finally:
            # Send _sentinal object into queue to signal that we're done
            self.out_q.put(_sentinel)

    def _mark_images(self, frame_indices, frame_images):
        """Annotate a chunk of frames; returns list of annotated ndarrays."""
        imgs = []
        for i, frame_idx in enumerate(frame_indices):
            img = self._mark_single_frame(
                video_frame=frame_images[i], frame_idx=frame_idx
            )

            imgs.append(img)
        return imgs

    def _mark_single_frame(self, video_frame: np.ndarray, frame_idx: int) -> np.ndarray:
        """Return single annotated frame image.

        Args:
            video_frame: The ndarray of the frame image.
            frame_idx: Index of frame in video.

        Returns:
            ndarray of frame image with visual annotations added.
        """
        # Use OpenCV to convert to BGR color image
        video_frame = img_to_cv(video_frame)

        # Add the instances to the image
        overlay = self._plot_instances_cv(video_frame.copy(), frame_idx)

        # Crop video_frame to same size as overlay
        video_frame_cropped = (
            self._crop_frame(video_frame.copy())[0] if self.crop else video_frame
        )

        # Alpha-blend the annotation overlay onto the (possibly cropped) frame.
        return cv2.addWeighted(
            overlay, self.alpha, video_frame_cropped, 1 - self.alpha, 0
        )

    def _plot_instances_cv(
        self,
        img: np.ndarray,
        frame_idx: int,
    ) -> np.ndarray:
        """Add visual annotations to single frame image.

        Args:
            img: The ndarray of the frame image.
            frame_idx: Index of frame in video.

        Returns:
            ndarray of frame image with visual annotations added.
        """
        labels = self.labels
        video_idx = self.video_idx

        lfs = labels.find(labels.videos[video_idx], frame_idx)

        if len(lfs) == 0:
            # No labels on this frame; still crop (using the running average
            # of recent centers) so output frame sizes stay consistent.
            return self._crop_frame(img)[0] if self.crop else img

        instances = get_instances_to_show(lfs[0])

        offset = None
        if self.crop:
            img, offset = self._crop_frame(img, instances)

        for instance in instances:
            self._plot_instance_cv(img, instance, offset)

        return img

    def _get_crop_center(
        self, img: np.ndarray, instances: Optional[List["Instance"]] = None
    ) -> Tuple[int, int]:
        """Return smoothed (x, y) crop center from recent instance centroids."""
        if instances:
            centroids = np.array(
                [np.nanmedian(inst.numpy(), axis=0) for inst in instances]
            )
            center_xy = np.nanmedian(centroids, axis=0)
            self._crop_centers.append(center_xy)

        elif not self._crop_centers:
            # No crops so far and no instances yet, so just use image center.
            # BUGFIX: ndarray shape is (height, width, ...); the previous
            # unpacking swapped width and height for non-square frames.
            img_h, img_w = img.shape[:2]
            center_xy = img_w // 2, img_h // 2

            self._crop_centers.append(center_xy)

        # use a running average of the last N centers to smooth movement
        center_xy = tuple(np.nanmean(np.stack(self._crop_centers), axis=0))

        return center_xy

    def _crop_frame(
        self, img: np.ndarray, instances: Optional[List["Instance"]] = None
    ) -> Tuple[np.ndarray, Tuple[int, int]]:
        """Crop `img` around instances; returns (cropped image, (x0, y0) offset)."""
        center_xy = self._get_crop_center(img, instances)
        return self._crop_img(img, center_xy)

    def _crop_img(
        self, img: np.ndarray, center_xy: Tuple[int, int]
    ) -> Tuple[np.ndarray, Tuple[int, int]]:
        """Crop a (crop_w, crop_h) window centered at `center_xy`.

        Args:
            img: frame image (already scaled); shape is (height, width, ...).
            center_xy: crop center in *original* (unscaled) coordinates.

        Returns:
            Tuple of (cropped image, (crop_x0, crop_y0) offset in scaled
            coordinates).
        """
        # BUGFIX: ndarray shape is (height, width, ...); the previous
        # unpacking (flagged with a "fixme?") swapped these, which broke
        # bounds clamping for non-square frames.
        img_h, img_w = img.shape[:2]
        center_x, center_y = center_xy

        # Adjust center (on original coordinates) to scaled image coordinates.
        center_x = center_x // (1 / self.scale)
        center_y = center_y // (1 / self.scale)

        # Find top/left corner, ensuring we're within bounds for image.
        crop_x0 = max(0, int(center_x - self.crop_w // 2))
        crop_y0 = max(0, int(center_y - self.crop_h // 2))

        # And ensure that we're within bottom/right bounds for image.
        if crop_x0 + self.crop_w > img_w:
            crop_x0 = img_w - self.crop_w
        if crop_y0 + self.crop_h > img_h:
            crop_y0 = img_h - self.crop_h

        offset = crop_x0, crop_y0
        crop_x1 = crop_x0 + self.crop_w
        crop_y1 = crop_y0 + self.crop_h

        img = img[crop_y0:crop_y1, crop_x0:crop_x1, ...]

        return img, offset

    def _plot_instance_cv(
        self,
        img: np.ndarray,
        instance: "Instance",
        offset: Optional[Tuple[int, int]] = None,
        fill: bool = True,
    ):
        """
        Add visual annotations for single instance.

        Args:
            img: The ndarray of the frame image.
            instance: The :class:`Instance` to add to frame image.
            offset: (x, y) offset (in scaled coordinates) to subtract from
                point locations when drawing on a cropped image.
            fill: whether to draw node markers as filled circles.

        Returns:
            None; modifies img in place.
        """

        scale = self.scale
        nodes = instance.skeleton.nodes

        # Get matrix of all point locations
        from sleap.sleap_io_adaptors.instance_utils import instance_get_points_array

        points_array = instance_get_points_array(instance)

        # Rescale point locations
        points_array *= scale

        # Shift point locations (offset is for *scaled* coordinates)
        if offset:
            points_array -= offset

        for node_idx, (x, y) in enumerate(points_array):
            node = nodes[node_idx]
            # ColorManager returns RGB; reverse to BGR for OpenCV.
            node_color_bgr = self.color_manager.get_item_color(node, instance)[::-1]

            # Make sure this is a valid and visible point
            if not has_nans(x, y):
                # Convert to ints for opencv (now that we know these aren't nans)
                x, y = int(x), int(y)

                # Draw circle to mark node
                cv2.circle(
                    img=img,
                    center=(x, y),
                    radius=int(self.marker_radius),
                    color=node_color_bgr,
                    thickness=cv2.FILLED if fill else self.node_line_width,
                    lineType=cv2.FILLED if fill else cv2.LINE_AA,
                )

        if self.show_edges:
            for src, dst in instance.skeleton.edge_inds:
                # Get points for the nodes connected by this edge
                src_x, src_y = points_array[src]
                dst_x, dst_y = points_array[dst]

                edge = (nodes[src], nodes[dst])
                edge_color_bgr = self.color_manager.get_item_color(edge, instance)[::-1]

                # Make sure that both nodes are present in this instance before
                # drawing edge
                if not has_nans(src_x, src_y, dst_x, dst_y):
                    # Convert to ints for opencv
                    src_x, src_y = int(src_x), int(src_y)
                    dst_x, dst_y = int(dst_x), int(dst_y)

                    if self.edge_is_wedge:
                        r = self.marker_radius / 2

                        # Get vector from source to destination
                        vec_x = dst_x - src_x
                        vec_y = dst_y - src_y
                        mag = (pow(vec_x, 2) + pow(vec_y, 2)) ** 0.5

                        # BUGFIX: guard against coincident endpoints, which
                        # previously caused a ZeroDivisionError; a zero-length
                        # wedge has nothing to draw anyway.
                        if mag > 0:
                            vec_x = int(r * vec_x / mag)
                            vec_y = int(r * vec_y / mag)

                            # Define the wedge: wide at the source (perpendicular
                            # to the edge direction), tapering to the destination.
                            src_1 = [src_x - vec_y, src_y + vec_x]
                            dst = [dst_x, dst_y]
                            src_2 = [src_x + vec_y, src_y - vec_x]
                            pts = np.array([src_1, dst, src_2])

                            # Draw the wedge
                            cv2.fillPoly(
                                img=img,
                                pts=[pts],
                                color=edge_color_bgr,
                                lineType=cv2.LINE_AA,
                            )

                    else:
                        # Draw line to mark edge between nodes
                        cv2.line(
                            img=img,
                            pt1=(src_x, src_y),
                            pt2=(dst_x, dst_y),
                            color=edge_color_bgr,
                            thickness=int(self.edge_line_width),
                            lineType=cv2.LINE_AA,
                        )

img_to_cv(img)

Prepares frame image as needed for opencv.

Source code in sleap/io/visuals.py
480
481
482
483
484
485
486
487
488
def img_to_cv(img: np.ndarray) -> np.ndarray:
    """Prepares frame image as needed for opencv."""
    n_channels = img.shape[-1]
    if n_channels == 1:
        # Expand single-channel grayscale into a 3-channel BGR image.
        return cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
    if n_channels == 3:
        # Frames arrive as RGB; OpenCV drawing expects BGR channel order.
        return cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
    # Any other channel count is passed through unchanged.
    return img

save_labeled_video(filename, labels, video, frames, fps=15, scale=1.0, crop_size_xy=None, background='original', show_edges=True, edge_is_wedge=False, marker_size=4, color_manager=None, palette='standard', distinctly_color='instances', gui_progress=False, chunk_size=64)

Function to generate and save video with annotations.

Parameters:

Name Type Description Default
filename str

Output filename.

required
labels Labels

The dataset from which to get data.

required
video Video

The source :class:Video we want to annotate.

required
frames list[int]

List of frames to include in output video.

required
fps int

Frames per second for output video.

15
scale float

scale of image (so we can scale point locations to match)

1.0
crop_size_xy tuple[int, int] | None

size of crop around instances, or None for full images

None
background str

output video background. Either original, black, white, grey

'original'
show_edges bool

whether to draw lines between nodes

True
edge_is_wedge bool

whether to draw edges as wedges (draw as line if False)

False
marker_size int

Size of marker in pixels before scaling by scale

4
color_manager ColorManager | None

ColorManager object which determines what colors to use for what instance/node/edge

None
palette str

SLEAP color palette to use. Options include: "alphabet", "five+", "solarized", or "standard". Only used if color_manager is None.

'standard'
distinctly_color str

Specify how to color instances. Options include: "instances", "edges", and "nodes". Only used if color_manager is None.

'instances'
gui_progress bool

Whether to show Qt GUI progress dialog.

False

Returns:

Type Description

None.

Source code in sleap/io/visuals.py
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
def save_labeled_video(
    filename: str,
    labels: Labels,
    video: Video,
    frames: list[int],
    fps: int = 15,
    scale: float = 1.0,
    crop_size_xy: tuple[int, int] | None = None,
    background: str = "original",
    show_edges: bool = True,
    edge_is_wedge: bool = False,
    marker_size: int = 4,
    color_manager: ColorManager | None = None,
    palette: str = "standard",
    distinctly_color: str = "instances",
    gui_progress: bool = False,
    chunk_size: int = 64,
):
    """Function to generate and save video with annotations.

    Args:
        filename: Output filename.
        labels: The dataset from which to get data.
        video: The source :class:`Video` we want to annotate.
        frames: List of frames to include in output video.
        fps: Frames per second for output video.
        scale: scale of image (so we can scale point locations to match)
        crop_size_xy: size of crop around instances, or None for full images
        background: output video background. Either original, black, white, grey.
            NOTE(review): not referenced in this implementation — confirm
            whether it should be forwarded to the marker thread or writer.
        show_edges: whether to draw lines between nodes
        edge_is_wedge: whether to draw edges as wedges (draw as line if False)
        marker_size: Size of marker in pixels before scaling by `scale`
        color_manager: ColorManager object which determines what colors to use
            for what instance/node/edge
        palette: SLEAP color palette to use. Options include: "alphabet", "five+",
            "solarized", or "standard". Only used if `color_manager` is None.
        distinctly_color: Specify how to color instances. Options include: "instances",
            "edges", and "nodes". Only used if `color_manager` is None.
        gui_progress: Whether to show Qt GUI progress dialog.
            NOTE(review): not referenced in this implementation.
        chunk_size: Number of frames read, annotated, and queued per batch.

    Returns:
        None.
    """
    # Create marker thread and bounded queues; the small maxsize applies
    # backpressure so only a few chunks of frames are held in memory at once.
    in_q = Queue(maxsize=10)
    out_q = Queue(maxsize=10)

    marker_thread = VideoMarkerThread(
        in_q=in_q,
        out_q=out_q,
        labels=labels,
        video_idx=labels.videos.index(video),
        scale=scale,
        show_edges=show_edges,
        edge_is_wedge=edge_is_wedge,
        marker_size=marker_size,
        crop_size_xy=crop_size_xy,
        color_manager=color_manager,
        palette=palette,
        distinctly_color=distinctly_color,
    )
    marker_thread.start()

    # Send frames to marker thread via input queue in fixed-size chunks.
    # (Slicing past the end of the list is safe, so no explicit min() needed.)
    for i0 in range(0, len(frames), chunk_size):
        frame_inds = frames[i0 : i0 + chunk_size]
        frame_imgs = video[frame_inds]
        in_q.put((frame_inds, frame_imgs))
    in_q.put(_sentinel)  # Signal end of input

    # Collect annotated frames from the output queue until the marker thread
    # signals completion with the sentinel.
    annotated_frames = []
    while True:
        imgs = out_q.get()
        if imgs is _sentinel:
            break
        annotated_frames.extend(imgs)

    marker_thread.join()

    # Save video at end after getting annotated frames
    save_video(
        frames=annotated_frames,
        filename=filename,
        fps=fps,
    )  # TODO: add other parameters