import os
from typing import List, Tuple

import psutil

from conf.constants import ParallelProcessing
from utils.log.trans_log import info, debug


def print_memory_usage(detail: str = "") -> None:
    """Log the memory usage of the current process at debug level.

    Two lines are emitted: one for RSS (resident set size) and one for VMS
    (virtual memory size), both converted from bytes to MB.

    Args:
        detail: Free-form text placed in front of each log line.
    """
    memory_info = psutil.Process(os.getpid()).memory_info()
    bytes_per_mb = 1024 ** 2
    memory_usage_rss_mb = memory_info.rss / bytes_per_mb
    memory_usage_vms_mb = memory_info.vms / bytes_per_mb
    debug(f"{detail},Memory usage (RSS): {memory_usage_rss_mb:.2f} MB")
    debug(f"{detail},Memory usage (VMS): {memory_usage_vms_mb:.2f} MB")


def get_cpu_count() -> int:
    """Return the number of CPUs available on this machine.

    ``psutil.cpu_count()`` returns ``None`` when the count cannot be
    determined, so fall back to ``os.cpu_count()`` and finally to 1 to make
    sure callers always receive an integer they can do arithmetic with.

    Returns:
        The CPU count, never less than 1.
    """
    return psutil.cpu_count() or os.cpu_count() or 1


def get_available_cpu_count_with_percent(percent: float = 1) -> int:
    """Return the CPU count scaled by ``percent``, truncated to an int.

    Args:
        percent: Fraction of the CPUs that may be used (1 means all).

    Returns:
        ``int(cpu_count * percent)``; this can be 0 for small fractions.
    """
    return int(get_cpu_count() * percent)


def get_file_size(file_path: str) -> int:
    """Return the size of a file in bytes.

    Args:
        file_path: Path of the file to measure.

    Returns:
        File size in bytes, as reported by ``os.path.getsize``.
    """
    return os.path.getsize(file_path)


def get_dir_size(dir_path: str) -> int:
    """Return the total size in bytes of the files directly inside a directory.

    Only regular files that are direct children of ``dir_path`` are counted;
    sub-directories are not descended into.

    Args:
        dir_path: Path of the directory to measure.

    Returns:
        Sum of the sizes of the directory's direct child files, in bytes.
    """
    total_size = 0
    for entry_name in os.listdir(dir_path):
        entry_path = os.path.join(dir_path, entry_name)
        if os.path.isfile(entry_path):
            total_size += get_file_size(entry_path)
    return total_size


def get_available_memory_with_percent(percent: float = 1) -> int:
    """Return the currently available system memory scaled by ``percent``.

    Args:
        percent: Fraction of the available memory that may be used.

    Returns:
        ``available * percent`` in bytes, truncated to an int.
    """
    return int(psutil.virtual_memory().available * percent)


def get_max_file_size(file_paths: List[str]) -> int:
    """Return the size of the largest file in ``file_paths``.

    Args:
        file_paths: Paths of the files to inspect.

    Returns:
        The largest file size in bytes, or 0 when the list is empty.
    """
    return max((get_file_size(file_path) for file_path in file_paths), default=0)


def _pick_process_count(max_file_size: int, memory_percent: float,
                        cpu_percent: float) -> Tuple[int, int]:
    """Return ``(process_count, cpu_limit)`` for the given memory/CPU budget.

    ``cpu_limit`` is the CPU share capped by ``ParallelProcessing.MAX_PROCESSES``.
    ``process_count`` is the smaller of ``cpu_limit`` and the number of files of
    size ``max_file_size`` that fit into the memory budget, but at least 1.
    """
    cpu_limit = min(get_available_cpu_count_with_percent(cpu_percent),
                    ParallelProcessing.MAX_PROCESSES)
    if max_file_size > 0:
        memory_budget = get_available_memory_with_percent(memory_percent)
        process_count = min(int(memory_budget / max_file_size), cpu_limit)
    else:
        # A zero (or unknown) file size would divide by zero; memory places
        # no bound in that case, so only the CPU limit applies.
        process_count = cpu_limit
    return max(process_count, 1), cpu_limit


def use_files_get_max_cpu_count(file_paths: List[str], memory_percent: float = 1 / 12,
                                cpu_percent: float = 2 / 5) -> int:
    """Compute how many worker processes to use for a list of files.

    The count is bounded by the memory budget divided by the largest file
    size, by the CPU share (capped at ``ParallelProcessing.MAX_PROCESSES``)
    and by the number of files. The chosen values are written to the info log.

    Args:
        file_paths: Paths of the files that will be processed.
        memory_percent: Fraction of the available memory that may be used.
        cpu_percent: Fraction of the CPUs that may be used.

    Returns:
        The number of processes to use; always at least 1, even when
        ``file_paths`` is empty or every file is empty.
    """
    max_file_size = get_max_file_size(file_paths)
    result, max_cpu_count = _pick_process_count(max_file_size, memory_percent, cpu_percent)
    # Never start more workers than there are files, but keep one worker so
    # the result stays usable as a pool size for an empty list.
    result = max(min(result, len(file_paths)), 1)

    info("总文件数:", len(file_paths), ",获取最大文件大小:", str(round(max_file_size / 2 ** 20, 2)) + "M",
         "可用内存:", str(get_available_memory_with_percent(1) / 2 ** 20) + "M",
         "总CPU数:", get_cpu_count(), "CPU使用比例:", round(cpu_percent, 2), "CPU可用数量:", max_cpu_count,
         ",最终确定使用进程数:", result)
    return result


def max_file_size_get_max_cpu_count(max_file_size: int, memory_percent: float = 1 / 6,
                                    cpu_percent: float = 2 / 5) -> int:
    """Compute how many worker processes to use for a known largest file size.

    The count is bounded by the memory budget divided by ``max_file_size``
    and by the CPU share (capped at ``ParallelProcessing.MAX_PROCESSES``).
    The chosen values are written to the info log.

    Args:
        max_file_size: Size in bytes of the largest file to be processed.
        memory_percent: Fraction of the available memory that may be used.
        cpu_percent: Fraction of the CPUs that may be used.

    Returns:
        The number of processes to use; always at least 1, even when
        ``max_file_size`` is 0.
    """
    result, max_cpu_count = _pick_process_count(max_file_size, memory_percent, cpu_percent)

    info(",获取最大文件大小:", str(round(max_file_size / 2 ** 20, 2)) + "M",
         "可用内存:", str(get_available_memory_with_percent(1) / 2 ** 20) + "M",
         "总CPU数:", get_cpu_count(), "CPU使用比例:", round(cpu_percent, 2), "CPU可用数量:", max_cpu_count,
         ",最终确定使用进程数:", result)
    return result


if __name__ == '__main__':
    # Manual smoke test: time the file listing and the size scan, and print
    # the derived process count and memory figures (in MB).
    from utils.file.trans_methods import read_files
    import datetime

    read_path = r"Z:\collection_data\1进行中\密马风电场-山西-大唐\收资数据\scada\秒级数据"
    begin = datetime.datetime.now()
    all_files = read_files(read_path)
    print(datetime.datetime.now() - begin)

    print(use_files_get_max_cpu_count(all_files))

    print(get_available_memory_with_percent(1) / 2 ** 20)
    print(get_available_memory_with_percent(2 / 3) / 2 ** 20)

    begin = datetime.datetime.now()
    print(len(all_files))
    print(get_max_file_size(all_files) / 2 ** 20)
    print(datetime.datetime.now() - begin)