Skip to content

runners

sleap.gui.learning.runners

Run training/inference in background process via CLI.

Classes:

Name Description
DatasetItemForInference

Encapsulate data about frame selection based on dataset data.

InferenceTask

Encapsulates all data needed for running inference via CLI.

ItemForInference

Abstract base class for item on which we can run inference via CLI.

ItemsForInference

Encapsulates list of items for inference.

VideoItemForInference

Encapsulate data about video on which inference should run.

Functions:

Name Description
get_timestamp

Return the date and time as a string.

kill_process

Force kill a running process and any child processes.

run_gui_inference

Run inference on specified frames using models from training_jobs.

run_gui_training

Runs training for each training job.

run_learning_pipeline

Runs training (as needed) and inference.

setup_new_run_folder

Create a new run folder from config.

train_subprocess

Runs training inside subprocess.

write_pipeline_files

Writes the config files and scripts for manually running pipeline.

DatasetItemForInference

Bases: ItemForInference

Encapsulate data about frame selection based on dataset data.

Attributes:

Name Type Description
labels_path str

path to the saved :py:class:Labels dataset.

frame_filter str

which subset of frames to get from dataset, supports * "user" * "suggested"

use_absolute_path bool

whether to use absolute path for inference cli call.

Source code in sleap/gui/learning/runners.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
@attr.s(auto_attribs=True)
class DatasetItemForInference(ItemForInference):
    """Encapsulate data about frame selection based on dataset data.

    Attributes:
        labels_path: path to the saved :py:class:`Labels` dataset.
        frame_filter: which subset of frames to get from dataset, supports
            * "user"
            * "suggested"
        use_absolute_path: whether to use absolute path for inference cli call.
    """

    labels_path: str
    frame_filter: str = "user"
    use_absolute_path: bool = False

    @property
    def path(self) -> str:
        """Return the dataset path, absolute if `use_absolute_path` is set."""
        if self.use_absolute_path:
            return os.path.abspath(self.labels_path)
        return self.labels_path

    @property
    def cli_args(self) -> List[str]:
        """Return CLI args selecting this dataset (and its frame subset)."""
        # Fixed: removed dead `args_list = [self.path]` assignment that was
        # immediately overwritten by the next line.
        args_list = ["--data_path", self.path]
        if self.frame_filter == "user":
            args_list.append("--only_labeled_frames")
        elif self.frame_filter == "suggested":
            args_list.append("--only_suggested_frames")
        return args_list

InferenceTask

Encapsulates all data needed for running inference via CLI.

Methods:

Name Description
make_predict_cli_call

Makes list of CLI arguments needed for running inference.

merge_results

Merges result frames into labels dataset.

predict_subprocess

Runs inference in a subprocess.

Source code in sleap/gui/learning/runners.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
@attr.s(auto_attribs=True)
class InferenceTask:
    """Encapsulates all data needed for running inference via CLI.

    Attributes:
        trained_job_paths: Paths to trained model configs (or checkpoint
            folders), passed to the inference CLI via `--model_paths`.
        inference_params: Flat dict of inference and tracking parameters.
        labels: Base `Labels` dataset that prediction results get merged into.
        labels_filename: Path to the saved labels dataset file, if any.
        results: Predicted frames accumulated across inference runs.
    """

    trained_job_paths: List[str]
    inference_params: Dict[str, Any] = attr.ib(default=attr.Factory(dict))
    labels: Optional[Labels] = None
    labels_filename: Optional[str] = None
    results: List[LabeledFrame] = attr.ib(default=attr.Factory(list))

    def make_predict_cli_call(
        self,
        item_for_inference: ItemForInference,
        output_path: Optional[str] = None,
        gui: bool = True,
    ) -> Tuple[List[Text], Text]:
        """Makes list of CLI arguments needed for running inference.

        Args:
            item_for_inference: The video/dataset item to run inference on.
            output_path: Path for saving predictions; if None, a path is built
                from the labels/video location plus a timestamp.
            gui: Whether this is being run from the GUI (currently unused here).

        Returns:
            Tuple of (CLI argument list, predictions output path).
        """
        cli_args = [
            "sleap-nn",
            "track",
        ]
        cli_args.extend(
            item_for_inference.cli_args
        )  # sample inference CLI args: ['--data_path', '...', '--video_index', '0',
        # '--video_input_format', 'channels_last', '--frames', '0,-2559']

        # Make path where we'll save predictions (if not specified)
        if output_path is None:
            if self.labels_filename:
                # Make a predictions directory next to the labels dataset file
                predictions_dir = os.path.join(
                    os.path.dirname(self.labels_filename), "predictions"
                )
                os.makedirs(predictions_dir, exist_ok=True)
            else:
                # Dataset filename wasn't given, so save predictions in same dir
                # as the video
                predictions_dir = os.path.dirname(item_for_inference.video.filename)

            # Build filename with video name and timestamp
            timestamp = datetime.now().strftime("%y%m%d_%H%M%S")
            output_path = os.path.join(
                predictions_dir,
                f"{os.path.basename(item_for_inference.path)}.{timestamp}."
                "predictions.slp",
            )

        for job_path in self.trained_job_paths:
            if (
                job_path.endswith(".yaml")
                or job_path.endswith(".json")
                or job_path.endswith(".yml")
            ):
                job_path = str(
                    Path(job_path).parent
                )  # get the model ckpt folder path from the path of
                # `training_config.yaml`
            cli_args.extend(("--model_paths", job_path))

        cli_args.extend(["-o", output_path])

        if "_batch_size" in self.inference_params:
            cli_args.extend(["--batch_size", str(self.inference_params["_batch_size"])])

        if (
            "_max_instances" in self.inference_params
            and self.inference_params["_max_instances"] is not None
        ):
            # Fixed: str() conversion so all CLI args are strings, as required
            # by subprocess.Popen (matches the --batch_size handling above).
            cli_args.extend(
                ["--max_instances", str(self.inference_params["_max_instances"])]
            )

        # add tracking args
        if (
            "tracking.tracker" in self.inference_params
            and self.inference_params["tracking.tracker"] != "none"
        ):
            cli_args.extend(["--tracking"])
            cli_args.extend(
                ["--track_matching_method", self.inference_params["tracking.match"]]
            )
            cli_args.extend(
                [
                    "--tracking_window_size",
                    str(self.inference_params["tracking.track_window"]),
                ]
            )
            if self.inference_params["tracking.max_tracks"] is not None:
                cli_args.extend(["--candidates_method", "local_queues"])
                cli_args.extend(
                    ["--max_tracks", str(self.inference_params["tracking.max_tracks"])]
                )
            if "flow" in self.inference_params["tracking.tracker"]:
                cli_args.extend(["--use_flow"])

            if self.inference_params["tracking.post_connect_single_breaks"] == 1:
                cli_args.extend(["--post_connect_single_breaks"])

            if self.inference_params["tracking.robust"] != 1.0:
                cli_args.extend(["--scoring_reduction", "robust_quantile"])
                if self.inference_params["tracking.robust"] is not None:
                    cli_args.extend(
                        [
                            "--robust_best_instance",
                            str(self.inference_params["tracking.robust"]),
                        ]
                    )

            if self.inference_params["tracking.similarity"] == "oks":
                cli_args.extend(["--features", "keypoints"])
                cli_args.extend(["--scoring_method", "oks"])
            elif self.inference_params["tracking.similarity"] == "centroids":
                cli_args.extend(["--features", "centroids"])
                cli_args.extend(["--scoring_method", "euclidean_dist"])
            elif self.inference_params["tracking.similarity"] == "iou":
                cli_args.extend(["--features", "bboxes"])
                cli_args.extend(["--scoring_method", "iou"])

        return cli_args, output_path

    def predict_subprocess(
        self,
        item_for_inference: ItemForInference,
        append_results: bool = False,
        waiting_callback: Optional[Callable] = None,
        gui: bool = True,
    ) -> Tuple[Text, Any]:
        """Runs inference in a subprocess.

        Args:
            item_for_inference: The video/dataset item to run inference on.
            append_results: If True, load predicted frames from the output file
                into `self.results` after a successful run.
            waiting_callback: Called with each parsed JSON progress line from
                the subprocess; returning "cancel" kills the process.
            gui: Forwarded to `make_predict_cli_call`.

        Returns:
            Tuple of (predictions output path, status); status is "success",
            "canceled", or the subprocess return code on failure.
        """
        cli_args, output_path = self.make_predict_cli_call(item_for_inference, gui=gui)

        print("Command line call:")
        print(" ".join(cli_args))
        print()

        # Run inference CLI capturing output.
        with subprocess.Popen(cli_args, stdout=subprocess.PIPE) as proc:
            # Poll until finished.
            while proc.poll() is None:
                # Read line.
                line = proc.stdout.readline()
                line = line.decode().rstrip()

                is_json = False
                if line.startswith("{"):
                    try:
                        # Parse line.
                        line_data = json.loads(line)
                        is_json = True
                    except (json.JSONDecodeError, ValueError):
                        is_json = False

                if not is_json:
                    # Pass through non-json output.
                    print(line)
                    line_data = {}

                if waiting_callback is not None:
                    # Pass line data to callback.
                    ret = waiting_callback(**line_data)

                    if ret == "cancel":
                        # Stop if callback returned cancel signal.
                        kill_process(proc.pid)
                        print(f"Killed PID: {proc.pid}")
                        return "", "canceled"
                # Brief sleep so the polling loop doesn't busy-wait.
                time.sleep(0.05)

            print(f"Process return code: {proc.returncode}")
            success = proc.returncode == 0

        if success and append_results:
            # Load frames from inference into results list
            # new_inference_labels = load_and_match(output_path, match_to=self.labels)
            # FIXME:If necessary
            new_inference_labels = sio.load_slp(output_path)
            self.results.extend(new_inference_labels.labeled_frames)

        # Return "success" or return code if failed.
        ret = "success" if success else proc.returncode
        return output_path, ret

    def merge_results(self):
        """Merges result frames into labels dataset.

        Removes instances without visible points (and frames left empty by
        that pruning) before merging into `self.labels`.
        """

        def remove_empty_instances_and_frames(lf: LabeledFrame):
            """Removes instances without visible points and empty frames."""
            lf.remove_empty_instances()
            return len(lf.instances) > 0

        # Remove instances without graphable points and any frames without instances.
        self.results = list(
            filter(lambda lf: remove_empty_instances_and_frames(lf), self.results)
        )
        new_labels = Labels(self.results)

        # Merge pred results into base labels
        self.labels.merge(new_labels)  # , frame_strategy="keep_both")

make_predict_cli_call(item_for_inference, output_path=None, gui=True)

Makes list of CLI arguments needed for running inference.

Source code in sleap/gui/learning/runners.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
def make_predict_cli_call(
    self,
    item_for_inference: "ItemForInference",
    output_path: Optional[str] = None,
    gui: bool = True,
) -> Tuple[List[Text], Text]:
    """Makes list of CLI arguments needed for running inference.

    Args:
        item_for_inference: The video/dataset item to run inference on.
        output_path: Path for saving predictions; if None, a path is built
            from the labels/video location plus a timestamp.
        gui: Whether this is being run from the GUI (currently unused here).

    Returns:
        Tuple of (CLI argument list, predictions output path).
    """
    cli_args = [
        "sleap-nn",
        "track",
    ]
    cli_args.extend(
        item_for_inference.cli_args
    )  # sample inference CLI args: ['--data_path', '...', '--video_index', '0',
    # '--video_input_format', 'channels_last', '--frames', '0,-2559']

    # Make path where we'll save predictions (if not specified)
    if output_path is None:
        if self.labels_filename:
            # Make a predictions directory next to the labels dataset file
            predictions_dir = os.path.join(
                os.path.dirname(self.labels_filename), "predictions"
            )
            os.makedirs(predictions_dir, exist_ok=True)
        else:
            # Dataset filename wasn't given, so save predictions in same dir
            # as the video
            predictions_dir = os.path.dirname(item_for_inference.video.filename)

        # Build filename with video name and timestamp
        timestamp = datetime.now().strftime("%y%m%d_%H%M%S")
        output_path = os.path.join(
            predictions_dir,
            f"{os.path.basename(item_for_inference.path)}.{timestamp}."
            "predictions.slp",
        )

    for job_path in self.trained_job_paths:
        if (
            job_path.endswith(".yaml")
            or job_path.endswith(".json")
            or job_path.endswith(".yml")
        ):
            job_path = str(
                Path(job_path).parent
            )  # get the model ckpt folder path from the path of
            # `training_config.yaml`
        cli_args.extend(("--model_paths", job_path))

    cli_args.extend(["-o", output_path])

    if "_batch_size" in self.inference_params:
        cli_args.extend(["--batch_size", str(self.inference_params["_batch_size"])])

    if (
        "_max_instances" in self.inference_params
        and self.inference_params["_max_instances"] is not None
    ):
        # Fixed: str() conversion so all CLI args are strings, as required by
        # subprocess.Popen (matches the --batch_size handling above).
        cli_args.extend(
            ["--max_instances", str(self.inference_params["_max_instances"])]
        )

    # add tracking args
    if (
        "tracking.tracker" in self.inference_params
        and self.inference_params["tracking.tracker"] != "none"
    ):
        cli_args.extend(["--tracking"])
        cli_args.extend(
            ["--track_matching_method", self.inference_params["tracking.match"]]
        )
        cli_args.extend(
            [
                "--tracking_window_size",
                str(self.inference_params["tracking.track_window"]),
            ]
        )
        if self.inference_params["tracking.max_tracks"] is not None:
            cli_args.extend(["--candidates_method", "local_queues"])
            cli_args.extend(
                ["--max_tracks", str(self.inference_params["tracking.max_tracks"])]
            )
        if "flow" in self.inference_params["tracking.tracker"]:
            cli_args.extend(["--use_flow"])

        if self.inference_params["tracking.post_connect_single_breaks"] == 1:
            cli_args.extend(["--post_connect_single_breaks"])

        if self.inference_params["tracking.robust"] != 1.0:
            cli_args.extend(["--scoring_reduction", "robust_quantile"])
            if self.inference_params["tracking.robust"] is not None:
                cli_args.extend(
                    [
                        "--robust_best_instance",
                        str(self.inference_params["tracking.robust"]),
                    ]
                )

        if self.inference_params["tracking.similarity"] == "oks":
            cli_args.extend(["--features", "keypoints"])
            cli_args.extend(["--scoring_method", "oks"])
        elif self.inference_params["tracking.similarity"] == "centroids":
            cli_args.extend(["--features", "centroids"])
            cli_args.extend(["--scoring_method", "euclidean_dist"])
        elif self.inference_params["tracking.similarity"] == "iou":
            cli_args.extend(["--features", "bboxes"])
            cli_args.extend(["--scoring_method", "iou"])

    return cli_args, output_path

merge_results()

Merges result frames into labels dataset.

Source code in sleap/gui/learning/runners.py
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
def merge_results(self):
    """Merges result frames into labels dataset."""

    def _has_visible_instances(frame: LabeledFrame) -> bool:
        """Drop empty instances in-place; report whether any remain."""
        frame.remove_empty_instances()
        return len(frame.instances) > 0

    # Prune instances without graphable points, keeping only frames that
    # still contain at least one instance afterwards.
    self.results = [lf for lf in self.results if _has_visible_instances(lf)]

    # Merge pred results into base labels
    new_labels = Labels(self.results)
    self.labels.merge(new_labels)  # , frame_strategy="keep_both")

predict_subprocess(item_for_inference, append_results=False, waiting_callback=None, gui=True)

Runs inference in a subprocess.

Source code in sleap/gui/learning/runners.py
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
def predict_subprocess(
    self,
    item_for_inference: ItemForInference,
    append_results: bool = False,
    waiting_callback: Optional[Callable] = None,
    gui: bool = True,
) -> Tuple[Text, Any]:
    """Runs inference in a subprocess.

    Args:
        item_for_inference: The video/dataset item to run inference on.
        append_results: If True, load predicted frames from the output file
            into `self.results` after a successful run.
        waiting_callback: Called with each parsed JSON progress line from the
            subprocess; returning "cancel" kills the process.
        gui: Forwarded to `make_predict_cli_call`.

    Returns:
        Tuple of (predictions output path, status); status is "success",
        "canceled", or the subprocess return code on failure.
    """
    cli_args, output_path = self.make_predict_cli_call(item_for_inference, gui=gui)

    print("Command line call:")
    print(" ".join(cli_args))
    print()

    # Run inference CLI capturing output.
    with subprocess.Popen(cli_args, stdout=subprocess.PIPE) as proc:
        # Poll until finished.
        while proc.poll() is None:
            # Read line.
            line = proc.stdout.readline()
            line = line.decode().rstrip()

            # Progress lines are emitted as single-line JSON objects; anything
            # else is passed through to stdout.
            is_json = False
            if line.startswith("{"):
                try:
                    # Parse line.
                    line_data = json.loads(line)
                    is_json = True
                except (json.JSONDecodeError, ValueError):
                    is_json = False

            if not is_json:
                # Pass through non-json output.
                print(line)
                line_data = {}

            if waiting_callback is not None:
                # Pass line data to callback.
                ret = waiting_callback(**line_data)

                if ret == "cancel":
                    # Stop if callback returned cancel signal.
                    kill_process(proc.pid)
                    print(f"Killed PID: {proc.pid}")
                    return "", "canceled"
            # Brief sleep so the polling loop doesn't busy-wait.
            time.sleep(0.05)

        print(f"Process return code: {proc.returncode}")
        success = proc.returncode == 0

    if success and append_results:
        # Load frames from inference into results list
        # new_inference_labels = load_and_match(output_path, match_to=self.labels)
        # FIXME:If necessary
        new_inference_labels = sio.load_slp(output_path)
        self.results.extend(new_inference_labels.labeled_frames)

    # Return "success" or return code if failed.
    ret = "success" if success else proc.returncode
    return output_path, ret

ItemForInference

Bases: ABC

Abstract base class for item on which we can run inference via CLI.

Must have path and cli_args properties, used to build CLI call.

Source code in sleap/gui/learning/runners.py
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
@attr.s(auto_attribs=True)
class ItemForInference(abc.ABC):
    """Abstract base class for item on which we can run inference via CLI.

    Must have `path` and `cli_args` properties, used to build CLI call.
    """

    @property
    @abc.abstractmethod
    def path(self) -> Text:
        """Path to the file (video or dataset) used for the CLI call."""
        pass

    @property
    @abc.abstractmethod
    def cli_args(self) -> List[Text]:
        """List of CLI arguments that select this item for inference."""
        pass

ItemsForInference

Encapsulates list of items for inference.

Source code in sleap/gui/learning/runners.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
@attr.s(auto_attribs=True)
class ItemsForInference:
    """Encapsulates list of items for inference."""

    items: List[ItemForInference]
    total_frame_count: int
    batch_size: int

    def __len__(self):
        """Number of items queued for inference."""
        return len(self.items)

    @classmethod
    def from_video_frames_dict(
        cls,
        video_frames_dict: Dict[Video, List[int]],
        total_frame_count: int,
        batch_size: int,
        labels: Labels,
        labels_path: Optional[str] = None,
    ):
        """Build an instance from a video -> frame-list mapping.

        Videos whose frame list is empty are skipped. The video index within
        `labels` is recorded on each item for the CLI call.
        """
        video_items = [
            VideoItemForInference(
                video=video,
                frames=frame_list,
                labels_path=labels_path,
                video_idx=labels.videos.index(video),
            )
            for video, frame_list in video_frames_dict.items()
            if frame_list
        ]
        return cls(
            items=video_items,
            total_frame_count=total_frame_count,
            batch_size=batch_size,
        )

VideoItemForInference

Bases: ItemForInference

Encapsulate data about video on which inference should run.

This allows for inference on an arbitrary list of frames from video.

Attributes:

Name Type Description
video Video

the :py:class:Video object (which already stores its own path)

frames Optional[List[int]]

list of frames for inference; if None, then all frames are used

use_absolute_path bool

whether to use absolute path for inference cli call

video Video

The :py:class:Video object (which already stores its own path)

frames Optional[List[int]]

List of frames for inference; if None, then all frames are used

labels_path Optional[str]

Path to .slp project; if None, then use video path instead.

video_idx int

Video index for inference; if None, then first video is used. Only used if labels_path is specified.

Source code in sleap/gui/learning/runners.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
@attr.s(auto_attribs=True)
class VideoItemForInference(ItemForInference):
    """Encapsulate data about video on which inference should run.

    This allows for inference on an arbitrary list of frames from video.

    Attributes:
        video: The :py:class:`Video` object (which already stores its own path).
        frames: List of frames for inference; if None, then all frames are used.
        use_absolute_path: Whether to use absolute path for inference cli call.
        labels_path: Path to .slp project; if None, then use video path instead.
        video_idx: Video index for inference; if None, then first video is used.
            Only used if labels_path is specified.
    """

    video: Video
    frames: Optional[List[int]] = None
    use_absolute_path: bool = False
    labels_path: Optional[str] = None
    video_idx: int = 0

    @property
    def path(self):
        """Return labels path if given, else (possibly absolute) video path."""
        if self.labels_path is not None:
            return self.labels_path
        if self.use_absolute_path:
            return os.path.abspath(self.video.filename)
        return self.video.filename

    @property
    def cli_args(self):
        """Build the CLI arguments selecting this video (and frames)."""
        arg_list = list()
        arg_list.extend(["--data_path", f"{self.path}"])
        if self.labels_path is not None:
            arg_list.extend(["--video_index", str(self.video_idx)])

        # TODO: better support for video params
        if (
            self.video.backend
            and hasattr(self.video.backend, "dataset")
            and self.video.backend.dataset
        ):
            arg_list.extend(("--video_dataset", self.video.backend.dataset))

        if (
            self.video.backend
            and hasattr(self.video.backend, "input_format")
            and self.video.backend.input_format
        ):
            arg_list.extend(("--video_input_format", self.video.backend.input_format))

        # Fixed: when `frames` is None (or empty) omit --frames entirely so all
        # frames are used, matching the documented contract; the original
        # raised on None/empty input.
        if self.frames:
            # -Y represents endpoint of [X, Y) range but inference cli expects
            # [X, Y-1] range (so add 1 since negative).
            frame_int_list = list(set([i + 1 if i < 0 else i for i in self.frames]))
            frame_int_list.sort(
                reverse=min(frame_int_list) < 0
            )  # Assumes len of 2 if neg.
            arg_list.extend(("--frames", ",".join(map(str, frame_int_list))))

        return arg_list

get_timestamp()

Return the date and time as a string.

Source code in sleap/gui/learning/runners.py
32
33
34
def get_timestamp() -> Text:
    """Return the current date and time as a ``yymmdd_HHMMSS`` string."""
    return f"{datetime.now():%y%m%d_%H%M%S}"

kill_process(pid)

Force kill a running process and any child processes.

Parameters:

Name Type Description Default
pid

A process ID.

required
Source code in sleap/gui/learning/runners.py
65
66
67
68
69
70
71
72
73
74
def kill_process(pid: int):
    """Force kill a running process and any child processes.

    Args:
        pid: Process ID of the parent process to kill.
    """
    parent = psutil.Process(pid)
    # Kill children first (recursively) so they aren't left orphaned.
    for child in parent.children(recursive=True):
        child.kill()
    parent.kill()

run_gui_inference(inference_task, items_for_inference, gui=True)

Run inference on specified frames using models from training_jobs.

Parameters:

Name Type Description Default
inference_task InferenceTask

Encapsulates information needed for running inference, such as labels dataset and models.

required
items_for_inference ItemsForInference

Encapsulates information about the videos (etc.) on which we're running inference.

required
gui bool

Whether to show gui windows and process gui events.

True

Returns:

Type Description
int

Number of new frames added to labels.

Source code in sleap/gui/learning/runners.py
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
def run_gui_inference(
    inference_task: InferenceTask,
    items_for_inference: ItemsForInference,
    gui: bool = True,
) -> int:
    """Run inference on specified frames using models from training_jobs.

    Args:
        inference_task: Encapsulates information needed for running inference,
            such as labels dataset and models.
        items_for_inference: Encapsulates information about the videos (etc.)
            on which we're running inference.
        gui: Whether to show gui windows and process gui events.

    Returns:
        Number of new frames added to labels, or -1 on cancel/error.
    """

    if gui:
        progress = QtWidgets.QProgressDialog(
            "Initializing...",
            "Cancel",
            0,
            1,
        )
        progress.show()
        QtWidgets.QApplication.instance().processEvents()

    # Make callback to process events while running inference
    def waiting(
        n_processed: Optional[int] = None,
        n_total: Optional[int] = None,
        elapsed: Optional[float] = None,
        rate: Optional[float] = None,
        eta: Optional[float] = None,
        current_item: Optional[int] = None,
        total_items: Optional[int] = None,
        **kwargs,
    ) -> str:
        """Update the progress dialog; return "cancel" if user canceled."""
        if gui:
            QtWidgets.QApplication.instance().processEvents()
            if n_total is not None:
                progress.setMaximum(n_total)
            if n_processed is not None:
                progress.setValue(n_processed)

            msg = "Predicting..."

            if n_processed is not None and n_total is not None:
                msg = f"<b>Predicted:</b> {n_processed:,}/{n_total:,}"

            # Show time elapsed?
            if rate is not None and eta is not None:
                eta_mins, eta_secs = divmod(eta, 60)
                if eta_mins > 60:
                    eta_hours, eta_mins = divmod(eta_mins, 60)
                    eta_str = f"{int(eta_hours)} hours, {int(eta_mins):02} mins"
                elif eta_mins > 0:
                    eta_str = f"{int(eta_mins)} mins, {int(eta_secs):02} secs"
                else:
                    eta_str = f"{int(eta_secs):02} secs"
                msg += f"<br><b>ETA:</b> {eta_str}"
                msg += f"<br><b>FPS:</b> {rate:.1f}"

            # Non-breaking spaces keep the label from reflowing mid-update.
            msg = msg.replace(" ", "&nbsp;")

            progress.setLabelText(msg)
            QtWidgets.QApplication.instance().processEvents()

            if progress.wasCanceled():
                return "cancel"

    for i, item_for_inference in enumerate(items_for_inference.items):

        def waiting_item(**kwargs):
            """Inject item progress (current/total) into the waiting callback."""
            kwargs["current_item"] = i
            kwargs["total_items"] = len(items_for_inference.items)
            return waiting(**kwargs)

        # Run inference for desired frames in this video.
        predictions_path, ret = inference_task.predict_subprocess(
            item_for_inference,
            append_results=True,
            waiting_callback=waiting_item,
            gui=gui,
        )

        if ret == "canceled":
            return -1
        elif ret != "success":
            if gui:
                # Fixed: "occcured" typo in user-facing message.
                QtWidgets.QMessageBox(
                    text=(
                        "An error occurred during inference. Your command line "
                        "terminal may have more information about the error."
                    )
                ).exec_()
            return -1

    inference_task.merge_results()
    if gui:
        progress.close()
    return len(inference_task.results)

run_gui_training(labels_filename, labels, config_info_list, inference_params, gui=True)

Runs training for each training job.

Parameters:

Name Type Description Default
labels Labels

Labels object from which we'll get training data.

required
config_info_list List[ConfigFileInfo]

List of ConfigFileInfo with configs for training.

required
gui bool

Whether to show gui windows and process gui events.

True

Returns:

Type Description
Dict[Text, Text]

Dictionary, keys are head name, values are path to trained config.

Source code in sleap/gui/learning/runners.py
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
def run_gui_training(
    labels_filename: str,
    labels: Labels,
    config_info_list: List[ConfigFileInfo],
    inference_params: Dict[str, Any],
    gui: bool = True,
) -> Dict[Text, Text]:
    """
    Runs training for each training job.

    Args:
        labels_filename: Path to the saved labels dataset; the ``models``
            checkpoint directory is created next to it.
        labels: Labels object from which we'll get training data.
        config_info_list: List of ConfigFileInfo with configs for training.
        inference_params: Parameters dict; the ``controller_port`` and
            ``publish_port`` ZMQ entries are read here and may be updated
            in place if the GUI monitor changes them.
        gui: Whether to show gui windows and process gui events.

    Returns:
        Dictionary, keys are head name, values are path to trained config.
        A ``None`` value means training for that model failed or was
        canceled.
    """

    trained_job_paths = dict()
    zmq_ports = None
    if gui:
        from sleap.gui.widgets.monitor import LossViewer
        from sleap.gui.widgets.imagedir import QtImageDirectoryWidget

        # Default ports used if not provided by the caller.
        zmq_ports = dict()
        zmq_ports["controller_port"] = inference_params.get("controller_port", 9000)
        zmq_ports["publish_port"] = inference_params.get("publish_port", 9001)

        # Open training monitor window
        win = LossViewer(zmq_ports=zmq_ports)

        # Reassign the values in the inference parameters in case the ports were changed
        inference_params["controller_port"] = win.zmq_ports["controller_port"]
        inference_params["publish_port"] = win.zmq_ports["publish_port"]
        win.resize(600, 400)
        win.show()

    for config_info in config_info_list:
        if config_info.dont_retrain:
            # Reuse an existing trained model instead of retraining.
            if not config_info.has_trained_model:
                raise ValueError(
                    "Config is set to not retrain but no trained model found: "
                    f"{config_info.path}"
                )

            print(
                f"Using already trained model for {config_info.head_name}: "
                f"{config_info.path}"
            )

            trained_job_paths[config_info.head_name] = config_info.path

        else:
            job = config_info.config
            model_type = config_info.head_name

            # We'll pass along the list of paths we actually used for loading
            # the videos so that we don't have to rely on the paths currently
            # saved in the labels file for finding videos.
            video_path_list = [video.filename for video in labels.videos]

            # Update save dir and run name for job we're about to train
            # so we have access to them here (rather than letting
            # train_subprocess update them).
            # training.Trainer.set_run_name(job, labels_filename)
            job.trainer_config.ckpt_dir = os.path.join(
                os.path.dirname(labels_filename), "models"
            )
            base_run_name = f"{model_type}.n={len(labels.user_labeled_frames)}"
            setup_new_run_folder(
                job,
                base_run_name=base_run_name,
            )

            if gui:
                print("Resetting monitor window.")
                plateau_patience = job.trainer_config.early_stopping.patience
                plateau_min_delta = job.trainer_config.early_stopping.min_delta
                win.reset(
                    what=str(model_type),
                    plateau_patience=plateau_patience,
                    plateau_min_delta=plateau_min_delta,
                )
                win.setWindowTitle(f"Training Model - {str(model_type)}")
                win.set_message("Preparing to run training...")
                if job.trainer_config.visualize_preds_during_training:
                    # Show a window that polls the run folder for prediction
                    # visualization images as training progresses.
                    viz_window = QtImageDirectoryWidget.make_training_vizualizer(
                        (
                            Path(job.trainer_config.ckpt_dir)
                            / job.trainer_config.run_name
                        ).as_posix()
                    )
                    viz_window.move(win.x() + win.width() + 20, win.y())
                    win.on_epoch.connect(viz_window.poll)

            print(f"Start training {str(model_type)}...")

            # Polled by train_subprocess while the training subprocess runs;
            # keeps the GUI responsive and reports cancellation requests.
            def waiting():
                if gui:
                    QtWidgets.QApplication.instance().processEvents()
                    if win.canceled:
                        return "cancel"

            # Run training
            trained_job_path, ret = train_subprocess(
                job_config=job,
                inference_params=inference_params,
                labels_filename=labels_filename,
                video_paths=video_path_list,
                waiting_callback=waiting,
            )

            if ret == "success":
                # get the path to the resulting TrainingJob file
                trained_job_paths[model_type] = trained_job_path
                print(f"Finished training {str(model_type)}.")
            elif ret == "canceled":
                if gui:
                    win.close()
                print("Deleting canceled run data:", trained_job_path)
                shutil.rmtree(trained_job_path, ignore_errors=True)
                trained_job_paths[model_type] = None
                break
            else:
                if gui:
                    win.close()
                    QtWidgets.QMessageBox(
                        text=f"An error occurred while training {str(model_type)}. "
                        "Your command line terminal may have more information about "
                        "the error."
                    ).exec_()
                trained_job_paths[model_type] = None

    if gui:
        # close training monitor window
        win.close()

    return trained_job_paths

run_learning_pipeline(labels_filename, labels, config_info_list, inference_params, items_for_inference)

Runs training (as needed) and inference.

Parameters:

Name Type Description Default
labels_filename str

Path to already saved current labels object.

required
labels Labels

The current labels object; results will be added to this.

required
config_info_list List[ConfigFileInfo]

List of ConfigFileInfo with configs for training and inference.

required
inference_params Dict[str, Any]

Parameters to pass to inference.

required
items_for_inference ItemsForInference

Encapsulated list of items (videos or datasets) on which to run inference.

required

Returns:

Type Description
int

Number of new frames added to labels.

Source code in sleap/gui/learning/runners.py
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
def run_learning_pipeline(
    labels_filename: str,
    labels: Labels,
    config_info_list: List[ConfigFileInfo],
    inference_params: Dict[str, Any],
    items_for_inference: ItemsForInference,
) -> int:
    """Runs training (as needed) and inference.

    Args:
        labels_filename: Path to already saved current labels object.
        labels: The current labels object; results will be added to this.
        config_info_list: List of ConfigFileInfo with configs for training
            and inference.
        inference_params: Parameters to pass to inference.
        items_for_inference: Encapsulated list of items (videos or datasets)
            on which inference should be run.

    Returns:
        Number of new frames added to labels, or -1 if training failed.
    """
    if "movenet" in inference_params["_pipeline"]:
        # MoveNet is pretrained, so skip training entirely.
        trained_job_paths = [inference_params["_pipeline"]]
    else:
        # Train (or reuse) a model for every requested head type.
        head_to_path = run_gui_training(
            labels_filename=labels_filename,
            labels=labels,
            config_info_list=config_info_list,
            inference_params=inference_params,
            gui=True,
        )

        # A None entry means a model failed to train or was canceled.
        if None in head_to_path.values():
            return -1

        trained_job_paths = list(head_to_path.values())

    inference_task = InferenceTask(
        labels=labels,
        labels_filename=labels_filename,
        trained_job_paths=trained_job_paths,
        inference_params=inference_params,
    )

    # Run the Predictor for the requested items and report new frame count.
    return run_gui_inference(inference_task, items_for_inference)

setup_new_run_folder(config, base_run_name=None)

Create a new run folder from config.

Source code in sleap/gui/learning/runners.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def setup_new_run_folder(
    config: OmegaConf, base_run_name: Optional[Text] = None
) -> Text:
    """Create a new run folder from config.

    Appends a timestamped suffix (optionally including ``base_run_name``)
    to ``config.trainer_config.run_name`` in place, then returns the
    resulting run path, or ``None`` when checkpoint saving is disabled.
    """
    trainer_cfg = config.trainer_config
    if not trainer_cfg.save_ckpt:
        # Nothing to set up when checkpoints won't be saved.
        return None

    # Auto-generate run name suffix from the current timestamp.
    suffix = get_timestamp()
    if isinstance(base_run_name, str):
        suffix = f"{suffix}.{base_run_name}"

    # Append the suffix to any run name already present in the config.
    current = trainer_cfg.run_name if trainer_cfg.run_name is not None else ""
    trainer_cfg.run_name = f"{current}_{suffix}"

    # Build run path.
    return (Path(trainer_cfg.ckpt_dir) / trainer_cfg.run_name).as_posix()

train_subprocess(job_config, labels_filename, inference_params, video_paths=None, waiting_callback=None)

Runs training inside subprocess.

Source code in sleap/gui/learning/runners.py
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
def train_subprocess(
    job_config: OmegaConf,
    labels_filename: str,
    inference_params: Dict[str, Any],
    video_paths: Optional[List[Text]] = None,
    waiting_callback: Optional[Callable] = None,
):
    """Runs training inside subprocess.

    Args:
        job_config: Training job config; ``trainer_config.ckpt_dir`` and
            ``trainer_config.run_name`` must already be set (see
            :py:func:`setup_new_run_folder`).
        labels_filename: Path to the labels file to train from.
        inference_params: Dict providing the ZMQ ``controller_port`` and
            ``publish_port`` written into the training config.
        video_paths: List of video paths; not referenced in this function's
            body (kept for interface compatibility with callers).
        waiting_callback: Called repeatedly while the subprocess runs;
            returning ``"cancel"`` kills the training process.

    Returns:
        Tuple of (run path, result), where result is ``"success"``,
        ``"canceled"``, or the subprocess's nonzero return code.
    """
    run_path = (
        Path(job_config.trainer_config.ckpt_dir) / job_config.trainer_config.run_name
    ).as_posix()

    with tempfile.TemporaryDirectory() as temp_dir:
        # Write a temporary file of the TrainingJob so that we can respect
        # any changes made to the job attributes after it was loaded.
        from sleap_nn.config.training_job_config import verify_training_cfg

        # convert json to yaml (to sleap-nn config format)
        cfg_file_name = datetime.now().strftime("%y%m%d_%H%M%S") + "_config"
        filter_job_config = filter_cfg(deepcopy(job_config))
        cfg = verify_training_cfg(filter_job_config)
        cfg.data_config.train_labels_path = [labels_filename]

        # Point the verified config at the run folder chosen by the caller
        # and wire up the ZMQ ports for the training monitor.
        cfg.trainer_config.ckpt_dir = Path(run_path).parent.as_posix()
        cfg.trainer_config.run_name = Path(run_path).name
        cfg.trainer_config.zmq.controller_port = inference_params["controller_port"]
        cfg.trainer_config.zmq.publish_port = inference_params["publish_port"]

        OmegaConf.save(cfg, (Path(temp_dir) / f"{cfg_file_name}.yaml").as_posix())

        # Build CLI arguments for training
        cli_args = [
            "sleap-nn",
            "train",
            "--config-name",
            f"{cfg_file_name}",
            "--config-dir",
            f"{temp_dir}",
        ]

        # Run training in a subprocess.
        print(cli_args)
        proc = subprocess.Popen(cli_args)

        # Wait till training is done, calling a callback if given.
        # Poll every 100 ms so the GUI callback can process events and
        # report a cancellation request.
        while proc.poll() is None:
            if waiting_callback is not None:
                ret = waiting_callback()
                if ret == "cancel":
                    print("Canceling training...")
                    kill_process(proc.pid)
                    print(f"Killed PID: {proc.pid}")
                    return run_path, "canceled"
            time.sleep(0.1)

        # Check return code.
        if proc.returncode == 0:
            ret = "success"
        else:
            ret = proc.returncode

    print("Run Path:", run_path)

    return run_path, ret

write_pipeline_files(output_dir, labels_filename, config_info_list, inference_params, items_for_inference)

Writes the config files and scripts for manually running pipeline.

Source code in sleap/gui/learning/runners.py
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
def write_pipeline_files(
    output_dir: str,
    labels_filename: str,
    config_info_list: List[ConfigFileInfo],
    inference_params: Dict[str, Any],
    items_for_inference: ItemsForInference,
):
    """Writes the config files and scripts for manually running pipeline.

    Produces per-model training configs, ``train-script.sh``,
    ``inference-script.sh``, and a ``jobs.yaml`` summary in ``output_dir``.

    Args:
        output_dir: Directory in which to write the config files and scripts.
        labels_filename: Path to the saved labels dataset.
        config_info_list: List of ConfigFileInfo with configs for training
            and inference.
        inference_params: Parameters to pass to the inference cli call.
        items_for_inference: Encapsulated list of items on which inference
            should be run.
    """

    # Use absolute path for all files that aren't contained in the output dir.
    labels_filename = os.path.abspath(labels_filename)

    # Preserve current working directory and change working directory to the
    # output directory, so we can set local paths relative to that.
    old_cwd = os.getcwd()
    os.chdir(output_dir)

    # Restore the cwd even if something below raises (previously an error
    # would leave the process stranded in output_dir).
    try:
        new_cfg_filenames = []
        train_script = "#!/bin/bash\n"

        # Add head type to run name suffix to prevent overwriting.
        for cfg_info in config_info_list:
            if not cfg_info.dont_retrain:
                # Bug fix: append the head name once. The previous `+=` also
                # re-appended the existing run name, duplicating it.
                existing_run_name = (
                    OmegaConf.select(
                        cfg_info.config, "trainer_config.run_name", default=""
                    )
                    or ""
                )
                cfg_info.config.trainer_config.run_name = (
                    existing_run_name + cfg_info.head_name
                )

        training_jobs = []
        for cfg_info in config_info_list:
            if cfg_info.dont_retrain:
                # Use full absolute path to already trained model
                trained_path = os.path.normpath(os.path.join(old_cwd, cfg_info.path))
                new_cfg_filenames.append(trained_path)

            else:
                # We're training this model, so save config file...

                # First we want to set the run folder so that we know where to
                # find the model after it's trained. setup_new_run_folder does
                # things relative to cwd, which is the main reason we're in the
                # output directory rather than just using normpath.
                ckpt_path = setup_new_run_folder(cfg_info.config)

                # Now we set the filename for the training config file
                new_cfg_filename = f"{cfg_info.head_name}.yaml"

                # Save the config file (convert to sleap-nn yaml format)
                from sleap_nn.config.training_job_config import verify_training_cfg

                cfg_info.config = filter_cfg(cfg_info.config)
                cfg = verify_training_cfg(cfg_info.config)
                cfg.data_config.train_labels_path = [os.path.basename(labels_filename)]
                OmegaConf.save(cfg, new_cfg_filename)

                # Keep track of the path where we'll find the trained model
                new_cfg_filenames.append(
                    (
                        Path(cfg_info.config.trainer_config.ckpt_dir)
                        / cfg_info.config.trainer_config.run_name
                    ).as_posix()
                )

                # Add a line to the script for training this model.
                # Bug fixes: the CLI overrides are now separated by spaces
                # (they were previously concatenated into one token), and
                # --config-dir points at "." (the output dir, which is the
                # script's cwd) instead of an empty argument that swallowed
                # the next override.
                zmq_cfg = cfg_info.config.trainer_config.zmq
                train_script += (
                    f"sleap-nn train --config-name {new_cfg_filename} "
                    f"--config-dir . "
                    f"trainer_config.ckpt_dir={Path(ckpt_path).parent.as_posix()} "
                    f"trainer_config.run_name={Path(ckpt_path).name} "
                    f"trainer_config.zmq.controller_port={zmq_cfg.controller_port} "
                    f"trainer_config.zmq.publish_port={zmq_cfg.publish_port}"
                    "\n"
                )

                # Setup job params
                training_jobs.append(
                    {
                        "cfg": new_cfg_filename,
                        "run_path": (
                            Path(cfg_info.config.trainer_config.ckpt_dir)
                            / cfg_info.config.trainer_config.run_name
                        ).as_posix(),
                        "train_labels": os.path.basename(labels_filename),
                    }
                )

        # Write the script to train the models which need to be trained
        with open(os.path.join(output_dir, "train-script.sh"), "w") as f:
            f.write(train_script)

        # Build the script for running inference
        inference_script = "#!/bin/bash\n"

        # Object with settings for inference
        inference_task = InferenceTask(
            labels_filename=labels_filename,
            trained_job_paths=new_cfg_filenames,
            inference_params=inference_params,
        )

        inference_jobs = []
        for item_for_inference in items_for_inference.items:
            is_dataset_item = isinstance(item_for_inference, DatasetItemForInference)
            data_path = labels_filename if is_dataset_item else item_for_inference.path

            # We want to save predictions in output dir so use local path
            prediction_output_path = f"{os.path.basename(data_path)}.predictions.slp"

            # Use absolute path to video
            item_for_inference.use_absolute_path = True

            # Get list of cli args
            cli_args, _ = inference_task.make_predict_cli_call(
                item_for_inference=item_for_inference,
                output_path=prediction_output_path,
            )
            # And join them into a single call to inference
            inference_script += " ".join(cli_args) + "\n"

            # Setup job params
            only_suggested_frames = (
                is_dataset_item and item_for_inference.frame_filter == "suggested"
            )

            # TODO: support frame ranges, user-labeled frames
            tracking_args = {
                k: v for k, v in inference_params.items() if k.startswith("tracking.")
            }
            inference_jobs.append(
                {
                    "data_path": os.path.basename(data_path),
                    "models": [Path(p).as_posix() for p in new_cfg_filenames],
                    "output_path": prediction_output_path,
                    "type": "labels" if is_dataset_item else "video",
                    "only_suggested_frames": only_suggested_frames,
                    "tracking": tracking_args,
                }
            )

        # And write it
        with open(os.path.join(output_dir, "inference-script.sh"), "w") as f:
            f.write(inference_script)

        # Save jobs.yaml
        jobs = {"training": training_jobs, "inference": inference_jobs}
        with open(os.path.join(output_dir, "jobs.yaml"), "w") as f:
            yaml.dump(jobs, f)
    finally:
        # Restore the working directory
        os.chdir(old_cwd)