123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- # -*- coding: utf-8 -*-
- """
- Created on Tue Jul 9 16:28:48 2024
- @author: Administrator
- """
import multiprocessing
import os
from datetime import datetime
# NOTE(review): the star import is kept for backward compatibility (it supplies
# names like `listdir`/`makedirs` used elsewhere in this file), but explicit
# `os.` access is preferred to avoid the shadowing bugs it enabled.
from os import *

import chardet
import numpy as np
import pandas as pd
- pd.options.mode.copy_on_write = True
# Detect the text encoding of a file.
def detect_file_encoding(filename):
    """Detect the character encoding of *filename*.

    Reads the first 1000 bytes (enough for most encoding detection) and runs
    chardet on them.  Legacy Chinese encodings (GB2312) and Windows code pages
    are widened to GB18030 — a superset — so rare characters do not break
    decoding; an undetectable encoding also falls back to GB18030.

    :param filename: path of the file to probe.
    :return: encoding name usable with ``open``/``pd.read_csv``.
    """
    with open(filename, 'rb') as f:
        rawdata = f.read(1000)
    encoding = chardet.detect(rawdata)['encoding']
    # BUG FIX: the original condition relied on `and`/`or` precedence
    # (`a and b or c`); the explicit form below states the intent directly
    # and cannot raise if detection returns None.
    if encoding is None or encoding.lower() == 'gb2312' or encoding.lower().startswith("windows"):
        encoding = 'gb18030'
    return encoding
# Read a data file (csv / csv.gz / Excel) into a DataFrame.
def read_file_to_df(file_path, read_cols=None, header=0):
    """Load *file_path* into a DataFrame.

    CSV and gzip-compressed CSV files are decoded with the detected encoding;
    any other extension is treated as an Excel workbook and all of its sheets
    are concatenated into one frame.

    :param read_cols: optional list of columns to load (pandas ``usecols``);
        ``None`` or empty means all columns.  (Was a mutable ``list()``
        default — a Python anti-pattern — now ``None``; callers are
        unaffected because only truthiness is tested.)
    :param header: header row number, forwarded to pandas.
    :return: the loaded DataFrame (empty if an Excel file has no sheets).
    """
    lower_path = str(file_path).lower()
    if lower_path.endswith("csv") or lower_path.endswith("gz"):
        kwargs = {'encoding': detect_file_encoding(file_path), 'header': header}
        if read_cols:
            kwargs['usecols'] = read_cols
        if lower_path.endswith("gz"):
            kwargs['compression'] = 'gzip'
        else:
            # Plain csv: warn on malformed lines instead of failing outright.
            kwargs['on_bad_lines'] = 'warn'
        df = pd.read_csv(file_path, **kwargs)
    else:
        xls = pd.ExcelFile(file_path)
        frames = []
        # Concatenate every sheet of the workbook.
        for sheet in xls.sheet_names:
            if read_cols:
                frames.append(pd.read_excel(xls, sheet_name=sheet, header=header, usecols=read_cols))
            else:
                frames.append(pd.read_excel(xls, sheet_name=sheet, header=header))
        df = pd.concat(frames) if frames else pd.DataFrame()
    return df
def __build_directory_dict(directory_dict, path, filter_types=None):
    """Recursively map directory -> list of contained file paths.

    Walks *path*, recursing into subdirectories, and appends each regular
    file to ``directory_dict[<its parent directory>]``.  When *filter_types*
    is a non-empty collection, only files whose extension is in it are kept,
    and Office lock/temp files (names containing ``~$``) are skipped.

    BUG FIX: the parameter ``path`` shadowed the ``path`` module pulled in by
    ``from os import *``, so ``path.join(path, item)`` resolved to
    ``str.join`` and raised TypeError.  All filesystem operations now go
    through ``os``/``os.path`` explicitly.
    """
    for item in os.listdir(path):
        item_path = os.path.join(path, item)
        if os.path.isdir(item_path):
            __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
        elif os.path.isfile(item_path):
            if path not in directory_dict:
                directory_dict[path] = []
            if filter_types is None or len(filter_types) == 0:
                directory_dict[path].append(item_path)
            elif str(item_path).split(".")[-1] in filter_types:
                # Skip Office lock/temp files such as "~$report.xlsx".
                if str(item_path).count("~$") == 0:
                    directory_dict[path].append(item_path)
# Collect every data file (xls/xlsx/csv/gz) under a directory tree.
def read_excel_files(read_path):
    """Return a flat list of all spreadsheet/csv/gz file paths under *read_path*."""
    found = {}
    __build_directory_dict(found, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz'])
    files = []
    for file_list in found.values():
        for file_path in file_list:
            if file_path:
                files.append(file_path)
    return files
# Ensure a directory exists (optionally deriving it from a file path).
def create_file_path(path, is_file_path=False):
    """Create directory *path* if it does not exist.

    :param path: a directory path, or a file path when *is_file_path* is True
        (then the file's parent directory is created instead).
    :param is_file_path: interpret *path* as a file path.

    BUG FIX: the parameter ``path`` shadowed the ``path`` module from
    ``from os import *``; ``path.dirname(path)`` / ``path.exists(path)``
    therefore failed on the str argument.  ``os.path`` is now used
    explicitly.
    """
    if is_file_path:
        path = os.path.dirname(path)
    if not os.path.exists(path):
        os.makedirs(path, exist_ok=True)
def split_index(split_data: str, split_str: str):
    """Return the remainder of *split_data* after the first occurrence of
    *split_str*.

    If *split_str* does not occur, *split_str* itself is returned — this
    mirrors the original implementation's fallback exactly.
    """
    _, sep, remainder = split_data.partition(split_str)
    return remainder if sep else split_str
# Map a raw column name onto the standardized vocabulary.
def replece_col_to_biaozhun(col):
    """Replace the first raw field name found in *col* with its standardized
    name from ``dianjian_dict`` and return the result.

    Only the first matching key is applied — the original returned
    immediately after one substitution — and *col* is returned unchanged
    when no key matches.
    """
    for raw_name, standard_name in dianjian_dict.items():
        if raw_name in col:
            return col.replace(raw_name, standard_name)
    return col
# Flatten one wide row into standardized long-format records.
def row_to_datas(row, pv_dict, inverter_cols, df_cols):
    """Expand a single wide-format row into long-format records.

    Each record is a 14-element list laid out as
    ``[time, box, inverter, pv-branch, branch voltage, branch current,
    <8 inverter metrics>]`` with ``np.nan`` for absent fields.  Two kinds
    are produced:

    * one record per PV branch carrying branch voltage/current (idx 4, 5);
    * one record per inverter (branch id 0) carrying the inverter metrics
      (idx 6..13, ordered as *inverter_cols*).

    :param row: a wide-format row, indexable by column name.
    :param pv_dict: ``{box: {inverter: iterable of pv branches}}``.
    :param inverter_cols: standardized inverter metric column names.
    :param df_cols: the wide frame's columns, used for membership tests.
    :return: list of 14-element record lists.
    """
    row_datas = []  # was `list(list())` — a misleading spelling of []
    # Per-PV-branch records: time/box/inverter/branch + voltage/current.
    for xiangbian, nibianqi_dict in pv_dict.items():
        for nibianqi, pvs in nibianqi_dict.items():
            for pv in pvs:
                datas = [np.nan] * 14
                datas[0] = row['时间']
                datas[1] = xiangbian
                datas[2] = nibianqi
                datas[3] = pv
                voltage_col = "_".join([str(xiangbian), str(nibianqi), str(pv), '支路输出电压'])
                if voltage_col in df_cols:
                    datas[4] = row[voltage_col]
                current_col = "_".join([str(xiangbian), str(nibianqi), str(pv), '支路输出电流'])
                if current_col in df_cols:
                    datas[5] = row[current_col]
                # (missing columns keep their pre-filled np.nan; the original
                # redundantly re-assigned np.nan in an else branch)
                row_datas.append(datas)
    # Per-inverter records: branch id 0 + the inverter metrics.
    for xiangbian, nibianqi_dict in pv_dict.items():
        for nibianqi in nibianqi_dict:
            datas = [np.nan] * 14
            datas[0] = row['时间']
            datas[1] = xiangbian
            datas[2] = nibianqi
            datas[3] = 0
            for index, col_name in enumerate(inverter_cols):
                col = '_'.join([str(xiangbian), str(nibianqi), col_name])
                if col in df_cols:
                    datas[index + 6] = row[col]
            row_datas.append(datas)
    return row_datas
# Convert a wide per-plant frame into the standardized long format.
def df_to_biaozhun(df):
    """Reshape the wide frame *df* into the standardized long format.

    Column names of the form ``<box>_<inverter>_<pv>_<measurement>`` are
    parsed to build a ``{box: {inverter: {pv branches}}}`` structure, each
    row is exploded via ``row_to_datas``, and the records are rebuilt as a
    frame with one row per PV branch / per inverter.  Measurement columns
    are coerced to numeric (invalid values become NaN).

    :param df: wide-format DataFrame with standardized column names.
    :return: long-format DataFrame.
    """
    pv_cols = ['支路输出电压', '支路输出电流']
    inverter_cols = ['逆变器总发电量', '逆变器输出有功功率', '逆变器温度', '交流输出电压', '逆变器输出电流A相', '逆变器输出电流B相', '逆变器输出电流C相', '逆变器转换效率']
    # Build the box -> inverter -> {pv branches} hierarchy from the columns.
    # (The original spelled the empty dict as `dict(dict())` and built the
    # nesting with a manual if/else ladder; setdefault is equivalent.)
    pv_dict = {}
    for col in df.columns:
        for pv_col in pv_cols:
            if str(col).endswith(pv_col):
                xiangbian, nibiangqi, pv = col.split("_")[:3]
                pv_dict.setdefault(xiangbian, {}).setdefault(nibiangqi, set()).add(pv)
    results = df.apply(row_to_datas, args=(pv_dict, inverter_cols, df.columns), axis=1)
    # Each row yields a list of records; flatten them into one list.
    df_datas = [record for row_records in results.to_list() for record in row_records]
    df_cols = ["时间", "箱变", "逆变器", "支路"]
    df_cols.extend(pv_cols)
    df_cols.extend(inverter_cols)
    df = pd.DataFrame(df_datas, columns=df_cols)
    # Coerce all measurement columns to numeric; bad values become NaN.
    for measure_col in pv_cols + inverter_cols:
        df[measure_col] = pd.to_numeric(df[measure_col], errors='coerce')
    return df
# Load one raw inverter file and reshape it to the standardized format.
def read_and_save_csv(file_path, save_path):
    """Read one raw inverter data file and return the standardized frame.

    Drops columns containing unmapped field names (``del_cols``), parses the
    timestamp column, prefixes each measurement column with its
    box-transformer number, renames raw field names to the standardized
    vocabulary, and reshapes to long format via ``df_to_biaozhun``.

    BUG FIXES: ``path.basename`` failed because ``__main__`` rebinds the
    module-level name ``path`` to a string (inherited by forked workers);
    ``os.path.basename`` is used explicitly.  The nested delete loop could
    also ``del`` the same column twice (KeyError) when it matched several
    ``del_cols`` entries; columns are now collected first and dropped once.

    :param file_path: raw input file.
    :param save_path: output directory (kept for interface compatibility;
        saving was commented out in the original and the frame is returned).
    :return: standardized long-format DataFrame.
    """
    begin = datetime.now()
    base_name = os.path.basename(file_path)
    print('开始', base_name)
    df = read_file_to_df(file_path)
    # Drop every column that contains an unmapped field name.
    drop_cols = [col for col in df.columns
                 if any(col.find(del_col) > -1 for del_col in del_cols)]
    df.drop(columns=drop_cols, inplace=True)
    df['时间'] = pd.to_datetime(df['时间'])
    # Box-transformer number, e.g. "3" from a column like "#3逆变器1-PV2".
    xiangbian = [col for col in df.columns
                 if str(col).startswith('#') and str(col).find('逆变器') > -1][0].split("逆变器")[0].replace("#", "")
    # "#3逆变器1-PV2()" -> "3_1_2"-style names for measurement columns.
    df.columns = [xiangbian + "_" + split_index(df_col, "逆变器").replace('PV', "").replace("()", "").replace("-", "_")
                  if df_col.startswith("#") else df_col
                  for df_col in df.columns]
    df.columns = [col.replace("输入", "_输入") for col in df.columns]
    df.columns = [replece_col_to_biaozhun(col) for col in df.columns]
    df = df_to_biaozhun(df)
    print('结束', base_name, '耗时:' + str(datetime.now() - begin))
    return df
# Mapping from raw device field names to standardized names.  Each non-empty
# line is "raw<TAB>standard"; lines without a standard name are fields that
# should be dropped from the raw data.  Tabs are written as explicit \t
# escapes so the separator is unambiguous.
dianjian_data_str = """
输入电压\t支路输出电压
输入电流\t支路输出电流
功率因数
总发电量\t逆变器总发电量
无功功率
有功功率\t逆变器输出有功功率
机内温度\t逆变器温度
电网AB线电压\t交流输出电压
电网A相电流\t逆变器输出电流A相
电网BC线电压
电网B相电流\t逆变器输出电流B相
电网CA线电压
电网C相电流\t逆变器输出电流C相
逆变器效率\t逆变器转换效率
"""
# dianjian_dict: raw name -> standardized name; del_cols: fields to drop.
dianjian_dict = {}
del_cols = []
for line in dianjian_data_str.split("\n"):
    if line:
        parts = line.split("\t")
        if len(parts) == 2 and parts[1]:
            dianjian_dict[parts[0]] = parts[1]
        else:
            del_cols.append(parts[0])
if __name__ == '__main__':
    # BUG FIX: the original bound the name ``path`` here, shadowing the
    # ``path`` module imported via ``from os import *``.  That broke
    # ``path.join(save_path, "合并.csv")`` below (str.join takes one
    # argument) and ``path.basename`` inside the forked worker processes.
    # A distinct name is used instead, and os.path is called explicitly.
    read_path = r'/data/download/大唐玉湖性能分析离线分析/05整理数据/逆变器数据'
    save_path = r'/data/download/大唐玉湖性能分析离线分析/06整理数据/逆变器数据'
    all_datas = sorted(read_excel_files(read_path))
    print(all_datas)
    # Standardize every file in parallel (40 worker processes).
    with multiprocessing.Pool(40) as pool:
        dfs = pool.starmap(read_and_save_csv, [(f, save_path) for f in all_datas])
    saved_pv_df = pd.concat(dfs)
    # One sort is enough — the original sorted the merged frame twice.
    saved_pv_df.sort_values(by=['时间', "箱变", "逆变器", "支路"], inplace=True)
    save_file = os.path.join(save_path, "合并.csv")
    # Make sure the output directory exists before writing.
    os.makedirs(os.path.dirname(save_file), exist_ok=True)
    saved_pv_df.to_csv(save_file, encoding='utf-8', index=False)
|