# duplicate_cleanerV5视频解析.py — intelligent duplicate-video cleaner (history copy).
# NOTE: this header replaces web-viewer chrome ("Files / Raw / Permalink / Blame")
# that was accidentally pasted into the file and made it syntactically invalid.
import argparse
import errno
import hashlib
import logging
import os
import re
import shutil
import sqlite3
import subprocess
import time
import zipfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Set, Tuple

import rarfile
# 配置日志系统
def setup_logging(log_level=logging.INFO, log_file='duplicate_cleaner.log'):
"""设置日志配置"""
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file, encoding='utf-8'),
logging.StreamHandler()
]
)
return logging.getLogger(__name__)
logger = setup_logging()
class PerformanceOptimizedFileDatabase:
def __init__(self, db_path: str = "file_cleaner.db"):
self.db_path = db_path
self.batch_size = 1000
self.init_database()
def init_database(self):
"""初始化数据库表结构"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('PRAGMA journal_mode=WAL')
cursor.execute('PRAGMA synchronous=NORMAL')
cursor.execute('PRAGMA cache_size=-64000')
cursor.execute('''
CREATE TABLE IF NOT EXISTS files (
id INTEGER PRIMARY KEY AUTOINCREMENT,
file_path TEXT UNIQUE,
file_hash TEXT,
file_size INTEGER,
file_type TEXT,
mod_time DATETIME,
is_archive BOOLEAN DEFAULT 0,
archive_path TEXT,
is_deleted BOOLEAN DEFAULT 0,
created_time DATETIME DEFAULT CURRENT_TIMESTAMP,
last_scanned DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS operations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
operation_type TEXT,
file_path TEXT,
file_hash TEXT,
reason TEXT,
details TEXT,
operation_time DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS scan_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
scan_time DATETIME DEFAULT CURRENT_TIMESTAMP,
target_directory TEXT,
total_files INTEGER,
duplicate_groups INTEGER,
deleted_files INTEGER,
deleted_archives INTEGER,
duration_seconds REAL
)
''')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_files_hash ON files(file_hash)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_files_path ON files(file_path)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_files_deleted ON files(is_deleted)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_files_size ON files(file_size)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_operations_time ON operations(operation_time)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_operations_type ON operations(operation_type)')
conn.commit()
conn.close()
logger.info("数据库初始化完成")
def bulk_add_files(self, file_infos: List[Dict[str, Any]]):
"""批量添加文件记录"""
if not file_infos:
return
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
for i in range(0, len(file_infos), self.batch_size):
batch = file_infos[i:i + self.batch_size]
placeholders = []
values = []
for file_info in batch:
placeholders.append('(?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)')
values.extend([
file_info['path'],
file_info['hash'],
file_info.get('size', 0),
file_info.get('type', 'unknown'),
file_info['mod_time'],
file_info.get('is_archive', False),
file_info.get('archive_path'),
0
])
sql = f'''
INSERT OR REPLACE INTO files
(file_path, file_hash, file_size, file_type, mod_time, is_archive, archive_path, is_deleted, last_scanned)
VALUES {",".join(placeholders)}
'''
cursor.execute(sql, values)
conn.commit()
logger.debug(f"批量添加了 {len(file_infos)} 个文件记录")
except Exception as e:
logger.error(f"批量添加文件记录时出错: {e}")
conn.rollback()
finally:
conn.close()
def mark_file_deleted(self, file_path: str, reason: str = "duplicate"):
"""标记文件为已删除"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
cursor.execute('''
UPDATE files
SET is_deleted = 1, last_scanned = CURRENT_TIMESTAMP
WHERE file_path = ?
''', (file_path,))
cursor.execute('SELECT file_hash FROM files WHERE file_path = ?', (file_path,))
result = cursor.fetchone()
file_hash = result[0] if result else None
self.add_operation("delete", file_path, file_hash, reason)
conn.commit()
except Exception as e:
logger.error(f"数据库错误 (标记删除): {e}")
finally:
conn.close()
def add_operation(self, operation_type: str, file_path: str, file_hash: str = None,
reason: str = "", details: str = ""):
"""添加操作记录"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
cursor.execute('''
INSERT INTO operations (operation_type, file_path, file_hash, reason, details)
VALUES (?, ?, ?, ?, ?)
''', (operation_type, file_path, file_hash, reason, details))
conn.commit()
except Exception as e:
logger.error(f"数据库错误 (添加操作): {e}")
finally:
conn.close()
def add_scan_history(self, scan_data: Dict[str, Any]):
"""添加扫描历史记录"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
cursor.execute('''
INSERT INTO scan_history
(target_directory, total_files, duplicate_groups, deleted_files, deleted_archives, duration_seconds)
VALUES (?, ?, ?, ?, ?, ?)
''', (
scan_data.get('target_directory', ''),
scan_data.get('total_files', 0),
scan_data.get('duplicate_groups', 0),
scan_data.get('deleted_files', 0),
scan_data.get('deleted_archives', 0),
scan_data.get('duration_seconds', 0)
))
conn.commit()
except Exception as e:
logger.error(f"数据库错误 (添加扫描历史): {e}")
finally:
conn.close()
def get_scan_statistics(self) -> Dict[str, Any]:
"""获取扫描统计信息"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
cursor.execute('SELECT COUNT(*) FROM files')
total_files = cursor.fetchone()[0]
cursor.execute('SELECT COUNT(*) FROM files WHERE is_deleted = 1')
deleted_files = cursor.fetchone()[0]
cursor.execute('SELECT COUNT(DISTINCT file_hash) FROM files WHERE is_deleted = 0')
unique_files = cursor.fetchone()[0]
cursor.execute('SELECT COUNT(*) FROM operations')
total_operations = cursor.fetchone()[0]
return {
'total_files': total_files,
'deleted_files': deleted_files,
'unique_files': unique_files,
'total_operations': total_operations
}
except Exception as e:
logger.error(f"数据库错误 (获取统计): {e}")
return {}
finally:
conn.close()
class MovieMetadataExtractor:
"""电影元数据提取器"""
# 常见分辨率模式
RESOLUTION_PATTERNS = [
r'(\d{3,4}[pi])', # 1080p, 720p, 480p, 2160p
r'([24]k)', # 2k, 4k
r'(hd)', # hd
r'(fhd)', # fhd
r'(uhd)', # uhd
]
# 常见编码格式
CODEC_PATTERNS = [
r'(x264)', r'(x265)', r'(h264)', r'(h265)', r'(hevc)',
r'(avc)', r'(divx)', r'(xvid)'
]
# 常见来源
SOURCE_PATTERNS = [
r'(bluray)', r'(blu-ray)', r'(webdl)', r'(web-dl)',
r'(hdtv)', r'(dvdrip)', r'(bdrip)', r'(brrip)'
]
# 常见音频格式
AUDIO_PATTERNS = [
r'(dts)', r'(ac3)', r'(aac)', r'(flac)', r'(dd)'
]
# 常见需要移除的模式 - 增强版
@staticmethod
def extract_movie_name_enhanced(filename):
"""增强版电影名称提取"""
# 移除扩展名
name = os.path.splitext(filename)[0]
# 更严格的模式匹配
patterns_to_remove = [
# 广告相关模式
r'[\[\(]?广告[\]\)]?', r'[\[\(]?推广[\]\)]?', r'[\[\(]?宣传[\]\)]?',
r'[\[\(]?片头[\]\)]?', r'[\[\(]?片花[\]\)]?',
r'^[^a-zA-Z0-9\u4e00-\u9fff]*', # 开头的特殊字符
r'[\s_\-]*([\[\(]?\d{4}[\]\)]?)[\s_\-]*', # 年份
# 分辨率
r'[\s_\-]*(\d{3,4}[pi])[\s_\-]*',
r'[\s_\-]*([24]k)[\s_\-]*',
r'[\s_\-]*(hd|fhd|uhd)[\s_\-]*',
# 编码
r'[\s_\-]*(x264|x265|h264|h265|hevc|avc|divx|xvid)[\s_\-]*',
# 来源
r'[\s_\-]*(bluray|blu-ray|webdl|web-dl|hdtv|dvdrip|bdrip|brrip)[\s_\-]*',
# 音频
r'[\s_\-]*(dts|ac3|aac|flac|dd)[\s_\-]*',
# 发布组和其他信息
r'[\s_\-]*([\[\(][^\]\)]+[\]\)])[\s_\-]*', # 所有括号内容
r'[\s_\-]*([【][^】]+[】])[\s_\-]*', # 中文括号
r'[\s_\-]*([╬┅┉┊┋┌┍┎┏┐┑┒┓└┕┖┗┘┙┚┛├┝┞┟┠┡┢┣┤┥┦┧┨┩┪┫┬┭┮┯┰┱┲┳┴┵┶┷┸┹┺┻┼┽┾┿╀╁╂╃╄╅╆╇╈╉╊╋]+)[\s_\-]*', # 特殊符号
]
for pattern in patterns_to_remove:
name = re.sub(pattern, '', name, flags=re.IGNORECASE)
# 清理多余空格和分隔符
name = re.sub(r'[\._\-\s]+', ' ', name)
name = name.strip()
return name
@staticmethod
def extract_core_movie_name(filename):
"""提取核心电影名称(最严格的清理)"""
name = MovieMetadataExtractor.extract_movie_name_enhanced(filename)
# 进一步清理:移除可能的前缀和后缀
# 常见的无关前缀
prefixes_to_remove = [
'电影', '高清', '最新', '完整版', '未删减版', '国语', '英语',
'中字', '中文字幕', '双语字幕', '特效字幕'
]
for prefix in prefixes_to_remove:
if name.lower().startswith(prefix.lower()):
name = name[len(prefix):].strip()
return name
@staticmethod
def extract_movie_name(filename):
"""提取电影名称"""
# 移除扩展名
name = os.path.splitext(filename)[0]
# 常见需要移除的模式
patterns_to_remove = [
# 年份
r'\s*[\(\[]?\d{4}[\)\]]?',
# 分辨率
r'\s*\d{3,4}[pi]',
r'\s*[24]k',
r'\s*hd',
r'\s*fhd',
r'\s*uhd',
# 编码
r'\s*x264', r'\s*x265', r'\s*h264', r'\s*h265', r'\s*hevc',
r'\s*avc', r'\s*divx', r'\s*xvid',
# 来源
r'\s*bluray', r'\s*blu-ray', r'\s*webdl', r'\s*web-dl',
r'\s*hdtv', r'\s*dvdrip', r'\s*bdrip', r'\s*brrip',
# 音频
r'\s*dts', r'\s*ac3', r'\s*aac', r'\s*flac', r'\s*dd',
# 发布组和其他信息
r'\s*-\s*[^-]+$', # 最后一个 - 之后的内容
r'\[[^\]]+\]', # 方括号内容
r'\([^\)]+\)', # 圆括号内容
]
for pattern in patterns_to_remove:
name = re.sub(pattern, '', name, flags=re.IGNORECASE)
# 清理多余空格和分隔符
name = re.sub(r'[\._\-\s]+', ' ', name)
name = name.strip()
return name
@staticmethod
def extract_resolution(filename):
"""提取分辨率"""
filename_lower = filename.lower()
resolution_map = {
'2160p': '4K', '4k': '4K',
'1080p': '1080p',
'720p': '720p',
'480p': '480p',
'hd': 'HD'
}
for pattern, resolution in resolution_map.items():
if pattern in filename_lower:
return resolution
return 'Unknown'
@staticmethod
def extract_quality_score(filename, file_size):
"""计算质量评分"""
score = 0
# 基于文件大小的评分
if file_size > 8 * 1024 * 1024 * 1024: # >8GB
score += 30
elif file_size > 4 * 1024 * 1024 * 1024: # >4GB
score += 20
elif file_size > 2 * 1024 * 1024 * 1024: # >2GB
score += 10
# 基于分辨率的评分
resolution = MovieMetadataExtractor.extract_resolution(filename)
resolution_scores = {
'4K': 25,
'1080p': 20,
'720p': 15,
'HD': 10,
'Unknown': 5
}
score += resolution_scores.get(resolution, 5)
# 基于编码的评分
filename_lower = filename.lower()
if 'x265' in filename_lower or 'hevc' in filename_lower:
score += 10 # 更高效的编码
if 'x264' in filename_lower:
score += 5
# 基于来源的评分
if 'bluray' in filename_lower or 'blu-ray' in filename_lower:
score += 15
elif 'webdl' in filename_lower or 'web-dl' in filename_lower:
score += 10
elif 'hdtv' in filename_lower:
score += 5
return score
class AdvancedMovieMetadataExtractor(MovieMetadataExtractor):
"""高级电影元数据提取器"""
@staticmethod
def extract_detailed_metadata(filename, file_path=None):
"""提取详细的电影元数据"""
metadata = {
'title': '',
'year': '',
'quality': '',
'codec': '',
'source': '',
'audio': '',
'group': ''
}
# 提取年份
year_match = re.search(r'(19|20)\d{2}', filename)
if year_match:
metadata['year'] = year_match.group()
# 提取质量信息
quality_terms = ['4k', '2160p', '1080p', '720p', '480p', 'hd', 'fhd', 'uhd']
for term in quality_terms:
if term in filename.lower():
metadata['quality'] = term.upper()
break
# 提取编码信息
codec_terms = ['x264', 'x265', 'h264', 'h265', 'hevc', 'avc']
for term in codec_terms:
if term in filename.lower():
metadata['codec'] = term.upper()
break
# 提取来源信息
source_terms = ['bluray', 'blu-ray', 'webdl', 'web-dl', 'hdtv', 'dvdrip']
for term in source_terms:
if term in filename.lower():
metadata['source'] = term.upper()
break
# 尝试从文件名中提取电影标题(更智能的方法)
metadata['title'] = AdvancedMovieMetadataExtractor.extract_movie_title_advanced(filename)
return metadata
@staticmethod
def extract_movie_title_advanced(filename):
"""高级电影标题提取"""
# 移除扩展名
name = os.path.splitext(filename)[0]
# 常见的需要移除的模式(更全面的列表)
patterns_to_remove = [
# 年份模式
r'[\(\[]?\s*(19|20)\d{2}\s*[\)\]]?',
# 质量模式
r'\b(4k|2160p|1080p|720p|480p|hd|fhd|uhd)\b',
# 编码模式
r'\b(x264|x265|h264|h265|hevc|avc|divx|xvid)\b',
# 来源模式
r'\b(bluray|blu-ray|webdl|web-dl|hdtv|dvdrip|bdrip|brrip)\b',
# 音频模式
r'\b(dts|ac3|aac|flac|dd|dts-hd|truehd)\b',
# 发布组模式
r'\[[^\]]+\]',
r'\s*-\s*[^-]+$',
# 特殊字符和序列号
r'[\(\{\[].*?[\)\}\]]',
r'\b(cd\d|disc\d|part\d)\b',
r'[\._\-]',
]
for pattern in patterns_to_remove:
name = re.sub(pattern, ' ', name, flags=re.IGNORECASE)
# 清理多余空格
name = re.sub(r'\s+', ' ', name).strip()
# 移除常见的无关词汇
common_words = [
'full', 'movie', 'film', 'video', 'hd', 'fhd', 'uhd',
'english', 'chinese', 'sub', 'subtitle', 'dubbed',
'extended', 'director', 'cut', 'theatrical', 'unrated'
]
words = name.split()
filtered_words = [word for word in words if word.lower() not in common_words]
return ' '.join(filtered_words)
# 尝试导入视频处理相关的库,如果失败则提供回退方案
try:
import cv2
import imagehash
from PIL import Image
import numpy as np
from skimage.metrics import structural_similarity as ssim
VIDEO_PROCESSING_AVAILABLE = True
except ImportError as e:
logger.warning(f"视频处理库导入失败: {e}")
logger.warning("基于内容的视频分析功能将被禁用")
VIDEO_PROCESSING_AVAILABLE = False
# 创建虚拟类以避免后续导入错误
class DummyCV2:
VideoCapture = None
CAP_PROP_FRAME_COUNT = 0
CAP_PROP_FPS = 0
CAP_PROP_POS_FRAMES = 0
COLOR_BGR2GRAY = 0
def isOpened(self): return False
def read(self): return False, None
def release(self): pass
cv2 = DummyCV2()
imagehash = type('DummyImageHash', (), {'average_hash': lambda x: 'dummy'})()
Image = type('DummyImage', (), {'fromarray': lambda x: type('DummyPIL', (), {})()})()
class VideoFingerprintExtractor:
    """Extracts a comparable fingerprint for a video file.

    The fingerprint combines perceptual hashes of sampled key frames with a
    cheap audio proxy (file size + duration):
    ``video_<hash>_<hash>..._audio_<size>_<duration>``.
    """

    def __init__(self):
        # Per-file frame-hash cache (reserved; not populated by current callers).
        self.frame_hashes = {}

    def extract_key_frames(self, video_path, num_frames=10, skip_start=0.1):
        """Sample up to *num_frames* frames, skipping the first *skip_start*
        fraction of the video, and return their perceptual hashes as strings.

        Returns [] when video processing is unavailable or the file is unreadable.
        """
        if not VIDEO_PROCESSING_AVAILABLE:
            logger.warning("视频处理功能不可用,跳过关键帧提取")
            return []
        try:
            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                logger.warning(f"无法打开视频文件: {video_path}")
                return []
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            # Skip the intro portion (logos / ads commonly live there).
            start_frame = int(total_frames * skip_start)
            cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
            frames_to_extract = min(num_frames, total_frames - start_frame)
            if frames_to_extract <= 0:
                # Nothing left to sample — previously this fell through to a
                # division by zero caught only by the blanket except.
                cap.release()
                return []
            frame_interval = max(1, (total_frames - start_frame) // frames_to_extract)
            frame_hashes = []
            for i in range(frames_to_extract):
                frame_pos = start_frame + i * frame_interval
                cap.set(cv2.CAP_PROP_POS_FRAMES, frame_pos)
                ret, frame = cap.read()
                if ret and frame is not None:
                    # Grayscale + 64x64 downscale for speed and hash stability.
                    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                    resized = cv2.resize(gray, (64, 64))
                    pil_img = Image.fromarray(resized)
                    frame_hashes.append(str(imagehash.average_hash(pil_img)))
            cap.release()
            return frame_hashes
        except Exception as e:
            logger.error(f"提取关键帧时出错 {video_path}: {e}")
            return []

    def extract_audio_fingerprint(self, video_path):
        """Return a cheap audio proxy: ``audio_<file_size>_<duration>``."""
        try:
            file_size = os.path.getsize(video_path)
            duration = self.get_video_duration(video_path)
            return f"audio_{file_size}_{duration}"
        except Exception as e:
            logger.error(f"提取音频指纹时出错 {video_path}: {e}")
            return "audio_unknown"

    def get_video_duration(self, video_path):
        """Return the duration in seconds via ffprobe, falling back to an
        OpenCV frame-count estimate; 0 when neither works."""
        try:
            result = subprocess.run([
                'ffprobe', '-v', 'error', '-show_entries',
                'format=duration', '-of',
                'default=noprint_wrappers=1:nokey=1', video_path
            ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
            return float(result.stdout.strip())
        except (OSError, ValueError, subprocess.SubprocessError):
            # ffprobe missing or no parsable number — estimate with OpenCV.
            try:
                if VIDEO_PROCESSING_AVAILABLE:
                    cap = cv2.VideoCapture(video_path)
                    fps = cap.get(cv2.CAP_PROP_FPS)
                    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                    cap.release()
                    if fps > 0:
                        return frame_count / fps
            except Exception:
                pass
            return 0

    def extract_video_fingerprint(self, video_path, num_frames=8, skip_start=0.1):
        """Build the full fingerprint string, or None when no frames could be read."""
        try:
            frame_hashes = self.extract_key_frames(video_path, num_frames, skip_start)
            if not frame_hashes:
                return None
            audio_fingerprint = self.extract_audio_fingerprint(video_path)
            # Sorting makes the fingerprint order-independent.
            frame_fingerprint = "_".join(sorted(frame_hashes))
            return f"video_{frame_fingerprint}_{audio_fingerprint}"
        except Exception as e:
            logger.error(f"提取视频指纹时出错 {video_path}: {e}")
            return None

    def calculate_video_similarity(self, fingerprint1, fingerprint2):
        """Jaccard similarity of the frame-hash sets of two fingerprints (0..1)."""
        if not fingerprint1 or not fingerprint2:
            return 0
        if fingerprint1 == fingerprint2:
            return 1.0
        try:
            # Layout: "video_<hash>..._audio_<size>_<duration>".  Strip the
            # leading "video" token and the trailing THREE audio tokens
            # ("audio", size, duration).  The previous [1:-2] slice left the
            # literal "audio" token in both sets, inflating every score.
            frames1 = set(fingerprint1.split('_')[1:-3])
            frames2 = set(fingerprint2.split('_')[1:-3])
            if not frames1 or not frames2:
                return 0
            intersection = len(frames1 & frames2)
            union = len(frames1 | frames2)
            return intersection / union if union > 0 else 0
        except Exception as e:
            logger.error(f"计算视频相似度时出错: {e}")
            return 0
class ContentBasedDuplicateDetector:
    """Groups duplicate movies by content fingerprints plus filename metadata."""

    def __init__(self, similarity_threshold=0.7):
        # Minimum Jaccard similarity for two videos to land in the same group.
        self.similarity_threshold = similarity_threshold
        self.fingerprint_extractor = VideoFingerprintExtractor()
        self.metadata_extractor = AdvancedMovieMetadataExtractor()

    def group_similar_movies_by_content(self, files):
        """Group *files* (file-info dicts) by content-fingerprint similarity.

        Returns a list of groups (each a list of file-info dicts with >1
        member).  Requires the optional video stack; returns [] otherwise.
        Side effect: sets ``content_fingerprint`` on every file-info dict.
        """
        if not VIDEO_PROCESSING_AVAILABLE:
            logger.warning("视频处理功能不可用,跳过基于内容的分析")
            return []
        logger.info("开始基于内容指纹的电影相似度分析...")
        # Fingerprint every file; extraction failures are recorded as None.
        file_fingerprints = {}
        for file_info in files:
            file_path = file_info['path']
            logger.debug(f"提取指纹: {os.path.basename(file_path)}")
            fingerprint = self.fingerprint_extractor.extract_video_fingerprint(file_path)
            file_info['content_fingerprint'] = fingerprint
            if fingerprint:
                file_fingerprints[file_path] = fingerprint
        # Greedy single-pass clustering on pairwise similarity.
        groups = []
        processed_files = set()
        for file_path1, fingerprint1 in file_fingerprints.items():
            if file_path1 in processed_files:
                continue
            current_group = [file_path1]
            processed_files.add(file_path1)
            for file_path2, fingerprint2 in file_fingerprints.items():
                if file_path2 in processed_files or file_path1 == file_path2:
                    continue
                similarity = self.fingerprint_extractor.calculate_video_similarity(
                    fingerprint1, fingerprint2
                )
                if similarity >= self.similarity_threshold:
                    current_group.append(file_path2)
                    processed_files.add(file_path2)
            if len(current_group) > 1:
                groups.append(current_group)
        # Map path groups back to file-info dicts with an O(1) lookup table
        # (the previous next() scan was O(n) per path).
        info_by_path = {f['path']: f for f in files}
        file_groups = []
        for group in groups:
            file_groups.append([info_by_path[p] for p in group if p in info_by_path])
        logger.info(f"基于内容指纹找到 {len(file_groups)} 组相似电影")
        return file_groups

    def enhance_with_metadata_matching(self, files, content_groups):
        """Augment content-based groups with filename-metadata matches.

        Side effect: sets ``detailed_metadata`` on every file-info dict.
        """
        logger.info("使用元数据匹配增强内容分组...")
        for file_info in files:
            filename = file_info.get('filename', '')
            file_info['detailed_metadata'] = self.metadata_extractor.extract_detailed_metadata(filename)
        metadata_groups = self.group_by_metadata(files)
        return self.merge_groups(content_groups, metadata_groups)

    def group_by_metadata(self, files):
        """Group files by extracted (title, year); returns groups with >1 file."""
        metadata_groups = {}
        for file_info in files:
            metadata = file_info.get('detailed_metadata', {})
            title = metadata.get('title', '').lower().strip()
            year = metadata.get('year', '')
            # Very short titles are too ambiguous to group on.
            if title and len(title) > 2:
                group_key = f"{title}_{year}" if year else title
                metadata_groups.setdefault(group_key, []).append(file_info)
        return [group for group in metadata_groups.values() if len(group) > 1]

    def merge_groups(self, content_groups, metadata_groups):
        """Merge metadata-based groups into the content-based grouping.

        A metadata group sharing any path with a content group is folded into
        it (extending that group in place); otherwise it is appended as new.
        """
        all_groups = content_groups.copy()
        for metadata_group in metadata_groups:
            merged = False
            for content_group in content_groups:
                # Hoisted path set: the previous code rebuilt it once per
                # candidate element (with confusing variable shadowing).
                existing_paths = {f['path'] for f in content_group}
                if existing_paths & {f['path'] for f in metadata_group}:
                    content_group.extend(
                        f for f in metadata_group if f['path'] not in existing_paths
                    )
                    merged = True
                    break
            if not merged:
                all_groups.append(metadata_group)
        return all_groups
class IntelligentDuplicateCleaner:
    # ... rest of the class unchanged (elided in this history copy) ...

    def remove_similar_duplicates(self, similar_groups, dry_run=True, strategy='quality', no_backup=False):
        """Keep the best version in each similar-movie group, remove the rest.

        Handles cross-device backup moves (EXDEV) via copy+delete.

        Args:
            similar_groups: mapping of group name -> list of file-info dicts.
            dry_run: when True, only log what would be deleted.
            strategy: forwarded to ``select_best_version``.
            no_backup: when True, delete outright instead of moving files to a
                ``.similar_movie_backup`` folder next to the source file.

        Returns:
            (kept_files, deleted_files): file-info dicts kept, paths deleted.
        """
        logger.info("开始处理相似电影文件...")
        kept_files = []
        deleted_files = []
        delete_errors = []
        for group_name, file_group in similar_groups.items():
            if len(file_group) <= 1:
                continue
            best_file, files_to_delete = self.select_best_version(file_group, strategy)
            logger.info(f"\n电影组: {group_name}")
            logger.info(f" 保留: {best_file['filename']} "
                        f"(质量分: {best_file.get('quality_score', 0)})")
            kept_files.append(best_file)
            for file_info in files_to_delete:
                file_path = file_info['path']
                if dry_run:
                    logger.info(f" [干运行] 将删除: {file_info['filename']} "
                                f"(质量分: {file_info.get('quality_score', 0)})")
                    continue
                try:
                    if not os.path.exists(file_path):
                        logger.warning(f" 文件不存在,跳过删除: {file_info['filename']}")
                        continue
                    if no_backup:
                        # Direct-delete mode: no recovery possible.
                        os.remove(file_path)
                        logger.info(f" 🗑️ 已直接删除: {file_info['filename']}")
                    else:
                        # Back up inside the source directory so the common
                        # case is a cheap same-device rename.
                        source_dir = os.path.dirname(file_path)
                        backup_dir = os.path.join(source_dir, ".similar_movie_backup")
                        os.makedirs(backup_dir, exist_ok=True)
                        backup_path = os.path.join(backup_dir, os.path.basename(file_path))
                        counter = 1
                        while os.path.exists(backup_path):
                            name, ext = os.path.splitext(os.path.basename(file_path))
                            backup_path = os.path.join(backup_dir, f"{name}_{counter}{ext}")
                            counter += 1
                        try:
                            # Same-device fast path.
                            os.rename(file_path, backup_path)
                            logger.info(f" 已移动相似电影到同设备备份: {file_info['filename']}")
                        except OSError as e:
                            # errno.EXDEV instead of the magic number 18: the
                            # cross-device-link errno value is platform-defined.
                            if e.errno == errno.EXDEV:
                                logger.info(f" 跨设备移动文件,使用复制方式: {file_info['filename']}")
                                shutil.copy2(file_path, backup_path)  # copy data + metadata
                                os.remove(file_path)  # then drop the original
                                logger.info(f" 已复制并删除相似电影到跨设备备份: {file_info['filename']}")
                            else:
                                raise  # re-raise anything that is not EXDEV
                    deleted_files.append(file_path)
                    # Record the deletion in the database.
                    self.db.mark_file_deleted(file_path, "similar_movie")
                except Exception as e:
                    error_msg = f"删除文件时出错 {file_path}: {e}"
                    logger.error(error_msg)
                    delete_errors.append(error_msg)
                    self.db.add_operation("error", file_path, reason="delete_failed", details=str(e))
        if delete_errors:
            logger.error(f"删除过程中遇到 {len(delete_errors)} 个错误")
        logger.info(f"保留了 {len(kept_files)} 个最佳版本文件")
        logger.info(f"删除了 {len(deleted_files)} 个相似电影文件")
        return kept_files, deleted_files
def remove_empty_folders_efficient(self, target_dir=None):
"""高效删除空文件夹 - 修复跨设备问题"""
if target_dir is None:
target_dir = self.target_dirs[0]
logger.info(f"开始清理空文件夹: {target_dir}")
empty_folders = []
for root, dirs, files in os.walk(target_dir, topdown=False):
# 跳过备份目录和系统目录
skip_dirs = ['@eaDir', '.Trash', '.duplicate_backup', 'temp_extract', '.similar_movie_backup']
if any(skip_dir in root for skip_dir in skip_dirs):
continue
if not dirs and not files and root != target_dir:
try:
# 检查目录是否为空(可能有隐藏文件)
if len(os.listdir(root)) == 0:
os.rmdir(root)
empty_folders.append(root)
self.db.add_operation("delete_folder", root, reason="empty_folder")
logger.debug(f"删除空文件夹: {root}")
except OSError as e:
logger.debug(f"无法删除文件夹 {root}: {e}")
logger.info(f"删除了 {len(empty_folders)} 个空文件夹")
return empty_folders
    def run_intelligent_cleanup(self, dry_run=True, strategy='quality',
                                similarity_threshold=0.8, skip_start_percent=0.1,
                                no_backup=False):
        """Run the intelligent cleanup pipeline (enhanced, with backup strategy).

        Scans all target directories, groups similar movies, removes the
        inferior duplicates, prunes empty folders and records history and
        statistics in the database.

        Args:
            dry_run: when True, report only — nothing is deleted.
            strategy: best-version selection strategy (quality/size/...).
            similarity_threshold: minimum similarity to group two files.
            skip_start_percent: fraction of each video to skip when sampling.
            no_backup: delete directly instead of backing files up.

        Returns:
            The scan-summary dict, or {} when there was nothing to do.

        Raises:
            Re-raises any unexpected error after logging it to the database.
        """
        logger.info("开始智能电影重复文件清理流程(增强版)")
        if no_backup:
            logger.warning("⚠️ 直接删除模式已启用 - 文件将永久删除,不可恢复!")
        start_time = time.time()
        self.db.add_operation("scan_start", str(self.target_dirs), reason=f"intelligent_cleanup_{'no_backup' if no_backup else 'with_backup'}")
        try:
            # 1. Scan every target directory and collect file metadata.
            all_files = self.scan_files_parallel()
            if not all_files:
                logger.warning("没有找到任何视频文件")
                return {}
            # 2. Group similar movie files with the enhanced matcher.
            similar_groups = self.find_similar_movies_enhanced(
                all_files, similarity_threshold, skip_start_percent
            )
            if not similar_groups:
                logger.info("没有找到相似的电影文件")
                return {}
            # 3. Remove the inferior duplicates (or just report in dry-run).
            kept_files, deleted_files = self.remove_similar_duplicates(
                similar_groups, dry_run, strategy, no_backup
            )
            # 4. Prune empty folders in every target directory.
            if not dry_run:
                for target_dir in self.target_dirs:
                    self.remove_empty_folders_efficient(target_dir)
            # Record scan completion.
            self.db.add_operation("scan_complete", str(self.target_dirs),
                                  reason="intelligent_cleanup_enhanced_finished")
            # Elapsed wall-clock time for this run.
            duration = time.time() - start_time
            # Persist a summary row for this scan.
            scan_data = {
                'target_directory': str(self.target_dirs),
                'total_files': len(all_files),
                'similar_groups': len(similar_groups),
                'kept_files': len(kept_files),
                'deleted_files': len(deleted_files),
                'deleted_file_details': deleted_files,
                'duration_seconds': duration,
                'no_backup_mode': no_backup
            }
            self.db.add_scan_history(scan_data)
            # Show run statistics to the user.
            self.show_intelligent_statistics(scan_data)
            # Backup locations only exist when backups were actually created.
            if not dry_run and deleted_files and not no_backup:
                self.show_backup_locations()
            return scan_data
        except Exception as e:
            logger.error(f"智能清理过程中发生错误: {e}")
            self.db.add_operation("error", "SYSTEM",
                                  reason="intelligent_cleanup_enhanced_failed", details=str(e))
            raise
def show_backup_locations(self):
"""显示备份文件位置信息"""
logger.info("\n备份文件位置:")
backup_dirs_found = set()
for target_dir in self.target_dirs:
for root, dirs, files in os.walk(target_dir):
if '.similar_movie_backup' in dirs:
backup_dir = os.path.join(root, '.similar_movie_backup')
backup_dirs_found.add(backup_dir)
if backup_dirs_found:
for backup_dir in backup_dirs_found:
# 计算备份目录中的文件数量
try:
backup_files = [f for f in os.listdir(backup_dir)
if os.path.isfile(os.path.join(backup_dir, f))]
total_size = sum(os.path.getsize(os.path.join(backup_dir, f))
for f in backup_files) / (1024*1024*1024) # GB
logger.info(f" {backup_dir}: {len(backup_files)} 个文件, 总大小: {total_size:.2f} GB")
except OSError as e:
logger.warning(f" 无法访问备份目录 {backup_dir}: {e}")
else:
logger.info(" 未找到备份目录")
# Entry point: CLI parsing, logging setup and backup-strategy selection.
def main():
    """Parse CLI arguments, configure logging and run the selected cleanup mode."""
    # setup_logging() is re-invoked below with CLI options, so the
    # module-level logger must be rebound.
    global logger
    parser = argparse.ArgumentParser(description='智能电影重复文件清理工具 - 增强版')
    parser.add_argument('directories', nargs='*', help='要扫描的目录路径(支持多个目录)')
    parser.add_argument('--dry-run', action='store_true', help='干运行模式,只显示不会实际删除')
    parser.add_argument('--strategy', choices=['quality', 'size', 'resolution', 'newest'],
                        default='quality', help='选择最佳版本策略(默认: quality)')
    parser.add_argument('--similarity-threshold', type=float, default=0.8,
                        help='相似度阈值(0.0-1.0,默认: 0.8)')
    parser.add_argument('--skip-start', type=float, default=0.1,
                        help='跳过文件开头的比例(0.0-0.5,默认: 0.1)')
    parser.add_argument('--db-path', default='file_cleaner.db', help='数据库文件路径')
    parser.add_argument('--workers', type=int, default=4, help='并行工作线程数 (默认: 4)')
    parser.add_argument('--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
                        default='INFO', help='日志级别 (默认: INFO)')
    parser.add_argument('--log-file', default='duplicate_cleaner.log', help='日志文件路径')
    parser.add_argument('--prefer-folder', nargs='+', help='优先保留的文件夹(当文件质量相同时)')
    parser.add_argument('--content-analysis', action='store_true',
                        help='启用基于内容的分析(更准确但更慢)')
    parser.add_argument('--no-content-analysis', action='store_true',
                        help='禁用基于内容的分析(更快但准确性较低)')
    parser.add_argument('--backup-dir', help='指定备份目录路径(避免跨设备问题)')
    parser.add_argument('--no-backup', action='store_true', help='不创建备份(直接删除文件)')
    args = parser.parse_args()
    # Default to the current directory when none was given.
    if not args.directories:
        args.directories = [os.getcwd()]
    # Validate directories before logging is configured — hence print().
    for directory in args.directories:
        if not os.path.exists(directory):
            print(f"错误: 目录 {directory} 不存在")
            return
    # Validate numeric ranges.
    if args.skip_start < 0 or args.skip_start > 0.5:
        print("错误: --skip-start 参数必须在 0.0 到 0.5 之间")
        return
    # The help text promises 0.0-1.0 but this was never enforced before.
    if args.similarity_threshold < 0 or args.similarity_threshold > 1:
        print("错误: --similarity-threshold 参数必须在 0.0 到 1.0 之间")
        return
    # Re-configure logging from the CLI options.
    log_level = getattr(logging, args.log_level)
    logger = setup_logging(log_level, args.log_file)
    # Logger is available from here on.
    if len(args.directories) == 1 and args.directories[0] == os.getcwd():
        logger.info(f"未指定目录,使用当前目录: {args.directories[0]}")
    # Decide whether content-based analysis is used (--no-content-analysis wins).
    use_content_analysis = True
    if args.no_content_analysis:
        use_content_analysis = False
    elif args.content_analysis:
        use_content_analysis = True
    # Force-disable when the video-processing stack is missing.
    if use_content_analysis and not VIDEO_PROCESSING_AVAILABLE:
        logger.warning("视频处理库不可用,自动禁用内容分析")
        use_content_analysis = False
    logger.info(f"启动智能电影重复文件清理器")
    logger.info(f"目标目录: {args.directories}")
    logger.info(f"选择策略: {args.strategy}")
    logger.info(f"相似阈值: {args.similarity_threshold}")
    if args.prefer_folder:
        logger.info(f"优先文件夹: {args.prefer_folder}")
    if args.backup_dir:
        # NOTE(review): --backup-dir is logged but never forwarded to the
        # cleaner in this excerpt — confirm it is consumed elsewhere.
        logger.info(f"指定备份目录: {args.backup_dir}")
    if args.no_backup:
        logger.warning("警告: 已启用直接删除模式,不会创建备份!")
    cleaner = IntelligentDuplicateCleaner(
        args.directories,
        args.db_path,
        args.workers,
        args.prefer_folder
    )
    try:
        if use_content_analysis:
            logger.info("使用基于内容的高级分析模式")
            result = cleaner.run_advanced_cleanup(
                dry_run=args.dry_run,
                strategy=args.strategy,
                similarity_threshold=args.similarity_threshold,
                use_content_analysis=use_content_analysis
            )
        else:
            # Direct (non-content) cleanup mode.
            result = cleaner.run_intelligent_cleanup(
                dry_run=args.dry_run,
                strategy=args.strategy,
                similarity_threshold=args.similarity_threshold,
                skip_start_percent=args.skip_start,
                no_backup=args.no_backup  # forward the backup strategy
            )
        if not args.dry_run and result:
            logger.info(f"\n=== 清理总结 ===")
            logger.info(f"相似电影组: {result.get('similar_groups', 0)}")
            logger.info(f"保留文件: {result.get('kept_files', 0)}")
            logger.info(f"删除文件: {result.get('deleted_files', 0)}")
            logger.info(f"耗时: {result.get('duration_seconds', 0):.2f}")
            # Backup info only applies when backups were created.
            if not args.no_backup:
                cleaner.show_backup_locations()
    except KeyboardInterrupt:
        logger.info("\n用户中断操作")
        cleaner.db.add_operation("error", "SYSTEM", reason="user_interrupt")
    except Exception as e:
        logger.error(f"发生错误: {e}")
        cleaner.db.add_operation("error", "SYSTEM", reason="exception", details=str(e))


if __name__ == "__main__":
    main()