1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556 |
- from service.import_data_service import get_mapping_field
- from trans.common.PathParam import PathParam
class ExecParam(object):
    """Parameters describing one mapping executor and its column mappings.

    Built from an executor record (a dict with the keys ``id``, ``name``,
    ``readPath`` and ``joinType``) plus the mapping-field rows returned by
    ``get_mapping_field`` for that executor id.
    """

    def __init__(self, data, tmp_root, read_dir):
        """Load the executor settings and derive the column lookup tables.

        Args:
            data: Executor record; ``id``, ``name``, ``readPath`` and
                ``joinType`` are read from it.
            tmp_root: Forwarded unchanged to ``PathParam``.
            read_dir: Forwarded unchanged to ``PathParam``.

        Raises:
            Exception: If ``get_mapping_field`` returns nothing truthy for
                this executor id.
        """
        self.process_executor_id = data['id']
        self.process_executor = data['name']
        self.read_path = data['readPath']
        # Join direction flag: 0 = vertical, 1 = horizontal.
        self.join_type = data['joinType']
        work_name = f'{self.process_executor_id}_{self.process_executor}'
        self.path_param = PathParam(tmp_root, work_name, read_dir)

        rows = get_mapping_field(self.process_executor_id)
        if not rows:
            raise Exception(f"根据 映射执行器ID:{self.process_executor_id} 没有找到映射字段")
        self.mapping_list = [MappingFiled(row) for row in rows]

        # Lookup tables populated by read_cols().
        self.index_cols = []
        self.split_cols = []
        self.has_header = True
        self.mapping_cols = {}
        self.use_cols = []
        self.read_cols(self.mapping_list)

    def read_cols(self, mapping_list):
        """Fill the column lookup tables from the given mapping fields.

        For each field, its ``data_name`` is interpreted in one of two ways:

        * If it starts with ``'无列名'`` ("no column name"), the remainder is
          parsed as an integer column key and ``has_header`` is set to False.
        * Otherwise it is split on ``','``; empty pieces are dropped and
          ``'缺失列名'`` ("missing column name") is replaced by
          ``'Unnamed: '`` in each piece (presumably to match the names pandas
          assigns to header-less columns -- confirm against the reader).

        Every resulting key is appended to ``use_cols`` and mapped to the
        field's ``standardized_name`` in ``mapping_cols``. Fields flagged
        ``is_index`` / ``is_cut_col`` also contribute their standardized name
        to ``index_cols`` / ``split_cols``.

        Args:
            mapping_list: Iterable of objects exposing ``data_name``,
                ``standardized_name``, ``is_index`` and ``is_cut_col``.

        Raises:
            ValueError: If a ``'无列名'`` entry has a non-integer remainder.
        """
        for field in mapping_list:
            raw_name = str(field.data_name)
            if raw_name.startswith('无列名'):
                # Positional column: the file has no header row.
                self.has_header = False
                source_keys = [int(raw_name.replace('无列名', ''))]
            else:
                source_keys = [
                    piece.replace('缺失列名', 'Unnamed: ')
                    for piece in raw_name.split(',')
                    if piece
                ]

            self.use_cols.extend(source_keys)
            for key in source_keys:
                self.mapping_cols[key] = field.standardized_name

            if field.is_index:
                self.index_cols.append(field.standardized_name)
            if field.is_cut_col:
                self.split_cols.append(field.standardized_name)
class MappingFiled(object):
    """One mapping-field row exposed as plain attributes.

    Attributes copied from the row: ``standardized_name``, ``data_name``,
    ``is_cut_col`` and ``is_index``.
    """

    # Keys read from the row, in the order they are accessed.
    _ROW_KEYS = ('standardized_name', 'data_name', 'is_cut_col', 'is_index')

    def __init__(self, data):
        """Copy the expected keys of ``data`` onto this instance.

        Args:
            data: Mapping that supports ``data[key]`` for each expected key.

        Raises:
            KeyError: If ``data`` is a dict missing one of the expected keys.
        """
        for key in self._ROW_KEYS:
            setattr(self, key, data[key])
|