# DeleteChongfuTVYY/历史版本/duplicate_cleanerV5视音频解析.py
import warnings
import os
import hashlib
import zipfile
import rarfile
import subprocess
from datetime import datetime
import argparse
import sqlite3
import logging
from typing import Dict, List, Any, Set, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import re
from pathlib import Path
import shutil
# Set warning filters before other libraries are imported
warnings.filterwarnings("ignore", category=UserWarning, module="numba")
warnings.filterwarnings("ignore", message="FNV hashing is not implemented in Numba")
# Configure the logging system
def setup_logging(log_level=logging.INFO, log_file="duplicate_cleaner.log"):
    """Set up logging to a UTF-8 log file and the console."""
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[
            logging.FileHandler(log_file, encoding="utf-8"),
            logging.StreamHandler(),
        ],
    )
    return logging.getLogger(__name__)

logger = setup_logging()
# Try to import the video-processing libraries
try:
    import cv2
    import imagehash
    from PIL import Image
    import numpy as np
    from skimage.metrics import structural_similarity as ssim
    VIDEO_PROCESSING_AVAILABLE = True
except ImportError as e:
    logger.warning(f"Failed to import video-processing libraries: {e}")
    logger.warning("Content-based video analysis will be disabled")
    VIDEO_PROCESSING_AVAILABLE = False
    # Create dummy stand-ins so later references do not raise NameError
    class DummyCV2:
        VideoCapture = None
        CAP_PROP_FRAME_COUNT = 0
        CAP_PROP_FPS = 0
        CAP_PROP_POS_FRAMES = 0
        COLOR_BGR2GRAY = 0
        def isOpened(self):
            return False
        def read(self):
            return False, None
        def release(self):
            pass
    cv2 = DummyCV2()
    # staticmethod keeps these callable with a single argument instead of
    # becoming bound methods that would receive the dummy instance as `self`
    imagehash = type("DummyImageHash", (), {"average_hash": staticmethod(lambda x: "dummy")})()
    Image = type("DummyImage", (), {"fromarray": staticmethod(lambda x: type("DummyPIL", (), {})())})()
# Try to import the audio-processing libraries
try:
    # Reduce Numba warnings before importing librosa
    os.environ['NUMBA_WARNINGS'] = '0'
    import librosa
    import numpy as np
    from scipy import signal
    # Try to silence numba's own logging if it is installed
    try:
        import numba
        numba_logger = logging.getLogger('numba')
        numba_logger.setLevel(logging.ERROR)
        from numba.core.errors import NumbaPerformanceWarning
        warnings.filterwarnings("ignore", category=NumbaPerformanceWarning)
    except ImportError:
        # numba is unavailable; continue without it
        pass
    AUDIO_PROCESSING_AVAILABLE = True
except ImportError as e:
    logger.warning(f"Failed to import audio-processing libraries: {e}")
    logger.warning("Content-based audio analysis will be disabled")
    AUDIO_PROCESSING_AVAILABLE = False
class PerformanceOptimizedFileDatabase:
def __init__(self, db_path: str = "file_cleaner.db"):
self.db_path = db_path
self.batch_size = 50
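        # batch_size bounds how many rows bulk_add_files writes per transaction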
self.init_database()
    def init_database(self):
        """Initialize the database schema."""
        conn = self._get_connection()
        cursor = conn.cursor()
        # Performance pragmas: WAL journaling, relaxed fsync, 64 MB page
        # cache, in-memory temp tables, 256 MB mmap, 5 s busy timeout
        cursor.execute("PRAGMA journal_mode=WAL")
        cursor.execute("PRAGMA synchronous=NORMAL")
        cursor.execute("PRAGMA cache_size=-64000")
        cursor.execute("PRAGMA temp_store = memory")
        cursor.execute("PRAGMA mmap_size = 268435456")
        cursor.execute("PRAGMA busy_timeout = 5000")
        # Create the main files table
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS files (
id INTEGER PRIMARY KEY AUTOINCREMENT,
file_path TEXT UNIQUE,
file_hash TEXT,
file_size INTEGER,
file_type TEXT,
mod_time DATETIME,
is_archive BOOLEAN DEFAULT 0,
archive_path TEXT,
is_deleted BOOLEAN DEFAULT 0,
created_time DATETIME DEFAULT CURRENT_TIMESTAMP,
last_scanned DATETIME DEFAULT CURRENT_TIMESTAMP,
media_type TEXT DEFAULT 'unknown',
artist TEXT,
title TEXT,
album TEXT,
duration REAL,
bitrate INTEGER,
sample_rate INTEGER,
channels INTEGER
)
"""
)
        # Check for and add any columns missing from older schemas
self._add_missing_columns(cursor)
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS operations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
operation_type TEXT,
file_path TEXT,
file_hash TEXT,
reason TEXT,
details TEXT,
operation_time DATETIME DEFAULT CURRENT_TIMESTAMP
)
"""
)
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS scan_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
scan_time DATETIME DEFAULT CURRENT_TIMESTAMP,
target_directory TEXT,
total_files INTEGER,
duplicate_groups INTEGER,
deleted_files INTEGER,
deleted_archives INTEGER,
duration_seconds REAL,
media_type TEXT DEFAULT 'all'
)
"""
)
        # Create indexes
        self._create_indexes(cursor)
        conn.commit()
        conn.close()
        logger.info("Database initialized")
    def _get_connection(self, timeout=30):
        """Get a database connection, retrying if the database is locked."""
        max_retries = 3
        for attempt in range(max_retries):
            try:
                conn = sqlite3.connect(self.db_path, timeout=timeout)
                return conn
            except sqlite3.OperationalError as e:
                if "locked" in str(e) and attempt < max_retries - 1:
                    logger.warning(f"Database locked, retrying {attempt + 1}/{max_retries}...")
                    time.sleep(1)  # wait one second before retrying
                else:
                    raise e
    def _add_missing_columns(self, cursor):
        """Add any columns missing from an older schema."""
        # Read the current table structure
        cursor.execute("PRAGMA table_info(files)")
        existing_columns = {row[1] for row in cursor.fetchall()}
        # Columns introduced by newer versions
        columns_to_add = [
            ("media_type", "TEXT DEFAULT 'unknown'"),
            ("artist", "TEXT"),
            ("title", "TEXT"),
            ("album", "TEXT"),
            ("duration", "REAL"),
            ("bitrate", "INTEGER"),
            ("sample_rate", "INTEGER"),
            ("channels", "INTEGER"),
        ]
        for column_name, column_type in columns_to_add:
            if column_name not in existing_columns:
                try:
                    cursor.execute(f"ALTER TABLE files ADD COLUMN {column_name} {column_type}")
                    logger.info(f"Added missing column: {column_name}")
                except sqlite3.OperationalError as e:
                    logger.warning(f"Failed to add column {column_name}: {e}")
    def _create_indexes(self, cursor):
        """Create indexes."""
indexes = [
"CREATE INDEX IF NOT EXISTS idx_files_hash ON files(file_hash)",
"CREATE INDEX IF NOT EXISTS idx_files_path ON files(file_path)",
"CREATE INDEX IF NOT EXISTS idx_files_deleted ON files(is_deleted)",
"CREATE INDEX IF NOT EXISTS idx_files_size ON files(file_size)",
"CREATE INDEX IF NOT EXISTS idx_files_media_type ON files(media_type)",
"CREATE INDEX IF NOT EXISTS idx_files_artist ON files(artist)",
"CREATE INDEX IF NOT EXISTS idx_files_title ON files(title)",
"CREATE INDEX IF NOT EXISTS idx_operations_time ON operations(operation_time)",
"CREATE INDEX IF NOT EXISTS idx_operations_type ON operations(operation_type)"
]
        for index_sql in indexes:
            try:
                cursor.execute(index_sql)
            except sqlite3.OperationalError as e:
                logger.warning(f"Failed to create index: {e}")
    def bulk_add_files(self, file_infos: List[Dict[str, Any]]):
        """Bulk-insert file records in small batches to stay under SQLite limits."""
        if not file_infos:
            return
        conn = self._get_connection()
        cursor = conn.cursor()
        try:
            successful_inserts = 0
            # Write each small batch inside its own transaction
            for i in range(0, len(file_infos), self.batch_size):
                batch = file_infos[i : i + self.batch_size]
                try:
                    cursor.execute("BEGIN TRANSACTION")
                    for file_info in batch:
                        try:
cursor.execute(
"""
INSERT OR REPLACE INTO files
(file_path, file_hash, file_size, file_type, mod_time, is_archive,
archive_path, is_deleted, media_type, artist, title, album,
duration, bitrate, sample_rate, channels)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
file_info["path"],
file_info["hash"],
file_info.get("size", 0),
file_info.get("type", "unknown"),
file_info["mod_time"],
file_info.get("is_archive", False),
file_info.get("archive_path"),
0, # is_deleted
file_info.get("media_type", "unknown"),
file_info.get("artist"),
file_info.get("title"),
file_info.get("album"),
file_info.get("duration", 0),
file_info.get("bitrate", 0),
file_info.get("sample_rate", 0),
file_info.get("channels", 0)
)
)
successful_inserts += 1
                        except sqlite3.IntegrityError:
                            # Duplicate file path: update the existing record instead
cursor.execute(
"""
UPDATE files SET
file_hash=?, file_size=?, file_type=?, mod_time=?, is_archive=?,
archive_path=?, is_deleted=?, media_type=?, artist=?, title=?,
album=?, duration=?, bitrate=?, sample_rate=?, channels=?,
last_scanned=CURRENT_TIMESTAMP
WHERE file_path=?
""",
(
file_info["hash"],
file_info.get("size", 0),
file_info.get("type", "unknown"),
file_info["mod_time"],
file_info.get("is_archive", False),
file_info.get("archive_path"),
0, # is_deleted
file_info.get("media_type", "unknown"),
file_info.get("artist"),
file_info.get("title"),
file_info.get("album"),
file_info.get("duration", 0),
file_info.get("bitrate", 0),
file_info.get("sample_rate", 0),
file_info.get("channels", 0),
file_info["path"]
)
)
successful_inserts += 1
                        except Exception as e:
                            logger.warning(f"Failed to insert record for {file_info.get('path', 'unknown')}: {e}")
                            continue
                    cursor.execute("COMMIT")
                except Exception as batch_error:
                    cursor.execute("ROLLBACK")
                    logger.error(f"Batch insert failed: {batch_error}")
                    # Fall back to inserting records one at a time
                    for file_info in batch:
                        try:
                            self._insert_single_file(cursor, file_info)
                            successful_inserts += 1
                        except Exception as single_error:
                            logger.warning(f"Single-file insert failed {file_info.get('path', 'unknown')}: {single_error}")
                            continue
            conn.commit()
            logger.debug(f"Added {successful_inserts} file records")
        except Exception as e:
            logger.error(f"Bulk add of file records failed: {e}")
            conn.rollback()
        finally:
            conn.close()
    def _insert_single_file(self, cursor, file_info):
        """Insert a single file record."""
try:
cursor.execute(
"""
INSERT OR REPLACE INTO files
(file_path, file_hash, file_size, file_type, mod_time, is_archive,
archive_path, is_deleted, media_type, artist, title, album,
duration, bitrate, sample_rate, channels)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
file_info["path"],
file_info["hash"],
file_info.get("size", 0),
file_info.get("type", "unknown"),
file_info["mod_time"],
file_info.get("is_archive", False),
file_info.get("archive_path"),
0, # is_deleted
file_info.get("media_type", "unknown"),
file_info.get("artist"),
file_info.get("title"),
file_info.get("album"),
file_info.get("duration", 0),
file_info.get("bitrate", 0),
file_info.get("sample_rate", 0),
file_info.get("channels", 0)
)
)
        except sqlite3.IntegrityError:
            # Duplicate file path: update the existing record instead
cursor.execute(
"""
UPDATE files SET
file_hash=?, file_size=?, file_type=?, mod_time=?, is_archive=?,
archive_path=?, is_deleted=?, media_type=?, artist=?, title=?,
album=?, duration=?, bitrate=?, sample_rate=?, channels=?,
last_scanned=CURRENT_TIMESTAMP
WHERE file_path=?
""",
(
file_info["hash"],
file_info.get("size", 0),
file_info.get("type", "unknown"),
file_info["mod_time"],
file_info.get("is_archive", False),
file_info.get("archive_path"),
0, # is_deleted
file_info.get("media_type", "unknown"),
file_info.get("artist"),
file_info.get("title"),
file_info.get("album"),
file_info.get("duration", 0),
file_info.get("bitrate", 0),
file_info.get("sample_rate", 0),
file_info.get("channels", 0),
file_info["path"]
)
)
    def mark_file_deleted(self, file_path: str, reason: str = "duplicate"):
        """Mark a file as deleted, retrying if the database is locked."""
        max_retries = 3
for attempt in range(max_retries):
try:
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute(
"""
UPDATE files
SET is_deleted = 1, last_scanned = CURRENT_TIMESTAMP
WHERE file_path = ?
""",
(file_path,),
)
cursor.execute(
"SELECT file_hash FROM files WHERE file_path = ?", (file_path,)
)
result = cursor.fetchone()
file_hash = result[0] if result else None
self.add_operation("delete", file_path, file_hash, reason)
conn.commit()
conn.close()
                return  # success; exit the retry loop
            except sqlite3.OperationalError as e:
                if "locked" in str(e) and attempt < max_retries - 1:
                    logger.warning(f"Database locked while marking deletion, retrying {attempt + 1}/{max_retries}...")
                    time.sleep(1)
                else:
                    logger.error(f"Database error (mark deleted): {e}")
                    if 'conn' in locals():
                        try:
                            conn.close()
                        except Exception:
                            pass
                    raise e
            except Exception as e:
                logger.error(f"Database error (mark deleted): {e}")
                if 'conn' in locals():
                    try:
                        conn.close()
                    except Exception:
                        pass
                raise e
def add_operation(
self,
operation_type: str,
file_path: str,
file_hash: str = None,
reason: str = "",
details: str = "",
):
"""添加操作记录 - 带重试机制"""
max_retries = 3
for attempt in range(max_retries):
try:
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute(
"""
INSERT INTO operations (operation_type, file_path, file_hash, reason, details)
VALUES (?, ?, ?, ?, ?)
""",
(operation_type, file_path, file_hash, reason, details),
)
conn.commit()
conn.close()
                return  # success; exit the retry loop
            except sqlite3.OperationalError as e:
                if "locked" in str(e) and attempt < max_retries - 1:
                    logger.warning(f"Database locked while recording operation, retrying {attempt + 1}/{max_retries}...")
                    time.sleep(1)
                else:
                    logger.error(f"Database error (add operation): {e}")
                    if 'conn' in locals():
                        try:
                            conn.close()
                        except Exception:
                            pass
                    # Operation records are best-effort: log the failure, do not raise
                    break
            except Exception as e:
                logger.error(f"Database error (add operation): {e}")
                if 'conn' in locals():
                    try:
                        conn.close()
                    except Exception:
                        pass
                # Operation records are best-effort: log the failure, do not raise
                break
    def add_scan_history(self, scan_data: Dict[str, Any]):
        """Record one scan in the scan history."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
cursor.execute(
"""
INSERT INTO scan_history
(target_directory, total_files, duplicate_groups, deleted_files, deleted_archives, duration_seconds, media_type)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
scan_data.get("target_directory", ""),
scan_data.get("total_files", 0),
scan_data.get("duplicate_groups", 0),
scan_data.get("deleted_files", 0),
scan_data.get("deleted_archives", 0),
scan_data.get("duration_seconds", 0),
scan_data.get("media_type", "all")
),
)
conn.commit()
        except Exception as e:
            logger.error(f"Database error (add scan history): {e}")
finally:
conn.close()
    def get_scan_statistics(self) -> Dict[str, Any]:
        """Return aggregate scan statistics."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
cursor.execute("SELECT COUNT(*) FROM files")
total_files = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM files WHERE is_deleted = 1")
deleted_files = cursor.fetchone()[0]
cursor.execute(
"SELECT COUNT(DISTINCT file_hash) FROM files WHERE is_deleted = 0"
)
unique_files = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM operations")
total_operations = cursor.fetchone()[0]
return {
"total_files": total_files,
"deleted_files": deleted_files,
"unique_files": unique_files,
"total_operations": total_operations,
}
        except Exception as e:
            logger.error(f"Database error (get statistics): {e}")
return {}
finally:
conn.close()
class MovieMetadataExtractor:
    """Extracts movie metadata from filenames."""
    # Common resolution patterns
    RESOLUTION_PATTERNS = [
        r"(\d{3,4}[pi])",  # 1080p, 720p, 480p, 2160p
        r"([24]k)",  # 2k, 4k
        r"(hd)",
        r"(fhd)",
        r"(uhd)",
    ]
    # Common codec patterns
    CODEC_PATTERNS = [
        r"(x264)",
        r"(x265)",
        r"(h264)",
        r"(h265)",
        r"(hevc)",
        r"(avc)",
        r"(divx)",
        r"(xvid)",
    ]
    # Common source patterns
    SOURCE_PATTERNS = [
        r"(bluray)",
        r"(blu-ray)",
        r"(webdl)",
        r"(web-dl)",
        r"(hdtv)",
        r"(dvdrip)",
        r"(bdrip)",
        r"(brrip)",
    ]
    # Common audio format patterns
    AUDIO_PATTERNS = [r"(dts)", r"(ac3)", r"(aac)", r"(flac)", r"(dd)"]
    # Enhanced noise-removal patterns are defined inline in the methods below
@staticmethod
    def extract_movie_name_enhanced(filename):
        """Enhanced movie-name extraction."""
        # Strip the extension
        name = os.path.splitext(filename)[0]
        # Stricter pattern matching
        patterns_to_remove = [
            # Ad-related markers
            r"[\[\(]?广告[\]\)]?",
            r"[\[\(]?推广[\]\)]?",
            r"[\[\(]?宣传[\]\)]?",
            r"[\[\(]?片头[\]\)]?",
            r"[\[\(]?片花[\]\)]?",
            r"^[^a-zA-Z0-9\u4e00-\u9fff]*",  # leading special characters
            r"[\s_\-]*([\[\(]?\d{4}[\]\)]?)[\s_\-]*",  # year
            # Resolution
            r"[\s_\-]*(\d{3,4}[pi])[\s_\-]*",
            r"[\s_\-]*([24]k)[\s_\-]*",
            r"[\s_\-]*(hd|fhd|uhd)[\s_\-]*",
            # Codec
            r"[\s_\-]*(x264|x265|h264|h265|hevc|avc|divx|xvid)[\s_\-]*",
            # Source
            r"[\s_\-]*(bluray|blu-ray|webdl|web-dl|hdtv|dvdrip|bdrip|brrip)[\s_\-]*",
            # Audio
            r"[\s_\-]*(dts|ac3|aac|flac|dd)[\s_\-]*",
            # Release groups and other bracketed noise
            r"[\s_\-]*([\[\(][^\]\)]+[\]\)])[\s_\-]*",  # anything in brackets
            r"[\s_\-]*([【][^】]+[】])[\s_\-]*",  # CJK brackets
            r"[\s_\-]*([╬┅┉┊┋┌┍┎┏┐┑┒┓└┕┖┗┘┙┚┛├┝┞┟┠┡┢┣┤┥┦┧┨┩┪┫┬┭┮┯┰┱┲┳┴┵┶┷┸┹┺┻┼┽┾┿╀╁╂╃╄╅╆╇╈╉╊╋]+)[\s_\-]*",  # box-drawing noise
        ]
        for pattern in patterns_to_remove:
            name = re.sub(pattern, "", name, flags=re.IGNORECASE)
        # Collapse leftover separators into single spaces
        name = re.sub(r"[\._\-\s]+", " ", name)
        name = name.strip()
        return name
    @staticmethod
    def extract_core_movie_name(filename):
        """Extract the core movie name (most aggressive cleanup)."""
        name = MovieMetadataExtractor.extract_movie_name_enhanced(filename)
        # Strip common irrelevant prefixes (e.g. "电影" = movie, "高清" = HD,
        # "中字" = Chinese subtitles)
        prefixes_to_remove = [
            "电影",
            "高清",
            "最新",
            "完整版",
            "未删减版",
            "国语",
            "英语",
            "中字",
            "中文字幕",
            "双语字幕",
            "特效字幕",
        ]
        for prefix in prefixes_to_remove:
            if name.lower().startswith(prefix.lower()):
                name = name[len(prefix):].strip()
        return name
    @staticmethod
    def extract_movie_name(filename):
        """Extract the movie name."""
        # Strip the extension
        name = os.path.splitext(filename)[0]
        # Common patterns to remove
        patterns_to_remove = [
            # Year
r"\s*[\(\[]?\d{4}[\)\]]?",
            # Resolution
            r"\s*\d{3,4}[pi]",
            r"\s*[24]k",
            r"\s*hd",
            r"\s*fhd",
            r"\s*uhd",
            # Codec
            r"\s*x264",
            r"\s*x265",
            r"\s*h264",
            r"\s*h265",
            r"\s*hevc",
            r"\s*avc",
            r"\s*divx",
            r"\s*xvid",
            # Source
            r"\s*bluray",
            r"\s*blu-ray",
            r"\s*webdl",
            r"\s*web-dl",
            r"\s*hdtv",
            r"\s*dvdrip",
            r"\s*bdrip",
            r"\s*brrip",
            # Audio
            r"\s*dts",
            r"\s*ac3",
            r"\s*aac",
            r"\s*flac",
            r"\s*dd",
            # Release groups and other trailing info
            r"\s*-\s*[^-]+$",  # everything after the last " - "
            r"\[[^\]]+\]",  # bracketed content
            r"\([^\)]+\)",  # parenthesized content
        ]
        for pattern in patterns_to_remove:
            name = re.sub(pattern, "", name, flags=re.IGNORECASE)
        # Collapse leftover separators into single spaces
        name = re.sub(r"[\._\-\s]+", " ", name)
        name = name.strip()
        return name
    @staticmethod
    def extract_resolution(filename):
        """Extract the resolution."""
filename_lower = filename.lower()
resolution_map = {
"2160p": "4K",
"4k": "4K",
"1080p": "1080p",
"720p": "720p",
"480p": "480p",
"hd": "HD",
}
for pattern, resolution in resolution_map.items():
if pattern in filename_lower:
return resolution
return "Unknown"
    @staticmethod
    def extract_quality_score(filename, file_size):
        """Compute a quality score for ranking versions of the same movie."""
score = 0
        # Size-based score
if file_size > 8 * 1024 * 1024 * 1024: # >8GB
score += 30
elif file_size > 4 * 1024 * 1024 * 1024: # >4GB
score += 20
elif file_size > 2 * 1024 * 1024 * 1024: # >2GB
score += 10
        # Resolution-based score
resolution = MovieMetadataExtractor.extract_resolution(filename)
resolution_scores = {"4K": 25, "1080p": 20, "720p": 15, "HD": 10, "Unknown": 5}
score += resolution_scores.get(resolution, 5)
        # Codec-based score
        filename_lower = filename.lower()
        if "x265" in filename_lower or "hevc" in filename_lower:
            score += 10  # more efficient codec
if "x264" in filename_lower:
score += 5
        # Source-based score
if "bluray" in filename_lower or "blu-ray" in filename_lower:
score += 15
elif "webdl" in filename_lower or "web-dl" in filename_lower:
score += 10
elif "hdtv" in filename_lower:
score += 5
return score
class AdvancedMovieMetadataExtractor(MovieMetadataExtractor):
    """Advanced movie metadata extractor."""
    @staticmethod
    def extract_detailed_metadata(filename, file_path=None):
        """Extract detailed movie metadata from a filename."""
metadata = {
"title": "",
"year": "",
"quality": "",
"codec": "",
"source": "",
"audio": "",
"group": "",
}
        # Year
        year_match = re.search(r"(19|20)\d{2}", filename)
        if year_match:
            metadata["year"] = year_match.group()
        # Quality
        quality_terms = ["4k", "2160p", "1080p", "720p", "480p", "hd", "fhd", "uhd"]
        for term in quality_terms:
            if term in filename.lower():
                metadata["quality"] = term.upper()
                break
        # Codec
        codec_terms = ["x264", "x265", "h264", "h265", "hevc", "avc"]
        for term in codec_terms:
            if term in filename.lower():
                metadata["codec"] = term.upper()
                break
        # Source
        source_terms = ["bluray", "blu-ray", "webdl", "web-dl", "hdtv", "dvdrip"]
        for term in source_terms:
            if term in filename.lower():
                metadata["source"] = term.upper()
                break
        # Extract the movie title itself (smarter method)
metadata["title"] = AdvancedMovieMetadataExtractor.extract_movie_title_advanced(
filename
)
return metadata
    @staticmethod
    def extract_movie_title_advanced(filename):
        """Advanced movie-title extraction."""
        # Strip the extension
        name = os.path.splitext(filename)[0]
        # Patterns to remove (a more comprehensive list)
        patterns_to_remove = [
            # Year patterns
r"[\(\[]?\s*(19|20)\d{2}\s*[\)\]]?",
            # Quality patterns
            r"\b(4k|2160p|1080p|720p|480p|hd|fhd|uhd)\b",
            # Codec patterns
            r"\b(x264|x265|h264|h265|hevc|avc|divx|xvid)\b",
            # Source patterns
            r"\b(bluray|blu-ray|webdl|web-dl|hdtv|dvdrip|bdrip|brrip)\b",
            # Audio patterns
            r"\b(dts|ac3|aac|flac|dd|dts-hd|truehd)\b",
            # Release-group patterns
            r"\[[^\]]+\]",
            r"\s*-\s*[^-]+$",
            # Special characters and disc/part numbering
            r"[\(\{\[].*?[\)\}\]]",
            r"\b(cd\d|disc\d|part\d)\b",
            r"[\._\-]",
        ]
        for pattern in patterns_to_remove:
            name = re.sub(pattern, " ", name, flags=re.IGNORECASE)
        # Collapse whitespace
        name = re.sub(r"\s+", " ", name).strip()
        # Drop common filler words
common_words = [
"full",
"movie",
"film",
"video",
"hd",
"fhd",
"uhd",
"english",
"chinese",
"sub",
"subtitle",
"dubbed",
"extended",
"director",
"cut",
"theatrical",
"unrated",
]
words = name.split()
filtered_words = [word for word in words if word.lower() not in common_words]
return " ".join(filtered_words)
class MusicMetadataExtractor:
    """Extracts music metadata from filenames."""
    @staticmethod
    def extract_music_metadata(filename):
        """Extract music metadata from a filename."""
        # Strip the extension
        name = os.path.splitext(filename)[0]
        # Common music filename patterns
        patterns = [
            # Artist - Title
            r"^(.*?)\s*[-–—]\s*(.*)$",
            # Artist - Album - Title
            r"^(.*?)\s*[-–—]\s*(.*?)\s*[-–—]\s*(.*)$",
            # Track number. Title - Artist
            r"^\d+\s*[-\.]?\s*(.*?)\s*[-–—]\s*(.*)$",
        ]
metadata = {
"artist": "",
"title": "",
"album": "",
"cleaned_title": name
}
for pattern in patterns:
match = re.match(pattern, name, re.IGNORECASE)
if match:
groups = match.groups()
if len(groups) == 2:
metadata["artist"] = groups[0].strip()
metadata["title"] = groups[1].strip()
metadata["cleaned_title"] = groups[1].strip()
elif len(groups) == 3:
metadata["artist"] = groups[0].strip()
metadata["album"] = groups[1].strip()
metadata["title"] = groups[2].strip()
metadata["cleaned_title"] = groups[2].strip()
break
        # If no pattern matched, fall back to suffix stripping
        if not metadata["title"]:
            # Remove common suffixes
suffixes_to_remove = [
r"\[.*?\]", r"\(.*?\)", r"【.*?】",
r"\b(official\s*(audio|video)?|lyrics?|video|audio|hd|hq|4k|1080p|720p)\b",
r"\b(music\s*video|mv|live|performance|cover|remix|version)\b",
r"\b(ft\.?|feat\.?|featuring)\s+.*$"
]
cleaned_name = name
for suffix in suffixes_to_remove:
cleaned_name = re.sub(suffix, "", cleaned_name, flags=re.IGNORECASE)
metadata["title"] = cleaned_name.strip()
metadata["cleaned_title"] = cleaned_name.strip()
return metadata
    @staticmethod
    def extract_quality_info(file_path, file_size):
        """Extract audio quality information."""
quality_info = {
"bitrate": 0,
"sample_rate": 0,
"channels": 0,
"duration": 0,
"quality_score": 0
}
        try:
            # Read real audio properties via mutagen when available
            if AUDIO_PROCESSING_AVAILABLE:
                try:
                    import mutagen
                    audio = mutagen.File(file_path)
                    if audio is not None:
                        if hasattr(audio.info, 'bitrate'):
                            quality_info["bitrate"] = audio.info.bitrate // 1000  # kbps
if hasattr(audio.info, 'sample_rate'):
quality_info["sample_rate"] = audio.info.sample_rate
if hasattr(audio.info, 'channels'):
quality_info["channels"] = audio.info.channels
if hasattr(audio.info, 'length'):
quality_info["duration"] = audio.info.length
except ImportError:
pass
            # Estimate quality from extension and file size
            ext = os.path.splitext(file_path)[1].lower()
            if ext == '.flac':
                quality_info["quality_score"] += 30
                if quality_info["bitrate"] == 0:
                    quality_info["bitrate"] = 900  # rough FLAC bitrate estimate
elif ext == '.wav':
quality_info["quality_score"] += 25
elif ext in ['.mp3', '.m4a']:
if file_size > 8 * 1024 * 1024: # >8MB
quality_info["quality_score"] += 20
quality_info["bitrate"] = 320
elif file_size > 5 * 1024 * 1024: # >5MB
quality_info["quality_score"] += 15
quality_info["bitrate"] = 192
else:
quality_info["quality_score"] += 10
quality_info["bitrate"] = 128
            # Extra points for larger files
if file_size > 20 * 1024 * 1024: # >20MB
quality_info["quality_score"] += 10
elif file_size > 10 * 1024 * 1024: # >10MB
quality_info["quality_score"] += 5
return quality_info
        except Exception as e:
            logger.debug(f"Failed to extract audio quality info for {file_path}: {e}")
return quality_info
class VideoFingerprintExtractor:
    """Video fingerprint extractor based on key frames and audio features."""
    def __init__(self):
        self.frame_hashes = {}
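    # Key-frame fingerprinting: decode a handful of frames spread across the
    # video, downscale each to 64x64 grayscale, and take its perceptual
    # average hash (aHash). Videos that share most frame hashes are treated
    # as the same content even when containers or bitrates differ.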
    def extract_key_frames(self, video_path, num_frames=10, skip_start=0.1):
        """Extract key frames; guards against divide-by-zero on bad metadata."""
        if not VIDEO_PROCESSING_AVAILABLE:
            logger.warning("Video processing unavailable; skipping key-frame extraction")
            return []
        cap = None
        try:
            # Suppress FFmpeg warnings
            os.environ["OPENCV_FFMPEG_LOGLEVEL"] = "0"
            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                logger.warning(f"Cannot open video file: {video_path}")
                return []
            # Read video properties and validate them
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            fps = cap.get(cv2.CAP_PROP_FPS)
            # Guard against divide-by-zero and invalid values
            if fps <= 0:
                logger.warning(f"Invalid video FPS: {video_path} (fps: {fps})")
                return []
            if total_frames <= 0:
                logger.warning(f"Invalid total frame count: {video_path} (frames: {total_frames})")
                return []
            # Compute the duration
            duration = total_frames / fps
            if duration <= 0:
                logger.warning(f"Invalid video duration: {video_path} (duration: {duration})")
                return []
            # Skip the opening portion (often ads or studio logos)
            start_frame = int(total_frames * skip_start)
            if start_frame >= total_frames:
                start_frame = max(0, total_frames - 1)
            cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
            # Work out how many frames can actually be sampled
            available_frames = total_frames - start_frame
            if available_frames <= 0:
                logger.warning(f"No frames left after skipping the start: {video_path}")
                return []
            frames_to_extract = min(num_frames, available_frames)
            # Guard against a zero frame count
            if frames_to_extract <= 0:
                logger.warning(f"No frames available to extract: {video_path}")
                return []
frame_interval = max(1, available_frames // frames_to_extract)
key_frames = []
frame_hashes = []
for i in range(frames_to_extract):
frame_pos = start_frame + i * frame_interval
if frame_pos >= total_frames:
break
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_pos)
ret, frame = cap.read()
if ret and frame is not None:
                    try:
                        # Convert to grayscale and downscale for speed
                        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                        resized = cv2.resize(gray, (64, 64))
                        # Convert to a PIL image and compute the perceptual hash
                        pil_img = Image.fromarray(resized)
                        frame_hash = imagehash.average_hash(pil_img)
                        key_frames.append(frame)
                        frame_hashes.append(str(frame_hash))
                    except Exception as frame_error:
                        logger.debug(f"Error processing frame {i} of {video_path}: {frame_error}")
                        continue
return frame_hashes
        except Exception as e:
            logger.error(f"Error extracting key frames from {video_path}: {e}")
            return []
        finally:
            # Always release the capture handle
            if cap is not None:
                cap.release()
    def extract_audio_fingerprint(self, video_path):
        """Extract a simplified audio fingerprint (size + duration)."""
        try:
            # Use file size and duration as a cheap stand-in for real audio features
            file_size = os.path.getsize(video_path)
            duration = self.get_video_duration(video_path)
            return f"audio_{file_size}_{duration}"
        except Exception as e:
            logger.error(f"Error extracting audio fingerprint from {video_path}: {e}")
            return "audio_unknown"
    def get_video_duration(self, video_path):
        """Get the video duration, trying OpenCV first, then ffprobe."""
        try:
            if VIDEO_PROCESSING_AVAILABLE:
                cap = cv2.VideoCapture(video_path)
                if cap.isOpened():
                    fps = cap.get(cv2.CAP_PROP_FPS)
                    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                    cap.release()
                    # Guard against divide-by-zero
                    if fps > 0 and frame_count > 0:
                        duration = frame_count / fps
                        if duration > 0:
                            return duration
            # OpenCV failed; fall back to ffprobe
            try:
result = subprocess.run(
[
"ffprobe",
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
video_path,
],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,  # capture stderr to keep the console clean
                    text=True,
                    timeout=30,  # 30-second timeout
)
if result.returncode == 0:
duration = float(result.stdout.strip())
if duration > 0:
return duration
except (
subprocess.TimeoutExpired,
subprocess.SubprocessError,
ValueError,
) as e:
logger.debug(f"ffprobe 获取时长失败 {video_path}: {e}")
return 0 # 默认返回0
except Exception as e:
logger.debug(f"获取视频时长时出错 {video_path}: {e}")
return 0
    def extract_video_fingerprint(self, video_path, num_frames=8, skip_start=0.1):
        """Extract the full video fingerprint, with defensive error handling."""
        try:
            # The file must exist and be readable
            if not os.path.exists(video_path):
                logger.warning(f"Video file does not exist: {video_path}")
                return None
            if not os.access(video_path, os.R_OK):
                logger.warning(f"Video file is not readable: {video_path}")
                return None
            # Key-frame hashes
            frame_hashes = self.extract_key_frames(video_path, num_frames, skip_start)
            if not frame_hashes:
                logger.debug(f"Could not extract key-frame hashes: {video_path}")
                return None
            # Audio fingerprint
            audio_fingerprint = self.extract_audio_fingerprint(video_path)
            # Combine both into a single fingerprint string
            frame_fingerprint = "_".join(sorted(frame_hashes))
            full_fingerprint = f"video_{frame_fingerprint}_{audio_fingerprint}"
            return full_fingerprint
        except Exception as e:
            logger.error(f"Error extracting video fingerprint from {video_path}: {e}")
            return None
    def calculate_video_similarity(self, fingerprint1, fingerprint2):
        """Compute the similarity of two video fingerprints."""
        if not fingerprint1 or not fingerprint2:
            return 0
        if fingerprint1 == fingerprint2:
            return 1.0
        # Simple similarity: fraction of frame hashes the two videos share
        try:
            parts1 = fingerprint1.split("_")
            parts2 = fingerprint2.split("_")
            # Sanity-check the fingerprint format
            if len(parts1) < 3 or len(parts2) < 3:
                return 0
            # Drop the "video" prefix and the trailing "audio", size, and
            # duration tokens appended by extract_audio_fingerprint
            frames1 = set(parts1[1:-3])
            frames2 = set(parts2[1:-3])
            if not frames1 or not frames2:
                return 0
            # Jaccard similarity
            intersection = len(frames1.intersection(frames2))
            union = len(frames1.union(frames2))
            similarity = intersection / union if union > 0 else 0
            return similarity
        except Exception as e:
            logger.error(f"Error computing video similarity: {e}")
            return 0
class AudioFingerprintExtractor:
    """Audio fingerprint extractor."""
    def __init__(self):
        self.fingerprint_cache = {}
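    # The fingerprint is a 17-value feature vector: 13 MFCC means plus the
    # mean spectral centroid, spectral bandwidth, zero-crossing rate, and
    # tempo, each quantized to an integer (x1000) so tiny numeric jitter
    # does not change the fingerprint string.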
    def extract_audio_fingerprint(self, file_path, sample_duration=30):
        """Extract an audio fingerprint from the first sample_duration seconds."""
        if not AUDIO_PROCESSING_AVAILABLE:
            return None
        try:
            # Check the cache first
            file_stat = os.stat(file_path)
            cache_key = (file_path, file_stat.st_size, file_stat.st_mtime)
            if cache_key in self.fingerprint_cache:
                return self.fingerprint_cache[cache_key]
            # Load the audio
            y, sr = librosa.load(file_path, duration=sample_duration, mono=True)
            features = []
            # 1. MFCC features
            mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
            mfcc_mean = np.mean(mfcc, axis=1)
            features.extend(mfcc_mean.tolist())
            # 2. Spectral centroid
            spectral_centroids = librosa.feature.spectral_centroid(y=y, sr=sr)
            features.append(np.mean(spectral_centroids))
            # 3. Spectral bandwidth
            spectral_bandwidth = librosa.feature.spectral_bandwidth(y=y, sr=sr)
            features.append(np.mean(spectral_bandwidth))
            # 4. Zero-crossing rate
            zero_crossing_rate = librosa.feature.zero_crossing_rate(y)
            features.append(np.mean(zero_crossing_rate))
            # 5. Tempo
            tempo, _ = librosa.beat.beat_track(y=y, sr=sr)
            features.append(tempo)
            # Convert the feature vector into a fingerprint string
            fingerprint = self._features_to_fingerprint(features)
            self.fingerprint_cache[cache_key] = fingerprint
            return fingerprint
        except Exception as e:
            logger.debug(f"Error extracting audio fingerprint from {file_path}: {e}")
            return None
    def _features_to_fingerprint(self, features):
        """Convert a feature vector into a fingerprint string."""
        # Quantize to integers so tiny floating-point differences cancel out
        quantized = [int(f * 1000) for f in features]
        return "audio_" + "_".join(map(str, quantized))
    def calculate_audio_similarity(self, fingerprint1, fingerprint2):
        """Compute the similarity of two audio fingerprints."""
        if not fingerprint1 or not fingerprint2:
            return 0
        if fingerprint1 == fingerprint2:
            return 1.0
        try:
            # Recover the feature values
            features1 = [int(x) for x in fingerprint1.replace("audio_", "").split("_")]
            features2 = [int(x) for x in fingerprint2.replace("audio_", "").split("_")]
            if len(features1) != len(features2):
                return 0
            # Cosine similarity
            dot_product = np.dot(features1, features2)
            norm1 = np.linalg.norm(features1)
            norm2 = np.linalg.norm(features2)
            if norm1 == 0 or norm2 == 0:
                return 0
            similarity = dot_product / (norm1 * norm2)
            return max(0, similarity)  # clamp to non-negative
        except Exception as e:
            logger.debug(f"Error computing audio similarity: {e}")
            return 0
class ArchiveProcessor:
    """Archive (compressed file) processor."""
    def __init__(self):
        self.archive_extensions = {'.zip', '.rar', '.7z', '.tar', '.gz'}
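    # Note: only .zip and .rar contents are inspected below; .7z/.tar/.gz
    # archives are still scanned, but only deduplicated by whole-file hash.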
    def extract_archive_contents(self, archive_path):
        """List the contents of an archive."""
        try:
            archive_info = {
                'path': archive_path,
                'size': os.path.getsize(archive_path),
                'files': [],
                'all_files_exist': True
            }
            ext = os.path.splitext(archive_path)[1].lower()
if ext == '.zip':
with zipfile.ZipFile(archive_path, 'r') as zip_ref:
for file_info in zip_ref.infolist():
if not file_info.is_dir():
archive_info['files'].append({
'name': file_info.filename,
'size': file_info.file_size,
'compressed_size': file_info.compress_size
})
elif ext == '.rar':
with rarfile.RarFile(archive_path, 'r') as rar_ref:
for file_info in rar_ref.infolist():
if not file_info.is_dir():
archive_info['files'].append({
'name': file_info.filename,
'size': file_info.file_size,
'compressed_size': file_info.compress_size
})
return archive_info
        except Exception as e:
            logger.error(f"Error reading archive contents of {archive_path}: {e}")
return None
    def check_archive_contents_exist(self, archive_info, existing_files):
        """Check whether every file inside the archive already exists on disk."""
        if not archive_info or not archive_info['files']:
            return False
        for file_in_archive in archive_info['files']:
            file_found = False
            for existing_file in existing_files:
                # Simple match on basename plus size (allowing a 10% size difference)
                if (os.path.basename(existing_file['path']).lower() == file_in_archive['name'].lower() and
                        existing_file['size'] >= file_in_archive['size'] * 0.9):
file_found = True
break
if not file_found:
return False
return True
class ContentBasedDuplicateDetector:
    """Content-based duplicate detector, with music support."""
    def __init__(self, similarity_threshold=0.7):
        self.similarity_threshold = similarity_threshold
self.video_fingerprint_extractor = VideoFingerprintExtractor()
self.audio_fingerprint_extractor = AudioFingerprintExtractor()
self.metadata_extractor = AdvancedMovieMetadataExtractor()
self.music_metadata_extractor = MusicMetadataExtractor()
self.archive_processor = ArchiveProcessor()
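    # Detection pipeline: fingerprint each file (key frames for video, audio
    # features for music), group files whose fingerprints exceed the
    # similarity threshold, then merge in groups found via filename metadata.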
    def group_similar_movies_by_content(self, files):
        """Group movies by content fingerprint (backward-compatible wrapper)."""
        return self.group_similar_files_by_content(files, "video")
    def group_similar_files_by_content(self, files, media_type="video"):
        """Group files by content fingerprint."""
        logger.info(f"Starting content-fingerprint similarity analysis for {media_type}...")
        if (media_type == "video" and not VIDEO_PROCESSING_AVAILABLE) or \
           (media_type == "audio" and not AUDIO_PROCESSING_AVAILABLE):
            logger.warning(f"{media_type} processing unavailable; skipping content-based analysis")
            return []
        file_fingerprints = {}
        for file_info in files:
            file_path = file_info["path"]
            logger.debug(f"Extracting {media_type} fingerprint: {os.path.basename(file_path)}")
            if media_type == "video" and VIDEO_PROCESSING_AVAILABLE:
                fingerprint = self.video_fingerprint_extractor.extract_video_fingerprint(file_path)
            elif media_type == "audio" and AUDIO_PROCESSING_AVAILABLE:
                fingerprint = self.audio_fingerprint_extractor.extract_audio_fingerprint(file_path)
            else:
                fingerprint = None
            if fingerprint:
                file_info["content_fingerprint"] = fingerprint
                file_fingerprints[file_path] = fingerprint
            else:
                file_info["content_fingerprint"] = None
        # Group by pairwise fingerprint similarity
groups = []
processed_files = set()
for file_path1, fingerprint1 in file_fingerprints.items():
if file_path1 in processed_files:
continue
current_group = [file_path1]
processed_files.add(file_path1)
for file_path2, fingerprint2 in file_fingerprints.items():
if file_path2 in processed_files or file_path1 == file_path2:
continue
if media_type == "video":
similarity = self.video_fingerprint_extractor.calculate_video_similarity(fingerprint1, fingerprint2)
else:
similarity = self.audio_fingerprint_extractor.calculate_audio_similarity(fingerprint1, fingerprint2)
if similarity >= self.similarity_threshold:
current_group.append(file_path2)
processed_files.add(file_path2)
if len(current_group) > 1:
groups.append(current_group)
        # Convert path groups back into file-info groups
file_groups = []
for group in groups:
file_info_group = []
for file_path in group:
file_info = next((f for f in files if f["path"] == file_path), None)
if file_info:
file_info_group.append(file_info)
file_groups.append(file_info_group)
logger.info(f"基于内容指纹找到 {len(file_groups)} 组相似{media_type}文件")
return file_groups
def group_similar_music_by_metadata(self, files):
"""基于元数据对音乐进行分组"""
logger.info("开始基于元数据的音乐相似度分析...")
        # Extract music metadata for every file
for file_info in files:
filename = file_info.get("filename", "")
metadata = self.music_metadata_extractor.extract_music_metadata(filename)
quality_info = self.music_metadata_extractor.extract_quality_info(
file_info["path"], file_info["size"]
)
file_info.update(metadata)
file_info.update(quality_info)
        # Group by artist and title
music_groups = {}
for file_info in files:
artist = file_info.get("artist", "").lower().strip()
title = file_info.get("cleaned_title", "").lower().strip()
if artist and title:
group_key = f"{artist}||{title}"
elif title:
group_key = f"unknown||{title}"
else:
continue
if group_key not in music_groups:
music_groups[group_key] = []
music_groups[group_key].append(file_info)
        # Only keep groups containing more than one file
        similar_groups = [group for group in music_groups.values() if len(group) > 1]
        logger.info(f"Found {len(similar_groups)} groups of similar music files by metadata")
        return similar_groups
    def enhance_with_metadata_matching(self, files, content_groups, media_type="video"):
        """Augment content-based groups with metadata matching."""
        logger.info(f"Enhancing {media_type} content groups with metadata matching...")
if media_type == "video":
# 为每个文件提取详细元数据
for file_info in files:
filename = file_info.get("filename", "")
metadata = self.metadata_extractor.extract_detailed_metadata(filename)
file_info["detailed_metadata"] = metadata
# 基于元数据的补充分组
metadata_groups = self.group_by_video_metadata(files)
else:
# 音乐元数据分组
metadata_groups = self.group_similar_music_by_metadata(files)
# 合并内容分组和元数据分组
merged_groups = self.merge_groups(content_groups, metadata_groups)
return merged_groups
    def group_by_video_metadata(self, files):
        """Group by video metadata (title and year)."""
        metadata_groups = {}
for file_info in files:
metadata = file_info.get("detailed_metadata", {})
title = metadata.get("title", "").lower().strip()
year = metadata.get("year", "")
if title and len(title) > 2:
group_key = f"{title}_{year}" if year else title
if group_key not in metadata_groups:
metadata_groups[group_key] = []
metadata_groups[group_key].append(file_info)
        # Only keep groups containing more than one file
        return [group for group in metadata_groups.values() if len(group) > 1]
    def merge_groups(self, content_groups, metadata_groups):
        """Merge content groups with metadata groups."""
        all_groups = content_groups.copy()
        for metadata_group in metadata_groups:
            # Does this metadata group overlap an existing content group?
            found = False
            for content_group in content_groups:
                common_files = set(f["path"] for f in content_group) & set(
                    f["path"] for f in metadata_group
                )
                if common_files:
                    # Merge the two groups
content_group.extend(
[
f
for f in metadata_group
if f["path"] not in set(f["path"] for f in content_group)
]
)
found = True
break
if not found:
all_groups.append(metadata_group)
return all_groups
    def find_redundant_archives(self, files, archive_files):
        """Find redundant archives (whose contents all already exist)."""
        logger.info("Searching for redundant archives...")
        redundant_archives = []
        for archive_file in archive_files:
            archive_info = self.archive_processor.extract_archive_contents(archive_file["path"])
            if archive_info and self.archive_processor.check_archive_contents_exist(archive_info, files):
                redundant_archives.append(archive_file)
                logger.info(f"Redundant archive found: {os.path.basename(archive_file['path'])}")
        logger.info(f"Found {len(redundant_archives)} redundant archives")
        return redundant_archives
class IntelligentDuplicateCleaner:
def __init__(
self, target_dirs, db_path="file_cleaner.db", max_workers=4, prefer_folders=None
):
        # Accept either a single directory or a list of directories
        if isinstance(target_dirs, str):
            self.target_dirs = [target_dirs]
        else:
            self.target_dirs = target_dirs
        self.prefer_folders = prefer_folders or []
        # If a database already exists, back it up before upgrading it
        if os.path.exists(db_path):
            logger.info("Existing database detected; backing up before upgrade...")
            backup_path = db_path + ".backup"
            try:
                shutil.copy2(db_path, backup_path)
                logger.info(f"Database backed up to: {backup_path}")
            except Exception as e:
                logger.warning(f"Database backup failed: {e}")
self.db = PerformanceOptimizedFileDatabase(db_path)
self.max_workers = max_workers
self.metadata_extractor = MovieMetadataExtractor()
self.music_metadata_extractor = MusicMetadataExtractor()
        # Content-based duplicate detector
        self.content_detector = ContentBasedDuplicateDetector()
        self.archive_processor = ArchiveProcessor()
        # Media file extensions
self.video_extensions = {
".mp4", ".avi", ".mkv", ".mov", ".wmv", ".flv", ".webm",
".m4v", ".3gp", ".mpg", ".mpeg", ".ts", ".m2ts", ".vob", ".rmvb"
}
self.audio_extensions = {
".mp3", ".wav", ".flac", ".aac", ".ogg", ".wma", ".m4a",
".aiff", ".ape", ".opus", ".amr", ".mp2", ".mp1", ".ac3", ".dts"
}
self.archive_extensions = {'.zip', '.rar', '.7z', '.tar', '.gz'}
        # Performance statistics
        self.stats = {
            "files_processed": 0,
            "files_skipped": 0,
            "hash_time": 0,
            "start_time": None,
        }
self.hash_cache = {}
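        # hash_cache is keyed by (path, size, mtime), so files that have not
        # changed since the last scan are never rehashed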
logger.info(f"初始化智能重复文件清理器,目标目录: {target_dirs}")
    def get_file_source_folder(self, file_path):
        """Return which target directory a file belongs to."""
        for target_dir in self.target_dirs:
            if file_path.startswith(target_dir):
                return target_dir
        return None
    def get_file_hash_complete(self, file_path):
        """Compute the full-file MD5 hash."""
hash_md5 = hashlib.md5()
try:
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
        except Exception as e:
            logger.error(f"Error computing full file hash for {file_path}: {e}")
return None
    def get_file_sample_hash(self, file_path, sample_points=3, sample_size=4096):
        """Compute a sampled file hash."""
        try:
            file_size = os.path.getsize(file_path)
            if file_size <= sample_size * sample_points:
                # Small files: just hash everything
                return self.get_file_hash_complete(file_path)
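            # Sampling strategy: hash a few fixed-size windows spread across
            # the file instead of every byte. Distinct files can in principle
            # collide, which is why size and metadata are also compared.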
            hash_md5 = hashlib.md5()
            with open(file_path, "rb") as f:
                # Candidate sample points: start, 25%, 50%, 75%, end
                # (only the first sample_points of these are used)
                positions = [
                    0,  # start
                    file_size // 4 - sample_size // 2,  # 25%
                    file_size // 2 - sample_size // 2,  # 50%
                    file_size * 3 // 4 - sample_size // 2,  # 75%
                    file_size - sample_size,  # end
                ]
for pos in positions[:sample_points]:
if pos < 0:
pos = 0
f.seek(pos)
hash_md5.update(f.read(sample_size))
return hash_md5.hexdigest()
        except Exception as e:
            logger.error(f"Error sampling file {file_path}: {e}")
return None
    def extract_content_signature(self, file_path, skip_start_percent=0.01):
        """Build a content signature, skipping the start of the file to ignore pre-roll ads."""
        try:
            file_size = os.path.getsize(file_path)
            # Skip the opening bytes (often advertisements)
            skip_bytes = int(file_size * skip_start_percent)
            # Simple signature: size bucket + sampled hash prefix
            signature_parts = []
            # 1. File-size bucket
            size_bucket = self.get_size_bucket(file_size)
            signature_parts.append(f"size_{size_bucket}")
            # 2. Sampled hash that skips the opening bytes
            sample_hash = self.get_file_sample_hash_skip_start(file_path, skip_bytes)
            if sample_hash:
                signature_parts.append(f"sample_{sample_hash[:12]}")
            return "_".join(signature_parts)
        except Exception as e:
            logger.error(f"Error extracting content signature for {file_path}: {e}")
            return None
    def get_file_sample_hash_skip_start(
        self, file_path, skip_bytes, sample_points=4, sample_size=8192
    ):
        """Sampled file hash that ignores the first skip_bytes of the file."""
        try:
            file_size = os.path.getsize(file_path)
            if file_size <= skip_bytes + sample_size * sample_points:
                # File too small to sample: hash everything after the skipped prefix
                return self.get_file_hash_skip_start(file_path, skip_bytes)
            hash_md5 = hashlib.md5()
            with open(file_path, "rb") as f:
                # Skip the opening bytes
                f.seek(skip_bytes)
                # Sample points, all positioned after the skipped prefix
                positions = [
                    skip_bytes,  # right after the skip
                    skip_bytes + (file_size - skip_bytes) // 3,  # one third in
                    skip_bytes + (file_size - skip_bytes) * 2 // 3,  # two thirds in
                    file_size - sample_size,  # end
                ]
for pos in positions[:sample_points]:
if pos < skip_bytes:
pos = skip_bytes
if pos + sample_size > file_size:
pos = file_size - sample_size
f.seek(pos)
hash_md5.update(f.read(sample_size))
return hash_md5.hexdigest()
        except Exception as e:
            logger.error(f"Error sampling file {file_path}: {e}")
return None
    def get_file_hash_skip_start(self, file_path, skip_bytes):
        """Full-file MD5 hash that ignores the first skip_bytes of the file."""
        hash_md5 = hashlib.md5()
        try:
            with open(file_path, "rb") as f:
                # Skip the opening bytes
f.seek(skip_bytes)
for chunk in iter(lambda: f.read(8192), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
        except Exception as e:
            logger.error(f"Error computing file hash for {file_path}: {e}")
return None
    def get_size_bucket(self, file_size):
        """Bucket a file size into a coarse category."""
if file_size > 8 * 1024 * 1024 * 1024: # >8GB
return "xl"
elif file_size > 4 * 1024 * 1024 * 1024: # >4GB
return "large"
elif file_size > 2 * 1024 * 1024 * 1024: # >2GB
return "medium"
elif file_size > 1 * 1024 * 1024 * 1024: # >1GB
return "small"
else:
return "tiny"
    def process_single_file(self, file_path):
        """Process one file and extract its metadata."""
        if not os.path.exists(file_path):
            return None
        # The file must be readable
        if not os.access(file_path, os.R_OK):
            logger.debug(f"File not readable, skipping: {file_path}")
            self.stats["files_skipped"] += 1
            return None
        # Skip files that are implausibly small or large
        try:
            file_size = os.path.getsize(file_path)
            if file_size < 1024:  # skip files under 1 KB
                logger.debug(f"File too small, skipping: {file_path}")
                self.stats["files_skipped"] += 1
                return None
            if file_size > 100 * 1024 * 1024 * 1024:  # skip files over 100 GB
                logger.debug(f"File too large, skipping: {file_path}")
                self.stats["files_skipped"] += 1
                return None
        except OSError:
            self.stats["files_skipped"] += 1
            return None
file_ext = os.path.splitext(file_path)[1].lower()
file_stat = os.stat(file_path)
if file_ext in self.video_extensions:
return self._process_video_file(file_path, file_stat, file_ext)
elif file_ext in self.audio_extensions:
return self._process_audio_file(file_path, file_stat, file_ext)
elif file_ext in self.archive_extensions:
return self._process_archive_file(file_path, file_stat, file_ext)
else:
self.stats["files_skipped"] += 1
return None
    def _process_video_file(self, file_path, file_stat, file_ext):
        """Process a video file."""
        start_time = time.time()
        cache_key = (file_path, file_stat.st_size, file_stat.st_mtime)
        if cache_key in self.hash_cache:
            file_hash = self.hash_cache[cache_key]
        else:
            # Large video files get a sampled hash; small ones a full hash
            if file_stat.st_size > 500 * 1024 * 1024:  # >500 MB
                file_hash = self.get_file_sample_hash(file_path)
            else:
                file_hash = self.get_file_hash_complete(file_path)
            if file_hash:
                self.hash_cache[cache_key] = file_hash
hash_time = time.time() - start_time
self.stats["hash_time"] += hash_time
        if file_hash:
            # Extract movie metadata
            filename = os.path.basename(file_path)
movie_name = self.metadata_extractor.extract_movie_name(filename)
resolution = self.metadata_extractor.extract_resolution(filename)
quality_score = self.metadata_extractor.extract_quality_score(
filename, file_stat.st_size
)
content_signature = self.extract_content_signature(file_path)
file_info = {
"path": file_path,
"hash": file_hash,
"size": file_stat.st_size,
"type": "video",
"mod_time": datetime.fromtimestamp(file_stat.st_mtime),
"is_archive": False,
"archive_path": None,
"movie_name": movie_name,
"resolution": resolution,
"quality_score": quality_score,
"content_signature": content_signature,
"filename": filename,
"media_type": "video"
}
self.stats["files_processed"] += 1
if self.stats["files_processed"] % 1000 == 0:
logger.info(
f"已处理 {self.stats['files_processed']} 个文件,跳过 {self.stats['files_skipped']} 个文件"
)
return file_info
self.stats["files_skipped"] += 1
return None
    def _process_audio_file(self, file_path, file_stat, file_ext):
        """Process an audio file."""
        start_time = time.time()
        # Audio files always get a full hash for accuracy
        file_hash = self.get_file_hash_complete(file_path)
        hash_time = time.time() - start_time
        self.stats["hash_time"] += hash_time
        if file_hash:
            # Extract music metadata
            filename = os.path.basename(file_path)
            music_metadata = self.music_metadata_extractor.extract_music_metadata(filename)
quality_info = self.music_metadata_extractor.extract_quality_info(file_path, file_stat.st_size)
file_info = {
"path": file_path,
"hash": file_hash,
"size": file_stat.st_size,
"type": "audio",
"mod_time": datetime.fromtimestamp(file_stat.st_mtime),
"is_archive": False,
"archive_path": None,
"filename": filename,
"media_type": "audio",
"artist": music_metadata.get("artist", ""),
"title": music_metadata.get("title", ""),
"album": music_metadata.get("album", ""),
"duration": quality_info.get("duration", 0),
"bitrate": quality_info.get("bitrate", 0),
"sample_rate": quality_info.get("sample_rate", 0),
"channels": quality_info.get("channels", 0),
"quality_score": quality_info.get("quality_score", 0)
}
self.stats["files_processed"] += 1
if self.stats["files_processed"] % 1000 == 0:
logger.info(
f"已处理 {self.stats['files_processed']} 个文件,跳过 {self.stats['files_skipped']} 个文件"
)
return file_info
self.stats["files_skipped"] += 1
return None
    def _process_archive_file(self, file_path, file_stat, file_ext):
        """Process an archive file."""
file_hash = self.get_file_hash_complete(file_path)
if file_hash:
file_info = {
"path": file_path,
"hash": file_hash,
"size": file_stat.st_size,
"type": "archive",
"mod_time": datetime.fromtimestamp(file_stat.st_mtime),
"is_archive": True,
"archive_path": None,
"filename": os.path.basename(file_path),
"media_type": "archive"
}
self.stats["files_processed"] += 1
if self.stats["files_processed"] % 1000 == 0:
logger.info(
f"已处理 {self.stats['files_processed']} 个文件,跳过 {self.stats['files_skipped']} 个文件"
)
return file_info
self.stats["files_skipped"] += 1
return None
    def scan_files_parallel(self, media_type="all"):
        """Scan all files across the target directories in parallel."""
        logger.info(f"Starting parallel scan of {len(self.target_dirs)} directories, media type: {media_type}...")
        self.stats["start_time"] = time.time()
        file_type_stats = {"video": 0, "audio": 0, "archive": 0, "other": 0, "skipped": 0}
        all_files = []
        media_files_to_process = []
        logger.info("Phase 1: collecting file paths from every directory...")
        for target_dir in self.target_dirs:
            logger.info(f"Scanning directory: {target_dir}")
for root, dirs, files in os.walk(target_dir):
if any(
skip_dir in root
for skip_dir in ["temp_extract", "@eaDir", ".Trash"]
):
continue
for file in files:
file_path = os.path.join(root, file)
file_ext = os.path.splitext(file)[1].lower()
should_process = False
if media_type == "all":
should_process = (file_ext in self.video_extensions or
file_ext in self.audio_extensions or
file_ext in self.archive_extensions)
elif media_type == "video":
should_process = file_ext in self.video_extensions
elif media_type == "audio":
should_process = file_ext in self.audio_extensions
elif media_type == "archive":
should_process = file_ext in self.archive_extensions
if should_process:
media_files_to_process.append(file_path)
if file_ext in self.video_extensions:
file_type_stats["video"] += 1
elif file_ext in self.audio_extensions:
file_type_stats["audio"] += 1
elif file_ext in self.archive_extensions:
file_type_stats["archive"] += 1
else:
file_type_stats["other"] += 1
logger.info("文件类型统计:")
logger.info(f" 视频文件: {file_type_stats['video']}")
logger.info(f" 音频文件: {file_type_stats['audio']}")
logger.info(f" 压缩包文件: {file_type_stats['archive']}")
logger.info(f" 其他文件: {file_type_stats['other']}")
logger.info(f" 总计媒体文件: {len(media_files_to_process)}")
if len(media_files_to_process) == 0:
logger.warning("没有找到任何媒体文件!请检查文件扩展名配置和目录路径。")
return []
logger.info("第二阶段:并行处理文件...")
# 初始化 processed_count
processed_count = 0
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
future_to_file = {
executor.submit(self.process_single_file, file_path): file_path
for file_path in media_files_to_process
}
batch_files = []
for future in as_completed(future_to_file):
file_path = future_to_file[future]
                try:
                    result = future.result()
                    if result:
                        # Record which target directory the file came from
                        result["source_folder"] = self.get_file_source_folder(file_path)
                        batch_files.append(result)
                        processed_count += 1
                        if len(batch_files) >= 100:
                            self.db.bulk_add_files(batch_files)
                            all_files.extend(batch_files)  # extend before clearing the batch
                            batch_files = []
                        # Report progress every 500 files
                        if processed_count % 500 == 0:
                            logger.info(f"Processed {processed_count} files, skipped {self.stats['files_skipped']}")
                except Exception as e:
                    logger.error(f"Error processing file {file_path}: {e}")
                    self.stats["files_skipped"] += 1
        if batch_files:
            self.db.bulk_add_files(batch_files)
            all_files.extend(batch_files)
        total_time = time.time() - self.stats["start_time"]
        logger.info(
            f"Scan complete. Processed {self.stats['files_processed']} files, skipped {self.stats['files_skipped']}"
        )
        logger.info(f"Total hashing time: {self.stats['hash_time']:.2f} s")
        logger.info(f"Total scan time: {total_time:.2f} s")
        return all_files
    def find_similar_movies_enhanced(
        self, files, similarity_threshold=0.8, skip_start_percent=0.1
    ):
        """Enhanced similar-movie search that skips file openings to ignore ads."""
        logger.info("Searching for similar movie files (enhanced)...")
        # Use the enhanced movie-name extraction
movie_groups = {}
        for file_info in files:
            # Extract the movie name with both methods
            filename = file_info.get("filename", "")
            movie_name_normal = self.metadata_extractor.extract_movie_name(filename)
            movie_name_core = self.metadata_extractor.extract_core_movie_name(filename)
            # Prefer the aggressively cleaned name when it is long enough
            if movie_name_core and len(movie_name_core) > 3:
                movie_name = movie_name_core
            else:
                movie_name = movie_name_normal
            if movie_name and len(movie_name) > 2:
                if movie_name not in movie_groups:
                    movie_groups[movie_name] = []
                movie_groups[movie_name].append(file_info)
        # Look for duplicate/similar files within each movie group
        similar_groups = {}
        for movie_name, file_group in movie_groups.items():
            if len(file_group) <= 1:
                continue
            logger.info(f"Analyzing movie: {movie_name} ({len(file_group)} versions)")
            # Group by content signature, skipping the file opening
            signature_groups = {}
            for file_info in file_group:
                # Recompute the content signature with the larger skip
                file_path = file_info["path"]
                enhanced_signature = self.extract_content_signature(
                    file_path, skip_start_percent
                )
                if enhanced_signature:
                    signature = enhanced_signature
                else:
                    signature = file_info.get("content_signature", "unknown")
                if signature not in signature_groups:
                    signature_groups[signature] = []
                signature_groups[signature].append(file_info)
            # Within each signature group, rank versions by quality
            for signature, signature_group in signature_groups.items():
                if len(signature_group) > 1:
                    signature_group.sort(
                        key=lambda x: x.get("quality_score", 0), reverse=True
                    )
                    group_key = f"{movie_name}_{signature}"
                    similar_groups[group_key] = signature_group
                    logger.info(
                        f"  Found {len(signature_group)} similar files (signature: {signature}):"
                    )
                    for i, file_info in enumerate(signature_group):
                        logger.info(
                            f"    {i+1}. {file_info['filename']} "
                            f"(quality score: {file_info.get('quality_score', 0)})"
                        )
        logger.info(f"Found {len(similar_groups)} groups of similar movie files")
        return similar_groups
    def find_similar_music_files(self, files, similarity_threshold=0.8):
        """Find similar music files."""
        logger.info("Searching for similar music files...")
        # Group music by content fingerprints, then merge in metadata groups
        content_groups = self.content_detector.group_similar_files_by_content(files, "audio")
        enhanced_groups = self.content_detector.enhance_with_metadata_matching(files, content_groups, "audio")
        # Convert the list of groups into a dict keyed by a descriptive name
        similar_groups = {}
        for i, group in enumerate(enhanced_groups):
            if len(group) > 1:
                first_file = group[0]
                artist = first_file.get("artist", "Unknown")
                title = first_file.get("title", "Unknown")
                group_key = f"music_{i}_{artist}_{title}"[:100]  # cap the key length
                similar_groups[group_key] = group
                logger.info(f"  Music group: {artist} - {title} ({len(group)} files)")
        logger.info(f"Found {len(similar_groups)} groups of similar music files")
        return similar_groups
    def find_redundant_archives(self, files):
        """Find redundant archives."""
        logger.info("Searching for redundant archives...")
        # Separate archives from the video/audio files they may duplicate
        archive_files = [f for f in files if f.get("media_type") == "archive"]
        other_files = [f for f in files if f.get("media_type") in ["video", "audio"]]
        redundant_archives = self.content_detector.find_redundant_archives(other_files, archive_files)
        return redundant_archives
    def select_best_version(self, file_group, strategy="quality"):
        """Pick the best version in a group (folder preferences respected)."""
        if not file_group:
            return None, []
        # Work on a copy so the original list is not reordered
        sorted_group = file_group.copy()
        # Step 1: apply folder preferences if configured
        if self.prefer_folders:
            # Compute a priority score for every file
            for file_info in sorted_group:
                source_folder = self.get_file_source_folder(file_info["path"])
                if source_folder in self.prefer_folders:
                    # Boost the quality score by the folder's priority
                    file_info["priority_boost"] = (
                        1000 - self.prefer_folders.index(source_folder) * 100
                    )
                else:
                    file_info["priority_boost"] = 0
        # Step 2: sort by the chosen strategy
        if strategy == "quality":
            # Combine quality score with any folder priority boost
            if self.prefer_folders:
                sorted_group.sort(
                    key=lambda x: x.get("quality_score", 0)
                    + x.get("priority_boost", 0),
                    reverse=True,
                )
            else:
                sorted_group.sort(key=lambda x: x.get("quality_score", 0), reverse=True)
elif strategy == "size":
sorted_group.sort(key=lambda x: x["size"], reverse=True)
elif strategy == "resolution":
resolution_order = {"4K": 4, "1080p": 3, "720p": 2, "HD": 1, "Unknown": 0}
sorted_group.sort(
key=lambda x: resolution_order.get(x.get("resolution", "Unknown"), 0),
reverse=True,
)
        else:  # "newest"
sorted_group.sort(key=lambda x: x["mod_time"], reverse=True)
best_file = sorted_group[0]
files_to_delete = sorted_group[1:]
return best_file, files_to_delete
    def select_best_music_version(self, file_group, strategy="quality"):
        """Pick the best version of a music file."""
        if not file_group:
            return None, []
        sorted_group = file_group.copy()
        # Step 1: apply folder preferences if configured
        if self.prefer_folders:
            for file_info in sorted_group:
                source_folder = self.get_file_source_folder(file_info["path"])
                if source_folder in self.prefer_folders:
                    file_info["priority_boost"] = (
                        1000 - self.prefer_folders.index(source_folder) * 100
                    )
                else:
                    file_info["priority_boost"] = 0
        # Step 2: sort by the chosen strategy
if strategy == "quality":
if self.prefer_folders:
sorted_group.sort(
key=lambda x: x.get("quality_score", 0) + x.get("priority_boost", 0),
reverse=True,
)
else:
sorted_group.sort(key=lambda x: x.get("quality_score", 0), reverse=True)
elif strategy == "bitrate":
sorted_group.sort(key=lambda x: x.get("bitrate", 0), reverse=True)
elif strategy == "size":
sorted_group.sort(key=lambda x: x["size"], reverse=True)
        else:  # "newest"
sorted_group.sort(key=lambda x: x["mod_time"], reverse=True)
best_file = sorted_group[0]
files_to_delete = sorted_group[1:]
return best_file, files_to_delete
    def remove_similar_duplicates(
        self, similar_groups, dry_run=True, strategy="quality", no_backup=False
    ):
        """Remove similar duplicate files, with an optional direct-delete mode."""
        logger.info("Processing similar movie files...")
kept_files = []
deleted_files = []
delete_errors = []
        for group_name, file_group in similar_groups.items():
            if len(file_group) <= 1:
                continue
            best_file, files_to_delete = self.select_best_version(file_group, strategy)
            logger.info(f"\nMovie group: {group_name}")
            logger.info(
                f"  Keeping: {best_file['filename']} "
                f"(quality score: {best_file.get('quality_score', 0)})"
            )
            kept_files.append(best_file)
            for file_info in files_to_delete:
                file_path = file_info["path"]
                if dry_run:
                    logger.info(
                        f"  [dry run] Would delete: {file_info['filename']} "
                        f"(quality score: {file_info.get('quality_score', 0)})"
                    )
                else:
                    try:
                        if os.path.exists(file_path):
                            if no_backup:
                                # Direct-delete mode
                                os.remove(file_path)
                                logger.info(f"  🗑️ Deleted directly: {file_info['filename']}")
                                deleted_files.append(file_path)
                            else:
                                # Backup mode: move into a hidden backup folder
                                source_dir = os.path.dirname(file_path)
                                backup_dir = os.path.join(
                                    source_dir, ".similar_movie_backup"
                                )
                                os.makedirs(backup_dir, exist_ok=True)
                                backup_path = os.path.join(
                                    backup_dir, os.path.basename(file_path)
                                )
                                counter = 1
                                while os.path.exists(backup_path):
                                    name, ext = os.path.splitext(
                                        os.path.basename(file_path)
                                    )
                                    backup_path = os.path.join(
                                        backup_dir, f"{name}_{counter}{ext}"
                                    )
                                    counter += 1
                                try:
                                    os.rename(file_path, backup_path)
                                    logger.info(
                                        f"  Moved similar movie to backup: {file_info['filename']}"
                                    )
                                except OSError as e:
                                    if e.errno == 18:  # EXDEV: cross-device link
                                        logger.info(
                                            f"  Cross-device move; copying instead: {file_info['filename']}"
                                        )
                                        shutil.copy2(file_path, backup_path)
                                        os.remove(file_path)
                                        logger.info(
                                            f"  Copied similar movie to backup and removed it: {file_info['filename']}"
                                        )
                                    else:
                                        raise
                                deleted_files.append(file_path)
                            # Record the deletion
                            self.db.mark_file_deleted(file_path, "similar_movie")
                        else:
                            logger.warning(
                                f"  File missing, skipping deletion: {file_info['filename']}"
                            )
                    except Exception as e:
                        error_msg = f"Error deleting file {file_path}: {e}"
                        logger.error(error_msg)
                        delete_errors.append(error_msg)
                        self.db.add_operation(
                            "error", file_path, reason="delete_failed", details=str(e)
                        )
        if delete_errors:
            logger.error(f"Encountered {len(delete_errors)} errors during deletion")
        logger.info(f"Kept {len(kept_files)} best-version files")
        logger.info(f"Processed {len(deleted_files)} similar movie files")
        return kept_files, deleted_files
    def remove_similar_music_duplicates(
        self, similar_groups, dry_run=True, strategy="quality", no_backup=False
    ):
        """Remove similar duplicate music files; batches DB writes to reduce lock contention."""
        logger.info("Processing similar music files...")
        kept_files = []
        deleted_files = []
        delete_errors = []
        for group_name, file_group in similar_groups.items():
            if len(file_group) <= 1:
                continue
            best_file, files_to_delete = self.select_best_music_version(file_group, strategy)
            # Describe the group
            artist = best_file.get("artist", "Unknown")
            title = best_file.get("title", "Unknown")
            logger.info(f"\nMusic group: {artist} - {title}")
            logger.info(
                f"  Keeping: {best_file['filename']} "
                f"(quality score: {best_file.get('quality_score', 0)}, bitrate: {best_file.get('bitrate', 0)} kbps)"
            )
            kept_files.append(best_file)
            # Collect deletions and process them in batches to limit DB locking
            batch_delete_files = []
for file_info in files_to_delete:
file_path = file_info["path"]
if dry_run:
logger.info(
f" [干运行] 将删除: {file_info['filename']} "
f"(质量分: {file_info.get('quality_score', 0)}, 比特率: {file_info.get('bitrate', 0)}kbps)"
)
else:
batch_delete_files.append((file_info, file_path))
# 如果不是干运行模式,批量处理删除
if not dry_run and batch_delete_files:
self._batch_delete_files(batch_delete_files, deleted_files, delete_errors, "similar_music")
if delete_errors:
logger.error(f"删除过程中遇到 {len(delete_errors)} 个错误")
logger.info(f"保留了 {len(kept_files)} 个最佳版本音乐文件")
logger.info(f"处理了 {len(deleted_files)} 个相似音乐文件")
return kept_files, deleted_files
    def _batch_delete_files(self, batch_delete_files, deleted_files, delete_errors,
                            reason, no_backup=True):
        """Delete files in a batch, flushing database marks every 10 files"""
        newly_deleted = []
        for file_info, file_path in batch_delete_files:
            try:
                if os.path.exists(file_path):
                    if no_backup:
                        # Direct-delete mode
                        os.remove(file_path)
                        logger.info(f"  🗑️ Deleted directly: {file_info['filename']}")
                    else:
                        # Backup mode: move next to the source into .similar_music_backup
                        source_dir = os.path.dirname(file_path)
                        backup_dir = os.path.join(source_dir, ".similar_music_backup")
                        os.makedirs(backup_dir, exist_ok=True)
                        backup_path = os.path.join(backup_dir, os.path.basename(file_path))
                        counter = 1
                        while os.path.exists(backup_path):
                            name, ext = os.path.splitext(os.path.basename(file_path))
                            backup_path = os.path.join(backup_dir, f"{name}_{counter}{ext}")
                            counter += 1
                        try:
                            os.rename(file_path, backup_path)
                        except OSError as e:
                            if e.errno == 18:  # EXDEV - cross-device link
                                shutil.copy2(file_path, backup_path)
                                os.remove(file_path)
                            else:
                                raise
                        logger.info(f"  Moved to backup: {file_info['filename']}")
                    deleted_files.append(file_path)
                    newly_deleted.append(file_path)
                    # Flush marks in chunks of 10 to limit lock contention
                    if len(newly_deleted) % 10 == 0:
                        self._batch_mark_files_deleted(newly_deleted[-10:], reason)
                else:
                    logger.warning(f"  File missing, skipping delete: {file_info['filename']}")
            except Exception as e:
                error_msg = f"Error deleting file {file_path}: {e}"
                logger.error(error_msg)
                delete_errors.append(error_msg)
                self.db.add_operation(
                    "error", file_path, reason="delete_failed", details=str(e)
                )
        # Mark only the remainder; re-marking the whole list would duplicate
        # the chunks already flushed above
        remainder = len(newly_deleted) % 10
        if remainder:
            self._batch_mark_files_deleted(newly_deleted[-remainder:], reason)
    def _batch_mark_files_deleted(self, file_paths, reason):
        """Mark a batch of files as deleted in the database"""
        try:
            for file_path in file_paths:
                self.db.mark_file_deleted(file_path, reason)
        except Exception as e:
            logger.error(f"Batch delete-marking failed: {e}")
    def remove_redundant_archives(
        self, redundant_archives, dry_run=True, no_backup=False
    ):
        """Remove redundant archives"""
        logger.info("Processing redundant archives...")
        deleted_archives = []
        delete_errors = []
        for archive_info in redundant_archives:
            file_path = archive_info["path"]
            if dry_run:
                logger.info(f"  [dry run] Would delete redundant archive: {archive_info['filename']}")
            else:
                try:
                    if os.path.exists(file_path):
                        if no_backup:
                            # Direct-delete mode
                            os.remove(file_path)
                            logger.info(f"  🗑️ Deleted redundant archive directly: {archive_info['filename']}")
                        else:
                            # Backup mode
                            source_dir = os.path.dirname(file_path)
                            backup_dir = os.path.join(source_dir, ".redundant_archive_backup")
                            os.makedirs(backup_dir, exist_ok=True)
                            backup_path = os.path.join(backup_dir, os.path.basename(file_path))
                            counter = 1
                            while os.path.exists(backup_path):
                                name, ext = os.path.splitext(os.path.basename(file_path))
                                backup_path = os.path.join(backup_dir, f"{name}_{counter}{ext}")
                                counter += 1
                            try:
                                os.rename(file_path, backup_path)
                                logger.info(f"  Moved redundant archive to backup: {archive_info['filename']}")
                            except OSError as e:
                                if e.errno == 18:  # EXDEV - cross-device link
                                    logger.info(f"  Cross-device move, falling back to copy: {archive_info['filename']}")
                                    shutil.copy2(file_path, backup_path)
                                    os.remove(file_path)
                                    logger.info(f"  Copied redundant archive to backup and removed original: {archive_info['filename']}")
                                else:
                                    raise
                        deleted_archives.append(file_path)
                        # Record the deletion in both modes
                        self.db.mark_file_deleted(file_path, "redundant_archive")
                    else:
                        logger.warning(f"  File missing, skipping delete: {archive_info['filename']}")
                except Exception as e:
                    error_msg = f"Error deleting archive {file_path}: {e}"
                    logger.error(error_msg)
                    delete_errors.append(error_msg)
                    self.db.add_operation(
                        "error", file_path, reason="delete_failed", details=str(e)
                    )
        if delete_errors:
            logger.error(f"Encountered {len(delete_errors)} errors while deleting")
        logger.info(f"Processed {len(deleted_archives)} redundant archives")
        return deleted_archives
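    # A reusable sketch of the rename-or-copy fallback that the backup branches
    # above repeat inline. _safe_move is a hypothetical helper (nothing calls
    # it yet); it uses errno.EXDEV rather than the hard-coded 18.
    def _safe_move(self, src, dst):
        import errno
        try:
            os.rename(src, dst)
        except OSError as e:
            if e.errno == errno.EXDEV:  # cross-device: copy, then remove source
                shutil.copy2(src, dst)
                os.remove(src)
            else:
                raise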
    def remove_empty_folders_efficient(self, target_dir=None):
        """Efficiently remove empty folders with a bottom-up walk"""
        if target_dir is None:
            target_dir = self.target_dirs[0]
        logger.info(f"Cleaning empty folders under: {target_dir}")
        empty_folders = []
        for root, dirs, files in os.walk(target_dir, topdown=False):
            # Skip backup and system directories
            skip_dirs = [
                "@eaDir",
                ".Trash",
                ".duplicate_backup",
                "temp_extract",
                ".similar_movie_backup",
                ".similar_music_backup",
                ".redundant_archive_backup"
            ]
            if any(skip_dir in root for skip_dir in skip_dirs):
                continue
            if root != target_dir:
                try:
                    # Re-check on disk: subfolders removed earlier in this
                    # bottom-up walk still appear in dirs, and hidden files
                    # are not reflected in files
                    if len(os.listdir(root)) == 0:
                        os.rmdir(root)
                        empty_folders.append(root)
                        self.db.add_operation(
                            "delete_folder", root, reason="empty_folder"
                        )
                        logger.debug(f"Removed empty folder: {root}")
                except OSError as e:
                    logger.debug(f"Could not remove folder {root}: {e}")
        logger.info(f"Removed {len(empty_folders)} empty folders")
        return empty_folders
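    # Sketch of an alternative empty-folder pass built on pathlib (Path is
    # imported at module level). A hypothetical helper, not used by the
    # cleanup flows, and unlike the method above it does not skip backup or
    # system directories. It loops until a pass removes nothing, so chains of
    # nested empty directories collapse completely.
    def _prune_empty_dirs(self, target_dir):
        removed_any = True
        while removed_any:
            removed_any = False
            # reverse lexicographic order visits children before their parents
            for path in sorted(Path(target_dir).rglob("*"), reverse=True):
                if path.is_dir():
                    try:
                        path.rmdir()  # succeeds only when the directory is empty
                        removed_any = True
                    except OSError:
                        pass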
    def run_advanced_cleanup(
        self,
        dry_run=True,
        strategy="quality",
        similarity_threshold=0.7,
        use_content_analysis=True,
        no_backup=False,
    ):
        """Run the advanced cleanup pipeline - supports direct-delete mode"""
        logger.info("Starting advanced duplicate-movie cleanup")
        if no_backup:
            logger.warning("⚠️ Direct-delete mode enabled - files will be permanently removed and cannot be recovered!")
        start_time = time.time()
        self.db.add_operation(
            "scan_start",
            str(self.target_dirs),
            reason=f"advanced_cleanup_{'no_backup' if no_backup else 'with_backup'}",
        )
        try:
            # 1. Scan all target directories and extract metadata
            all_files = self.scan_files_parallel(media_type="video")
            if not all_files:
                logger.warning("No video files found")
                return {}
            # 2. Find similar movie files with the advanced algorithm
            similar_groups = self.find_similar_movies_advanced(
                all_files, similarity_threshold, use_content_analysis
            )
            if not similar_groups:
                logger.info("No similar movie files found")
                return {}
            # 3. Remove similar duplicates (forwarding the no_backup flag)
            kept_files, deleted_files = self.remove_similar_duplicates(
                similar_groups, dry_run, strategy, no_backup
            )
            # 4. Clean empty folders in every target directory
            if not dry_run:
                for target_dir in self.target_dirs:
                    self.remove_empty_folders_efficient(target_dir)
            # Record scan completion
            self.db.add_operation(
                "scan_complete",
                str(self.target_dirs),
                reason="advanced_cleanup_finished",
            )
            # Compute duration
            duration = time.time() - start_time
            # Record scan history
            scan_data = {
                "target_directory": str(self.target_dirs),
                "total_files": len(all_files),
                "similar_groups": len(similar_groups),
                "kept_files": len(kept_files),
                "deleted_files": len(deleted_files),
                "deleted_file_details": deleted_files,
                "duration_seconds": duration,
                "no_backup_mode": no_backup,
                "media_type": "video"
            }
            self.db.add_scan_history(scan_data)
            # Show statistics
            self.show_advanced_statistics(scan_data)
            # Backup locations are only relevant when backups were created
            if not dry_run and deleted_files and not no_backup:
                self.show_backup_locations()
            return scan_data
        except Exception as e:
            logger.error(f"Error during advanced cleanup: {e}")
            self.db.add_operation(
                "error", "SYSTEM", reason="advanced_cleanup_failed", details=str(e)
            )
            raise
    def find_similar_movies_advanced(
        self, files, similarity_threshold=0.7, use_content_analysis=True
    ):
        """Advanced similar-movie search - combines metadata and content analysis"""
        logger.info("Starting advanced similar-movie search...")
        if use_content_analysis and VIDEO_PROCESSING_AVAILABLE:
            # Content-based analysis
            logger.info("Using content-based video fingerprint analysis")
            content_groups = self.content_detector.group_similar_movies_by_content(
                files
            )
            # Refine the content groups with metadata matching
            enhanced_groups = self.content_detector.enhance_with_metadata_matching(
                files, content_groups
            )
            # Convert to dict form for compatibility
            similar_groups = {}
            for i, group in enumerate(enhanced_groups):
                group_key = f"content_group_{i}"
                similar_groups[group_key] = group
            logger.info(f"Content analysis found {len(similar_groups)} groups of similar movies")
            return similar_groups
        else:
            # Fall back to metadata similarity analysis
            logger.info("Using metadata similarity analysis")
            return self.find_similar_movies_enhanced(files, similarity_threshold)
    def show_advanced_statistics(self, scan_data):
        """Show advanced cleanup statistics"""
        logger.info("\n" + "=" * 60)
        logger.info("Advanced cleanup statistics")
        logger.info("=" * 60)
        logger.info(f"Scanned directories: {', '.join(self.target_dirs)}")
        logger.info(f"Total video files: {scan_data['total_files']}")
        logger.info(f"Similar movie groups: {scan_data['similar_groups']}")
        logger.info(f"Files kept: {scan_data['kept_files']}")
        logger.info(f"Files deleted: {scan_data['deleted_files']}")
        # Rough space estimate, assuming ~2 GB per video file
        estimated_saved_gb = scan_data["deleted_files"] * 2
        logger.info(f"Space freed: ~{estimated_saved_gb:.2f} GB (estimated)")
        logger.info(f"Elapsed: {scan_data['duration_seconds']:.2f} s")
    def run_intelligent_cleanup(
        self,
        dry_run=True,
        strategy="quality",
        similarity_threshold=0.8,
        skip_start_percent=0.1,
        no_backup=False,
    ):
        """Run the intelligent cleanup pipeline - enhanced version with backup-strategy support"""
        logger.info("Starting intelligent duplicate-movie cleanup (enhanced)")
        if no_backup:
            logger.warning("⚠️ Direct-delete mode enabled - files will be permanently removed and cannot be recovered!")
        start_time = time.time()
        self.db.add_operation(
            "scan_start",
            str(self.target_dirs),
            reason=f"intelligent_cleanup_{'no_backup' if no_backup else 'with_backup'}",
        )
        try:
            # 1. Scan all target directories and extract metadata
            all_files = self.scan_files_parallel(media_type="video")
            if not all_files:
                logger.warning("No video files found")
                return {}
            # 2. Find similar movie files with the enhanced algorithm
            similar_groups = self.find_similar_movies_enhanced(
                all_files, similarity_threshold, skip_start_percent
            )
            if not similar_groups:
                logger.info("No similar movie files found")
                return {}
            # 3. Remove similar duplicates (forwarding the no_backup flag)
            kept_files, deleted_files = self.remove_similar_duplicates(
                similar_groups, dry_run, strategy, no_backup
            )
            # 4. Clean empty folders in every target directory
            if not dry_run:
                for target_dir in self.target_dirs:
                    self.remove_empty_folders_efficient(target_dir)
            # Record scan completion
            self.db.add_operation(
                "scan_complete",
                str(self.target_dirs),
                reason="intelligent_cleanup_finished",
            )
            # Compute duration
            duration = time.time() - start_time
            # Record scan history
            scan_data = {
                "target_directory": str(self.target_dirs),
                "total_files": len(all_files),
                "similar_groups": len(similar_groups),
                "kept_files": len(kept_files),
                "deleted_files": len(deleted_files),
                "deleted_file_details": deleted_files,
                "duration_seconds": duration,
                "no_backup_mode": no_backup,
                "media_type": "video"
            }
            self.db.add_scan_history(scan_data)
            # Show statistics
            self.show_intelligent_statistics(scan_data)
            # Backup locations are only relevant when backups were created
            if not dry_run and deleted_files and not no_backup:
                self.show_backup_locations()
            return scan_data
        except Exception as e:
            logger.error(f"Error during intelligent cleanup: {e}")
            self.db.add_operation(
                "error", "SYSTEM", reason="intelligent_cleanup_failed", details=str(e)
            )
            raise
    def run_music_cleanup(
        self,
        dry_run=True,
        strategy="quality",
        similarity_threshold=0.8,
        use_content_analysis=True,
        no_backup=False,
    ):
        """Run the music duplicate cleanup pipeline"""
        logger.info("Starting duplicate-music cleanup")
        if no_backup:
            logger.warning("⚠️ Direct-delete mode enabled - files will be permanently removed and cannot be recovered!")
        start_time = time.time()
        self.db.add_operation(
            "scan_start",
            str(self.target_dirs),
            reason=f"music_cleanup_{'no_backup' if no_backup else 'with_backup'}",
        )
        try:
            # 1. Scan all target directories for music files and extract metadata
            all_files = self.scan_files_parallel(media_type="audio")
            if not all_files:
                logger.warning("No music files found")
                return {}
            # 2. Find similar music files
            similar_groups = self.find_similar_music_files(all_files, similarity_threshold)
            if not similar_groups:
                logger.info("No similar music files found")
                return {}
            # 3. Remove similar duplicates
            kept_files, deleted_files = self.remove_similar_music_duplicates(
                similar_groups, dry_run, strategy, no_backup
            )
            # 4. Clean empty folders in every target directory
            if not dry_run:
                for target_dir in self.target_dirs:
                    self.remove_empty_folders_efficient(target_dir)
            # Record scan completion
            self.db.add_operation(
                "scan_complete",
                str(self.target_dirs),
                reason="music_cleanup_finished",
            )
            # Compute duration
            duration = time.time() - start_time
            # Record scan history
            scan_data = {
                "target_directory": str(self.target_dirs),
                "total_files": len(all_files),
                "similar_groups": len(similar_groups),
                "kept_files": len(kept_files),
                "deleted_files": len(deleted_files),
                "deleted_file_details": deleted_files,
                "duration_seconds": duration,
                "no_backup_mode": no_backup,
                "media_type": "audio"
            }
            self.db.add_scan_history(scan_data)
            # Show statistics
            self.show_music_statistics(scan_data)
            return scan_data
        except Exception as e:
            logger.error(f"Error during music cleanup: {e}")
            self.db.add_operation(
                "error", "SYSTEM", reason="music_cleanup_failed", details=str(e)
            )
            raise
    def run_archive_cleanup(
        self,
        dry_run=True,
        no_backup=False,
    ):
        """Run the archive cleanup pipeline"""
        logger.info("Starting archive cleanup")
        if no_backup:
            logger.warning("⚠️ Direct-delete mode enabled - files will be permanently removed and cannot be recovered!")
        start_time = time.time()
        self.db.add_operation(
            "scan_start",
            str(self.target_dirs),
            reason=f"archive_cleanup_{'no_backup' if no_backup else 'with_backup'}",
        )
        try:
            # 1. Scan all target directories
            all_files = self.scan_files_parallel(media_type="all")
            if not all_files:
                logger.warning("No files found")
                return {}
            # 2. Find redundant archives
            redundant_archives = self.find_redundant_archives(all_files)
            if not redundant_archives:
                logger.info("No redundant archives found")
                return {}
            # 3. Remove redundant archives
            deleted_archives = self.remove_redundant_archives(
                redundant_archives, dry_run, no_backup
            )
            # 4. Clean empty folders in every target directory
            if not dry_run:
                for target_dir in self.target_dirs:
                    self.remove_empty_folders_efficient(target_dir)
            # Record scan completion
            self.db.add_operation(
                "scan_complete",
                str(self.target_dirs),
                reason="archive_cleanup_finished",
            )
            # Compute duration
            duration = time.time() - start_time
            # Record scan history
            scan_data = {
                "target_directory": str(self.target_dirs),
                "total_files": len(all_files),
                "redundant_archives": len(redundant_archives),
                "deleted_archives": len(deleted_archives),
                "deleted_archive_details": deleted_archives,
                "duration_seconds": duration,
                "no_backup_mode": no_backup,
                "media_type": "archive"
            }
            self.db.add_scan_history(scan_data)
            # Show statistics
            self.show_archive_statistics(scan_data)
            return scan_data
        except Exception as e:
            logger.error(f"Error during archive cleanup: {e}")
            self.db.add_operation(
                "error", "SYSTEM", reason="archive_cleanup_failed", details=str(e)
            )
            raise
    def show_intelligent_statistics(self, scan_data):
        """Show intelligent cleanup statistics"""
        logger.info("\n" + "=" * 60)
        logger.info("Intelligent cleanup statistics")
        logger.info("=" * 60)
        logger.info(f"Scanned directories: {', '.join(self.target_dirs)}")
        logger.info(f"Total video files: {scan_data['total_files']}")
        logger.info(f"Similar movie groups: {scan_data['similar_groups']}")
        logger.info(f"Files kept: {scan_data['kept_files']}")
        logger.info(f"Files deleted: {scan_data['deleted_files']}")
        # Rough space estimate, assuming ~2 GB per video file
        estimated_saved_gb = scan_data["deleted_files"] * 2
        logger.info(f"Space freed: ~{estimated_saved_gb:.2f} GB (estimated)")
        logger.info(f"Elapsed: {scan_data['duration_seconds']:.2f} s")
    def show_music_statistics(self, scan_data):
        """Show music cleanup statistics"""
        logger.info("\n" + "=" * 60)
        logger.info("Music cleanup statistics")
        logger.info("=" * 60)
        logger.info(f"Scanned directories: {', '.join(self.target_dirs)}")
        logger.info(f"Total music files: {scan_data['total_files']}")
        logger.info(f"Similar music groups: {scan_data['similar_groups']}")
        logger.info(f"Files kept: {scan_data['kept_files']}")
        logger.info(f"Files deleted: {scan_data['deleted_files']}")
        # Rough space estimate, assuming ~5 MB per music file
        estimated_saved_mb = scan_data["deleted_files"] * 5
        logger.info(f"Space freed: ~{estimated_saved_mb:.2f} MB (estimated)")
        logger.info(f"Elapsed: {scan_data['duration_seconds']:.2f} s")
    def show_archive_statistics(self, scan_data):
        """Show archive cleanup statistics"""
        logger.info("\n" + "=" * 60)
        logger.info("Archive cleanup statistics")
        logger.info("=" * 60)
        logger.info(f"Scanned directories: {', '.join(self.target_dirs)}")
        logger.info(f"Total files: {scan_data['total_files']}")
        logger.info(f"Redundant archives: {scan_data['redundant_archives']}")
        logger.info(f"Archives deleted: {scan_data['deleted_archives']}")
        # Sum the space freed; deleted paths may no longer exist (they were
        # removed or moved to backup), so treat missing files as size 0
        # instead of letting os.path.getsize raise
        total_saved_bytes = 0
        for path in scan_data.get('deleted_archive_details', []):
            try:
                total_saved_bytes += os.path.getsize(path)
            except OSError:
                pass
        total_saved_mb = total_saved_bytes / (1024 * 1024)
        logger.info(f"Space freed: {total_saved_mb:.2f} MB")
        logger.info(f"Elapsed: {scan_data['duration_seconds']:.2f} s")
    def show_backup_locations(self):
        """Show where backup files were placed"""
        logger.info("\nBackup locations:")
        backup_dirs_found = set()
        for target_dir in self.target_dirs:
            for root, dirs, files in os.walk(target_dir):
                backup_dirs = [d for d in dirs if d.endswith('_backup')]
                for backup_dir in backup_dirs:
                    full_backup_dir = os.path.join(root, backup_dir)
                    backup_dirs_found.add(full_backup_dir)
        if backup_dirs_found:
            for backup_dir in backup_dirs_found:
                # Count the files in each backup directory
                try:
                    backup_files = [
                        f
                        for f in os.listdir(backup_dir)
                        if os.path.isfile(os.path.join(backup_dir, f))
                    ]
                    total_size = sum(
                        os.path.getsize(os.path.join(backup_dir, f))
                        for f in backup_files
                    ) / (1024 * 1024 * 1024)  # GB
                    logger.info(
                        f"  {backup_dir}: {len(backup_files)} files, total size: {total_size:.2f} GB"
                    )
                except OSError as e:
                    logger.warning(f"  Cannot access backup directory {backup_dir}: {e}")
        else:
            logger.info("  No backup directories found")
    def run_comprehensive_cleanup(
        self,
        dry_run=True,
        strategy="quality",
        similarity_threshold=0.8,
        use_content_analysis=True,
        no_backup=False,
        skip_archives=False
    ):
        """Run the comprehensive cleanup pipeline - video, music, and archives"""
        logger.info("Starting comprehensive media cleanup")
        if no_backup:
            logger.warning("⚠️ Direct-delete mode enabled - files will be permanently removed and cannot be recovered!")
        start_time = time.time()
        self.db.add_operation(
            "scan_start",
            str(self.target_dirs),
            reason=f"comprehensive_cleanup_{'no_backup' if no_backup else 'with_backup'}",
        )
        try:
            results = {}
            # 1. Video cleanup
            logger.info("\n" + "=" * 50)
            logger.info("Phase 1: video cleanup")
            logger.info("=" * 50)
            video_result = self.run_advanced_cleanup(
                dry_run=dry_run,
                strategy=strategy,
                similarity_threshold=similarity_threshold,
                use_content_analysis=use_content_analysis,
                no_backup=no_backup
            )
            results["video"] = video_result
            # 2. Music cleanup
            logger.info("\n" + "=" * 50)
            logger.info("Phase 2: music cleanup")
            logger.info("=" * 50)
            music_result = self.run_music_cleanup(
                dry_run=dry_run,
                strategy=strategy,
                similarity_threshold=similarity_threshold,
                use_content_analysis=use_content_analysis,
                no_backup=no_backup
            )
            results["music"] = music_result
            # 3. Archive cleanup (optional)
            if not skip_archives:
                logger.info("\n" + "=" * 50)
                logger.info("Phase 3: archive cleanup")
                logger.info("=" * 50)
                archive_result = self.run_archive_cleanup(
                    dry_run=dry_run,
                    no_backup=no_backup
                )
                results["archive"] = archive_result
            # Record scan completion
            self.db.add_operation(
                "scan_complete",
                str(self.target_dirs),
                reason="comprehensive_cleanup_finished",
            )
            # Compute total duration
            total_duration = time.time() - start_time
            # Show the combined statistics
            self.show_comprehensive_statistics(results, total_duration)
            return results
        except Exception as e:
            logger.error(f"Error during comprehensive cleanup: {e}")
            self.db.add_operation(
                "error", "SYSTEM", reason="comprehensive_cleanup_failed", details=str(e)
            )
            raise
    def show_comprehensive_statistics(self, results, total_duration):
        """Show the combined cleanup statistics"""
        logger.info("\n" + "=" * 60)
        logger.info("Comprehensive cleanup summary")
        logger.info("=" * 60)
        total_deleted = 0
        total_saved_gb = 0
        if results.get("video"):
            video_result = results["video"]
            video_deleted = video_result.get("deleted_files", 0)
            total_deleted += video_deleted
            total_saved_gb += video_deleted * 2  # estimated ~2 GB per video
            logger.info(f"Video: deleted {video_deleted} files")
        if results.get("music"):
            music_result = results["music"]
            music_deleted = music_result.get("deleted_files", 0)
            total_deleted += music_deleted
            total_saved_gb += music_deleted * 0.005  # estimated ~5 MB per track
            logger.info(f"Music: deleted {music_deleted} files")
        if results.get("archive"):
            archive_result = results["archive"]
            archive_deleted = archive_result.get("deleted_archives", 0)
            total_deleted += archive_deleted
            # Sum actual archive sizes; paths may no longer exist after deletion
            archive_size_gb = 0
            for path in archive_result.get('deleted_archive_details', []):
                try:
                    archive_size_gb += os.path.getsize(path) / (1024 * 1024 * 1024)
                except OSError:
                    pass
            total_saved_gb += archive_size_gb
            logger.info(f"Archives: deleted {archive_deleted} files")
        logger.info(f"Total files deleted: {total_deleted}")
        logger.info(f"Total space freed: {total_saved_gb:.2f} GB (estimated)")
        logger.info(f"Total elapsed: {total_duration:.2f} s")
# Command-line entry point; includes the backup-strategy options
def main():
    # Declare the module-level logger as global first
    global logger
    parser = argparse.ArgumentParser(description="Intelligent media duplicate cleaner - full edition")
    parser.add_argument("directories", nargs="*", help="Directories to scan (multiple allowed)")
    parser.add_argument("--dry-run", action="store_true", help="Dry-run mode: report what would be deleted without deleting")
    parser.add_argument(
        "--strategy",
        choices=["quality", "size", "resolution", "newest", "bitrate"],
        default="quality",
        help="Strategy for picking the best version (default: quality)",
    )
    parser.add_argument(
        "--similarity-threshold",
        type=float,
        default=0.8,
        help="Similarity threshold (0.0-1.0, default: 0.8)",
    )
    parser.add_argument(
        "--skip-start",
        type=float,
        default=0.1,
        help="Fraction of the file start to skip (0.0-0.5, default: 0.1)",
    )
    parser.add_argument("--db-path", default="file_cleaner.db", help="Database file path")
    parser.add_argument("--workers", type=int, default=4, help="Number of parallel worker threads (default: 4)")
    parser.add_argument(
        "--log-level",
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
        default="INFO",
        help="Log level (default: INFO)",
    )
    parser.add_argument("--log-file", default="duplicate_cleaner.log", help="Log file path")
    parser.add_argument("--prefer-folder", nargs="+", help="Folders to prefer keeping files from (used when quality ties)")
    parser.add_argument(
        "--content-analysis",
        action="store_true",
        help="Enable content-based analysis (more accurate but slower)",
    )
    parser.add_argument(
        "--no-content-analysis",
        action="store_true",
        help="Disable content-based analysis (faster but less accurate)",
    )
    parser.add_argument("--backup-dir", help="Backup directory path (avoids cross-device moves)")
    parser.add_argument("--no-backup", action="store_true", help="Do not create backups (delete files directly)")
    # Media type and cleanup mode
    parser.add_argument(
        "--media-type",
        choices=["all", "video", "audio", "archive", "comprehensive"],
        default="comprehensive",
        help="Media type to process (default: comprehensive)",
    )
    parser.add_argument(
        "--skip-archives",
        action="store_true",
        help="Skip archive cleanup in comprehensive mode",
    )
    args = parser.parse_args()
    # Default to the current directory when no directories are given
    if not args.directories:
        args.directories = [os.getcwd()]
    # Validate directory arguments
    for directory in args.directories:
        if not os.path.exists(directory):
            print(f"Error: directory {directory} does not exist")
            return
    # Validate numeric arguments
    if args.skip_start < 0 or args.skip_start > 0.5:
        print("Error: --skip-start must be between 0.0 and 0.5")
        return
    if args.similarity_threshold < 0 or args.similarity_threshold > 1:
        print("Error: --similarity-threshold must be between 0.0 and 1.0")
        return
    # Reconfigure logging from the command-line arguments
    log_level = getattr(logging, args.log_level)
    logger = setup_logging(log_level, args.log_file)
    # logger is usable from here on
    if len(args.directories) == 1 and args.directories[0] == os.getcwd():
        logger.info(f"No directory given, using the current directory: {args.directories[0]}")
    # Decide whether to use content analysis
    use_content_analysis = True
    if args.no_content_analysis:
        use_content_analysis = False
    elif args.content_analysis:
        use_content_analysis = True
    # Force content analysis off when the processing libraries are unavailable
    if use_content_analysis and (args.media_type in ["video", "all", "comprehensive"]) and not VIDEO_PROCESSING_AVAILABLE:
        logger.warning("Video processing libraries unavailable, disabling content analysis")
        use_content_analysis = False
    if use_content_analysis and (args.media_type in ["audio", "all", "comprehensive"]) and not AUDIO_PROCESSING_AVAILABLE:
        logger.warning("Audio processing libraries unavailable, disabling content analysis")
        use_content_analysis = False
    logger.info("Starting the intelligent media duplicate cleaner")
    logger.info(f"Target directories: {args.directories}")
    logger.info(f"Media type: {args.media_type}")
    logger.info(f"Selection strategy: {args.strategy}")
    logger.info(f"Similarity threshold: {args.similarity_threshold}")
    if args.prefer_folder:
        logger.info(f"Preferred folders: {args.prefer_folder}")
    if args.backup_dir:
        logger.info(f"Backup directory: {args.backup_dir}")
    if args.no_backup:
        logger.warning("Warning: direct-delete mode enabled, no backups will be created!")
    cleaner = IntelligentDuplicateCleaner(
        args.directories, args.db_path, args.workers, args.prefer_folder
    )
    try:
        if args.media_type == "audio":
            # Music cleanup mode
            result = cleaner.run_music_cleanup(
                dry_run=args.dry_run,
                strategy=args.strategy,
                similarity_threshold=args.similarity_threshold,
                use_content_analysis=use_content_analysis,
                no_backup=args.no_backup,
            )
        elif args.media_type == "video":
            # Video cleanup mode
            if use_content_analysis:
                logger.info("Using content-based advanced analysis mode")
                result = cleaner.run_advanced_cleanup(
                    dry_run=args.dry_run,
                    strategy=args.strategy,
                    similarity_threshold=args.similarity_threshold,
                    use_content_analysis=use_content_analysis,
                    no_backup=args.no_backup,
                )
            else:
                result = cleaner.run_intelligent_cleanup(
                    dry_run=args.dry_run,
                    strategy=args.strategy,
                    similarity_threshold=args.similarity_threshold,
                    skip_start_percent=args.skip_start,
                    no_backup=args.no_backup,
                )
        elif args.media_type == "archive":
            # Archive cleanup mode
            result = cleaner.run_archive_cleanup(
                dry_run=args.dry_run,
                no_backup=args.no_backup,
            )
        else:
            # Comprehensive cleanup mode
            result = cleaner.run_comprehensive_cleanup(
                dry_run=args.dry_run,
                strategy=args.strategy,
                similarity_threshold=args.similarity_threshold,
                use_content_analysis=use_content_analysis,
                no_backup=args.no_backup,
                skip_archives=args.skip_archives
            )
        if not args.dry_run and result:
            if args.media_type == "comprehensive":
                logger.info("\n=== Comprehensive cleanup summary ===")
                if result.get("video"):
                    logger.info(f"Video - similar groups: {result['video'].get('similar_groups', 0)}")
                    logger.info(f"Video - files kept: {result['video'].get('kept_files', 0)}")
                    logger.info(f"Video - files deleted: {result['video'].get('deleted_files', 0)}")
                if result.get("music"):
                    logger.info(f"Music - similar groups: {result['music'].get('similar_groups', 0)}")
                    logger.info(f"Music - files kept: {result['music'].get('kept_files', 0)}")
                    logger.info(f"Music - files deleted: {result['music'].get('deleted_files', 0)}")
                if result.get("archive"):
                    logger.info(f"Archives - redundant archives: {result['archive'].get('redundant_archives', 0)}")
                    logger.info(f"Archives - archives deleted: {result['archive'].get('deleted_archives', 0)}")
            else:
                logger.info("\n=== Cleanup summary ===")
                logger.info(f"Similar groups: {result.get('similar_groups', result.get('redundant_archives', 0))}")
                logger.info(f"Files kept: {result.get('kept_files', 0)}")
                logger.info(f"Files deleted: {result.get('deleted_files', result.get('deleted_archives', 0))}")
                logger.info(f"Elapsed: {result.get('duration_seconds', 0):.2f} s")
    except KeyboardInterrupt:
        logger.info("\nInterrupted by user")
        cleaner.db.add_operation("error", "SYSTEM", reason="user_interrupt")
    except Exception as e:
        logger.error(f"Error: {e}")
        cleaner.db.add_operation("error", "SYSTEM", reason="exception", details=str(e))
if __name__ == "__main__":
main()
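# Example invocations (illustrative; directory paths are placeholders):
#   python duplicate_cleanerV5视音频解析.py /volume1/media --dry-run
#   python duplicate_cleanerV5视音频解析.py /volume1/movies /volume2/movies \
#       --media-type video --strategy bitrate --similarity-threshold 0.85
#   python duplicate_cleanerV5视音频解析.py /volume1/music --media-type audio \
#       --no-backup --log-level DEBUG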