
Add column-name handling
Add temporary processing code for the Xinhua (新华) data

wzl, 8 months ago
commit 8da9b664ba

+ 1 - 1
etl/common/BaseDataTrans.py

@@ -106,7 +106,7 @@ class BaseDataTrans(object):
             update_trans_status_error(self.batch_no, self.read_type, str(e), self.save_db)
             raise e
         finally:
-            self.pathsAndTable.delete_tmp_files()
+            # self.pathsAndTable.delete_tmp_files()
             trans_print("执行结束,总耗时:", str(datetime.datetime.now() - total_begin))
 
 

+ 1 - 1
etl/wind_power/min_sec/ClassIdentifier.py

@@ -5,7 +5,6 @@ from pandas import DataFrame
 
 from utils.file.trans_methods import read_file_to_df
 from utils.log.trans_log import trans_print
-from utils.systeminfo.sysinfo import print_memory_usage
 
 
 class ClassIdentifier(object):
@@ -350,6 +349,7 @@ class ClassIdentifier(object):
     def run(self):
         # Implement your class identification logic here
         begin = datetime.datetime.now()
+        trans_print("打标签开始,风机号:", self.wind_turbine_number, self.df.shape)
         df = self.identifier()
         trans_print("打标签结束,", df.shape, ",耗时:", datetime.datetime.now() - begin)
         return df
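
The added `trans_print` gives each turbine a start marker with its dataframe shape, so a slow or stuck `identifier()` call can be attributed to a specific 风机号. The same begin/end bracketing can be factored into a decorator; a sketch, using `print` in place of `trans_print`:

```python
import datetime
from functools import wraps


def timed_step(label):
    """Log a start marker, run the step, then log elapsed time."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            begin = datetime.datetime.now()
            print(label, "开始")
            result = func(*args, **kwargs)
            print(label, "结束, 耗时:", datetime.datetime.now() - begin)
            return result
        return wrapper
    return decorator


@timed_step("打标签")
def identify(df):
    return df  # stand-in for ClassIdentifier.identifier()
```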

+ 4 - 2
etl/wind_power/min_sec/ReadAndSaveTmp.py

@@ -1,3 +1,4 @@
+import base64
 import datetime
 import multiprocessing
 import os
@@ -52,7 +53,7 @@ class ReadAndSaveTmp(object):
         names = set(df['wind_turbine_number'].values)
         cols = list(df.columns)
         cols.sort()
-        csv_name = "-".join(cols) + ".csv"
+        csv_name = base64.b64encode('-'.join(cols).encode('utf8')).decode('utf-8') + ".csv"
         for name in names:
             exist_name = name + '-' + csv_name
             merge_path = self.pathsAndTable.get_merge_tmp_path(name)
@@ -227,7 +228,8 @@ class ReadAndSaveTmp(object):
                     trans_cols.append(v)
             trans_cols = list(set(trans_cols))
             if self.trans_param.merge_columns:
-                df = read_file_to_df(file_path, trans_cols=trans_cols, not_find_header='ignore')
+                df = read_file_to_df(file_path, trans_cols=trans_cols, not_find_header='ignore',
+                                     resolve_col_prefix=self.trans_param.resolve_col_prefix)
             else:
                 if self.trans_param.need_valid_cols:
                     df = read_file_to_df(file_path, read_cols, trans_cols=trans_cols)
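
The old merge filename was the sorted column list joined with `-`, which breaks once a column name contains a character that is illegal in filenames or the joined string exceeds the OS filename limit (commonly 255 bytes). Base64 fixes the character problem and stays decodable, unlike a hash. One caveat: the standard Base64 alphabet itself includes `/` and `+`, so `base64.urlsafe_b64encode` would be the strictly filename-safe variant. A quick round-trip check:

```python
import base64

cols = sorted(['active_power', 'time_stamp', '风速(m/s)'])
joined = '-'.join(cols)

name = base64.b64encode(joined.encode('utf8')).decode('utf-8')
safe = base64.urlsafe_b64encode(joined.encode('utf8')).decode('utf-8')
print(name)  # standard alphabet may emit '/' or '+', awkward in paths
print(safe)  # uses '-' and '_' instead, always safe in a filename

# the encoding is reversible, so the column list can be recovered
assert base64.urlsafe_b64decode(safe).decode('utf8') == joined
```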

+ 3 - 2
etl/wind_power/min_sec/StatisticsAndSaveFile.py

@@ -94,12 +94,13 @@ class StatisticsAndSaveFile(object):
         solve_time_begin = datetime.datetime.now()
         # df = df[(df['time_stamp'].str.find('-') > 0) & (df['time_stamp'].str.find(':') > 0)]
         # trans_print(wind_col_name, "去掉非法时间后大小:", df.shape[0])
-        df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce", format='%d-%m-%Y %H:%M:%S')
+        # df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce", format='%d-%m-%Y %H:%M:%S')
+        df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
         df.dropna(subset=['time_stamp'], inplace=True)
         df.sort_values(by='time_stamp', inplace=True)
         df = df[[i for i in self.trans_param.cols_tran.keys() if i in df.columns]]
 
-        df['active_power'] = df['active_power'] / 1000
+        # df['active_power'] = df['active_power'] / 1000
         ## 做数据检测前,先强行处理有功功率
         df = df[df['active_power'] < 5000]
 

+ 1 - 0
requirements.txt

@@ -25,4 +25,5 @@ six==1.16.0
 SQLAlchemy==2.0.30
 typing_extensions==4.12.2
 tzdata==2024.1
+xlrd==2.0.1
 zipp==3.20.1
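
`xlrd` 2.x is the engine pandas uses for legacy `.xls` workbooks only; xlrd 2.0 dropped `.xlsx` support, so `.xlsx` files still go through `openpyxl`. The pin is presumably for old-format Excel exports read elsewhere in this commit. For reference (file names are hypothetical):

```python
import pandas as pd

# pandas picks the engine from the extension, or it can be explicit:
df_xls = pd.read_excel("legacy_export.xls", engine="xlrd")        # needs xlrd
df_xlsx = pd.read_excel("modern_export.xlsx", engine="openpyxl")  # needs openpyxl
```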

+ 16 - 36
test_run_local.py

@@ -6,8 +6,6 @@ import os
 import sys
 import traceback
 
-import pandas as pd
-
 from utils.conf.read_conf import yaml_conf, read_conf
 
 
@@ -29,13 +27,13 @@ def get_exec_data(batch_no=None, read_type=None, run_count=1):
     return data
 
 
-def run(data: dict = dict(), save_db=False):
+def run(data: dict = dict(), save_db=False, step=0, end=4):
     exec_process = None
     if data['transfer_type'] in ['second', 'minute']:
-        exec_process = MinSecTrans(data=data, save_db=save_db)
+        exec_process = MinSecTrans(data=data, save_db=save_db, step=step, end=end)
 
     if data['transfer_type'] in ['fault', 'warn']:
-        exec_process = FaultWarnTrans(data=data, save_db=save_db)
+        exec_process = FaultWarnTrans(data=data, save_db=save_db, step=step, end=end)
 
     if exec_process is None:
         raise Exception("No exec process")
@@ -43,7 +41,7 @@ def run(data: dict = dict(), save_db=False):
 
 
 if __name__ == '__main__':
-    env = 'dev'
+    env = 'prod'
     if len(sys.argv) >= 2:
         env = sys.argv[1]
 
@@ -53,41 +51,23 @@ if __name__ == '__main__':
     os.environ['env'] = env
     run_count = int(read_conf(yaml_config, "run_batch_count", 1))
 
-
     from utils.log.trans_log import trans_print
     from service.plt_service import get_batch_exec_data, get_data_by_batch_no_and_type
     from etl.wind_power.fault_warn.FaultWarnTrans import FaultWarnTrans
     from etl.wind_power.min_sec.MinSecTrans import MinSecTrans
-    from utils.file.trans_methods import read_file_to_df
 
-    df = read_file_to_df("tmp_file/rebuild_data.csv")
-    results = list()
+    begin = datetime.datetime.now()
     data = dict()
-    for batch_code, batch_name, transfer_type, transfer_addr, field_code, field_name \
-            in zip(df['batch_code'], df['batch_name'], df['transfer_type'], df['transfer_addr'], df['field_code'],
-                   df['field_name']):
-        batch_begin = datetime.datetime.now()
-        transfer_addr = transfer_addr.replace(r"/data/download/collection_data",
-                                              r"/data/download/datang_shangxian")
-        trans_print("开始执行批次:", batch_code, batch_name, transfer_type, field_code, field_name)
-        trans_print("批次路径:", transfer_addr)
-
-        data['batch_code'] = batch_code
-        data['batch_name'] = batch_name
-        data['transfer_type'] = transfer_type
-        data['transfer_addr'] = transfer_addr
-        data['field_code'] = field_code
-        data['field_name'] = field_name
-        try:
-            run(data=data, save_db=True)
-            results.append((batch_code, batch_name, transfer_type, field_code, field_name, 'success'))
-        except Exception as e:
-            results.append((batch_code, batch_name, transfer_type, field_code, field_name, 'error'))
-            trans_print(traceback.format_exc())
-        finally:
-            trans_print("执行结束,耗时:", datetime.datetime.now() - batch_begin, "总耗时:", datetime.datetime.now() - begin)
-
-    for data in results:
-        trans_print(data)
+
+    data['batch_code'] = 'WOF085500008-WOB000002'
+    data['batch_name'] = 'HY秒级数据1009'
+    data['transfer_type'] = 'second'
+    data['transfer_addr'] = r'/data/download/collection_data/1进行中/红阳风电场-贵州-大唐/收资数据/sec/7-8'
+    data['field_code'] = 'WOF085500008'
+    data['field_name'] = '红阳风电场'
+    try:
+        run(data=data, save_db=False, step=3, end=3)
+    except Exception as e:
+        trans_print(traceback.format_exc())
 
     trans_print("执行结束,总耗时:", datetime.datetime.now() - begin)

+ 93 - 0
test_run_local_piliang.py

@@ -0,0 +1,93 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/6/11
+# @Author  : 魏志亮
+import datetime
+import os
+import sys
+import traceback
+
+import pandas as pd
+
+from utils.conf.read_conf import yaml_conf, read_conf
+
+
+def get_exec_data(batch_no=None, read_type=None, run_count=1):
+    if batch_no and read_type:
+        data = get_data_by_batch_no_and_type(batch_no, read_type)
+        if data is None:
+            raise ValueError(f"未找到批次号:{batch_no},类型:{read_type}")
+
+    else:
+        data = get_batch_exec_data(run_count)
+        if data is None:
+            trans_print("当前有任务在执行")
+            sys.exit(0)
+        elif len(data.keys()) == 0:
+            trans_print("当前无任务")
+            sys.exit(0)
+
+    return data
+
+
+def run(data: dict = dict(), save_db=False):
+    exec_process = None
+    if data['transfer_type'] in ['second', 'minute']:
+        exec_process = MinSecTrans(data=data, save_db=save_db)
+
+    if data['transfer_type'] in ['fault', 'warn']:
+        exec_process = FaultWarnTrans(data=data, save_db=save_db)
+
+    if exec_process is None:
+        raise Exception("No exec process")
+    exec_process.run()
+
+
+if __name__ == '__main__':
+    env = 'dev'
+    if len(sys.argv) >= 2:
+        env = sys.argv[1]
+
+    conf_path = os.path.abspath(f"./conf/etl_config_{env}.yaml")
+    os.environ['ETL_CONF'] = conf_path
+    yaml_config = yaml_conf(conf_path)
+    os.environ['env'] = env
+    run_count = int(read_conf(yaml_config, "run_batch_count", 1))
+
+    from utils.log.trans_log import trans_print
+    from service.plt_service import get_batch_exec_data, get_data_by_batch_no_and_type
+    from etl.wind_power.fault_warn.FaultWarnTrans import FaultWarnTrans
+    from etl.wind_power.min_sec.MinSecTrans import MinSecTrans
+    from utils.file.trans_methods import read_file_to_df
+
+    begin = datetime.datetime.now()
+    df = read_file_to_df("tmp_file/rebuild_data.csv")
+    results = list()
+    data = dict()
+    for batch_code, batch_name, transfer_type, transfer_addr, field_code, field_name \
+            in zip(df['batch_code'], df['batch_name'], df['transfer_type'], df['transfer_addr'], df['field_code'],
+                   df['field_name']):
+        batch_begin = datetime.datetime.now()
+        transfer_addr = transfer_addr.replace(r"/data/download/collection_data",
+                                              r"/data/download/datang_shangxian")
+        trans_print("开始执行批次:", batch_code, batch_name, transfer_type, field_code, field_name)
+        trans_print("批次路径:", transfer_addr)
+
+        data['batch_code'] = batch_code
+        data['batch_name'] = batch_name
+        data['transfer_type'] = transfer_type
+        data['transfer_addr'] = transfer_addr
+        data['field_code'] = field_code
+        data['field_name'] = field_name
+        try:
+            run(data=data, save_db=True)
+            results.append((batch_code, batch_name, transfer_type, field_code, field_name, 'success'))
+        except Exception as e:
+            results.append((batch_code, batch_name, transfer_type, field_code, field_name, 'error'))
+            trans_print(traceback.format_exc())
+        finally:
+            trans_print("执行结束,耗时:", datetime.datetime.now() - batch_begin, "总耗时:", datetime.datetime.now() - begin)
+
+    for data in results:
+        trans_print(data)
+
+    trans_print("执行结束,总耗时:", datetime.datetime.now() - begin)

+ 171 - 0
tmp_file/organize_xinhua_files.py

@@ -0,0 +1,171 @@
+import multiprocessing
+import datetime
+import os
+import warnings
+
+import pandas as pd
+warnings.filterwarnings("ignore")
+
+
+def __build_directory_dict(directory_dict, path, filter_types=None):
+    # 遍历目录下的所有项
+    for item in os.listdir(path):
+        item_path = os.path.join(path, item)
+        if os.path.isdir(item_path):
+            __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
+        elif os.path.isfile(item_path):
+            if path not in directory_dict:
+                directory_dict[path] = []
+
+            if filter_types is None or len(filter_types) == 0:
+                directory_dict[path].append(item_path)
+            elif str(item_path).split(".")[-1] in filter_types:
+                if str(item_path).count("~$") == 0:
+                    directory_dict[path].append(item_path)
+
+
+# 读取路径下所有的excel文件
+def read_excel_files(read_path):
+    if os.path.isfile(read_path):
+        return [read_path]
+
+    directory_dict = {}
+    __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz'])
+
+    return [path for paths in directory_dict.values() for path in paths if path]
+
+
+# 创建路径
+def create_file_path(path, is_file_path=False):
+    """
+    创建路径
+    :param path:创建文件夹的路径
+    :param is_file_path: 传入的path是否包含具体的文件名
+    """
+    if is_file_path:
+        path = os.path.dirname(path)
+
+    if not os.path.exists(path):
+        os.makedirs(path, exist_ok=True)
+
+
+def boolean_is_check_data(df_cols):
+    fault_list = ['快速停机', '故障名称', '故障代码', '故障停机']
+
+    df_cols = [str(i).split('_')[-1] for i in df_cols]
+    for fault in fault_list:
+        if fault in df_cols:
+            return True
+
+    return False
+
+
+def read_fle_to_df(file_path):
+    df = pd.read_excel(file_path)
+    wind_name = [i for i in df.columns if i.find('_') > -1][0].split('_')[0]
+    df.columns = [i.split('_')[-1] for i in df.columns]
+    df['wind_name'] = wind_name
+
+    return boolean_is_check_data(df.columns), wind_name, df
+
+
+def save_to_file(dfs, wind_name, save_path='', param='', is_check=False, all_cols=list(),
+                 result_data_list=multiprocessing.Manager().list()):
+    try:
+        if is_check:
+            df = pd.concat(dfs)
+        else:
+            df = dfs[0]
+            for index, now_df in enumerate(dfs):
+                if index > 0:
+                    df = pd.merge(df, now_df, on=['采样时间', 'wind_name'], how='outer')
+    except Exception as e:
+        print(wind_name, e)
+        raise e
+
+    df.reset_index(inplace=True)
+    df.drop_duplicates(inplace=True, subset=['采样时间', 'wind_name'])
+    if 'index' in df.columns:
+        del df['index']
+    create_file_path(save_path)
+    df.sort_values(by='采样时间', inplace=True)
+
+    loss_cols = list([i for i in df.columns if i != 'wind_name'])
+    loss_cols.sort()
+
+    loss_cols.insert(0, wind_name)
+    loss_cols.insert(0, os.path.basename(save_path) + '-' + param)
+
+    result_data_list.append(loss_cols)
+
+    # for col in set(all_cols):
+    #     if col not in df.columns:
+    #         df[col] = np.nan
+
+    # df.to_csv(os.path.join(save_path, param, wind_name + '.csv'), encoding='utf8', index=False)
+
+
+if __name__ == '__main__':
+    begin = datetime.datetime.now()
+    # dir1 = r'D:\data\新华水电\测试'
+    # save_path = r'D:\data\新华水电\整理数据'
+    result_datas = [
+        (r'/data/download/collection_data/1进行中/新华水电/风机SCADA数据/8月风机数据',
+         r'/data/download/collection_data/1进行中/新华水电/整理数据/8月'),
+        (r'/data/download/collection_data/1进行中/新华水电/风机SCADA数据/9月风机数据',
+         r'/data/download/collection_data/1进行中/新华水电/整理数据/9月')
+    ]
+
+    result_data_list = multiprocessing.Manager().list()
+
+    for dir1, save_path in result_datas:
+        files = read_excel_files(dir1)
+        with multiprocessing.Pool(30) as pool:
+            datas = pool.starmap(read_fle_to_df, [(file,) for file in files])
+        data_wind_name = dict()
+        check_wind_name = dict()
+
+        data_all_cols = list()
+        check_all_cols = list()
+        for data in datas:
+            check_data, wind_name, df = data[0], data[1], data[2]
+
+            if '工作模式' not in df.columns:
+                # df.reset_index(inplace=True)
+                # df.set_index(keys=['采样时间'], inplace=True)
+                if check_data:
+                    check_all_cols.extend(list(df.columns))
+                    if wind_name in check_wind_name.keys():
+                        check_wind_name[wind_name].append(df)
+                    else:
+                        check_wind_name[wind_name] = [df]
+                else:
+                    data_all_cols.extend(list(df.columns))
+                    if wind_name in data_wind_name.keys():
+                        data_wind_name[wind_name].append(df)
+                    else:
+                        data_wind_name[wind_name] = [df]
+
+        with multiprocessing.Pool(30) as pool:
+            pool.starmap(save_to_file,
+                         [(dfs, wind_name, save_path, "事件数据", True, check_all_cols, result_data_list) for wind_name, dfs
+                          in
+                          check_wind_name.items()])
+
+        with multiprocessing.Pool(30) as pool:
+            pool.starmap(save_to_file,
+                         [(dfs, wind_name, save_path, "数据", False, data_all_cols, result_data_list) for wind_name, dfs
+                          in
+                          data_wind_name.items()])
+
+        print(datetime.datetime.now() - begin)
+
+    normal_list = list(result_data_list)
+    normal_list.sort(key=lambda x: (x[0], int(x[1][2:])))
+
+    with open('loss_col.csv', 'w', encoding='utf8') as f:
+        for datas in normal_list:
+            f.write(",".join(datas))
+            f.write('\n')
+
+    print(datetime.datetime.now() - begin)
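
One pitfall in this temporary script: `save_to_file` declares `result_data_list=multiprocessing.Manager().list()` as a default argument, which spawns a Manager process at import time and shares one list across all calls that omit the argument. The script always passes its own list, so it runs, but the conventional idiom is a `None` default:

```python
import multiprocessing


def save_to_file(dfs, wind_name, result_data_list=None):
    # A Manager().list() default would start a Manager process at import
    # time and silently share state between calls; create it explicitly.
    if result_data_list is None:
        result_data_list = []
    result_data_list.append(wind_name)
    return result_data_list


if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        shared = manager.list()
        save_to_file([], 'F01', result_data_list=shared)
        print(list(shared))
```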

+ 97 - 0
tmp_file/orgranize_hongyang.py

@@ -0,0 +1,97 @@
+import copy
+import multiprocessing
+import os
+import warnings
+
+import chardet
+import pandas as pd
+
+warnings.filterwarnings("ignore")
+
+# read_path = r'/home/wzl/test_data/红阳'
+# save_dir = r'/home/wzl/test_data/整理'
+
+read_path = r'D:\data\红阳\红阳秒级分测点\红阳'
+save_dir = r'D:\data\红阳\红阳秒级分测点\整理'
+
+def __build_directory_dict(directory_dict, path, filter_types=None):
+    # 遍历目录下的所有项
+    for item in os.listdir(path):
+        item_path = os.path.join(path, item)
+        if os.path.isdir(item_path):
+            __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
+        elif os.path.isfile(item_path):
+            if path not in directory_dict:
+                directory_dict[path] = []
+
+            if filter_types is None or len(filter_types) == 0:
+                directory_dict[path].append(item_path)
+            elif str(item_path).split(".")[-1] in filter_types:
+                if str(item_path).count("~$") == 0:
+                    directory_dict[path].append(item_path)
+
+
+# 读取路径下所有的excel文件
+def read_excel_files(read_path):
+    if os.path.isfile(read_path):
+        return [read_path]
+
+    directory_dict = {}
+    __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz'])
+
+    return [path for paths in directory_dict.values() for path in paths if path]
+
+
+all_files = read_excel_files(read_path)
+
+
+# 获取文件编码
+def detect_file_encoding(filename):
+    # 读取文件的前1000个字节(足够用于大多数编码检测)
+    with open(filename, 'rb') as f:
+        rawdata = f.read(1000)
+    result = chardet.detect(rawdata)
+    encoding = result['encoding']
+
+    if encoding is None:
+        encoding = 'gb18030'
+
+    if encoding.lower() in ['utf-8', 'ascii', 'utf8']:
+        return 'utf-8'
+
+    return 'gb18030'
+
+
+def read_and_organize(file):
+    df = pd.read_csv(file, encoding=detect_file_encoding(file))
+    return file, df
+
+
+if __name__ == '__main__':
+
+    with multiprocessing.Pool(10) as pool:
+        bak_datas = pool.starmap(read_and_organize, [(i,) for i in all_files])
+
+    datas = copy.deepcopy(bak_datas)
+    wind_name_df = dict()
+    for file, df in datas:
+        all_cols = [i for i in df.columns if i.find('#') > -1]
+        col = all_cols[0]
+        cedian = str(col).split("_")[-1]
+        wind_names = set([str(i).split("#")[0].replace("红阳风电场_", "") for i in all_cols])
+
+        print(file, df.columns)
+        for wind_name in wind_names:
+            cols = [i for i in all_cols if i.find('_' + wind_name) > -1]
+            cols.insert(0, '统计时间')
+            query_df = df[cols]
+            query_df.columns = [str(i).split('_')[-1] for i in query_df.columns]
+            query_df['风机编号'] = wind_name
+            if wind_name in wind_name_df.keys():
+                now_df = wind_name_df[wind_name]
+                wind_name_df[wind_name] = pd.merge(now_df, query_df, on=['统计时间', '风机编号'], how='outer')
+            else:
+                wind_name_df[wind_name] = query_df
+
+    for wind_name, df in wind_name_df.items():
+        df.to_csv(os.path.join(save_dir, wind_name + '#.csv'), index=False, encoding='utf8')
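
The reshaping here takes one wide CSV whose columns look like `红阳风电场_1#风机_风速`, splits it into one frame per turbine, renames each column to its last `_` segment, and outer-merges repeated files on `统计时间`. Note the substring filter `'_' + wind_name`: turbine `1` would also match `_11`, `_12`, ... if such turbines appear in the same file. A toy illustration of the split:

```python
import pandas as pd

df = pd.DataFrame({
    '统计时间': ['2024-06-11 00:00:00'],
    '红阳风电场_1#风机_风速': [5.2],
    '红阳风电场_1#风机_有功功率': [310.0],
    '红阳风电场_2#风机_风速': [4.8],
})

all_cols = [c for c in df.columns if '#' in c]
wind_names = {c.split('#')[0].replace('红阳风电场_', '') for c in all_cols}

per_turbine = {}
for wind_name in wind_names:
    cols = ['统计时间'] + [c for c in all_cols if '_' + wind_name in c]
    q = df[cols].copy()
    q.columns = [c.split('_')[-1] for c in q.columns]  # keep last segment
    q['风机编号'] = wind_name
    per_turbine[wind_name] = q

print(per_turbine['1'])
```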

+ 4 - 2
tmp_file/read_and_draw_png.py

@@ -10,7 +10,9 @@ def draw(file, fengchang='测试'):
     name = os.path.basename(file).split('.')[0]
     df = read_file_to_df(file)
 
-    identifier = ClassIdentifier(wind_turbine_number='test', origin_df=df, rated_power=1500, cut_out_speed=25,active_power='功率',wind_velocity='风速')
+    identifier = ClassIdentifier(wind_turbine_number='test', origin_df=df, rated_power=1500, cut_out_speed=25,
+                                 active_power='active_power', wind_velocity='wind_velocity',
+                                 pitch_angle_blade='pitch_angle_blade')
     df = identifier.run()
 
     df.loc[df['active_power'] <= 0, 'lab'] = -1
@@ -27,7 +29,7 @@ def draw(file, fengchang='测试'):
 
 
 if __name__ == '__main__':
-    read_dir = r"D:\data\清理数据\和风元宝山\test_11_test\test_A01"
+    read_dir = r"D:\data\清理数据\和风元宝山\test_11_test\test"
 
     files = [read_dir + os.sep + i for i in os.listdir(read_dir)]
 

+ 14 - 9
tmp_file/对比文件夹列名差值.py

@@ -4,16 +4,16 @@ from utils.file.trans_methods import *
 
 
 def boolean_is_check_data(df_vas):
-    fault_list = ['Checked', 'Indeterminate', 'Unchecked']
-    for fault in fault_list:
-        if fault in df_vas:
-            return True
+    # fault_list = ['Checked', 'Indeterminate', 'Unchecked']
+    # for fault in fault_list:
+    #     if fault in df_vas:
+    #         return True
 
     return False
 
 
 def compareTwoFolders(df1s, df2s):
-    for is_falut in [False, True]:
+    for is_falut in [False]:
         list1 = list()
         for df in df1s:
             tmp_list = [str(i).split('_')[-1] for i in list(df.columns) if i != 'sheet_name']
@@ -53,7 +53,12 @@ def compareTwoFolders(df1s, df2s):
         print(list3)
         print(list4)
 
-        max_count = max(len(list1), len(list2), len(list3), len(list4))
+        list5 = list(set1)
+        list5.extend(list(set2))
+        list5 = list(set(list5))
+        list5.sort()
+
+        max_count = len(list5)
         list1.extend([''] * (max_count - len(list1)))
         list2.extend([''] * (max_count - len(list2)))
         list3.extend([''] * (max_count - len(list3)))
@@ -61,10 +66,10 @@ def compareTwoFolders(df1s, df2s):
 
         file_name = 'col_compare.csv' if not is_falut else 'col_compare_falut.csv'
         with open(file_name, 'w', encoding='utf8') as f:
-            f.write(",".join(["对方提供", "自己获取", "对方提供多的字段", "自己提供多的字段"]))
+            f.write(",".join(["全部字段", "对方提供(3-25风机)", "自己获取(1-2风机)", "对方提供多的字段", "自己提供多的字段"]))
             f.write('\n')
-            for a, b, c, d in zip(list1, list2, list3, list4):
-                f.write(",".join([a, b, c, d]))
+            for e, a, b, c, d in zip(list5, list1, list2, list3, list4):
+                f.write(",".join([e, a, b, c, d]))
                 f.write('\n')
 
             f.flush()
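
With `list5` added, each CSV row now lines up a field from the union of both sides against the two source lists and the two differences. The underlying set algebra, for reference:

```python
cols_theirs = {'time_stamp', 'active_power', 'rotor_speed'}
cols_ours = {'time_stamp', 'active_power', 'pitch_angle'}

union = sorted(cols_theirs | cols_ours)        # list5: every field seen anywhere
only_theirs = sorted(cols_theirs - cols_ours)  # 对方提供多的字段
only_ours = sorted(cols_ours - cols_theirs)    # 自己提供多的字段

# pad the shorter lists so every CSV row has the same number of cells
pad = lambda lst: lst + [''] * (len(union) - len(lst))
for row in zip(union, pad(sorted(cols_theirs)), pad(sorted(cols_ours)),
               pad(only_theirs), pad(only_ours)):
    print(",".join(row))
```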

+ 17 - 13
utils/file/trans_methods.py

@@ -46,13 +46,18 @@ def split_array(array, num):
     return [array[i:i + num] for i in range(0, len(array), num)]
 
 
-def find_read_header(file_path, trans_cols):
+def find_read_header(file_path, trans_cols, resolve_col_prefix=None):
     df = read_file_to_df(file_path, nrows=20)
     df.reset_index(inplace=True)
     count = 0
     header = None
+    df_cols = df.columns
+    if resolve_col_prefix:
+        valid_eval(resolve_col_prefix)
+        df_cols = [eval(resolve_col_prefix) for column in df.columns]
+
     for col in trans_cols:
-        if col in df.columns:
+        if col in df_cols:
             count = count + 1
             if count >= 2:
                 header = 0
@@ -60,31 +65,30 @@ def find_read_header(file_path, trans_cols):
 
     count = 0
 
-    values = list()
     for index, row in df.iterrows():
+        if resolve_col_prefix:
+            values = [eval(resolve_col_prefix) for column in row.values]
+        else:
+            values = row.values
         for col in trans_cols:
-            if col in row.values:
+            if col in values:
                 count = count + 1
                 if count > 2:
                     header = index + 1
                     break
 
-    read_cols = []
-    for col in values:
-        if col in trans_cols:
-            read_cols.append(col)
-
-    return header, read_cols
+    return header
 
 
 # 读取数据到df
-def read_file_to_df(file_path, read_cols=list(), trans_cols=None, nrows=None, not_find_header='raise'):
+def read_file_to_df(file_path, read_cols=list(), trans_cols=None, nrows=None, not_find_header='raise',
+                    resolve_col_prefix=None):
     begin = datetime.datetime.now()
     trans_print('开始读取文件', file_path)
     header = 0
     find_cols = list()
     if trans_cols:
-        header, find_cols = find_read_header(file_path, trans_cols)
+        header = find_read_header(file_path, trans_cols, resolve_col_prefix)
         trans_print(os.path.basename(file_path), "读取第", header, "行")
         if header is None:
             if not_find_header == 'raise':
@@ -94,7 +98,7 @@ def read_file_to_df(file_path, read_cols=list(), trans_cols=None, nrows=None, no
             elif not_find_header == 'ignore':
                 pass
 
-    read_cols.extend(find_cols)
+    # read_cols.extend(find_cols)
     df = pd.DataFrame()
     if header is not None:
         try:
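
`resolve_col_prefix` is an expression string evaluated with `eval` once per candidate header cell, with `column` bound to the cell value; `valid_eval` presumably vets the expression first, though its rules are not in this diff, and evaluating configuration-supplied strings is exactly the case that warrants such a guard. A sketch of how a caller might use it — the concrete expression below is an illustration, not taken from the repo:

```python
# Hypothetical expression: strip a station prefix, keep the last '_' segment.
resolve_col_prefix = "str(column).split('_')[-1]"

raw_headers = ['红阳风电场_时间', '红阳风电场_风速', 'active_power']
normalized = [eval(resolve_col_prefix) for column in raw_headers]
print(normalized)  # ['时间', '风速', 'active_power']
```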