Simple, clean worker thread implementation for video frame loading.
This avoids the complex signal/slot system that was causing QBasicTimer errors.
Classes:
| Name |
Description |
FrameLoaderThread |
Simple thread that loads frames using a queue-based approach.
|
Bases: QThread
Simple thread that loads frames using a queue-based approach.
This avoids complex signal/slot connections that cause threading issues.
Methods:
| Name |
Description |
request_frame |
Request a frame to be loaded (called from main thread).
|
run |
Main thread loop - processes frame requests from the queue.
|
stop |
|
Source code in sleap/gui/widgets/video_worker.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171 | class FrameLoaderThread(QThread):
"""
Simple thread that loads frames using a queue-based approach.
This avoids complex signal/slot connections that cause threading issues.
"""
# Signal emitted when a frame is ready
frameReady = Signal(int, QImage) # (frame_idx, qimage)
def __init__(self):
super().__init__()
self.request_queue = queue.Queue()
self.stop_flag = threading.Event()
self.current_video = None
self.local_video_copy = None
# Performance tracking
self._frame_load_times = deque(maxlen=100)
self._dropped_frames = 0
# Debug mode flag for logging
self.debug_mode = False
def set_debug_mode(self, value: bool):
self.debug_mode = value
def run(self):
"""Main thread loop - processes frame requests from the queue."""
while not self.stop_flag.is_set():
try:
# Wait for a request with timeout
video, frame_idx = self.request_queue.get(timeout=0.01)
if self.debug_mode:
print(f"[THREAD] Got frame request: {frame_idx}")
# Collect all pending requests to find the latest one (LIFO)
pending_requests = [(video, frame_idx)] # Start with current request
while not self.request_queue.empty():
try:
pending_video, pending_idx = self.request_queue.get_nowait()
pending_requests.append((pending_video, pending_idx))
self._dropped_frames += 1
if self.debug_mode:
print(f"[THREAD] Found pending request: {pending_idx}")
except queue.Empty:
break
# Process only the latest (most recent) request
latest_video, latest_frame_idx = pending_requests[-1]
if self.debug_mode and len(pending_requests) > 1:
dropped_count = len(pending_requests) - 1
print(
f"[THREAD] Processing latest frame {latest_frame_idx}, "
f"dropped {dropped_count} older requests"
)
# Process the frame
self._process_frame(latest_video, latest_frame_idx)
except queue.Empty:
# No requests, continue waiting
continue
except Exception as e:
print(f"[THREAD] Error in worker loop: {e}")
pass # Thread stopped
def _process_frame(self, video, frame_idx: int):
"""Load and emit a frame."""
try:
start_time = time.time()
if self.debug_mode:
print(f"[THREAD] Loading frame {frame_idx}")
# Load the frame
frame = video[frame_idx]
if frame is not None:
# Convert to QImage
qimage = ndarray_to_qimage(frame, copy=True)
# Emit the result
self.frameReady.emit(frame_idx, qimage)
if self.debug_mode:
print(f"[THREAD] Emitted frame {frame_idx}")
# Track performance
load_time = time.time() - start_time
self._frame_load_times.append(load_time)
# Log performance stats periodically
if self.debug_mode and len(self._frame_load_times) == 100:
avg_time = sum(self._frame_load_times) / 100
dropped = self._dropped_frames
print(f"[PERF] Avg load: {avg_time:.3f}s, Dropped: {dropped}")
else:
if self.debug_mode:
print(f"[THREAD] Frame {frame_idx} was None")
except Exception as e:
print(f"[THREAD] Error processing frame {frame_idx}: {e}")
def request_frame(self, video: sio.Video, frame_idx: int):
"""Request a frame to be loaded (called from main thread)."""
if self.debug_mode:
print(f"[MAIN] Requesting frame {frame_idx}")
# Update the current video if a new one was provided
if self.current_video is not video:
if self.debug_mode:
print("[MAIN] Switching to new video")
# Retain original state
reopen = video.is_open
open_backend = video.open_backend
# Close the backend
video.close()
video.open_backend = False
# Update the reference
self.current_video = video
# Make a thread-local copy
self.local_video_copy = deepcopy(video)
# Set it to open the backend on first read
self.local_video_copy.open_backend = True
# Restore the original state in the incoming video
self.current_video.open_backend = open_backend
if reopen:
self.current_video.open()
self.request_queue.put((self.local_video_copy, frame_idx))
if self.debug_mode:
queue_size = self.request_queue.qsize()
print(f"[MAIN] Frame {frame_idx} added to queue (size: {queue_size})")
def stop(self):
"""Stop the worker thread."""
self.stop_flag.set()
self.quit()
if not self.wait(2000):
self.terminate()
self.wait()
|
Request a frame to be loaded (called from main thread).
Source code in sleap/gui/widgets/video_worker.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163 | def request_frame(self, video: sio.Video, frame_idx: int):
"""Request a frame to be loaded (called from main thread)."""
if self.debug_mode:
print(f"[MAIN] Requesting frame {frame_idx}")
# Update the current video if a new one was provided
if self.current_video is not video:
if self.debug_mode:
print("[MAIN] Switching to new video")
# Retain original state
reopen = video.is_open
open_backend = video.open_backend
# Close the backend
video.close()
video.open_backend = False
# Update the reference
self.current_video = video
# Make a thread-local copy
self.local_video_copy = deepcopy(video)
# Set it to open the backend on first read
self.local_video_copy.open_backend = True
# Restore the original state in the incoming video
self.current_video.open_backend = open_backend
if reopen:
self.current_video.open()
self.request_queue.put((self.local_video_copy, frame_idx))
if self.debug_mode:
queue_size = self.request_queue.qsize()
print(f"[MAIN] Frame {frame_idx} added to queue (size: {queue_size})")
|
Main thread loop - processes frame requests from the queue.
Source code in sleap/gui/widgets/video_worker.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88 | def run(self):
"""Main thread loop - processes frame requests from the queue."""
while not self.stop_flag.is_set():
try:
# Wait for a request with timeout
video, frame_idx = self.request_queue.get(timeout=0.01)
if self.debug_mode:
print(f"[THREAD] Got frame request: {frame_idx}")
# Collect all pending requests to find the latest one (LIFO)
pending_requests = [(video, frame_idx)] # Start with current request
while not self.request_queue.empty():
try:
pending_video, pending_idx = self.request_queue.get_nowait()
pending_requests.append((pending_video, pending_idx))
self._dropped_frames += 1
if self.debug_mode:
print(f"[THREAD] Found pending request: {pending_idx}")
except queue.Empty:
break
# Process only the latest (most recent) request
latest_video, latest_frame_idx = pending_requests[-1]
if self.debug_mode and len(pending_requests) > 1:
dropped_count = len(pending_requests) - 1
print(
f"[THREAD] Processing latest frame {latest_frame_idx}, "
f"dropped {dropped_count} older requests"
)
# Process the frame
self._process_frame(latest_video, latest_frame_idx)
except queue.Empty:
# No requests, continue waiting
continue
except Exception as e:
print(f"[THREAD] Error in worker loop: {e}")
pass # Thread stopped
|
Stop the worker thread.
Source code in sleap/gui/widgets/video_worker.py
165
166
167
168
169
170
171 | def stop(self):
"""Stop the worker thread."""
self.stop_flag.set()
self.quit()
if not self.wait(2000):
self.terminate()
self.wait()
|