|
@@ -6,10 +6,10 @@ from os import *
|
|
|
|
|
|
import pandas as pd
|
|
import pandas as pd
|
|
|
|
|
|
|
|
+from service.common_connect import trans
|
|
from service.trans_conf_service import create_wave_table
|
|
from service.trans_conf_service import create_wave_table
|
|
from utils.file.trans_methods import split_array
|
|
from utils.file.trans_methods import split_array
|
|
from utils.log.trans_log import trans_print
|
|
from utils.log.trans_log import trans_print
|
|
-from service.common_connect import trans
|
|
|
|
|
|
|
|
|
|
|
|
def get_min_sec_conf(field_code, trans_type) -> dict:
|
|
def get_min_sec_conf(field_code, trans_type) -> dict:
|
|
@@ -58,7 +58,7 @@ def get_wave_conf(field_code) -> dict:
|
|
return res[0]
|
|
return res[0]
|
|
|
|
|
|
|
|
|
|
-def creat_min_sec_table(table_name, trans_type):
|
|
|
|
|
|
+def creat_min_sec_table(table_name, trans_type, use_tidb=False):
|
|
exists_table_sql = f"""
|
|
exists_table_sql = f"""
|
|
select count(1) as count from information_schema.tables where table_schema = '{trans.database}' and table_name = '{table_name}'
|
|
select count(1) as count from information_schema.tables where table_schema = '{trans.database}' and table_name = '{table_name}'
|
|
"""
|
|
"""
|
|
@@ -135,7 +135,10 @@ def creat_min_sec_table(table_name, trans_type):
|
|
KEY `time_stamp` (`time_stamp`),
|
|
KEY `time_stamp` (`time_stamp`),
|
|
KEY `wind_turbine_number` (`wind_turbine_number`),
|
|
KEY `wind_turbine_number` (`wind_turbine_number`),
|
|
{add_key}
|
|
{add_key}
|
|
- )
|
|
|
|
|
|
+ )
|
|
|
|
+ """
|
|
|
|
+ # if not use_tidb:
|
|
|
|
+ create_sql = create_sql + f"""
|
|
PARTITION BY LIST COLUMNS ({key}, `wind_turbine_number`) (
|
|
PARTITION BY LIST COLUMNS ({key}, `wind_turbine_number`) (
|
|
PARTITION pDefault VALUES IN ((000000, 'wind_turbine_number'))
|
|
PARTITION pDefault VALUES IN ((000000, 'wind_turbine_number'))
|
|
)
|
|
)
|
|
@@ -177,18 +180,37 @@ def add_or_remove_partation(table_name: str, date_str: str, wind_turbine_number)
|
|
add_partation(table_name, date_str, wind_turbine_number)
|
|
add_partation(table_name, date_str, wind_turbine_number)
|
|
|
|
|
|
|
|
|
|
-def save_partation_file_to_db(table_name: str, file: str, wind_turbine_number, date_str, batch_count=100000):
|
|
|
|
|
|
+def drop_exists_data(table_name, wind_turbine_number, min_date, max_date):
|
|
|
|
+ # sql = f"# delete from {table_name} where wind_turbine_number = '{wind_turbine_number}' and time_stamp between '{min_date}' and '{max_date}'"
|
|
|
|
+
|
|
|
|
+ sql = f"""
|
|
|
|
+ BATCH ON `time_stamp`, `wind_turbine_number` LIMIT 1000
|
|
|
|
+ DELETE FROM `{table_name}`
|
|
|
|
+ WHERE `rated_at` >= "{min_date}"
|
|
|
|
+ AND `rated_at` <= "{max_date}"
|
|
|
|
+ AND `wind_turbine_number` = "{wind_turbine_number}";
|
|
|
|
+ """
|
|
|
|
+
|
|
|
|
+ count = trans.execute(sql)
|
|
|
|
+ trans_print(f"删除数据{count}条,{table_name},{wind_turbine_number},{min_date},{max_date}")
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def save_scada_file_to_db(table_name, file: str, wind_turbine_number, date_str, batch_count=100000, use_tidb=False):
|
|
base_name = path.basename(file)
|
|
base_name = path.basename(file)
|
|
- # wind_turbine_number = path.basename(file).split(".")[0]
|
|
|
|
- # date_str = path.basename(path.dirname(file))
|
|
|
|
|
|
+ df = pd.read_csv(file)
|
|
|
|
+ # if use_tidb:
|
|
|
|
+ # min_date = df['time_stamp'].min()
|
|
|
|
+ # max_date = df['time_stamp'].max()
|
|
|
|
+ # # drop_exists_data(table_name, wind_turbine_number, min_date, max_date)
|
|
|
|
+ # else:
|
|
|
|
+ # add_or_remove_partation(table_name, date_str, wind_turbine_number)
|
|
|
|
|
|
add_or_remove_partation(table_name, date_str, wind_turbine_number)
|
|
add_or_remove_partation(table_name, date_str, wind_turbine_number)
|
|
|
|
|
|
try:
|
|
try:
|
|
- for i, df in enumerate(pd.read_csv(file, chunksize=batch_count)):
|
|
|
|
- trans.execute_df_save(df, table_name)
|
|
|
|
- count = (i + 1) * batch_count
|
|
|
|
- trans_print(base_name, f"Chunk {count} written to MySQL.")
|
|
|
|
|
|
+ trans_print(f"保存{table_name},{base_name},{wind_turbine_number},数据:{df.shape[0]}")
|
|
|
|
+ trans.execute_df_save(df, table_name, batch_count)
|
|
|
|
+ trans_print(f"保存到{table_name},{base_name},{wind_turbine_number} 成功,总条数:{df.shape[0]}")
|
|
except Exception as e:
|
|
except Exception as e:
|
|
trans_print(traceback.format_exc())
|
|
trans_print(traceback.format_exc())
|
|
message = base_name + str(e)
|
|
message = base_name + str(e)
|
|
@@ -198,11 +220,10 @@ def save_partation_file_to_db(table_name: str, file: str, wind_turbine_number, d
|
|
def save_file_to_db(table_name: str, file: str, batch_count=100000):
|
|
def save_file_to_db(table_name: str, file: str, batch_count=100000):
|
|
base_name = path.basename(file)
|
|
base_name = path.basename(file)
|
|
try:
|
|
try:
|
|
- for i, df in enumerate(pd.read_csv(file, chunksize=batch_count)):
|
|
|
|
- # df.to_sql(table_name, engine, if_exists='append', index=False)
|
|
|
|
- trans.execute_df_save(df, table_name)
|
|
|
|
- count = (i + 1) * batch_count
|
|
|
|
- trans_print(base_name, f"Chunk {count} written to MySQL.")
|
|
|
|
|
|
+ df = pd.read_csv(file)
|
|
|
|
+ trans_print(f"保存{table_name},总条数:{df.shape[0]}")
|
|
|
|
+ trans.execute_df_save(df, table_name, batch_count)
|
|
|
|
+ trans_print(f"保存到{table_name}成功,总条数:{df.shape[0]}")
|
|
except Exception as e:
|
|
except Exception as e:
|
|
trans_print(traceback.format_exc())
|
|
trans_print(traceback.format_exc())
|
|
message = base_name + str(e)
|
|
message = base_name + str(e)
|
|
@@ -210,12 +231,10 @@ def save_file_to_db(table_name: str, file: str, batch_count=100000):
|
|
|
|
|
|
|
|
|
|
def save_df_to_db(table_name: str, df: pd.DataFrame(), batch_count=100000):
|
|
def save_df_to_db(table_name: str, df: pd.DataFrame(), batch_count=100000):
|
|
- split_dfs = [df.iloc[i:i + batch_count] for i in range(0, len(df), batch_count)]
|
|
|
|
try:
|
|
try:
|
|
- for i, split_df in enumerate(split_dfs):
|
|
|
|
- trans.execute_df_save(split_df, table_name)
|
|
|
|
- count = (i + 1) * batch_count
|
|
|
|
- trans_print(f"Chunk {count} written to MySQL.")
|
|
|
|
|
|
+ trans_print(f"保存{table_name},总条数:{df.shape[0]}")
|
|
|
|
+ trans.execute_df_save(df, table_name, batch_count)
|
|
|
|
+ trans_print(f"保存到{table_name}成功,总条数:{df.shape[0]}")
|
|
except Exception as e:
|
|
except Exception as e:
|
|
trans_print(traceback.format_exc())
|
|
trans_print(traceback.format_exc())
|
|
raise Exception(str(e))
|
|
raise Exception(str(e))
|