import os
from datetime import datetime
from pathlib import Path

import chardet
import pandas as pd
from sqlalchemy import create_engine, text, inspect

from utils.log.trans_log import logger


def get_engine(host='192.168.50.235', port=30306, user='root', pwd='admin123456', db='datang'):
    engine = create_engine(f'mysql+pymysql://{user}:{pwd}@{host}:{port}/{db}')
    return engine


def print_log(message: str = '', bool_print_log=False, pre_time=None):
    log_now = datetime.now()
    if bool_print_log:
        if pre_time is None:
            logger.info(message)
        else:
            logger.info('%s, elapsed: %s', message, log_now - pre_time)
    return log_now


# Detect the file encoding from the first bytes of the file
def detect_file_encoding(filename):
    with open(filename, 'rb') as f:
        rawdata = f.read(1000)
    result = chardet.detect(rawdata)
    encoding = result['encoding']
    if encoding is None:
        encoding = 'gb18030'
    if encoding.lower() in ['utf-8', 'ascii', 'utf8', 'utf-8-sig']:
        return 'utf-8'
    return 'gb18030'


def df_save_file(df: pd.DataFrame = None, file_name: str = '', bool_add_time: bool = True,
                 bool_print_log: bool = True):
    if df is None:
        raise Exception('df must not be None')
    res_file_name = file_name
    if bool_add_time:
        now = datetime.now()
        now_str = now.strftime('%Y%m%d%H%M%S')
        res_file_name = os.path.join(os.path.dirname(file_name), now_str + '-' + os.path.basename(file_name))
    log_now = print_log(f'Start saving df {df.shape} to {res_file_name}', bool_print_log)
    # Only create the parent directory when the path actually contains one
    if os.path.dirname(file_name):
        os.makedirs(os.path.dirname(file_name), exist_ok=True)
    if file_name.endswith('csv'):
        df.to_csv(res_file_name, index=False, encoding='utf8')
    elif file_name.endswith('xls') or file_name.endswith('xlsx'):
        df.to_excel(res_file_name, index=False)
    elif file_name.endswith('parquet'):
        df.to_parquet(res_file_name, index=False)
    else:
        raise Exception('Unsupported file extension, add a mapping for it')
    print_log(f'Finished saving df {df.shape} to {res_file_name}', bool_print_log, pre_time=log_now)
    return res_file_name


def read_file(file_path, read_cols: list = None, header: int = 0, nrows: int = None,
              bool_print_log: bool = True) -> pd.DataFrame:
    # Validate arguments
    if not file_path or not isinstance(file_path, str):
        raise ValueError("file_path must be a non-empty string")
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File does not exist: {file_path}")

    lower_path = file_path.lower()

    # Common read parameters shared by the csv/excel readers
    read_params = {'header': header}
    if nrows is not None:
        read_params['nrows'] = nrows

    try:
        log_now = print_log(f'Start reading file: {file_path}', bool_print_log)

        # Choose the reader based on the file extension
        if lower_path.endswith('.csv'):
            encoding = detect_file_encoding(file_path)
            read_params['encoding'] = encoding
            if read_cols:
                df = pd.read_csv(file_path, usecols=read_cols, **read_params)
            else:
                df = pd.read_csv(file_path, **read_params)
        elif lower_path.endswith(('.xls', '.xlsx')):
            if read_cols:
                df = pd.read_excel(file_path, usecols=read_cols, **read_params)
            else:
                df = pd.read_excel(file_path, **read_params)
        elif lower_path.endswith('.parquet'):
            # parquet uses a different parameter name for column selection
            parquet_params = {'columns': read_cols} if read_cols else {}
            df = pd.read_parquet(file_path, **parquet_params)
        elif lower_path.endswith('.json'):
            # read_json does not accept 'header' (and 'nrows' only with lines=True),
            # so read the whole frame and apply row/column limits afterwards
            df = pd.read_json(file_path)
            if nrows is not None:
                df = df.head(nrows)
            if read_cols:
                df = df[read_cols]
        else:
            supported_formats = ['.csv', '.xls', '.xlsx', '.parquet', '.json']
            raise ValueError(
                f"Unsupported file format: {os.path.splitext(file_path)[1]}\n"
                f"Supported formats: {', '.join(supported_formats)}"
            )

        print_log(f'File read successfully: {file_path}, shape: {df.shape}', bool_print_log, pre_time=log_now)
        return df

    except pd.errors.EmptyDataError:
        print_log(f'File is empty: {file_path}', bool_print_log)
        return pd.DataFrame()
    except Exception as e:
        print_log(f'Failed to read file: {file_path}, error: {str(e)}', bool_print_log)
        raise


def df_save_table(engine, df: pd.DataFrame, table_name, pre_save: str = None):
    """
    engine: database engine
    df: DataFrame to save
    table_name: target table name
    pre_save: drop the table ('DROP'), truncate it ('TRUNCATE'), or leave it untouched (None)
    """
    log_time = print_log(f'Start saving to table {table_name}, shape: {df.shape}, pre-save action: {pre_save}',
                         bool_print_log=True)
    if pre_save is not None:
        with engine.connect() as conn:
            # Only run the pre-save statement if the table already exists
            inspector = inspect(engine)
            if table_name in inspector.get_table_names():
                conn.execute(text(f"{pre_save} TABLE {table_name}"))
                conn.commit()
    df.to_sql(table_name, con=engine, if_exists='append', index=False)
    print_log(f'Finished saving to table {table_name}, shape: {df.shape}, pre-save action: {pre_save}',
              bool_print_log=True, pre_time=log_time)


def get_all_files(read_path: str, pre_filter: tuple = None):
    result = list()
    if os.path.isfile(read_path):
        if pre_filter is None:
            result.append(str(Path(read_path)))
        elif read_path.split('.')[-1].endswith(pre_filter):
            result.append(str(Path(read_path)))
    else:
        for root, dirs, files in os.walk(read_path):
            for file in files:
                whole_path = os.path.join(root, file)
                if pre_filter is None:
                    result.append(str(Path(whole_path)))
                elif whole_path.split('.')[-1].endswith(pre_filter):
                    result.append(str(Path(whole_path)))
    return result
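

if __name__ == '__main__':
    # Minimal usage sketch, not part of the module's original logic: it only shows how the
    # helpers above could be chained together. The directory './data', the output path
    # './output/result.csv', and the table name 'example_table' are hypothetical examples.
    example_files = get_all_files(r'./data', pre_filter=('csv', 'xlsx'))
    for example_file in example_files:
        # Read a sample of each file (first 100 rows) using the extension-aware reader
        example_df = read_file(example_file, nrows=100)
        if not example_df.empty:
            # Save a timestamped copy locally, then append the rows to the MySQL table
            df_save_file(example_df, file_name='./output/result.csv')
            df_save_table(get_engine(), example_df, table_name='example_table', pre_save=None)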