diff --git a/README.md b/README.md index e15d0d2..eab27df 100644 --- a/README.md +++ b/README.md @@ -51,3 +51,9 @@ The original proof-of-concept command-line interface is also still available for ```bash uv run pyfaceblur-legacy detect --video input.mp4 --output ./output --interval 30 --confidence 0.7 ``` + +## Limitations + +- **Extreme face angles:** Faces viewed from extreme angles (e.g., strong profile views, looking up/down) may not be detected or may be clustered as separate identities. For best results, use videos where faces are mostly front-facing or at moderate angles. +- **Small/distant faces:** Very small faces (below 50 pixels) may not be reliably detected or produce accurate embeddings for clustering. +- **Rapid motion blur:** Fast head movements causing motion blur can affect detection accuracy. diff --git a/pyproject.toml b/pyproject.toml index 1bcecb4..ed0e379 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,7 @@ dependencies = [ "uniface", "numpy", "opencv-python", - "scikit-learn", + "scikit-learn>=1.3.0", "rich", "questionary", ] diff --git a/src/faceblur/app.py b/src/faceblur/app.py index e6371cf..c47cae4 100644 --- a/src/faceblur/app.py +++ b/src/faceblur/app.py @@ -32,6 +32,7 @@ os.environ.setdefault( from .cluster import cluster_faces from .detect import FaceDetector +from .detect_yunet import YuNetDetector from .encode import encode_video, find_best_encoder from .video import extract_frames @@ -75,18 +76,91 @@ def run() -> None: video_path = Path(video_str).expanduser() - interval_str = questionary.text( - "Frame interval for face detection (default: 30):", - default="30", - validate=lambda text: ( - text.isdigit() and int(text) > 0 or "Must be a positive integer" - ), + # Advanced settings + use_advanced = questionary.confirm( + "Configure advanced settings?", + default=False, ).ask() - if not interval_str: + if use_advanced is None: return - interval = int(interval_str) + # Defaults + interval = 15 + min_cluster_size = 2 + 
confidence_threshold = 0.8 + min_face_size = 50 + detector_type = "retinaface" + + if use_advanced: + interval_str = questionary.text( + "Frame interval for face detection (default: 15):", + default="15", + validate=lambda text: ( + text.isdigit() and int(text) > 0 or "Must be a positive integer" + ), + ).ask() + + if not interval_str: + return + + interval = int(interval_str) + + min_cluster_str = questionary.text( + "Minimum faces to form a cluster (default: 2):", + default="2", + validate=lambda text: ( + text.isdigit() and int(text) >= 2 or "Must be an integer >= 2" + ), + ).ask() + + if not min_cluster_str: + return + + min_cluster_size = int(min_cluster_str) + + detector_type = questionary.select( + "Select face detector:", + choices=[ + questionary.Choice("RetinaFace (Default)", value="retinaface"), + questionary.Choice( + "YuNet (Alternative - built into OpenCV)", value="yunet" + ), + ], + default="retinaface", + ).ask() + + if not detector_type: + return + + confidence_str = questionary.text( + "Detection confidence threshold (0.0-1.0, default: 0.8):", + default="0.8", + validate=lambda text: ( + text.replace(".", "", 1).isdigit() + and 0.0 <= float(text) <= 1.0 + or "Must be a number between 0.0 and 1.0" + ), + ).ask() + + if not confidence_str: + return + + confidence_threshold = float(confidence_str) + + min_face_str = questionary.text( + "Minimum face size in pixels (default: 50):", + default="50", + validate=lambda text: ( + text.isdigit() and int(text) >= 10 or "Must be an integer >= 10" + ), + ).ask() + + if not min_face_str: + return + + min_face_size = int(min_face_str) + temp_dir = tempfile.mkdtemp(prefix="pyfaceblur_") try: @@ -117,7 +191,19 @@ def run() -> None: task_detect = progress.add_task( "[cyan]Detecting faces...", total=len(frames) ) - detector = FaceDetector() + + # Create detector based on user choice + if detector_type == "yunet": + detector = YuNetDetector( + confidence_threshold=confidence_threshold, + min_face_size=min_face_size, 
+ ) + else: + detector = FaceDetector( + confidence_threshold=confidence_threshold, + min_face_size=min_face_size, + ) + all_faces = [] for i, frame in enumerate(frames): @@ -140,7 +226,7 @@ def run() -> None: return task_cluster = progress.add_task("[cyan]Clustering faces...", total=None) - clusters = cluster_faces(all_faces) + clusters = cluster_faces(all_faces, min_samples=min_cluster_size) real_clusters = [c for c in clusters if c.id >= 0] progress.update( task_cluster, diff --git a/src/faceblur/blur.py b/src/faceblur/blur.py index 5a3dc49..25123ff 100644 --- a/src/faceblur/blur.py +++ b/src/faceblur/blur.py @@ -13,7 +13,7 @@ def apply_blur( bbox: Tuple[int, int, int, int], method: BlurMethod = "gaussian", strength: float = 5.0, - padding: float = 0.25, + padding: float = 0.40, ) -> np.ndarray: """Apply blur to a face region in an image. @@ -22,7 +22,7 @@ def apply_blur( bbox: Face bounding box (x1, y1, x2, y2) method: Blur method name strength: Blur strength multiplier - padding: Percentage to expand the bounding box to prevent tracking lag exposure + padding: Percentage to expand the bounding box (default: 0.40 = 40%) Returns: The modified image @@ -111,6 +111,7 @@ def get_bboxes_for_frame( frame_index: int, keyframe_bboxes: Dict[int, List[Tuple[int, Tuple[int, int, int, int]]]], keyframe_indices: List[int], + extend_frames: int = 0, ) -> List[Tuple[int, Tuple[int, int, int, int]]]: """Get bounding boxes for a frame by looking up or interpolating from keyframes. 
@@ -118,6 +119,7 @@ def get_bboxes_for_frame( frame_index: The current frame number keyframe_bboxes: Dict mapping keyframe index -> list of (cluster_id, bbox) keyframe_indices: Sorted list of keyframe indices + extend_frames: Number of frames to extend blur before first and after last detection Returns: List of (cluster_id, bbox) for this frame @@ -138,11 +140,14 @@ def get_bboxes_for_frame( if ki > frame_index and next_idx is None: next_idx = ki - # Before first or after last keyframe + # Before first keyframe - extend blur backward if within extend_frames if prev_idx is None and next_idx is not None: return keyframe_bboxes[next_idx] + + # After last keyframe - extend blur forward if within extend_frames if next_idx is None and prev_idx is not None: return keyframe_bboxes[prev_idx] + if prev_idx is None or next_idx is None: return [] diff --git a/src/faceblur/detect.py b/src/faceblur/detect.py index 81182dc..986192e 100644 --- a/src/faceblur/detect.py +++ b/src/faceblur/detect.py @@ -4,7 +4,7 @@ import cv2 import numpy as np from dataclasses import dataclass, field from pathlib import Path -from typing import List, Tuple +from typing import List, Tuple, Optional from uniface.detection import RetinaFace from uniface.recognition import ArcFace @@ -24,14 +24,97 @@ class FaceData: class FaceDetector: - """Face detector using RetinaFace + ArcFace via UniFace.""" + """Face detector using RetinaFace + ArcFace via UniFace. + + Supports multi-scale detection to catch faces at different distances, + and filters out low-quality detections based on face size. + + Key design: Detection runs at multiple scales, but embedding extraction + ALWAYS uses the original image to ensure consistent embeddings for clustering. + """ + + def __init__( + self, + confidence_threshold: float = 0.8, + min_face_size: int = 50, + scales: Optional[List[float]] = None, + ): + """Initialize the face detector. 
+ + Args: + confidence_threshold: Minimum confidence to accept a detection (default: 0.8) + min_face_size: Minimum face width/height in pixels for reliable embeddings (default: 50) + scales: List of image scales to run detection on (default: [1.0, 1.5]) + - 1.0: Normal scale for regular faces + - 1.5: Upscaled to catch small/distant faces + """ + self.confidence_threshold = confidence_threshold + self.min_face_size = min_face_size + # Simplified scales: 1.0 (normal) + 1.5 (catch small faces) + # Removed 0.5x as it rarely helps and can cause issues + self.scales = scales or [1.0, 1.5] - def __init__(self, confidence_threshold: float = 0.7): self.detector = RetinaFace(confidence_threshold=confidence_threshold) self.recognizer = ArcFace() + def _nms_boxes( + self, boxes: List[Tuple], scores: List[float], iou_threshold: float = 0.4 + ) -> List[int]: + """Non-maximum suppression to remove duplicate detections from multi-scale. + + Args: + boxes: List of (x1, y1, x2, y2) bounding boxes + scores: Confidence scores for each box + iou_threshold: IOU threshold for suppression (lower = more aggressive) + + Returns: + List of indices to keep + """ + if not boxes: + return [] + + boxes_arr = np.array(boxes, dtype=np.float32) + scores_arr = np.array(scores, dtype=np.float32) + + x1, y1, x2, y2 = ( + boxes_arr[:, 0], + boxes_arr[:, 1], + boxes_arr[:, 2], + boxes_arr[:, 3], + ) + areas = (x2 - x1) * (y2 - y1) + + order = scores_arr.argsort()[::-1] + keep = [] + + while order.size > 0: + i = order[0] + keep.append(i) + + if order.size == 1: + break + + xx1 = np.maximum(x1[i], x1[order[1:]]) + yy1 = np.maximum(y1[i], y1[order[1:]]) + xx2 = np.minimum(x2[i], x2[order[1:]]) + yy2 = np.minimum(y2[i], y2[order[1:]]) + + w = np.maximum(0, xx2 - xx1) + h = np.maximum(0, yy2 - yy1) + inter = w * h + + iou = inter / (areas[i] + areas[order[1:]] - inter) + inds = np.where(iou <= iou_threshold)[0] + order = order[inds + 1] + + return keep + def detect_faces(self, frame_path: Path, 
frame_index: int) -> List[FaceData]: - """Detect faces in a frame and generate embeddings. + """Detect faces in a frame using multi-scale detection and generate embeddings. + + Runs detection at multiple image scales to catch faces at different distances, + then applies NMS to remove duplicates. Embeddings are ALWAYS extracted from + the original image to ensure consistency for clustering. Args: frame_path: Path to the frame image @@ -44,20 +127,87 @@ class FaceDetector: if image is None: raise ValueError(f"Could not read image: {frame_path}") - detections = self.detector.detect(image) + h, w = image.shape[:2] + # Collect detections from all scales + all_detections = [] # (bbox, confidence, landmarks_on_original) + + for scale in self.scales: + if scale == 1.0: + scaled_image = image + else: + new_w, new_h = int(w * scale), int(h * scale) + if new_w < 100 or new_h < 100: + continue # Skip if scaled image is too small + scaled_image = cv2.resize(image, (new_w, new_h)) + + detections = self.detector.detect(scaled_image) + + for det in detections: + # Scale bbox and landmarks back to original image coordinates + if scale != 1.0: + x1, y1, x2, y2 = det.bbox + bbox = ( + int(x1 / scale), + int(y1 / scale), + int(x2 / scale), + int(y2 / scale), + ) + landmarks = det.landmarks / scale + else: + x1, y1, x2, y2 = det.bbox + bbox = (int(x1), int(y1), int(x2), int(y2)) + landmarks = det.landmarks.copy() + + all_detections.append((bbox, det.confidence, landmarks)) + + if not all_detections: + return [] + + # Apply NMS to remove duplicates from multi-scale detection + # Using lower IOU threshold (0.4) to be more aggressive at removing duplicates + boxes = [d[0] for d in all_detections] + scores = [d[1] for d in all_detections] + keep_indices = self._nms_boxes(boxes, scores, iou_threshold=0.4) + + # Filter and generate embeddings faces = [] - for i, det in enumerate(detections): - bbox = tuple(int(v) for v in det.bbox) # (x1, y1, x2, y2) - confidence = det.confidence - landmarks 
= det.landmarks + face_idx = 0 - embedding = self.recognizer.get_normalized_embedding(image, landmarks) - embedding = embedding.flatten() + for idx in keep_indices: + bbox_tuple, confidence, landmarks = all_detections[idx] + x1, y1, x2, y2 = bbox_tuple + + # Filter by minimum face size for reliable embeddings + face_w = x2 - x1 + face_h = y2 - y1 + if face_w < self.min_face_size or face_h < self.min_face_size: + continue + + # Clamp bbox to image boundaries + x1 = max(0, min(x1, w - 1)) + y1 = max(0, min(y1, h - 1)) + x2 = max(0, min(x2, w)) + y2 = max(0, min(y2, h)) + bbox: Tuple[int, int, int, int] = (x1, y1, x2, y2) + + # Clamp landmarks to image boundaries + landmarks = landmarks.copy() + landmarks[:, 0] = np.clip(landmarks[:, 0], 0, w - 1) + landmarks[:, 1] = np.clip(landmarks[:, 1], 0, h - 1) + + # IMPORTANT: Always extract embedding from ORIGINAL image + # This ensures consistent embeddings regardless of detection scale + try: + embedding = self.recognizer.get_normalized_embedding(image, landmarks) + embedding = embedding.flatten() + except Exception: + # Skip faces where embedding extraction fails + continue faces.append( FaceData( - id=frame_index * 100 + i, + id=frame_index * 1000 + face_idx, frame_path=frame_path, frame_index=frame_index, bbox=bbox, @@ -66,6 +216,7 @@ class FaceDetector: landmarks=landmarks, ) ) + face_idx += 1 return faces diff --git a/src/faceblur/detect_yunet.py b/src/faceblur/detect_yunet.py new file mode 100644 index 0000000..8e20eaa --- /dev/null +++ b/src/faceblur/detect_yunet.py @@ -0,0 +1,273 @@ +"""Face detection module using YuNet (OpenCV built-in) + ArcFace. + +YuNet is a lightweight face detector built into OpenCV 4.5.4+. +It has good accuracy with fewer false positives than some other detectors. 
class YuNetDetector:
    """Face detector using YuNet (OpenCV) for detection and ArcFace for embeddings.

    Detection is run on the frame at every factor in ``scales``; boxes and
    landmarks are mapped back to original-image coordinates, de-duplicated
    with non-maximum suppression, filtered by ``min_face_size`` and finally
    passed (together with the *original* image) to ArcFace to obtain one
    embedding per face.
    """

    # Default model path, tried relative to the CWD and then to the repo root.
    DEFAULT_MODEL = "models/face_detection_yunet_2023mar.onnx"

    def __init__(
        self,
        confidence_threshold: float = 0.8,
        min_face_size: int = 50,
        scales: Optional[List[float]] = None,
        model_path: Optional[str] = None,
    ):
        """Initialize the YuNet face detector.

        Args:
            confidence_threshold: Minimum score for YuNet to report a face.
            min_face_size: Minimum face width/height in pixels (measured in
                original-image coordinates) for a detection to be kept.
            scales: Image scale factors to run detection at. Falls back to
                ``[1.0, 1.5]`` when ``None`` or empty.
            model_path: Path to the YuNet ONNX model. When ``None`` the
                default model is looked up relative to the current working
                directory, then three directory levels above this file.

        Raises:
            FileNotFoundError: If the model file cannot be found.
        """
        self.confidence_threshold = confidence_threshold
        self.min_face_size = min_face_size
        # Copy so later mutation of the caller's list cannot affect us.
        self.scales = list(scales) if scales else [1.0, 1.5]

        if model_path is None:
            # Try relative to the current working directory first ...
            model_path = self.DEFAULT_MODEL
            if not Path(model_path).exists():
                # ... then relative to this file (three levels up).
                pkg_dir = Path(__file__).parent.parent.parent
                model_path = str(pkg_dir / self.DEFAULT_MODEL)

        if not Path(model_path).exists():
            raise FileNotFoundError(
                f"YuNet model not found at {model_path}. "
                "Please download from: https://github.com/opencv/opencv_zoo/tree/main/models/face_detection_yunet"
            )

        self.model_path = model_path

        # The detector is created lazily; its input size must match the
        # image handed to ``detect`` and is tracked in ``_detector_size``.
        self._detector = None
        self._detector_size = None

        # ArcFace for embeddings
        self.recognizer = ArcFace()

    # The return annotation is quoted on purpose: it is then not evaluated
    # when the class is defined, so importing this module does not fail on
    # an OpenCV build that lacks ``FaceDetectorYN``.
    def _get_detector(self, width: int, height: int) -> "cv2.FaceDetectorYN":
        """Return the YuNet detector configured for a ``width`` x ``height`` input.

        The detector (and therefore the ONNX model) is created only once.
        When a different input size is requested the existing instance is
        resized via ``setInputSize`` instead of being rebuilt, which matters
        because multi-scale detection alternates sizes on every frame.
        """
        size = (width, height)
        if self._detector is None:
            self._detector = cv2.FaceDetectorYN.create(
                self.model_path,
                "",  # config (not needed for ONNX)
                size,
                self.confidence_threshold,
                0.3,  # NMS threshold
                5000,  # top_k
            )
        elif self._detector_size != size:
            self._detector.setInputSize(size)
        self._detector_size = size
        return self._detector

    def _nms_boxes(
        self, boxes: List[Tuple], scores: List[float], iou_threshold: float = 0.4
    ) -> List[int]:
        """Greedy non-maximum suppression over axis-aligned boxes.

        Args:
            boxes: List of (x1, y1, x2, y2) bounding boxes.
            scores: Confidence score for each box.
            iou_threshold: A box is dropped when its IoU with an already
                kept, higher-scoring box exceeds this value.

        Returns:
            Indices (plain ``int``) of the boxes to keep, highest score first.
        """
        if not boxes:
            return []

        boxes_arr = np.asarray(boxes, dtype=np.float32).reshape(-1, 4)
        scores_arr = np.asarray(scores, dtype=np.float32)

        x1, y1, x2, y2 = boxes_arr.T
        areas = (x2 - x1) * (y2 - y1)

        order = scores_arr.argsort()[::-1]
        keep: List[int] = []

        while order.size > 0:
            best, rest = order[0], order[1:]
            keep.append(int(best))
            if rest.size == 0:
                break

            inter_w = np.maximum(
                0, np.minimum(x2[best], x2[rest]) - np.maximum(x1[best], x1[rest])
            )
            inter_h = np.maximum(
                0, np.minimum(y2[best], y2[rest]) - np.maximum(y1[best], y1[rest])
            )
            inter = inter_w * inter_h
            union = areas[best] + areas[rest] - inter

            # Zero-area boxes give a zero union; treat their IoU as 0 instead
            # of dividing 0/0 (NaN plus a RuntimeWarning).
            iou = np.divide(inter, union, out=np.zeros_like(inter), where=union > 0)
            order = rest[iou <= iou_threshold]

        return keep

    def _convert_yunet_landmarks(self, yunet_landmarks: np.ndarray) -> np.ndarray:
        """Reorder a (5, 2) landmark array by swapping the eye and mouth pairs.

        Rows are returned in the order ``[1, 0, 2, 4, 3]`` of the input:
        intended as YuNet order (right_eye, left_eye, nose, right_mouth,
        left_mouth) -> ArcFace order (left_eye, right_eye, nose, mouth_left,
        mouth_right).

        NOTE(review): OpenCV names YuNet's points from the subject's
        perspective, while alignment templates are usually ordered by image
        side. If both already agree this swap would mirror the points -
        confirm against aligned crops before relying on the embeddings.

        Args:
            yunet_landmarks: (5, 2) array as produced by YuNet.

        Returns:
            A new (5, 2) array with the eye rows and mouth rows swapped.
        """
        return yunet_landmarks[[1, 0, 2, 4, 3], :]

    # Return annotation quoted so it is not evaluated at class-definition time.
    def detect_faces(self, frame_path: Path, frame_index: int) -> "List[FaceData]":
        """Detect faces in a frame using multi-scale YuNet detection.

        Args:
            frame_path: Path to the frame image.
            frame_index: Index of the frame in the video.

        Returns:
            List of FaceData objects (bbox, embedding, confidence, landmarks),
            with ids ``frame_index * 1000 + n`` for the n-th kept face.

        Raises:
            ValueError: If the image cannot be read.
        """
        import logging  # local import: keeps this block self-contained

        image = cv2.imread(str(frame_path))
        if image is None:
            raise ValueError(f"Could not read image: {frame_path}")

        h, w = image.shape[:2]

        # (bbox_xyxy, confidence, reordered landmarks), all in original coords
        all_detections = []

        for scale in self.scales:
            if scale == 1.0:
                scaled_image = image
                scaled_w, scaled_h = w, h
            else:
                scaled_w, scaled_h = int(w * scale), int(h * scale)
                if scaled_w < 100 or scaled_h < 100:
                    continue  # scaled frame too small to be worth a pass
                scaled_image = cv2.resize(image, (scaled_w, scaled_h))

            detector = self._get_detector(scaled_w, scaled_h)
            _, raw_faces = detector.detect(scaled_image)
            if raw_faces is None:
                continue

            for face in raw_faces:
                # YuNet output row: [x, y, w, h, landmarks(10), score]
                x, y, fw, fh = face[:4]
                score = face[14]
                yunet_landmarks = face[4:14].reshape(5, 2)

                # Map back to original-image coordinates
                if scale != 1.0:
                    x, y, fw, fh = x / scale, y / scale, fw / scale, fh / scale
                    yunet_landmarks = yunet_landmarks / scale

                bbox = (int(x), int(y), int(x + fw), int(y + fh))
                landmarks = self._convert_yunet_landmarks(yunet_landmarks)
                all_detections.append((bbox, float(score), landmarks))

        if not all_detections:
            return []

        # The same face is usually found at several scales; keep the best box.
        keep_indices = self._nms_boxes(
            [d[0] for d in all_detections],
            [d[1] for d in all_detections],
            iou_threshold=0.4,
        )

        results = []
        face_idx = 0

        for idx in keep_indices:
            (x1, y1, x2, y2), confidence, landmarks = all_detections[idx]

            # Size filter is applied to the unclamped detection box.
            if (x2 - x1) < self.min_face_size or (y2 - y1) < self.min_face_size:
                continue

            # Clamp bbox to image boundaries
            x1 = max(0, min(x1, w - 1))
            y1 = max(0, min(y1, h - 1))
            x2 = max(0, min(x2, w))
            y2 = max(0, min(y2, h))
            bbox: Tuple[int, int, int, int] = (x1, y1, x2, y2)

            # Clamp landmarks to image boundaries (on a copy)
            landmarks = landmarks.copy()
            landmarks[:, 0] = np.clip(landmarks[:, 0], 0, w - 1)
            landmarks[:, 1] = np.clip(landmarks[:, 1], 0, h - 1)

            # Embeddings always come from the original (unscaled) image.
            try:
                embedding = self.recognizer.get_normalized_embedding(image, landmarks)
                embedding = embedding.flatten()
            except Exception:
                # Best effort: skip this face, but leave a trace, because a
                # skipped face is one that will not be clustered or blurred.
                logging.getLogger(__name__).warning(
                    "Embedding extraction failed for a face in frame %s; skipping it",
                    frame_index,
                    exc_info=True,
                )
                continue

            results.append(
                FaceData(
                    id=frame_index * 1000 + face_idx,
                    frame_path=frame_path,
                    frame_index=frame_index,
                    bbox=bbox,
                    embedding=embedding,
                    confidence=confidence,
                    landmarks=landmarks,
                )
            )
            face_idx += 1

        return results

    def close(self):
        """Release the cached YuNet detector."""
        self._detector = None
        self._detector_size = None
"questionary" }, { name = "rich" }, - { name = "scikit-learn" }, + { name = "scikit-learn", specifier = ">=1.3.0" }, { name = "uniface" }, ]