Преглед на файлове

优化mysql分区方式,默认引擎

wzl преди 9 месеца
родител
ревизия
79780314ba
променени са 6 файла, в които са добавени 102 реда и са изтрити 27 реда
  1. 1 1
      app_run.py
  2. 2 2
      etl/base/PathsAndTable.py
  3. 8 3
      etl/step/SaveToDb.py
  4. 1 1
      service/plt_service.py
  5. 86 16
      service/trans_service.py
  6. 4 4
      test_app_run.py

+ 1 - 1
app_run.py

@@ -109,7 +109,7 @@ def __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_a
             update_trans_status_error(batch_no, transfer_type, message, save_db)
         finally:
             set_trance_id("")
-            # trans_subject.pathsAndTable.delete_tmp_files()
+            trans_subject.pathsAndTable.delete_tmp_files()
 
 
 if __name__ == '__main__':

+ 2 - 2
etl/base/PathsAndTable.py

@@ -76,8 +76,8 @@ class PathsAndTable(object):
             drop_table(table_name, self.save_db)
             trans_print("删除表结束")
 
-    def create_batch_db(self, count=1):
+    def create_batch_db(self, wind_names=list()):
         if self.save_db:
             trans_print("开始创建表")
-            creat_table_and_add_partition(self.get_table_name(), count, self.read_type)
+            creat_table_and_add_partition(self.get_table_name(), wind_names, self.read_type)
             trans_print("建表结束")

+ 8 - 3
etl/step/SaveToDb.py

@@ -1,10 +1,11 @@
 import datetime
 import multiprocessing
+import os
 import traceback
 
 from etl.base.PathsAndTable import PathsAndTable
 from service.plt_service import update_trans_transfer_progress
-from service.trans_service import creat_table_and_add_partition, save_file_to_db
+from service.trans_service import save_file_to_db
 from utils.file.trans_methods import read_excel_files, split_array
 from utils.log.trans_log import trans_print
 from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
@@ -21,8 +22,12 @@ class SaveToDb(object):
         self.pathsAndTable.delete_batch_db()
         trans_print("开始保存到数据库文件")
         all_saved_files = read_excel_files(self.pathsAndTable.get_save_path())
-        creat_table_and_add_partition(self.pathsAndTable.get_table_name(), len(all_saved_files),
-                                      self.pathsAndTable.read_type)
+        wind_names = [str(os.path.basename(i)).replace(".csv", "") for i in all_saved_files]
+
+        # creat_table_and_add_partition(self.pathsAndTable.get_table_name(), wind_names,
+        #                               self.pathsAndTable.read_type)
+
+        self.pathsAndTable.create_batch_db(wind_names)
 
         split_count = get_available_cpu_count_with_percent(percent=1 / 2)
         split_count = split_count if split_count <= len(all_saved_files) else len(all_saved_files)

+ 1 - 1
service/plt_service.py

@@ -26,7 +26,7 @@ def update_timeout_trans_data():
 def update_trans_status_running(batch_no, trans_type, schedule_exec=True):
     if schedule_exec:
         exec_sql = """
-        update data_transfer set trans_sys_status = 0 ,transfer_start_time = now()
+        update data_transfer set transfer_state = 0,trans_sys_status = 0 ,transfer_start_time = now()
         where batch_code = %s  and transfer_type = %s
         """
         plt.execute(exec_sql, (batch_no, trans_type))

+ 86 - 16
service/trans_service.py

@@ -28,7 +28,7 @@ def save_to_trans_conf(data_dict=dict()):
 zhishu_list = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97]
 
 
-def creat_table_and_add_partition(table_name, count, read_type):
+def creat_table_and_add_partition(table_name, win_names, read_type):
     create_sql = f"""
     CREATE TABLE
     IF NOT EXISTS `{table_name}` (
@@ -88,17 +88,17 @@ def creat_table_and_add_partition(table_name, count, read_type):
         `param10` VARCHAR (20) DEFAULT NULL COMMENT '预留10',
          KEY `time_stamp` (`time_stamp`),
          KEY `wind_turbine_number` (`wind_turbine_number`)
-    ) ENGINE = INNODB DEFAULT CHARSET = utf8mb4
+    ) ENGINE = myisam DEFAULT CHARSET = utf8mb4
     """
 
-    if read_type == 'second' and count > 1:
+    if read_type == 'second' and len(win_names) > 1:
 
-        for zhishu in zhishu_list:
-            if zhishu >= count:
-                count = zhishu
-                break
+        create_sql = create_sql + f" PARTITION BY LIST COLUMNS(`wind_turbine_number`) ("
+        partition_strs = list()
+        for wind_name in win_names:
+            partition_strs.append(f" PARTITION p{wind_name} VALUES IN('{wind_name}')")
 
-        create_sql = create_sql + f" PARTITION BY KEY (`wind_turbine_number`) PARTITIONS {count}"
+        create_sql = create_sql + ",".join(partition_strs) + ")"
 
     trans.execute(create_sql)
 
@@ -121,7 +121,7 @@ def drop_table(table_name, save_db=True):
             trans_print(traceback.format_exc())
 
 
-def save_file_to_db(table_name: str, file: str, batch_count=50000):
+def save_file_to_db(table_name: str, file: str, batch_count=100000):
     base_name = os.path.basename(file)
     try:
         for i, df in enumerate(pd.read_csv(file, chunksize=batch_count)):
@@ -151,10 +151,80 @@ if __name__ == '__main__':
     # for path in files:
     #     save_file_to_db("WOF063100040-WOB00013_second", path_prix + os.sep + path, batch_count=100000)
 
-    count = 13
-    if count > 1:
-        for i in zhishu_list:
-            if i >= count:
-                count = i
-                break
-    print(count)
+    table_name = "test"
+    read_type = "second"
+    wind_names = ['WOG00030', 'WOG00034']
+
+    create_sql = f"""
+    CREATE TABLE
+    IF NOT EXISTS `{table_name}` (
+        `wind_turbine_number` VARCHAR (20) DEFAULT NULL COMMENT '风机编号',
+        `wind_turbine_name` VARCHAR(20) DEFAULT NULL COMMENT '风机原始名称',
+        `time_stamp` datetime NOT NULL COMMENT '时间戳',
+        `active_power` DOUBLE DEFAULT NULL COMMENT '有功功率',
+        `rotor_speed` DOUBLE DEFAULT NULL COMMENT '风轮转速',
+        `generator_speed` DOUBLE DEFAULT NULL COMMENT '发电机转速',
+        `wind_velocity` DOUBLE DEFAULT NULL COMMENT '风速',
+        `pitch_angle_blade_1` DOUBLE DEFAULT NULL COMMENT '桨距角1',
+        `pitch_angle_blade_2` DOUBLE DEFAULT NULL COMMENT '桨距角2',
+        `pitch_angle_blade_3` DOUBLE DEFAULT NULL COMMENT '桨距角3',
+        `cabin_position` DOUBLE DEFAULT NULL COMMENT '机舱位置',
+        `true_wind_direction` DOUBLE DEFAULT NULL COMMENT '绝对风向',
+        `yaw_error1` DOUBLE DEFAULT NULL COMMENT '对风角度',
+        `set_value_of_active_power` DOUBLE DEFAULT NULL COMMENT '有功功率设定值',
+        `gearbox_oil_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱油温',
+        `generatordrive_end_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '发电机驱动端轴承温度',
+        `generatornon_drive_end_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '发电机非驱动端轴承温度',
+        `cabin_temperature` DOUBLE DEFAULT NULL COMMENT '机舱内温度',
+        `twisted_cable_angle` DOUBLE DEFAULT NULL COMMENT '扭缆角度',
+        `front_back_vibration_of_the_cabin` DOUBLE DEFAULT NULL COMMENT '机舱前后振动',
+        `side_to_side_vibration_of_the_cabin` DOUBLE DEFAULT NULL COMMENT '机舱左右振动',
+        `actual_torque` DOUBLE DEFAULT NULL COMMENT '实际力矩',
+        `given_torque` DOUBLE DEFAULT NULL COMMENT '给定力矩',
+        `clockwise_yaw_count` DOUBLE DEFAULT NULL COMMENT '顺时针偏航次数',
+        `counterclockwise_yaw_count` DOUBLE DEFAULT NULL COMMENT '逆时针偏航次数',
+        `unusable` DOUBLE DEFAULT NULL COMMENT '不可利用',
+        `power_curve_available` DOUBLE DEFAULT NULL COMMENT '功率曲线可用',
+        `required_gearbox_speed` DOUBLE DEFAULT NULL COMMENT '齿轮箱转速',
+        `inverter_speed_master_control` DOUBLE DEFAULT NULL COMMENT '变频器转速(主控)',
+        `outside_cabin_temperature` DOUBLE DEFAULT NULL COMMENT '环境温度',
+        `main_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '主轴承轴承温度',
+        `gearbox_high_speed_shaft_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱高速轴轴承温度',
+        `gearboxmedium_speed_shaftbearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱中速轴轴承温度',
+        `gearbox_low_speed_shaft_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱低速轴轴承温度',
+        `generator_winding1_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组1温度',
+        `generator_winding2_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组2温度',
+        `generator_winding3_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组3温度',
+        `wind_turbine_status` DOUBLE DEFAULT NULL COMMENT '风机状态1',
+        `wind_turbine_status2` DOUBLE DEFAULT NULL COMMENT '风机状态2',
+        `turbulence_intensity` DOUBLE DEFAULT NULL COMMENT '湍流强度',
+        `lab` int DEFAULT NULL COMMENT '-1:停机 0:好点  1:欠发功率点;2:超发功率点;3:额定风速以上的超发功率点 4: 限电',
+        `year` INT (4) DEFAULT NULL COMMENT '年',
+        `month` INT (2) DEFAULT NULL COMMENT '月',
+        `day` INT (2) DEFAULT NULL COMMENT '日',
+        `param1` DOUBLE DEFAULT NULL COMMENT '预留1',
+        `param2` DOUBLE DEFAULT NULL COMMENT '预留2',
+        `param3` DOUBLE DEFAULT NULL COMMENT '预留3',
+        `param4` DOUBLE DEFAULT NULL COMMENT '预留4',
+        `param5` DOUBLE DEFAULT NULL COMMENT '预留5',
+        `param6` VARCHAR (20) DEFAULT NULL COMMENT '预留6',
+        `param7` VARCHAR (20) DEFAULT NULL COMMENT '预留7',
+        `param8` VARCHAR (20) DEFAULT NULL COMMENT '预留8',
+        `param9` VARCHAR (20) DEFAULT NULL COMMENT '预留9',
+        `param10` VARCHAR (20) DEFAULT NULL COMMENT '预留10',
+         KEY `time_stamp` (`time_stamp`),
+         KEY `wind_turbine_number` (`wind_turbine_number`)
+    ) ENGINE = myisam DEFAULT CHARSET = utf8mb4
+    """
+
+    if read_type == 'second' and len(wind_names) > 1:
+
+        create_sql = create_sql + f" PARTITION BY LIST COLUMNS(`wind_turbine_number`) ("
+        partition_strs = list()
+        for wind_name in wind_names:
+            partition_strs.append(f" PARTITION p{wind_name} VALUES IN('{wind_name}')")
+
+        create_sql = create_sql + ",".join(partition_strs) + ")"
+
+
+    print(create_sql)

+ 4 - 4
test_app_run.py

@@ -118,7 +118,7 @@ if __name__ == '__main__':
     if len(sys.argv) >= 2:
         env = sys.argv[1]
     else:
-        env = 'prod'
+        env = 'dev'
     print(sys.argv)
     if env is None:
         raise Exception("请配置运行环境")
@@ -142,9 +142,9 @@ if __name__ == '__main__':
     #           transfer_file_addr=r'D:\trans_data\密马风电场\收资数据\minute', field_name='密马风电场',
     #           field_code="WOF035200003", save_db=False)
 
-    run_local(0, 3, batch_no='test_11', batch_name='test', transfer_type='minute',
-              transfer_file_addr=r'D:\trans_data\和风元宝山\收资数据\min', field_name='和风元宝山',
-              field_code="WOF039800012", save_db=False)
+    run_local(4, 4, batch_no='WOF053600062-WOB000010', batch_name='ZYFDC000013', transfer_type='second',
+              transfer_file_addr=r'/data/download/collection_data/2完成/招远风电场-山东-大唐/收资数据/招远秒级数据', field_name='招远风电场',
+              field_code="WOF053600062", save_db=True)
 
     # run_local(0, 3, batch_no='WOF043600007-WOB000001', batch_name='XALFDC0814', transfer_type='second',
     #           transfer_file_addr=r'D:\trans_data\新艾里风电场\收资数据\1号风机', field_name='新艾里风电场',