Skip to content

feature_suggestions

sleap.info.feature_suggestions

Module for generating lists of frames using frame features, pca, kmeans, etc.

Classes:

Name Description
FrameGroupSet

Class for a set of groups of FrameItem objects.

FrameItem

Just a simple wrapper for (video, frame_idx), plus method to get image.

HogVec
ItemStack

Container for items, each item can "own" one or more rows of data.

ParallelFeaturePipeline

Enables easy per-video pipeline parallelization for feature suggestions.

FrameGroupSet

Bases: object

Class for a set of groups of FrameItem objects.

Each item can have at most one group; each group is represented as an int.

Attributes:

Name Type Description
method str

Label for the method used to generate group set.

item_group Dict[FrameItem, int]

Dictionary which maps each item to its group.

group_data Dict[int, dict]

Dictionary of any extra data for each group; keys are group ids, values are dictionaries of data.

groupset_data Dict

Dictionary for any data about the entire set of groups.

Methods:

Name Description
append_to_group

Adds item to group.

extend_group_items

Adds all items in list to group.

get_item_group

Returns the group that contains item.

sample

Returns new FrameGroupSet with groups sampled from current groups.

Source code in sleap/info/feature_suggestions.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
@attr.s(auto_attribs=True)
class FrameGroupSet(object):
    """
    Class for a set of groups of FrameItem objects.

    Each item can have at most one group; each group is represented as an int.

    Attributes:
        method: Label for the method used to generate group set.
        item_group: Dictionary which maps each item to its group.
        group_data: Dictionary of any extra data for each group;
            keys are group ids, values are dictionaries of data.
        groupset_data: Dictionary for any data about the entire set of groups.
    """

    method: str
    item_group: Dict[FrameItem, int] = attr.ib(default=attr.Factory(dict))
    group_data: Dict[int, dict] = attr.ib(default=attr.Factory(dict))
    groupset_data: Dict = attr.ib(default=attr.Factory(dict))

    def append_to_group(self, group: int, item: FrameItem):
        """Adds item to group (reassigns item if it was already grouped)."""
        self.item_group[item] = group
        # Register the group even if it has no extra data yet, so it shows
        # up when iterating over `groups`.
        if group not in self.group_data:
            self.group_data[group] = dict()

    def extend_group_items(self, group: int, item_list: List[FrameItem]):
        """Adds all items in list to group."""
        for item in item_list:
            self.append_to_group(group, item)

    def get_item_group(self, item: FrameItem):
        """Returns the group that contains item, or None if item is ungrouped."""
        return self.item_group.get(item, None)

    @property
    def groups(self):
        """Iterates over groups, yielding group id and list of items."""
        for group in self.group_data.keys():
            item_list = [
                frame_item
                for (frame_item, frame_group) in self.item_group.items()
                if frame_group == group
            ]
            yield group, item_list

    @property
    def all_items(self):
        """Gets list of all items."""
        # A single dict view doesn't need itertools.chain.
        return list(self.item_group.keys())

    def sample(self, per_group: int, unique_samples: bool = True):
        """
        Returns new FrameGroupSet with groups sampled from current groups.

        Note that the order of items in the new groups will not match order of
        items in the groups from which samples are drawn.

        Args:
            per_group: The number of samples to take from each group.
            unique_samples: Whether to ensure that there are no shared items
                in the resulting groups.

        Returns:
            New FrameGroupSet.
        """
        new_groupset = FrameGroupSet(method="sample_groups")
        new_groupset.groupset_data["per_group"] = per_group

        selected_set = set()
        for group, group_item_list in self.groups:
            if unique_samples:
                # Remove items that were already sampled from other groups
                group_item_list = list(set(group_item_list) - selected_set)

            # Skip groups with nothing left to sample; np.random.choice
            # raises ValueError on an empty population.
            if not group_item_list:
                continue

            # Sample items from this group, without replacement
            samples_from_group = np.random.choice(
                group_item_list, min(len(group_item_list), per_group), False
            )

            # Keep track of the items we sampled so far from any group
            selected_set = selected_set.union(set(samples_from_group))

            # Add this sampled group to the new set of groups
            new_groupset.extend_group_items(group, list(samples_from_group))

        return new_groupset

all_items property

Gets list of all items.

groups property

Iterate over groups, yielding group and list of items.

append_to_group(group, item)

Adds item to group.

Source code in sleap/info/feature_suggestions.py
266
267
268
269
270
def append_to_group(self, group: int, item: FrameItem):
    """Adds item to group (reassigns item if it was already grouped)."""
    self.item_group[item] = group
    # Register the group even if it has no extra data yet.
    if group not in self.group_data:
        self.group_data[group] = dict()

extend_group_items(group, item_list)

Adds all items in list to group.

Source code in sleap/info/feature_suggestions.py
272
273
274
275
def extend_group_items(self, group: int, item_list: List[FrameItem]):
    """Adds all items in list to group, one at a time via append_to_group."""
    for item in item_list:
        self.append_to_group(group, item)

get_item_group(item)

Returns the group that contains item.

Source code in sleap/info/feature_suggestions.py
277
278
279
def get_item_group(self, item: FrameItem):
    """Returns the group that contains item, or None if item is ungrouped."""
    return self.item_group.get(item, None)

sample(per_group, unique_samples=True)

Returns new FrameGroupSet with groups sampled from current groups.

Note that the order of items in the new groups will not match order of items in the groups from which samples are drawn.

Parameters:

Name Type Description Default
per_group int

The number of samples to take from each group.

required
unique_samples bool

Whether to ensure that there are no shared items in the resulting groups.

True

Returns:

Type Description

New FrameGroupSet.

Source code in sleap/info/feature_suggestions.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
def sample(self, per_group: int, unique_samples: bool = True):
    """
    Returns new FrameGroupSet with groups sampled from current groups.

    Note that the order of items in the new groups will not match order of
    items in the groups from which samples are drawn.

    Args:
        per_group: The number of samples to take from each group.
        unique_samples: Whether to ensure that there are no shared items
            in the resulting groups.

    Returns:
        New FrameGroupSet.
    """
    new_groupset = FrameGroupSet(method="sample_groups")
    new_groupset.groupset_data["per_group"] = per_group

    selected_set = set()
    for group, group_item_list in self.groups:
        if unique_samples:
            # Remove items that were already sampled from other groups
            group_item_list = list(set(group_item_list) - selected_set)

        # NOTE(review): if unique-sample filtering empties this list,
        # np.random.choice raises ValueError on the empty population.
        # Sample items from this group
        samples_from_group = np.random.choice(
            group_item_list, min(len(group_item_list), per_group), False
        )

        # Keep track of the items we sampled so far from any group
        selected_set = selected_set.union(set(samples_from_group))

        # Add this sampled group to the new set of groups

        # samples_from_group.sort()
        new_groupset.extend_group_items(group, list(samples_from_group))

    return new_groupset

FrameItem

Bases: object

Just a simple wrapper for (video, frame_idx), plus method to get image.

Source code in sleap/info/feature_suggestions.py
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
@attr.s(auto_attribs=True, frozen=True)
class FrameItem(object):
    """Just a simple wrapper for (video, frame_idx), plus method to get image."""

    video: Video
    frame_idx: int

    def get_raw_image(self, scale: float = 1.0):
        """Returns the frame image, optionally downscaled by `scale`."""
        img = self.video[self.frame_idx]
        if scale == 1.0:
            return img

        h, w, c = img.shape
        new_h = int(h // (1 / scale))
        new_w = int(w // (1 / scale))
        # cv2 expects (width, height) rather than (rows, columns).
        resized = cv2.resize(np.squeeze(img), (new_w, new_h))
        # Restore the dropped channel axis for single-channel images.
        if c == 1:
            resized = resized[..., None]
        return resized

HogVec

Methods:

Name Description
get_brisk_keypoints_as_points

Returns matrix of brisk keypoints for single image.

get_hogs

Returns descriptors and corresponding image for all images.

get_image_crops

Returns stack of windows around keypoints on single image.

get_image_hog

Returns hog descriptor for all brisk keypoints on single image.

Source code in sleap/info/feature_suggestions.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
@attr.s(auto_attribs=True)
class HogVec:
    """Bag-of-features encoder using HOG descriptors at BRISK keypoints.

    For a batch of images: detects BRISK keypoints, computes a HOG descriptor
    around each keypoint, clusters all descriptors into a vocabulary with
    k-means, and represents each image as a histogram over vocabulary words.

    Attributes:
        brisk_threshold: Detection threshold passed to cv2.BRISK_create.
        vocab_size: Number of k-means clusters (vocabulary words).
        debug: If True, get_vecs also builds self.vis, a copy of the input
            images with keypoints drawn color-coded by cluster.
    """

    brisk_threshold: int
    vocab_size: int
    debug: bool = False

    def __attrs_post_init__(self):
        self._brisk = cv2.BRISK_create(thresh=self.brisk_threshold)
        # Keypoint coordinate matrices, one entry appended per image processed.
        self.points_list = []
        # Palette for drawing per-cluster keypoints in debug mode.
        self.cmap = [
            [31, 120, 180],
            [51, 160, 44],
            [227, 26, 28],
            [255, 127, 0],
            [106, 61, 154],
            [177, 89, 40],
            [166, 206, 227],
            [178, 223, 138],
            [251, 154, 153],
            [253, 191, 111],
            [202, 178, 214],
            [255, 255, 153],
        ]

    def get_vecs(self, imgs):
        """Returns matrix with one bag-of-features row per image."""
        # Get matrix of hog descriptors for all images, and array which says
        # which image is the source for each row.
        descs, ownership = self.get_hogs(imgs)

        # Cluster the descriptors into a vocabulary for bag of features
        kmeans = KMeans(n_clusters=self.vocab_size).fit(descs)

        if self.debug:
            if imgs.shape[-1] == 1:
                # Expand grayscale to 3 channels so keypoints can be drawn
                # in color.
                new_shape = (imgs.shape[0], imgs.shape[1], imgs.shape[2], 3)

                self.vis = np.empty(new_shape, dtype=imgs.dtype)
                self.vis[..., 0] = imgs[..., 0]
                self.vis[..., 1] = imgs[..., 0]
                self.vis[..., 2] = imgs[..., 0]
            else:
                self.vis = np.copy(imgs)

            for i, img in enumerate(self.vis):
                img_desc_clusters = kmeans.labels_[ownership == i]
                img_points = self.points_list[i]
                for point, cluster in zip(img_points, img_desc_clusters):
                    color = self.cmap[cluster % len(self.cmap)]
                    cv2.circle(img, tuple(point), 3, color, lineType=cv2.LINE_AA)

        return self.clusters_to_vecs(kmeans.labels_, ownership, len(imgs))

    def clusters_to_vecs(self, cluster_labels, ownership, img_count):
        """Returns per-image histograms over the cluster vocabulary."""
        # Make helper function that builds bag of features vector for a single
        # image by looking up all the descriptors for an image and counting
        # how many there are for each cluster (vocab word).
        def img_bof_vec(img_idx):
            return np.bincount(
                cluster_labels[ownership == img_idx], minlength=self.vocab_size
            )

        # Now make the matrix with a bag of features vector for each image
        return np.stack([img_bof_vec(i) for i in range(img_count)])

    def get_hogs(self, imgs):
        """Returns descriptors and corresponding image for all images."""
        per_image_hog_descriptors = [self.get_image_hog(img) for img in imgs]
        descs = np.concatenate(per_image_hog_descriptors)
        # ownership[r] is the index of the image that produced row r of descs.
        ownership = np.array(
            list(
                itertools.chain.from_iterable(
                    [
                        [i] * len(image_descs)
                        for i, image_descs in enumerate(per_image_hog_descriptors)
                    ]
                )
            )
        )
        return descs, ownership

    def get_image_hog(self, img):
        """Returns hog descriptor for all brisk keypoints on single image."""
        points = self.get_brisk_keypoints_as_points(img)
        # NOTE(review): points are offset by +8 before indexing the window
        # view; centering a 16x16 window (indexed by its top-left corner) on
        # a keypoint would normally need -8 — confirm intent.
        center_points = points + np.array([8, 8])

        crops = self.get_image_crops(img, center_points)
        multichannel = img.ndim > 2

        # NOTE(review): the `multichannel` keyword was removed in
        # scikit-image 0.19 (replaced by `channel_axis`); this call requires
        # an older scikit-image.
        img_descs = np.stack(
            [
                hog(
                    crop,
                    orientations=8,
                    pixels_per_cell=(16, 16),
                    cells_per_block=(1, 1),
                    visualize=False,
                    multichannel=multichannel,
                )
                for crop in crops
            ]
        )
        return img_descs

    def get_image_crops(self, img, points):
        """Returns stack of windows around keypoints on single image."""
        # All (16, 16, C) windows; drop the singleton channel-window axis.
        W = view_as_windows(img, (16, 16, img.shape[-1]))[..., 0, :, :, :]

        # NOTE(review): W.shape[0] spans image rows (y) and W.shape[1] spans
        # columns (x), yet max_y/max_x come from the opposite axes and W is
        # indexed W[xs, ys]. For non-square images this looks like a
        # row/column swap — confirm before relying on crop positions.
        max_y = W.shape[1] - 1
        max_x = W.shape[0] - 1

        xs = points[:, 0]
        ys = points[:, 1]

        # Shift crops for keypoints that are too close to edges
        # TODO: is this how we should handle this case?
        xs[xs > max_x] = max_x
        ys[ys > max_y] = max_y

        return W[xs, ys]

    def get_brisk_keypoints_as_points(self, img):
        """Returns matrix of brisk keypoints for single image."""
        kps = self._brisk.detect(img)
        points = self.keypoints_to_points_matrix(kps)
        return points

    def keypoints_to_points_matrix(self, kps):
        """Converts cv2 KeyPoints to a rounded int (n, 2) matrix.

        Side effect: appends the matrix to self.points_list for debug drawing.
        """
        # Use builtin int: np.int was deprecated in numpy 1.20 and removed
        # in numpy 1.24, so the original `.astype(np.int)` crashes there.
        points = np.round(np.array([kp.pt for kp in kps])).astype(int)
        self.points_list.append(points)
        return points

get_brisk_keypoints_as_points(img)

Returns matrix of brisk keypoints for single image.

Source code in sleap/info/feature_suggestions.py
211
212
213
214
215
def get_brisk_keypoints_as_points(self, img):
    """Returns matrix of brisk keypoints for single image.

    Side effect: the points matrix is also appended to self.points_list
    (via keypoints_to_points_matrix).
    """
    kps = self._brisk.detect(img)
    points = self.keypoints_to_points_matrix(kps)
    return points

get_hogs(imgs)

Returns descriptors and corresponding image for all images.

Source code in sleap/info/feature_suggestions.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def get_hogs(self, imgs):
    """Returns descriptors and corresponding image for all images.

    Returns:
        Tuple (descs, ownership) where ownership[r] is the index of the
        image that produced row r of descs.
    """
    per_image_hog_descriptors = [self.get_image_hog(img) for img in imgs]
    descs = np.concatenate(
        [image_descs for image_descs in per_image_hog_descriptors]
    )
    # One ownership entry per descriptor row, flattened across all images.
    ownership = np.array(
        list(
            itertools.chain.from_iterable(
                [
                    [i] * len(image_descs)
                    for i, image_descs in enumerate(per_image_hog_descriptors)
                ]
            )
        )
    )
    return descs, ownership

get_image_crops(img, points)

Returns stack of windows around keypoints on single image.

Source code in sleap/info/feature_suggestions.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def get_image_crops(self, img, points):
    """Returns stack of windows around keypoints on single image."""
    # All (16, 16, C) windows; drop the singleton channel-window axis.
    W = view_as_windows(img, (16, 16, img.shape[-1]))[..., 0, :, :, :]

    # NOTE(review): W.shape[0] spans image rows (y) and W.shape[1] spans
    # columns (x), yet max_y/max_x come from the opposite axes and W is
    # indexed W[xs, ys]. For non-square images this looks like a row/column
    # swap — confirm before relying on crop positions.
    max_y = W.shape[1] - 1
    max_x = W.shape[0] - 1

    xs = points[:, 0]
    ys = points[:, 1]

    # Shift crops for keypoints that are too close to edges
    # TODO: is this how we should handle this case?
    xs[xs > max_x] = max_x
    ys[ys > max_y] = max_y

    return W[xs, ys]

get_image_hog(img)

Returns hog descriptor for all brisk keypoints on single image.

Source code in sleap/info/feature_suggestions.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
def get_image_hog(self, img):
    """Returns hog descriptor for all brisk keypoints on single image."""
    points = self.get_brisk_keypoints_as_points(img)
    # NOTE(review): +8 offset before indexing the window view; centering a
    # 16x16 window (indexed by top-left corner) on a keypoint would normally
    # need -8 — confirm intent.
    center_points = points + np.array([8, 8])

    crops = self.get_image_crops(img, center_points)
    multichannel = img.ndim > 2

    # NOTE(review): `multichannel` was removed in scikit-image 0.19
    # (replaced by `channel_axis`); this call requires an older skimage.
    img_descs = np.stack(
        [
            hog(
                crop,
                orientations=8,
                pixels_per_cell=(16, 16),
                cells_per_block=(1, 1),
                visualize=False,
                multichannel=multichannel,
            )
            for crop in crops
        ]
    )
    return img_descs

ItemStack

Bases: object

Container for items, each item can "own" one or more rows of data.

Attributes:

Name Type Description
items List

The list of items

data Optional[ndarray]

An ndarray with rows of data corresponding to items.

ownership Optional[List[tuple]]

List which specifies which rows of data correspond to which items.

meta List

List which stores metadata about each operation on stack.

group_sets List[FrameGroupSet]

List of GroupSets of items.

Methods:

Name Description
brisk_bag_of_features

Transform data using bag of features based on brisk features.

extend_ownership

Extends an ownership list with number of rows owned by next item.

flatten

Flattens each row of data to 1-d array.

get_all_items_from_group

Sets items for Stack to all items from current GroupSet.

get_item_data

Returns rows of data which belong to item.

get_item_data_idxs

Returns indexes of rows in data which belong to item.

get_raw_images

Sets data to raw image for each FrameItem.

hog_bag_of_features

Transforms data into bag of features vector of hog descriptors.

kmeans

Adds GroupSet using k-means clustering on data.

make_sample_group

Adds GroupSet by sampling frames from each video.

pca

Transforms data by applying PCA.

sample_groups

Adds GroupSet by sampling items from current GroupSet.

Source code in sleap/info/feature_suggestions.py
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
@attr.s(auto_attribs=True)
class ItemStack(object):
    """
    Container for items, each item can "own" one or more rows of data.

    Attributes:
        items: The list of items
        data: An ndarray with rows of data corresponding to items.
        ownership: List which specifies which rows of data correspond to which
            items.
        meta: List which stores metadata about each operation on stack.
        group_sets: List of GroupSets of items.

    """

    items: List = attr.ib(default=attr.Factory(list))
    data: Optional[np.ndarray] = attr.ib(default=None, repr=False)
    ownership: Optional[List[tuple]] = None
    meta: List = attr.ib(default=attr.Factory(list))
    group_sets: List[FrameGroupSet] = attr.ib(default=attr.Factory(list))

    @property
    def current_groupset(self):
        """Gets current (most recent) group set, or None if there is none."""
        if not self.group_sets:
            return None
        return self.group_sets[-1]

    def get_item_data_idxs(self, item):
        """Returns indexes of rows in data which belong to item."""
        item_idx = self.items.index(item)
        if self.ownership:
            owns = self.ownership[item_idx]
        else:
            # Without an ownership map, items and rows correspond 1:1.
            owns = tuple([item_idx])
        return owns

    def get_item_data(self, item):
        """Returns rows of data which belong to item."""
        owns = self.get_item_data_idxs(item)
        return self.data[owns, ...]

    def get_item_by_data_row(self, row_idx):
        """Returns the item that owns the given row of data.

        Raises:
            IndexError: If no item owns the given row.
        """
        if self.ownership:
            for item_idx, owns in enumerate(self.ownership):
                if row_idx in owns:
                    return self.items[item_idx]

        elif len(self.items) > row_idx:
            return self.items[row_idx]

        raise IndexError(f"No ownership for row {row_idx}.")

    def extend_ownership(self, ownership, row_count):
        """Extends an ownership list with number of rows owned by next item."""
        start_i = 0
        if len(ownership):
            # Start at 1 + (last row index of last item so far)
            start_i = 1 + ownership[-1][-1]

        item_owns = list(range(start_i, start_i + row_count))
        ownership.append(item_owns)

    def get_raw_images(self, scale=0.5):
        """Sets data to raw image for each FrameItem."""
        self.meta.append(dict(action="raw_images"))

        data_shape = [1, 1, 1]
        mixed_shapes = False
        imgs = []
        for frame in self.items:
            # Add to list of raw images
            img = frame.get_raw_image(scale=scale)
            imgs.append(img)

            # Keep track of shape large enough to hold any of the images
            img_shape = img.shape
            # get_raw_image returns 3D arrays (H, W, C), so use indices 0, 1, 2 directly
            data_shape = [max(data_shape[i], img_shape[i]) for i in (0, 1, 2)]

            # Fix: compare like types. The original compared a list against
            # a tuple (`data_shape != img_shape`), which is always True, so
            # mixed_shapes was always set and the padding path always taken.
            if data_shape != list(img_shape):
                mixed_shapes = True

        if mixed_shapes:
            # Make array large enough to hold any image and pad smaller images
            self.data = np.zeros((len(self.items), *data_shape), dtype="uint8")
            for i, img in enumerate(imgs):
                rows, columns, channels = img.shape
                self.data[i, :rows, :columns, :channels] = img
        else:
            # All images have same shape, add batch dimension and concatenate
            self.data = np.stack(imgs)

    def flatten(self):
        """Flattens each row of data to 1-d array."""
        meta = dict(action="flatten", shape=self.data.shape[1:])
        self.meta.append(meta)

        row_count = self.data.shape[0]
        row_size = np.prod(meta["shape"])
        self.data = np.reshape(self.data, (row_count, row_size))

    def brisk_bag_of_features(self, brisk_threshold=40, vocab_size=20):
        """Transform data using bag of features based on brisk features."""
        brisk_vec = BriskVec(brisk_threshold=brisk_threshold, vocab_size=vocab_size)
        self.data = brisk_vec.get_vecs(self.data)

    def hog_bag_of_features(self, brisk_threshold=40, vocab_size=20):
        """Transforms data into bag of features vector of hog descriptors."""
        # Renamed local so it doesn't shadow skimage's `hog` function.
        hog_vec = HogVec(brisk_threshold=brisk_threshold, vocab_size=vocab_size)
        self.data = hog_vec.get_vecs(self.data)

    def pca(self, n_components: int):
        """Transforms data by applying PCA."""
        pca = PCA(n_components=n_components)
        # PCA applies row by row, so we can modify data in place
        self.data = pca.fit_transform(self.data)
        self.meta.append(
            dict(
                action="pca",
                n_components=n_components,
                # components=pca.components_.tolist(),
            )
        )

    def kmeans(self, n_clusters: int):
        """Adds GroupSet using k-means clustering on data."""
        kmeans = KMeans(n_clusters=n_clusters).fit(self.data)

        cluster_groupset = FrameGroupSet(method="kmeans")
        cluster_groupset.groupset_data = dict(centers=kmeans.cluster_centers_.tolist())

        # Make list of the items in each cluster
        item_labels = kmeans.labels_
        for cluster_idx in range(n_clusters):
            (cluster_item_idxs,) = np.where(item_labels == cluster_idx)
            for data_row_idx in cluster_item_idxs:
                # Map each clustered data row back to its owning item.
                item = self.get_item_by_data_row(data_row_idx)
                cluster_groupset.append_to_group(cluster_idx, item)

        self.group_sets.append(cluster_groupset)
        self.meta.append(dict(action="kmeans", n_clusters=n_clusters))

    def make_sample_group(
        self, videos: List[Video], samples_per_video: int, sample_method: str = "stride"
    ):
        """Adds GroupSet by sampling frames from each video.

        Args:
            videos: Videos to sample from; each video becomes one group.
            samples_per_video: How many frames to take from each video.
            sample_method: "stride" for evenly spaced frames, or "random".

        Raises:
            ValueError: If sample_method is not recognized.
        """
        # Fix: record the actual sampling method (was hard-coded "stride"
        # even when sample_method was "random").
        groupset = FrameGroupSet(method=sample_method)
        groupset.groupset_data = dict(samples_per_video=samples_per_video)

        for i, video in enumerate(videos):
            if samples_per_video >= len(video):
                # Fewer frames than requested samples: take them all.
                idxs = list(range(len(video)))
            elif sample_method == "stride":
                idxs = list(
                    range(
                        0,
                        len(video),
                        len(video) // samples_per_video,
                    )
                )
                idxs = idxs[:samples_per_video]
            elif sample_method == "random":
                idxs = random.sample(range(len(video)), samples_per_video)
            else:
                raise ValueError(f"Invalid sampling method: {sample_method}")

            group_id = i
            for frame_idx in idxs:
                groupset.append_to_group(group_id, FrameItem(video, frame_idx))

        self.group_sets.append(groupset)
        # Fix: record the method value, not the literal string "sample_method".
        self.meta.append(dict(action="sample", method=sample_method))

    def get_all_items_from_group(self):
        """Sets items for Stack to all items from current GroupSet."""
        if self.current_groupset:
            self.items = self.current_groupset.all_items
            self.data = None  # clear data when setting items

    def sample_groups(self, samples_per_group: int):
        """Adds GroupSet by sampling items from current GroupSet."""
        if self.current_groupset:
            new_groupset = self.current_groupset.sample(
                per_group=samples_per_group, unique_samples=True
            )
            self.group_sets.append(new_groupset)

    def to_suggestion_tuples(
        self, videos, group_offset: int = 0, video_offset: int = 0
    ) -> List[Tuple[int, int, int]]:
        """Returns one (video index, frame index, group) tuple per item."""
        tuples = []
        for frame in self.items:
            group = self.current_groupset.get_item_group(frame)
            if group is not None:
                group += group_offset
            video_idx = videos.index(frame.video) + video_offset
            tuples.append((video_idx, frame.frame_idx, group))
        return tuples

    def to_suggestion_frames(self, group_offset: int = 0) -> List["SuggestionFrame"]:
        """Returns a SuggestionFrame for each item in the stack."""
        from sleap_io import SuggestionFrame

        suggestions = []
        for frame in self.items:
            # NOTE(review): group is computed but not passed to
            # SuggestionFrame — confirm whether it should be included.
            group = self.current_groupset.get_item_group(frame)
            if group is not None:
                group += group_offset
            suggestions.append(SuggestionFrame(frame.video, frame.frame_idx))
        return suggestions

current_groupset property

Gets current (most recent) group set.

brisk_bag_of_features(brisk_threshold=40, vocab_size=20)

Transform data using bag of features based on brisk features.

Source code in sleap/info/feature_suggestions.py
439
440
441
442
def brisk_bag_of_features(self, brisk_threshold=40, vocab_size=20):
    """Transform data using bag of features based on brisk features.

    Replaces self.data with the vectors returned by BriskVec.get_vecs.
    """
    brisk = BriskVec(brisk_threshold=brisk_threshold, vocab_size=vocab_size)
    self.data = brisk.get_vecs(self.data)

extend_ownership(ownership, row_count)

Extends an ownership list with number of rows owned by next item.

Source code in sleap/info/feature_suggestions.py
390
391
392
393
394
395
396
397
398
def extend_ownership(self, ownership, row_count):
    """Extends an ownership list with number of rows owned by next item.

    Mutates `ownership` in place by appending a list of the next
    `row_count` consecutive row indexes.
    """
    start_i = 0
    if len(ownership):
        # Start at 1 + (last row index of last item so far)
        start_i = 1 + ownership[-1][-1]

    item_owns = list(range(start_i, start_i + row_count))
    ownership.append(item_owns)

flatten()

Flattens each row of data to 1-d array.

Source code in sleap/info/feature_suggestions.py
430
431
432
433
434
435
436
437
def flatten(self):
    """Flattens each row of data to 1-d array."""
    # Record the original per-row shape in meta before reshaping.
    meta = dict(action="flatten", shape=self.data.shape[1:])
    self.meta.append(meta)

    row_count = self.data.shape[0]
    row_size = np.prod(meta["shape"])
    self.data = np.reshape(self.data, (row_count, row_size))

get_all_items_from_group()

Sets items for Stack to all items from current GroupSet.

Source code in sleap/info/feature_suggestions.py
512
513
514
515
516
def get_all_items_from_group(self):
    """Sets items for Stack to all items from current GroupSet.

    No-op if there is no current group set.
    """
    if self.current_groupset:
        self.items = self.current_groupset.all_items
        self.data = None  # clear data when setting items

get_item_data(item)

Returns rows of data which belong to item.

Source code in sleap/info/feature_suggestions.py
374
375
376
377
def get_item_data(self, item):
    """Returns rows of data which belong to item."""
    owns = self.get_item_data_idxs(item)
    # Select the owned rows; remaining axes are kept intact.
    return self.data[owns, ...]

get_item_data_idxs(item)

Returns indexes of rows in data which belong to item.

Source code in sleap/info/feature_suggestions.py
365
366
367
368
369
370
371
372
def get_item_data_idxs(self, item):
    """Returns indexes of rows in data which belong to item."""
    item_idx = self.items.index(item)
    if self.ownership:
        owns = self.ownership[item_idx]
    else:
        # Without an ownership map, items and rows correspond 1:1.
        owns = tuple([item_idx])
    return owns

get_raw_images(scale=0.5)

Sets data to raw image for each FrameItem.

Source code in sleap/info/feature_suggestions.py
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
def get_raw_images(self, scale=0.5):
    """Sets data to raw image for each FrameItem."""
    self.meta.append(dict(action="raw_images"))

    data_shape = [1, 1, 1]
    mixed_shapes = False
    imgs = []
    for frame in self.items:
        # Add to list of raw images
        img = frame.get_raw_image(scale=scale)
        imgs.append(img)

        # Keep track of shape large enough to hold any of the images
        img_shape = img.shape
        # get_raw_image returns 3D arrays (H, W, C), so use indices 0, 1, 2 directly
        data_shape = [max(data_shape[i], img_shape[i]) for i in (0, 1, 2)]

        # NOTE(review): data_shape is a list and img_shape a tuple, so this
        # comparison is always True and the padding path is always taken —
        # likely should be `data_shape != list(img_shape)`.
        if data_shape != img_shape:
            mixed_shapes = True

    if mixed_shapes:
        # Make array large enough to hold any image and pad smaller images
        self.data = np.zeros((len(self.items), *data_shape), dtype="uint8")
        for i, img in enumerate(imgs):
            rows, columns, channels = img.shape
            self.data[i, :rows, :columns, :channels] = img
    else:
        # All images have same shape, add batch dimension and concatenate
        self.data = np.stack(imgs)

hog_bag_of_features(brisk_threshold=40, vocab_size=20)

Transforms data into bag of features vector of hog descriptors.

Source code in sleap/info/feature_suggestions.py
444
445
446
447
def hog_bag_of_features(self, brisk_threshold=40, vocab_size=20):
    """Transforms data into bag of features vector of hog descriptors."""
    # Delegate feature extraction to HogVec, replacing data in place.
    vectorizer = HogVec(brisk_threshold=brisk_threshold, vocab_size=vocab_size)
    self.data = vectorizer.get_vecs(self.data)

kmeans(n_clusters)

Adds GroupSet using k-means clustering on data.

Source code in sleap/info/feature_suggestions.py
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
def kmeans(self, n_clusters: int):
    """Adds GroupSet using k-means clustering on data."""
    fit = KMeans(n_clusters=n_clusters).fit(self.data)

    groupset = FrameGroupSet(method="kmeans")
    groupset.groupset_data = dict(centers=fit.cluster_centers_.tolist())

    # Walk the clusters in order, adding each member item to its group.
    labels = fit.labels_
    for group_id in range(n_clusters):
        (member_row_idxs,) = np.where(labels == group_id)
        for row_idx in member_row_idxs:
            groupset.append_to_group(group_id, self.get_item_by_data_row(row_idx))

    self.group_sets.append(groupset)
    self.meta.append(dict(action="kmeans", n_clusters=n_clusters))

make_sample_group(videos, samples_per_video, sample_method='stride')

Adds GroupSet by sampling frames from each video.

Source code in sleap/info/feature_suggestions.py
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
def make_sample_group(
    self, videos: List[Video], samples_per_video: int, sample_method: str = "stride"
):
    """Adds GroupSet by sampling frames from each video.

    Args:
        videos: Videos to sample from; each video becomes one group.
        samples_per_video: Number of frames to take from each video.
        sample_method: Either "stride" (evenly spaced frames) or "random".

    Raises:
        ValueError: If `sample_method` is not "stride" or "random".
    """
    # Fix: record the actual sampling method instead of hard-coding "stride".
    groupset = FrameGroupSet(method=sample_method)
    groupset.groupset_data = dict(samples_per_video=samples_per_video)

    for i, video in enumerate(videos):
        if samples_per_video >= len(video):
            # Fewer frames than requested samples: take them all.
            idxs = list(range(len(video)))
        elif sample_method == "stride":
            idxs = list(
                range(
                    0,
                    len(video),
                    len(video) // samples_per_video,
                )
            )
            # Integer stride can overshoot; trim to the requested count.
            idxs = idxs[:samples_per_video]
        elif sample_method == "random":
            idxs = random.sample(range(len(video)), samples_per_video)
        else:
            raise ValueError(f"Invalid sampling method: {sample_method}")

        # Each video gets its own group id.
        group_id = i
        for frame_idx in idxs:
            groupset.append_to_group(group_id, FrameItem(video, frame_idx))

    self.group_sets.append(groupset)
    # Fix: the original stored the literal string "sample_method" here
    # instead of the variable's value.
    self.meta.append(dict(action="sample", method=sample_method))

pca(n_components)

Transforms data by applying PCA.

Source code in sleap/info/feature_suggestions.py
449
450
451
452
453
454
455
456
457
458
459
460
def pca(self, n_components: int):
    """Transforms data by applying PCA."""
    # fit_transform works row-wise, so the result simply replaces data.
    transformer = PCA(n_components=n_components)
    self.data = transformer.fit_transform(self.data)
    self.meta.append(
        dict(
            action="pca",
            n_components=n_components,
        )
    )

sample_groups(samples_per_group)

Adds GroupSet by sampling items from current GroupSet.

Source code in sleap/info/feature_suggestions.py
518
519
520
521
522
523
524
def sample_groups(self, samples_per_group: int):
    """Adds GroupSet by sampling items from current GroupSet."""
    current = self.current_groupset
    # Nothing to sample from if there's no current group set.
    if not current:
        return
    sampled = current.sample(per_group=samples_per_group, unique_samples=True)
    self.group_sets.append(sampled)

ParallelFeaturePipeline

Bases: object

Enables easy per-video pipeline parallelization for feature suggestions.

Create a FeatureSuggestionPipeline with the desired parameters, and then call ParallelFeaturePipeline.run() with the pipeline and the list of videos to process in parallel. This will take care of serializing the videos, running the pipelines in a process pool, and then deserializing the results back into a single list of SuggestionFrame objects.

Methods:

Name Description
get

Apply pipeline to single video by idx. Can be called in process.

make

Make class object from pipeline and list of videos.

run

Runs pipeline on all videos in parallel and returns suggestions.

tuples_to_suggestions

Converts serialized data from processes back into SuggestionFrames.

Source code in sleap/info/feature_suggestions.py
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
@attr.s(auto_attribs=True, slots=True)
class ParallelFeaturePipeline(object):
    """
    Enables easy per-video pipeline parallelization for feature suggestions.

    Create a `FeatureSuggestionPipeline` with the desired parameters, and
    then call `ParallelFeaturePipeline.run()` with the pipeline and the list
    of videos to process in parallel. This will take care of serializing the
    videos, running the pipelines in a process pool, and then deserializing
    the results back into a single list of `SuggestionFrame` objects.

    Attributes:
        pipeline: The `FeatureSuggestionPipeline` applied to each video.
        videos_for_processes: Closed deep copies of the videos, safe to
            pickle into worker processes.
    """

    pipeline: FeatureSuggestionPipeline
    videos_for_processes: List

    def get(self, video_idx):
        """Apply pipeline to single video by idx. Can be called in process."""
        video = self.videos_for_processes[video_idx]
        # Copies are shipped closed; reopen lazily inside this process.
        if not video.is_open:
            video.open()
        # Offset group ids so clusters from different videos don't collide.
        group_offset = video_idx * self.pipeline.n_clusters

        result = self.pipeline.get_suggestion_tuples(
            videos=[video], group_offset=group_offset, video_offset=video_idx
        )
        # Reset so pipeline state doesn't leak between videos.
        self.pipeline.reset()
        return result

    @classmethod
    def make(cls, pipeline, videos):
        """Make class object from pipeline and list of videos."""
        import copy

        # sleap-io Video objects are only safe to deep-copy while closed, so
        # close each original, copy it, then restore the original's open
        # state. Copies stay closed until reopened inside the worker.
        videos_for_processes = []
        for video in videos:
            was_open = video.is_open
            video.close()  # Close original for safe copying
            video_copy = copy.deepcopy(video)  # Copy the closed video
            if was_open:
                video.open()  # Reopen original, but keep copy closed
            videos_for_processes.append(video_copy)  # Send closed copy to processes

        return cls(pipeline, videos_for_processes)

    @classmethod
    def tuples_to_suggestions(cls, tuples, videos):
        """Converts serialized data from processes back into SuggestionFrames."""
        from sleap.gui.suggestions import SuggestionFrame

        suggestions = []
        for video_idx, frame_idx, group in tuples:
            # NOTE(review): `group` is currently unused when rebuilding frames.
            video = videos[video_idx]
            suggestions.append(SuggestionFrame(video, frame_idx))
        return suggestions

    @classmethod
    def run(cls, pipeline, videos, parallel=True):
        """Runs pipeline on all videos in parallel and returns suggestions.

        Args:
            pipeline: The `FeatureSuggestionPipeline` to apply per video.
            videos: List of videos to process.
            parallel: If True, fan out over a process pool; otherwise run
                serially in this process.

        Returns:
            List of `SuggestionFrame` objects across all videos.
        """
        from multiprocessing import Pool

        pp = cls.make(pipeline, videos)
        video_idxs = list(range(len(videos)))

        if parallel:
            # Fix: use the pool as a context manager so worker processes are
            # closed and joined instead of leaking (original never closed it).
            with Pool() as pool:
                per_video_tuples = pool.map(pp.get, video_idxs)
        else:
            per_video_tuples = map(pp.get, video_idxs)

        tuples = list(itertools.chain.from_iterable(per_video_tuples))

        return pp.tuples_to_suggestions(tuples, videos)

get(video_idx)

Apply pipeline to single video by idx. Can be called in process.

Source code in sleap/info/feature_suggestions.py
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
def get(self, video_idx):
    """Apply pipeline to single video by idx. Can be called in process."""
    video = self.videos_for_processes[video_idx]
    # Videos are shipped to workers closed; open lazily in this process.
    if not video.is_open:
        video.open()

    # Offset group ids so clusters from different videos don't collide.
    result = self.pipeline.get_suggestion_tuples(
        videos=[video],
        group_offset=video_idx * self.pipeline.n_clusters,
        video_offset=video_idx,
    )
    self.pipeline.reset()
    return result

make(pipeline, videos) classmethod

Make class object from pipeline and list of videos.

Source code in sleap/info/feature_suggestions.py
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
@classmethod
def make(cls, pipeline, videos):
    """Make class object from pipeline and list of videos."""
    import copy

    # sleap-io Video objects are only safe to deep-copy while closed, so
    # close each original, copy it, then restore the original's open state.
    # The copies stay closed until reopened inside the worker process.
    copies = []
    for video in videos:
        reopen = video.is_open
        video.close()
        copies.append(copy.deepcopy(video))
        if reopen:
            video.open()

    return cls(pipeline, copies)

run(pipeline, videos, parallel=True) classmethod

Runs pipeline on all videos in parallel and returns suggestions.

Source code in sleap/info/feature_suggestions.py
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
@classmethod
def run(cls, pipeline, videos, parallel=True):
    """Runs pipeline on all videos in parallel and returns suggestions.

    Args:
        pipeline: The `FeatureSuggestionPipeline` to apply per video.
        videos: List of videos to process.
        parallel: If True, fan out over a process pool; otherwise run
            serially in this process.

    Returns:
        List of `SuggestionFrame` objects across all videos.
    """
    from multiprocessing import Pool

    pp = cls.make(pipeline, videos)
    video_idxs = list(range(len(videos)))

    if parallel:
        # Fix: use the pool as a context manager so worker processes are
        # closed and joined instead of leaking (original never closed it).
        with Pool() as pool:
            per_video_tuples = pool.map(pp.get, video_idxs)
    else:
        per_video_tuples = map(pp.get, video_idxs)

    tuples = list(itertools.chain.from_iterable(per_video_tuples))

    return pp.tuples_to_suggestions(tuples, videos)

tuples_to_suggestions(tuples, videos) classmethod

Converts serialized data from processes back into SuggestionFrames.

Source code in sleap/info/feature_suggestions.py
683
684
685
686
687
688
689
690
691
692
@classmethod
def tuples_to_suggestions(cls, tuples, videos):
    """Converts serialized data from processes back into SuggestionFrames."""
    from sleap.gui.suggestions import SuggestionFrame

    # NOTE(review): the group element of each tuple is currently unused.
    return [
        SuggestionFrame(videos[video_idx], frame_idx)
        for video_idx, frame_idx, _group in tuples
    ]