123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 |
- # -*- coding: utf-8 -*-
- # @Time : 2024/5/16
- # @Author : 魏志亮
- import datetime
- import os
- import re
- import shutil
- import warnings
- import chardet
- import pandas as pd
- from utils.log.trans_log import trans_print
- warnings.filterwarnings("ignore")
- # 获取文件编码
- def detect_file_encoding(filename):
- # 读取文件的前1000个字节(足够用于大多数编码检测)
- with open(filename, 'rb') as f:
- rawdata = f.read(1000)
- result = chardet.detect(rawdata)
- encoding = result['encoding']
- trans_print("文件类型:", filename, encoding)
- if encoding is None:
- encoding = 'gb18030'
- if encoding.lower() in ['utf-8', 'ascii', 'utf8']:
- return 'utf-8'
- return 'gb18030'
- def del_blank(df=pd.DataFrame(), cols=list()):
- for col in cols:
- if df[col].dtype == object:
- df[col] = df[col].str.strip()
- return df
- # 切割数组到多个数组
- def split_array(array, num):
- return [array[i:i + num] for i in range(0, len(array), num)]
- def find_read_header(file_path, trans_cols):
- print(trans_cols)
- df = read_file_to_df(file_path, nrows=20)
- df.reset_index(inplace=True)
- count = 0
- for col in trans_cols:
- if col in df.columns:
- count = count + 1
- if count >= 2:
- return 0
- count = 0
- values = list()
- for index, row in df.iterrows():
- values = list(row.values)
- if type(row.name) == tuple:
- values.extend(list(row.name))
- for col in trans_cols:
- if col in values:
- count = count + 1
- if count >= 2:
- return index + 1
-
- return None
- # 读取数据到df
- def read_file_to_df(file_path, read_cols=list(), header=0, trans_cols=None, nrows=None):
- begin = datetime.datetime.now()
- trans_print('开始读取文件', file_path)
- if trans_cols:
- header = find_read_header(file_path, trans_cols)
- trans_print(os.path.basename(file_path), "读取第", header, "行")
- if header is None:
- message = '未匹配到开始行,请检查并重新指定'
- trans_print(message)
- raise Exception(message)
- try:
- df = pd.DataFrame()
- if str(file_path).lower().endswith("csv") or str(file_path).lower().endswith("gz"):
- encoding = detect_file_encoding(file_path)
- end_with_gz = str(file_path).lower().endswith("gz")
- if read_cols:
- if end_with_gz:
- df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, compression='gzip', header=header,
- nrows=nrows)
- else:
- df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, header=header,
- on_bad_lines='warn', nrows=nrows)
- else:
- if end_with_gz:
- df = pd.read_csv(file_path, encoding=encoding, compression='gzip', header=header, nrows=nrows)
- else:
- df = pd.read_csv(file_path, encoding=encoding, header=header, on_bad_lines='warn', nrows=nrows)
- else:
- xls = pd.ExcelFile(file_path)
- # 获取所有的sheet名称
- sheet_names = xls.sheet_names
- for sheet_name in sheet_names:
- if read_cols:
- now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header, usecols=read_cols, nrows=nrows)
- else:
- now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header, nrows=nrows)
- now_df['sheet_name'] = sheet_name
- df = pd.concat([df, now_df])
- trans_print('文件读取成功', file_path, '文件数量', df.shape, '耗时', datetime.datetime.now() - begin)
- except Exception as e:
- trans_print('读取文件出错', file_path, str(e))
- message = '文件:' + os.path.basename(file_path) + ',' + str(e)
- raise ValueError(message)
- return df
- def __build_directory_dict(directory_dict, path, filter_types=None):
- # 遍历目录下的所有项
- for item in os.listdir(path):
- item_path = os.path.join(path, item)
- if os.path.isdir(item_path):
- __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
- elif os.path.isfile(item_path):
- if path not in directory_dict:
- directory_dict[path] = []
- if filter_types is None or len(filter_types) == 0:
- directory_dict[path].append(item_path)
- elif str(item_path).split(".")[-1] in filter_types:
- if str(item_path).count("~$") == 0:
- directory_dict[path].append(item_path)
- # 读取路径下所有的excel文件
- def read_excel_files(read_path):
- directory_dict = {}
- __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz'])
- return [path for paths in directory_dict.values() for path in paths if path]
- # 读取路径下所有的文件
- def read_files(read_path):
- directory_dict = {}
- __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz', 'zip', 'rar'])
- return [path for paths in directory_dict.values() for path in paths if path]
- def copy_to_new(from_path, to_path):
- is_file = False
- if to_path.count('.') > 0:
- is_file = True
- create_file_path(to_path, is_file_path=is_file)
- shutil.copy(from_path, to_path)
- # 创建路径
- def create_file_path(path, is_file_path=False):
- if is_file_path:
- path = os.path.dirname(path)
- if not os.path.exists(path):
- os.makedirs(path, exist_ok=True)
- # 格式化风机名称
- def generate_turbine_name(turbine_name='F0001', prefix='F'):
- strinfo = re.compile(r"[\D*]")
- name = strinfo.sub('', str(turbine_name))
- return prefix + str(int(name)).zfill(3)
- if __name__ == '__main__':
- # files = read_excel_files(r'D:\trans_data\10.xls')
- # for file in files:
- file = r'D:\trans_data\新艾里风电场10号风机.csv'
- read_file_to_df(file, trans_cols=
- ['', '风向', '时间', '设备号', '机舱方向总角度', '$folder[2]', '发电机转速30秒平均值', '机组运行模式', '机舱旋转角度', '主轴转速', '变桨角度30秒平均值', '记录时间',
- '发电机功率30秒平均值', '风速30秒平均值'])
|