123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- import datetime
- import multiprocessing
- import os
- import chardet
- import pandas as pd
def detect_file_encoding(filename):
    """Guess which codec should be used to decode ``filename``.

    Only the first 1000 bytes of the file are sampled and handed to
    ``chardet.detect``. The detected name is printed and then collapsed to
    one of two codecs.

    Args:
        filename: Path of the file to inspect.

    Returns:
        ``'utf-8'`` when the detected encoding is UTF-8 (with or without the
        ``-sig`` suffix) or ASCII; ``'gb18030'`` in every other case,
        including when detection yields no encoding at all.
    """
    utf8_aliases = ('utf-8', 'ascii', 'utf8', 'utf-8-sig')

    # Read the first 1000 bytes (enough for most encoding detection).
    with open(filename, 'rb') as stream:
        sample = stream.read(1000)

    detected = chardet.detect(sample)['encoding']
    print("文件类型:", filename, detected)

    if detected is not None and detected.lower() in utf8_aliases:
        return 'utf-8'
    return 'gb18030'
def __build_directory_dict(directory_dict, path, filter_types=None):
    """Recursively record the files below ``path``, grouped by directory.

    Args:
        directory_dict: Mapping that is filled in place. Each directory that
            directly contains at least one regular file gets a key whose
            value is the list of accepted file paths (possibly empty when
            nothing passes the filter).
        path: Directory to scan.
        filter_types: Extensions (text after the last ``.``, without the
            dot) to keep. ``None`` or an empty collection keeps every file.

    Returns:
        None. Results are written into ``directory_dict``.
    """
    # Walk every entry of the directory.
    for entry_name in os.listdir(path):
        entry_path = os.path.join(path, entry_name)

        if os.path.isdir(entry_path):
            __build_directory_dict(directory_dict, entry_path, filter_types=filter_types)
            continue
        if not os.path.isfile(entry_path):
            continue

        collected = directory_dict.setdefault(path, [])

        if filter_types is None or len(filter_types) == 0:
            collected.append(entry_path)
            continue

        extension = str(entry_path).split(".")[-1]
        # Paths containing "~$" are skipped, but only while a filter is active.
        if extension in filter_types and "~$" not in str(entry_path):
            collected.append(entry_path)
def read_excel_files(read_path, filter_types=None):
    """Collect the Excel/CSV-style data files found at or below ``read_path``.

    Args:
        read_path: A single file, or a directory that is searched
            recursively.
        filter_types: Extensions (without the dot) to keep. Defaults to
            ``['xls', 'xlsx', 'csv', 'gz']`` when ``None``.

    Returns:
        ``[read_path]`` when it is a regular file (the extension filter is
        not applied in that case); otherwise a flat list of the accepted
        file paths from every directory in the tree.
    """
    if os.path.isfile(read_path):
        return [read_path]

    wanted_types = ['xls', 'xlsx', 'csv', 'gz'] if filter_types is None else filter_types

    files_by_dir = {}
    __build_directory_dict(files_by_dir, read_path, filter_types=wanted_types)

    collected = []
    for dir_files in files_by_dir.values():
        collected.extend(file_path for file_path in dir_files if file_path)
    return collected
def read_file_to_df(file_path):
    """Load one CSV file and prefix its ``Time`` column with the file's date.

    The file is decoded with the codec chosen by ``detect_file_encoding``.

    Args:
        file_path: Path of the CSV file; its base name supplies the date.

    Returns:
        The loaded DataFrame, with every ``Time`` value rewritten as
        ``"<date> <original value>"``.
    """
    codec = detect_file_encoding(file_path)
    frame = pd.read_csv(file_path, encoding=codec)

    # Characters 14..23 of the base name are taken as the date text.
    # NOTE(review): presumably a 10-character date such as YYYY-MM-DD at a
    # fixed offset in the file name -- confirm against the naming scheme.
    file_name = os.path.basename(file_path)
    date_prefix = file_name[14:24] + ' '

    def with_date(time_text):
        return date_prefix + time_text

    frame['Time'] = frame['Time'].apply(with_date)
    return frame
def read_files_and_save_csv(file_dir, month, save_dir):
    """Merge every data file of one month into a single time-sorted CSV.

    Files are collected from ``<file_dir>/<month>``, loaded with
    ``read_file_to_df``, concatenated, sorted by the ``Time`` column and
    written to ``<save_dir>/<basename of file_dir>/<month>.csv`` as UTF-8
    without the index.

    Args:
        file_dir: Directory that contains the per-month entries.
        month: Name of the entry under ``file_dir`` to process; also used as
            the output file name.
        save_dir: Root directory for the merged output.

    Returns:
        None. Progress messages are printed and the CSV is written to disk.
        When no input file is found, a message is printed and nothing is
        written.
    """
    begin = datetime.datetime.now()
    base_dir = os.path.basename(file_dir)
    print(f"{datetime.datetime.now()}: 开始执行{base_dir}-{month}")

    all_files = read_excel_files(os.path.join(file_dir, month))
    if not all_files:
        # pd.concat raises ValueError on an empty list. Skipping here keeps
        # one empty month from aborting the whole batch this function is
        # mapped over.
        print(f"{datetime.datetime.now()}: {base_dir}-{month}没有可读取的文件,跳过")
        return

    df = pd.concat([read_file_to_df(file) for file in all_files], ignore_index=True)

    save_path = os.path.join(save_dir, base_dir, f'{month}.csv')
    os.makedirs(os.path.dirname(save_path), exist_ok=True)

    df.sort_values(by=['Time'], inplace=True)
    df.to_csv(save_path, encoding='utf8', index=False)
    print(f"{datetime.datetime.now()}: 执行{base_dir}-{month}结束,耗时{datetime.datetime.now() - begin}")
if __name__ == '__main__':
    # Start of the whole run; used for the total elapsed time printed at the end.
    begin = datetime.datetime.now()
    read_dir = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/tmp/second/excel_tmp/'
    save_dir = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/清理数据/20241217完整字段'

    # Sub-directories named 26 through 41 under read_dir.
    read_dirs = [os.path.join(read_dir, str(i)) for i in range(26, 42)]

    # The loop variable has its own name (it used to shadow read_dir) and
    # `begin` is no longer reset per directory, so the closing message
    # reports the duration of the full run rather than of the last directory.
    # NOTE(review): assumes the closing print belongs after the loop -- the
    # original indentation was lost; confirm.
    for current_dir in read_dirs:
        tasks = [(current_dir, month, save_dir) for month in os.listdir(current_dir)]
        # Each directory's entries are processed in parallel by 6 workers.
        with multiprocessing.Pool(6) as pool:
            pool.starmap(read_files_and_save_csv, tasks)

    print(f"{datetime.datetime.now()}: 执行结束,总耗时{datetime.datetime.now() - begin}")
|