import os
from datetime import datetime
from pathlib import Path

import chardet
import pandas as pd
from sqlalchemy import create_engine, text, inspect

from utils.log.trans_log import logger


def get_engine(host='192.168.50.235', port=30306, user='root', pwd='admin123456', db='datang'):
    engine = create_engine(f'mysql+pymysql://{user}:{pwd}@{host}:{port}/{db}')
    return engine
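

# Usage sketch (hypothetical): build an engine with the defaults above and run a
# trivial connectivity probe. Assumes the MySQL server from the default arguments
# is reachable; pass your own host/credentials otherwise.
def _example_get_engine():
    engine = get_engine()
    with engine.connect() as conn:
        print(conn.execute(text('SELECT 1')).scalar())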


def print_log(message: str = '', bool_print_log=False, pre_time=None):
    log_now = datetime.now()
    if bool_print_log:
        if pre_time is None:
            logger.info(message)
        else:
            logger.info('%s, elapsed: %s', message, log_now - pre_time)
    return log_now
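

# Usage sketch: time a block of work with print_log. The first call records the
# start time; feeding it back as pre_time makes the second call log the elapsed
# duration. The sleep is only a placeholder workload.
def _example_print_log():
    import time
    start = print_log('work started', bool_print_log=True)
    time.sleep(0.1)  # stand-in for real work
    print_log('work finished', bool_print_log=True, pre_time=start)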


# Detect the file encoding from a sample of the first bytes; anything that is
# not UTF-8-compatible is treated as gb18030.
def detect_file_encoding(filename):
    with open(filename, 'rb') as f:
        rawdata = f.read(1000)
    result = chardet.detect(rawdata)
    encoding = result['encoding']
    if encoding is None:
        encoding = 'gb18030'
    if encoding.lower() in ['utf-8', 'ascii', 'utf8', 'utf-8-sig']:
        return 'utf-8'
    return 'gb18030'
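

# Usage sketch: open a text file with the detected encoding. Note the detector
# deliberately collapses every result to 'utf-8' or 'gb18030'; 'data.csv' is a
# hypothetical path.
def _example_detect_file_encoding():
    encoding = detect_file_encoding('data.csv')
    with open('data.csv', encoding=encoding) as f:
        print(f.readline())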


def df_save_file(df: pd.DataFrame = None, file_name: str = '', bool_add_time: bool = True, bool_print_log: bool = True):
    if df is None:
        raise ValueError('df must not be None')
    res_file_name = file_name
    if bool_add_time:
        now_str = datetime.now().strftime('%Y%m%d%H%M%S')
        res_file_name = os.path.join(os.path.dirname(file_name), now_str + '-' + os.path.basename(file_name))
    log_now = print_log(f'Saving df {df.shape} to {res_file_name}', bool_print_log)
    dir_name = os.path.dirname(file_name)
    if dir_name:  # makedirs('') raises, so only create a directory when one is given
        os.makedirs(dir_name, exist_ok=True)
    if file_name.endswith('csv'):
        df.to_csv(res_file_name, index=False, encoding='utf8')
    elif file_name.endswith(('xls', 'xlsx')):
        df.to_excel(res_file_name, index=False)
    elif file_name.endswith('parquet'):
        df.to_parquet(res_file_name, index=False)
    else:
        raise ValueError(f'Unsupported extension for {file_name}; add a writer mapping')
    print_log(f'Finished saving df {df.shape} to {res_file_name}', bool_print_log, pre_time=log_now)
    return res_file_name
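

# Usage sketch: write a small frame to a timestamped CSV. With the default
# bool_add_time=True the file lands as e.g. out/20240101120000-demo.csv, and the
# returned path is the file actually written. 'out/demo.csv' is hypothetical.
def _example_df_save_file():
    df = pd.DataFrame({'a': [1, 2], 'b': ['x', 'y']})
    saved_path = df_save_file(df, file_name=os.path.join('out', 'demo.csv'))
    print(saved_path)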


def read_file(file_path, read_cols: list = None, header: int = 0,
              nrows: int = None, bool_print_log: bool = True) -> pd.DataFrame:
    # Validate arguments
    if not file_path or not isinstance(file_path, str):
        raise ValueError('file_path must be a non-empty string')
    if not os.path.exists(file_path):
        raise FileNotFoundError(f'File not found: {file_path}')
    lower_path = file_path.lower()
    # Keyword arguments shared by the csv/excel readers
    read_params = {'header': header}
    if nrows is not None:
        read_params['nrows'] = nrows
    try:
        log_now = print_log(f'Reading file: {file_path}', bool_print_log)
        # Dispatch on the file extension
        if lower_path.endswith('.csv'):
            read_params['encoding'] = detect_file_encoding(file_path)
            if read_cols:
                df = pd.read_csv(file_path, usecols=read_cols, **read_params)
            else:
                df = pd.read_csv(file_path, **read_params)
        elif lower_path.endswith(('.xls', '.xlsx')):
            if read_cols:
                df = pd.read_excel(file_path, usecols=read_cols, **read_params)
            else:
                df = pd.read_excel(file_path, **read_params)
        elif lower_path.endswith('.parquet'):
            # read_parquet selects columns via `columns` and takes no header/nrows
            parquet_params = {'columns': read_cols} if read_cols else {}
            df = pd.read_parquet(file_path, **parquet_params)
            if nrows is not None:
                df = df.head(nrows)
        elif lower_path.endswith('.json'):
            # read_json accepts neither `header` nor (without lines=True) `nrows`,
            # so subset after loading
            df = pd.read_json(file_path)
            if read_cols:
                df = df[read_cols]
            if nrows is not None:
                df = df.head(nrows)
        else:
            supported_formats = ['.csv', '.xls', '.xlsx', '.parquet', '.json']
            raise ValueError(
                f'Unsupported file format: {os.path.splitext(file_path)[1]}\n'
                f'Supported formats: {", ".join(supported_formats)}'
            )
        print_log(f'File read successfully: {file_path}, shape: {df.shape}', bool_print_log, pre_time=log_now)
        return df
    except pd.errors.EmptyDataError:
        print_log(f'File is empty: {file_path}', bool_print_log)
        return pd.DataFrame()
    except Exception as e:
        print_log(f'Failed to read file: {file_path}, error: {e}', bool_print_log)
        raise
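

# Usage sketch: read the first rows of two columns from a CSV, letting read_file
# pick the encoding and the reader from the extension. The path and column names
# are hypothetical.
def _example_read_file():
    df = read_file('data.csv', read_cols=['id', 'name'], nrows=100)
    print(df.shape)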


def df_save_table(engine, df: pd.DataFrame, table_name, pre_save: str = None):
    """
    engine: SQLAlchemy engine
    df: DataFrame to persist
    table_name: target table name
    pre_save: 'DROP' the table, 'TRUNCATE' it, or None to leave it as-is
    """
    log_time = print_log(f'Saving to table {table_name}, shape: {df.shape}, pre-save action: {pre_save}',
                         bool_print_log=True)
    if pre_save is not None:
        with engine.connect() as conn:
            # Only run the pre-save statement if the table already exists
            inspector = inspect(engine)
            if table_name in inspector.get_table_names():
                conn.execute(text(f'{pre_save} TABLE {table_name}'))
                conn.commit()
    df.to_sql(table_name, con=engine, if_exists='append', index=False)
    print_log(f'Finished saving to table {table_name}, shape: {df.shape}, pre-save action: {pre_save}',
              bool_print_log=True, pre_time=log_time)
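

# Usage sketch: truncate-and-append a frame into MySQL. pre_save='TRUNCATE'
# empties the table (if it exists) before the append; 'demo_table' is a
# hypothetical table in the default database.
def _example_df_save_table():
    engine = get_engine()
    df = pd.DataFrame({'id': [1, 2], 'value': [3.0, 4.0]})
    df_save_table(engine, df, 'demo_table', pre_save='TRUNCATE')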


def get_all_files(read_path: str, pre_filter: tuple = None):
    """Collect file paths under read_path, optionally keeping only those whose
    (lower-cased) path ends with one of the suffixes in pre_filter."""
    result = []
    if os.path.isfile(read_path):
        if pre_filter is None or read_path.lower().endswith(pre_filter):
            result.append(str(Path(read_path)))
    else:
        for root, _dirs, files in os.walk(read_path):
            for file in files:
                whole_path = os.path.join(root, file)
                if pre_filter is None or whole_path.lower().endswith(pre_filter):
                    result.append(str(Path(whole_path)))
    return result
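

# Usage sketch: collect every CSV/Excel file under a directory tree. The suffix
# tuple is matched case-insensitively against the full path; '/data/raw' is a
# hypothetical root directory.
def _example_get_all_files():
    for path in get_all_files('/data/raw', pre_filter=('.csv', '.xls', '.xlsx')):
        print(path)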