feat: add YuNet detector option, multi-scale detection, and streamlined CLI

- Add YuNet face detector as alternative option (built into OpenCV)
- Add multi-scale detection (1.0x + 1.5x) to catch faces at different distances
- Add NMS to remove duplicate detections from multi-scale
- Move frame interval and clustering settings to advanced options
- Increase default blur padding from 25% to 40%
- Change default frame interval from 30 to 15
- Change default confidence threshold from 0.7 to 0.8
- Add limitations section to README (extreme angles, small faces, motion blur)
- Require scikit-learn>=1.3.0 for HDBSCAN support
This commit is contained in:
fiatcode 2026-03-01 01:54:27 +07:00
parent baf1899616
commit 236e0d2ff2
7 changed files with 553 additions and 27 deletions

View file

@ -51,3 +51,9 @@ The original proof-of-concept command-line interface is also still available for
```bash ```bash
uv run pyfaceblur-legacy detect --video input.mp4 --output ./output --interval 30 --confidence 0.7 uv run pyfaceblur-legacy detect --video input.mp4 --output ./output --interval 30 --confidence 0.7
``` ```
## Limitations
- **Extreme face angles:** Faces viewed from extreme angles (e.g., strong profile views, looking up/down) may not be detected or may be clustered as separate identities. For best results, use videos where faces are mostly front-facing or at moderate angles.
- **Small/distant faces:** Very small faces (less than 50 pixels wide or tall, the default minimum face size) may not be reliably detected and may not produce accurate embeddings for clustering.
- **Rapid motion blur:** Fast head movements causing motion blur can affect detection accuracy.

View file

@ -8,7 +8,7 @@ dependencies = [
"uniface", "uniface",
"numpy", "numpy",
"opencv-python", "opencv-python",
"scikit-learn", "scikit-learn>=1.3.0",
"rich", "rich",
"questionary", "questionary",
] ]

View file

@ -32,6 +32,7 @@ os.environ.setdefault(
from .cluster import cluster_faces from .cluster import cluster_faces
from .detect import FaceDetector from .detect import FaceDetector
from .detect_yunet import YuNetDetector
from .encode import encode_video, find_best_encoder from .encode import encode_video, find_best_encoder
from .video import extract_frames from .video import extract_frames
@ -75,9 +76,26 @@ def run() -> None:
video_path = Path(video_str).expanduser() video_path = Path(video_str).expanduser()
# Advanced settings
use_advanced = questionary.confirm(
"Configure advanced settings?",
default=False,
).ask()
if use_advanced is None:
return
# Defaults
interval = 15
min_cluster_size = 2
confidence_threshold = 0.8
min_face_size = 50
detector_type = "retinaface"
if use_advanced:
interval_str = questionary.text( interval_str = questionary.text(
"Frame interval for face detection (default: 30):", "Frame interval for face detection (default: 15):",
default="30", default="15",
validate=lambda text: ( validate=lambda text: (
text.isdigit() and int(text) > 0 or "Must be a positive integer" text.isdigit() and int(text) > 0 or "Must be a positive integer"
), ),
@ -87,6 +105,62 @@ def run() -> None:
return return
interval = int(interval_str) interval = int(interval_str)
min_cluster_str = questionary.text(
"Minimum faces to form a cluster (default: 2):",
default="2",
validate=lambda text: (
text.isdigit() and int(text) >= 2 or "Must be an integer >= 2"
),
).ask()
if not min_cluster_str:
return
min_cluster_size = int(min_cluster_str)
detector_type = questionary.select(
"Select face detector:",
choices=[
questionary.Choice("RetinaFace (Default)", value="retinaface"),
questionary.Choice(
"YuNet (Alternative - built into OpenCV)", value="yunet"
),
],
default="retinaface",
).ask()
if not detector_type:
return
confidence_str = questionary.text(
"Detection confidence threshold (0.0-1.0, default: 0.8):",
default="0.8",
validate=lambda text: (
text.replace(".", "", 1).isdigit()
and 0.0 <= float(text) <= 1.0
or "Must be a number between 0.0 and 1.0"
),
).ask()
if not confidence_str:
return
confidence_threshold = float(confidence_str)
min_face_str = questionary.text(
"Minimum face size in pixels (default: 50):",
default="50",
validate=lambda text: (
text.isdigit() and int(text) >= 10 or "Must be an integer >= 10"
),
).ask()
if not min_face_str:
return
min_face_size = int(min_face_str)
temp_dir = tempfile.mkdtemp(prefix="pyfaceblur_") temp_dir = tempfile.mkdtemp(prefix="pyfaceblur_")
try: try:
@ -117,7 +191,19 @@ def run() -> None:
task_detect = progress.add_task( task_detect = progress.add_task(
"[cyan]Detecting faces...", total=len(frames) "[cyan]Detecting faces...", total=len(frames)
) )
detector = FaceDetector()
# Create detector based on user choice
if detector_type == "yunet":
detector = YuNetDetector(
confidence_threshold=confidence_threshold,
min_face_size=min_face_size,
)
else:
detector = FaceDetector(
confidence_threshold=confidence_threshold,
min_face_size=min_face_size,
)
all_faces = [] all_faces = []
for i, frame in enumerate(frames): for i, frame in enumerate(frames):
@ -140,7 +226,7 @@ def run() -> None:
return return
task_cluster = progress.add_task("[cyan]Clustering faces...", total=None) task_cluster = progress.add_task("[cyan]Clustering faces...", total=None)
clusters = cluster_faces(all_faces) clusters = cluster_faces(all_faces, min_samples=min_cluster_size)
real_clusters = [c for c in clusters if c.id >= 0] real_clusters = [c for c in clusters if c.id >= 0]
progress.update( progress.update(
task_cluster, task_cluster,

View file

@ -13,7 +13,7 @@ def apply_blur(
bbox: Tuple[int, int, int, int], bbox: Tuple[int, int, int, int],
method: BlurMethod = "gaussian", method: BlurMethod = "gaussian",
strength: float = 5.0, strength: float = 5.0,
padding: float = 0.25, padding: float = 0.40,
) -> np.ndarray: ) -> np.ndarray:
"""Apply blur to a face region in an image. """Apply blur to a face region in an image.
@ -22,7 +22,7 @@ def apply_blur(
bbox: Face bounding box (x1, y1, x2, y2) bbox: Face bounding box (x1, y1, x2, y2)
method: Blur method name method: Blur method name
strength: Blur strength multiplier strength: Blur strength multiplier
padding: Percentage to expand the bounding box to prevent tracking lag exposure padding: Percentage to expand the bounding box (default: 0.40 = 40%)
Returns: Returns:
The modified image The modified image
@ -111,6 +111,7 @@ def get_bboxes_for_frame(
frame_index: int, frame_index: int,
keyframe_bboxes: Dict[int, List[Tuple[int, Tuple[int, int, int, int]]]], keyframe_bboxes: Dict[int, List[Tuple[int, Tuple[int, int, int, int]]]],
keyframe_indices: List[int], keyframe_indices: List[int],
extend_frames: int = 0,
) -> List[Tuple[int, Tuple[int, int, int, int]]]: ) -> List[Tuple[int, Tuple[int, int, int, int]]]:
"""Get bounding boxes for a frame by looking up or interpolating from keyframes. """Get bounding boxes for a frame by looking up or interpolating from keyframes.
@ -118,6 +119,7 @@ def get_bboxes_for_frame(
frame_index: The current frame number frame_index: The current frame number
keyframe_bboxes: Dict mapping keyframe index -> list of (cluster_id, bbox) keyframe_bboxes: Dict mapping keyframe index -> list of (cluster_id, bbox)
keyframe_indices: Sorted list of keyframe indices keyframe_indices: Sorted list of keyframe indices
extend_frames: Number of frames to extend blur before first and after last detection
Returns: Returns:
List of (cluster_id, bbox) for this frame List of (cluster_id, bbox) for this frame
@ -138,11 +140,14 @@ def get_bboxes_for_frame(
if ki > frame_index and next_idx is None: if ki > frame_index and next_idx is None:
next_idx = ki next_idx = ki
# Before first or after last keyframe # Before first keyframe - extend blur backward if within extend_frames
if prev_idx is None and next_idx is not None: if prev_idx is None and next_idx is not None:
return keyframe_bboxes[next_idx] return keyframe_bboxes[next_idx]
# After last keyframe - extend blur forward if within extend_frames
if next_idx is None and prev_idx is not None: if next_idx is None and prev_idx is not None:
return keyframe_bboxes[prev_idx] return keyframe_bboxes[prev_idx]
if prev_idx is None or next_idx is None: if prev_idx is None or next_idx is None:
return [] return []

View file

@ -4,7 +4,7 @@ import cv2
import numpy as np import numpy as np
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import List, Tuple from typing import List, Tuple, Optional
from uniface.detection import RetinaFace from uniface.detection import RetinaFace
from uniface.recognition import ArcFace from uniface.recognition import ArcFace
@ -24,14 +24,97 @@ class FaceData:
class FaceDetector: class FaceDetector:
"""Face detector using RetinaFace + ArcFace via UniFace.""" """Face detector using RetinaFace + ArcFace via UniFace.
Supports multi-scale detection to catch faces at different distances,
and filters out low-quality detections based on face size.
Key design: Detection runs at multiple scales, but embedding extraction
ALWAYS uses the original image to ensure consistent embeddings for clustering.
"""
def __init__(
self,
confidence_threshold: float = 0.8,
min_face_size: int = 50,
scales: Optional[List[float]] = None,
):
"""Initialize the face detector.
Args:
confidence_threshold: Minimum confidence to accept a detection (default: 0.8)
min_face_size: Minimum face width/height in pixels for reliable embeddings (default: 50)
scales: List of image scales to run detection on (default: [1.0, 1.5])
- 1.0: Normal scale for regular faces
- 1.5: Upscaled to catch small/distant faces
"""
self.confidence_threshold = confidence_threshold
self.min_face_size = min_face_size
# Simplified scales: 1.0 (normal) + 1.5 (catch small faces)
# Removed 0.5x as it rarely helps and can cause issues
self.scales = scales or [1.0, 1.5]
def __init__(self, confidence_threshold: float = 0.7):
self.detector = RetinaFace(confidence_threshold=confidence_threshold) self.detector = RetinaFace(confidence_threshold=confidence_threshold)
self.recognizer = ArcFace() self.recognizer = ArcFace()
def _nms_boxes(
self, boxes: List[Tuple], scores: List[float], iou_threshold: float = 0.4
) -> List[int]:
"""Non-maximum suppression to remove duplicate detections from multi-scale.
Args:
boxes: List of (x1, y1, x2, y2) bounding boxes
scores: Confidence scores for each box
iou_threshold: IOU threshold for suppression (lower = more aggressive)
Returns:
List of indices to keep
"""
if not boxes:
return []
boxes_arr = np.array(boxes, dtype=np.float32)
scores_arr = np.array(scores, dtype=np.float32)
x1, y1, x2, y2 = (
boxes_arr[:, 0],
boxes_arr[:, 1],
boxes_arr[:, 2],
boxes_arr[:, 3],
)
areas = (x2 - x1) * (y2 - y1)
order = scores_arr.argsort()[::-1]
keep = []
while order.size > 0:
i = order[0]
keep.append(i)
if order.size == 1:
break
xx1 = np.maximum(x1[i], x1[order[1:]])
yy1 = np.maximum(y1[i], y1[order[1:]])
xx2 = np.minimum(x2[i], x2[order[1:]])
yy2 = np.minimum(y2[i], y2[order[1:]])
w = np.maximum(0, xx2 - xx1)
h = np.maximum(0, yy2 - yy1)
inter = w * h
iou = inter / (areas[i] + areas[order[1:]] - inter)
inds = np.where(iou <= iou_threshold)[0]
order = order[inds + 1]
return keep
def detect_faces(self, frame_path: Path, frame_index: int) -> List[FaceData]: def detect_faces(self, frame_path: Path, frame_index: int) -> List[FaceData]:
"""Detect faces in a frame and generate embeddings. """Detect faces in a frame using multi-scale detection and generate embeddings.
Runs detection at multiple image scales to catch faces at different distances,
then applies NMS to remove duplicates. Embeddings are ALWAYS extracted from
the original image to ensure consistency for clustering.
Args: Args:
frame_path: Path to the frame image frame_path: Path to the frame image
@ -44,20 +127,87 @@ class FaceDetector:
if image is None: if image is None:
raise ValueError(f"Could not read image: {frame_path}") raise ValueError(f"Could not read image: {frame_path}")
detections = self.detector.detect(image) h, w = image.shape[:2]
# Collect detections from all scales
all_detections = [] # (bbox, confidence, landmarks_on_original)
for scale in self.scales:
if scale == 1.0:
scaled_image = image
else:
new_w, new_h = int(w * scale), int(h * scale)
if new_w < 100 or new_h < 100:
continue # Skip if scaled image is too small
scaled_image = cv2.resize(image, (new_w, new_h))
detections = self.detector.detect(scaled_image)
for det in detections:
# Scale bbox and landmarks back to original image coordinates
if scale != 1.0:
x1, y1, x2, y2 = det.bbox
bbox = (
int(x1 / scale),
int(y1 / scale),
int(x2 / scale),
int(y2 / scale),
)
landmarks = det.landmarks / scale
else:
x1, y1, x2, y2 = det.bbox
bbox = (int(x1), int(y1), int(x2), int(y2))
landmarks = det.landmarks.copy()
all_detections.append((bbox, det.confidence, landmarks))
if not all_detections:
return []
# Apply NMS to remove duplicates from multi-scale detection
# Using lower IOU threshold (0.4) to be more aggressive at removing duplicates
boxes = [d[0] for d in all_detections]
scores = [d[1] for d in all_detections]
keep_indices = self._nms_boxes(boxes, scores, iou_threshold=0.4)
# Filter and generate embeddings
faces = [] faces = []
for i, det in enumerate(detections): face_idx = 0
bbox = tuple(int(v) for v in det.bbox) # (x1, y1, x2, y2)
confidence = det.confidence
landmarks = det.landmarks
for idx in keep_indices:
bbox_tuple, confidence, landmarks = all_detections[idx]
x1, y1, x2, y2 = bbox_tuple
# Filter by minimum face size for reliable embeddings
face_w = x2 - x1
face_h = y2 - y1
if face_w < self.min_face_size or face_h < self.min_face_size:
continue
# Clamp bbox to image boundaries
x1 = max(0, min(x1, w - 1))
y1 = max(0, min(y1, h - 1))
x2 = max(0, min(x2, w))
y2 = max(0, min(y2, h))
bbox: Tuple[int, int, int, int] = (x1, y1, x2, y2)
# Clamp landmarks to image boundaries
landmarks = landmarks.copy()
landmarks[:, 0] = np.clip(landmarks[:, 0], 0, w - 1)
landmarks[:, 1] = np.clip(landmarks[:, 1], 0, h - 1)
# IMPORTANT: Always extract embedding from ORIGINAL image
# This ensures consistent embeddings regardless of detection scale
try:
embedding = self.recognizer.get_normalized_embedding(image, landmarks) embedding = self.recognizer.get_normalized_embedding(image, landmarks)
embedding = embedding.flatten() embedding = embedding.flatten()
except Exception:
# Skip faces where embedding extraction fails
continue
faces.append( faces.append(
FaceData( FaceData(
id=frame_index * 100 + i, id=frame_index * 1000 + face_idx,
frame_path=frame_path, frame_path=frame_path,
frame_index=frame_index, frame_index=frame_index,
bbox=bbox, bbox=bbox,
@ -66,6 +216,7 @@ class FaceDetector:
landmarks=landmarks, landmarks=landmarks,
) )
) )
face_idx += 1
return faces return faces

View file

@ -0,0 +1,273 @@
"""Face detection module using YuNet (OpenCV built-in) + ArcFace.
YuNet is a lightweight face detector built into OpenCV 4.5.4+.
It has good accuracy with fewer false positives than some other detectors.
"""
import cv2
import numpy as np
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Tuple, Optional
from uniface.recognition import ArcFace
from .detect import FaceData
class YuNetDetector:
    """Face detector using YuNet (OpenCV) + ArcFace for embeddings.

    YuNet is a lightweight CNN-based face detector that provides:
    - Good accuracy with fewer false positives
    - 5-point facial landmarks for alignment
    - Inference through OpenCV's ``cv2.FaceDetectorYN``; the ONNX model file
      itself must exist on disk (see ``DEFAULT_MODEL`` / ``model_path``)

    Supports multi-scale detection to catch faces at different distances.
    Detections from all scales are merged with NMS, and embeddings are always
    computed from the original (unscaled) image.
    """

    # Default model path. It is tried relative to the current working
    # directory first, then relative to Path(__file__).parent.parent.parent.
    DEFAULT_MODEL = "models/face_detection_yunet_2023mar.onnx"

    def __init__(
        self,
        confidence_threshold: float = 0.8,
        min_face_size: int = 50,
        scales: Optional[List[float]] = None,
        model_path: Optional[str] = None,
    ):
        """Initialize the YuNet face detector.

        Args:
            confidence_threshold: Minimum confidence to accept a detection (default: 0.8)
            min_face_size: Minimum face width/height in pixels (default: 50)
            scales: List of image scales for multi-scale detection (default: [1.0, 1.5])
            model_path: Path to YuNet ONNX model file (default: auto-detect)

        Raises:
            FileNotFoundError: If the model file does not exist.
        """
        self.confidence_threshold = confidence_threshold
        self.min_face_size = min_face_size
        # Copy so that later mutation of the caller's list cannot change ours.
        self.scales = list(scales) if scales else [1.0, 1.5]

        # Find model path
        if model_path is None:
            # Try relative to current working directory first
            model_path = self.DEFAULT_MODEL
            if not Path(model_path).exists():
                # Try relative to this file
                pkg_dir = Path(__file__).parent.parent.parent
                model_path = str(pkg_dir / self.DEFAULT_MODEL)

        if not Path(model_path).exists():
            raise FileNotFoundError(
                f"YuNet model not found at {model_path}. "
                "Please download from: https://github.com/opencv/opencv_zoo/tree/main/models/face_detection_yunet"
            )

        self.model_path = model_path

        # The detector is created lazily; its input size must match each image.
        self._detector = None
        self._detector_size = None

        # ArcFace for embeddings
        self.recognizer = ArcFace()

    def _get_detector(self, width: int, height: int) -> "cv2.FaceDetectorYN":
        """Return the YuNet detector configured for a ``width`` x ``height`` input.

        The detector is created once. When a different size is requested the
        existing detector is reconfigured with ``setInputSize`` rather than
        rebuilt, so the ONNX model is not reloaded for every scale of every
        frame.
        """
        size = (width, height)
        if self._detector is None:
            self._detector = cv2.FaceDetectorYN.create(
                self.model_path,
                "",  # config (not needed for ONNX)
                size,
                self.confidence_threshold,
                0.3,  # NMS threshold
                5000,  # top_k
            )
        elif self._detector_size != size:
            self._detector.setInputSize(size)
        self._detector_size = size
        return self._detector

    def _nms_boxes(
        self, boxes: List[Tuple], scores: List[float], iou_threshold: float = 0.4
    ) -> List[int]:
        """Non-maximum suppression to remove duplicate detections.

        Boxes are visited from highest to lowest score; every remaining box
        whose IoU with the current box exceeds ``iou_threshold`` is discarded.

        Args:
            boxes: List of (x1, y1, x2, y2) bounding boxes
            scores: Confidence scores for each box
            iou_threshold: IOU threshold for suppression

        Returns:
            List of indices (plain Python ints) to keep, highest score first
        """
        if not boxes:
            return []

        boxes_arr = np.asarray(boxes, dtype=np.float32).reshape(-1, 4)
        scores_arr = np.asarray(scores, dtype=np.float32)

        x1, y1, x2, y2 = boxes_arr.T
        areas = (x2 - x1) * (y2 - y1)
        order = scores_arr.argsort()[::-1]

        keep: List[int] = []
        while order.size > 0:
            best = order[0]
            # Convert to a builtin int so callers get what the annotation promises.
            keep.append(int(best))

            rest = order[1:]
            if rest.size == 0:
                break

            # Intersection rectangle between the best box and each remaining box.
            inter_w = np.maximum(
                0, np.minimum(x2[best], x2[rest]) - np.maximum(x1[best], x1[rest])
            )
            inter_h = np.maximum(
                0, np.minimum(y2[best], y2[rest]) - np.maximum(y1[best], y1[rest])
            )
            inter = inter_w * inter_h
            union = areas[best] + areas[rest] - inter

            # Two zero-area boxes give union == 0; treat that as "no overlap"
            # instead of dividing 0/0, which yields NaN and drops the box.
            iou = np.zeros_like(inter)
            np.divide(inter, union, out=iou, where=union > 0)

            order = rest[iou <= iou_threshold]

        return keep

    def _convert_yunet_landmarks(self, yunet_landmarks: np.ndarray) -> np.ndarray:
        """Convert YuNet landmarks to ArcFace order.

        YuNet order: right_eye, left_eye, nose, right_mouth, left_mouth
        ArcFace order: left_eye, right_eye, nose, mouth_left, mouth_right

        NOTE(review): this swap assumes YuNet's "right"/"left" and the
        recognizer's "left"/"right" use opposite conventions (subject's side
        vs. image side). If both already list the image-left point first, the
        swap would mirror the alignment -- confirm against the OpenCV and
        recognizer documentation.

        Args:
            yunet_landmarks: (5, 2) array in YuNet order

        Returns:
            (5, 2) array in ArcFace order (a new array, not a view)
        """
        # Reorder: [1, 0, 2, 4, 3]
        return yunet_landmarks[[1, 0, 2, 4, 3], :]

    def detect_faces(self, frame_path: Path, frame_index: int) -> List["FaceData"]:
        """Detect faces in a frame using multi-scale YuNet detection.

        Args:
            frame_path: Path to the frame image
            frame_index: Index of the frame in the video

        Returns:
            List of FaceData objects with bboxes, embeddings, and confidence

        Raises:
            ValueError: If the image at ``frame_path`` cannot be read.
        """
        # Function-scope import so the module's import block stays untouched.
        import logging

        image = cv2.imread(str(frame_path))
        if image is None:
            raise ValueError(f"Could not read image: {frame_path}")

        h, w = image.shape[:2]

        # Collect detections from all scales
        all_detections = []  # (bbox_xyxy, confidence, landmarks_arcface_order)

        for scale in self.scales:
            if scale == 1.0:
                scaled_image = image
                scaled_w, scaled_h = w, h
            else:
                scaled_w, scaled_h = int(w * scale), int(h * scale)
                if scaled_w < 100 or scaled_h < 100:
                    continue  # Skip scales that make the image too small
                scaled_image = cv2.resize(image, (scaled_w, scaled_h))

            # Get detector for this size
            detector = self._get_detector(scaled_w, scaled_h)
            _, raw_faces = detector.detect(scaled_image)

            if raw_faces is None:
                continue

            for face in raw_faces:
                # YuNet output: [x, y, w, h, landmarks(10), score]
                x, y, fw, fh = face[:4]
                score = face[14]
                yunet_landmarks = face[4:14].reshape(5, 2)

                # Scale back to original coordinates
                if scale != 1.0:
                    x, y, fw, fh = x / scale, y / scale, fw / scale, fh / scale
                    yunet_landmarks = yunet_landmarks / scale

                # Convert to (x1, y1, x2, y2) format
                bbox = (int(x), int(y), int(x + fw), int(y + fh))

                # Convert landmarks to ArcFace order
                arcface_landmarks = self._convert_yunet_landmarks(yunet_landmarks)

                all_detections.append((bbox, float(score), arcface_landmarks))

        if not all_detections:
            return []

        # Apply NMS to remove duplicates from multi-scale detection
        boxes = [d[0] for d in all_detections]
        scores = [d[1] for d in all_detections]
        keep_indices = self._nms_boxes(boxes, scores, iou_threshold=0.4)

        # Filter and generate embeddings
        faces = []
        face_idx = 0

        for idx in keep_indices:
            bbox_tuple, confidence, landmarks = all_detections[idx]
            x1, y1, x2, y2 = bbox_tuple

            # Filter by minimum face size
            face_w = x2 - x1
            face_h = y2 - y1
            if face_w < self.min_face_size or face_h < self.min_face_size:
                continue

            # Clamp bbox to image boundaries
            x1 = max(0, min(x1, w - 1))
            y1 = max(0, min(y1, h - 1))
            x2 = max(0, min(x2, w))
            y2 = max(0, min(y2, h))
            bbox: Tuple[int, int, int, int] = (x1, y1, x2, y2)

            # Clamp landmarks to image boundaries
            landmarks = landmarks.copy()
            landmarks[:, 0] = np.clip(landmarks[:, 0], 0, w - 1)
            landmarks[:, 1] = np.clip(landmarks[:, 1], 0, h - 1)

            # Extract embedding from original image using ArcFace
            try:
                embedding = self.recognizer.get_normalized_embedding(image, landmarks)
                embedding = embedding.flatten()
            except Exception:
                # Best effort: skip faces where embedding extraction fails,
                # but leave a trace so real bugs are not hidden entirely.
                logging.getLogger(__name__).debug(
                    "Skipping face in frame %s: embedding extraction failed",
                    frame_index,
                    exc_info=True,
                )
                continue

            faces.append(
                FaceData(
                    id=frame_index * 1000 + face_idx,
                    frame_path=frame_path,
                    frame_index=frame_index,
                    bbox=bbox,
                    embedding=embedding,
                    confidence=confidence,
                    landmarks=landmarks,
                )
            )
            face_idx += 1

        return faces

    def close(self):
        """Release resources by dropping the reference to the YuNet detector."""
        self._detector = None

7
uv.lock generated
View file

@ -1,6 +1,11 @@
version = 1 version = 1
revision = 3 revision = 3
requires-python = ">=3.11" requires-python = ">=3.11"
resolution-markers = [
"python_full_version >= '3.13'",
"python_full_version == '3.12.*'",
"python_full_version < '3.12'",
]
[[package]] [[package]]
name = "certifi" name = "certifi"
@ -460,7 +465,7 @@ requires-dist = [
{ name = "opencv-python" }, { name = "opencv-python" },
{ name = "questionary" }, { name = "questionary" },
{ name = "rich" }, { name = "rich" },
{ name = "scikit-learn" }, { name = "scikit-learn", specifier = ">=1.3.0" },
{ name = "uniface" }, { name = "uniface" },
] ]