feat: add processing screen with background worker and progress tracking

This commit is contained in:
fiatcode 2026-02-27 22:42:12 +07:00
parent 1a06093f25
commit d779432237

View file

@ -13,7 +13,7 @@ from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Center, Middle, Vertical
from textual.screen import Screen
from textual.widgets import Button, Footer, Input, Label, Static
from textual.widgets import Button, Footer, Input, Label, ProgressBar, Static
from textual_autocomplete import PathAutoComplete
@ -131,17 +131,191 @@ class WelcomeScreen(Screen):
self.app.push_screen(ProcessingScreen(video_path=path, interval=interval))
# Forward declaration — ProcessingScreen will be added in Task 5
class ProcessingScreen(Screen):
"""Placeholder — replaced in Task 5."""
"""Processing screen — extract, detect, cluster with progress."""
DEFAULT_CSS = """
ProcessingScreen {
align: center middle;
}
#processing-container {
width: 60;
height: auto;
border: round $accent;
padding: 1 2;
}
#processing-title {
text-align: center;
text-style: bold;
margin-bottom: 1;
}
#phase-label {
margin-bottom: 0;
}
#status-label {
color: $text-muted;
margin-top: 0;
}
#progress-bar {
margin: 1 0;
}
"""
def __init__(self, video_path: Path, interval: int) -> None:
super().__init__()
self.video_path = video_path
self.interval = interval
self.frames = []
self.all_faces = []
self.clusters = []
def compose(self) -> ComposeResult:
yield Label(f"Processing {self.video_path}...")
with Center():
with Middle():
with Vertical(id="processing-container"):
yield Static("PyFaceBlur", id="processing-title")
yield Label("[1/3] Preparing...", id="phase-label")
yield ProgressBar(total=100, id="progress-bar")
yield Label("", id="status-label")
def on_mount(self) -> None:
self.run_worker(self._process(), thread=True)
def _update_ui(self, phase: str, status: str, progress: float) -> None:
"""Thread-safe UI update."""
self.call_from_thread(self._do_update_ui, phase, status, progress)
def _do_update_ui(self, phase: str, status: str, progress: float) -> None:
self.query_one("#phase-label", Label).update(phase)
self.query_one("#status-label", Label).update(status)
bar = self.query_one("#progress-bar", ProgressBar)
bar.update(progress=progress)
def _process(self) -> None:
"""Run the full detection pipeline in a background thread."""
from .video import extract_frames
from .detect import FaceDetector
from .cluster import cluster_faces
# Phase 1: Extract frames
self._update_ui("[1/3] Extracting frames...", "Starting FFmpeg...", 0)
import tempfile
self._temp_dir = tempfile.mkdtemp(prefix="pyfaceblur_")
frames_dir = str(Path(self._temp_dir) / "frames")
self.frames = extract_frames(
str(self.video_path),
frames_dir,
self.interval,
)
self._update_ui(
"[1/3] Extracting frames...",
f"Extracted {len(self.frames)} frames",
33,
)
if not self.frames:
self._update_ui("[1/3] Error", "No frames extracted from video.", 0)
return
# Phase 2: Detect faces
self._update_ui("[2/3] Detecting faces...", "Initializing detector...", 33)
detector = FaceDetector()
self.all_faces = []
for i, frame in enumerate(self.frames):
try:
faces = detector.detect_faces(frame.path, frame.index)
self.all_faces.extend(faces)
except Exception:
pass
progress = 33 + (i + 1) / len(self.frames) * 34
self._update_ui(
"[2/3] Detecting faces...",
f"Frame {i + 1}/{len(self.frames)}{len(self.all_faces)} faces found",
progress,
)
detector.close()
if not self.all_faces:
self._update_ui("[2/3] Error", "No faces detected in video.", 67)
return
# Phase 3: Cluster
self._update_ui("[3/3] Clustering faces...", "Running DBSCAN...", 67)
self.clusters = cluster_faces(self.all_faces)
self._update_ui(
"[3/3] Clustering complete",
f"Found {len([c for c in self.clusters if c.id >= 0])} people",
100,
)
# Write face samples and transition to selection screen
face_samples = self._write_face_samples()
self.call_from_thread(
self.app.push_screen,
FaceSelectionScreen(
video_path=self.video_path,
interval=self.interval,
clusters=self.clusters,
all_faces=self.all_faces,
face_samples=face_samples,
temp_dir=self._temp_dir,
),
)
def _write_face_samples(self) -> dict:
"""Write one representative face crop per cluster. Returns {cluster_id: path}."""
import cv2
samples_dir = Path(self._temp_dir) / "face_samples"
samples_dir.mkdir(exist_ok=True)
face_samples = {}
for cluster in self.clusters:
if cluster.id < 0:
continue
# Pick the face with highest confidence as representative
best_face = max(cluster.faces, key=lambda f: f.confidence)
image = cv2.imread(str(best_face.frame_path))
if image is None:
continue
x1, y1, x2, y2 = best_face.bbox
crop = image[y1:y2, x1:x2]
if crop.size == 0:
continue
sample_path = samples_dir / f"person_{cluster.id:02d}.jpg"
cv2.imwrite(str(sample_path), crop)
face_samples[cluster.id] = sample_path
return face_samples
# Forward declaration — FaceSelectionScreen will be added in Task 6
class FaceSelectionScreen(Screen):
"""Placeholder — replaced in Task 6."""
def __init__(
self,
video_path: Path,
interval: int,
clusters: list,
all_faces: list,
face_samples: dict,
temp_dir: str,
) -> None:
super().__init__()
self.video_path = video_path
def compose(self) -> ComposeResult:
yield Label(f"Face Selection for {self.video_path}...")
class PyFaceBlurApp(App):