|
@@ -1,31 +1,47 @@
|
|
|
import datetime
|
|
import datetime
|
|
|
import multiprocessing
|
|
import multiprocessing
|
|
|
|
|
+import os
|
|
|
import traceback
|
|
import traceback
|
|
|
-from os import *
|
|
|
|
|
|
|
|
|
|
import pandas as pd
|
|
import pandas as pd
|
|
|
|
|
|
|
|
|
|
+from conf.constants import ParallelProcessing
|
|
|
from etl.common.PathsAndTable import PathsAndTable
|
|
from etl.common.PathsAndTable import PathsAndTable
|
|
|
from etl.wind_power.min_sec import TransParam
|
|
from etl.wind_power.min_sec import TransParam
|
|
|
from service.trans_conf_service import update_trans_transfer_progress
|
|
from service.trans_conf_service import update_trans_transfer_progress
|
|
|
from utils.file.trans_methods import read_excel_files, split_array, del_blank, \
|
|
from utils.file.trans_methods import read_excel_files, split_array, del_blank, \
|
|
|
create_file_path, read_file_to_df, valid_eval
|
|
create_file_path, read_file_to_df, valid_eval
|
|
|
-from utils.log.trans_log import trans_print
|
|
|
|
|
|
|
+from utils.log.trans_log import info, debug, error
|
|
|
from utils.systeminfo.sysinfo import use_files_get_max_cpu_count, get_dir_size
|
|
from utils.systeminfo.sysinfo import use_files_get_max_cpu_count, get_dir_size
|
|
|
|
|
|
|
|
|
|
|
|
|
class ReadAndSaveTmp(object):
|
|
class ReadAndSaveTmp(object):
|
|
|
|
|
+ """读取并保存临时文件类"""
|
|
|
|
|
|
|
|
def __init__(self, pathsAndTable: PathsAndTable, trans_param: TransParam):
|
|
def __init__(self, pathsAndTable: PathsAndTable, trans_param: TransParam):
|
|
|
|
|
+ """
|
|
|
|
|
+ 初始化读取并保存临时文件类
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ pathsAndTable: 路径和表对象
|
|
|
|
|
+ trans_param: 转换参数对象
|
|
|
|
|
+ """
|
|
|
self.pathsAndTable = pathsAndTable
|
|
self.pathsAndTable = pathsAndTable
|
|
|
self.trans_param = trans_param
|
|
self.trans_param = trans_param
|
|
|
self.exist_wind_names = multiprocessing.Manager().list()
|
|
self.exist_wind_names = multiprocessing.Manager().list()
|
|
|
self.lock = multiprocessing.Manager().Lock()
|
|
self.lock = multiprocessing.Manager().Lock()
|
|
|
self.file_lock = multiprocessing.Manager().dict()
|
|
self.file_lock = multiprocessing.Manager().dict()
|
|
|
|
|
|
|
|
- def _save_to_tmp_csv_by_name(self, df, name):
|
|
|
|
|
|
|
+ def _save_to_tmp_csv_by_name(self, df: pd.DataFrame, name: str):
|
|
|
|
|
+ """
|
|
|
|
|
+ 根据风机名称保存到临时CSV文件
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ df: 数据帧
|
|
|
|
|
+ name: 风机名称
|
|
|
|
|
+ """
|
|
|
save_name = str(name) + '.csv'
|
|
save_name = str(name) + '.csv'
|
|
|
- save_path = path.join(self.pathsAndTable.get_read_tmp_path(), save_name)
|
|
|
|
|
|
|
+ save_path = os.path.join(self.pathsAndTable.get_read_tmp_path(), save_name)
|
|
|
create_file_path(save_path, is_file_path=True)
|
|
create_file_path(save_path, is_file_path=True)
|
|
|
|
|
|
|
|
with self.lock:
|
|
with self.lock:
|
|
@@ -41,7 +57,13 @@ class ReadAndSaveTmp(object):
|
|
|
else:
|
|
else:
|
|
|
df.to_csv(save_path, index=False, encoding='utf8')
|
|
df.to_csv(save_path, index=False, encoding='utf8')
|
|
|
|
|
|
|
|
- def save_merge_data(self, file_path):
|
|
|
|
|
|
|
+ def save_merge_data(self, file_path: str):
|
|
|
|
|
+ """
|
|
|
|
|
+ 保存合并数据
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ file_path: 文件路径
|
|
|
|
|
+ """
|
|
|
df = self.read_excel_to_df(file_path)
|
|
df = self.read_excel_to_df(file_path)
|
|
|
if self.trans_param.wind_name_exec:
|
|
if self.trans_param.wind_name_exec:
|
|
|
if valid_eval(self.trans_param.wind_name_exec):
|
|
if valid_eval(self.trans_param.wind_name_exec):
|
|
@@ -67,7 +89,7 @@ class ReadAndSaveTmp(object):
|
|
|
else:
|
|
else:
|
|
|
contains_name = False
|
|
contains_name = False
|
|
|
self.exist_wind_names.append(exist_name)
|
|
self.exist_wind_names.append(exist_name)
|
|
|
- save_path = path.join(merge_path, csv_name)
|
|
|
|
|
|
|
+ save_path = os.path.join(merge_path, csv_name)
|
|
|
now_df = df[df['wind_turbine_number'] == wind_name][['time_stamp', col]]
|
|
now_df = df[df['wind_turbine_number'] == wind_name][['time_stamp', col]]
|
|
|
if contains_name:
|
|
if contains_name:
|
|
|
now_df.to_csv(save_path, index=False, encoding='utf-8', mode='a',
|
|
now_df.to_csv(save_path, index=False, encoding='utf-8', mode='a',
|
|
@@ -75,7 +97,16 @@ class ReadAndSaveTmp(object):
|
|
|
else:
|
|
else:
|
|
|
now_df.to_csv(save_path, index=False, encoding='utf-8')
|
|
now_df.to_csv(save_path, index=False, encoding='utf-8')
|
|
|
|
|
|
|
|
- def trans_df_cols(self, df):
|
|
|
|
|
|
|
+ def trans_df_cols(self, df: pd.DataFrame) -> pd.DataFrame:
|
|
|
|
|
+ """
|
|
|
|
|
+ 转换数据帧列名
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ df: 数据帧
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 转换后的数据帧
|
|
|
|
|
+ """
|
|
|
if self.trans_param.is_vertical_table:
|
|
if self.trans_param.is_vertical_table:
|
|
|
pass
|
|
pass
|
|
|
else:
|
|
else:
|
|
@@ -120,8 +151,13 @@ class ReadAndSaveTmp(object):
|
|
|
|
|
|
|
|
return df
|
|
return df
|
|
|
|
|
|
|
|
- def df_save_to_tmp_file(self, df=pd.DataFrame()):
|
|
|
|
|
-
|
|
|
|
|
|
|
+ def df_save_to_tmp_file(self, df: pd.DataFrame = pd.DataFrame()):
|
|
|
|
|
+ """
|
|
|
|
|
+ 保存数据帧到临时文件
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ df: 数据帧
|
|
|
|
|
+ """
|
|
|
df = self.trans_df_cols(df)
|
|
df = self.trans_df_cols(df)
|
|
|
|
|
|
|
|
df = del_blank(df, ['wind_turbine_number'])
|
|
df = del_blank(df, ['wind_turbine_number'])
|
|
@@ -133,19 +169,34 @@ class ReadAndSaveTmp(object):
|
|
|
|
|
|
|
|
self.save_to_tmp_csv(df)
|
|
self.save_to_tmp_csv(df)
|
|
|
|
|
|
|
|
- def save_to_tmp_csv(self, df):
|
|
|
|
|
|
|
+ def save_to_tmp_csv(self, df: pd.DataFrame):
|
|
|
|
|
+ """
|
|
|
|
|
+ 保存到临时CSV文件
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ df: 数据帧
|
|
|
|
|
+ """
|
|
|
names = set(df['wind_turbine_number'].values)
|
|
names = set(df['wind_turbine_number'].values)
|
|
|
if names:
|
|
if names:
|
|
|
- trans_print("开始保存", str(names), "到临时文件", df.shape)
|
|
|
|
|
|
|
+ debug("开始保存", str(names), "到临时文件", df.shape)
|
|
|
|
|
|
|
|
for name in names:
|
|
for name in names:
|
|
|
self._save_to_tmp_csv_by_name(df[df['wind_turbine_number'] == name], name)
|
|
self._save_to_tmp_csv_by_name(df[df['wind_turbine_number'] == name], name)
|
|
|
del df
|
|
del df
|
|
|
- trans_print("保存", str(names), "到临时文件成功, 风机数量", len(names))
|
|
|
|
|
-
|
|
|
|
|
- def merge_df(self, dir_path):
|
|
|
|
|
|
|
+ debug("保存", str(names), "到临时文件成功, 风机数量", len(names))
|
|
|
|
|
+
|
|
|
|
|
+ def merge_df(self, dir_path: str) -> pd.DataFrame:
|
|
|
|
|
+ """
|
|
|
|
|
+ 合并数据帧
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ dir_path: 目录路径
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 合并后的数据帧
|
|
|
|
|
+ """
|
|
|
all_files = read_excel_files(dir_path)
|
|
all_files = read_excel_files(dir_path)
|
|
|
- wind_turbine_number = path.basename(dir_path)
|
|
|
|
|
|
|
+ wind_turbine_number = os.path.basename(dir_path)
|
|
|
df = pd.DataFrame()
|
|
df = pd.DataFrame()
|
|
|
for file in all_files:
|
|
for file in all_files:
|
|
|
now_df = read_file_to_df(file)
|
|
now_df = read_file_to_df(file)
|
|
@@ -161,8 +212,13 @@ class ReadAndSaveTmp(object):
|
|
|
return df
|
|
return df
|
|
|
|
|
|
|
|
def read_file_and_save_tmp(self):
|
|
def read_file_and_save_tmp(self):
|
|
|
|
|
+ """
|
|
|
|
|
+ 读取文件并保存到临时文件
|
|
|
|
|
+ """
|
|
|
all_files = read_excel_files(self.pathsAndTable.get_excel_tmp_path())
|
|
all_files = read_excel_files(self.pathsAndTable.get_excel_tmp_path())
|
|
|
split_count = use_files_get_max_cpu_count(all_files)
|
|
split_count = use_files_get_max_cpu_count(all_files)
|
|
|
|
|
+ # 限制最大进程数
|
|
|
|
|
+ split_count = min(split_count, ParallelProcessing.MAX_PROCESSES)
|
|
|
all_arrays = split_array(all_files, split_count)
|
|
all_arrays = split_array(all_files, split_count)
|
|
|
|
|
|
|
|
if self.trans_param.merge_columns:
|
|
if self.trans_param.merge_columns:
|
|
@@ -172,7 +228,7 @@ class ReadAndSaveTmp(object):
|
|
|
pool.starmap(self.save_merge_data, [(ar,) for ar in arr])
|
|
pool.starmap(self.save_merge_data, [(ar,) for ar in arr])
|
|
|
|
|
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
- trans_print(traceback.format_exc())
|
|
|
|
|
|
|
+ error(traceback.format_exc())
|
|
|
message = "整理临时文件,系统返回错误:" + str(e)
|
|
message = "整理临时文件,系统返回错误:" + str(e)
|
|
|
raise ValueError(message)
|
|
raise ValueError(message)
|
|
|
|
|
|
|
@@ -180,28 +236,28 @@ class ReadAndSaveTmp(object):
|
|
|
round(20 + 20 * (index + 1) / len(all_arrays), 2),
|
|
round(20 + 20 * (index + 1) / len(all_arrays), 2),
|
|
|
self.pathsAndTable.save_db)
|
|
self.pathsAndTable.save_db)
|
|
|
|
|
|
|
|
- dirs = [path.join(self.pathsAndTable.get_merge_tmp_path(), dir_name) for dir_name in
|
|
|
|
|
- listdir(self.pathsAndTable.get_merge_tmp_path())]
|
|
|
|
|
- dir_total_size = get_dir_size(dirs[0])
|
|
|
|
|
- # split_count = max_file_size_get_max_cpu_count(dir_total_size, memory_percent=1 / 12, cpu_percent=1 / 10)
|
|
|
|
|
- split_count = 2
|
|
|
|
|
- all_arrays = split_array(dirs, split_count)
|
|
|
|
|
- for index, arr in enumerate(all_arrays):
|
|
|
|
|
- try:
|
|
|
|
|
- with multiprocessing.Pool(split_count) as pool:
|
|
|
|
|
- pool.starmap(self.merge_df, [(ar,) for ar in arr])
|
|
|
|
|
-
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- trans_print(traceback.format_exc())
|
|
|
|
|
- message = "整理临时文件,系统返回错误:" + str(e)
|
|
|
|
|
- raise ValueError(message)
|
|
|
|
|
-
|
|
|
|
|
- update_trans_transfer_progress(self.pathsAndTable.id,
|
|
|
|
|
- round(20 + 30 * (index + 1) / len(all_arrays), 2),
|
|
|
|
|
- self.pathsAndTable.save_db)
|
|
|
|
|
|
|
+ dirs = [os.path.join(self.pathsAndTable.get_merge_tmp_path(), dir_name) for dir_name in
|
|
|
|
|
+ os.listdir(self.pathsAndTable.get_merge_tmp_path())]
|
|
|
|
|
+ if dirs:
|
|
|
|
|
+ dir_total_size = get_dir_size(dirs[0])
|
|
|
|
|
+ # 限制最大进程数
|
|
|
|
|
+ split_count = min(dir_total_size, ParallelProcessing.MAX_PROCESSES)
|
|
|
|
|
+ all_arrays = split_array(dirs, split_count)
|
|
|
|
|
+ for index, arr in enumerate(all_arrays):
|
|
|
|
|
+ try:
|
|
|
|
|
+ with multiprocessing.Pool(split_count) as pool:
|
|
|
|
|
+ pool.starmap(self.merge_df, [(ar,) for ar in arr])
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ error(traceback.format_exc())
|
|
|
|
|
+ message = "整理临时文件,系统返回错误:" + str(e)
|
|
|
|
|
+ raise ValueError(message)
|
|
|
|
|
+
|
|
|
|
|
+ update_trans_transfer_progress(self.pathsAndTable.id,
|
|
|
|
|
+ round(20 + 30 * (index + 1) / len(all_arrays), 2),
|
|
|
|
|
+ self.pathsAndTable.save_db)
|
|
|
|
|
|
|
|
else:
|
|
else:
|
|
|
-
|
|
|
|
|
for index, arr in enumerate(all_arrays):
|
|
for index, arr in enumerate(all_arrays):
|
|
|
try:
|
|
try:
|
|
|
with multiprocessing.Pool(split_count) as pool:
|
|
with multiprocessing.Pool(split_count) as pool:
|
|
@@ -209,7 +265,7 @@ class ReadAndSaveTmp(object):
|
|
|
for df in dfs:
|
|
for df in dfs:
|
|
|
self.df_save_to_tmp_file(df)
|
|
self.df_save_to_tmp_file(df)
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
- trans_print(traceback.format_exc())
|
|
|
|
|
|
|
+ error(traceback.format_exc())
|
|
|
message = "整理临时文件,系统返回错误:" + str(e)
|
|
message = "整理临时文件,系统返回错误:" + str(e)
|
|
|
raise ValueError(message)
|
|
raise ValueError(message)
|
|
|
|
|
|
|
@@ -217,8 +273,16 @@ class ReadAndSaveTmp(object):
|
|
|
round(20 + 30 * (index + 1) / len(all_arrays), 2),
|
|
round(20 + 30 * (index + 1) / len(all_arrays), 2),
|
|
|
self.pathsAndTable.save_db)
|
|
self.pathsAndTable.save_db)
|
|
|
|
|
|
|
|
- def read_excel_to_df(self, file_path):
|
|
|
|
|
-
|
|
|
|
|
|
|
+ def read_excel_to_df(self, file_path: str) -> pd.DataFrame:
|
|
|
|
|
+ """
|
|
|
|
|
+ 读取Excel文件到数据帧
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ file_path: 文件路径
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 数据帧
|
|
|
|
|
+ """
|
|
|
read_cols = [v.split(",")[0] for k, v in self.trans_param.cols_tran.items() if v and not v.startswith("$")]
|
|
read_cols = [v.split(",")[0] for k, v in self.trans_param.cols_tran.items() if v and not v.startswith("$")]
|
|
|
|
|
|
|
|
trans_dict = {}
|
|
trans_dict = {}
|
|
@@ -300,7 +364,7 @@ class ReadAndSaveTmp(object):
|
|
|
|
|
|
|
|
for k, v in trans_dict.items():
|
|
for k, v in trans_dict.items():
|
|
|
if k.startswith("$file"):
|
|
if k.startswith("$file"):
|
|
|
- file = ".".join(path.basename(file_path).split(".")[0:-1])
|
|
|
|
|
|
|
+ file = ".".join(os.path.basename(file_path).split(".")[0:-1])
|
|
|
if k == "$file":
|
|
if k == "$file":
|
|
|
ks = k.split("|")
|
|
ks = k.split("|")
|
|
|
bool_contains = False
|
|
bool_contains = False
|
|
@@ -337,7 +401,7 @@ class ReadAndSaveTmp(object):
|
|
|
datas = str(k.split(",")[1].replace("$file_date", "").replace("[", "").replace("]", "")).split(":")
|
|
datas = str(k.split(",")[1].replace("$file_date", "").replace("[", "").replace("]", "")).split(":")
|
|
|
if len(datas) != 2:
|
|
if len(datas) != 2:
|
|
|
raise Exception("字段映射出现错误 :" + str(trans_dict))
|
|
raise Exception("字段映射出现错误 :" + str(trans_dict))
|
|
|
- file = ".".join(path.basename(file_path).split(".")[0:-1])
|
|
|
|
|
|
|
+ file = ".".join(os.path.basename(file_path).split(".")[0:-1])
|
|
|
date_str = str(file[int(datas[0]):int(datas[1])]).strip()
|
|
date_str = str(file[int(datas[0]):int(datas[1])]).strip()
|
|
|
df[v] = df[k.split(",")[0]].apply(lambda x: date_str + " " + str(x))
|
|
df[v] = df[k.split(",")[0]].apply(lambda x: date_str + " " + str(x))
|
|
|
|
|
|
|
@@ -351,8 +415,8 @@ class ReadAndSaveTmp(object):
|
|
|
if not bool_contains:
|
|
if not bool_contains:
|
|
|
cengshu = int(str(ks[0].replace("$folder", "").replace("[", "").replace("]", "")))
|
|
cengshu = int(str(ks[0].replace("$folder", "").replace("[", "").replace("]", "")))
|
|
|
for i in range(cengshu):
|
|
for i in range(cengshu):
|
|
|
- folder = path.dirname(folder)
|
|
|
|
|
- df[v] = str(str(folder).split(sep)[-1]).strip()
|
|
|
|
|
|
|
+ folder = os.path.dirname(folder)
|
|
|
|
|
+ df[v] = str(str(folder).split(os.sep)[-1]).strip()
|
|
|
elif k.startswith("$sheet_name"):
|
|
elif k.startswith("$sheet_name"):
|
|
|
df[v] = df['sheet_name']
|
|
df[v] = df['sheet_name']
|
|
|
|
|
|
|
@@ -374,9 +438,11 @@ class ReadAndSaveTmp(object):
|
|
|
return df
|
|
return df
|
|
|
|
|
|
|
|
def run(self):
|
|
def run(self):
|
|
|
- trans_print("开始保存数据到临时文件")
|
|
|
|
|
|
|
+ """
|
|
|
|
|
+ """
|
|
|
|
|
+ info("开始保存数据到临时文件")
|
|
|
begin = datetime.datetime.now()
|
|
begin = datetime.datetime.now()
|
|
|
self.read_file_and_save_tmp()
|
|
self.read_file_and_save_tmp()
|
|
|
update_trans_transfer_progress(self.pathsAndTable.id, 50,
|
|
update_trans_transfer_progress(self.pathsAndTable.id, 50,
|
|
|
self.pathsAndTable.save_db)
|
|
self.pathsAndTable.save_db)
|
|
|
- trans_print("保存数据到临时文件结束,耗时:", datetime.datetime.now() - begin)
|
|
|
|
|
|
|
+ info("保存数据到临时文件结束,耗时:", datetime.datetime.now() - begin)
|