| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- import os
- from typing import List
- import psutil
- from conf.constants import ParallelProcessing
- from utils.log.trans_log import info, debug
- def print_memory_usage(detail: str = "") -> None:
- """
- 打印内存使用情况
-
- Args:
- detail: 详细信息
- """
- # 获取当前进程ID
- pid = os.getpid()
- # 获取进程信息
- py = psutil.Process(pid)
- # 获取内存信息
- memory_info = py.memory_info()
- # RSS (Resident Set Size) 是进程实际占用的物理内存大小
- memory_usage_rss = memory_info.rss
- # VMS (Virtual Memory Size) 是进程使用的虚拟内存大小
- memory_usage_vms = memory_info.vms
- # 将字节转换为更易读的单位
- memory_usage_rss_mb = memory_usage_rss / (1024 ** 2)
- memory_usage_vms_mb = memory_usage_vms / (1024 ** 2)
- debug(f"{detail},Memory usage (RSS): {memory_usage_rss_mb:.2f} MB")
- debug(f"{detail},Memory usage (VMS): {memory_usage_vms_mb:.2f} MB")
- def get_cpu_count() -> int:
- """
- 获取CPU核心数
-
- Returns:
- CPU核心数
- """
- return psutil.cpu_count()
- def get_available_cpu_count_with_percent(percent: float = 1) -> int:
- """
- 根据百分比获取可用CPU数
-
- Args:
- percent: CPU使用百分比
-
- Returns:
- 可用CPU数
- """
- cpu_count = get_cpu_count()
- return int(cpu_count * percent)
- def get_file_size(file_path: str) -> int:
- """
- 获取文件大小
-
- Args:
- file_path: 文件路径
-
- Returns:
- 文件大小(字节)
- """
- return os.path.getsize(file_path)
- def get_dir_size(dir_path: str) -> int:
- """
- 获取目录大小
-
- Args:
- dir_path: 目录路径
-
- Returns:
- 目录大小(字节)
- """
- return sum(get_file_size(os.path.join(dir_path, file)) for file in os.listdir(dir_path) if
- os.path.isfile(os.path.join(dir_path, file)))
- def get_available_memory_with_percent(percent: float = 1) -> int:
- """
- 根据百分比获取可用内存
-
- Args:
- percent: 内存使用百分比
-
- Returns:
- 可用内存(字节)
- """
- memory_info = psutil.virtual_memory()
- return int(memory_info.available * percent)
- def get_max_file_size(file_paths: List[str]) -> int:
- """
- 获取文件列表中的最大文件大小
-
- Args:
- file_paths: 文件路径列表
-
- Returns:
- 最大文件大小(字节)
- """
- max_size = 0
- for file_path in file_paths:
- file_size = get_file_size(file_path)
- if file_size > max_size:
- max_size = file_size
- return max_size
- def use_files_get_max_cpu_count(file_paths: List[str], memory_percent: float = 1 / 12,
- cpu_percent: float = 2 / 5) -> int:
- """
- 根据文件大小和内存情况计算最大进程数
-
- Args:
- file_paths: 文件路径列表
- memory_percent: 内存使用百分比
- cpu_percent: CPU使用百分比
-
- Returns:
- 最大进程数
- """
- max_file_size = get_max_file_size(file_paths)
- free_memory = get_available_memory_with_percent(memory_percent)
- count = int(free_memory / max_file_size)
- max_cpu_count = get_available_cpu_count_with_percent(cpu_percent)
- # 限制最大进程数
- max_cpu_count = min(max_cpu_count, ParallelProcessing.MAX_PROCESSES)
- result = count if count <= max_cpu_count else max_cpu_count
- if result == 0:
- result = 1
- if result > len(file_paths):
- result = len(file_paths)
- info("总文件数:", len(file_paths), ",获取最大文件大小:", str(round(max_file_size / 2 ** 20, 2)) + "M",
- "可用内存:", str(get_available_memory_with_percent(1) / 2 ** 20) + "M",
- "总CPU数:", get_cpu_count(), "CPU使用比例:", round(cpu_percent, 2), "CPU可用数量:", max_cpu_count,
- ",最终确定使用进程数:", result)
- return result
- def max_file_size_get_max_cpu_count(max_file_size: int, memory_percent: float = 1 / 6,
- cpu_percent: float = 2 / 5) -> int:
- """
- 根据最大文件大小和内存情况计算最大进程数
-
- Args:
- max_file_size: 最大文件大小
- memory_percent: 内存使用百分比
- cpu_percent: CPU使用百分比
-
- Returns:
- 最大进程数
- """
- free_memory = get_available_memory_with_percent(memory_percent)
- count = int(free_memory / max_file_size)
- max_cpu_count = get_available_cpu_count_with_percent(cpu_percent)
- # 限制最大进程数
- max_cpu_count = min(max_cpu_count, ParallelProcessing.MAX_PROCESSES)
- result = count if count <= max_cpu_count else max_cpu_count
- if result == 0:
- result = 1
- info(",获取最大文件大小:", str(round(max_file_size / 2 ** 20, 2)) + "M",
- "可用内存:", str(get_available_memory_with_percent(1) / 2 ** 20) + "M",
- "总CPU数:", get_cpu_count(), "CPU使用比例:", round(cpu_percent, 2), "CPU可用数量:", max_cpu_count,
- ",最终确定使用进程数:", result)
- return result
- if __name__ == '__main__':
- from utils.file.trans_methods import read_files
- import datetime
- read_path = r"Z:\collection_data\1进行中\密马风电场-山西-大唐\收资数据\scada\秒级数据"
- begin = datetime.datetime.now()
- all_files = read_files(read_path)
- print(datetime.datetime.now() - begin)
- print(use_files_get_max_cpu_count(all_files))
- print(get_available_memory_with_percent(1) / 2 ** 20)
- print(get_available_memory_with_percent(2 / 3) / 2 ** 20)
- begin = datetime.datetime.now()
- print(len(all_files))
- print(get_max_file_size(all_files) / 2 ** 20)
- print(datetime.datetime.now() - begin)
|